S3 sink RecordField TimeBasedPartitioner not working

Problem description

I am trying to deploy an S3 sink connector where the S3 partitioning needs to be based on the field props.eventTime in the data.

Below is my configuration:

{
  "name": "test_timeBasedPartitioner",
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "s3.region": "us-east-1",
  "topics.dir": "dream11",
  "flush.size": "50000",
  "topics": "test_topic",
  "s3.part.size": "5242880",
  "tasks.max": "5",
  "timezone": "Etc/UTC",
  "locale": "en",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "rotate.schedule.interval.ms": "1800000",
  "path.format": "'EventDate'=YYYYMMdd",
  "s3.bucket.name": "test_bucket",
  "partition.duration.ms": "86400000",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "props.eventTime"
}

Below is a sample of the JSON in my Kafka topic:

{
    "eventName": "testEvent",
    "props": {
        "screen_resolution": "1436x720",
        "userId": 0,
        "device_name": "1820",
        "eventTime": "1565792661712"
    }
}

The exception I am getting is:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid format: "1564561484906" is malformed at "4906"
    at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
    at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
    at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:281)
    at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:199)
    at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
    at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    ... 10 more

Is there something I am missing in my configuration here? Any help would be appreciated.

Tags: apache-kafka, apache-kafka-connect

Solution


Your field props.eventTime comes in the form of microseconds and not milliseconds.

This can be identified from the stack trace, and by inspecting the relevant code in the org.joda.time doParseMillis method, which the connector's TimeBasedPartitioner and its timestamp extractor RecordFieldTimestampExtractor use when the timestamp.field extracted from the message payload is a STRING:

Caused by: java.lang.IllegalArgumentException: Invalid format: "1564561484906" is malformed at "4906"
    at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
    at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
    at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:281)
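
This failure can be reproduced in isolation. A minimal sketch, assuming the extractor hands STRING fields to Joda's ISO-8601 parser (which is what the parseMillis frame above points to); the class name is just a placeholder:

import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

public class ParseMillisRepro {
    public static void main(String[] args) {
        // The ISO-8601 parser expects a date/time string, not a raw epoch
        // value, so a long run of digits cannot be fully consumed.
        DateTimeFormatter parser = ISODateTimeFormat.dateTimeParser();
        parser.parseMillis("1564561484906");
        // -> java.lang.IllegalArgumentException:
        //    Invalid format: "1564561484906" is malformed at "4906"
    }
}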

You can follow one of the solutions below:

  1. Write your own TimestampExtractor that supports timestamps in microseconds. You can check here how to write a custom TimestampExtractor; see also the sketch after this list.
  2. Change/transform your source data so that the field arrives in milliseconds rather than microseconds.
  3. Follow up on some of the open discussions about the flexibility of the default TimestampExtractor and suggest/contribute changes that would support your use case.
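
A minimal sketch of option 1, assuming the TimestampExtractor interface from kafka-connect-storage-common (a configure method plus an extract method); the package, class name, and the handling of the microsecond payload are hypothetical and would need to be adapted to your records:

package com.example;

import java.util.Map;

import io.confluent.connect.storage.partitioner.TimestampExtractor;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;

// Hypothetical extractor: reads a nested field holding an epoch timestamp
// in microseconds and converts it to the milliseconds the partitioner expects.
public class MicrosFieldTimestampExtractor implements TimestampExtractor {

    private String fieldName;

    @Override
    public void configure(Map<String, Object> config) {
        // Reuses the connector's existing "timestamp.field" setting,
        // e.g. "props.eventTime".
        fieldName = (String) config.get("timestamp.field");
    }

    @Override
    public Long extract(ConnectRecord<?> record) {
        Object field = readNested(record.value(), fieldName);
        long micros = field instanceof Number
            ? ((Number) field).longValue()
            : Long.parseLong(field.toString());
        return micros / 1000L; // microseconds -> milliseconds
    }

    // Walks a dotted path such as "props.eventTime" through Struct
    // (schema'd records) or Map (schemaless JSON) values.
    private static Object readNested(Object value, String path) {
        Object current = value;
        for (String part : path.split("\\.")) {
            if (current instanceof Struct) {
                current = ((Struct) current).get(part);
            } else if (current instanceof Map) {
                current = ((Map<?, ?>) current).get(part);
            } else {
                throw new IllegalArgumentException(
                    "Cannot resolve '" + path + "' in record value");
            }
        }
        return current;
    }
}

With the compiled class packaged and placed on the Connect worker's plugin path, the only config change is pointing timestamp.extractor at the fully-qualified class name, e.g. "timestamp.extractor": "com.example.MicrosFieldTimestampExtractor".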
