Print PySpark streaming data from a Kafka topic

Problem description

I am new to Kafka and PySpark and am trying to write a simple program. I have two Kafka topics with data in JSON format, and I am reading them from a PySpark stream.

My producer code is below:

from kafka import KafkaProducer
import json
import boto3

class producer:
    @staticmethod
    def json_serializer(data):
        # Serialize each record to UTF-8 encoded JSON bytes before sending to Kafka
        return json.dumps(data).encode("utf-8")

    @staticmethod
    def read_s3():
        p1 = KafkaProducer(bootstrap_servers=['localhost:9092'],
                           value_serializer=producer.json_serializer)
        s3 = boto3.resource('s3')
        bucket = s3.Bucket('kakfa')
        # Read every object in the bucket and publish its body to the Uber_Eats topic
        for obj in bucket.objects.all():
            key = obj.key
            body = obj.get()['Body'].read().decode('utf-8')
            p1.send("Uber_Eats", body)
        p1.flush()

My consumer code is below:

from pyspark.sql import SparkSession
from kafka import KafkaConsumer

class consumer:
    def read_from_topic(self, spark):
        # Build a streaming DataFrame over the Uber_Eats topic, starting from the earliest offsets
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "Uber_Eats") \
            .option("startingOffsets", "earliest") \
            .load()
        df.createOrReplaceTempView("kafka")
        spark.sql("select * from kafka")
        print(df.isStreaming)   # isStreaming is a property, not a method

    def get_consumer(self):
        consumer = KafkaConsumer("Uber_Eats", group_id='group1',
                                 bootstrap_servers="localhost:9092")
        return consumer

    def print_details(self, c1):
        # Read and print every message from the kafka-python consumer
        try:
            for msg in c1:
                print(msg.topic, msg.value)
            print("Done")
        except Exception as e:
            print(e)

Main class:

from Producer_Group import *
from Consumer_Group import *
from Spark_Connection import *

class client:
    def transfer(self):
        spark = connection.get_connection(self)   # obtain the SparkSession
        producer.read_s3()                        # publish the S3 data to the Uber_Eats topic
        c1 = consumer.get_consumer(spark)
        consumer.read_from_topic(self, spark)
        # consumer.print_details(self, c1)

c = client()
c.transfer()

Here is the sample data in S3 that I am reading into the Kafka topic:

{
    
        {
            "Customer Number": "1",
            "Customer Name": "Aditya",
            "Restaurant Number": "2201",
            "Restaurant NameOrdered": "Bawarchi",
            "Number of Items": "3",
            "price": "10",
            "Operating Start hours": "9:00",
            "Operating End hours": "23:00"
        },
        {
            "Customer Number": "2",
            "Customer Name": "Sarva",
            "Restaurant Number": "2202",
            "Restaurant NameOrdered": "Sarvana Bhavan",
            "Number of Items": "4",
            "price": "20",
            "Operating Start hours": "8:00",
            "Operating End hours": "20:00"
        },
        {
            "Customer Number": "3",
            "Customer Name": "Kala",
            "Restaurant Number": "2203",
            "Restaurant NameOrdered": "Taco Bell",
            "Number of Items": "5",
            "price": "30",
            "Operating Start hours": "11:00",
            "Operating End hours": "21:00"
        }
    
}

What I have tried so far: I tried printing to the console to check a condition, and if the condition passes, to insert the data into a database. To check that condition I am reading the data in the read_from_topic function and creating a view (createOrReplaceTempView) to look at the data, but nothing is printed. Can someone guide me on how to print this and verify whether my conditions and data are being read correctly?

Thanks in advance!!!!

Tags: python, apache-spark, pyspark, apache-kafka, spark-structured-streaming

Solution


"creating a view (createOrReplaceTempView) to look at the data, but nothing is printed"

That is because spark.sql returns a new DataFrame.

If you want to print it, then you need

spark.sql("select * from kafka").show()

However, that alone will give you (at least) two byte-array columns rather than JSON strings, so at some point you will need to define a schema to extract anything, or at least CAST the value so that the data is human-readable.
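
For example, here is a minimal sketch of that, assuming the field names from your sample records and that df is the streaming DataFrame built in read_from_topic; it casts the Kafka value bytes to a string, parses them with an explicit schema, and prints the rows through a console sink (one way to display rows from a streaming DataFrame):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

# Hypothetical schema matching the sample records (every field there is a string);
# from_json will return nulls unless each Kafka message is one valid JSON object with these fields
schema = StructType() \
    .add("Customer Number", StringType()) \
    .add("Customer Name", StringType()) \
    .add("Restaurant Number", StringType()) \
    .add("Restaurant NameOrdered", StringType()) \
    .add("Number of Items", StringType()) \
    .add("price", StringType()) \
    .add("Operating Start hours", StringType()) \
    .add("Operating End hours", StringType())

parsed = df \
    .selectExpr("CAST(value AS STRING) AS json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# A streaming DataFrame is displayed by starting a query against a sink,
# for example the console sink
query = parsed.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()

Each micro-batch is then printed to the driver console, which makes it easy to verify whether the data is being read correctly before you insert anything into a database.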

It is also worth pointing out that the data you have shown is not valid JSON, and that boto3 is not necessary, since Spark can read files from S3 itself (so Kafka is not strictly needed either: you could take the S3 data straight to its final destination, with a Spark persist() in between).
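
As a rough sketch of that alternative (the s3a:// path and the multiLine option are assumptions based on your bucket name and the multi-line shape of the sample data, and it presumes the S3A connector and AWS credentials are already configured for Spark):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("uber-eats-batch").getOrCreate()

# Read the JSON file(s) directly from S3 as a plain batch DataFrame -- no boto3 or Kafka needed.
# multiLine lets Spark parse records that span several lines, as in the sample data.
orders = spark.read \
    .option("multiLine", "true") \
    .json("s3a://kakfa/")   # hypothetical prefix; point this at your actual bucket/key

orders.show(truncate=False)

# From here the data could be validated and written straight to its final destination
# once your condition checks pass.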

