python - 从 kafka 主题打印 Pyspark 流数据
问题描述
我是 kafka 和 pyspark 的新手,正在尝试编写简单的程序,所以我有 2 个 JSon 格式的 kafka Topics 文件,我正在从 pyspark 流中读取这个文件。
我的生产者代码如下:
from kafka import *
import json
import time
import boto3
import json
from Consumer_Group import *
from json import loads
class producer :
def json_serializer(data):
return json.dumps(data).encode("utf-8")
def read_s3():
p1 = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=producer.json_serializer)
s3 = boto3.resource('s3')
bucket = s3.Bucket('kakfa')
for obj in bucket.objects.all():
key = obj.key
body = obj.get()['Body'].read().decode('utf-8')
p1.send("Uber_Eats",body)
p1.flush()
我的消费者代码如下:
from pyspark.sql import SparkSession
from kafka import *
import time
class consumer:
def read_from_topic(self,spark):
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Uber_Eats") \
.option("startingOffsets", "earliest") \
.load()
df.createOrReplaceTempView("kafka")
spark.sql("select * from kafka")
print(df.isStreaming())
def get_consumer(self):
consumer = KafkaConsumer("Uber_Eats", group_id='group1', bootstrap_servers=
"localhost:9092")
return consumer
def print_details(self,c1):
# self.consumer=self.get_consumer(self)
# Read and print message from consumer
try:
for msg in c1:
print(msg.topic, msg.value)
print("Done")
except Exception as e:
print(e)
主类:
from Producer_Group import *
from Consumer_Group import *
from Spark_Connection import *
class client:
def transfer(self):
spark = connection.get_connection(self)
producer.read_s3()
c1 = consumer.get_consumer(spark)
consumer.read_from_topic(self,spark)
# consumer.print_details(self,c1)
c=client()
c.transfer()
我正在读入 kafka 主题的 S3 中的示例数据:
{
{
"Customer Number": "1",
"Customer Name": "Aditya",
"Restaurant Number": "2201",
"Restaurant NameOrdered": "Bawarchi",
"Number of Items": "3",
"price": "10",
"Operating Start hours": "9:00",
"Operating End hours": "23:00"
},
{
"Customer Number": "2",
"Customer Name": "Sarva",
"Restaurant Number": "2202",
"Restaurant NameOrdered": "Sarvana Bhavan",
"Number of Items": "4",
"price": "20",
"Operating Start hours": "8:00",
"Operating End hours": "20:00"
},
{
"Customer Number": "3",
"Customer Name": "Kala",
"Restaurant Number": "2203",
"Restaurant NameOrdered": "Taco Bell",
"Number of Items": "5",
"price": "30",
"Operating Start hours": "11:00",
"Operating End hours": "21:00"
}
}
到目前为止我尝试了什么::我尝试在控制台上打印以检查条件,如果它通过了,则将其插入数据库。为了检查条件,我正在从“read_from_topic”函数读取数据并创建一个视图(createOrReplaceTempView)来查看数据,但没有打印,有人可以指导我如何打印并验证我的条件或数据是否被正确读取?
提前致谢 !!!!
解决方案
创建视图 (createOrReplaceTempView) 以查看数据,但没有打印任何内容
因为spark.sql
返回一个新的数据框。
如果你想打印它,那么你需要
spark.sql("select * from kafka").show()
但是,仅此一项就至少是两个字节数组列,而不是 JSON 字符串,因此您需要在某个时候定义一个模式以提取任何内容或CAST
至少具有人类可读的数据
还值得指出的是,您显示的数据不是有效的 JSON,并且boto3
不是必需的,因为 Spark 可以从 S3 本身读取文件(因此并不严格需要 Kafka,因为您可以将 S3 数据直接带到您的最终位置,中间有一个 Sparkpersist()
函数)
推荐阅读
- python - 使用Angular调用python函数?
- android - 直接创建AndroidViewModel
- css - SASS 语法错误:应为数字。得到:计算()
- c# - 无法建立 SQL 客户端连接 tp SQL Server
- azure - 通过 ARM 模板创建启用 Azure 服务总线会话的订阅
- algorithm - 计算介数中心性的简单易懂的算法
- javascript - Immutable.js Map:从值中查找键
- angular - 尝试获取值时获取 [Object Object]
- c# - 如何使用引导类在 asp.net mvc 中创建 checkboxFor 列表?
- angular - 无法在 Angular 9 中绑定 ngmodel