首页 > 解决方案 > 发送消息,一个播放器类的实例,可以和其他播放器通信

问题描述

首先很抱歉,问了一个问题而没有展示我关于如何解决它的任何想法。我有一个任务如下

Create 2 player instances
one player should send message to second player ("initiator player")
when a player recieves message it should reply with a message that contains the recieved message 
concatenated with counter of how many message this player already sent. And both players run in thesame process.

我不知道该怎么做,尤其是发送消息和回复消息的想法。

现在,它假设是一个纯粹的 java 实现,不使用任何框架、spring、websocket 等。

我也想到了生产者消费者方法,但我想得到一个简单的解释,可能是发送和回复的最小工作代码,我认为逃避我的是通信模式,这只是一个递归发送方法和另一个 get方法(接收)此消息

标签: javamessage

解决方案


消息传递是一个抽象概念。消息可以被认为是必须以某种方式发出、传输、接收和处理的数据的一部分。它不依赖于特定的编程语言、框架或库。除了 Spring 之外,Websocket、JMS 等消息在 Win32 API、D-Bus、网络等低级别上无处不在。

至于您的任务,消息可以只是一个字符串。无需将其表示为 JSON/XML/等,因为两个播放器都在同一进程中运行。

原始任务描述相当广泛。从我的角度来看,您不需要递归,而是需要一种pendulum。第一个播放器发送初始化消息并等待响应。同时,第二个玩家收到初始化消息,将他的计数器添加到消息中并回复。然后第一个玩家在收到响应后醒来,添加他的计数器并发回。这种交流一次又一次地重复。此外,每个玩家在得到对上一条消息的答复之前不得发送下一条消息。

阻塞队列是纯 Java 在一个进程中的最佳方法。

考虑以下代码:

import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MessageTask
{
    // Blocking queue looks superfluous for single message. But such a queue saves us from cumbersome
    // synchronization of the threads.
    private static final int MAX_MESSAGES_IN_QUEUE = 1;

    public static void main(String[] args)
    {
        BlockingQueue<String> firstToSecond = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);
        BlockingQueue<String> secondToFirst = new ArrayBlockingQueue<String>(MAX_MESSAGES_IN_QUEUE);

        // Both players use the same queues symmetrically.
        InitiatorPlayer firstPlayer = new InitiatorPlayer(firstToSecond, secondToFirst);
        Player secondPlayer = new Player(secondToFirst, firstToSecond);

        // Please note that we can start threads in reverse order. But thankfully to
        // blocking queues the second player will wait for initialization message from
        // the first player.
        new Thread(secondPlayer).start();
        new Thread(firstPlayer).start();
    }
}

class Player implements Runnable
{
    protected final BlockingQueue<String> sent;
    protected final BlockingQueue<String> received;

    // Please aware that integer field may overflow during prolonged run
    // of the program. So after 2147483647 we'll get -2147483648. We can
    // either use BigInteger or compare the field with Integer.MAX_VALUE
    // before each increment.
    //
    // Let's choose BigInteger for simplicity.
    private BigInteger numberOfMessagesSent = new BigInteger("0");

    public Player(BlockingQueue<String> sent, BlockingQueue<String> received)
    {
        this.sent = sent;
        this.received = received;
    }

    @Override
    public void run()
    {
        while (true)
        {
            String receivedMessage = receive();
            reply(receivedMessage);
        }
    }

    protected String receive()
    {
        String receivedMessage;
        try
        {
            // Take message from the queue if available or wait otherwise.
            receivedMessage = received.take();
        }
        catch (InterruptedException interrupted)
        {
            String error = String.format(
                    "Player [%s] failed to receive message on iteration [%d].",
                    this, numberOfMessagesSent);
            throw new IllegalStateException(error, interrupted);
        }
        return receivedMessage;
    }

    protected void reply(String receivedMessage)
    {
        String reply = receivedMessage + " " + numberOfMessagesSent;
        try
        {
            // Send message if the queue is not full or wait until one message
            // can fit.
            sent.put(reply);
            System.out.printf("Player [%s] sent message [%s].%n", this, reply);
            numberOfMessagesSent = numberOfMessagesSent.add(BigInteger.ONE);

            // All players will work fine without this delay. It placed here just
            // for slowing the console output down.
            Thread.sleep(1000);
        }
        catch (InterruptedException interrupted)
        {
            String error = String.format(
                    "Player [%s] failed to send message [%s] on iteration [%d].",
                    this, reply, numberOfMessagesSent);
            throw new IllegalStateException(error);
        }
    }
}

class InitiatorPlayer extends Player
{
    private static final String INIT_MESSAGE = "initiator player";

    public InitiatorPlayer(BlockingQueue<String> sent, BlockingQueue<String> received)
    {
        super(sent, received);
    }

    @Override
    public void run()
    {
        sendInitMessage();
        while (true)
        {
            String receivedMessage = receive();
            reply(receivedMessage);
        }
    }

    private void sendInitMessage()
    {
        try
        {
            sent.put(INIT_MESSAGE);
            System.out.printf("Player [%s] sent message [%s].%n", this, INIT_MESSAGE);
        }
        catch (InterruptedException interrupted)
        {
            String error = String.format(
                    "Player [%s] failed to sent message [%s].",
                    this, INIT_MESSAGE);
            throw new IllegalStateException(error, interrupted);
        }
    }
}

样本输出:

Player [InitiatorPlayer@712dc5e9] sent message [initiator player].
Player [Player@69bf9b51] sent message [initiator player 0].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0].
Player [Player@69bf9b51] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@69bf9b51] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@712dc5e9] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].

PS你可能会在你的机器上得到稍微不同的输出,像这样:

Player [InitiatorPlayer@82b9342] sent message [initiator player].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0].
Player [Player@5d7a0209] sent message [initiator player 0].
Player [Player@5d7a0209] sent message [initiator player 0 0 1].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4].
Player [Player@5d7a0209] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5].
Player [InitiatorPlayer@82b9342] sent message [initiator player 0 0 1 1 2 2 3 3 4 4 5 5].

发生这种情况是因为玩家在不同的线程中工作。以下情况是可能的:

  1. 第一个玩家发送消息并将日志记录打印到控制台。
  2. 第二个玩家接收消息并发送回复。但是相应的线程在发送回复后立即被线程调度程序暂停。
  3. 第一个玩家收到回复,发送另一条消息并将日志记录打印到控制台。
  4. 第二个玩家的线程被线程调度程序唤醒并打印关于他在第 3 点中提到的回复的日志记录。

这种行为是正确的。玩家通过队列同步,例如第一个玩家在回答之前不会发送新消息。但是将日志打印到控制台与发送/接收消息不同步(也不能同步)。

PS2该任务可以仅使用一个阻塞队列(甚至使用单个互斥体)来解决。但是两个单独的队列更适合说明和解决方案的可能扩展。


推荐阅读