本文介绍了如何使用ZeroMQ来进行内部服务的RPC调用。HTTP是面向公众服务的标准选择,但在由许多小部分组成的系统内调用内部RPC时,最好使用ZeroMQ代替HTTP。
总结一下,ZeroMQ较HTTP提供的好处如下:
Clojure的Java程序可能无法一眼便了解意图,这里是一个简单的注释。
;; Quick Clojure tutorial, with Java equivalents ;; Java method call on string instance. Returns "A STRING" (.toUpperCase "a string") "a string".toUpperCase() ;; Java method call with one argument. Returns byte[]. (.getBytes "a string" "UTF8") "a string".getBytes("UTF8") ;; Calls the constructor and creates a new instance of Point (Point. 15 21) new Point(15, 21) ;; Create a new thread and start it (.start (Thread. (fn [] ...))) Thread t = new Thread(aClojureFunctionHere) t.start()
ZeroMQ命名的确很巧妙。它不是一个消息队列,没有服务器或你需要下载并运行的第三方依赖。事实上,ZeroMQ没有集中的组件。换句话说,ZeroMQ的名字很恰当:ZeroMQ中有零个消息队列零。那它是什么?
ZeroMQ是一个网络库。你创建不同类型的socket(REQ、REP、DEALER、ROUTER、PUB、SUB……),以各种有趣的方式连接这些socket,发送信息到这些socket。socket的类型决定了消息如何路由和传递。
如何连接socket、如何使用它们调用RPC代替HTTP,就是这篇博客涵盖的内容。
不要以为ZeroMQ是ActiveMQ或RabbitMQ的一种替代方案,或者认为ZeroMQ是一个原子支持网络或socket库。
第一个例子是你永远不会在生产代码中使用,因为它没有并发。我们对一些短小独立的代码改善迭代,让他们达到我们预期的效果,这些代码段演示了一些ZeroMQ的重要概念。
我们将创建两个socket。一个REQ(请求)socket。ZeroMQ的socket更具类型有不同的API。如果你打算将两个信息在一组从socket发送,你会得到一个错误。你需要先发送,在等待到答复后你才能再次发送。
另一个socket是一个REP(回复)socket。它有一个类似但相反的API。如果你对REP做的第一件事是发送数据,你会得到一个错误,你必须以等待消息开始,只有当你发送一个消息才可以做其他的。然后你必须等待另一个消息。
这个API使它容易做RPC调用,正如ZeroMQ将为我们处理所有的细节。我们需要做的是等待响应和答复答复回复,发出请求并等待响应。
显然,例子中的一个简单的ZeroMQ的REQ或REP的处理并不会有并发。你一次只能发送一个请求或者处理一个相应。后面我们会看到并发的例子。
;; 非常简单的REQ/REP初始化,只是用于基本的演示 ;; 一次一个请求,没有并发。 (.start (Thread. (fn [] (let [sock (.socket (ZMQ/context 1) ZMQ/REP)] (.bind sock "tcp://127.0.0.1:1337") (while true ;; 阻塞住,直到收到消息 (let [req (.recv sock 0)] ;; req是一个byte[]类型。你可以对其做一些处理! ;; 将req回显给客户端。 (.send sock (.getBytes (str (String. req) " - echoed!")) ZMQ/NOBLOCK))))))) (.start (Thread. (fn [] (let [sock (.socket (ZMQ/context 1) ZMQ/REQ)] (.connect sock "tcp://127.0.0.1:1337") (dotimes [n 5] ;; 处理一次请求 (.send sock (.getBytes (str "Hello, " n)) 0) ;; 阻塞住,直到收到响应 (let [res (.recv sock 0)] ;; res是byte[]类型,包含REP包的回复内容。 (println (String. res))))))))
从这个小例子中我们可以了解很多东西。
到现在为止,我们一直保持请求代码不变,不过我们将替代每次只能进行一个应答的单个线程。
ZeroMQ想到了进程内消息传递,而且这就是我们用来获得并发应答的方法。我们将创建多个线程,并且每个线程对应一个REP套接字,然后使用具有新套接字的新线程把这些线程连接为一个大对象。这时就没有跨线程的状态了,我们在线程间要传递的唯一“状态”就是ZeroMQ消息。
为了获得并发应答,我们创建一个类型为DEALER的套接字。我们把这个套接字与"inproc://“协议捆绑在一起,然后把真正的REP套接字和这个DEALER套接字连接在一起。当DEALER套接字接收到消息的时候,它就把这个消息传递给一个REP套接字,然后看看那些套接字正忙。当一个REP套接字应答的时候,它将给只把消息按原样向前传递DEALER同样的应答。这种配置考虑到对许多REP套接字前转消息的一个顶级套接字。
在这片文章的所有例子里,我们将在一个单独的TCP连接上使用多个REQ套接字来实现并发请求。不过,如果你需要支持到我们服务器的多个TCP连接的话,我们当然需要这样的连接,那么我们需要在DEALER前面有一个ROUTER。例如我们可能有多个系统正连接这一个服务,并要求每个服务/进程对应各自的TCP连接。或者我们可能需要连接到服务器来维护RPC调用。
ROUTER将给每个连接到它的套接字一个内部ID号,然后立刻前转附带有这个号码的元数据的消息,包含内部套接字ID。当它获得返回的消息的时候,它立刻前转消息给已经连接的并出现在元数据里的套接字ID对应的REQ套接字。
(defn my-response-handler "Takes a req (bytes), returns the response (also bytes)." [req] (.getBytes (str (String. req) " - echoed!"))) (let [ctx (ZMQ/context 1) worker-url "inproc://responders" router-socket (.socket ctx ZMQ/ROUTER) dealer-socket (.socket ctx ZMQ/DEALER)] (.bind router-socket "tcp://127.0.0.1:1337") (.bind dealer-socket worker-url) ;;现在我们可以并发地响应10个请求了 (dotimes [n 10] (.start (Thread. (fn [] (let [sock (.socket ctx ZMQ/REP)] ;; 我们应答DEALER (.connect sock worker-url) (while true ;; 同以前相同的API- 接收消息,然后应答。 (let [req (.recv sock 0)] (.send sock (my-response-handler req) ZMQ/NOBLOCK)))))))) (.start (Thread. ;; 前转来自ROUTER的消息给DEALER或者相反。 (fn [] (.run (ZMQQueue. ctx router-socket dealer-socket))))))你可以用前面例子里的REP线程替代这段代码,而且它也可以正常运行。不同之处在于现在我们已经有十个响应请求的线程了。 要注意的几个事项:
现在我没拥有了应答多个的多线程。是时候用更聪明的且让我们可获得并发的东西替代单个REQ套接字了。
你可能已经猜到,我们只要创建无数REQ套接字就可以了。虽然这个主意很糟糕,因为每个REQ套接字都需要拥有它们自己的TCP连接。我们可以拥有一个可用的REQ套接字池,这样我们每次做请求的时候就不需要花费大力气创建新的TCP连接了。然而ZeroMQ有一个更好的方案,这种情况下它可以处理完们遇到的所有令人讨厌的问题。
我们不能脱离这样一个事实:REQ用一个阻塞的API。我们假设你处在阻塞请求是可行的这种环境里。我们所使用的例子一直是Java HTTPservlet响应者,它需要通过ZeroMQ调用内部服务。我不能确定ZeroMQ是否具有异步API。好,现在看看真实的代码。
(defn connect [server-url] (let [ctx (ZMQ/context 1) worker-url (str "inproc://" (java.util.UUID/randomUUID)) queue-thread (Thread. (fn [] (let [client-sock (.socket ctx ZMQ/DEALER) worker-sock (.socket ctx ZMQ/ROUTER)] (.connect client-sock server-url) (.bind worker-sock worker-url) (.run (ZMQQueue. ctx client-sock worker-sock)))))] (.start queue-thread) {:ctx ctx :worker-url worker-url :queue-thread queue-thread})) (defn disconnect "Useful for tests etc. Pass the map returned by `connect` above." [connection] (.interrupt (get connection :queue-thread)) (.term (get connection :ctx))) (defn with-req-sock "Takes the connection and a higher order function that is passed a new REQ socket. When this function returns, the REQ socket is destroyed." [connection handler] (let [socket (.socket (get connection :ctx) ZMQ/REQ)] (.connect socket (get connection :worker-url)) (try (handler socket) (finally (.close socket))))) ;; Usage (def connection (connect "tcp://127.0.0.1:1337")) (dotimes [n 5] (.start (Thread. (fn [] (with-req-sock connection (fn [sock] (.send sock (.getBytes (str "Hello, " n)) 0) (let [res (.recv sock 0)] (println (String. res)))))))))(connectserver-url)创建了一些新连接。我们为我们的进程创建了一个单独的连接,让后在我们需要进行请求的时候把请求传给它。我们对每个请求创建一个新的REQ套接字,然后使用"inproc://"发送消息给ROUTER。ROUTER然后前转消息给DEALER(像以前一样通过ZMQQueue)。DEALER通过TCP连接真正的服务器。 要注意的几个事项:
迄今为止,你已经学习到的所有东西就是如何使用ZeroMQ来来回回的发送消息。我们已经看到我们能在一个TCP连接上并发地发送任意数量的请求和应答。我们启动一个客户端,然后发送请求,接着我们启动服务器,然后即使在我们没有代理的情况下,一切运行正常。我们不需要管理连接而且也不需要手动地重复多次的处理了。
然而怎样才能真正的实现RPC呢?迄今为止,我们已经发送了字节并显示了这些字节。我们需要有一种调用多个过程的方法,而且不需要过程调用还可以发送数据。这里我们应该做什么才能发送这些有用的东西呢?
让我们效仿一下HTTP!我喜欢这种方法、路径和内容体的语义,因此让我们保留这些。类似于HTTP,我们总是应答,而且有时候是含有错误的应答-不存在“null"应答。由于ZeroMQ系统字节,所以我们使用SMILE做为数据格式,它是类JSON的格式,使用这种格式它能知道如何把映射、列表、集合和字符串编码为后来可以解码的东西 。这就是所谓的“二进制”格式,因此它可以传送原始字节,这是就不存在任何字符串编码的问题了。
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。 2KB翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务