python - Ingesting Kafka data into HBase via PySpark - An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext
Problem description
I am trying to follow a tutorial to set up real-time ingestion of Kafka data into HBase via PySpark. I am running into a problem with Spark Streaming; specifically, I get the following error:
Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.java.JavaStreamingContext.
: java.lang.NullPointerException
at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:130)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Running spark --version returns version 2.4.0 Using Scala version 2.11.12. Let me know if any other information is needed. My source code:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /my/path/spark/spark-streaming-kafka-0-8-assembly_2.11-2.4.0.jar pyspark-shell'
import findspark
findspark.init()
import pyspark
import random
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import *
# from pyspark_ext import *
import happybase
appName = "Kafka_MapR-Streams_to_HBase"
config = SparkConf().setAppName(appName)
props = []
props.append(("spark.rememberDuration", "10"))
props.append(("spark.batchDuration", "10"))
props.append(("spark.eventLog.enabled", "true"))
props.append(("spark.streaming.timeout", "30"))
props.append(("spark.ui.enabled", "true"))
config = config.setAll(props)
sc.stop()
sc = SparkContext(conf=config)
sc.stop()
ssc = StreamingContext(sc, int(config.get("spark.batchDuration")))
def runApplication(ssc, config):
    ssc.start()
    if config.get("spark.streaming.timeout") == '':
        ssc.awaitTermination()
    else:
        stopped = ssc.awaitTerminationOrTimeout(int(config.get("spark.streaming.timeout")))
        if not stopped:
            print("Stopping streaming context after timeout...")
            ssc.stop(True)
            print("Streaming context stopped.")
hbase_table = 'clicks'
hconn = happybase.Connection('hostname')
ctable = hconn.table(hbase_table)
Update
I believe the problem is related to sc.stop(). Removing it and changing sc = SparkContext(conf=config) to SparkContext.getOrCreate(conf=config) seems to have solved the problem.