昆明java培训
达内昆明广州春城路

18487146383

热门课程

kafka知识点:kafka架构

  • 时间:2016-10-21
  • 发布:昆明Java培训
  • 来源:达内新闻

昆明Java培训机构的老师今天给大家讲kafka架构。

2.2相关概念

kafka相关名词解释如下:

1.producer:

消息生产者,发布消息到kafka集群的终端或服务。

2.broker:

kafka集群中包含的服务器。

3.topic:

每条发布到kafka集群的消息属于的类别,即kafka是面向topic的。

4.partition:

partition是物理上的概念,每个topic包含一个或多个partition。kafka分配的单位是partition。

5.consumer:

从kafka集群中消费消息的终端或服务。

6.Consumer group:

high-level consumer API中,每个consumer都属于一个consumer group,每条消息只能被consumer group中的一个Consumer消费,但可以被多个consumer group消费。

7.replica:

partition的副本,保障partition的高可用。

8.leader:

replica中的一个角色,producer和consumer只跟leader交互。

9.follower:

replica中的一个角色,从leader中复制数据。

10.controller:

kafka集群中的其中一个服务器,用来进行leader election以及各种failover。

12.zookeeper:

kafka通过zookeeper来存储集群的meta信息。

2.3 zookeeper节点

kafka在zookeeper中的存储结构

3. producer发布消息

3.1写入方式

producer采用push模式将消息发布到broker,每条消息都被append到patition中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

3.2消息路由

producer发送消息到broker时,会根据分区算法选择将其存储到哪一个partition。其路由机制为:

1.指定了patition,则直接使用;

2.未指定patition但指定key,通过对key的value进行hash选出一个patition

3. patition和key都未指定,使用轮询选出一个patition。

附上java客户端分区源码,一目了然:

//创建消息实例

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {

if (topic == null)

throw new IllegalArgumentException("Topic cannot be null");

if (timestamp != null && timestamp < 0)

throw new IllegalArgumentException("Invalid timestamp " + timestamp);

this.topic = topic;

this.partition = partition;

this.key = key;

this.value = value;

this.timestamp = timestamp;

}

//计算patition,如果指定了patition则直接使用,否则使用key计算

private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {

Integer partition = record.partition();

if (partition != null) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());

int lastPartition = partitions.size() - 1;

if (partition < 0 || partition > lastPartition) {

throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));

}

return partition;

}

return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);

}

//使用key选取patition

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);

int numPartitions = partitions.size();

if (keyBytes == null) {

int nextValue = counter.getAndIncrement();

List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);

if (availablePartitions.size() > 0) {

int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();

return availablePartitions.get(part).partition();

} else {

return DefaultPartitioner.toPositive(nextValue) % numPartitions;

}

} else {

//对keyBytes进行hash选出一个patition

return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

}

}

3.3写入流程

producer写入消息序列

流程说明:

1. producer先从zookeeper的"/brokers/.../state"节点找到该partition的leader

2. producer将消息发送给该leader

3. leader将消息写入本地log

4. followers从leader pull消息,写入本地log后leader发送ACK

5. leader收到所有ISR中的replica的ACK后,增加HW(high watermark,最后commit的offset)并向producer发送ACK

3.4 producer delivery guarantee

一般情况下存在三种情况:

1. At most once消息可能会丢,但绝不会重复传输

2. At least one消息绝不会丢,但可能会重复传输

3. Exactly once每条消息肯定会被传输一次且仅传输一次

当producer向broker发送消息时,一旦这条消息被commit,由于replication的存在,它就不会丢。但是如果producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once,但目前还并未实现。所以目前默认情况下一条消息从producer到broker是确保了At least once,可通过设置producer异步发送实现At most once。

昆明java培训班10月31日开班,还不快来我们java的这个大家庭,让自己掉进java的知识的海洋里。

上一篇:kafka知识点:为什么需要消息系统
下一篇:kafka知识点:broker保存消息

腾讯游戏Switch独立销量领先——昆明达内

达内java语言编程学以致用

苹果技术:A11芯片上新菜【达内培训】

达内培训之国产手机vivo领跑,小米再上榜

选择城市和中心
贵州省

广西省

海南省