首页 > 解决方案 > 如何使用结构化流优化 Kafka 主题的分区策略以供消费?

问题描述

我对 kafka 非常陌生,并尝试将数据写入主题并从同一主题中读取(我们现在充当源团队来摄取数据。因此,我们正在执行写入到 Kafk 主题的操作和从同一主题)。我在 spark-shell 上编写了以下代码,以将数据写入 Kafka 主题。

pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0

from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType
tn = "topic_name"
kafka_broker = "brokerurl:9500"
endpoint_uri = "endpoint_uri"
client_id = "clientid"
client_secret = "secret_key"
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
oauth_client = " oauth.client.id='{0}'".format(client_id)
oauth_secret = " oauth.client.secret='{0}'".format(client_secret)
oauth_token_endpoint_uri = " oauth.token.endpoint.uri='{0}'".format(endpoint_uri)
oauth_config = jaas_config + oauth_client + oauth_secret + oauth_token_endpoint_uri + " oauth.max.token.expiry.seconds='30000' ;"

df = spark.sql("select * from dbname.tablename where geography in ('ASIA', 'LATIN_AMERICA') and geo_year in (2020, 2021)").select(F.to_json(F.struct(F.col("*"))).alias("value"))


# WRITE TO TOPIC
df.write.format("kafka")\
        .option("kafka.bootstrap.servers", kafka_broker)\
        .option("kafka.batch.size", 51200)\
        .option("retries", 3)\
        .option("kafka.max.request.size", 500000)\
        .option("kafka.max.block.ms", 120000)\
        .option("kafka.metadata.max.age.ms", 120000)\
        .option("kafka.request.timeout.ms", 120000)\
        .option("kafka.linger.ms", 0)\
        .option("kafka.delivery.timeout.ms", 130000)\
        .option("acks", "1")\
        .option("kafka.compression.type", "snappy")\
        .option("kafka.security.protocol", "SASL_SSL")\
        .option("kafka.sasl.jaas.config", oauth_config)\
        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
        .option("kafka.sasl.mechanism", "OAUTHBEARER")\
        .option("topic", tn)\
        .save()

后来我才知道,一个 Kafka 主题可以包含分区中的数据。所以我删除并重新创建了相同的主题,但这次有 3 个分区。

我所有的火花经验都是在批处理中,即使在我们使用读取表或文件时分区数据的地方也存在分区的概念

df = spark.read.format('jdbc').option('', '')
               ...
               ...
               .option('partitionColumn', 'partitionColumn_name')
               .load()

批处理中使用的这个分区列通常是一个具有高基数的列,我们还可以指定我们想要将数据拆分为使用的分区数

df = spark.read.format('jdbc').option('', '')
               ...
               ...
               .option('partitionColumn', 'partitionColumn_name').option('numPartitions', INTEGER_VALUE_OF_PARTITIONS)
               .load()

我在普通的 Kafka 代码中看到了自定义分区器类,但我使用的是 spark-streaming,甚至不确定如何集成它。我对 Kafka 主题分区的困惑在于以下几点:

  1. 如何选择每个主题的分区数?我正在用 Kafka 实现火花流。
  2. 有没有办法可以使用 Spark 流来管理分区数据?
  3. 如果没有,有没有办法可以确保主题分区中数据的均匀分布。

我已经浏览了这个官方文档。

但是在那里找不到有关分区策略的任何信息。谁能告诉我如何将数据写入主题的特定分区,或者将其留给 Kafka 更好。

编辑1:我刚刚浏览了这个链接,并且提到了一个公式来计算基于吞吐量需要的分区数。这是我们可以遵循的方法来确定每个主题的分区数吗?

任何澄清对我来说都很有价值。

标签: apache-sparkpysparkapache-kafkaspark-structured-streaming

解决方案


这是一个相当广泛的主题,其中的问题需要一些彻底的答案。总之,最重要的是:

  • 一般来说,Kafka 会随着主题中的分区数量而扩展
  • Spark 随工作节点数量和可用内核/插槽的数量而扩展
  • Kafka 主题的每个分区只能由单个 Spark 任务使用(parallelsim 然后取决于 Spark wcore 的数量)
  • 如果您有多个 Spark 工作人员但只有一个 Kafka 主题分区,则只有一个核心可以使用数据
  • 同样,如果您有多个 Kafka 主题分区但只有一个具有单核的工作节点,则“并行度”为 1
  • 请记住,公式通常代表一种理论,为了简单起见而忽略了细节。您引用的公式是一个很好的起点,但最终它取决于您的环境,例如:延迟或吞吐量要求、网络带宽/流量、可用硬件、成本等。话虽如此,只有您可以进行优化测试.

作为旁注,当从 Spark Structured Streaming 写入 Kafka 时,如果您的 Dataframe 包含“partition”列,它将用于将记录发送到相应的分区(从 0 开始)。您还可以在数据框中包含“主题”列,它允许您将记录发送到某个主题。

Spark Structured Streaming 会将每条记录单独发送到 Kafka。


推荐阅读