apache-spark - 如何在pyspark中使用来自java代码的字节串处理流式RDD输出
问题描述
我有一个流管道,其中嵌套的 json 格式数据被馈送到 Cloud PubSub。然后使用 Spark Streaming
将数据作为流获取。当打印流(rdd)数据以检查模式给出时_corrupted_record : String
。
上面的自定义接收器代码是用 Java 编写的。然后使用 Pyspark 处理输出 RDD id。
在打印 rdd 时发现它以 b'(字节字符串格式)为前缀。
(b'{"_id": {"_data": "abv1"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1622466457, "i": 393}}, "fullDocument": {"_id": "nhmm7779", "uid": "5FZ", "cb": {"key1": "jk8", "user_id": "jk-3"}, "cmp": {"c": "Organic"}, "ts": 1622466405939.0, "cd": {"$date": 1622466457352}, "d": "2021:5:31", "sg": {"key5": "hj", "key6": "TV Menu", "key7": "NA"}, "s": 0, "dur": 0}, "ns": {"db": "database", "coll": "abnv55666"}, "documentKey": {"_id": "nhmm7779"}}',
b'{"_id": {"_data": "bhg4"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1622466480, "i": 542}}, "fullDocument": {"_id": "nm1", "uid": "qS", "cb": {"key1": "jku", "user_id": "ty6"}, "cmp": {"c": "Organic"}, "ts": 1622466434999.0, "cd": {"$date": 1622466480445}, "d": "2021:5:31", "sg": {"key5": "bn", "key6": 0, "key7": "val2"}, "s": 0, "dur": 0}, "ns": {"db": "database", "coll": "bvgh678"}, "documentKey": {"_id": "nm1"}}')
如何在 pyspark 中处理此 rdd,其中记录可以保存为 2 个不同的行而不是 1 个。
我尝试过的事情:
进一步调试发现,从 Pyspark 调用 java 代码时,这是更多的序列化问题。因此改变了问题的主题。
解决方案
推荐阅读
- sql-server - 多用途存储过程:如何知道哪个语句是稳定的?
- python - Delphi Datasnap RESTful API 方法适用于浏览器,但不适用于 python 中的 POST
- entity-framework - EntityFramework:使用视图扩展表
- javascript - 如何将类导入其他文件
- nginx - NGINX - 在标准错误中发送的 FastCGI:“主要脚本未知”,同时从上游读取响应标头
- docker - 重新启动容器时将数据保存在带有卷的 postgres docker 上
- sql - 使用 CASE 条件返回 NULL 值的 SQL 语句
- visual-studio - MSTest 忽略环境测试以避免在 TeamCity 中运行
- node.js - 如何正确将 nginx 超时设置为代理?
- c# - WPF:关闭窗口无法正常工作