kafka.md 38 KB

kafka为什么快?

顺序读写

kafka的存储方案是顺序追加写日志 + 稀疏哈希索引

image-20230519214345638

  1. kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。
  2. Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中。
  3. Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的 偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。
  4. 然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理的时候,直接删除旧的 LogSegement 文件就可以了。
  5. Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".snapshot"为后缀的快照索引文件等)

/index文件中会生成三份文件XX.log,XX.index,XX.timeindex,日志是分块存储的,时间戳索引从XX.timeindex中找到对应的offset,再从XX.index中索引XX.log。

这里通过mmap(内存映射)可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针

img

页缓存--pagecache

页缓存相对来说比较简单,页缓存在操作系统层面是保存数据的一个基本单位,Kafka 避免使用 JVM,直接使用操作系统的页缓存特性提高处理速度,进而避免了JVM GC 带来的性能损耗。

零拷贝

kafka就采用零拷贝技术来消费数据

批量操作

在 kafka 中页提高了大量批处理的 API ,可以对数据进行统一的压缩合并,通过更小的数据包在网络中进行数据发送,再进行后续处理,这在大量数据处理中,效率提高是非常明显的。

kafka如何保证顺序消费

  1. 1 个 Topic 只对应一个 Partition。
  2. (推荐)发送消息的时候指定 key/Partition。

Kafka 如何保证消息不重复消费

kafka 出现消息重复消费的原因:

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
  • Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  • 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
  • enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。

kafka 的事务机制

Kafka 事务与数据库的事务定义基本类似,主要是一个原子性:多个操作要么全部成功,要么全部失败。Kafka 中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理。

  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
  • 事实上,开启 kafka事务时,kafka 会自动开启幂等生产者。

kafka的幂等性

当 kafka producer 向 broker 中的 topic发送数据时,可能会因为网络抖动等各种原因,造成 producer 收不到 broker 的 ack 确认信息。kafka幂等性就会保证在生产者内部逻辑问题引起的消息重复消费的时候,只有一个数据可以被正确的发送。

需要注意的是如果使用try/catch捕获,用send手动发送,则会被视为不同的消息

原理

  • 在 producer 端,每个 producer 都被 broker 自动分配了一个 Producer Id (PID), producer 向 broker 发送的每条消息,在内部都附带着该 pid 和一个递增的 sequence number;
  • 在 broker 端,broker 为每个 topic 的每个 partition 都维护了一个当前写成功的消息的最大 PID-Sequence Number 元组;
  • 当 broker 收到一个比当前最大 PID-Sequence Number 元组小的 sequence number 消息时,就会丢弃该消息,以避免造成数据重复存储;
  • 当 broker 失败重新选举新的 leader 时, 以上去重机制仍然有效:因为 broker 的 topic 中存储的消息体中附带了 PID-sequence number 信息,且 leader 的所有消息都会被复制到 followers 中。当某个原来的 follower 被选举为新的 leader 时,它内部的消息中已经存储了PID-sequence number 信息,也就可以执行消息去重了。

kafka事务基本流程

  • initTransactions:方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException:
  • beginTransaction:方法用来开启事务;
  • sendOffsetsToTransaction:方法为消费者提供在事务内的位移提交的操作;将偏移量提交到事务中,仅当整个交易(消费和生产)成功时,它才会提交。
  • commitTransaction:方法用来提交事务;
  • abortTransaction:方法用来中止事务,类似于事务回滚。
producer.initTransactions();
try {
    producer.beginTransaction();
    for (ProducerRecord<String, String> record : payload) {
        producer.send(record);
    }

    Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
        {
            put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
        }
    };
    producer.sendOffsetsToTransaction(groupCommit, "groupId");
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

事务基本流程

  1. 存储对应关系,通过请求增加分区
    • Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
  2. 生产者发送消息
    • 基本与普通的发送消息相同,生产者调用 producer.send() 方法,发送数据到分区;
    • 发送的请求中,包含 pid, epoch, sequence number 字段;
  3. 增加消费 offset 到事务
    • 生产者通过 producer.senOffsetsToTransaction() 接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;
  4. 事务提交位移
    • 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。
    • 只有当事务提交之后,offset 才会对外可见。
  5. 提交或回滚事务
    • 用户调用 producer.commitTransaction()abortTransaction() 方法,提交或回滚事务;(回滚就会跳过该事务)
    • 生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。

需要注意的是:如果事务性生产者(Transactional Producer)发送的消息没有被提交,消费者是不会读取该消息之后的数据的。

kafka 触发 Rebalance

Kafka 触发 Rebalance 主要有以下几种情况:

  1. 消费者组中新增或移除了消费者:当一个新的消费者加入消费者组或者一个消费者从消费者组中退出时,Kafka 会触发 Rebalance。这可以通过添加或删除消费者来实现动态扩缩容。
  2. 消费者组订阅的主题发生变化:当消费者组订阅的主题发生变化,即增加或删除了主题,Kafka 也会触发 Rebalance。这样可以确保新增或删除的主题的分区被正确地分配给消费者。
  3. 消费者健康状态发生变化:如果某个消费者长时间未发送心跳,或者被判定为失效,Kafka 会将该消费者标记为“退出”,并触发 Rebalance,重新分配该消费者的分区给其他正常的消费者。

kafka消息积压问题

产生原因

主要有三点产生原因:

  1. 实时/消费任务挂掉
  2. Kafka分区数设置的不合理(太少)和消费者"消费能力"不足
  3. Kafka消息的key不均匀,导致分区间数据不均衡

消息挤压会有什么严重的后果

  1. 消息被丢弃
  2. 磁盘空间被挤占
  3. 海量消息被挤压,系统反应较慢

如何解决消息挤压问题

  1. 生产者减少消息发送速率
  2. 消费端做如下处理

img

  1. 尽快复盘,找到挤压的原因

Kafka 消费组消费者分配策略

1.Range(范围)

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

EX:假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1

如果有11个分区将会是:

C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10

2.RoundRobin(轮询)默认的策略

该策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者

在这里插入图片描述

举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2

3.Sticky

Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

① 分区的分配要尽可能的均匀; ② 分区的分配尽可能的与上次分配的保持相同。

kafka实现延时队列的效果

应⽤场景

订单创建后,超过30分钟没有⽀付,则需要取消订单,这种场景可以通过延时队列来实现

具体⽅案

image-20210914195129983

  • kafka中创建创建相应的主题

    • 消费者消费该主题的消息(轮询)
    • 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没⽀付)
    • 如果是:去数据库中修改订单状态为已取消

    • 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次 向kafka拉取该offset及之后的消息,继续进⾏判断,以此反复。

消息队列对比

对比方向 概要
吞吐量 万级的 ActiveMQ 和 RabbitMQ 的吞吐量(ActiveMQ 的性能最差)要比 十万级甚至是百万级的 RocketMQ 和 Kafka 低一个数量级。
可用性 都可以实现高可用。ActiveMQ 和 RabbitMQ 都是基于主从架构实现高可用性。RocketMQ 基于分布式架构。 kafka 也是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
时效性 RabbitMQ 基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。其他三个都是 ms 级。
功能支持 除了 Kafka,其他三个功能都较为完备。 Kafka 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
消息丢失 ActiveMQ 和 RabbitMQ 丢失的可能性非常低, RocketMQ 和 Kafka 理论上不会丢失。

总结:

  • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做 erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用 RocketMQ 挺好的
  • Kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

Pulsar

Pulsar 是下一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。

Pulsar 集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

Pulsar 的关键特性如下(摘自官网):

  • 是下一代云原生分布式消息流平台。
  • Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  • 极低的发布延迟和端到端延迟。
  • 可无缝扩展到超过一百万个 topic。
  • 简单的客户端 API,支持 Java、Go、Python 和 C++。
  • 主题的多种订阅模式(独占、共享和故障转移)。
  • 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
  • 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  • 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
  • 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储(如 S3、GCS)中。

为什么选择使用kafka

  1. 实时数据流处理: 您的项目需要处理煤矿大数据的实时采集、传输、存储和分发,其中实时性是关键。Kafka作为一个分布式流处理平台,能够高效地处理大量的实时数据流。结合HBase,Kafka可以将实时数据快速传输到HBase存储层,并在整个数据处理流程中保持低延迟。
  2. 高吞吐量和可扩展性: 您提到的每秒10000条的消息速率以及每日产生80GB数据,显示了您需要处理大规模数据量的能力。Kafka以其高吞吐量和可扩展性,能够有效地处理这些数据负载,同时保持较低的延迟。
  3. 数据流治理和重复消费防护: 在您的项目中,数据传输模块需要能够确保数据流的可靠性。Kafka提供了数据流治理的能力,您可以使用策略模式来判断数据治理函数,同时利用Redis缓存Topic分区偏移量以防止重复消费。
  4. 生态系统和社区支持: Kafka作为一个成熟的消息中间件,拥有广泛的生态系统和社区支持。这对于您在整合各个组件、解决问题以及在将来可能的需求变化时都非常有帮助。
  5. 与HBase的结合: Kafka和HBase是一对很好的搭档。通过Kafka将实时数据传输到HBase,您可以实现实时数据的快速存储和查询。Kafka提供了一个连接数据产生者和数据消费者的桥梁,使得数据流处理能够与数据存储紧密结合。

综上所述,选择使用Kafka是出于其在实时数据流处理、高吞吐量、可扩展性、数据流治理以及与HBase的结合方面的优势。

为什么选择kafka

Kafka RocketMQ RabbitMQ
单机吞吐量 17.3w/s 11.6w/s 2.6w/s(消息做持久化)
开发语言 Scala/Java Java Erlang
主要维护者 Apache Alibaba Mozilla/Spring
订阅形式 基于topic,按照topic进行正则匹配的发布订阅模式 基于topic/messageTag,按照消息类型、属性进行正则匹配的发布订阅模式 提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式
持久化 支持大量堆积 支持大量堆积 支持少量堆积
顺序消息 支持 支持 不支持
集群方式 天然的Leader-Slave,无状态集群,每台服务器既是Master也是Slave 常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master 支持简单集群,'复制’模式,对高级集群模式支持不好。
性能稳定性 较差 一般
  • RabbitMQ是开源的,有比较稳定的支持,活跃度也高,但是不是Java语言开发的。
  • 很多公司用RocketMQ,是阿里出品的。
  • 如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的。

让你写一个消息队列,该如何进行架构设计?

image.png

  1. 首先是消息队列的整体流程,producer发送消息给broker,broker存储好,broker再发送给consumer消费,consumer回复消费确认等。
  2. producer发送消息给broker,broker发消息给consumer消费,那就需要两次RPC了,RPC如何设计呢?可以参考开源框架Dubbo,你可以说说服务发现、序列化协议等等
  3. broker考虑如何持久化呢,是放文件系统还是数据库呢,会不会消息堆积呢,消息堆积如何处理呢。
  4. 消费关系如何保存呢? 点对点还是广播方式呢?广播关系又是如何维护呢?zk还是config server
  5. 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?
  6. 消息队列的高可用如何设计呢? 可以参考Kafka的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
  7. 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送。
  8. MQ得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了吗。

Zookeeper 在 Kafka 中的作用

Zookeeper在Kafka中扮演着关键的角色,它是Kafka集群中的一个核心组件,用于管理、协调和维护整个Kafka集群的状态。以下是Zookeeper在Kafka中的主要作用:

  1. 集群管理和协调: Zookeeper负责管理Kafka集群中的各个节点的状态和信息,包括Broker、Topic、Partition等的元数据信息。它维护了整个集群的拓扑结构和分布式状态,确保集群的正常运行。
  2. Leader选举: 在Kafka的分区中,每个分区都有一个Leader和多个Follower。当Leader节点故障或不可用时,Zookeeper协助进行新Leader的选举过程,确保分区的高可用性。
  3. Topic和Partition的元数据管理: Kafka的元数据,如Topic和Partition的信息、副本分布等,都由Zookeeper进行管理和存储。消费者可以通过Zookeeper获取这些元数据,以便知道在哪里找到所需的数据。
  4. Broker注册和发现: Kafka集群中的每个Broker在启动时都会向Zookeeper注册自己的信息,包括主题分区信息、副本分布等。消费者和生产者可以通过Zookeeper获取Broker的地址和状态信息,以便与合适的Broker通信。
  5. 消费者组协调: 当消费者以消费者组的形式订阅主题时,Zookeeper协助管理消费者的组成员,以及每个消费者在分区中的位置(偏移量)。这有助于实现消费者的负载均衡和故障转移。
  6. 偏移量管理: Zookeeper存储了消费者的偏移量信息,即消费者上次消费的位置。这样即使消费者断开连接后重新连接,也可以从之前的位置继续消费,确保数据不会重复消费或遗漏。
  7. 负载均衡:上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。

kafka为什么在2.8之后舍弃了zookeeper

  1. 简化架构: 在以前的版本中,Kafka使用了Zookeeper来管理元数据、选举、存储偏移量等,这导致了Kafka集群的架构比较复杂。采用KRaft可以使得整体架构更加简化,因为KRaft将元数据和状态的管理集成到Kafka本身中,减少了对外部依赖。
  2. 一致性和可靠性: KRaft基于Raft一致性算法,提供了更强的一致性保证。这可以帮助Kafka更好地处理元数据和状态的管理,从而提高整个系统的可靠性。
  3. 降低维护成本: 使用Kafka自身来管理元数据和状态,可以减少对Zookeeper的依赖,从而降低了维护的复杂性。此外,Zookeeper和Kafka的不同版本之间可能会出现不兼容性问题,这也增加了维护和升级的难度。
  4. 性能优化: KAFKA与zk之间需要网络通信去请求,如果网络超时或者抖动,会导致kafka异常

Raft

Raft 集群中每个节点都处于以下三种角色之一:

  • Leader: 所有请求的处理者,接收客户端发起的操作请求,写入本地日志后同步至集群其它节点。
  • Follower: 请求的被动更新者,从 leader 接收更新请求,写入本地文件。如果客户端的操作请求发送给了 follower,会首先由 follower 重定向给 leader。
  • Candidate: 如果 follower 在一定时间内没有收到 leader 的心跳,则判断 leader 可能已经故障,此时启动 leader election 过程,本节点切换为 candidate 直到选主结束。

选举

每开始一次新的选举,称为一个任期term),每个 term 都有一个严格递增的整数与之关联。

每当 candidate 触发 leader election 时都会增加 term,如果一个 candidate 赢得选举,他将在本 term 中担任 leader 的角色。但并不是每个 term 都一定对应一个 leader,有时候某个 term 内会由于选举超时导致选不出 leader,这时 candicate 会递增 term 号并开始新一轮选举。

img

Term 更像是一个逻辑时钟logic clock)的作用,有了它,就可以发现哪些节点的状态已经过期。每一个节点都保存一个 current term,在通信时带上这个 term 号。

节点间通过 RPC 来通信,主要有两类 RPC 请求:

  • RequestVote RPCs: 用于 candidate 拉票选举。
  • AppendEntries RPCs: 用于 leader 向其它节点复制日志以及同步心跳。

状态变化

img

Raft 的选主基于一种心跳机制,集群中每个节点刚启动时都是 follower 身份(Step: starts up),leader 会周期性的向所有节点发送心跳包来维持自己的权威,那么首个 leader 是如何被选举出来的呢?方法是如果一个 follower 在一段时间内没有收到任何心跳,也就是选举超时,那么它就会主观认为系统中没有可用的 leader,并发起新的选举(Step: times out, starts election)。

这里有一个问题,即这个“选举超时时间”该如何制定?如果所有节点在同一时刻启动,经过同样的超时时间后同时发起选举,整个集群会变得低效不堪,极端情况下甚至会一直选不出一个主节点。Raft 巧妙的使用了一个随机化的定时器,让每个节点的“超时时间”在一定范围内随机生成,这样就大大的降低了多个节点同时发起选举的可能性。

img

图:一个五节点 Raft 集群的初始状态,所有节点都是 follower 身份,term 为 1,且每个节点的选举超时定时器不同

若 follower 想发起一次选举,follower 需要先增加自己的当前 term,并将身份切换为 candidate。然后它会向集群其它节点发送“请给自己投票”的消息(RequestVote RPC)。

img

图:S1 率先超时,变为 candidate,term + 1,并向其它节点发出拉票请求

Candicate 状态转换过程

Follower 切换为 candidate 并向集群其他节点发送“请给自己投票”的消息后,接下来会有三种可能的结果,也即上面节点状态图中 candidate 状态向外伸出的三条线

1. 选举成功(Step: receives votes from majority of servers)

当candicate从整个集群的大多数(N/2+1)节点获得了针对同一 term 的选票时,它就赢得了这次选举,立刻将自己的身份转变为 leader 并开始向其它节点发送心跳来维持自己的权威。

img

图:“大部分”节点都给了 S1 选票

img

图:S1 变为 leader,开始发送心跳维持权威

每个节点针对每个 term 只能投出一张票,并且按照先到先得的原则。这个规则确保只有一个 candidate 会成为 leader。

2. 选举失败(Step: discovers current leader or new term)

Candidate 在等待投票回复的时候,可能会突然收到其它自称是 leader 的节点发送的心跳包,如果这个心跳包里携带的 term 不小于 candidate 当前的 term,那么 candidate 会承认这个 leader,并将身份切回 follower。这说明其它节点已经成功赢得了选举,我们只需立刻跟随即可。但如果心跳包中的 term 比自己小,candidate 会拒绝这次请求并保持选举状态。

img

图:S4、S2 依次开始选举

img

图:S4 成为 leader,S2 在收到 S4 的心跳包后,由于 term 不小于自己当前的 term,因此会立刻切为 follower 跟随 S4

3. 选举超时(Step: times out, new election)

第三种可能的结果是 candidate 既没有赢也没有输。如果有多个 follower 同时成为 candidate,选票是可能被瓜分的,如果没有任何一个 candidate 能得到大多数节点的支持,那么每一个 candidate 都会超时。此时 candidate 需要增加自己的 term,然后发起新一轮选举。如果这里不做一些特殊处理,选票可能会一直被瓜分,导致选不出 leader 来。这里的“特殊处理”指的就是前文所述的随机化选举超时时间

img

图:S1 ~ S5 都在参与选举

img

图:没有任何节点愿意给他人投票

img

图:如果没有随机化超时时间,所有节点将会继续同时发起选举……

Leader 状态转换过程

节点状态图中的最后一条线是:discovers server with higher term。想象一个场景:当 leader 节点发生了宕机或网络断连,此时其它 follower 会收不到 leader 心跳,首个触发超时的节点会变为 candidate 并开始拉票(由于随机化各个 follower 超时时间不同),由于该 candidate 的 term 大于原 leader 的 term,因此所有 follower 都会投票给它,这名 candidate 会变为新的 leader。一段时间后原 leader 恢复了,收到了来自新leader 的心跳包,发现心跳中的 term 大于自己的 term,此时该节点会立刻切换为 follower 并跟随的新 leader。

上述流程的动画模拟如下:

img

图:S4 作为 term2 的 leader

img

图:S4 宕机,S5 即将率先超时

图:S5 当选 term3 的 leader

img

图:S4 宕机恢复后收到了来自 S5 的 term3 心跳

img

图:S4 立刻变为 S5 的 follower

以上就是 Raft 的选主逻辑,但还有一些细节(譬如是否给该 candidate 投票还有一些其它条件)依赖算法的其它部分基础,我们会在后续“安全性”一章描述。

当票选出 leader 后,leader 也该承担起相应的责任了,这个责任是什么?就是下一章将介绍的“日志复制”。

复制解析

一旦 leader 被票选出来,它就承担起领导整个集群的责任了,开始接收客户端请求,并将操作包装成日志,并复制到其它节点上去。

整体流程如下:

  • Leader 为客户端提供服务,客户端的每个请求都包含一条即将被状态复制机执行的指令。
  • Leader 把该指令作为一条新的日志附加到自身的日志集合,然后向其它节点发起附加条目请求AppendEntries RPC),来要求它们将这条日志附加到各自本地的日志集合。
  • 当这条日志已经确保被安全的复制,即大多数(N/2+1)节点都已经复制后,leader 会将该日志 apply 到它本地的状态机中,然后把操作成功的结果返回给客户端。

整个集群的日志模型可以宏观表示为下图(x ← 3 代表 x 赋值为 3):

img

每条日志除了存储状态机的操作指令外,还会拥有一个唯一的整数索引值log index)来表明它在日志集合中的位置。此外,每条日志还会存储一个 term 号(日志条目方块最上方的数字,相同颜色 term 号相同),该 term 表示 leader 收到这条指令时的当前任期,term 相同的 log 是由同一个 leader 在其任期内发送的。

当一条日志被 leader 节点认为可以安全的 apply 到状态机时,称这条日志是 committed(上图中的 committed entries)。那么什么样的日志可以被 commit 呢?答案是:当 leader 得知这条日志被集群过半的节点复制成功时。因此在上图中我们可以看到 (term3, index7) 这条日志以及之前的日志都是 committed,尽管有两个节点拥有的日志并不完整。

Raft 保证所有 committed 日志都已经被持久化,且“最终”一定会被状态机apply。

注:这里的“最终”用词很微妙,它表明了一个特点:Raft 保证的只是集群内日志的一致性,而我们真正期望的集群对外的状态机一致性需要我们做一些额外工作,这一点在《线性一致性与读性能优化》一章会着重介绍。

kafka优化

  • 去除zk,优化
  • 没有实现延迟队列,可以参考rocket mq

RocketMQ 实现延迟队列的原理是基于消息的存储和消费控制。

当生产者发送消息时,会根据设置的延迟级别将消息存储在对应的延迟消息队列中。延迟消息队列是一个特殊的存储区域,用于存放需要延迟发送的消息。

在 RocketMQ 的 Broker 端,延迟消息会以不同的延迟级别分别存储在具有不同持久化特性的文件中。这样可以根据消息的延迟级别和存储特性对消息进行管理,以便在合适的时间发送给消费者。

同时,RocketMQ 的消费者会启动并订阅延迟消息,但实际消费延迟消息的时间是由 RocketMQ 控制的。在消息存储时,Broker 会根据消息的延迟级别和存储特性来计算出消息的过期时间。当消息达到过期时间后,才会交给消费者进行消费。

需要注意的是,RocketMQ 的延迟队列并不是精确的实时延迟,而是近似延迟。这是因为 RocketMQ 在处理延迟消息时,并不会对每一条消息都进行实时的延迟计算和触发。相反,它使用了一种定时检查和触发机制来控制延迟消息的发送。这也是为了保证系统的高吞吐量和性能。

综上所述,RocketMQ 实现延迟队列的原理是通过将延迟消息存储在特定的延迟消息队列中,并在过期时间到达后交给消费者进行消费。延迟消息的存储和触发机制保证了延迟消息的可靠性和近似延迟。

  • 没有实现顺序消费

RocketMQ 可以通过添加消息队列的方式来实现顺序消费。在 RocketMQ 中,每个主题(Topic)下可以包含多个消息队列(Message Queue),每个消息队列都是独立的。如果需要保证顺序消费,需要将同一业务的消息发送到同一个 Message Queue 中。

以下是实现顺序消费的方法:

  1. 按照业务进行分组:将同一业务的消息发送到同一个 Group 的消息队列中。
  2. 使用顺序生产者:RocketMQ 提供了顺序生产者(OrderProducer)来保证发送到同一个 Message Queue 中的消息顺序发送。在构造消息时,可以指定消息的业务 ID,RocketMQ 会根据该 ID 将消息发送到同一个 Message Queue 中。
  3. 使用顺序消费者:RocketMQ 提供了顺序消费者(OrderConsumer)来保证同一个 Message Queue 内的消息按照顺序被消费。在订阅消息时,需要设置消费模式为 Orderly,并实现 MessageListenerOrderly 接口,在消息处理完成后需要手动调用 ack 方法通知服务器消息已被成功消费。

需要注意的是,在使用顺序消费的情况下,建议同时设置消费者数量为1,否则可能会导致顺序混乱。此外,即使使用了顺序生产者和顺序消费者,仍然可能出现无法保证消息完全按照顺序进行处理的情况,例如网络问题、服务故障等原因。因此,在使用顺序消费时,应该仔细设计消息的生产和消费流程,并进行充分测试,以确保消息能够按照预期顺序进行处理。

  • 没有支持云原生,类似Pulsar支持云原生,kafka在添加结点的时候需要全量复制,导致耗时过长。