apache-kafka - S3 sink record-field TimeBasedPartitioner not working
Problem description
I am trying to deploy an S3 sink connector where the S3 partitioning needs to be based on the field props.eventTime in the data.
Here is my configuration:
{
"name" : "test_timeBasedPartitioner",
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"partition.duration.ms": "3600000",
"s3.region": "us-east-1",
"topics.dir": "dream11",
"flush.size": "50000",
"topics": "test_topic",
"s3.part.size": "5242880",
"tasks.max": "5",
"timezone": "Etc/UTC",
"locale": "en",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"rotate.schedule.interval.ms": "1800000",
"path.format": "'EventDate'=YYYYMMdd",
"s3.bucket.name": "test_bucket",
"partition.duration.ms": "86400000",
"timestamp.extractor": "RecordField",
"timestamp.field": "props.eventTime"
}
Here is a sample JSON message from my Kafka topic:
{
"eventName": "testEvent",
"props": {
"screen_resolution": "1436x720",
"userId": 0,
"device_name": "1820",
"eventTime": "1565792661712"
}
}
The exception I get is:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Invalid format: "1564561484906" is malformed at "4906"
at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:281)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:199)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:176)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:195)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
... 10 more
Is there something I am missing in the configuration? Any help would be appreciated.
Solution
Your field props.eventTime comes through as a STRING rather than a LONG, so the epoch value is handed to a date-time parser instead of being used directly. This can be identified in the stack trace and by checking the relevant code in the org.joda.time doParseMillis method, which the connector's TimeBasedPartitioner and its RecordFieldTimestampExtractor timestamp extractor apply to the timestamp.field value taken from the message payload when that value is a STRING:
Caused by: java.lang.IllegalArgumentException: Invalid format: "1564561484906" is malformed at "4906"
at org.joda.time.format.DateTimeParserBucket.doParseMillis(DateTimeParserBucket.java:187)
at org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:826)
at io.confluent.connect.storage.partitioner.TimeBasedPartitioner$RecordFieldTimestampExtractor.extract(TimeBasedPartitioner.java:281)
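The failure mode can be reproduced outside the connector. Below is a minimal sketch; note it uses java.time for illustration (the connector itself goes through Joda-Time) and EpochStringDemo is a made-up class name. An epoch-millis value stored as a string is rejected by a date-time parser, while treating it as a long works fine:

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;

public class EpochStringDemo {
    // Returns true when the value cannot be read as an ISO date-time string,
    // mirroring the parse failure seen in the stack trace above.
    static boolean failsDateTimeParse(String value) {
        try {
            LocalDateTime.parse(value);
            return false;
        } catch (DateTimeParseException e) {
            return true;
        }
    }

    // Treating the same value as epoch milliseconds works without issue.
    static Instant asInstant(String value) {
        return Instant.ofEpochMilli(Long.parseLong(value));
    }

    public static void main(String[] args) {
        String eventTime = "1565792661712"; // epoch millis stored as a STRING
        System.out.println("date-time parse failed: " + failsDateTimeParse(eventTime));
        System.out.println("as instant: " + asInstant(eventTime));
    }
}
```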
You can follow one of these solutions:
- Produce props.eventTime as a numeric (long) epoch-milliseconds value rather than a string, so the extractor uses it directly.
- Keep the field a string but format it as an ISO-8601 date-time, which the Joda-Time parser accepts.
- Plug in a custom TimestampExtractor implementation that parses your epoch string, and set its fully qualified class name as timestamp.extractor.
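If changing the producer payload is not an option, the Confluent storage partitioners accept a fully qualified class name for timestamp.extractor, letting you supply your own TimestampExtractor. A sketch of the relevant config fragment, assuming com.example.EpochStringTimestampExtractor is a hypothetical class you would write (e.g. wrapping Long.parseLong) and place on the Connect plugin path:

```json
{
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "timestamp.extractor": "com.example.EpochStringTimestampExtractor",
  "timestamp.field": "props.eventTime"
}
```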