首页 > 技术文章 > NetMQ用作IPC的实例

linxmouse 2017-11-29 01:27 原文

  • 发送端/接收端
  1 using System;
  2 using System.Threading;
  3 using NetMQ;
  4 using NetMQ.Sockets;
  5 
  6 namespace NetMQIPCServer
  7 {
  8     class Program
  9     {
 10         private const string topic = "unity3d";
 11         private static bool running = true;
 12         static void Main(string[] args)
 13         {
 14             Console.WriteLine("Main thread id = {0}", Thread.CurrentThread.ManagedThreadId);
 15 
 16             // 进程内通信
 17             //pub.Bind("inproc://unity3d");
 18             //sub.Connect("inproc://unity3d");
 19 
 20             // 跨进程通信
 21             //var pub = new PublisherSocket();
 22             //var sub = new SubscriberSocket();
 23             //pub.Bind("ipc:///unity3d/0");          
 24             //sub.Connect("ipc:///unity3d/0");
 25             //sub.Subscribe(topic);
 26 
 27             // 定向接收[NonBlock]
 28             //var pub = new PublisherSocket();
 29             //var sub = new SubscriberSocket();
 30             //pub.Bind("ipc:///unity3d/0");
 31             //sub.Connect("ipc:///unity3d/0");
 32             //sub.Subscribe(topic);
 33             //var proactor = new NetMQProactor(sub, (socket, message) =>
 34             //{
 35             //    //Console.WriteLine(message);
 36             //    foreach (var b in message[1].Buffer)
 37             //    {
 38             //        Console.Write("{0:x2}", b);
 39             //        Console.Write(" ");
 40             //    }
 41             //    Console.WriteLine("Received message in thread {0}", Thread.CurrentThread.ManagedThreadId);
 42             //});
 43 
 44             // TCP
 45             var pub = new PublisherSocket();
 46             var sub = new SubscriberSocket();
 47             pub.Bind("tcp://*:2017");
 48             sub.Connect("tcp://localhost:2017");
 49             sub.Subscribe(topic);
 50             var proactor = new NetMQProactor(sub, (socket, message) =>
 51             {
 52                 //Console.WriteLine(message);
 53                 foreach (var b in message[1].Buffer)
 54                 {
 55                     Console.Write("{0:x2}", b);
 56                     Console.Write(" ");
 57                 }
 58                 Console.WriteLine("Received message in thread {0}", Thread.CurrentThread.ManagedThreadId);
 59             });
 60 
 61             // 轮询模式
 62             //var poller = new NetMQPoller();
 63             //poller.Add(sub);
 64             //sub.ReceiveReady += (sender, eventArgs) =>
 65             //{
 66             //    bool more = false;
 67             //    byte[] bytes = null;
 68             //    eventArgs.Socket.ReceiveFrameBytes(out more);
 69             //    if (more)
 70             //    {
 71             //        bytes = eventArgs.Socket.ReceiveFrameBytes();
 72             //        foreach (var b in bytes)
 73             //        {
 74             //            Console.Write("{0:x2}", b);
 75             //            Console.Write(" ");
 76             //        }
 77             //        Console.WriteLine("Received bytes in thread {0}", Thread.CurrentThread.ManagedThreadId);
 78             //    }
 79             //};
 80 
 81             new Thread(() =>
 82             {
 83                 while (running)
 84                 {
 85                     Console.WriteLine("PublisherSocket:Send Bytes in thread {0}.", Thread.CurrentThread.ManagedThreadId);
 86                     pub.SendMoreFrame(topic).SendFrame(new byte[] { 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 87                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 88                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 89                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 90                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 91                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 92                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 93                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 94                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 95                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 96                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 97                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
 98                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
 99                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3,
100                         0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3, 0xa, 0xb, 0xc,
101                         0x1, 0x2, 0x3, 0xa, 0xb, 0xc, 0x1, 0x2, 0x3 });
102                     Thread.Sleep(1000);
103                 }
104             }).Start();
105 
106             //new Thread(() =>
107             //{
108             //    bool more = false;
109             //    byte[] bytes = null;
110             //    while (running)
111             //    {
112             //        if (sub.HasIn)
113             //        {
114             //            sub.ReceiveFrameBytes(out more);
115             //            if (more)
116             //            {
117             //                bytes = sub.ReceiveFrameBytes();
118             //                foreach (var b in bytes)
119             //                {
120             //                    Console.Write("{0:x2}", b);
121             //                    Console.Write(" ");
122             //                }
123             //                Console.WriteLine("Received bytes in thread {0}", Thread.CurrentThread.ManagedThreadId);
124             //            }
125             //        }
126             //    }
127             //}).Start();
128             // 在主线程轮询[Block]
129             //poller.Run();
130             // 异步轮询[NonBlock]
131             //poller.RunAsync();
132 
133             Console.ReadKey();
134             running = false;
135             Thread.Sleep(50);
136             //poller.Stop();
137             //poller.StopAsync();
138             pub.Close();
139             sub.Close();
140             NetMQConfig.Cleanup();
141         }
142     }
143 }

 

  • 运行结果如图

NetMQ官方主页:http://netmq.readthedocs.io/en/latest/

推荐阅读