首页 > 解决方案 > Azure HD Insight 是否支持自动加载程序进行新文件检测?

问题描述

我指的是以下链接 https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader ,它使用火花流处理 azure databricks 中的增量文件。我想知道带有Data Lake stroage Gen2 的HD Insight 集群是否支持增量文件。我尝试了高清洞察火花集群中的示例我收到以下错误

示例代码:

input_df = spark.readStream \
            .format("cloudFiles") \
            .option("cloudFiles.format","json") \
            .option("cloudFiles.connectionString", connection_string) \
            .option("cloudFiles.resourceGroup", resource_group) \
            .option("cloudFiles.subscriptionId", subscription_id) \
            .option("cloudFiles.tenantId", tenant_id) \
            .option("cloudFiles.clientId", client_id) \
            .option("cloudFiles.clientSecret", client_secret) \
            .option("cloudFiles.includeExistingFiles", "true") \
            .schema(schema) \
            .load(input_folder) 

错误

  Traceback (most recent call last):
  File "<stdin>", line 12, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py", line 398, in load
    return self._df(self._jreader.load(path))
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cloudFiles.DefaultSource
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
        at scala.util.Try.orElse(Try.scala:84)
        at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)

.

标签: azure-data-lakeazure-databricksazure-hdinsightazure-data-lake-gen2

解决方案


不幸的是,Azure HDInsight 不支持自动加载程序来检测新文件。

什么是自动装载机?

Autoloader – Databricks的新功能允许从各种数据源将数据增量地摄取到 Delta Lake。Auto Loader 是针对 Apache Spark 的优化云文件源,可在新数据到达时从云存储中连续高效地加载数据。合作伙伴集成的数据摄取网络允许您将来自数百个数据源的数据直接摄取到 Delta Lake。

在后台(在 Azure Databricks 中),运行 Auto Loader 将自动设置Azure 事件网格队列存储服务。通过这些服务,自动加载器使用 Azure 存储中的队列轻松找到新文件,将它们传递给 Spark,从而在流式处理或批处理作业中以低延迟和低成本加载数据。Auto Loader 记录处理了哪些文件,这保证了对传入数据的一次性处理。

Auto Loader 在新数据文件到达云存储时以增量方式高效地处理它们,无需任何额外设置。Auto Loader 提供了一个名为 cloudFiles 的新结构化流式传输源。给定云文件存储上的输入目录路径,cloudFiles 源会在新文件到达时自动处理它们,并可选择处理该目录中的现有文件。

在此处输入图像描述

有关详细信息,请参阅使用自动加载程序从 Azure Blob 存储或 Azure Data Lake Storage Gen2 加载文件


推荐阅读