google-cloud-platform - Unable to add a jar to PySpark in Jupyter on Google Dataproc
Problem Description
I have a Jupyter notebook on Dataproc and I need a jar to run some of my work. I'm aware of editing spark-defaults.conf and of passing --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
when submitting a job from the command line (a sketch of that command is just below) - both work fine. However, if I want to add the jar directly from within the Jupyter notebook, I've tried the two methods below and both fail.
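For reference, the command-line submission that works looks roughly like this (the cluster name and script name here are placeholders):
# Hypothetical cluster/script names; the --jars flag is the relevant part.
gcloud dataproc jobs submit pyspark wordcount.py \
    --cluster=my-cluster \
    --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar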
Method 1:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'
Method 2:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Shakespeare WordCount')\
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')\
    .getOrCreate()
Both of them fail with the same error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
19 # Read BQ data into spark dataframe
20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
22
23 df.show(5)
/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
170 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
171 else:
--> 172 return self._df(self._jreader.load())
173
174 @since(1.4)
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
... 13 more
The task I'm trying to run is very simple:
table = 'publicdata.samples.shakespeare'
df = spark.read.format('bigquery').option('table', table).load()
df.show(5)
I know there are many similar questions and answers out there, but they either don't work or don't fit my needs. I only need some jars temporarily, and I don't want to keep all of them in the default configuration. I'd like to be more flexible and add jars on the fly. How can I solve this? Thanks!
Solution
Unfortunately there isn't a built-in way to do this dynamically without effectively just editing spark-defaults.conf and restarting the kernel. There's an open feature request in Spark for this.
Zeppelin has some usability features for adding jars through the UI, but even in Zeppelin you have to restart the interpreter afterwards for the Spark context to pick them up in its classloader. Those options also require the jarfile to already be staged on the local filesystem; you can't just refer to a remote file path or URL.
One workaround is to create an init action which sets up a systemd service that periodically polls an HDFS directory and syncs it into one of the existing classpath directories, such as /usr/lib/spark/jars:
#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars.
# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'

# Only the master needs to create the HDFS dropzone directory.
readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
  hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi

# Generate the polling script; unescaped variables are expanded now,
# backslash-escaped ones (\${FILE}, \$(date)) at runtime.
SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
  sleep 5
  hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
      sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
      > /tmp/hdfs_files.txt
  hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
      sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
      > /tmp/local_files.txt
  # Jars present in the dropzone but not yet on the local classpath.
  comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
  if [ -s /tmp/diff_files.txt ]; then
    for FILE in \$(cat /tmp/diff_files.txt); do
      echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
      hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
    done
  fi
done
EOF
chmod 755 "${SYNC_SCRIPT}"

# Run the poller as a systemd service so it survives reboots.
SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
chmod a+rw "${SERVICE_CONF}"

systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
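You would attach this script to the cluster as an initialization action at creation time; a sketch, assuming you've uploaded the script above to a GCS bucket of your own (gs://my-bucket/sync-jars-init.sh is a placeholder):
# Runs the init action on every node during first boot.
gcloud dataproc clusters create my-cluster \
    --initialization-actions=gs://my-bucket/sync-jars-init.sh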
Then, whenever you need a jarfile to be available everywhere, you just copy it into hdfs:///usr/lib/jars
, the periodic poller automatically drops it into /usr/lib/spark/jars
, and then you simply restart your kernel to pick it up. You can add jars to that HDFS directory either by SSHing in and running hdfs dfs -cp
directly, or simply with a subprocess from your Jupyter notebook:
import subprocess

# Stage the jar in the HDFS dropzone; the sync service copies it into
# /usr/lib/spark/jars on all nodes within a few seconds.
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
print(out)
print(err)
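Once the copy has gone through, you can verify that the poller has picked the jar up; a quick check from an SSH session, using the paths above:
# The jar should appear in the dropzone immediately...
hdfs dfs -ls hdfs:///usr/lib/jars

# ...and on the local classpath within the 5-second polling interval.
ls /usr/lib/spark/jars | grep spark-bigquery
After restarting the kernel, the spark.read.format('bigquery') call from the question should then find the data source.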