python - Pyspark 从 kafkaDirectStream 到 Postgresql
问题描述
我正在尝试使用 pyspark 从 kafka 流中插入数据
我的代码如下
cols = ['id','name']
topic = "testing_topic"
# spark context init
para_seconds = 10
sc = SparkContext(appName="PythonSparkStreamingKafka_Testing_01")
sc.setLogLevel("WARN")
ssc = StreamingContext(sc, para_seconds)
# receiver in kafka
brokers = 'localhost:9092'
topic = "testing_topic"
# get streaming data from kafka
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],{'bootstrap.servers':brokers})
lines = kafkaStream.map(lambda x: json.loads(x[1])).pprint()
给定数据
{“id”:{“测试”:“20210623190332610”},“名称”:{“测试”:“测试名称”}}
我尝试使用此代码
x = lines.take(1)
获取给 sparkContext 的数据,但它返回错误。
是否有任何指南可以将数据收入直接转换为字符串类型的数据框?谢谢
解决方案
我将首先在https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html开始阅读 spark 文档 ,它为您正在做的大部分事情提供了非常详细的示例。
此外,看看下面的代码是否有帮助。
from pyspark.sql.types import ArrayType, StringType, StructType, StructField
import json
from pyspark.sql.functions as F
#{ "id": { "testing": "20210623190332610" }, "name": { "testing": "testname" } }
data_spark_schema = StructType([
StructField("id",StructType([
StructField("testing", StringType(), True),
StructField("name",StructType([
StructField("testing", StringType(), True)]),True]),True])
streaming_df = (
spark.readStream
.format("kafka")
.option("kafka.group.id", "GROUP_ID")
.option("kafka.bootstrap.servers", "BOOTSTRAP")
.option("subscribe", "TOPIC")
.load()
).selectExpr("CAST(value AS STRING) as event_value").select(F.from_json("event_value",data_spark_schema).alias("value"))
print(streaming_df.schema)
def write_to_postgres(df, epoch_id):
mode="append"
url = "jdbc:postgresql://url:5432/database"
properties = {"user": "user", "password": "password", "driver": "org.postgresql.Driver"}
df.write.jdbc(url=url, table="schema.table_name", mode=mode, properties=properties)
streaming_df.writeStream \
.foreachBatch(write_to_postgres) \
.option("checkpointLocation", '/checkpoint_path') \
.outputMode('update') \
.start()
推荐阅读
- java - 多线程和停止线程
- excel - 使用 excel 行中的数据自动填充单词。每个 Excel 行的自动填充 Word 文档
- swift - 为 CGImage 创建渐变时崩溃:copy_read_only:vm_copy 失败:状态 1
- node.js - Lambda 不会写入 mongodb 但我可以在本地写入
- excel - Excel - 将带有文件名的文件路径字符串转换为其文件位置字符串
- html - 在插入 HTML 之前对哈希表进行排序
- html - HTML标题属性使css:悬停不起作用
- mysql - 触发器将日期从时区转换为另一个时区
- reactjs - 如何判断用户是否使用仅 http cookie 和 JWT 登录(客户端)
- mysql - 如何在mysql中使用给定日期计算前一个日期