首页 > 解决方案 > 使用 Databricks 使用来自 CloudKarafka 的消息

问题描述

正如标题所说,我需要在我的 Databricks 笔记本中使用来自 CloudKarafka(免费 Kafka 集群)中某个主题的消息。我有以下代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .appName('Simple CloudKarafka Read') \
   .getOrCreate()

df = spark \
   .readStream \
   .format('kafka') \
   .option('kafka.bootstrap.servers', 'SERVERS_COMMA_SEPARATED') \
   .option('subscribe', 'TOPIC_NAME') \
   .option('kafka.security.protocol','SASL_SSL')\
   .option('kafka.sasl.mechanisms','SCRAM-SHA-256')\
   .option('kafka.sasl.jaas.config', 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";')\
   .load()

但是当我执行这段代码时:

df.writeStream \
 .format('console') \
 .trigger(processingTime='2 seconds') \
 .start()

我收到了这个错误:java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config

你能帮助我吗?

提前致谢。

标签: apache-sparkpysparkapache-kafkadatabricksspark-kafka-integration

解决方案


推荐阅读