Writing a DataFrame to a Kafka topic with PySpark

Problem description

I have the DataFrame below, shown with its schema and data. I simply want to write this data to the Kafka topic vehicleData.

Data
+---------------+------------+------+------+-------------+------+
|Registration_No|Plate_Number| Maker| Model|Vehicle_Color|Status|
+---------------+------------+------+------+-------------+------+
|   LER-15A-9681|     LER9681|Suzuki|Mehran|          RED|     0|
|    LEV-15-7044|     LEV7044| Honda| Civic|        GREEN|     0|
|    LEC-15-1946|     LEC1946| Honda| Civic|        WHITE|     0|
+---------------+------------+------+------+-------------+------+

Schema is:
root
 |-- Registration_No: string (nullable = true)
 |-- Plate_Number: string (nullable = true)
 |-- Maker: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle_Color: string (nullable = true)
 |-- Status: string (nullable = true)


Code is:
raw_data.selectExpr("CAST(Registration_No AS STRING)", "CAST(Plate_Number AS STRING)",
                         "CAST(Maker AS STRING)", "CAST(Model AS STRING)",
                         "CAST(Vehicle_Color AS STRING)", "CAST(Status AS STRING)").write\
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")\
    .option("topic","vehicleData") \
    .save()

The error I get is:

Traceback (most recent call last):
  File "/Users/saeed.butt/PycharmProjects/untitled4/Main.py", line 52, in <module>
    .option("topic","vehicleData") \
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/readwriter.py", line 737, in save
    self._jwrite.save()
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Users/saeed.butt/PycharmProjects/untitled4/venv/lib/python2.7/site-packages/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

The Kafka server is running on localhost:9092.

Tags: python, pyspark, apache-kafka

Solution


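The DataFrame must have a column named "value" in order to be written to Kafka. The Kafka sink does not accept arbitrary columns such as Registration_No or Maker, so add a column named "value" that holds the record payload as a string or binary, and write that column to Kafka.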
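A minimal sketch of that approach, assuming each row should become one JSON string and that raw_data is the DataFrame from the question. The helper names value_expr and write_to_kafka are hypothetical, and JSON is only one possible encoding for the "value" column.

```python
def value_expr(columns):
    """Build a Spark SQL expression that packs the given columns
    into a single JSON string column named `value`."""
    cols = ", ".join("`{}`".format(c) for c in columns)
    return "to_json(struct({})) AS value".format(cols)


def write_to_kafka(df, servers="127.0.0.1:9092", topic="vehicleData"):
    """Batch-write every row of `df` to a Kafka topic as a JSON string.

    `servers` and `topic` default to the values used in the question.
    """
    (df.selectExpr(value_expr(df.columns))   # only a `value` column is left
       .write
       .format("kafka")
       .option("kafka.bootstrap.servers", servers)
       .option("topic", topic)
       .save())


# Usage, with raw_data being the DataFrame from the question:
# write_to_kafka(raw_data)
```

Note that the exception in the question, "Failed to find data source: kafka", usually means the Kafka connector is not on the classpath. In that case the job likely also needs the spark-sql-kafka-0-10 package, for example via spark-submit --packages, with the artifact matched to your Spark and Scala versions as described in the deployment section of the Structured Streaming + Kafka Integration Guide.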

