c# - NetMQ Actor 模式
问题描述
当 UI 发生变化时,我试图让一个 WPF 应用程序与另一个应用程序进行通信。例如:滑块更改并发送新值或文本框输入更改并发送新输入值。
第二个应用程序正在侦听并接收这些更改以更新不同的属性。
为了实现这一点,我按照文档中的 Actor 模式示例使用 NetMQ:https ://netmq.readthedocs.io/en/latest/actor/
现在我有这些客户端/服务器类:
服务器 :
public class NetMQServer
{
public class ShimHandler : IShimHandler
{
private PairSocket shim;
private NetMQPoller poller;
private PublisherSocket publisher;
private string Address;
public ShimHandler(string address)
{
Address = address;
}
public void Initialise(object state)
{
}
public void Run(PairSocket shim)
{
using (publisher = new PublisherSocket())
{
publisher.Bind(Address);
publisher.Options.SendHighWatermark = 1000;
this.shim = shim;
shim.ReceiveReady += OnShimReady;
shim.SignalOK();
poller = new NetMQPoller { shim, publisher };
poller.Run();
}
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveFrameString();
if(command == NetMQActor.EndShimMessage)
{
poller.Stop();
return;
}
else
{
byte[] byteMessage = e.Socket.ReceiveFrameBytes();
publisher.SendMoreFrame(command).SendFrame(byteMessage);
}
}
private void UpdateString(string stringmessage, string propertyToUpdate)
{
propertyToUpdate = stringmessage;
}
}
public NetMQServer(string ip, int port)
{
IP = ip;
Port = port;
Serializer = new CerasSerializer();
}
public CerasSerializer Serializer { get; set; }
private NetMQActor actor;
private string _name;
public string Name
{
get { return _name; }
set { _name = value;}
}
private int _port;
public int Port
{
get { return _port; }
set
{
_port = value;
ReStart();
}
}
private string _ip;
public string IP
{
get { return _ip; }
set
{
_ip = value;
ReStart();
}
}
public string Address
{
get { return String.Format("tcp://{0}:{1}", IP, Port); }
}
public void Start()
{
if (actor != null)
return;
actor = NetMQActor.Create(new ShimHandler(Address));
}
public void Stop()
{
if (actor != null)
{
actor.Dispose();
actor = null;
}
}
public void ReStart()
{
if (actor == null)
return;
Stop();
Start();
}
public void SendObject(string topic, object commandParameter)
{
if (actor == null)
return;
byte[] Serialized = Serializer.Serialize(commandParameter);
var message = new NetMQMessage();
message.Append(topic);
message.Append(Serialized);
actor.SendMultipartMessage(message);
}
}
在这里,当控件的属性发生更改时,我调用SendStringMessage(string stringToSend)
函数并且它与 NetMQ 发布者一起发送(字符串仅用于测试,我也可以将任何对象作为字节发送)。
无论如何,这是在第二个应用程序上运行的客户端:
public class NetMQClient
{
public class ShimHandler : IShimHandler
{
private PairSocket shim;
private NetMQPoller poller;
private SubscriberSocket subscriber;
private ByteMessage ByteMessage;
private string Address;
private string Topic;
private string MessageType;
public ShimHandler(ByteMessage byteMessage, string address, string topic)
{
this.ByteMessage = byteMessage;
this.Address = address;
this.Topic = topic;
}
public void Initialise(object state)
{
}
public void Run(PairSocket shim)
{
using (subscriber = new SubscriberSocket())
{
subscriber.Connect(Address);
subscriber.Subscribe(Topic);
subscriber.Options.ReceiveHighWatermark = 1000;
subscriber.ReceiveReady += OnSubscriberReady;
this.shim = shim;
shim.ReceiveReady += OnShimReady;
shim.SignalOK();
poller = new NetMQPoller { shim, subscriber };
poller.Run();
}
}
private void OnSubscriberReady(object sender, NetMQSocketEventArgs e)
{
string topic = e.Socket.ReceiveFrameString();
if (topic == Topic)
{
this.ByteMessage.Message = e.Socket.ReceiveFrameBytes();
}
}
private void OnShimReady(object sender, NetMQSocketEventArgs e)
{
string command = e.Socket.ReceiveFrameString();
if (command == NetMQActor.EndShimMessage)
{
poller.Stop();
}
}
}
public ByteMessage ByteMessage { get; set; }
private NetMQActor actor;
private string _command;
public string Command
{
get { return _command; }
set { _command = value;}
}
private string _topic;
public string Topic
{
get { return _topic; }
set
{
_topic = value;
ReStart();
}
}
private int _port;
public int Port
{
get { return _port; }
set
{
_port = value;
ReStart();
}
}
private string _ip;
public string IP
{
get { return _ip; }
set
{
_ip = value;
ReStart();
}
}
public string Address
{
get { return String.Format("tcp://{0}:{1}", IP, Port); }
}
public NetMQClient(string ip, int port, string topic)
{
ByteMessage = new ByteMessage();
IP = ip;
Port = port;
Topic = topic;
}
public void Start()
{
if (actor != null)
return;
actor = NetMQActor.Create(new ShimHandler(ByteMessage, Address, Topic));
}
public void Stop()
{
if (actor != null)
{
actor.Dispose();
actor = null;
}
}
public void ReStart()
{
if (actor == null)
return;
Stop();
Start();
}
}
现在 ByteMessage 类看起来就像这样简单:
public class ByteMessage : INotifyPropertyChanged
{
public ByteMessage()
{
}
private byte[] _message;
public byte[] Message
{
get { return _message; }
set
{
_message = value;
OnPropertyChanged("Message");
}
}
public event PropertyChangedEventHandler PropertyChanged;
private void OnPropertyChanged(string propertyName)
{
if (PropertyChanged != null) PropertyChanged(this, new PropertyChangedEventArgs(propertyName));
}
}
第二个应用程序之一,类将有一个 NetMQClient 作为属性并注册到OnPropertyChange
NetMQClient 的事件ByteMessage
。现在什么时候ByteMessage.Message
更新ShimHandler
我可以反序列化数据并对其进行处理。
到目前为止,这是可行的,但我真的不确定我是否做得正确......如果这是ThreadSafe
如果 Listening 应用程序上的每个类都有自己的 NetMQClient 来监听特定主题,难道不是更好吗?
即使在阅读了围绕 NetMQ 提供的所有示例之后,这里也真的很困惑如何使用所有这些。
谢谢
解决方案
推荐阅读
- typescript - 如何在 TypeScript 中将对象属性作为参数传递
- django - 如何在 Django 3.0.8 中更改 USERNAME_FIELD。不创建自定义用户?
- r - R Shiny动态编辑reactiveTimer定时器间隔
- php - 在 laravel 6 中使用电子邮件或电话登录不起作用
- c++ - 如何从 Emscripten 触发事件?
- filter - 设置参数控制和使用过滤器
- kotlin - 带有自签名 ssl 证书的 GRPC Okhttp android 客户端通道
- xamarin - 我们可以在 uno 平台上减小 android App 的大小吗?
- java - 使用连接字符串通过 Spring Boot 连接到 Azure EventHub(Kafka like)
- javascript - 如何在 jqueryUI 小部件插件“多日期选择器”中设置动态参数值