apache-spark - How to get Avro data from Confluent Schema Registry as a string from Kafka in PySpark?
Question
I am reading data from Kafka in Spark (Structured Streaming), but the data Spark receives from Kafka is not in string format. Spark: 2.3.4
Kafka data format:
{"Patient_ID":316,"Name":"Richa","MobileNo":{"long":7049123177},"BDate":{"int":740},"Gender":"female"}
Here is the Structured Streaming code that consumes from Kafka:
# spark-submit --jars kafka-clients-0.10.0.1.jar --packages org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,org.apache.spark:spark-streaming-kafka-0-8-assembly_2.11:2.3.4,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 /home/kinjalpatel/kafka_sppark.py
import pyspark
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json
from pyspark.sql.functions import from_json, col, struct
from pyspark.sql.types import StructField, StructType, StringType, DoubleType
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from pyspark.sql.column import Column, _to_java_column
sc = SparkContext()
sc.setLogLevel("ERROR")
spark = SparkSession(sc)
schema_registry_client = CachedSchemaRegistryClient(
    url='http://localhost:8081')
serializer = MessageSerializer(schema_registry_client)
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mysql-01-Patient") \
    .option("partition.assignment.strategy", "range") \
    .option("valueConverter", "org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter") \
    .load()
df.printSchema()
mta_stream=df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)", "CAST(timestamp AS STRING)", "CAST(timestampType AS STRING)")
mta_stream.printSchema()
qry = mta_stream.writeStream.outputMode("append").format("console").start()
qry.awaitTermination()
Here is the output I get:
+----+--------------------+----------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+----------------+---------+------+--------------------+-------------+
|null|�
Richa���...|mysql-01-Patient| 0| 160|2019-12-27 11:56:...| 0|
+----+--------------------+----------------+---------+------+--------------------+-------------+
How can I get the value column as a string?
Solution
From the Spark documentation (Avro Data Source Guide):
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro._
import spark.implicits._  // for the $"..." and 'symbol column syntax

// `from_avro` requires the Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

val output = df
  .select(from_avro('value, jsonFormatSchema) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()
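Since the question uses PySpark, here is a minimal equivalent sketch. It assumes Spark 3.0+, where `from_avro` is exposed to Python via `pyspark.sql.avro.functions` (there is no Python `from_avro` on the asker's Spark 2.3.4), and it reuses the broker and topic from the question; the schema file path is a placeholder:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json
from pyspark.sql.avro.functions import from_avro  # Spark 3.0+, needs the spark-avro package

spark = SparkSession.builder.appName("avro-to-string").getOrCreate()

# `from_avro` requires the Avro schema as a JSON string.
with open("patient.avsc", "r") as f:  # placeholder path to the Avro schema
    json_format_schema = f.read()

df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mysql-01-Patient") \
    .load()

# Decode the binary Avro payload into a struct, then render it as a JSON string.
output = df.select(from_avro(col("value"), json_format_schema).alias("patient")) \
           .select(to_json(col("patient")).alias("value"))

Run it with `--packages org.apache.spark:spark-avro_2.12:<spark-version>`. The documentation also shows decoding both the key and the value, with schemas built programmatically: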
import org.apache.spark.sql.avro._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))