python - 使用 Apache Beam io.jdbc.ReadFromJdbc 时出现错误 beam:logical_type:javasdk:v1
问题描述
我正在尝试使用 Apache Beam 的 python SDK 从 postgres 表中读取数据。如文档所述,我已经安装了 Java SDK。我正在使用最新版本。我的代码如下:
import logging
import typing
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from past.builtins import unicode
def run(argv=None):
beam_options = PipelineOptions()
ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])
coders.registry.register_coder(ExampleRow, coders.RowCoder)
with beam.Pipeline(options=beam_options) as p:
result = (
p
| 'Read from jdbc' >> ReadFromJdbc(
table_name='jdbc_external_test_read',
driver_class_name='org.postgresql.Driver',
jdbc_url='jdbc:postgresql://localhost:5432/example',
username='postgres',
password='postgres')
)
if __name__ == '__main__':
logging.getLogger(__name__).setLevel(logging.INFO)
run()
但是当我运行它时,我得到了错误ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'
解决方案
导致此问题的原因是该字段在其架构中VARCHAR
作为 Apache Beam逻辑类型返回。在这种情况下,逻辑类型由它们的 URN 表示beam:logical_type:javasdk:v1
。对于逻辑类型,必须为相应的 URN 注册“解码器”才能读取值。你可以这样做:
from apache_beam.typehints.schemas import LogicalType
@LogicalType.register_logical_type
class db_str(LogicalType):
@classmethod
def urn(cls):
return "beam:logical_type:javasdk:v1"
@classmethod
def language_type(cls):
return str
def to_language_type(self, value):
return str(value)
def to_representation_type(self, value):
return str(value)
这必须在运行管道之前完成,因此逻辑类型将被识别为字符串并转换为字符串。
推荐阅读
- yii2 - Yii2 - 按 IP 限制页面
- hadoop - Hive:如何获取集群的名称
- python - 发布并保存 Django 变量,不起作用
- module - 如何在 gem5 中运行 MathExprPowerModel?
- powershell - 如何使用powershell删除所有oracle java
- user-interface - JavaFx:创建文本输入框
- c++ - 在堆上实现二维数组的最佳方法
- python - 如何在 Python 中用变量值而不是 0 填充矩阵
- flutter - 向setstate发送参数?
- java - Gradle 5 和 Java 11 构建错误:为什么 Gradle 认为我正在使用 Java 10?