apache-spark - 如何使用火花流传输 websocket?
问题描述
我需要使用 apache spark 将流从 websocket 写入 parquet 文件。当前的 apache spark 流功能似乎不支持开箱即用的 websocket。
有一个命令可以从 apache-spark 中的 TCP 套接字读取流,所以我尝试将 websocket 转换为常规套接字,但无法使用测试脚本让 spark 读取套接字:
我像这样设置服务器:
import socket, socketserver, time
class MyHandler(socketserver.BaseRequestHandler):
def handle(self):
counter = 1
while 1:
#dataReceived = self.request.recv(1024)
#if not dataReceived: break
str_send = 'msg ' + str(counter)
self.request.send(str_send.encode("utf-8"))
counter+=1
time.sleep(2)
myServer = socketserver.TCPServer(('localhost',5146), MyHandler)
myServer.serve_forever( )
这适用于普通客户:
import socket, socketserver, time
def client(ip, port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
while True:
response = str(sock.recv(1024))
print("Received: {}".format(response))
ip = 'localhost'
port = 5146
client(ip, port)
但是当我使用 spark 的示例读取 TCP 流时,我仍然没有得到任何数据:
lines = spark \
.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 5146) \
.load()
query = lines.writeStream\
.format("console")\
.outputMode('append')\
.start()\
.awaitTermination()
我也尝试写入文件,但文件是空白的。
连接已建立,但没有数据通过:
$ netstat -na | grep "5146"
tcp4 0 0 127.0.0.1.5146 127.0.0.1.59823 ESTABLISHED
tcp4 0 0 127.0.0.1.59823 127.0.0.1.5146 ESTABLISHED
tcp4 0 0 127.0.0.1.5146 *.* LISTEN
解决方案
推荐阅读
- javascript - 将对象推入作为对象属性的数组中
- algolia - @parcel/transformer-js 处的构建错误“无法访问范围内的线程局部变量没有...”
- c# - 反序列化时的映射问题
- ubuntu - Ubuntu Server 21.10 如何将特定接口用于 Internet,将另一个接口用于本地网络
- c++ - c++ winsock tcp 服务器和客户端与互联网的连接
- windows - 如何在 Windows 系统上安装 Minizinc 的 OR 工具?
- scala - Scala 3 隐式转换:比较值和文字
- node.js - 尝试发布扩展时,收到错误消息:Part URI is not valid per rules defined in the Open Packaging Conventions specification
- python - aws - 是否可以在 Typescript 中为 Python CDK 进行单元测试?
- momentjs - Moment JS diff() 在 10 月 31 日到 11 月 1 日之间返回 0 天