首页 > 解决方案 > 如何使用火花流传输 websocket?

问题描述

我需要使用 apache spark 将流从 websocket 写入 parquet 文件。当前的 apache spark 流功能似乎不支持开箱即用的 websocket。

有一个命令可以从 apache-spark 中的 TCP 套接字读取流,所以我尝试将 websocket 转换为常规套接字,但无法使用测试脚本让 spark 读取套接字:

我像这样设置服务器:

import socket, socketserver, time

class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        counter = 1
        while 1:
            #dataReceived = self.request.recv(1024)
            #if not dataReceived: break
            str_send = 'msg ' + str(counter)
            self.request.send(str_send.encode("utf-8"))
            counter+=1
            time.sleep(2)

myServer = socketserver.TCPServer(('localhost',5146), MyHandler)
myServer.serve_forever(  )

这适用于普通客户:

import socket, socketserver, time
def client(ip, port):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((ip, port))
    while True:
        response = str(sock.recv(1024))
        print("Received: {}".format(response))

ip = 'localhost'
port = 5146
client(ip, port)

但是当我使用 spark 的示例读取 TCP 流时,我仍然没有得到任何数据:

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 5146) \
    .load()

query = lines.writeStream\
      .format("console")\
      .outputMode('append')\
      .start()\
      .awaitTermination()

我也尝试写入文件,但文件是空白的。

连接已建立,但没有数据通过:

$ netstat -na | grep "5146"
tcp4       0      0  127.0.0.1.5146         127.0.0.1.59823        ESTABLISHED
tcp4       0      0  127.0.0.1.59823        127.0.0.1.5146         ESTABLISHED
tcp4       0      0  127.0.0.1.5146         *.*                    LISTEN

标签: apache-sparkwebsocketspark-streamingdatabricks

解决方案


推荐阅读