在那末多java供给的用于支撑并发的类外面,我想讲讲此中的 SynchronousQueue. 特殊是怎么运用便利的SynchronousQueue 作为交流机制来完成生产者/花费者模子。
假如没有细心看过 SynchronousQueue 的完成,可能说不明白为何运用这类行列来实行生产者/花费者之间的通讯。依照以往我们对行列的了解,它实践上算不上是一种行列,而只是相似于包括最多一个元素的聚集。 为何它有效呢?好吧,这里有几个缘由。从生产者的角度来看,只要一个元素(或消息)可以放到行列外面。生产者需求比及花费者将行列中以后的阿谁 元素(或消息)花费了才干继续下一个。从花费者的角度来看,它轮询行列外面可用的下一个元素(或消息)就行了。就是这么容易,而如许的极大益处是:生产者没法以快过花费者花费的速度来发生消息。 其它翻译版本 (1) 加载中 这里有一个我比来碰到的例子:比较两个数据库表(可能十分宏大), 检测出分歧的数据或者类似的数据(或备份 )。同队伍列( SynchronousQueue )恰是针对这类问题的便当Tools。它不单答应 每一个表 在本人的线程中实行处置,并且可以作为当从两个分歧的数据库中读取数据可能呈现超时/等候的补偿。 其它翻译版本 (1) 加载中Start了。我们先定义compare函数,它承受三个参数:源数据源、目的数据源和表名。我运用了Spring框架的JdbcTemplate,它把处置衔接和PreparedStatement的一切烦人细节都笼统掉了,十分有效。
public boolean compare( final DataSource source, final DataSource destination, final String table ) { final JdbcTemplate from = new JdbcTemplate( source ); final JdbcTemplate to = new JdbcTemplate( destination ); }在Start真实的比较之前,先比较一下二者的行数是一个很好的做法:
if( from.queryForLong("SELECT count(1) FROM " + table ) != to.queryForLong("SELECT count(1) FROM " + table ) ) { return false; }
如今,我们最少曾经晓得在两张表包括异样行数(的数据记载),我们可以Start实行数据比较了。(比较)算法相当容易:
在线程池(完成中)运用Java并发Tools类库其他很棒的部分,让我们定义一个包括有固定数目线程的线程池
final ExecutorService executor = Executors.newFixedThreadPool( 2 ); final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();下面是算法的详细描绘, 花费者函数履行体可以表现成一个独自的callable
Callable< Void > producer = new Callable< Void >() { @Override public Void call() throws Exception { from.query( "SELECT * FROM " + table, new RowCallbackHandler() { @Override public void processRow(ResultSet rs) throws SQLException { try { List< ? > row = ...; // convert ResultSet to List if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) { throw new SQLException( "Having more data but consumer has already completed" ); } } catch( InterruptedException ex ) { throw new SQLException( "Having more data but producer has been interrupted" ); } } } ); return null; } };
由于Java的语法的关系,这段代码看起来有点繁琐,但实在并没有做非常多工作。每一个从生产者数据库表中读取的后果集都被转换成了一个链表(作为样板文件,它的详细完成被疏忽了)然后将其寄存到行列中。假如行列非空,生产者线程被梗阻直到花费者线程完成了它的任务。花费者线程,可以表现成下面的callable
Callable< Void > consumer = new Callable< Void >() { @Override public Void call() throws Exception { to.query( "SELECT * FROM " + table, new RowCallbackHandler() { @Override public void processRow(ResultSet rs) throws SQLException { try { List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES ); if( source == null ) { throw new SQLException( "Having more data but producer has already completed" ); } List< ? > destination = ...; // convert ResultSet to List if( !source.equals( destination ) ) { throw new SQLException( "Row data is not the same" ); } } catch ( InterruptedException ex ) { throw new SQLException( "Having more data but consumer has been interrupted" ); } } } ); return null; } };
花费者线程对行列实行了一个相反的操作,与往行列中寄存数据相反,(花费者线程)是从行列中拉取数据。假如行列为空,花费者线程被梗阻直到生产者线程向行列中添加下一行(数据)。这部分残剩的代码只要在提交那些callable后才会履行。任何从Futured 的get办法前往的异常均标明两张数据表其实不包括异样的数据(或者在从数据库获得数据的过程当中发作了问题)
List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) ); for( final Future< Void > future: futures ) { future.get( 5, TimeUnit.MINUTES ); }这些根本上是今日...甚至本年一切的(内容)。列位,新年高兴! 本文中的一切译文仅用于进修和交换目标,转载请务必注明文章译者、出处、和本文链接。 2KB翻译任务按照 CC 协议,假如我们的任务有进犯到您的权益,请实时联络我们。
2KB项目(www.2kb.com,源码交易平台),供给担保交易、源码交易、虚拟商品、在家创业、在线创业、Task交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接生意、网站交易、广告生意、站长培训、建站美工等服务