2KB项目,专业的源码交易网站 帮助 收藏 每日签到

运用 SynchronousQueue 完成生产者/花费者模子

  • 时间:2019-06-21 10:45 编辑:2KB 来源:2KB.COM 阅读:499
  • 扫一扫,手机访问
  • 分享
摘要: 英文原文:Imp
英文原文:Implementing Producer / Consumer using SynchronousQueue

在那末多java供给的用于支撑并发的类外面,我想讲讲此中的 SynchronousQueue. 特殊是怎么运用便利的SynchronousQueue 作为交流机制来完成生产者/花费者模子。

假如没有细心看过 SynchronousQueue  的完成,可能说不明白为何运用这类行列来实行生产者/花费者之间的通讯。依照以往我们对行列的了解,它实践上算不上是一种行列,而只是相似于包括最多一个元素的聚集。 为何它有效呢?好吧,这里有几个缘由。从生产者的角度来看,只要一个元素(或消息)可以放到行列外面。生产者需求比及花费者将行列中以后的阿谁 元素(或消息)花费了才干继续下一个。从花费者的角度来看,它轮询行列外面可用的下一个元素(或消息)就行了。就是这么容易,而如许的极大益处是:生产者没法以快过花费者花费的速度来发生消息。 其它翻译版本 (1) 加载中 这里有一个我比来碰到的例子:比较两个数据库表(可能十分宏大), 检测出分歧的数据或者类似的数据(或备份 )。同队伍列( SynchronousQueue )恰是针对这类问题的便当Tools。它不单答应 每一个表 在本人的线程中实行处置,并且可以作为当从两个分歧的数据库中读取数据可能呈现超时/等候的补偿。 其它翻译版本 (1) 加载中

Start了。我们先定义compare函数,它承受三个参数:源数据源、目的数据源和表名。我运用了Spring框架的JdbcTemplate,它把处置衔接和PreparedStatement的一切烦人细节都笼统掉了,十分有效。

public boolean compare( final DataSource source, final DataSource destination, final String table )  {
    final JdbcTemplate from = new  JdbcTemplate( source );
    final JdbcTemplate to = new JdbcTemplate( destination );
}
在Start真实的比较之前,先比较一下二者的行数是一个很好的做法:
if( from.queryForLong("SELECT count(1) FROM " + table ) != to.queryForLong("SELECT count(1) FROM " + table ) ) {
    return false;
}

如今,我们最少曾经晓得在两张表包括异样行数(的数据记载),我们可以Start实行数据比较了。(比较)算法相当容易:

  • 为生产者和目的(生产者)数据库创立独自的线程
  • 生产者线程(每次)从数据库表中读取一行数据并将其寄存到SynchronousQueue
  • 花费者线程异样(每次)从数据库表中读取一行数据,然后向行列请求现有可用的行(数据)实行比较(假如有需要可等候)。终极比较两个后果集

在线程池(完成中)运用Java并发Tools类库其他很棒的部分,让我们定义一个包括有固定数目线程的线程池

final ExecutorService executor = Executors.newFixedThreadPool( 2 );
final SynchronousQueue< List< ? > > resultSets = new SynchronousQueue< List< ? > >();     
下面是算法的详细描绘, 花费者函数履行体可以表现成一个独自的callable
Callable< Void > producer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        from.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {                   
                        List< ? > row = ...; // convert ResultSet to List
                        if( !resultSets.offer( row, 2, TimeUnit.MINUTES ) ) {
                            throw new SQLException( "Having more data but consumer has already completed" );
                        }
                    } catch( InterruptedException ex ) {
                        throw new SQLException( "Having more data but producer has been interrupted" );
                    }
                }
            }
        );

        return  null;
    }
};

由于Java的语法的关系,这段代码看起来有点繁琐,但实在并没有做非常多工作。每一个从生产者数据库表中读取的后果集都被转换成了一个链表(作为样板文件,它的详细完成被疏忽了)然后将其寄存到行列中。假如行列非空,生产者线程被梗阻直到花费者线程完成了它的任务。花费者线程,可以表现成下面的callable

Callable< Void > consumer = new Callable< Void >() {
    @Override
    public Void call() throws Exception {
        to.query( "SELECT * FROM " + table,
            new RowCallbackHandler() {
                @Override
                public void processRow(ResultSet rs) throws SQLException {
                    try {
                        List< ? > source = resultSets.poll( 2, TimeUnit.MINUTES );
                        if( source == null ) {
                            throw new SQLException( "Having more data but producer has already completed" );
                        }                                     
 
                        List< ? > destination = ...; // convert ResultSet to List
                        if( !source.equals( destination ) ) {
                            throw new SQLException( "Row data is not the same" );
                        }
                    } catch ( InterruptedException ex ) {
                        throw new SQLException( "Having more data but consumer has been interrupted" );
                    }
                }
            }
        );
                    
        return  null;
    }
};

花费者线程对行列实行了一个相反的操作,与往行列中寄存数据相反,(花费者线程)是从行列中拉取数据。假如行列为空,花费者线程被梗阻直到生产者线程向行列中添加下一行(数据)。这部分残剩的代码只要在提交那些callable后才会履行。任何从Futured 的get办法前往的异常均标明两张数据表其实不包括异样的数据(或者在从数据库获得数据的过程当中发作了问题)

    List< Future< Void > > futures = executor.invokeAll( Arrays.asList( producer, consumer ) );
    for( final Future< Void > future: futures ) {
        future.get( 5, TimeUnit.MINUTES );
    }
这些根本上是今日...甚至本年一切的(内容)。列位,新年高兴! 本文中的一切译文仅用于进修和交换目标,转载请务必注明文章译者、出处、和本文链接。 2KB翻译任务按照 CC 协议,假如我们的任务有进犯到您的权益,请实时联络我们。


2KB项目(www.2kb.com,源码交易平台),供给担保交易、源码交易、虚拟商品、在家创业、在线创业、Task交易、网站设计、软件设计、网络兼职、站长交易、域名交易、链接生意、网站交易、广告生意、站长培训、建站美工等服务

  • 全部评论(0)
上一篇:Docker 公司已死
下一篇:奇虎360 和 go
资讯详情页最新发布上方横幅
最新发布的资讯信息
【计算机/互联网|】Nginx出现502错误(2020-01-20 21:02)
【计算机/互联网|】网站运营全智能软手V0.1版发布(2020-01-20 12:16)
【计算机/互联网|】淘宝这是怎么了?(2020-01-19 19:15)
【行业动态|】谷歌关闭小米智能摄像头,因为窃听器显示了陌生人家中的照片(2020-01-15 09:42)
【行业动态|】据报道谷歌新闻终止了数字杂志,退还主动订阅(2020-01-15 09:39)
【行业动态|】康佳将OLED电视带到美国与LG和索尼竞争(2020-01-15 09:38)
【行业动态|】2020年最佳AV接收机(2020-01-15 09:35)
【行业动态|】2020年最佳流媒体设备:Roku,Apple TV,Firebar,Chromecast等(2020-01-15 09:31)
【行业动态|】CES 2020预览:更多的流媒体服务和订阅即将到来(2020-01-08 21:41)
【行业动态|】从埃隆·马斯克到杰夫·贝佐斯,这30位人物定义了2010年代(2020-01-01 15:14)
联系我们

Q Q: 7090832

电话:400-0011-990

邮箱:7090832@qq.com

时间:9:00-23:00

联系客服
商家入住 服务咨询 投拆建议 联系客服
0577-67068160
手机版

扫一扫进手机版
返回顶部