我很乐于见到基于流的编程技术出现在许多语言的编程范式中。流的存在已经好久了:在 Unix 中就可以看到‘ol | 操作符对它进行了很好的运用。流为非常实用的管道处理技术提供了一个有趣的概念模型:一个输入,然后产生输出。你会把这些小小的函数绑到一起,来构建更大更复杂的管道。大多数时间你可以让这些函数在输入数据上异步并行运行,以实现规模和数据吞吐量的最大化。有了流,数据的处理几乎都隐藏在幕后:它只是通过函数在流动,从某些输入产生新的输出。例如,对于一个 Http 服务器,所有客户端的请求-响应模型就都会是一个基于流的处理:将一个请求映射到一个响应,使其在各种在输入上进行处理的函数间传递。忘掉 MVC 吧,它就是个中间件。不需要设置变量,不需要对集合进行迭代,不需要对函数的调用进行精心的安排. 只要把能实现流的函数集中串联到一起,然后运行你的代码就可以了。流为一项处理提供了一个简洁的编程模型。事实上其可扩展性也是一项很棒的好处。
各种语言中的基于流的编程都是有可能的,而我也鼓励你去研究这一领域。有一本优秀的面向Node的流技术的手书,还有一种探索性的来自以Ruby而闻名的 Yukihiro “Matz” Matsumoto 的流式语言,Spark Streaming。当然还有 Akka-Streams 加入了现有面向 Scala 的 scalaz-stream 库。设置于 Go 的 HttpHandler 函数也是流式的:你可以很容易的拿其它函数对某个函数进行层层封装, 建立起一个流程,从而对响应流进行相应的操作。
Akka-Streams在Akka现有的角色模型之上提供了一种更高层级的抽象. 角色模型为编写并发、可扩展软件提供了一个很不错的起点, 但它仍然是一种原始的方式; 要发现这个模型的一些缺点不难. 那么有没有可能鱼与熊掌兼得呢? 我们能不能够让角色变成一堆函数的调用,来对我们想要实现的功能进行抽象? 我们可否把角色的消息作为函数的输入和输出, 并且是类型安全的? 欢迎你, Akka-Streams.
这儿有一份很棒的 面向Akka-Streams的启发模板 , 它提供了哟管Akka-Streams一些方面的深入教程. 想要更多的介绍,请继续往下读.
为了烹调出炉一道美味佳肴,我们将使用 reactive-rabbit 库来从 RabbitMq 订阅消息,并将消息在控制台输出. 代码在 GitHub 上. 如果你想跟着一起做,运行 git clone 然后是 git checkout intro 命令; 后续文章中我将会构建更多的功能出来,所以主分支可能会有所不同.
让我们先从一个代码片段开始吧:
object RabbitMqConsumer { def consume(implicit flowMaterializer: FlowMaterializer) = { Source(connection.consume("streams-playground")) .map(_.message.body.utf8String) .foreach(println(_)) } }
使用一个 RabbitMq 连接从一个叫做 streams-playground 的队列上订阅消息.
针对每个消息,我们提取出消息内容,并将内含的字节作为UTF-8字符串进行解码
在控制条打印消息
来源(Source )就是恰好只产生一个输出的东西. 如果你需要某些能生成数据的东西,你就需要一个来源. 我们上面用到的来源就产生自 connection.consume 函数。
处理槽(Sink)就是恰好只需要一个输入的东西. 处理槽是一个流处理过程的最后阶段. .foreach 调用就是一个处理槽,它会借助println将输入(_)写到控制台。
流过程(Flow)就是恰好只有一个输入和一个输出的东西. 它可以让数据流经一个函数: 比方说调用 map 的同时,它也会返回一个集合中的元素. 如上对 map 的调用就是一个流过程: 它接收并处理传递过来的消息,然后输出了一个字符串。
为了能使用Akka-Streams来实际运行某些东西,你必须让一个来源和处理槽附加到同一个管道上。这样做能让你创建出一个可运行的流过程(RunnableFlow),并开始对流进行处理. 正如你可以对函数和类进行组合, 你也可以对流来进行组合,从而构建起更加丰富的功能. 这是对其执行过程的一个强大的抽象,让你可以独立的构建出处理逻辑. 你可以想象得到一个流处理库,在其中你可以“插入”一块块的流,并且能有根据的进行定制。
你会留意到上面的代码块需要一个 implicit flowMaterializer: FlowMaterializer. FlowMaterializer 在实际要运行一个流过程时是必要的. 在上面的代码块中,foreach同时扮演了一个处理槽,以及一个运行流过程的run()调用. 如果你看看 Main.scala 文件,你会发现我只简单地在一个调用中就开始了流处理的过程:
implicit val flowMaterializer = FlowMaterializer() RabbitMqConsumer.consume
借助RabbitMq Admin UI 创建一个叫做 streams-playground 的队列,然后运行应用程序. 你可以在RabbitMq Admin UI中使消息发布,它们就会出现在控制台中. 尝试某些 UTF-8 字符 , 比方说 ????!
原始的代码片段是不错的,但是它需要隐含FlowMaterializer在 consume 上去建立并运行流。如果你移除它,你将会得到一个编译错误。有没有一种方式去分离运行时流的定义?答案是可以,那就是通过简单移除 foreach 的调用。foreach仅仅是一个 map 和 run() 的语法糖。通过显式地设置一个 Sink 不去调用 run() 我们可以构造我们的流图(blueprint)产生一个新的 RunnableFlow 类型对象。直观上来说,它是一个 Flow 可以被 run()。
这里是一个变更的程序:
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received + message + "); }
我们通过终止流使用to()调用和一个简单的Sink.ignore丢弃消息,除去flowMaterializer隐含。这个流将不会在被运行时调用。与之代替的是我们必须在Main.scala中隐含调用它:
implicit val flowMaterializer = FlowMaterializer() RabbitMqConsumer.consume().run()
我们把整个过程分开成两个阶段:在建立阶段,通过consume的调用,再调用run()运行。理想情况下,你最好在每个组件接通app时编写你的流,类似RabbitMqConsume,就提供了整个流的一部分。
可以研究一下 rabbitmq 教程 中面向Java的示例,它是一种替代方案。这里有一块来自该站点的代码块:
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received + message + "); }
这是典型的当务之急的风格. 我们的流过程是由while循环来控制的, 我们必须对变量进行明确的管理,没有什么对流过程的控制. 我们可以从while循环处将处理主体分离出来,但我们也就因此会有一个疯狂的函数签名. 与此相比,我们也可以选择Akka阵营中可靠的 amqp-client 库, 它提供了一个基于RabbitMq上的模型的动作执行器(Actor):
// create an actor that will receive AMQP deliveries val listener = system.actorOf(Props(new Actor { def receive = { case Delivery(consumerTag, envelope, properties, body) => { println("got a message: " + new String(body)) sender ! Ack(envelope.getDeliveryTag) } } })) // create a consumer that will route incoming AMQP messages to our listener // it starts with an empty list of queues to consume from val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(listener, channelParams = None, autoack = false))
你可以通过动作执行器(actor)系统上的配置来获得并发原语, 但是在动作执行器的接收日志中我们仍然会陷入要以当务之急来进行编程的境地(当然,也可以对此进行某种程度的重构). 通常,如果可以将我们的处理过程模型化为一堆流, 就可以获得使用函数式编程会得到的相同的好处: 对将要发生的(而不是如何发生的)进行清晰明确的组合。
流可以应用到各种不同的情况中. 我很高兴的看到面向Rest风格web服务的迷人而强大的 spray.io 库将会被作为一个能实现流处理过程的http工具集整合到Akka中. 在其它的语言中也不难发现使用 scalaz-streams 已经可以做到的一些什么东西,以及如此多的工具已经可以供人们使用.
本文中的所有译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接。 2KB翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务