python - Write a DataFrame to a Kafka topic using PySpark
Problem description
I have the DataFrame below, with its schema and data. I am trying to write this data to the Kafka topic vehicleData.
Data:
+---------------+------------+------+------+-------------+------+
|Registration_No|Plate_Number| Maker| Model|Vehicle_Color|Status|
+---------------+------------+------+------+-------------+------+
|   LER-15A-9681|     LER9681|Suzuki|Mehran|          RED|     0|
|    LEV-15-7044|     LEV7044| Honda| Civic|        GREEN|     0|
|    LEC-15-1946|     LEC1946| Honda| Civic|        WHITE|     0|
+---------------+------------+------+------+-------------+------+
Schema:
root
|-- Registration_No: string (nullable = true)
|-- Plate_Number: string (nullable = true)
|-- Maker: string (nullable = true)
|-- Model: string (nullable = true)
|-- Vehicle_Color: string (nullable = true)
|-- Status: string (nullable = true)
Code:
raw_data.selectExpr("CAST(Registration_No AS STRING)", "CAST(Plate_Number AS STRING)",
                    "CAST(Maker AS STRING)", "CAST(Model AS STRING)",
                    "CAST(Vehicle_Color AS STRING)", "CAST(Status AS STRING)").write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("topic", "vehicleData") \
    .save()
The error I get is:
Traceback (most recent call last):
  File "/Users/saeed.butt/PycharmProjects/untitled4/Main.py", line 52, in <module>
    .option("topic","vehicleData") \
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 737, in save
    self._jwrite.save()
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'
The Kafka server is running on localhost:9092.
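The AnalysisException in the traceback is about the classpath rather than the data: "Failed to find data source: kafka" means Spark's Kafka connector (the spark-sql-kafka-0-10 artifact) is not available to the application. A minimal sketch, assuming the SparkSession is built inside Main.py, is to request the connector through the spark.jars.packages setting before the first SparkSession (and therefore the JVM) is created. The helper names below are hypothetical, and the version numbers in the usage note are placeholders: the Scala suffix and Spark version must match your own Spark installation.

```python
from typing import Any


def kafka_package(spark_version: str, scala_version: str) -> str:
    """Return the Maven coordinate of Spark's Kafka SQL connector."""
    return "org.apache.spark:spark-sql-kafka-0-10_{}:{}".format(scala_version, spark_version)


def with_kafka_connector(builder: Any, spark_version: str, scala_version: str) -> Any:
    """Ask Spark to download the Kafka connector via spark.jars.packages.

    `builder` is expected to be SparkSession.builder. The setting only takes
    effect if it is applied before the first SparkSession in this process is
    created, because the packages are resolved when the JVM starts.
    """
    # SparkSession.Builder.config() returns the builder, so calls can be chained.
    return builder.config("spark.jars.packages", kafka_package(spark_version, scala_version))
```

For example, with placeholder versions: with_kafka_connector(SparkSession.builder, "2.4.0", "2.11").getOrCreate(). The equivalent on the command line is spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 Main.py. Either way Spark fetches the artifact from a Maven repository, so the machine needs access to one.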
Solution
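The Kafka sink requires the DataFrame to have a column named "value" (a "key" column is optional). Add a "value" column holding the data you want to publish and write that to Kafka.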
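One common way to build that column, sketched here under assumptions rather than as the answerer's exact method, is to pack every column into a JSON string with the Spark SQL functions to_json and struct, and optionally cast one column to a string key. The helper names kafka_exprs and to_kafka_frame are hypothetical; they only assemble the expressions passed to DataFrame.selectExpr.

```python
from typing import List, Optional, Sequence


def kafka_exprs(columns: Sequence[str], key_column: Optional[str] = None) -> List[str]:
    """Build selectExpr() expressions producing the key/value columns Kafka expects.

    The value packs every column into one JSON string with to_json(struct(...)).
    The optional key is a single column cast to STRING.
    """
    # Backticks keep column names with spaces or other special characters valid in SQL.
    quoted = ", ".join("`{}`".format(c) for c in columns)
    exprs = ["to_json(struct({})) AS value".format(quoted)]
    if key_column is not None:
        exprs.insert(0, "CAST(`{}` AS STRING) AS key".format(key_column))
    return exprs


def to_kafka_frame(df, key_column: Optional[str] = None):
    """Reshape a PySpark DataFrame into the key/value layout the Kafka sink writes."""
    return df.selectExpr(*kafka_exprs(df.columns, key_column))
```

Applied to the question's DataFrame (using Registration_No as the key is an illustrative choice), to_kafka_frame(raw_data, key_column="Registration_No") is then written with the same .write.format("kafka"), kafka.bootstrap.servers and topic options as before, and each message value looks like {"Registration_No":"LER-15A-9681","Plate_Number":"LER9681",...}. This does not by itself fix the "Failed to find data source: kafka" error, which requires the spark-sql-kafka-0-10 connector to be on the classpath.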