scala - 数据未通过 Avro 序列化发送到 Kafka
问题描述
我正在尝试使用 Avro 将数据发送到 Kafka,但“消息”仍然为空。我尝试手动将模式添加到 Kafka,但没有结果。我在应用程序的日志、模式注册表日志或 Kafka 的日志中看不到任何错误。你能告诉我我做错了什么吗?
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
import com.sksamuel.avro4s.{ AvroSchema, Record, RecordFormat, SchemaFor }
case class User(name: String, age: Int)
val connectConfig: Map[String, Object] =
Map(
"bootstrap.servers" -> "localhost:9092",
"client.id" -> s"clientID${UUID.randomUUID().toString}",
"key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer" -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"schema.registry.url" -> "localhost:8081"
)
implicit val producer = new KafkaProducer[String, Record](connectConfig.asJava)
def sendAvro(msg: DIL): Future[RecordMetadata] = {
val newUser = User("Joe", 42)
val schema = AvroSchema[User]
implicit val schemaFor = SchemaFor[User]
val format = RecordFormat[User]
val record = new ProducerRecord[String, Record]("kafkaLogTopic", format.to(newUser))
producer.send(record)
}
解决方案
Kafka 批量发送数据,你的单条记录小于默认的批量大小。
使用.send(data).get()
或producer.flush()
立即等待要发送的一条记录。
否则,您应该定义一个线程scala.sys.ShutdownHookThread
来刷新生产者
推荐阅读
- azure - 如何正确配置 Azure 应用程序网关重写 URL 规则?
- sql-server - 左连接后 SQL SERVER 更新或插入
- r - 如何在只有一个初始值的变异中使用滞后/领先?
- php - Jquery滚动一次加载所有行,一次只能加载5个
- firebase - kotlin 发生 Firebase StorageException
- python - 在 Python 中读取 Excel 图表
- visual-studio - 访问冲突写入位置 0x00000000 在 for 循环中间
- python - 如何从我的 views.py 函数将超过 1 张图像发送到我的模板?
- reactjs - 无法通过使用 Reactjs 单击图标来显示菜单项
- javascript - 从函数中使用 POST