python - 套接字阻止在 Java 服务器和 Python 客户端之间发送消息
问题描述
我需要在本地 Windows 机器上的 Java 应用程序和 python 脚本之间传递一些数据字符串。因此,我决定使用与 TCP 通信的 Java 套接字服务器与 python 客户端进行通信。Java 创建了两个线程来处理本地主机端口 9998 和 9999 上的两个套接字连接。我使用端口 9998 来处理传入消息,而我使用端口 9999 来处理发送消息。我的两个应用程序在发送/接收的前几条消息中运行顺利,并且在某些时候,它停止了将字符串从 Java 发送到 Python 的调用。这是我的代码的一部分:
这个 Java 类处理套接字服务器的创建和通信
public class ServerSocketConnection {
private int port;
private Socket socket;
private ServerSocket serverSocket;
private Logger logger;
private BufferedWriter out;
private BufferedReader in;
public ServerSocketConnection(int port) {
this.port = port;
logger = App.getLogger();
}
// Create a server for a socket connection
public void createServer() {
try {
// Create a server socket
serverSocket = new ServerSocket(port);
// Socket creation
socket = serverSocket.accept();
// Create a print writer
out = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
// Create a buffered reader
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
} catch (IOException e) {
logger.severe("Error creating server socket");
}
}
// Close the server socket
public void closeServer() {
try {
serverSocket.close();
} catch (IOException e) {
logger.severe("Error closing server socket");
}
}
public void sendMessage(String message) {
try {
// Sending the byte lenght of the message
byte[] ptext = message.getBytes("UTF-8");
send(String.valueOf(ptext.length));
// Sending the message
send(message);
} catch (IOException e) {
logger.severe("Error sending message:" + e.getMessage());
}
}
private void send(String message) throws IOException {
out.write(message);
out.newLine();
out.flush();
}
public String receiveMessage() {
try {
return in.readLine();
} catch (IOException e) {
logger.severe("Error receiving message");
return null;
}
}
这是处理消息发送的 Java 线程。它从在其他线程之间共享的队列中获取要发送的消息。
public class SendToPlatform implements Runnable {
private static final int PORT = 9999;
private Thread worker;
private AtomicBoolean running;
private AtomicBoolean stopped = new AtomicBoolean(false);
private BlockingQueue<String> queueOut;
private Logger logger;
private ServerSocketConnection serverSocketConnection;
public SendToPlatform(BlockingQueue<String> queueOut, AtomicBoolean running) {
this.queueOut = queueOut;
this.running = running;
this.logger = App.getLogger();
serverSocketConnection = new ServerSocketConnection(PORT);
}
public void run() {
stopped.set(false);
serverSocketConnection.createServer();
while (running.get()) {
socketSender();
}
stopped.set(true);
}
private void socketSender() {
if (!queueOut.isEmpty()) {
String element = null;
try {
element = queueOut.poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.severe("SendToPlatform: InterruptedException: " + e.getMessage());
}
serverSocketConnection.sendMessage(element);
}
}
}
这是用于从 Java 套接字服务器接收消息的 python 线程:
def __socket_reading_no_server(self, queue_input : queue.Queue, is_running : bool):
HOST = "localhost"
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
while is_running:
data = s.recv(4)
message_size = int(data.decode('UTF-8').strip())
data = s.recv(min(message_size + 2, 1024))
message = data.decode('UTF-8').strip()
queue_input.put(message)
s.close()
并且这个方法是作为一个线程启动的,带有这些指令:
input_thread = threading.Thread(target=self.__socket_reading_no_server , args =(self.__queue_input, self.__running, ), daemon=True)
input_thread.start()
通过调试、记录和使用 Wireshark 来理解我的代码中的问题,我得出的结论是,在out.write
正确发送大约 10 条消息后,在发送消息时阻塞的指令经常出现问题。当我关闭套接字连接时,挂起的消息被释放。我尝试使用PrintWriter
andDataOutputStream
而不是,BufferedWriter,
但发生了同样的问题。我尝试在发送字符串以适应大小之前不发送消息的长度s.recv()
,但发生了同样的问题。我是套接字编程的新手,可能我做了一些非常错误的事情,但我找不到问题所在。也许有更好的方法在我不知道的进程之间传递数据,我可以用它来满足我的需要而不是套接字?
@absuu 回答后的编辑
在应用答案中建议的更正后,我仍然遇到out.write
在尝试写入套接字时发送方法阻塞的相同问题。我编辑我的代码如下:
public class ServerSocketConnection {
[...]
public void sendMessage(String message) {
try {
send(message);
} catch (IOException e) {
logger.severe("Error sending message:" + e.getMessage());
}
}
private void send(String message) throws IOException {
message += "\r\n";
byte[] ptext = message.getBytes("UTF-8");
out.write(String.format("%2d",ptext.length));
out.write("\r\n");
out.flush();
out.write(new String(ptext));
logger.info("Data sent");
out.flush();
}
}
我也增加了s.recv
尺寸,但没有任何改变
解决方案
这是一个 Java 客户端和 Python 服务器的“精简”实现,它演示了一种发送任意长度消息(在本例中为字符串)的有效机制:
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
public class Client {
public static void main(String[] args) throws IOException {
String message = "Talking isn't doing. It is a kind of good deed to say well; and yet words are not deeds.";
try (Socket socket = new Socket("localhost", 7070)) {
try (DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
byte[] dts = message.getBytes();
out.writeInt(dts.length);
out.write(dts);
}
}
}
}
请注意客户端在发送实际消息之前如何将即将到来的消息的长度作为 32 位整数发送。
from multiprocessing.connection import Listener
def main():
listener = Listener(address=('0.0.0.0', 7070), family='AF_INET', authkey=None)
with listener.accept() as conn:
print(conn.recv_bytes().decode())
if __name__ == '__main__':
main()
连接类期望使用客户端实现的协议(在本例中为 Java)接收消息 - 即,一个 32 位整数,它给出了要遵循的数据量。
我希望这可以澄清问题
推荐阅读
- javascript - 动态 require 语句在 vuejs 中不起作用
- javascript - 使用 AJAX 加载代码时添加 rel="noopener"
- vuejs2 - 未找到 Mixins Vuejs 中的函数
- reactjs - Redux Map Store 按道具名称
- java - Lambda 性能改进,Java 8 与 11
- facebook-sdk-4.0 - 测试 Instagram Graph API 停止返回媒体
- java - 减少和重新扩展 java long 类型
- bash - 7z。如何排除目录中除一个文件外的所有内容
- selenium - Geb 选择器不选择
- docker - Gitlab CI/Docker:ssh-add 不断要求输入密码