2KB项目,专业的源码交易网站 帮助 收藏 每日签到

ZeroMQ : 多个套接字轮询

  • 时间:2019-01-23 18:34 编辑:2KB 来源:2KB.COM 阅读:327
  • 扫一扫,手机访问
  • 分享
摘要:
ZeroMQ 英文原文:ZeroMQ #4 : Multiple Sockets Polling

上一次,我们看了几样东西,分别是:

  • Options,选项

  • Identity,认证

  • SendMore,发送更多

这一次,我们将要讨论如何使用多个套接字进行处理。

代码在哪里?

如往常一样,在我们开始前,告知代码在哪里是礼貌的,并且代码如以前一样,放在Github上面:

为什么你需要处理多个套接字?

那么,为什么你需要处理多个套接字呢?对于这个问题,有非常多的原因,比如:

  • 你可能在一个进程中具有多个套接字,并且它们之间相互依赖,你需要知道哪个套接字在何时已经准备就绪,可以接收任何内容。

  • 你可能有一个Request,同样在相同的进程中还有一个Publisher

老实说,有时你可能要在程序中操作多个socket。这时你只想操作已就绪的socket。

实际上,ZeroMQ有一个Poller(轮询)的概念,它能检查socket是否就绪。NetMQ实现了Poller,它有以下功能:

  • 监视一个socket是否就绪

  • 监视一个IEnumerable<NetMQSocket>是否就绪

  • 允许动态的增加NetMQSocket,并监视新增的socket

  • 允许动态移除NetMQSocket

  • 就绪时向socket实例发送事件

学习NetMQ Poller的一个好方法就是做测试。本文中我不会测试到所有东西,但NetMQ自已有非常非常好的测试用例。事实上,我还从它那里“借用”了一些。

几个例子

正如我声明的那样,我并不是这些测试案例的作者。我这里采取了NetMQ Poller测试组件的子集,我觉得介绍一下围绕Poller类进行的一些讨论是比较恰当的。

注:这一系列的文章仅作为新手入门,一些高级ZeroMQ用户可能不会在这一些列的文章中学到很多东西。

单个套接字轮询测试

这些测试用例采用了我们最为熟知的(希望到目前为止)的Request/Response套接字配置。我们将会使用轮询器来警告我们(通过轮询器为我们准备的 `xxxxSocket.ReceiveReady`事件)`ResponseSocket`已经就绪。

这里是代码:

[Test]
 public void SingleSocketPollTest()
 {
     using (NetMQContext contex = NetMQContext.Create())
     {
         using (var rep = contex.CreateResponseSocket())
         {
             rep.Bind("tcp://127.0.0.1:5002");

             using (var req = contex.CreateRequestSocket())
             using (Poller poller = new Poller())
             {
                 req.Connect("tcp://127.0.0.1:5002");

                 //The ReceiveReady event is raised by the Poller
                 rep.ReceiveReady += (s, a) =>
                 {
                     bool more;
                     string m = a.Socket.ReceiveString(out more);

                     Assert.False(more);
                     Assert.AreEqual("Hello", m);

                     a.Socket.Send("World");
                 };

                 poller.AddSocket(rep);

                 Task pollerTask = Task.Factory.StartNew(poller.Start);
                 req.Send("Hello");

                 bool more2;
                 string m1 = req.ReceiveString(out more2);

                 Assert.IsFalse(more2);
                 Assert.AreEqual("World", m1);

                 poller.Stop();

                 Thread.Sleep(100);
                 Assert.IsTrue(pollerTask.IsCompleted);
             }
         }
     }
 }

在测试工作中添加套接字

这个例子展示给我们如何在轮询器运行时添加额外的套接字,并且,轮询器依然会为我们提供一个xxxxSocket.ReceiveReady事件:

[Test]
 public void AddSocketDuringWorkTest()
 {
     using (NetMQContext contex = NetMQContext.Create())
     {
         // we are using three responses to make sure we actually 
         //move the correct socket and other sockets still work
         using (var router = contex.CreateRouterSocket())
         using (var router2 = contex.CreateRouterSocket())
         {
             router.Bind("tcp://127.0.0.1:5002");
             router2.Bind("tcp://127.0.0.1:5003");

             using (var dealer = contex.CreateDealerSocket())
             using (var dealer2 = contex.CreateDealerSocket())
             using (Poller poller = new Poller())
             {
                 dealer.Connect("tcp://127.0.0.1:5002");
                 dealer2.Connect("tcp://127.0.0.1:5003");

                 bool router1arrived = false;
                 bool router2arrived = false;

                 bool more;

                 //The ReceiveReady event is raised by the Poller
                 router2.ReceiveReady += (s, a) =>
                 {
                     router2.Receive(out more);
                     router2.Receive(out more);
                     router2arrived = true;
                 };

                 //The ReceiveReady event is raised by the Poller
                 router.ReceiveReady += (s, a) =>
                 {
                     router1arrived = true;

                     router.Receive(out more);
                     router.Receive(out more);

                     poller.AddSocket(router2);
                 };

                 poller.AddSocket(router);

                 Task task = Task.Factory.StartNew(poller.Start);

                 dealer.Send("1");
                 Thread.Sleep(300);
                 dealer2.Send("2");
                 Thread.Sleep(300);

                 poller.Stop(true);
                 task.Wait();

                 Assert.IsTrue(router1arrived);
                 Assert.IsTrue(router2arrived);
             }
         }
     }

移除测试之后添加套接字

此示例基于我们从轮询器中删除另一个套接字后添加一个新的套接字的轮询。

[Test]
 public void AddSocketAfterRemovingTest()
 {
     using (NetMQContext contex = NetMQContext.Create())
     {
         // we are using three responses to make sure we actually 
         //move the correct socket and other sockets still work
         using (var router = contex.CreateRouterSocket())
         using (var router2 = contex.CreateRouterSocket())
         using (var router3 = contex.CreateRouterSocket())
         {
             router.Bind("tcp://127.0.0.1:5002");
             router2.Bind("tcp://127.0.0.1:5003");
             router3.Bind("tcp://127.0.0.1:5004");

             using (var dealer = contex.CreateDealerSocket())
             using (var dealer2 = contex.CreateDealerSocket())
             using (var dealer3 = contex.CreateDealerSocket())
             using (Poller poller = new Poller())
             {
                 dealer.Connect("tcp://127.0.0.1:5002");
                 dealer2.Connect("tcp://127.0.0.1:5003");
                 dealer3.Connect("tcp://127.0.0.1:5004");

                 bool router1arrived = false;
                 bool router2arrived = false;
                 bool router3arrived = false;

                 bool more;

                 //The ReceiveReady event is raised by the Poller
                 router.ReceiveReady += (s, a) =>
                 {
                     router1arrived = true;

                     router.Receive(out more);
                     router.Receive(out more);

                     poller.RemoveSocket(router);
                 };

                 poller.AddSocket(router);

                 //The ReceiveReady event is raised by the Poller
                 router3.ReceiveReady += (s, a) =>
                 {
                     router3.Receive(out more);
                     router3.Receive(out more);
                     router3arrived = true;
                 };

                 //The ReceiveReady event is raised by the Poller
                 router2.ReceiveReady += (s, a) =>
                 {
                     router2arrived = true;
                     router2.Receive(out more);
                     router2.Receive(out more);

                     poller.AddSocket(router3);
                 };
                 poller.AddSocket(router2);

                 Task task = Task.Factory.StartNew(poller.Start);

                 dealer.Send("1");
                 Thread.Sleep(300);
                 dealer2.Send("2");
                 Thread.Sleep(300);
                 dealer3.Send("3");
                 Thread.Sleep(300);

                 poller.Stop(true);
                 task.Wait();

                 Assert.IsTrue(router1arrived);
                 Assert.IsTrue(router2arrived);
                 Assert.IsTrue(router3arrived);
             }
         }
     }
 }

移除测试之后添加2个套接字

在这个例子中,我们在轮询器中移除套接字之后再向其中添加一些套接字:

[Test]
 public void AddTwoSocketAfterRemovingTest()
 {
     using (NetMQContext contex = NetMQContext.Create())
     {
         // we are using three responses to make sure we actually 
         //move the correct socket and other sockets still work
         using (var router = contex.CreateRouterSocket())
         using (var router2 = contex.CreateRouterSocket())
         using (var router3 = contex.CreateRouterSocket())
         using (var router4 = contex.CreateRouterSocket())
         {
             router.Bind("tcp://127.0.0.1:5002");
             router2.Bind("tcp://127.0.0.1:5003");
             router3.Bind("tcp://127.0.0.1:5004");
             router4.Bind("tcp://127.0.0.1:5005");

             using (var dealer = contex.CreateDealerSocket())
             using (var dealer2 = contex.CreateDealerSocket())
             using (var dealer3 = contex.CreateDealerSocket())
             using (var dealer4 = contex.CreateDealerSocket())
             using (Poller poller = new Poller())
                   
             {
                 dealer.Connect("tcp://127.0.0.1:5002");
                 dealer2.Connect("tcp://127.0.0.1:5003");
                 dealer3.Connect("tcp://127.0.0.1:5004");
                 dealer4.Connect("tcp://127.0.0.1:5005");

                 int router1arrived = 0;
                 int router2arrived = 0;
                 bool router3arrived = false;
                 bool router4arrived = false;

                 bool more;

                 //The ReceiveReady event is raised by the Poller
                 router.ReceiveReady += (s, a) =>
                 {
                     router1arrived++;

                     router.Receive(out more);
                     router.Receive(out more);

                     poller.RemoveSocket(router);
                 };

                 poller.AddSocket(router);

                 //The ReceiveReady event is raised by the Poller
                 router3.ReceiveReady += (s, a) =>
                 {
                     router3.Receive(out more);
                     router3.Receive(out more);
                     router3arrived = true;
                 };

                 //The ReceiveReady event is raised by the Poller
                 router4.ReceiveReady += (s, a) =>
                 {
                     router4.Receive(out more);
                     router4.Receive(out more);
                     router4arrived = true;
                 };

                 //The ReceiveReady event is raised by the Poller
                 router2.ReceiveReady += (s, a) =>
                 {
                     router2arrived++;
                     router2.Receive(out more);
                     router2.Receive(out more);

                     if (router2arrived == 1)
                     {
                         poller.AddSocket(router3);


                         poller.AddSocket(router4);
                     }
                 };

                 poller.AddSocket(router2);

                 Task task = Task.Factory.StartNew(poller.Start);

                 dealer.Send("1");
                 Thread.Sleep(300);
                 dealer2.Send("2");
                 Thread.Sleep(300);
                 dealer3.Send("3");
                 dealer4.Send("4");
                 dealer2.Send("2");
                 dealer.Send("1");
                 Thread.Sleep(300);

                 poller.Stop(true);
                 task.Wait();

                 router.Receive(true, out more);

                 Assert.IsTrue(more);

                 router.Receive(true, out more);

                 Assert.IsFalse(more);

                 Assert.AreEqual(1, router1arrived);
                 Assert.AreEqual(2, router2arrived);
                 Assert.IsTrue(router3arrived);
                 Assert.IsTrue(router4arrived);
             }
         }
     }
 }

取消套接字的测试

最后一个例子展示了3个RouterSockets分别链接3个DealerSockets(我们将会在后面的文章中讨论一下DealerSocket,目前,你把他们当作典型的异步工作线程就好了)。接下来,我们将为轮询器添加所有的路由。在第一个RouterSocket.ReceiveReady之内,我们将在轮询器中移除RouterSocket,这样它将不能分别接收到来自它DealerSocket的任何消息,下面是这个测试案例的代码:

[Test]
 public void CancelSocketTest()
 {
     using (NetMQContext contex = NetMQContext.Create())
     {
         // we are using three responses to make sure we actually 
         //move the correct socket and other sockets still work
         using (var router = contex.CreateRouterSocket())
         using (var router2 = contex.CreateRouterSocket())
         using (var router3 = contex.CreateRouterSocket())
         {
             router.Bind("tcp://127.0.0.1:5002");
             router2.Bind("tcp://127.0.0.1:5003");
             router3.Bind("tcp://127.0.0.1:5004");

             using (var dealer = contex.CreateDealerSocket())
             using (var dealer2 = contex.CreateDealerSocket())
             using (var dealer3 = contex.CreateDealerSocket())
             using (Poller poller = new Poller())
             {
                 dealer.Connect("tcp://127.0.0.1:5002");
                 dealer2.Connect("tcp://127.0.0.1:5003");
                 dealer3.Connect("tcp://127.0.0.1:5004");

                 bool first = true;

                 //The ReceiveReady event is raised by the Poller
                 router2.ReceiveReady += (s, a) =>
                 {
                     bool more;

                     // identity
                     byte[] identity = a.Socket.Receive(out more);

                     // message
                     a.Socket.Receive(out more);

                     a.Socket.SendMore(identity);
                     a.Socket.Send("2");
                 };

                 poller.AddSocket(router2);

                 //The ReceiveReady event is raised by the Poller
                 router.ReceiveReady += (s, a) =>
                 {
                     if (!first)
                     {
                         Assert.Fail("This should happen because we cancelled the socket");
                     }
                     first = false;

                     bool more;

                     // identity
                     a.Socket.Receive(out more);

                     string m = a.Socket.ReceiveString(out more);

                     Assert.False(more);
                     Assert.AreEqual("Hello", m);

                     // cancelling the socket
                     poller.RemoveSocket(a.Socket);
                 };

                 poller.AddSocket(router);

                 //The ReceiveReady event is raised by the Poller
                 router3.ReceiveReady += (s, a) =>
                 {
                     bool more;

                     // identity
                     byte[] identity = a.Socket.Receive(out more);

                     // message
                     a.Socket.Receive(out more);

                     a.Socket.SendMore(identity).Send("3");
                 };

                 poller.AddSocket(router3);

                 Task pollerTask = Task.Factory.StartNew(poller.Start);

                 dealer.Send("Hello");

                 // sending this should not arrive on the poller, 
                 //therefore response for this will never arrive
                 dealer.Send("Hello2");

                 Thread.Sleep(100);

                 // sending this should not arrive on the poller, 
                 //therefore response for this will never arrive                        
                 dealer.Send("Hello3");

                 Thread.Sleep(500);

                 bool more2;

                 // making sure the socket defined before the one cancelled still works
                 dealer2.Send("1");
                 string msg = dealer2.ReceiveString(out more2);
                 Assert.AreEqual("2", msg);

                 // making sure the socket defined after the one cancelled still works
                 dealer3.Send("1");
                 msg = dealer3.ReceiveString(out more2);
                 Assert.AreEqual("3", msg);

                 // we have to give this some time if we want to make sure 
                 //it&apos;s really not happening and it not only because of time
                 Thread.Sleep(300);

                 poller.Stop();

                 Thread.Sleep(100);
                 Assert.IsTrue(pollerTask.IsCompleted);
             }
         }
     }
}

以上就是这一次我想讨论和分享的全部内容。我希望你能学到如何在你的自己的套接字拓扑中使用Poller,并且它为什么是一个有用的工具。

本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。 2KB翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。


2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务

  • 全部评论(0)
资讯详情页最新发布上方横幅
最新发布的资讯信息
【计算机/互联网|】Nginx出现502错误(2020-01-20 21:02)
【计算机/互联网|】网站运营全智能软手V0.1版发布(2020-01-20 12:16)
【计算机/互联网|】淘宝这是怎么了?(2020-01-19 19:15)
【行业动态|】谷歌关闭小米智能摄像头,因为窃听器显示了陌生人家中的照片(2020-01-15 09:42)
【行业动态|】据报道谷歌新闻终止了数字杂志,退还主动订阅(2020-01-15 09:39)
【行业动态|】康佳将OLED电视带到美国与LG和索尼竞争(2020-01-15 09:38)
【行业动态|】2020年最佳AV接收机(2020-01-15 09:35)
【行业动态|】2020年最佳流媒体设备:Roku,Apple TV,Firebar,Chromecast等(2020-01-15 09:31)
【行业动态|】CES 2020预览:更多的流媒体服务和订阅即将到来(2020-01-08 21:41)
【行业动态|】从埃隆·马斯克到杰夫·贝佐斯,这30位人物定义了2010年代(2020-01-01 15:14)
联系我们

Q Q: 7090832

电话:400-0011-990

邮箱:7090832@qq.com

时间:9:00-23:00

联系客服
商家入住 服务咨询 投拆建议 联系客服
0577-67068160
手机版

扫一扫进手机版
返回顶部