amazon-s3 - Alpakka s3`multipartUpload`不上传文件
问题描述
我有一个关于alpakka_kafka+alpakka_s3
集成的问题。当我使用 alpakka kafka 源时, Alpakka s3multipartUpload
似乎没有上传文件。
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
但是,只要我.take(100)
在 kafkaSource 之后添加。一切正常。
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
任何帮助将不胜感激。提前致谢!
这是完整的代码片段:
// Source
val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
Consumer
.committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
.map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
.watchTermination() { (mat, f: Future[Done]) =>
f.foreach { _ =>
log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group, prefixedTopics.mkString(", "))
}
mat
}
}
// Flow
val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
Flow[CommittableOffset]
.groupedWithin(batchingSize, batchingInterval)
.map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
.mapAsync(parallelism = 3) { msg =>
log.debug("committing offset, msg={}", msg)
msg.commitScaladsl().map { result =>
log.debug("committed offset, msg={}", msg)
result
}
}
}
private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))
private val kafkaMsgToOffsetFlow = {
implicit val askTimeout: Timeout = Timeout(5.seconds)
Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
Future(elem.offset)
}
}
// Sink
val s3Sink = {
val BUCKET = "test-data"
s3Client.multipartUpload(BUCKET, s"tmp/data.txt")
// Doesnt' work..... ( no files are showing up on the S3)
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
// This one works...
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
解决方案
实际上,它确实可以上传。问题是,您需要向 s3 发送完成请求以完成上传,然后您的文件将在存储桶中可用。我打赌,因为 kafka 源take(n)
永远不会停止在下游生成数据,接收器永远不会向 s3 发送完成请求,因为流程实际上从未完成,所以接收器总是希望在完成请求之前上传更多数据。
没有办法只将所有内容上传到一个文件中,所以我的建议是:将您的kafkaSource
消息分组并将压缩的 Array[Byte] 发送到接收器。诀窍是您必须为每个文件创建一个接收器,而不是只使用一个接收器。
推荐阅读
- excel - VBA 仅返回函数的唯一值
- python - 如果 2020 年日期的预订价值小于 25,则将其替换为去年 7 天的熊猫平均价值
- typescript - 为什么构造函数不能被键入为具有静态成员的可调用对象,或者它只是打字稿限制,是否有正当理由?
- python - 在 Python 上使用 GTFS 获取中转时间
- python - 使用opencv python从移动相机(无人机)拍摄的视频中提取移动对象
- pyspark - 除非在 mmlspark 中使用 lgbm 模型的数据块中使用 repartition(1),否则大预测结果
- python - 如何使用迁移的 Django 模型将“没有时区的时间戳”更改为“有时区的时间戳”?
- python - 缺少 Python 构造函数属性
- javascript - 使用 fetch 方法从 AWS S3 下载文件会引发 CORS 错误
- php - 告诉 Laravel 在 X 秒后停止尝试连接