apache-spark - 如何使用 pyspark 写入 Kafka?
问题描述
我正在尝试使用 PySpark 给 Kafka 写信。
我被困在零阶段:
[Stage 0:> (0 + 8) / 9]
然后我得到一个超时错误:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
代码是:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages
org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 pyspark-shell'
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
def main():
spark = SparkSession.builder.master("local").appName("Spark CSV Reader")
.getOrCreate();
dirpath = os.path.abspath(sys.argv[1])
os.chdir(dirpath)
mySchema = StructType([
StructField("id", IntegerType()),StructField("name", StringType()),\
StructField("year", IntegerType()),StructField("rating", DoubleType()),\
StructField("duration", IntegerType()) ])
streamingDataFrame = spark.readStream.schema(mySchema)
.csv('file://' + dirpath + "/" )
streamingDataFrame.selectExpr("CAST(id AS STRING) AS key",
"to_json(struct(*)) AS value").\
writeStream.format("kafka").option("topic", "topicName")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("checkpointLocation", "./chkpt").start()
我正在运行 HDP 2.6。
解决方案
正如我在评论中提到的,Spark 在多台机器上运行,所有这些机器都不太可能成为 Kafka 代理。
使用 Kafka 集群的外部地址
.option("kafka.bootstrap.servers", "<kafka-broker-1>:9092,<kafka-broker-2>:9092")\
推荐阅读
- java - 如何使用 JAVA 在 S3 文件中更新 CSV 文件的标题
- google-cloud-identity - API - 客户 ID 不容易找到 - 请支持 my_customer
- google-sheets-formula - 查找包含键的所有行并连接单元格值
- cordova - 离子文件插件在根内部存储上创建目录时出错
- html - CSS网格消除间隙
- python - 如何限制单个唯一 ID 可以拥有的行数?/文本游戏的有限游戏库存
- shell - Makefile:如何正确扩展 shell 函数中的变量?
- ios - Two different values from the same kSecAttrAccount in Keychain
- python - torch transform.resize() vs cv2.resize()
- linux - linux prevent user read other directory