Unable to run the s3 sink connector to persist kafka data on Minio

Problem description

I have a Kubernetes cluster running with minikube. Inside the cluster there are a kafka pod, a zookeeper pod and a minio pod, and every service appears to be working fine. I have produced messages to a topic named minio-topic on kafka, minio has a bucket named kafka-bucket, and I tried to run the s3-sink-connector with the following properties:

name=s3-sink
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=minio_topic
s3.region=us-east-1
s3.bucket.name=kafka-bucket
s3.part.size=5242880
flush.size=3
store.url=http://l27.0.0.1:9000/
storage.class=io.confluent.connect.s3.storage.S3Storage
#format.class=io.confluent.connect.s3.format.avro.AvroFormat
schema.generator.class=io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner

schema.compatibility=NONE

After running the connector I get this error:

[2020-05-06 18:00:46,238] ERROR WorkerSinkTask{id=s3-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: java.lang.reflect.InvocationTargetException
    at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:55)
    at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:99)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at io.confluent.connect.storage.StorageFactory.createStorage(StorageFactory.java:50)
    ... 10 more
Caused by: java.lang.IllegalArgumentException: hostname cannot be null
    at com.amazonaws.util.AwsHostNameUtils.parseRegion(AwsHostNameUtils.java:79)
    at com.amazonaws.util.AwsHostNameUtils.parseRegionName(AwsHostNameUtils.java:59)
    at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:277)
    at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:229)
    at com.amazonaws.services.s3.AmazonS3Client.setEndpoint(AmazonS3Client.java:688)
    at com.amazonaws.client.builder.AwsClientBuilder.setRegion(AwsClientBuilder.java:362)
    at com.amazonaws.client.builder.AwsClientBuilder.configureMutableProperties(AwsClientBuilder.java:337)
    at com.amazonaws.client.builder.AwsSyncClientBuilder.build(AwsSyncClientBuilder.java:38)
    at io.confluent.connect.s3.storage.S3Storage.newS3Client(S3Storage.java:96)
    at io.confluent.connect.s3.storage.S3Storage.<init>(S3Storage.java:65)
    ... 15 more

The credentials are properly defined in .aws/credentials. Does anyone know what could be wrong in the configuration?
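
(For reference, the connector's default credentials provider chain reads the standard AWS shared credentials file, so "properly defined in .aws/credentials" presumably means something like the sketch below; the key values are placeholders for whatever access/secret key pair MinIO is configured with:)

# ~/.aws/credentials on the Connect worker host -- placeholder values only
[default]
aws_access_key_id = MINIO_ACCESS_KEY_PLACEHOLDER
aws_secret_access_key = MINIO_SECRET_KEY_PLACEHOLDER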

Tags: amazon-s3, apache-kafka, apache-kafka-connect, minio

Solution


hostname cannot be null at com.amazonaws.util.AwsHostNameUtils.parseRegion

I would suggest reading the Minio blog on how to set store.url correctly, and verifying which region your Minio cluster thinks it is running in.
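
Note in particular that the store.url above reads http://l27.0.0.1:9000/ with a lowercase letter l where the digit 1 of 127.0.0.1 should be. That string is neither a valid IP address nor a hostname the SDK can parse, so no host (and therefore no region) can be extracted from the endpoint, which is consistent with the "hostname cannot be null" message. Below is a minimal sketch of the relevant overrides, assuming MinIO is reachable from the Connect worker at 127.0.0.1:9000 and still uses its default region (the s3.path.style.access.enabled line further assumes the connector version in use exposes that setting; it defaults to true where available):

# hypothetical corrected overrides for the connector properties above
# 127.0.0.1 assumes the Connect worker and MinIO share a network namespace
store.url=http://127.0.0.1:9000
# must match the region MinIO reports; MinIO defaults to us-east-1
s3.region=us-east-1
# MinIO is addressed path-style; true is already the default where this setting exists
s3.path.style.access.enabled=true

If MinIO has been started with a non-default region, set s3.region to that value instead. Also bear in mind that 127.0.0.1 only reaches MinIO if the Connect worker runs alongside it; when Connect runs inside the Kubernetes cluster, the MinIO service's DNS name is usually the endpoint to use in store.url.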

