首页 > 解决方案 > NetMQ Actor 模式

问题描述

当 UI 发生变化时,我试图让一个 WPF 应用程序与另一个应用程序进行通信。例如:滑块更改并发送新值或文本框输入更改并发送新输入值。

第二个应用程序正在侦听并接收这些更改以更新不同的属性。

为了实现这一点,我按照文档中的 Actor 模式示例使用 NetMQ:https://netmq.readthedocs.io/en/latest/actor/

现在我有这些客户端/服务器类:

服务器 :

/// <summary>
/// Publishes serialized objects on a NetMQ PUB socket that runs inside a
/// <see cref="NetMQActor"/>. Callers hand messages to the actor with
/// <see cref="SendObject"/>; the actor's shim forwards them to the publisher.
/// </summary>
/// <remarks>
/// NOTE(review): this class does no locking around <c>actor</c>. NetMQ sockets
/// (including the actor's internal pair socket) are documented as not thread-safe,
/// so Start/Stop/ReStart/SendObject are presumably meant to be called from a single
/// thread (e.g. the UI thread) - confirm against callers.
/// </remarks>
public class NetMQServer : IDisposable
{
    /// <summary>
    /// Actor shim: owns the publisher socket and relays every two-frame
    /// (topic, payload) message received from the actor pipe to subscribers.
    /// </summary>
    public class ShimHandler : IShimHandler
    {
        private PairSocket shim;
        private NetMQPoller poller;
        private PublisherSocket publisher;
        private string Address;

        /// <param name="address">Endpoint the publisher binds to, e.g. <c>tcp://127.0.0.1:5556</c>.</param>
        public ShimHandler(string address)
        {
            Address = address;
        }

        /// <summary>Required by <see cref="IShimHandler"/>; no state is needed.</summary>
        public void Initialise(object state)
        {
        }

        /// <summary>
        /// Entry point executed by the actor. Binds the publisher, signals readiness
        /// to the actor and blocks in the poller until the end-shim command arrives.
        /// </summary>
        /// <param name="shim">The actor-side end of the pipe to the owning thread.</param>
        public void Run(PairSocket shim)
        {
            using (publisher = new PublisherSocket())
            {
                // Configure socket options before binding, as recommended for ZeroMQ options.
                publisher.Options.SendHighWatermark = 1000;
                publisher.Bind(Address);

                this.shim = shim;
                shim.ReceiveReady += OnShimReady;
                try
                {
                    shim.SignalOK();

                    // The poller is disposable; dispose it before the sockets it polls.
                    using (poller = new NetMQPoller { shim, publisher })
                    {
                        poller.Run();
                    }
                }
                finally
                {
                    // The shim socket is owned by the actor, so only detach our handler.
                    shim.ReceiveReady -= OnShimReady;
                }
            }
        }

        /// <summary>
        /// Handles a message from the actor pipe: stops the poller on the end-shim
        /// command, otherwise publishes the first frame as topic and the second as payload.
        /// </summary>
        private void OnShimReady(object sender, NetMQSocketEventArgs e)
        {
            // Read the whole multipart message at once so a malformed message can
            // neither block this thread waiting for a missing frame nor leave
            // trailing frames behind to be misread as the next command.
            NetMQMessage message = e.Socket.ReceiveMultipartMessage();
            string command = message[0].ConvertToString();

            if (command == NetMQActor.EndShimMessage)
            {
                poller.Stop();
                return;
            }

            if (message.FrameCount < 2)
            {
                // A topic without a payload is not a valid publication; ignore it.
                return;
            }

            publisher.SendMoreFrame(command).SendFrame(message[1].ToByteArray());
        }
    }

    /// <summary>Creates a stopped server; call <see cref="Start"/> to begin publishing.</summary>
    /// <param name="ip">Host or interface used to build <see cref="Address"/>.</param>
    /// <param name="port">TCP port used to build <see cref="Address"/>.</param>
    public NetMQServer(string ip, int port)
    {
        // The setters call ReStart(), which is a no-op while no actor exists.
        IP = ip;
        Port = port;
        Serializer = new CerasSerializer();
    }

    /// <summary>Serializer used by <see cref="SendObject"/> to turn objects into bytes.</summary>
    public CerasSerializer Serializer { get; set; }
    private NetMQActor actor;

    private string _name;
    public string Name
    {
        get { return _name; }
        set { _name = value;}
    }

    private int _port;
    /// <summary>TCP port; changing it restarts a running server on the new address.</summary>
    public int Port
    {
        get { return _port; }
        set
        {
            _port = value;
            ReStart();
        }
    }

    private string _ip;
    /// <summary>Host or interface; changing it restarts a running server on the new address.</summary>
    public string IP
    {
        get { return _ip; }
        set
        {
            _ip = value;
            ReStart();
        }
    }

    /// <summary>The endpoint built from <see cref="IP"/> and <see cref="Port"/>.</summary>
    public string Address
    {
        get { return String.Format("tcp://{0}:{1}", IP, Port); }
    }

    /// <summary>Creates the actor (and with it the publisher) if it is not already running.</summary>
    public void Start()
    {
        if (actor != null)
            return;
        actor = NetMQActor.Create(new ShimHandler(Address));
    }

    /// <summary>Disposes the actor, if any.</summary>
    public void Stop()
    {
        if (actor != null)
        {
            actor.Dispose();
            actor = null;
        }
    }

    /// <summary>Stops and starts the server again; does nothing when it is not running.</summary>
    public void ReStart()
    {
        if (actor == null)
            return;
        Stop();
        Start();
    }

    /// <summary>
    /// Serializes <paramref name="commandParameter"/> and sends it to the actor as a
    /// two-frame message (topic, payload). Silently does nothing when the server is stopped.
    /// </summary>
    /// <param name="topic">Topic frame subscribers filter on.</param>
    /// <param name="commandParameter">Object to serialize with <see cref="Serializer"/>.</param>
    public void SendObject(string topic, object commandParameter)
    {
        if (actor == null)
            return;

        byte[] Serialized = Serializer.Serialize(commandParameter);
        var message = new NetMQMessage();
        message.Append(topic);
        message.Append(Serialized);
        actor.SendMultipartMessage(message);
    }

    /// <summary>Releases the actor; equivalent to <see cref="Stop"/>.</summary>
    public void Dispose()
    {
        Stop();
    }
}

在这里,当控件的属性发生更改时,我调用 SendObject(string topic, object commandParameter) 函数,并通过 NetMQ 发布者将其发送出去(最初仅用字符串做测试,但也可以将任何对象序列化为字节后发送)。

无论如何,这是在第二个应用程序上运行的客户端:

/// <summary>
/// Subscribes to one topic on a NetMQ publisher from inside a <see cref="NetMQActor"/>
/// and stores each received payload in <see cref="ByteMessage"/>.
/// </summary>
/// <remarks>
/// <see cref="ByteMessage"/>'s <c>Message</c> is assigned from the actor's poller
/// thread, so handlers attached to it run on that thread, not on the thread that
/// created this client.
/// NOTE(review): this class does no locking around <c>actor</c>; Start/Stop/ReStart
/// are presumably meant to be called from a single thread - confirm against callers.
/// </remarks>
public class NetMQClient : IDisposable
{
    /// <summary>
    /// Actor shim: owns the subscriber socket and copies matching payloads
    /// into the shared <see cref="ByteMessage"/>.
    /// </summary>
    public class ShimHandler : IShimHandler
    {
        private PairSocket shim;
        private NetMQPoller poller;
        private SubscriberSocket subscriber;
        private ByteMessage ByteMessage;
        private string Address;
        private string Topic;
        private string MessageType;

        /// <param name="byteMessage">Holder that receives each payload.</param>
        /// <param name="address">Publisher endpoint to connect to.</param>
        /// <param name="topic">Topic to subscribe to and to match exactly.</param>
        public ShimHandler(ByteMessage byteMessage, string address, string topic)
        {
            this.ByteMessage = byteMessage;
            this.Address = address;
            this.Topic = topic;
        }

        /// <summary>Required by <see cref="IShimHandler"/>; no state is needed.</summary>
        public void Initialise(object state)
        {
        }

        /// <summary>
        /// Entry point executed by the actor. Connects and subscribes, signals
        /// readiness to the actor and blocks in the poller until the end-shim command arrives.
        /// </summary>
        /// <param name="shim">The actor-side end of the pipe to the owning thread.</param>
        public void Run(PairSocket shim)
        {
            using (subscriber = new SubscriberSocket())
            {
                // Configure socket options before connecting, as recommended for ZeroMQ options.
                subscriber.Options.ReceiveHighWatermark = 1000;
                subscriber.Connect(Address);
                subscriber.Subscribe(Topic);
                subscriber.ReceiveReady += OnSubscriberReady;

                this.shim = shim;
                shim.ReceiveReady += OnShimReady;
                try
                {
                    shim.SignalOK();

                    // The poller is disposable; dispose it before the sockets it polls.
                    using (poller = new NetMQPoller { shim, subscriber })
                    {
                        poller.Run();
                    }
                }
                finally
                {
                    // The shim socket is owned by the actor, so only detach our handlers.
                    shim.ReceiveReady -= OnShimReady;
                    subscriber.ReceiveReady -= OnSubscriberReady;
                }
            }
        }

        /// <summary>
        /// Handles a published message: when its topic frame equals <c>Topic</c>,
        /// stores the payload frame in the shared <see cref="ByteMessage"/>.
        /// </summary>
        private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
        {
            // Subscriptions are prefix matches, so a message whose topic merely starts
            // with Topic is delivered too. Always consume the complete multipart
            // message; otherwise the unread payload frame of a non-matching message
            // would be misinterpreted as the topic of the next one.
            NetMQMessage message = e.Socket.ReceiveMultipartMessage();
            if (message.FrameCount < 2)
            {
                return;
            }

            if (message[0].ConvertToString() == Topic)
            {
                this.ByteMessage.Message = message[1].ToByteArray();
            }
        }

        /// <summary>Stops the poller when the actor sends the end-shim command.</summary>
        private void OnShimReady(object sender, NetMQSocketEventArgs e)
        {
            string command = e.Socket.ReceiveFrameString();
            if (command == NetMQActor.EndShimMessage)
            {
                poller.Stop();
            }
        }
    }

    /// <summary>Holder whose <c>Message</c> is updated with each received payload.</summary>
    public ByteMessage ByteMessage { get; set; }
    private NetMQActor actor;

    private string _command;
    public string Command
    {
        get { return _command; }
        set { _command = value;}
    }

    private string _topic;
    /// <summary>Subscribed topic; changing it restarts a running client.</summary>
    public string Topic
    {
        get { return _topic; }
        set
        {
            _topic = value;
            ReStart();
        }
    }

    private int _port;
    /// <summary>Publisher port; changing it restarts a running client.</summary>
    public int Port
    {
        get { return _port; }
        set
        {
            _port = value;
            ReStart();
        }
    }

    private string _ip;
    /// <summary>Publisher host; changing it restarts a running client.</summary>
    public string IP
    {
        get { return _ip; }
        set
        {
            _ip = value;
            ReStart();
        }
    }

    /// <summary>The endpoint built from <see cref="IP"/> and <see cref="Port"/>.</summary>
    public string Address
    {
        get { return String.Format("tcp://{0}:{1}", IP, Port); }
    }

    /// <summary>Creates a stopped client; call <see cref="Start"/> to begin receiving.</summary>
    /// <param name="ip">Publisher host used to build <see cref="Address"/>.</param>
    /// <param name="port">Publisher port used to build <see cref="Address"/>.</param>
    /// <param name="topic">Topic to subscribe to.</param>
    public NetMQClient(string ip, int port, string topic)
    {
        ByteMessage = new ByteMessage();
        // The setters call ReStart(), which is a no-op while no actor exists.
        IP = ip;
        Port = port;
        Topic = topic;
    }

    /// <summary>Creates the actor (and with it the subscriber) if it is not already running.</summary>
    public void Start()
    {
        if (actor != null)
            return;
        actor = NetMQActor.Create(new ShimHandler(ByteMessage, Address, Topic));
    }

    /// <summary>Disposes the actor, if any.</summary>
    public void Stop()
    {
        if (actor != null)
        {
            actor.Dispose();
            actor = null;
        }
    }

    /// <summary>Stops and starts the client again; does nothing when it is not running.</summary>
    public void ReStart()
    {
        if (actor == null)
            return;

        Stop();
        Start();
    }

    /// <summary>Releases the actor; equivalent to <see cref="Stop"/>.</summary>
    public void Dispose()
    {
        Stop();
    }
}

现在 ByteMessage 类看起来就像这样简单:

/// <summary>
/// Observable holder for the most recently received raw message bytes.
/// </summary>
/// <remarks>
/// <see cref="PropertyChanged"/> is raised synchronously on whichever thread assigns
/// <see cref="Message"/>; subscribers that touch thread-affine objects must marshal
/// to their own thread themselves.
/// </remarks>
public class ByteMessage : INotifyPropertyChanged
{
    public ByteMessage()
    {
    }

    private byte[] _message;

    /// <summary>
    /// The latest payload. Every assignment raises <see cref="PropertyChanged"/>,
    /// even when the new value equals the previous one.
    /// </summary>
    public byte[] Message
    {
        get { return _message; }
        set
        {
            _message = value;
            OnPropertyChanged("Message");
        }
    }

    /// <summary>Raised after <see cref="Message"/> has been assigned.</summary>
    public event PropertyChangedEventHandler PropertyChanged;

    /// <summary>Raises <see cref="PropertyChanged"/> for <paramref name="propertyName"/>.</summary>
    private void OnPropertyChanged(string propertyName)
    {
        // Snapshot the delegate: the setter may run on a different thread than the
        // one subscribers use to detach, and reading the event twice could observe
        // null on the second read and throw NullReferenceException.
        PropertyChangedEventHandler handler = PropertyChanged;
        if (handler != null)
        {
            handler(this, new PropertyChangedEventArgs(propertyName));
        }
    }
}

在第二个应用程序中,某个类会持有一个 NetMQClient 作为属性,并订阅该 NetMQClient 的 ByteMessage 的 PropertyChanged 事件。这样,每当 ShimHandler 更新 ByteMessage.Message 时,我就可以反序列化数据并对其进行处理。

到目前为止,这是可行的,但我真的不确定我的做法是否正确……以及它是否是线程安全的(thread-safe)。

如果 Listening 应用程序上的每个类都有自己的 NetMQClient 来监听特定主题,难道不是更好吗?

即使阅读了 NetMQ 提供的所有示例,我仍然很困惑应该如何正确地使用这些功能。

谢谢

标签: c# netmq

解决方案


推荐阅读