首页 > 解决方案 > 带有多个 .jar 的 SparkSubmitOperator

问题描述

我正在尝试编写一个从 Azure Blob 存储读取 .tsv 文件并将数据写入 MySQL 数据库的数据管道。我有一个传感器,它在我的存储容器中查找具有给定前缀的文件,然后是一个 SparkSubmitOperator,它实际读取数据并将其写入数据库。

传感器工作正常,当我将数据从本地存储写入 MySQL 时,它也工作正常。但是,我在从 Blob 存储中读取数据时遇到了很多麻烦。

这是我正在尝试运行的简单 Spark 作业,

spark = (SparkSession
    .builder \
    .config("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \
    .config("fs.azure.account.key.{}.blob.core.windows.net".format(blob_account_name), blob_account_key) \
    .getOrCreate()
)
sc = spark.sparkContext
sc.setLogLevel("WARN")

df_tsv = spark.read.csv("wasb://{}@{}.blob.core.windows.net/{}".format(blob_container, blob_account_name, blob_name), sep=r'\t', header=True)

mysql_url = 'jdbc:mysql://' + mysql_server
df_tsv.write.jdbc(url=mysql_url, table=mysql_table, mode="append", properties={"user":mysql_user, "password": mysql_password, "driver: "com.mysql.cj.jdbc.Driver" })

这是我的 SparkSubmitOperator,

spark_job = SparkSubmitOperator(
    task_id="my-spark-app",
    application="path/to/my/spark/job.py", # Spark application path created in airflow and spark cluster
    name="my-spark-app",
    conn_id="spark_default",
    verbose=1,
    conf={"spark.master":spark_master},
    application_args=[tsv_file, mysql_server, mysql_user, mysql_password, mysql_table],
    jars=azure_hadoop_jar + ", " + mysql_driver_jar,
    driver_class_path=azure_hadoop_jar + ", " + mysql_driver_jar,
    dag=dag)

我不断收到此错误,

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.azure.NativeAzureFileSystem not found

我到底做错了什么?

我在我的应用程序中mysql-connector-java-8.0.27.jar都有。hadoop-azure-3.3.1.jar我已经在driver_class_pathjars参数中给出了这些路径。我在这里的做法有问题吗?

我尝试按照此处给出的建议将Pyspark Dataframe 保存到 Azure 存储,但它们没有帮助。

标签: apache-sparkpysparkairflowspark-submit

解决方案


推荐阅读