首页 > 解决方案 > Spark 作业读取数据框中已排序的 AVRO 文件,但无序写入 kafka

问题描述

我有按 ID 排序的 AVRO 文件,每个 ID 都有一个名为“ID=234”的文件夹,文件夹内的数据是 AVRO 格式并根据日期排序。我正在运行 spark 作业,它采用输入路径并在数据帧中读取 avro。然后,此数据帧以 5 个分区写入 kafka 主题。

val properties: Properties = getProperties(args)


val spark = SparkSession.builder().master(properties.getProperty("master"))
  .appName(properties.getProperty("appName")).getOrCreate()
val sqlContext = spark.sqlContext

val sourcePath = properties.getProperty("sourcePath")

val dataDF = sqlContext.read.avro(sourcePath).as("data")
val count = dataDF.count();
val schemaRegAdd = properties.getProperty("schemaRegistry")

val schemaRegistryConfs = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> schemaRegAdd,
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME
)
val start = Instant.now

dataDF.select(functions.struct(properties.getProperty("message.key.name")).alias("key"), functions.struct("*").alias("value"))
  .toConfluentAvroWithPlainKey(properties.getProperty("topic"), properties.getProperty("schemaName"),
  properties.getProperty("schemaNamespace"))(schemaRegistryConfs)
  .write.format("kafka")
  .option("kafka.bootstrap.servers",properties.getProperty("kafka.brokers"))
  .option("topic",properties.getProperty("topic")).save()

}

我的用例是按顺序写入来自每个 ID(按日期排序)的所有消息,例如应首先添加来自一个 ID 1 的所有排序数据,然后从 ID 2 添加,依此类推。Kafka 消息具有密钥作为 ID。

标签: apache-sparkapache-kafkaavro

解决方案


不要忘记,当你进行转换时,RDD/数据集中的数据是随机的,所以你会丢失顺序。

实现这一点的最佳方法是一个一个地读取文件并将其发送到 kafka 而不是读取您的完整目录val sourcePath = properties.getProperty("sourcePath")


推荐阅读