课程咨询 :186 8716 1620      qq:2066486918

昆明Java培训 > 达内新闻 > kafka知识点:consumer消费消息
  • kafka知识点:consumer消费消息

    发布:昆明Java培训      来源:达内新闻      时间:2016-10-21

  • 昆明Java培训的老师这一期给大家讲consumer消费消息。

    6.1 consumer API

    kafka提供了两套consumer API:

    1. The high-level Consumer API

    2. The SimpleConsumer API

    其中high-level consumer API提供了一个从kafka消费数据的高层抽象,而SimpleConsumer API则需要开发人员更多地关注细节。

    6.1.1 The high-level consumer API

    high-level consumer API提供了consumer group的语义,一个消息只能被group内的一个consumer所消费,且consumer消费消息时不关注offset,最后一个offset由zookeeper保存。

    使用high-level consumer API可以是多线程的应用,应当注意:

    1.如果消费线程大于patition数量,则有些线程将收不到消息

    2.如果patition数量大于线程数,则有些线程多收到多个patition的消息

    3.如果一个线程消费多个patition,则无法保证你收到的消息的顺序,而一个patition内的消息是有序的

    6.1.2 The SimpleConsumer API

    如果你想要对patition有更多的控制权,那就应该使用SimpleConsumer API,比如:

    1.多次读取一个消息

    2.只消费一个patition中的部分消息

    3.使用事务来保证一个消息仅被消费一次

    但是使用此API时,partition、offset、broker、leader等对你不再透明,需要自己去管理。你需要做大量的额外工作:

    1.必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息

    2.应用程序需要通过程序获知每个Partition的leader是谁

    3.需要处理leader的变更

    使用SimpleConsumer API的一般流程如下:

    1.查找到一个“活着”的broker,并且找出每个partition的leader

    2.找出每个partition的follower

    3.定义好请求,该请求应该能描述应用程序需要哪些数据

    4. fetch数据

    5.识别leader的变化,并对之作出必要的响应

    以下针对high-level Consumer API进行说明。

    6.2 consumer group

    如2.2节所说,kafka的分配单位是patition。每个consumer都属于一个group,一个partition只能被同一个group内的一个consumer所消费(也就保障了一个消息只能被group内的一个consuemr所消费),但是多个group可以同时消费这个partition。

    kafka的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用spark/Storm这些实时处理系统对消息在线处理,同时使用Hadoop批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的consumer group。

    6.3消费方式

    consumer采用pull模式从broker中读取数据。

    push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

    对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

    6.4 consumer delivery guarantee

    如果将consumer设置为autocommit,consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka确保了Exactly once。

    但实际使用中应用程序并非在consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了consumer delivery guarantee:

    1.读完消息先commit再处理消息。

    这种模式下,如果consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once

    2.读完消息先处理再commit。

    这种模式下,如果在处理完消息之后commit之前consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。

    3.如果一定要做到Exactly once,就需要协调offset和实际操作的输出。

    精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high-level API而言,offset是存于Zookeeper中的,无法存于HDFS,而SimpleConsuemr API的offset是由自己去维护的,可以将之存于HDFS中)

    总之,Kafka默认保证At least once,并且允许通过设置producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是kafka提供的offset可以非常直接非常容易得使用这种方式。

    6.5 consumer rebalance

    当有consumer加入或退出、以及partition的改变(如broker加入或退出)时会触发rebalance。consumer rebalance算法如下:

    1.将目标topic下的所有partirtion排序,存于PT

    2.对某consumer group下所有consumer排序,存于CG,第i个consumer记为Ci

    3. N=size(PT)/size(CG),向上取整

    4.解除Ci对原来分配的partition的消费权(i从0开始)

    5.将第i*N到(i+1)*N-1个partition分配给Ci

    在0.8.*版本,每个consumer都只负责调整自己所消费的partition,为了保证整个consumer group的一致性,当一个consumer触发了rebalance时,该consumer group内的其它所有其它consumer也应该同时触发rebalance。这会导致以下几个问题:

    1.Herd effect

    任何broker或者consumer的增减都会触发所有的consumer的rebalance

    2.Split Brain

    每个consumer分别单独通过zookeeper判断哪些broker和consumer宕机了,那么不同consumer在同一时刻从zookeeper看到的view就可能不一样,这是由zookeeper的特性决定的,这就会造成不正确的reblance尝试。

    3.调整结果不可控

    所有的consumer都并不知道其它consumer的rebalance是否成功,这可能会导致kafka工作在一个不正确的状态。

    基于以上问题,kafka设计者考虑在0.9.*版本开始使用中心coordinator来控制consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在consumer客户端实现分配方案。

    了解详情请登陆昆明达内Java培训官网(km.Java.tedu.cn)!

    推荐文章

上一篇:kafka知识点:kafka HA

下一篇:kafka需要注意的事项

最新开班日期  |  更多

Java--零基础全日制班

Java--零基础全日制班

开班日期:12/29

Java--零基础业余班

Java--零基础业余班

开班日期:12/29

Java--周末提升班

Java--周末提升班

开班日期:12/29

Java--零基础周末班

Java--零基础周末班

开班日期:12/29

  • 网址:http://km .java.tedu.cn      地址:昆明市官渡区春城路62号证券大厦附楼6楼
  • 课程培训电话:186 8716 1620      qq:2066486918    全国服务监督电话:400-827-0010
  • 服务邮箱 ts@tedu.cn
  • 2001-2016 达内国际公司(TARENA INTERNATIONAL,INC.) 版权所有 京ICP证08000853号-56