azure-data-lake - Azure HD Insight 是否支持自动加载程序进行新文件检测?
问题描述
我指的是以下链接 https://docs.microsoft.com/en-us/azure/databricks/spark/latest/structured-streaming/auto-loader ,它使用火花流处理 azure databricks 中的增量文件。我想知道带有Data Lake stroage Gen2 的HD Insight 集群是否支持增量文件。我尝试了高清洞察火花集群中的示例我收到以下错误
示例代码:
input_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format","json") \
.option("cloudFiles.connectionString", connection_string) \
.option("cloudFiles.resourceGroup", resource_group) \
.option("cloudFiles.subscriptionId", subscription_id) \
.option("cloudFiles.tenantId", tenant_id) \
.option("cloudFiles.clientId", client_id) \
.option("cloudFiles.clientSecret", client_secret) \
.option("cloudFiles.includeExistingFiles", "true") \
.schema(schema) \
.load(input_folder)
错误
Traceback (most recent call last):
File "<stdin>", line 12, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/sql/streaming.py", line 398, in load
return self._df(self._jreader.load(path))
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o95.load.
: java.lang.ClassNotFoundException: Failed to find data source: cloudFiles. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:225)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: cloudFiles.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
.
解决方案
不幸的是,Azure HDInsight 不支持自动加载程序来检测新文件。
什么是自动装载机?
Autoloader – Databricks的新功能允许从各种数据源将数据增量地摄取到 Delta Lake。Auto Loader 是针对 Apache Spark 的优化云文件源,可在新数据到达时从云存储中连续高效地加载数据。合作伙伴集成的数据摄取网络允许您将来自数百个数据源的数据直接摄取到 Delta Lake。
在后台(在 Azure Databricks 中),运行 Auto Loader 将自动设置Azure 事件网格和队列存储服务。通过这些服务,自动加载器使用 Azure 存储中的队列轻松找到新文件,将它们传递给 Spark,从而在流式处理或批处理作业中以低延迟和低成本加载数据。Auto Loader 记录处理了哪些文件,这保证了对传入数据的一次性处理。
Auto Loader 在新数据文件到达云存储时以增量方式高效地处理它们,无需任何额外设置。Auto Loader 提供了一个名为 cloudFiles 的新结构化流式传输源。给定云文件存储上的输入目录路径,cloudFiles 源会在新文件到达时自动处理它们,并可选择处理该目录中的现有文件。
有关详细信息,请参阅使用自动加载程序从 Azure Blob 存储或 Azure Data Lake Storage Gen2 加载文件。
推荐阅读
- angularjs - 如何使用 $transition$.params 更改状态参数的值
- ethereum - 你可以在 Solidity 中将事件作为函数参数传递吗?
- reactjs - 将 iframe 渲染到 React
- javascript - 我可以取消 Firestore 写入吗?(使用离线持久化)
- r - 仅在某些变量/嵌套循环上在 lapply 中使用 sapply
- java - 如何从通配符类型转到参数化类型?
- c# - 套件装配中的库存属性列
- error-handling - pcall() 忽略 os.execute() 错误
- css - 如何使中间的锐利匹配文本行。文字不清晰
- python - 在 pygame 中循环 - 它永远不会继续,但会一直循环;我错过了什么