首页 > 解决方案 > 如何在pyspark中使用来自java代码的字节串处理流式RDD输出

问题描述

我有一个流管道,其中嵌套的 json 格式数据被馈送到 Cloud PubSub。然后使用 Spark Streaming
将数据作为流获取。当打印流(rdd)数据以检查模式给出时_corrupted_record : String

上面的自定义接收器代码是用 Java 编写的。然后使用 Pyspark 处理输出 RDD id。

在打印 rdd 时发现它以 b'(字节字符串格式)为前缀。

(b'{"_id": {"_data": "abv1"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1622466457, "i": 393}}, "fullDocument": {"_id": "nhmm7779", "uid": "5FZ", "cb": {"key1": "jk8", "user_id": "jk-3"}, "cmp": {"c": "Organic"}, "ts": 1622466405939.0, "cd": {"$date": 1622466457352}, "d": "2021:5:31",  "sg": {"key5": "hj", "key6": "TV Menu", "key7": "NA"}, "s": 0, "dur": 0}, "ns": {"db": "database", "coll": "abnv55666"}, "documentKey": {"_id": "nhmm7779"}}', 
b'{"_id": {"_data": "bhg4"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1622466480, "i": 542}}, "fullDocument": {"_id": "nm1", "uid": "qS", "cb": {"key1": "jku", "user_id": "ty6"}, "cmp": {"c": "Organic"}, "ts": 1622466434999.0, "cd": {"$date": 1622466480445}, "d": "2021:5:31", "sg": {"key5": "bn", "key6": 0, "key7": "val2"}, "s": 0, "dur": 0}, "ns": {"db": "database", "coll": "bvgh678"}, "documentKey": {"_id": "nm1"}}')

如何在 pyspark 中处理此 rdd,其中记录可以保存为 2 个不同的行而不是 1 个。

我尝试过的事情:

进一步调试发现,从 Pyspark 调用 java 代码时,这是更多的序列化问题。因此改变了问题的主题。

标签: apache-sparkpysparkapache-spark-2.0

解决方案


推荐阅读