首页 > 解决方案 > 开发和测试 Python 代码以连接本地机器上的 kafka 流

问题描述

我是本地机器上的 Python 新手。到目前为止,我可以在 Azure Databricks 中进行编码。我想创建和部署连接到 confluent kafka 并将数据保存到增量表的库。我很困惑 - 1] 我是否需要使用 python 从我的本地计算机连接到 Databricks Delta 以将流存储到 delta 或通过如下设置将流存储到本地 delta(我能够创建 delta 表)

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

并将 lib 部署到 databricks 中,当它运行时它将指向 Databricks Delta

我也想使用 dbfs 文件存储连接到 kafka

.option("kafka.ssl.truststore.location", "/dbfs/FileStore/tables/test.jks") \

我是新手,请分享有关如何在 Python 中创建流式应用程序的详细信息?以及如何部署到 Databricks?

标签: pythonapache-kafkadatabricksspark-structured-streamingazure-databricks

解决方案


要在没有笔记本的 Databricks 上执行 Python 代码,您需要配置一个作业。正如 OneCricketeer 所提到的,这egg是库的文件格式,您需要有一个 Python 文件作为作业的入口点——它将初始化 Spark 会话,然后调用您的库。

可以配置作业(您还需要上传库):

  1. 通过 UI,但仅限于配置笔记本和 jar,而不是 Python 代码。但是您仍然可以使用该spark-submit选项运行 Python 代码。
  2. 通过REST API - 使用它,您可以创建一个直接执行 Python 代码的作业
  3. 通过命令行(在后台使用 REST API),您需要自己创建 JSON,方法与 REST API 相同。
  4. 通过Databricks Terraform Provider - 它也使用 REST API,但可以更轻松地在一个地方配置所有内容 - 上传库、上传文件到 DBFS、创建/修改作业。

在 Databricks 上,Delta 已预先安装,因此您无需设置选项、指定 maven 坐标和其他所有内容,因此您的初始化代码将是:

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .getOrCreate()

推荐阅读