python - NotImplementedError apache 梁 python
问题描述
我正在使用 apache 梁将 json 写入 gcs。但是遇到如下错误
NotImplementedError: offset: 0, whence: 0, position: 50547, last: 50547 [while running 'Writing new data to gcs/write data gcs/Write/WriteImpl/WriteBundles/WriteBundles']
不知道为什么会出现这个错误。相同的代码如下:
class WriteDataGCS(beam.PTransform):
"""
To write data to GCS
"""
def __init__(self, bucket):
"""
Initiate the bucket as a class field
:type bucket:string
:param bucket: query to be run for data
"""
self.bucket = bucket
def expand(self, pcoll):
"""
PTransform Method run when called on Class Name
:type pcoll: PCollection
:param pcoll: A pcollection
"""
(pcoll | "print intermediate" >> beam.Map(print_row))
return (pcoll | "write data gcs" >> beam.io.WriteToText(self.bucket, coder=JsonCoder(), file_name_suffix=".json"))
class JsonCoder:
"""
This class represents dump and load operations performed on json
"""
def encode(self,data):
"""
Encodes the json data.
:type data: string
:param data: Data to be encoded
"""
# logger.info("JSON DATA for encoding - {}".format(data))
return json.dumps(data,default=str)
def decode(self,data):
"""
Decodes the json data.
:type data: string
:param data: Data to be decoded
"""
# logger.info("JSON DATA for decoding - {}".format(data))
return json.loads(data)
解决方案
的coder
参数WriteToText
需要一个apache_beam.coders.Coder
实例。您可以尝试JsonCoder
从基Coder
类继承,但我认为您也可以使用 a 将数据转换为字符串Map
:
def expand(self, pcoll):
"""
PTransform Method run when called on Class Name
:type pcoll: PCollection
:param pcoll: A pcollection
"""
return (pcoll
| "print intermediate" >> beam.Map(print_row))
| "to_json" >> beam.Map(lambda x: json.dumps(x, default=str)))
| "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))
推荐阅读
- python - 在 TD Ameritrade API 上创建触发止损单的问题
- android - 单元测试:Android Rx java
- r - 如何在 R 上从 Internet 下载文件?
- java - 处理共享库版本控制、部署和使用的最佳方式
- github - 使用 GitHub GraphQL 搜索带有 *.yml 文件的所有存储库
- filebeat - 将 Filebeat 作为 Windows 服务运行。如何查看调试信息
- kubernetes - 如果您的文件存储为 1tb,但持久卷要求仅为 20gb,会发生什么情况?
- google-bigquery - 一列中的值总和作为单独的列
- ruby-on-rails - Ruby 2.7.2 使用 2.7.0 库
- java - ImageView 固定大小位图