kafka.md 12 KB

kafka为什么快?

顺序读写

kafka的存储方案是顺序追加写日志 + 稀疏哈希索引

image-20230519214345638

  1. kafka 中消息是以主题 Topic 为基本单位进行归类的,这里的 Topic 是逻辑上的概念,实际上在磁盘存储是根据分区 Partition 存储的, 即每个 Topic 被分成多个 Partition,分区 Partition 的数量可以在主题 Topic 创建的时候进行指定。
  2. Partition 分区主要是为了解决 Kafka 存储的水平扩展问题而设计的, 如果一个 Topic 的所有消息都只存储到一个 Kafka Broker上的话, 对于 Kafka 每秒写入几百万消息的高并发系统来说,这个 Broker 肯定会出现瓶颈, 故障时候不好进行恢复,所以 Kafka 将 Topic 的消息划分成多个 Partition, 然后均衡的分布到整个 Kafka Broker 集群中。
  3. Partition 分区内每条消息都会被分配一个唯一的消息 id,即我们通常所说的 偏移量 Offset, 因此 kafka 只能保证每个分区内部有序性,并不能保证全局有序性。
  4. 然后每个 Partition 分区又被划分成了多个 LogSegment,这是为了防止 Log 日志过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegement,相当于一个巨型文件被平均分割为一些相对较小的文件,这样也便于消息的查找、维护和清理。这样在做历史数据清理的时候,直接删除旧的 LogSegement 文件就可以了。
  5. Log 日志在物理上只是以文件夹的形式存储,而每个 LogSegement 对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件(比如以".snapshot"为后缀的快照索引文件等)

/index文件中会生成三份文件XX.log,XX.index,XX.timeindex,日志是分块存储的,时间戳索引从XX.timeindex中找到对应的offset,再从XX.index中索引XX.log。

这里通过mmap(内存映射)可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针

img

页缓存--pagecache

页缓存相对来说比较简单,页缓存在操作系统层面是保存数据的一个基本单位,Kafka 避免使用 JVM,直接使用操作系统的页缓存特性提高处理速度,进而避免了JVM GC 带来的性能损耗。

零拷贝

kafka就采用零拷贝技术来消费数据

批量操作

在 kafka 中页提高了大量批处理的 API ,可以对数据进行统一的压缩合并,通过更小的数据包在网络中进行数据发送,再进行后续处理,这在大量数据处理中,效率提高是非常明显的。

kafka如何保证顺序消费

  1. 1 个 Topic 只对应一个 Partition。
  2. (推荐)发送消息的时候指定 key/Partition。

Kafka 如何保证消息不重复消费

kafka 出现消息重复消费的原因:

  • 服务端侧已经消费的数据没有成功提交 offset(根本原因)。
  • Kafka 侧 由于服务端处理业务时间长或者网络链接等等原因让 Kafka 认为服务假死,触发了分区 rebalance。

解决方案:

  • 消费消息服务做幂等校验,比如 Redis 的 set、MySQL 的主键等天然的幂等功能。这种方法最有效。
  • enable.auto.commit 参数设置为 false,关闭自动提交,开发者在代码中手动提交 offset。

kafka 的事务机制

Kafka 事务与数据库的事务定义基本类似,主要是一个原子性:多个操作要么全部成功,要么全部失败。Kafka 中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理。

  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
  • 事实上,开启 kafka事务时,kafka 会自动开启幂等生产者。

kafka的幂等性

当 kafka producer 向 broker 中的 topic发送数据时,可能会因为网络抖动等各种原因,造成 producer 收不到 broker 的 ack 确认信息。kafka幂等性就会保证在生产者内部逻辑问题引起的消息重复消费的时候,只有一个数据可以被正确的发送。

需要注意的是如果使用try/catch捕获,用send手动发送,则会被视为不同的消息

原理

  • 在 producer 端,每个 producer 都被 broker 自动分配了一个 Producer Id (PID), producer 向 broker 发送的每条消息,在内部都附带着该 pid 和一个递增的 sequence number;
  • 在 broker 端,broker 为每个 topic 的每个 partition 都维护了一个当前写成功的消息的最大 PID-Sequence Number 元组;
  • 当 broker 收到一个比当前最大 PID-Sequence Number 元组小的 sequence number 消息时,就会丢弃该消息,以避免造成数据重复存储;
  • 当 broker 失败重新选举新的 leader 时, 以上去重机制仍然有效:因为 broker 的 topic 中存储的消息体中附带了 PID-sequence number 信息,且 leader 的所有消息都会被复制到 followers 中。当某个原来的 follower 被选举为新的 leader 时,它内部的消息中已经存储了PID-sequence number 信息,也就可以执行消息去重了。

kafka事务基本流程

  • initTransactions:方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException:
  • beginTransaction:方法用来开启事务;
  • sendOffsetsToTransaction:方法为消费者提供在事务内的位移提交的操作;将偏移量提交到事务中,仅当整个交易(消费和生产)成功时,它才会提交。
  • commitTransaction:方法用来提交事务;
  • abortTransaction:方法用来中止事务,类似于事务回滚。
producer.initTransactions();
try {
    producer.beginTransaction();
    for (ProducerRecord<String, String> record : payload) {
        producer.send(record);
    }

    Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
        {
            put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
        }
    };
    producer.sendOffsetsToTransaction(groupCommit, "groupId");
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}

事务基本流程

  1. 存储对应关系,通过请求增加分区
    • Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
  2. 生产者发送消息
    • 基本与普通的发送消息相同,生产者调用 producer.send() 方法,发送数据到分区;
    • 发送的请求中,包含 pid, epoch, sequence number 字段;
  3. 增加消费 offset 到事务
    • 生产者通过 producer.senOffsetsToTransaction() 接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;
  4. 事务提交位移
    • 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。
    • 只有当事务提交之后,offset 才会对外可见。
  5. 提交或回滚事务
    • 用户调用 producer.commitTransaction()abortTransaction() 方法,提交或回滚事务;(回滚就会跳过该事务)
    • 生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。

需要注意的是:如果事务性生产者(Transactional Producer)发送的消息没有被提交,消费者是不会读取该消息之后的数据的。

kafka 触发 Rebalance

Kafka 触发 Rebalance 主要有以下几种情况:

  1. 消费者组中新增或移除了消费者:当一个新的消费者加入消费者组或者一个消费者从消费者组中退出时,Kafka 会触发 Rebalance。这可以通过添加或删除消费者来实现动态扩缩容。
  2. 消费者组订阅的主题发生变化:当消费者组订阅的主题发生变化,即增加或删除了主题,Kafka 也会触发 Rebalance。这样可以确保新增或删除的主题的分区被正确地分配给消费者。
  3. 消费者健康状态发生变化:如果某个消费者长时间未发送心跳,或者被判定为失效,Kafka 会将该消费者标记为“退出”,并触发 Rebalance,重新分配该消费者的分区给其他正常的消费者。

kafka消息积压问题

产生原因

主要有三点产生原因:

  1. 实时/消费任务挂掉
  2. Kafka分区数设置的不合理(太少)和消费者"消费能力"不足
  3. Kafka消息的key不均匀,导致分区间数据不均衡

消息挤压会有什么严重的后果

  1. 消息被丢弃
  2. 磁盘空间被挤占
  3. 海量消息被挤压,系统反应较慢

如何解决消息挤压问题

  1. 生产者减少消息发送速率
  2. 消费端做如下处理

img

  1. 尽快复盘,找到挤压的原因

Kafka 消费组消费者分配策略

1.Range(范围)

Range是对每个Topic而言的(即一个Topic一个Topic分),首先对同一个Topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。然后用Partitions分区的个数除以消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

EX:假如有10个分区,3个消费者线程,把分区按照序号排列0,1,2,3,4,5,6,7,8,9;消费者线程为C1-0,C2-0,C2-1

如果有11个分区将会是:

C1-0:0,1,2,3
C2-0:4,5,6,7
C2-1:8,9,10

2.RoundRobin(轮询)默认的策略

该策略的原理是将消费组内所有消费者以及消费者所订阅的所有topic的partition按照字典序排序,然后通过轮询方式逐个将分区以此分配给每个消费者

在这里插入图片描述

举例,假设消费组中有2个消费者C0和C1,都订阅了主题t0和t1,并且每个主题都有3个分区,那么所订阅的所有分区可以标识为:t0p0、t0p1、t0p2、t1p0、t1p1、t1p2。最终的分配结果为:

消费者C0:t0p0、t0p2、t1p1 消费者C1:t0p1、t1p0、t1p2

3.Sticky

Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

① 分区的分配要尽可能的均匀; ② 分区的分配尽可能的与上次分配的保持相同。

kafka实现延时队列的效果

应⽤场景

订单创建后,超过30分钟没有⽀付,则需要取消订单,这种场景可以通过延时队列来实现

具体⽅案

image-20210914195129983

  • kafka中创建创建相应的主题

    • 消费者消费该主题的消息(轮询)
    • 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没⽀付)
    • 如果是:去数据库中修改订单状态为已取消

    • 如果否:记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次 向kafka拉取该offset及之后的消息,继续进⾏判断,以此反复。