python - Spark socketTextStream is empty
Problem description
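I'm following Spark's Streaming programming guide. Instead of using nc -lk 9999, I created my own simple Python server, shown below. As you can see from the code, it randomly generates letters from a to z.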
import socketserver
import time
from random import choice


class AlphaTCPHandler(socketserver.BaseRequestHandler):
    def handle(self):
        print('AlphaTCPHandler')
        alphabets = list('abcdefghijklmnopqrstuvwxyz')
        try:
            # send one random letter per second until the client disconnects
            while True:
                s = f'{choice(alphabets)}'
                b = bytes(s, 'utf-8')
                self.request.sendall(b)
                time.sleep(1)
        except BrokenPipeError:
            print('broken pipe detected')


if __name__ == '__main__':
    host = '0.0.0.0'
    port = 301
    server = socketserver.TCPServer((host, port), AlphaTCPHandler)
    print(f'server starting {host}:{port}')
    server.serve_forever()
I tested this server with the client code below.
import socket
import time

HOST, PORT = 'localhost', 301

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sock.connect((HOST, PORT))
    print('socket opened')
    while True:
        data = sock.recv(1024)
        if not data:  # an empty read means the server closed the connection
            break
        received = str(data, 'utf-8')
        if len(received.strip()) > 0:
            print(f'{received}')
        time.sleep(1)
finally:
    sock.close()
    print('socket closed')
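A plausible reason this test client shows letters while Spark shows nothing: recv() returns whatever bytes have arrived, with no notion of lines, whereas a newline-delimited reader such as socketTextStream only hands over a record once a '\n' arrives. The sketch below contrasts the two read styles. It uses socket.socketpair() as a stand-in for the real server on port 301 (an assumption made so the example runs on its own), and the helpers _pair_with, read_like_recv and read_like_lines are hypothetical names, not part of any library.

```python
import socket
from typing import Optional, Tuple


def _pair_with(payload: bytes, timeout: float) -> Tuple[socket.socket, socket.socket]:
    """Hypothetical helper: a connected pair whose sender already sent `payload`."""
    sender, receiver = socket.socketpair()
    sender.sendall(payload)       # sender stays open, so the reader never sees EOF
    receiver.settimeout(timeout)  # every later receive gives up after `timeout`
    return sender, receiver


def read_like_recv(payload: bytes, timeout: float = 0.5) -> str:
    """Like the test client: return whatever bytes have arrived so far."""
    sender, receiver = _pair_with(payload, timeout)
    with sender, receiver:
        return receiver.recv(1024).decode('utf-8')


def read_like_lines(payload: bytes, timeout: float = 0.5) -> Optional[str]:
    """Like a newline-delimited reader: one full line, or None if it never ends."""
    sender, receiver = _pair_with(payload, timeout)
    with sender, receiver, receiver.makefile('rb') as reader:
        try:
            # readline() keeps reading until it sees a newline byte
            return reader.readline().decode('utf-8').rstrip('\n')
        except socket.timeout:
            return None  # no newline arrived before the timeout


print(read_like_recv(b'x'))     # x     (recv() ignores line boundaries)
print(read_like_lines(b'x'))    # None  (the line never ends)
print(read_like_lines(b'x\n'))  # x
```

The 0.5 s timeout only exists so the demo terminates; a real line-oriented consumer would simply keep waiting.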
However, my Spark streaming code doesn't seem to receive any data, or at least it doesn't print anything. The code is below.
from pyspark.streaming import StreamingContext
from time import sleep

# sc: the SparkContext provided by the pyspark shell or notebook
ssc = StreamingContext(sc, 1)  # 1-second batches
ssc.checkpoint('/tmp')
lines = ssc.socketTextStream('0.0.0.0', 301)
words = lines.flatMap(lambda s: s.split(' '))
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
sleep(5)  # let the stream run for about five seconds
ssc.stop(stopSparkContext=False, stopGraceFully=True)
All I see in the output is the following repeating pattern:
-------------------------------------------
Time: 2019-10-31 08:38:22
-------------------------------------------
-------------------------------------------
Time: 2019-10-31 08:38:23
-------------------------------------------
-------------------------------------------
Time: 2019-10-31 08:38:24
-------------------------------------------
Any ideas what I'm doing wrong?
Solution
Your streaming code is working properly. The problem is the data your server sends: there is no line separator after each letter. socketTextStream interprets the incoming bytes as UTF-8 text delimited by '\n', so Spark sees one constantly growing line and keeps waiting for it to finish, which never happens. Modify your server to send a newline after each letter:
while True:
    s = f'{choice(alphabets)}\n'  # <-- inserted \n in here
    b = bytes(s, 'utf-8')
    self.request.sendall(b)
    time.sleep(1)
And the result:
-------------------------------------------
Time: 2019-10-31 12:09:26
-------------------------------------------
('t', 1)
-------------------------------------------
Time: 2019-10-31 12:09:27
-------------------------------------------
('t', 1)
-------------------------------------------
Time: 2019-10-31 12:09:28
-------------------------------------------
('x', 1)
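The framing rule behind this fix can be modelled in a few lines: a consumer that splits a byte stream on '\n' can only emit a record once the separator arrives, and keeps any unterminated tail pending. The LineBuffer class below is a hypothetical helper, a simplified model assumed to behave like the line splitting socketTextStream performs; it is not Spark's actual code.

```python
from typing import List


class LineBuffer:
    """Hypothetical helper: accumulates raw byte chunks, emits whole lines."""

    def __init__(self) -> None:
        self._pending = b''  # bytes received since the last newline

    def feed(self, chunk: bytes) -> List[str]:
        """Append a chunk and return every line it completes (may be empty)."""
        self._pending += chunk
        # Everything before the last newline is complete; the tail stays pending.
        *complete, self._pending = self._pending.split(b'\n')
        return [line.decode('utf-8') for line in complete]


# Original server: one letter per chunk, no separator, so nothing is emitted.
old_style = LineBuffer()
for letter in 'tqx':
    print(old_style.feed(letter.encode('utf-8')))  # prints [] each time

# Fixed server: each chunk ends with a newline, so each one completes a line.
new_style = LineBuffer()
for letter in 'tqx':
    print(new_style.feed(f'{letter}\n'.encode('utf-8')))  # ['t'], ['q'], ['x']
```

With a 1-second batch interval and one letter per second, each batch should receive roughly one complete line, which fits the single (letter, 1) pair per batch in the output above.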