Spark Structured Streaming: ingesting image files to build a simple ML application

Question

I want to build a Structured Streaming application that retrieves images from a URL and trains a very simple ML model to classify each image by its content.

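I have a URL ( http://129.102.240.235/extraits/webcam/webcam.jpg ) that is updated with a new image every X units of time. My goal is to first store these images, or to import them directly with a readStream object (if that is possible?). I know that since Spark 2.X we can read image content straight into a DataFrame using the image format. I am hesitating between different approaches.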

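The Scala code below only tries to display the content of the image, but it raises different errors when I test it in spark-shell. I have noted each error next to the corresponding part of the code.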

scala> val url = "http://129.102.240.235/extraits/webcam/webcam.jpg"
url: String = http://129.102.240.235/extraits/webcam/webcam.jpg

scala> spark.sparkContext.addFile(url)

scala> val image_df = spark.read.format("image").load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [image: struct<origin: string, height: int ... 4 more fields>]

scala> image_df.select("image.origin").show(false)
19/10/25 13:33:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: File /tmp/spark-28741963-fd2d-44c2-8a6b-a489fdaae96d/userFiles-95b99fde-a1e2-4da6-9a17-382bfd2292c4/webcam.jpg exists and does not match contents of http://129.102.240.235/extraits/webcam/webcam.jpg

I also tried using readStream:

scala> val scheme = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scheme: String = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"

scala> val image_df = spark.readStream.format("image").schema(scheme).load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [origin: string, height: int ... 4 more fields]

scala> val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
<console>:26: error: value writeStream is not a member of Unit
       val query_show = image_df.collect.foreach(println).writeStream.format("console").start()

// Based on what I read in a Stack Overflow question, I suppose this error might occur because
// .writeStream should be on the next line, so I tried splitting it over 2 lines, but..

scala> val query_show = image_df.collect.foreach(println).
     |   writeStream.format("console").start()
<console>:27: error: value writeStream is not a member of Unit
possible cause: maybe a semicolon is missing before `value writeStream'?
         writeStream.format("console").start()

// I also tried without declaring query_show, but it returns the same error.
// I know that once it works I will have to add query_show.awaitTermination()

Any help debugging this code, or ideas for building my streaming pipeline, would be greatly appreciated!

Tags: scala, image-processing, spark-structured-streaming

Answer


I managed to find a way to show() my DataFrame read with the "image" format. I did it in two steps:
1/ Run a Python script that saves the jpg image from the URL. The script is:

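import os
import shutil
import urllib.request

# open() does not expand "~", so resolve the home directory explicitly.
filename = os.path.expanduser('~/dididi/bpi-spark/images')
url = "http://129.102.240.235/extraits/webcam/webcam.jpg"

# 'wb' replaces the previous snapshot; append mode would concatenate successive JPEGs into one file.
with urllib.request.urlopen(url) as response, open(filename, 'wb') as out_file:
    shutil.copyfileobj(response, out_file)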

2/ Then, in spark-shell, I just ran these two lines:

val image_df = spark.read.format("image").option("inferSchema", true).load("bpi-spark/images").select($"image.origin",$"image.height",$"image.width", $"image.mode", $"image.data")

scala> image_df.show()
+--------------------+------+-----+----+--------------------+
|              origin|height|width|mode|                data|
+--------------------+------+-----+----+--------------------+
|file:///home/niki...|   480|  720|  16|[3C 3D 39 3C 3D 3...|
+--------------------+------+-----+----+--------------------+
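
Since the webcam URL is refreshed periodically, step 1 could be extended into a small polling loop that stores each snapshot under its own file name. The following is only a sketch under stated assumptions: the directory `~/bpi-spark/images`, the 60-second interval, the `webcam-<timestamp>.jpg` naming scheme, and the helper names `snapshot_path`, `save_snapshot`, and `poll` are all hypothetical and not part of the original answer.

```python
import shutil
import time
import urllib.request
from datetime import datetime, timezone
from pathlib import Path

URL = "http://129.102.240.235/extraits/webcam/webcam.jpg"  # URL from the question
IMAGE_DIR = Path.home() / "bpi-spark" / "images"  # assumed target directory
POLL_SECONDS = 60  # assumed; the question only says "every X units of time"


def snapshot_path(directory: Path, when: datetime) -> Path:
    """Build a unique, sortable file name such as webcam-20191025T133326Z.jpg."""
    return directory / f"webcam-{when.strftime('%Y%m%dT%H%M%SZ')}.jpg"


def save_snapshot(url: str, directory: Path, when: datetime) -> Path:
    """Download url into directory and return the path of the saved file."""
    directory.mkdir(parents=True, exist_ok=True)
    target = snapshot_path(directory, when)
    # Write under a temporary name first, then rename, so a reader never
    # finds a half-written image under its final name.
    partial = target.with_name("." + target.name + ".part")
    with urllib.request.urlopen(url, timeout=10) as response, open(partial, "wb") as out_file:
        shutil.copyfileobj(response, out_file)
    partial.replace(target)
    return target


def poll(url: str, directory: Path, interval: float, count: int) -> None:
    """Save `count` snapshots, sleeping `interval` seconds after each one."""
    for _ in range(count):
        save_snapshot(url, directory, datetime.now(timezone.utc))
        time.sleep(interval)
```

Calling `poll(URL, IMAGE_DIR, POLL_SECONDS, count=5)` would save five snapshots one minute apart. A directory that accumulates one file per snapshot can still be read with `spark.read.format("image").load(...)` as in step 2, and it is also the layout that file-based streaming sources generally expect, although I have not verified that the image format works with readStream.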
