首页 > 解决方案 > 使用 Apache Beam io.jdbc.ReadFromJdbc 时出现错误 beam:logical_type:javasdk:v1

问题描述

我正在尝试使用 Apache Beam 的 python SDK 从 postgres 表中读取数据。如文档所述,我已经安装了 Java SDK。我正在使用最新版本。我的代码如下:

import logging
import typing

import apache_beam as beam
from apache_beam import coders
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.options.pipeline_options import PipelineOptions
from past.builtins import unicode


def run(argv=None):
    beam_options = PipelineOptions()

    ExampleRow = typing.NamedTuple('ExampleRow', [('id', int), ('name', unicode)])

    coders.registry.register_coder(ExampleRow, coders.RowCoder)

    with beam.Pipeline(options=beam_options) as p:
        result = (
            p
            | 'Read from jdbc' >> ReadFromJdbc(
                                    table_name='jdbc_external_test_read',
                                    driver_class_name='org.postgresql.Driver',
                                    jdbc_url='jdbc:postgresql://localhost:5432/example',
                                    username='postgres',
                                    password='postgres')
        )


if __name__ == '__main__':
    logging.getLogger(__name__).setLevel(logging.INFO)
    run()

但是当我运行它时,我得到了错误ValueError: No logical type registered for URN 'beam:logical_type:javasdk:v1'

标签: pythonjdbcapache-beam

解决方案


导致此问题的原因是该字段在其架构中VARCHAR作为 Apache Beam逻辑类型返回。在这种情况下,逻辑类型由它们的 URN 表示beam:logical_type:javasdk:v1。对于逻辑类型,必须为相应的 URN 注册“解码器”才能读取值。你可以这样做:

from apache_beam.typehints.schemas import LogicalType

@LogicalType.register_logical_type
class db_str(LogicalType):
    @classmethod
    def urn(cls):
        return "beam:logical_type:javasdk:v1"

    @classmethod
    def language_type(cls):
        return str

    def to_language_type(self, value):
        return str(value)

    def to_representation_type(self, value):
        return str(value)

这必须在运行管道之前完成,因此逻辑类型将被识别为字符串并转换为字符串。


推荐阅读