apache-spark - 如何使用结构化流优化 Kafka 主题的分区策略以供消费?
问题描述
我对 kafka 非常陌生,并尝试将数据写入主题并从同一主题中读取(我们现在充当源团队来摄取数据。因此,我们正在执行写入到 Kafk 主题的操作和从同一主题)。我在 spark-shell 上编写了以下代码,以将数据写入 Kafka 主题。
pyspark --packages io.delta:delta-core_2.11:0.6.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,io.strimzi:kafka-oauth-client:0.5.0
from pyspark.sql.functions import col
from pyspark.sql.functions import from_json
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, TimestampType, DateType
tn = "topic_name"
kafka_broker = "brokerurl:9500"
endpoint_uri = "endpoint_uri"
client_id = "clientid"
client_secret = "secret_key"
jaas_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
oauth_client = " oauth.client.id='{0}'".format(client_id)
oauth_secret = " oauth.client.secret='{0}'".format(client_secret)
oauth_token_endpoint_uri = " oauth.token.endpoint.uri='{0}'".format(endpoint_uri)
oauth_config = jaas_config + oauth_client + oauth_secret + oauth_token_endpoint_uri + " oauth.max.token.expiry.seconds='30000' ;"
df = spark.sql("select * from dbname.tablename where geography in ('ASIA', 'LATIN_AMERICA') and geo_year in (2020, 2021)").select(F.to_json(F.struct(F.col("*"))).alias("value"))
# WRITE TO TOPIC
df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", tn)\
.save()
后来我才知道,一个 Kafka 主题可以包含分区中的数据。所以我删除并重新创建了相同的主题,但这次有 3 个分区。
我所有的火花经验都是在批处理中,即使在我们使用读取表或文件时分区数据的地方也存在分区的概念
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name')
.load()
批处理中使用的这个分区列通常是一个具有高基数的列,我们还可以指定我们想要将数据拆分为使用的分区数
df = spark.read.format('jdbc').option('', '')
...
...
.option('partitionColumn', 'partitionColumn_name').option('numPartitions', INTEGER_VALUE_OF_PARTITIONS)
.load()
我在普通的 Kafka 代码中看到了自定义分区器类,但我使用的是 spark-streaming,甚至不确定如何集成它。我对 Kafka 主题分区的困惑在于以下几点:
- 如何选择每个主题的分区数?我正在用 Kafka 实现火花流。
- 有没有办法可以使用 Spark 流来管理分区数据?
- 如果没有,有没有办法可以确保主题分区中数据的均匀分布。
我已经浏览了这个官方文档。
但是在那里找不到有关分区策略的任何信息。谁能告诉我如何将数据写入主题的特定分区,或者将其留给 Kafka 更好。
编辑1:我刚刚浏览了这个链接,并且提到了一个公式来计算基于吞吐量需要的分区数。这是我们可以遵循的方法来确定每个主题的分区数吗?
任何澄清对我来说都很有价值。
解决方案
这是一个相当广泛的主题,其中的问题需要一些彻底的答案。总之,最重要的是:
- 一般来说,Kafka 会随着主题中的分区数量而扩展
- Spark 随工作节点数量和可用内核/插槽的数量而扩展
- Kafka 主题的每个分区只能由单个 Spark 任务使用(parallelsim 然后取决于 Spark wcore 的数量)
- 如果您有多个 Spark 工作人员但只有一个 Kafka 主题分区,则只有一个核心可以使用数据
- 同样,如果您有多个 Kafka 主题分区但只有一个具有单核的工作节点,则“并行度”为 1
- 请记住,公式通常代表一种理论,为了简单起见而忽略了细节。您引用的公式是一个很好的起点,但最终它取决于您的环境,例如:延迟或吞吐量要求、网络带宽/流量、可用硬件、成本等。话虽如此,只有您可以进行优化测试.
作为旁注,当从 Spark Structured Streaming 写入 Kafka 时,如果您的 Dataframe 包含“partition”列,它将用于将记录发送到相应的分区(从 0 开始)。您还可以在数据框中包含“主题”列,它允许您将记录发送到某个主题。
Spark Structured Streaming 会将每条记录单独发送到 Kafka。