python - Connection refused error on pyspark in standalone mode
Problem description
My Spark script prints the 10 most frequent words in a .txt file.
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("read text file in pyspark")
sc = SparkContext(conf=conf)
path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"
path_to_save_result = "C:/Users/Admin/Desktop/beeline_project/output/"  # not used below
# Split every line into words on single spaces
words = sc.textFile(path_to_file).flatMap(lambda line: line.split(" "))
# Count the occurrences of each word
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
# Bring all (word, count) pairs to the driver and keep the 10 most frequent
output = wordCounts.collect()
output = sorted(output, key=lambda tup: tup[1], reverse=True)
output = output[:10]
for (word, count) in output:
    print("%s: %i" % (word, count))
sc.stop()
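For a small input, the same pipeline (split each line on a single space, count each word, sort by count in descending order, keep the first 10) can be mirrored in plain Python to check what the Spark job should print. This is a local sketch, not PySpark; `top_words` and the sample lines are hypothetical. Like the Spark code, it splits on `" "` only, so repeated spaces produce empty strings that get counted as words too.

```python
from collections import Counter


def top_words(lines, n=10):
    """Return the n most frequent words as (word, count) pairs.

    Hypothetical helper mirroring the Spark job: split on a single space,
    tally each word, then sort by count in descending order.
    """
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    # sorted() is stable, so words with equal counts keep first-seen order;
    # Spark's order for ties may differ.
    ordered = sorted(counts.items(), key=lambda tup: tup[1], reverse=True)
    return ordered[:n]


sample = ["lorem ipsum dolor", "ipsum dolor", "dolor sit amet"]
for word, count in top_words(sample):
    print("%s: %i" % (word, count))
```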
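But then it gives the error below, and I just want the program to stop cleanly. At the end it says "No connection could be made because the target machine actively refused it", but I am not opening a connection anywhere.
Could this be because I run pyspark without a main()? Does it affect the output?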
SUCCESS: The process with PID 13904 (child process of PID 6048) has been terminated.
SUCCESS: The process with PID 6048 (child process of PID 13948) has been terminated.
SUCCESS: The process with PID 13948 (child process of PID 8892) has been terminated.
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1152, in send_command
answer = smart_decode(self.stream.readline()[:-1])
File "C:\python37\lib\socket.py", line 589, in readinto
return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62315)
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:62315)
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\python37\lib\site-packages\py4j\java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it
Edit:
Using Spark 2.4.6, Scala 2.11.12, Java 8, Python 3.7.
Solution
Try this:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
path_to_file = "C:/Users/Admin/Desktop/beeline_project/lorem2.txt"
path_to_save_result = "C:/Users/Admin/Desktop/beeline_project/output/"
# Each row of the DataFrame has one string column ("value") holding a line of text
df = spark.read.text(path_to_file)
# Keep this as an RDD (no .collect() here): a Python list has no .map() or .reduceByKey()
words = df.rdd.flatMap(lambda row: row[0].split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
output = wordCounts.collect()
output = sorted(output, key=lambda tup: tup[1], reverse=True)
output = output[:10]
for (word, count) in output:
    print("%s: %i" % (word, count))
# Stop the session explicitly when done
spark.stop()
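Note that this approach uses Spark mainly to read the file, then pulls the results back to the driver with collect() and does the sorting in plain Python. Collecting all of the data onto the driver node is slow, and it will fail if the dataset is large. This code may work, but it is definitely not how you would do the analysis "the Spark way". You should consider doing it with Spark DataFrames and native Spark functions, so that the work runs in parallel across multiple nodes of the cluster and takes advantage of the Spark engine.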
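Why aggregating inside Spark scales better than collecting everything can be illustrated without Spark. The plain-Python simulation below is a conceptual sketch, not Spark's internals: it splits the input into hypothetical "partitions", builds partial word counts inside each one (similar to how `reduceByKey` merges values locally on each partition before shuffling), merges only those partial results, and keeps the top 10 with `heapq.nlargest` instead of sorting every pair. In PySpark itself, the DataFrame route would typically use `pyspark.sql.functions.split` and `explode`, then `groupBy(...).count()`, `orderBy` and `limit(10)`, so that only the final rows reach the driver. The helper names here are illustrative assumptions.

```python
import heapq
from collections import Counter
from functools import reduce


def partition(lines, num_partitions):
    """Split lines round-robin into num_partitions lists (hypothetical helper)."""
    parts = [[] for _ in range(num_partitions)]
    for i, line in enumerate(lines):
        parts[i % num_partitions].append(line)
    return parts


def count_partition(lines):
    """Partial word count for one partition (the local 'combine' step)."""
    counts = Counter()
    for line in lines:
        counts.update(line.split(" "))
    return counts


def top_words_partitioned(lines, n=10, num_partitions=4):
    """Count words per partition, merge the partial counts, keep the top n."""
    partials = [count_partition(p) for p in partition(lines, num_partitions)]
    # Merge the small per-partition tallies rather than the raw words.
    total = reduce(lambda a, b: a + b, partials, Counter())
    # Select only the n largest counts instead of sorting every pair.
    return heapq.nlargest(n, total.items(), key=lambda tup: tup[1])
```

Only the merged partial counts, not every individual word, have to be combined in the final step, which is the property that lets Spark spread the counting across executors.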