2KB项目,专业的源码交易网站 帮助 收藏 每日签到

Akka 和 Cassandra Activator

  • 时间:2019-03-19 20:32 编辑:2KB 来源:2KB.COM 阅读:377
  • 扫一扫,手机访问
  • 分享
摘要:
Akka Cassandra 英文原文:Akka & Cassandra Activator

在本教程中,我将会运用Spray-Client、DataStacks Cassandra驱动和Akka构建一个下载tweets而且将对应的id、文本、名字和日期存储到Cassandra表中的使用。这个使用展现了怎么构建具有一两个actor的Akka使用,怎么运用Akka IO发送HTTP恳求,怎么在Cassandra中存储数据。同时也展现了测试这类使用的办法,包含功能测试。

overall

进修怎么构建基于Akka的号令行使用;怎么测试(运用TestKit)和Specs2;和怎么运用Spray-Client发送异步HTTP恳求。

中心部分

我从系统的中心部离开始构建。中心部分包括三个actors,此中的两个与tweet数据库交互,别的一个下载tweets。TwitterReadActor从Cluster中读,TweetWriteActor向Cluster中写,TweetScanActor下载tweets而且通报给TweetWriteActor写入。这些依靠关系表现在actor的结构办法中。

class TweetReadActor(cluster: Cluster) extends Actor {
  ...
}

class TweetWriterActor(cluster: Cluster) extends Actor {
  ...
}

class TweetScanActor(tweetWrite: ActorRef, queryUrl: String => String) 
  extends Actor {
  ...
}

actor和actor的结构办法参数就是Cassandra Cluster实例;扫描actor的结构办法参数包括一个actor的ActorRef和一个给定字符串查询可以构建下载tweets的查询URL的函数。(这就是我怎么构建要害字搜素。)

为了构建使用,我们需求做的是依照准确的次序实例化actors

val system = ActorSystem()

def queryUrl(query: String): String = ???
val cluster: Cluster = ???

val reader  = system.actorOf(Props(new TweetReaderActor(cluster)))
val writer  = system.actorOf(Props(new TweetWriterActor(cluster)))
val scanner = system.actorOf(Props(new TweetScannerActor(writer, queryUrl)))

我将cluster和queryUrl的完成留为???,链中扭结(kink in the chain),其它完美系统中逻辑纷歧致(logical inconsistency in otherwise perfect system,亦称the bottom type

【译者注:bottom type是类型论(type theory)中的一个观点,大致的意思就是没有类型,比方Scala中的Nothing。】

向Cassandra中写

既然我们有了恰当的代码构造,那末就先看一下TwitterWriterActor。它接纳tweet实例,而且写入Cassandra的tweets键空间。

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare(
    "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")

  def receive: Receive = {
    case tweets: List[Tweet] =>
    case tweet: Tweet        =>
  }
}

为了保管tweets,我们需求衔接到准确的键空间,衔接以后我们就具有了CassandraSession。由于我们想要尽量有效力,一切我们将应用Cassandra的PreparedStatements和BoundStatements。PreparedStatement是一个预处置的CQL语句,BoundStatement是一个设了参数值的PreparedStatemnt。

所以这为我们编写saveTweet函数供给了表示。

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare(
    "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")

  def saveTweet(tweet: Tweet): Unit =
    session.executeAsync(preparedStatement.bind(
      tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))

  def receive: Receive = {
    case tweets: List[Tweet] =>
    case tweet: Tweet        =>
  }
}

剩下需求做的只是在receive部分函数中运用saveTweet函数。

class TweetWriterActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val preparedStatement = session.prepare(
    "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);")

  def saveTweet(tweet: Tweet): Unit =
    session.executeAsync(preparedStatement.bind(
      tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt))

  def receive: Receive = {
    case tweets: List[Tweet] => tweets foreach saveTweet
    case tweet: Tweet        => saveTweet(tweet)
  }
}

如今我们就有了将tweet实例保管到Cassandra集群键空间的代码。

从Cassandra中读

读数据稍稍庞杂一点儿,由于我们想要支撑count和find all操纵。我们需求可以构建Cassandra查询;然后前往一个CassandraRow,我们需求可以将它转换为Tweet工具。天然地,我们也想要应用Cassandra驱动的异步特征。侥幸的是,工作不会那末庞杂。让我们从TweetReaderActor的构造Start。

object TweetReaderActor {
  case class FindAll(maximum: Int = 100)
  case object CountAll
}

class TweetReaderActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val countAll  = new BoundStatement(session.prepare(
   "select count(*) from tweets;"))

  def receive: Receive = {
    case FindAll(maximum)  =>
      // reply with List[Tweet]
    case CountAll =>
      // reply with Long
  }
}

在伴生工具中,我界说了actor交互的FindAll和CountAll音讯;我曾经给出了获得Session,然后运用Session构建统计行数的BoundStatement代码。接下来,我需求构建表现一行数据的Tweet实例。

class TweetReaderActor(cluster: Cluster) extends Actor {
  ...

  def buildTweet(r: Row): Tweet = {
    val id = r.getString("key")
    val user = r.getString("user_user")
    val text = r.getString("text")
    val createdAt = r.getDate("createdat")
    Tweet(id, user, text, createdAt)
  }
  ...
}

这里也没甚么特别的,我们只是容易地提取行中的各列数据,然后生成一个Tweet实例。如今让我们运用Cassandra的邪术。我们想要履行(异步地)一些查询,将查询履行前往的映照到tweets,然后管道输出后果到发送者。(斜体字给了一些表示,一切让我们将这些表示写成代码吧)

class TweetReaderActor(cluster: Cluster) extends Actor {
  val session = cluster.connect(Keyspaces.akkaCassandra)
  val countAll  = new BoundStatement(session.prepare(
    "select count(*) from tweets;"))

  import scala.collection.JavaConversions._
  import cassandra.resultset._
  import context.dispatcher
  import akka.pattern.pipe

  def buildTweet(r: Row): Tweet = {...}

  def receive: Receive = {
    case FindAll(maximum)  =>
      val query = QueryBuilder.select().
                               all().
                               from(Keyspaces.akkaCassandra, "tweets").
                               limit(maximum)
      session.executeAsync(query).
        map(_.all().map(buildTweet).toList) pipeTo sender
    case CountAll =>
      session.executeAsync(countAll) map(_.one.getLong(0)) pipeTo sender
  }
}

让我细心剖析一下FindAll音讯处置顺序。起首,我运用Cassandra的QueryBuilder构建查询。这是通俗的Cassandra代码。

接下来的代码就更风趣了。挪用session的executeAsync办法,这将前往ResultSetFuture。运用cassandra.resultset._中的隐式转化。我将ResultSetFuture转化为Scala的Future[ResultSet]。这答应我运用Future.map办法将ResultSet转化为List[Tweet]。

挪用session.executeAsync(query) map等待它的参数是从ResultSet到B类型的函数。在我们的例子中,B是List[Tweet]。ResultSet包括办法all(),该办法前往java.util.List[Row]。为了可以映照java.util.List[Row],我们需求将它转化为Scala的List[Row]。为了到达这个目标,我们运用scala.collection.JavaConversions中的隐式转化。如今我们可以完成Future.map函数的参数。

因而session.executeAsync(query) map(_.all().map(buildTweet).toList)前往Future[List[Tweet]],这十分靠近我们需求的。我们不想要梗阻后果,我们太懒了而不想运用onSuccess函数,由于一切这些都是为了将后果转递给sender。所以session.executeAsync(query) map(_.all().map(buildTweet).toList) pipeTo sender这整行的用处是将将来的胜利查询管道输出到sender。

衔接到Cassandra

在我继续开发之前,我需求说明一下Cluster值来在那里。思索我们正在写的系统,我们可能需求分歧的Cluster值用于测试和主系统。并且测试Cluster可能需求一些特别的设置。由于我如今还不克不及决议,一切我只是容易的界说了一个前往Cluster的CassandraCluster trait;准确地完成该trait使得或许从ActorsSystem的设置装备摆设中加载设置装备摆设,或许硬编码用于测试。

trait CassandraCluster {
  def cluster: Cluster
}

基于设置装备摆设的完成与测试的设置装备摆设仅仅在用于生成Cluster实例的值分歧。

// in src/scala/main
trait ConfigCassandraCluster extends CassandraCluster {
  def system: ActorSystem

  private def config = system.settings.config

  import scala.collection.JavaConversions._
  private val cassandraConfig = config.getConfig(
    "akka-cassandra.main.db.cassandra")
  private val port = cassandraConfig.getInt("port")
  private val hosts = cassandraConfig.getStringList("hosts").toList

  lazy val cluster: Cluster =
    Cluster.builder().
      addContactPoints(hosts: _*).
      withCompression(ProtocolOptions.Compression.SNAPPY).
      withPort(port).
      build()
}

// in src/scala/test
trait TestCassandraCluster extends CassandraCluster {
  def system: ActorSystem

  private def config = system.settings.config

  import scala.collection.JavaConversions._
  private val cassandraConfig = config.getConfig(
    "akka-cassandra.test.db.cassandra")
  private val port = cassandraConfig.getInt("port")
  private val hosts = cassandraConfig.getStringList("hosts").toList

  lazy val cluster: Cluster =
    Cluster.builder().
      addContactPoints(hosts: _*).
      withPort(port).
      withCompression(ProtocolOptions.Compression.SNAPPY).
      build()

}

这答应我运用适宜的trait,获得设置装备摆设适宜的Cluster。可是在用于测试的时分有点歪曲,我想要让cluster处于明白清楚的形态。为了处理这个问题,我新建了CleanCassandra trait,用于为给定CassandraCluster.cluster重置Cluster。

trait CleanCassandra extends SpecificationStructure {
  this: CassandraCluster =>

  private def runClq(session: Session, file: File): Unit = {
    val query = Source.fromFile(file).mkString
    query.split(";").foreach(session.execute)
  }

  private def runAllClqs(): Unit = {
    val session = cluster.connect(Keyspaces.akkaCassandra)
    val uri = getClass.getResource("/").toURI
    new File(uri).listFiles().foreach { file =>
      if (file.getName.endsWith(".cql")) runClq(session, file)
    }
    session.shutdown()
  }

  override def map(fs: => Fragments) = super.map(fs) insert Step(runAllClqs())
}

当我将该trait整合到测试的时分,runAllClqs()被注册在一切其它测试步调履行

测试

如今我可以写第一个测试了,用于验证TwitterReaderActor和TwitterWriterActor。测试的主体相当的长,可是从观点上了解发作的工作其实不难。

class TweetActorsSpec extends TestKit(ActorSystem())
  with SpecificationLike with TestCassandraCluster 
  with CleanCassandra with ImplicitSender {
  sequential

  val writer = TestActorRef(new TweetWriterActor(cluster))
  val reader = TestActorRef(new TweetReaderActor(cluster))

  "Slow & steady" >> {
    def write(count: Int): List[Tweet] = {
      val tweets = (1 to count).map(id => 
        Tweet(id.toString, "@honzam399", "Yay!", new Date))
      tweets.foreach(writer !)
      Thread.sleep(1000)    // wait for the tweets to hit the db
      tweets.toList
    }

    "Single tweet" in {
      val tweet = write(1).head

      reader ! FindAll(1)
      val res = expectMsgType[List[Tweet]]
      res mustEqual List(tweet)
    }

    "100 tweets" in {
      val writtenTweets = write(100)

      reader ! FindAll(100)
      val readTweets = expectMsgType[List[Tweet]]
      readTweets must containTheSameElementsAs(writtenTweets)
    }
  }

}

我们运用了非常多组件构成测试。起首我们扩大了TestKit,给它一个ActorSystem()做为结构参数;接下来我们运用了Specs2的SpecificationLike,然后是我们的Cassandra测试情况,运用ImplicitSender完成了全部设置使得我们可以查验呼应。

"Slow & steady"标准的实践主体验证我们可以写读一个和一百个tweets。

在运转测试之前,你必需确保Cassandra处于运转形态,而且曾经准确创立了键空间。为了便利,你可以运转位于src/data下的CQL剧本。你需求按次序履行

keyspaces.cql
Then, in the correct keyspace:
   tables.cql
   words.cql

扫描tweets

既然我们可以从Cassandra中平安地存储和检索tweets,如今我们需求编写用于下载tweets的组件。在我们的系统中,指的是TweetScannerActor。它接纳一个String类型的音讯,履行HTTP恳求下载tweets。(为了坚持本教程简练,我们运用便利的Twitter代办署理 “http//twitter-search-proxy.herokuapp.com/search/tweets。在恣意状况下,扫描actor的Task就是构建HTTP恳求,接纳呼应,转化为List[Tweet]而且发送给TweetWriterActor的ActorRef。

class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String)
  extends Actor with TweetMarshaller {

  import context.dispatcher
  import akka.pattern.pipe

  private val pipeline = sendReceive ~> unmarshal[List[Tweet]]

  def receive: Receive = {
    case query: String => pipeline(Get(queryUrl(query))) pipeTo tweetWrite
  }
}

实践上我们运用Spray-Client构建HTTP流水线,发送HTTP恳求(sendReceive),解包原始的HTTP呼应(也就是转化为系统中类型的实例)。

当使用于HttpRequest时,pipelineStart任务;在我们的代码中,Get(url: String)表现一种可以结构这类HttpRequests的机制。当使用于query的时分,函数queryUrl前往pipeline操纵的实践URL。

pipeline的履行前往Future[List[Tweet]],我们可以pipeTo到tweetWrite actor。

剩下的独一的任务就是完成解包。在Spray-Client中,unmarshaller是一个typeclass,完成是一个typeclass实例。思索typeclass最容易的方法是想象typeclass是界说了行动的trait,typeclass实例就是trait的现实。

在Spray-Client中,typeclass是trait Unmarshaller[A],它的apply办法接纳HttpEntity前往Deserialized[A]。 Unmarshaller[A]实际上是trait Unmarshaller[A] extends (HttpEntity => Deserialized[A])的别号。(是的,在Scala中你可以扩大(A=B),这是trait Unmarshaller[A] extends Function1[HttpEntity, Deserialized[A]]的语法糖。)我们在后面运用的unmarshal指令界说为

def unmarshal[A : Unmarshaller]: HttpResponse => A

: Unmarshaller是一个绑定到类型参数A的高低文,这将招致编译器将函数扩大为

def unmarshal[A](implicit ev: Unmarshaller[A]): HttpResponse => A

unmarshal函数接纳一个类型A的typeclass Unmarshaller的实例;在我们的例子中,我们指定类型A为List[Tweet]。我们用List[Tweet]交换A,使得unmarshal[List[Tweet]](implicit ev: Unmarshaller[List[Tweet]]): .... 为了使使用可以任务,在以后implicit范畴内需求一个Unmarshaller[List[Tweet]]类型的值。当我们给定这类值时,我们就能够说给定了Unmarshaller typeclass实例。

trait TweetMarshaller {
  type Tweets = List[Tweet]

  implicit object TweetUnmarshaller extends Unmarshaller[Tweets] {

    val dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy")

    def mkTweet(status: JsValue): Deserialized[Tweet] = {
      val json = status.asJsObject
      ...
    }

    def apply(entity: HttpEntity): Deserialized[Tweets] = {
      val json = JsonParser(entity.asString).asJsObject
      ...
    }
  }

}

我们的typeclass实例是TweetUnmarshaller单例,扩大了Unmarshaller[Tweets]。留意我也界说了一个类型别号Tweets = List[Tweet]使得我不用写太多方括号。扩大Unmarshaller[Tweets],我们必需完成apply办法,该办法接纳HttpEntity,前往串行化tweets或许抛出一个过错。

我们简直完成了一切的工具。可是我们怎么验证TweetScannerActor任务正常呢?

本文中的一切译文仅用于进修和交换目标,转载请务必注明文章译者、出处、和本文链接。 2KB翻译任务按照 CC 协议,假如我们的任务有进犯到您的权益,请实时联络我们。


2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务

  • 全部评论(0)
资讯详情页最新发布上方横幅
最新发布的资讯信息
【计算机/互联网|】Nginx出现502错误(2020-01-20 21:02)
【计算机/互联网|】网站运营全智能软手V0.1版发布(2020-01-20 12:16)
【计算机/互联网|】淘宝这是怎么了?(2020-01-19 19:15)
【行业动态|】谷歌关闭小米智能摄像头,因为窃听器显示了陌生人家中的照片(2020-01-15 09:42)
【行业动态|】据报道谷歌新闻终止了数字杂志,退还主动订阅(2020-01-15 09:39)
【行业动态|】康佳将OLED电视带到美国与LG和索尼竞争(2020-01-15 09:38)
【行业动态|】2020年最佳AV接收机(2020-01-15 09:35)
【行业动态|】2020年最佳流媒体设备:Roku,Apple TV,Firebar,Chromecast等(2020-01-15 09:31)
【行业动态|】CES 2020预览:更多的流媒体服务和订阅即将到来(2020-01-08 21:41)
【行业动态|】从埃隆·马斯克到杰夫·贝佐斯,这30位人物定义了2010年代(2020-01-01 15:14)
联系我们

Q Q: 7090832

电话:400-0011-990

邮箱:7090832@qq.com

时间:9:00-23:00

联系客服
商家入住 服务咨询 投拆建议 联系客服
0577-67068160
手机版

扫一扫进手机版
返回顶部