PySpark: from Kafka createDirectStream to PostgreSQL

Problem Description

I am trying to insert data from a Kafka stream using PySpark.

My code is as follows:

    import json

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    cols = ['id', 'name']
    topic = "testing_topic"
    # spark context init
    para_seconds = 10
    sc = SparkContext(appName="PythonSparkStreamingKafka_Testing_01")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, para_seconds)
    # receiver in kafka
    brokers = 'localhost:9092'
    # get streaming data from kafka
    kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {'bootstrap.servers': brokers})
    lines = kafkaStream.map(lambda x: json.loads(x[1]))
    lines.pprint()

Given this data:

    {"id": {"testing": "20210623190332610"}, "name": {"testing": "testname"}}

I tried using this code

    x = lines.take(1)

to fetch the data handed to the sparkContext, but it returns an error.

Is there any guide on converting the incoming data directly into a DataFrame of string type? Thanks.

Tags: python, apache-spark, pyspark, apache-spark-sql

Solution


I would start by reading the Spark documentation at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html; it has very detailed examples for most of what you are doing.

Also, see if the code below helps.

    from pyspark.sql.types import StringType, StructType, StructField
    import pyspark.sql.functions as F

    # Target shape: { "id": { "testing": "20210623190332610" }, "name": { "testing": "testname" } }
    data_spark_schema = StructType([
        StructField("id", StructType([
            StructField("testing", StringType(), True)]), True),
        StructField("name", StructType([
            StructField("testing", StringType(), True)]), True)])

    streaming_df = (
        spark.readStream
            .format("kafka")
            .option("kafka.group.id", "GROUP_ID")
            .option("kafka.bootstrap.servers", "BOOTSTRAP")
            .option("subscribe", "TOPIC")
            .load()
            # Kafka delivers the payload as bytes: cast it to a string,
            # then parse the JSON using the schema defined above
            .selectExpr("CAST(value AS STRING) as event_value")
            .select(F.from_json("event_value", data_spark_schema).alias("value"))
    )
    print(streaming_df.schema)
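Since you asked for string-typed columns, you can flatten the nested structs right after parsing. A minimal sketch (the `flat_df` name is mine); you could pass this to the writeStream below in place of the nested `streaming_df`:

    # Pull the nested struct fields out as plain string columns
    flat_df = streaming_df.select(
        F.col("value.id.testing").alias("id"),
        F.col("value.name.testing").alias("name"))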

    def write_to_postgres(df, epoch_id):
        # foreachBatch hands each micro-batch to this function as a plain
        # DataFrame, so the ordinary JDBC batch writer works here
        mode = "append"
        url = "jdbc:postgresql://url:5432/database"
        properties = {"user": "user", "password": "password", "driver": "org.postgresql.Driver"}
        df.write.jdbc(url=url, table="schema.table_name", mode=mode, properties=properties)
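Because foreachBatch receives an ordinary (non-streaming) DataFrame, the sink function can be tried out with static data first. A sketch, assuming a reachable Postgres instance and the JDBC driver on the classpath:

    # Exercise the sink without Kafka: a static DataFrame behaves the same
    # as the micro-batch that foreachBatch would pass in
    test_df = spark.createDataFrame([("20210623190332610", "testname")], ["id", "name"])
    write_to_postgres(test_df, epoch_id=0)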

    query = streaming_df.writeStream \
        .foreachBatch(write_to_postgres) \
        .option("checkpointLocation", '/checkpoint_path') \
        .outputMode('update') \
        .start()
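One more detail: in a standalone script the driver exits as soon as the main program returns, so block on the query handle that start() returns. The Postgres JDBC driver also has to be on the classpath (for example via spark-submit's --packages option).

    # Block until the stream is stopped (query.stop() or Ctrl+C)
    query.awaitTermination()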
