scala - Spark Structured Streaming:导入图像文件以创建简单的 ML 应用程序
问题描述
我想构建一个结构化流应用程序,其目的是从 URL 检索图像并构建一个非常简单的 ML 模型,该模型将根据图像的内容进行分类。
我有一个 URL ( http://129.102.240.235/extraits/webcam/webcam.jpg ),每 X 单位时间更新一个新图像。我的目标是首先存储这些图像或使用 readStream 对象直接导入它们(如果可能的话?)。我知道从 Spark 2.X 开始,我们可以直接使用一种image
格式将内容读入 Dataframe。我在不同的方法之间犹豫不决:
- 使用消息的总线解决方案(如 Kafka)将生成我的内容以在 Spark 中使用,我认为这不会是坏事,因为 Kafka 可用于复制文件,因此数据丢失较弱。
- 直接使用 readStream 对象读取图像(这是我尝试做的,见下文)
我的下一个 Scala 代码的目的只是试图显示图像的内容,但是当我使用它进行测试时它会引发不同spark-shell
的错误,我将在下面代码的相应部分注释错误。
scala> val url = "http://129.102.240.235/extraits/webcam/webcam.jpg"
url: String = http://129.102.240.235/extraits/webcam/webcam.jpg
scala> spark.sparkContext.addFile(url)
scala> val image_df = spark.read.format("image").load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [image: struct<origin: string, height: int ... 4 more fields>]
scala> image_df.select("image.origin").show(false)
19/10/25 13:33:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
org.apache.spark.SparkException: File /tmp/spark-28741963-fd2d-44c2-8a6b-a489fdaae96d/userFiles-95b99fde-a1e2-4da6-9a17-382bfd2292c4/webcam.jpg exists and does not ma
tch contents of http://129.102.240.235/extraits/webcam/webcam.jpg
我也尝试使用 readStream:
scala> val scheme = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scheme: String = " origin STRING, height INT, width INT, nChannels INT, mode INT, data BINARY"
scala> val image_df = spark.readStream.format("image").schema(scheme).load("file://"+SparkFiles.get("webcam.jpg"))
image_df: org.apache.spark.sql.DataFrame = [origin: string, height: int ... 4 more fields]
scala> val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
<console>:26: error: value writeStream is not a member of Unit
val query_show = image_df.collect.foreach(println).writeStream.format("console").start()
// Based on what I red on StackO question, I suppose that this error might be caused because
// .writeStream should be on the next line, so I tried to put it on 2 lines but..
scala> val query_show = image_df.collect.foreach(println).
| writeStream.format("console").start()
<console>:27: error: value writeStream is not a member of Unit
possible cause: maybe a semicolon is missing before `value writeStream'?
writeStream.format("console").start()
// Also tried without declaring query_show but returns the same error..
// I know that if I make it work I will have to add query_show.awaitTermination()
任何有关调试此代码或想法以构建我的流管道的帮助将不胜感激!
解决方案
我设法找到一种show()
使用“图像”格式将我的数据框变为红色的方法。我分两步完成:
1/ 运行一个 Python 脚本,该脚本将从 URL 中保存 jpg 图像,该脚本是:
import urllib.request
import shutil
filename = '~/dididi/bpi-spark/images'
url = "http://129.102.240.235/extraits/webcam/webcam.jpg"
with urllib.request.urlopen(url) as response, open(filename, 'ab') as out_file:
shutil.copyfileobj(response, out_file)
2 / 然后,使用 spark-shell 我刚刚执行了这两行:
val image_df = spark.read.format("image").option("inferSchema", true).load("bpi-spark/images").select($"image.origin",$"image.height",$"image.width", $"image.mode", $"image.data")
scala> image_df.show()
+--------------------+------+-----+----+--------------------+
| origin|height|width|mode| data|
+--------------------+------+-----+----+--------------------+
|file:///home/niki...| 480| 720| 16|[3C 3D 39 3C 3D 3...|
+--------------------+------+-----+----+--------------------+
推荐阅读
- java - 从字典 Java 打印
- wordpress - 按优惠券分组 Woocommerce 用户
- c# - WebUntisAPI 在 WPF 项目中调用方法后返回“未验证”
- sql - 通过添加列值删除重复行
- apache-kafka - 使用 environment var 在 seedstack 上配置 kafka 主机
- angular - Angular:如何仅在特定组件而不是整个 ngModule 中使用自定义 EventPlugin?
- javascript - 数组反转函数JS
- android - Android Jet Pack Navigation,setupWithNavController() 重新创建片段
- java - Android/Java:内存增加2小时,然后稳定。这是正常行为吗?
- javascript - Wordpress 中的 Jquery 无法用于查找切换