kafka的存储方案是顺序追加写日志 + 稀疏哈希索引
/index文件中会生成三份文件XX.log,XX.index,XX.timeindex,日志是分块存储的,时间戳索引从XX.timeindex中找到对应的offset,再从XX.index中索引XX.log。
这里通过mmap(内存映射)可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针
页缓存相对来说比较简单,页缓存在操作系统层面是保存数据的一个基本单位,Kafka 避免使用 JVM,直接使用操作系统的页缓存特性提高处理速度,进而避免了JVM GC 带来的性能损耗。
kafka就采用零拷贝技术来消费数据
在 kafka 中页提高了大量批处理的 API ,可以对数据进行统一的压缩合并,通过更小的数据包在网络中进行数据发送,再进行后续处理,这在大量数据处理中,效率提高是非常明显的。
kafka 出现消息重复消费的原因:
解决方案:
enable.auto.commit
参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。Kafka 事务与数据库的事务定义基本类似,主要是一个原子性:多个操作要么全部成功,要么全部失败。Kafka 中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理。
当 kafka producer 向 broker 中的 topic发送数据时,可能会因为网络抖动等各种原因,造成 producer 收不到 broker 的 ack 确认信息。kafka幂等性就会保证在生产者内部逻辑问题引起的消息重复消费的时候,只有一个数据可以被正确的发送。
需要注意的是如果使用try/catch捕获,用send手动发送,则会被视为不同的消息
原理
producer.initTransactions();
try {
producer.beginTransaction();
for (ProducerRecord<String, String> record : payload) {
producer.send(record);
}
Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
{
put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
}
};
producer.sendOffsetsToTransaction(groupCommit, "groupId");
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
producer.send()
方法,发送数据到分区;producer.senOffsetsToTransaction()
接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;producer.commitTransaction()
或 abortTransaction()
方法,提交或回滚事务;(回滚就会跳过该事务)需要注意的是:如果事务性生产者(Transactional Producer)发送的消息没有被提交,消费者是不会读取该消息之后的数据的。
Kafka 触发 Rebalance 主要有以下几种情况:
主要有三点产生原因:
Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。
EX:假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1
如果有11个分区将会是:
C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10
该策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者
举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:
消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2
Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:
① 分区的分配要尽可能的均匀; ② 分区的分配尽可能的与上次分配的保持相同。
订单创建后,超过30分钟没有⽀付,则需要取消订单,这种场景可以通过延时队列来实现
kafka中创建创建相应的主题
如果是:去数据库中修改订单状态为已取消
如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次 向kafka拉取该offset及之后的消息,继续进⾏判断,以此反复。
对比方向 | 概要 |
---|---|
吞吐量 | 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。 |
可用性 | 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 |
时效性 | RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。 |
功能支持 | 除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
消息丢失 | ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。 |
总结:
Pulsar 是下一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。
Pulsar 集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。
Pulsar 的关键特性如下(摘自官网):
综上所述,选择使用Kafka是出于其在实时数据流处理、高吞吐量、可扩展性、数据流治理以及与HBase的结合方面的优势。
Kafka | RocketMQ | RabbitMQ | |
---|---|---|---|
单机吞吐量 | 17.3w/s | 11.6w/s | 2.6w/s(消息做持久化) |
开发语言 | Scala/Java | Java | Erlang |
主要维护者 | Apache | Alibaba | Mozilla/Spring |
订阅形式 | 基于topic,按照topic进行正则匹配的发布订阅模式 | 基于topic/messageTag,按照消息类型、属性进行正则匹配的发布订阅模式 | 提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式 |
持久化 | 支持大量堆积 | 支持大量堆积 | 支持少量堆积 |
顺序消息 | 支持 | 支持 | 不支持 |
集群方式 | 天然的Leader-Slave,无状态集群,每台服务器既是Master也是Slave | 常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master | 支持简单集群,'复制’模式,对高级集群模式支持不好。 |
性能稳定性 | 较差 | 一般 | 好 |
Zookeeper在Kafka中扮演着关键的角色,它是Kafka集群中的一个核心组件,用于管理、协调和维护整个Kafka集群的状态。以下是Zookeeper在Kafka中的主要作用:
Raft 集群中每个节点都处于以下三种角色之一:
每开始一次新的选举,称为一个任期(term),每个 term 都有一个严格递增的整数与之关联。
每当 candidate 触发 leader election 时都会增加 term,如果一个 candidate 赢得选举,他将在本 term 中担任 leader 的角色。但并不是每个 term 都一定对应一个 leader,有时候某个 term 内会由于选举超时导致选不出 leader,这时 candicate 会递增 term 号并开始新一轮选举。
Term 更像是一个逻辑时钟(logic clock)的作用,有了它,就可以发现哪些节点的状态已经过期。每一个节点都保存一个 current term,在通信时带上这个 term 号。
节点间通过 RPC 来通信,主要有两类 RPC 请求:
Raft 的选主基于一种心跳机制,集群中每个节点刚启动时都是 follower 身份(Step: starts up),leader 会周期性的向所有节点发送心跳包来维持自己的权威,那么首个 leader 是如何被选举出来的呢?方法是如果一个 follower 在一段时间内没有收到任何心跳,也就是选举超时,那么它就会主观认为系统中没有可用的 leader,并发起新的选举(Step: times out, starts election)。
这里有一个问题,即这个“选举超时时间”该如何制定?如果所有节点在同一时刻启动,经过同样的超时时间后同时发起选举,整个集群会变得低效不堪,极端情况下甚至会一直选不出一个主节点。Raft 巧妙的使用了一个随机化的定时器,让每个节点的“超时时间”在一定范围内随机生成,这样就大大的降低了多个节点同时发起选举的可能性。
图:一个五节点 Raft 集群的初始状态,所有节点都是 follower 身份,term 为 1,且每个节点的选举超时定时器不同
若 follower 想发起一次选举,follower 需要先增加自己的当前 term,并将身份切换为 candidate。然后它会向集群其它节点发送“请给自己投票”的消息(RequestVote RPC)。
图:S1 率先超时,变为 candidate,term + 1,并向其它节点发出拉票请求
Follower 切换为 candidate 并向集群其他节点发送“请给自己投票”的消息后,接下来会有三种可能的结果,也即上面节点状态图中 candidate 状态向外伸出的三条线。
1. 选举成功(Step: receives votes from majority of servers)
当candicate从整个集群的大多数(N/2+1)节点获得了针对同一 term 的选票时,它就赢得了这次选举,立刻将自己的身份转变为 leader 并开始向其它节点发送心跳来维持自己的权威。
图:“大部分”节点都给了 S1 选票
图:S1 变为 leader,开始发送心跳维持权威
每个节点针对每个 term 只能投出一张票,并且按照先到先得的原则。这个规则确保只有一个 candidate 会成为 leader。
2. 选举失败(Step: discovers current leader or new term)
Candidate 在等待投票回复的时候,可能会突然收到其它自称是 leader 的节点发送的心跳包,如果这个心跳包里携带的 term 不小于 candidate 当前的 term,那么 candidate 会承认这个 leader,并将身份切回 follower。这说明其它节点已经成功赢得了选举,我们只需立刻跟随即可。但如果心跳包中的 term 比自己小,candidate 会拒绝这次请求并保持选举状态。
图:S4、S2 依次开始选举
图:S4 成为 leader,S2 在收到 S4 的心跳包后,由于 term 不小于自己当前的 term,因此会立刻切为 follower 跟随 S4
3. 选举超时(Step: times out, new election)
第三种可能的结果是 candidate 既没有赢也没有输。如果有多个 follower 同时成为 candidate,选票是可能被瓜分的,如果没有任何一个 candidate 能得到大多数节点的支持,那么每一个 candidate 都会超时。此时 candidate 需要增加自己的 term,然后发起新一轮选举。如果这里不做一些特殊处理,选票可能会一直被瓜分,导致选不出 leader 来。这里的“特殊处理”指的就是前文所述的随机化选举超时时间。
图:S1 ~ S5 都在参与选举
图:没有任何节点愿意给他人投票
图:如果没有随机化超时时间,所有节点将会继续同时发起选举……
节点状态图中的最后一条线是:discovers server with higher term。想象一个场景:当 leader 节点发生了宕机或网络断连,此时其它 follower 会收不到 leader 心跳,首个触发超时的节点会变为 candidate 并开始拉票(由于随机化各个 follower 超时时间不同),由于该 candidate 的 term 大于原 leader 的 term,因此所有 follower 都会投票给它,这名 candidate 会变为新的 leader。一段时间后原 leader 恢复了,收到了来自新leader 的心跳包,发现心跳中的 term 大于自己的 term,此时该节点会立刻切换为 follower 并跟随的新 leader。
上述流程的动画模拟如下:
图:S4 作为 term2 的 leader
图:S4 宕机,S5 即将率先超时
图:S5 当选 term3 的 leader
图:S4 宕机恢复后收到了来自 S5 的 term3 心跳
图:S4 立刻变为 S5 的 follower
以上就是 Raft 的选主逻辑,但还有一些细节(譬如是否给该 candidate 投票还有一些其它条件)依赖算法的其它部分基础,我们会在后续“安全性”一章描述。
当票选出 leader 后,leader 也该承担起相应的责任了,这个责任是什么?就是下一章将介绍的“日志复制”。
一旦 leader 被票选出来,它就承担起领导整个集群的责任了,开始接收客户端请求,并将操作包装成日志,并复制到其它节点上去。
整体流程如下:
整个集群的日志模型可以宏观表示为下图(x ← 3 代表 x 赋值为 3):
每条日志除了存储状态机的操作指令外,还会拥有一个唯一的整数索引值(log index)来表明它在日志集合中的位置。此外,每条日志还会存储一个 term 号(日志条目方块最上方的数字,相同颜色 term 号相同),该 term 表示 leader 收到这条指令时的当前任期,term 相同的 log 是由同一个 leader 在其任期内发送的。
当一条日志被 leader 节点认为可以安全的 apply 到状态机时,称这条日志是 committed(上图中的 committed entries)。那么什么样的日志可以被 commit 呢?答案是:当 leader 得知这条日志被集群过半的节点复制成功时。因此在上图中我们可以看到 (term3, index7) 这条日志以及之前的日志都是 committed,尽管有两个节点拥有的日志并不完整。
Raft 保证所有 committed 日志都已经被持久化,且“最终”一定会被状态机apply。
注:这里的“最终”用词很微妙,它表明了一个特点:Raft 保证的只是集群内日志的一致性,而我们真正期望的集群对外的状态机一致性需要我们做一些额外工作,这一点在《线性一致性与读性能优化》一章会着重介绍。
RocketMQ 实现延迟队列的原理是基于消息的存储和消费控制。
当生产者发送消息时,会根据设置的延迟级别将消息存储在对应的延迟消息队列中。延迟消息队列是一个特殊的存储区域,用于存放需要延迟发送的消息。
在 RocketMQ 的 Broker 端,延迟消息会以不同的延迟级别分别存储在具有不同持久化特性的文件中。这样可以根据消息的延迟级别和存储特性对消息进行管理,以便在合适的时间发送给消费者。
同时,RocketMQ 的消费者会启动并订阅延迟消息,但实际消费延迟消息的时间是由 RocketMQ 控制的。在消息存储时,Broker 会根据消息的延迟级别和存储特性来计算出消息的过期时间。当消息达到过期时间后,才会交给消费者进行消费。
需要注意的是,RocketMQ 的延迟队列并不是精确的实时延迟,而是近似延迟。这是因为 RocketMQ 在处理延迟消息时,并不会对每一条消息都进行实时的延迟计算和触发。相反,它使用了一种定时检查和触发机制来控制延迟消息的发送。这也是为了保证系统的高吞吐量和性能。
综上所述,RocketMQ 实现延迟队列的原理是通过将延迟消息存储在特定的延迟消息队列中,并在过期时间到达后交给消费者进行消费。延迟消息的存储和触发机制保证了延迟消息的可靠性和近似延迟。
RocketMQ 可以通过添加消息队列的方式来实现顺序消费。在 RocketMQ 中,每个主题(Topic)下可以包含多个消息队列(Message Queue),每个消息队列都是独立的。如果需要保证顺序消费,需要将同一业务的消息发送到同一个 Message Queue 中。
以下是实现顺序消费的方法:
需要注意的是,在使用顺序消费的情况下,建议同时设置消费者数量为1,否则可能会导致顺序混乱。此外,即使使用了顺序生产者和顺序消费者,仍然可能出现无法保证消息完全按照顺序进行处理的情况,例如网络问题、服务故障等原因。因此,在使用顺序消费时,应该仔细设计消息的生产和消费流程,并进行充分测试,以确保消息能够按照预期顺序进行处理。