上一次,我们看了几样东西,分别是:
Options,选项
Identity,认证
SendMore,发送更多
这一次,我们将要讨论如何使用多个套接字进行处理。
代码在哪里?
如往常一样,在我们开始前,告知代码在哪里是礼貌的,并且代码如以前一样,放在Github上面:
为什么你需要处理多个套接字?
那么,为什么你需要处理多个套接字呢?对于这个问题,有非常多的原因,比如:
你可能在一个进程中具有多个套接字,并且它们之间相互依赖,你需要知道哪个套接字在何时已经准备就绪,可以接收任何内容。
你可能有一个Request,同样在相同的进程中还有一个Publisher
老实说,有时你可能要在程序中操作多个socket。这时你只想操作已就绪的socket。
实际上,ZeroMQ有一个Poller(轮询)的概念,它能检查socket是否就绪。NetMQ实现了Poller,它有以下功能:
监视一个socket是否就绪
监视一个IEnumerable<NetMQSocket>是否就绪
允许动态的增加NetMQSocket,并监视新增的socket
允许动态移除NetMQSocket
就绪时向socket实例发送事件
学习NetMQ Poller的一个好方法就是做测试。本文中我不会测试到所有东西,但NetMQ自已有非常非常好的测试用例。事实上,我还从它那里“借用”了一些。
正如我声明的那样,我并不是这些测试案例的作者。我这里采取了NetMQ Poller测试组件的子集,我觉得介绍一下围绕Poller类进行的一些讨论是比较恰当的。
注:这一系列的文章仅作为新手入门,一些高级ZeroMQ用户可能不会在这一些列的文章中学到很多东西。
单个套接字轮询测试
这些测试用例采用了我们最为熟知的(希望到目前为止)的Request/Response套接字配置。我们将会使用轮询器来警告我们(通过轮询器为我们准备的 `xxxxSocket.ReceiveReady`事件)`ResponseSocket`已经就绪。
这里是代码:
[Test] public void SingleSocketPollTest() { using (NetMQContext contex = NetMQContext.Create()) { using (var rep = contex.CreateResponseSocket()) { rep.Bind("tcp://127.0.0.1:5002"); using (var req = contex.CreateRequestSocket()) using (Poller poller = new Poller()) { req.Connect("tcp://127.0.0.1:5002"); //The ReceiveReady event is raised by the Poller rep.ReceiveReady += (s, a) => { bool more; string m = a.Socket.ReceiveString(out more); Assert.False(more); Assert.AreEqual("Hello", m); a.Socket.Send("World"); }; poller.AddSocket(rep); Task pollerTask = Task.Factory.StartNew(poller.Start); req.Send("Hello"); bool more2; string m1 = req.ReceiveString(out more2); Assert.IsFalse(more2); Assert.AreEqual("World", m1); poller.Stop(); Thread.Sleep(100); Assert.IsTrue(pollerTask.IsCompleted); } } } }
这个例子展示给我们如何在轮询器运行时添加额外的套接字,并且,轮询器依然会为我们提供一个xxxxSocket.ReceiveReady事件:
[Test] public void AddSocketDuringWorkTest() { using (NetMQContext contex = NetMQContext.Create()) { // we are using three responses to make sure we actually //move the correct socket and other sockets still work using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); bool router1arrived = false; bool router2arrived = false; bool more; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { router2.Receive(out more); router2.Receive(out more); router2arrived = true; }; //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { router1arrived = true; router.Receive(out more); router.Receive(out more); poller.AddSocket(router2); }; poller.AddSocket(router); Task task = Task.Factory.StartNew(poller.Start); dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); poller.Stop(true); task.Wait(); Assert.IsTrue(router1arrived); Assert.IsTrue(router2arrived); } } }
移除测试之后添加套接字
此示例基于我们从轮询器中删除另一个套接字后添加一个新的套接字的轮询。
[Test] public void AddSocketAfterRemovingTest() { using (NetMQContext contex = NetMQContext.Create()) { // we are using three responses to make sure we actually //move the correct socket and other sockets still work using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); bool router1arrived = false; bool router2arrived = false; bool router3arrived = false; bool more; //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { router1arrived = true; router.Receive(out more); router.Receive(out more); poller.RemoveSocket(router); }; poller.AddSocket(router); //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { router3.Receive(out more); router3.Receive(out more); router3arrived = true; }; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { router2arrived = true; router2.Receive(out more); router2.Receive(out more); poller.AddSocket(router3); }; poller.AddSocket(router2); Task task = Task.Factory.StartNew(poller.Start); dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); dealer3.Send("3"); Thread.Sleep(300); poller.Stop(true); task.Wait(); Assert.IsTrue(router1arrived); Assert.IsTrue(router2arrived); Assert.IsTrue(router3arrived); } } } }
移除测试之后添加2个套接字
在这个例子中,我们在轮询器中移除套接字之后再向其中添加一些套接字:
[Test] public void AddTwoSocketAfterRemovingTest() { using (NetMQContext contex = NetMQContext.Create()) { // we are using three responses to make sure we actually //move the correct socket and other sockets still work using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) using (var router4 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); router4.Bind("tcp://127.0.0.1:5005"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (var dealer4 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); dealer4.Connect("tcp://127.0.0.1:5005"); int router1arrived = 0; int router2arrived = 0; bool router3arrived = false; bool router4arrived = false; bool more; //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { router1arrived++; router.Receive(out more); router.Receive(out more); poller.RemoveSocket(router); }; poller.AddSocket(router); //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { router3.Receive(out more); router3.Receive(out more); router3arrived = true; }; //The ReceiveReady event is raised by the Poller router4.ReceiveReady += (s, a) => { router4.Receive(out more); router4.Receive(out more); router4arrived = true; }; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { router2arrived++; router2.Receive(out more); router2.Receive(out more); if (router2arrived == 1) { poller.AddSocket(router3); poller.AddSocket(router4); } }; poller.AddSocket(router2); Task task = Task.Factory.StartNew(poller.Start); dealer.Send("1"); Thread.Sleep(300); dealer2.Send("2"); Thread.Sleep(300); dealer3.Send("3"); dealer4.Send("4"); dealer2.Send("2"); dealer.Send("1"); Thread.Sleep(300); poller.Stop(true); task.Wait(); router.Receive(true, out more); Assert.IsTrue(more); router.Receive(true, out more); Assert.IsFalse(more); Assert.AreEqual(1, router1arrived); Assert.AreEqual(2, router2arrived); Assert.IsTrue(router3arrived); Assert.IsTrue(router4arrived); } } } }
取消套接字的测试
最后一个例子展示了3个RouterSockets分别链接3个DealerSockets(我们将会在后面的文章中讨论一下DealerSocket,目前,你把他们当作典型的异步工作线程就好了)。接下来,我们将为轮询器添加所有的路由。在第一个RouterSocket.ReceiveReady之内,我们将在轮询器中移除RouterSocket,这样它将不能分别接收到来自它DealerSocket的任何消息,下面是这个测试案例的代码:
[Test] public void CancelSocketTest() { using (NetMQContext contex = NetMQContext.Create()) { // we are using three responses to make sure we actually //move the correct socket and other sockets still work using (var router = contex.CreateRouterSocket()) using (var router2 = contex.CreateRouterSocket()) using (var router3 = contex.CreateRouterSocket()) { router.Bind("tcp://127.0.0.1:5002"); router2.Bind("tcp://127.0.0.1:5003"); router3.Bind("tcp://127.0.0.1:5004"); using (var dealer = contex.CreateDealerSocket()) using (var dealer2 = contex.CreateDealerSocket()) using (var dealer3 = contex.CreateDealerSocket()) using (Poller poller = new Poller()) { dealer.Connect("tcp://127.0.0.1:5002"); dealer2.Connect("tcp://127.0.0.1:5003"); dealer3.Connect("tcp://127.0.0.1:5004"); bool first = true; //The ReceiveReady event is raised by the Poller router2.ReceiveReady += (s, a) => { bool more; // identity byte[] identity = a.Socket.Receive(out more); // message a.Socket.Receive(out more); a.Socket.SendMore(identity); a.Socket.Send("2"); }; poller.AddSocket(router2); //The ReceiveReady event is raised by the Poller router.ReceiveReady += (s, a) => { if (!first) { Assert.Fail("This should happen because we cancelled the socket"); } first = false; bool more; // identity a.Socket.Receive(out more); string m = a.Socket.ReceiveString(out more); Assert.False(more); Assert.AreEqual("Hello", m); // cancelling the socket poller.RemoveSocket(a.Socket); }; poller.AddSocket(router); //The ReceiveReady event is raised by the Poller router3.ReceiveReady += (s, a) => { bool more; // identity byte[] identity = a.Socket.Receive(out more); // message a.Socket.Receive(out more); a.Socket.SendMore(identity).Send("3"); }; poller.AddSocket(router3); Task pollerTask = Task.Factory.StartNew(poller.Start); dealer.Send("Hello"); // sending this should not arrive on the poller, //therefore response for this will never arrive dealer.Send("Hello2"); Thread.Sleep(100); // sending this should not arrive on the poller, //therefore response for this will never arrive dealer.Send("Hello3"); Thread.Sleep(500); bool more2; // making sure the socket defined before the one cancelled still works dealer2.Send("1"); string msg = dealer2.ReceiveString(out more2); Assert.AreEqual("2", msg); // making sure the socket defined after the one cancelled still works dealer3.Send("1"); msg = dealer3.ReceiveString(out more2); Assert.AreEqual("3", msg); // we have to give this some time if we want to make sure //it's really not happening and it not only because of time Thread.Sleep(300); poller.Stop(); Thread.Sleep(100); Assert.IsTrue(pollerTask.IsCompleted); } } } }
以上就是这一次我想讨论和分享的全部内容。我希望你能学到如何在你的自己的套接字拓扑中使用Poller,并且它为什么是一个有用的工具。
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。 2KB翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务