在本教程中,我将会运用Spray-Client、DataStacks Cassandra驱动和Akka构建一个下载tweets而且将对应的id、文本、名字和日期存储到Cassandra表中的使用。这个使用展现了怎么构建具有一两个actor的Akka使用,怎么运用Akka IO发送HTTP恳求,怎么在Cassandra中存储数据。同时也展现了测试这类使用的办法,包含功能测试。
进修怎么构建基于Akka的号令行使用;怎么测试(运用TestKit)和Specs2;和怎么运用Spray-Client发送异步HTTP恳求。
我从系统的中心部离开始构建。中心部分包括三个actors,此中的两个与tweet数据库交互,别的一个下载tweets。TwitterReadActor从Cluster中读,TweetWriteActor向Cluster中写,TweetScanActor下载tweets而且通报给TweetWriteActor写入。这些依靠关系表现在actor的结构办法中。
class TweetReadActor(cluster: Cluster) extends Actor { ... } class TweetWriterActor(cluster: Cluster) extends Actor { ... } class TweetScanActor(tweetWrite: ActorRef, queryUrl: String => String) extends Actor { ... }
读actor和写actor的结构办法参数就是Cassandra Cluster实例;扫描actor的结构办法参数包括一个写actor的ActorRef和一个给定字符串查询可以构建下载tweets的查询URL的函数。(这就是我怎么构建要害字搜素。)
为了构建使用,我们需求做的是依照准确的次序实例化actors
val system = ActorSystem() def queryUrl(query: String): String = ??? val cluster: Cluster = ??? val reader = system.actorOf(Props(new TweetReaderActor(cluster))) val writer = system.actorOf(Props(new TweetWriterActor(cluster))) val scanner = system.actorOf(Props(new TweetScannerActor(writer, queryUrl)))
我将cluster和queryUrl的完成留为???,链中扭结(kink in the chain),其它完美系统中逻辑纷歧致(logical inconsistency in otherwise perfect system),亦称the bottom type。
【译者注:bottom type是类型论(type theory)中的一个观点,大致的意思就是没有类型,比方Scala中的Nothing。】
既然我们有了恰当的代码构造,那末就先看一下TwitterWriterActor。它接纳tweet实例,而且写入Cassandra的tweets键空间。
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def receive: Receive = { case tweets: List[Tweet] => case tweet: Tweet => } }
为了保管tweets,我们需求衔接到准确的键空间,衔接以后我们就具有了CassandraSession。由于我们想要尽量有效力,一切我们将应用Cassandra的PreparedStatements和BoundStatements。PreparedStatement是一个预处置的CQL语句,BoundStatement是一个设了参数值的PreparedStatemnt。
所以这为我们编写saveTweet函数供给了表示。
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def saveTweet(tweet: Tweet): Unit = session.executeAsync(preparedStatement.bind( tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt)) def receive: Receive = { case tweets: List[Tweet] => case tweet: Tweet => } }
剩下需求做的只是在receive部分函数中运用saveTweet函数。
class TweetWriterActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val preparedStatement = session.prepare( "INSERT INTO tweets(key, user_user, text, createdat) VALUES (?, ?, ?, ?);") def saveTweet(tweet: Tweet): Unit = session.executeAsync(preparedStatement.bind( tweet.id.id, tweet.user.user, tweet.text.text, tweet.createdAt)) def receive: Receive = { case tweets: List[Tweet] => tweets foreach saveTweet case tweet: Tweet => saveTweet(tweet) } }
如今我们就有了将tweet实例保管到Cassandra集群键空间的代码。
读数据稍稍庞杂一点儿,由于我们想要支撑count和find all操纵。我们需求可以构建Cassandra查询;然后前往一个CassandraRow,我们需求可以将它转换为Tweet工具。天然地,我们也想要应用Cassandra驱动的异步特征。侥幸的是,工作不会那末庞杂。让我们从TweetReaderActor的构造Start。
object TweetReaderActor { case class FindAll(maximum: Int = 100) case object CountAll } class TweetReaderActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val countAll = new BoundStatement(session.prepare( "select count(*) from tweets;")) def receive: Receive = { case FindAll(maximum) => // reply with List[Tweet] case CountAll => // reply with Long } }
在伴生工具中,我界说了actor交互的FindAll和CountAll音讯;我曾经给出了获得Session,然后运用Session构建统计行数的BoundStatement代码。接下来,我需求构建表现一行数据的Tweet实例。
class TweetReaderActor(cluster: Cluster) extends Actor { ... def buildTweet(r: Row): Tweet = { val id = r.getString("key") val user = r.getString("user_user") val text = r.getString("text") val createdAt = r.getDate("createdat") Tweet(id, user, text, createdAt) } ... }
这里也没甚么特别的,我们只是容易地提取行中的各列数据,然后生成一个Tweet实例。如今让我们运用Cassandra的邪术。我们想要履行(异步地)一些查询,将查询履行前往的行映照到tweets,然后管道输出后果到发送者。(斜体字给了一些表示,一切让我们将这些表示写成代码吧)
class TweetReaderActor(cluster: Cluster) extends Actor { val session = cluster.connect(Keyspaces.akkaCassandra) val countAll = new BoundStatement(session.prepare( "select count(*) from tweets;")) import scala.collection.JavaConversions._ import cassandra.resultset._ import context.dispatcher import akka.pattern.pipe def buildTweet(r: Row): Tweet = {...} def receive: Receive = { case FindAll(maximum) => val query = QueryBuilder.select(). all(). from(Keyspaces.akkaCassandra, "tweets"). limit(maximum) session.executeAsync(query). map(_.all().map(buildTweet).toList) pipeTo sender case CountAll => session.executeAsync(countAll) map(_.one.getLong(0)) pipeTo sender } }
让我细心剖析一下FindAll音讯处置顺序。起首,我运用Cassandra的QueryBuilder构建查询。这是通俗的Cassandra代码。
接下来的代码就更风趣了。挪用session的executeAsync办法,这将前往ResultSetFuture。运用cassandra.resultset._中的隐式转化。我将ResultSetFuture转化为Scala的Future[ResultSet]。这答应我运用Future.map办法将ResultSet转化为List[Tweet]。在我继续开发之前,我需求说明一下Cluster值来在那里。思索我们正在写的系统,我们可能需求分歧的Cluster值用于测试和主系统。并且测试Cluster可能需求一些特别的设置。由于我如今还不克不及决议,一切我只是容易的界说了一个前往Cluster的CassandraCluster trait;准确地完成该trait使得或许从ActorsSystem的设置装备摆设中加载设置装备摆设,或许硬编码用于测试。
trait CassandraCluster { def cluster: Cluster }
基于设置装备摆设的完成与测试的设置装备摆设仅仅在用于生成Cluster实例的值分歧。
// in src/scala/main trait ConfigCassandraCluster extends CassandraCluster { def system: ActorSystem private def config = system.settings.config import scala.collection.JavaConversions._ private val cassandraConfig = config.getConfig( "akka-cassandra.main.db.cassandra") private val port = cassandraConfig.getInt("port") private val hosts = cassandraConfig.getStringList("hosts").toList lazy val cluster: Cluster = Cluster.builder(). addContactPoints(hosts: _*). withCompression(ProtocolOptions.Compression.SNAPPY). withPort(port). build() } // in src/scala/test trait TestCassandraCluster extends CassandraCluster { def system: ActorSystem private def config = system.settings.config import scala.collection.JavaConversions._ private val cassandraConfig = config.getConfig( "akka-cassandra.test.db.cassandra") private val port = cassandraConfig.getInt("port") private val hosts = cassandraConfig.getStringList("hosts").toList lazy val cluster: Cluster = Cluster.builder(). addContactPoints(hosts: _*). withPort(port). withCompression(ProtocolOptions.Compression.SNAPPY). build() }
这答应我运用适宜的trait,获得设置装备摆设适宜的Cluster。可是在用于测试的时分有点歪曲,我想要让cluster处于明白清楚的形态。为了处理这个问题,我新建了CleanCassandra trait,用于为给定CassandraCluster.cluster重置Cluster。
trait CleanCassandra extends SpecificationStructure { this: CassandraCluster => private def runClq(session: Session, file: File): Unit = { val query = Source.fromFile(file).mkString query.split(";").foreach(session.execute) } private def runAllClqs(): Unit = { val session = cluster.connect(Keyspaces.akkaCassandra) val uri = getClass.getResource("/").toURI new File(uri).listFiles().foreach { file => if (file.getName.endsWith(".cql")) runClq(session, file) } session.shutdown() } override def map(fs: => Fragments) = super.map(fs) insert Step(runAllClqs()) }
当我将该trait整合到测试的时分,runAllClqs()被注册在一切其它测试步调履行前。
如今我可以写第一个测试了,用于验证TwitterReaderActor和TwitterWriterActor。测试的主体相当的长,可是从观点上了解发作的工作其实不难。
class TweetActorsSpec extends TestKit(ActorSystem()) with SpecificationLike with TestCassandraCluster with CleanCassandra with ImplicitSender { sequential val writer = TestActorRef(new TweetWriterActor(cluster)) val reader = TestActorRef(new TweetReaderActor(cluster)) "Slow & steady" >> { def write(count: Int): List[Tweet] = { val tweets = (1 to count).map(id => Tweet(id.toString, "@honzam399", "Yay!", new Date)) tweets.foreach(writer !) Thread.sleep(1000) // wait for the tweets to hit the db tweets.toList } "Single tweet" in { val tweet = write(1).head reader ! FindAll(1) val res = expectMsgType[List[Tweet]] res mustEqual List(tweet) } "100 tweets" in { val writtenTweets = write(100) reader ! FindAll(100) val readTweets = expectMsgType[List[Tweet]] readTweets must containTheSameElementsAs(writtenTweets) } } }
我们运用了非常多组件构成测试。起首我们扩大了TestKit,给它一个ActorSystem()做为结构参数;接下来我们运用了Specs2的SpecificationLike,然后是我们的Cassandra测试情况,运用ImplicitSender完成了全部设置使得我们可以查验呼应。
"Slow & steady"标准的实践主体验证我们可以写读一个和一百个tweets。
在运转测试之前,你必需确保Cassandra处于运转形态,而且曾经准确创立了键空间。为了便利,你可以运转位于src/data下的CQL剧本。你需求按次序履行
keyspaces.cql Then, in the correct keyspace: tables.cql words.cql
既然我们可以从Cassandra中平安地存储和检索tweets,如今我们需求编写用于下载tweets的组件。在我们的系统中,指的是TweetScannerActor。它接纳一个String类型的音讯,履行HTTP恳求下载tweets。(为了坚持本教程简练,我们运用便利的Twitter代办署理 “http//twitter-search-proxy.herokuapp.com/search/tweets。在恣意状况下,扫描actor的Task就是构建HTTP恳求,接纳呼应,转化为List[Tweet]而且发送给TweetWriterActor的ActorRef。
class TweetScannerActor(tweetWrite: ActorRef, queryUrl: String => String) extends Actor with TweetMarshaller { import context.dispatcher import akka.pattern.pipe private val pipeline = sendReceive ~> unmarshal[List[Tweet]] def receive: Receive = { case query: String => pipeline(Get(queryUrl(query))) pipeTo tweetWrite } }
实践上我们运用Spray-Client构建HTTP流水线,发送HTTP恳求(sendReceive),解包原始的HTTP呼应(也就是转化为系统中类型的实例)。
当使用于HttpRequest时,pipelineStart任务;在我们的代码中,Get(url: String)表现一种可以结构这类HttpRequests的机制。当使用于query的时分,函数queryUrl前往pipeline操纵的实践URL。在Spray-Client中,typeclass是trait Unmarshaller[A],它的apply办法接纳HttpEntity前往Deserialized[A]。 Unmarshaller[A]实际上是trait Unmarshaller[A] extends (HttpEntity => Deserialized[A])的别号。(是的,在Scala中你可以扩大(A=B),这是trait Unmarshaller[A] extends Function1[HttpEntity, Deserialized[A]]的语法糖。)我们在后面运用的unmarshal指令界说为
def unmarshal[A : Unmarshaller]: HttpResponse => A
: Unmarshaller是一个绑定到类型参数A的高低文,这将招致编译器将函数扩大为
def unmarshal[A](implicit ev: Unmarshaller[A]): HttpResponse => A
unmarshal函数接纳一个类型A的typeclass Unmarshaller的实例;在我们的例子中,我们指定类型A为List[Tweet]。我们用List[Tweet]交换A,使得unmarshal[List[Tweet]](implicit ev: Unmarshaller[List[Tweet]]): .... 为了使使用可以任务,在以后implicit范畴内需求一个Unmarshaller[List[Tweet]]类型的值。当我们给定这类值时,我们就能够说给定了Unmarshaller typeclass实例。
trait TweetMarshaller { type Tweets = List[Tweet] implicit object TweetUnmarshaller extends Unmarshaller[Tweets] { val dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy") def mkTweet(status: JsValue): Deserialized[Tweet] = { val json = status.asJsObject ... } def apply(entity: HttpEntity): Deserialized[Tweets] = { val json = JsonParser(entity.asString).asJsObject ... } } }
我们的typeclass实例是TweetUnmarshaller单例,扩大了Unmarshaller[Tweets]。留意我也界说了一个类型别号Tweets = List[Tweet]使得我不用写太多方括号。扩大Unmarshaller[Tweets],我们必需完成apply办法,该办法接纳HttpEntity,前往串行化tweets或许抛出一个过错。
我们简直完成了一切的工具。可是我们怎么验证TweetScannerActor任务正常呢?
2KB项目(www.2kb.com,源码交易平台),提供担保交易、源码交易、虚拟商品、在家创业、在线创业、任务交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接买卖、网站交易、广告买卖、站长培训、建站美工等服务