|
@@ -0,0 +1,67 @@
|
|
|
|
+## kafka 的事务机制
|
|
|
|
+
|
|
|
|
+Kafka 事务与数据库的事务定义基本类似,主要是一个原子性:**多个操作要么全部成功,要么全部失败**。Kafka 中的事务可以使应用程序将**消费消息、生产消息、提交消费位移**当作原子操作来处理。
|
|
|
|
+
|
|
|
|
+- KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
|
|
|
|
+- 事实上,开启 kafka事务时,kafka 会自动开启幂等生产者。
|
|
|
|
+
|
|
|
|
+## kafka的幂等性
|
|
|
|
+
|
|
|
|
+当 kafka producer 向 broker 中的 topic发送数据时,可能会因为网络抖动等各种原因,造成 producer 收不到 broker 的 ack 确认信息。kafka幂等性就会保证在生产者内部逻辑问题引起的消息重复消费的时候,只有一个数据可以被正确的发送。
|
|
|
|
+
|
|
|
|
+> **需要注意的是如果使用try/catch捕获,用send手动发送,则会被视为不同的消息**
|
|
|
|
+
|
|
|
|
+**原理**
|
|
|
|
+
|
|
|
|
+- 在 producer 端,每个 producer 都被 broker 自动分配了一个 Producer Id (PID), producer 向 broker 发送的每条消息,在内部都附带着该 pid 和一个递增的 sequence number;
|
|
|
|
+- 在 broker 端,broker 为每个 topic 的每个 partition 都维护了一个当前写成功的消息的最大 PID-Sequence Number 元组;
|
|
|
|
+- 当 broker 收到一个比当前最大 PID-Sequence Number 元组小的 sequence number 消息时,就会丢弃该消息,以避免造成数据重复存储;
|
|
|
|
+- 当 broker 失败重新选举新的 leader 时, 以上去重机制仍然有效:因为 broker 的 topic 中存储的消息体中附带了 PID-sequence number 信息,且 leader 的所有消息都会被复制到 followers 中。当某个原来的 follower 被选举为新的 leader 时,它内部的消息中已经存储了PID-sequence number 信息,也就可以执行消息去重了。
|
|
|
|
+
|
|
|
|
+## kafka事务基本流程
|
|
|
|
+
|
|
|
|
+* **initTransactions**:方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException:
|
|
|
|
+* **beginTransaction**:方法用来开启事务;
|
|
|
|
+* **sendOffsetsToTransaction**:方法为消费者提供在事务内的位移提交的操作;将偏移量提交到事务中,仅当整个交易(消费和生产)成功时,它才会提交。
|
|
|
|
+* **commitTransaction**:方法用来提交事务;
|
|
|
|
+* **abortTransaction**:方法用来中止事务,类似于事务回滚。
|
|
|
|
+
|
|
|
|
+```java
|
|
|
|
+producer.initTransactions();
|
|
|
|
+try {
|
|
|
|
+ producer.beginTransaction();
|
|
|
|
+ for (ProducerRecord<String, String> record : payload) {
|
|
|
|
+ producer.send(record);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
|
|
|
|
+ {
|
|
|
|
+ put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ producer.sendOffsetsToTransaction(groupCommit, "groupId");
|
|
|
|
+ producer.commitTransaction();
|
|
|
|
+} catch (ProducerFencedException e) {
|
|
|
|
+ producer.close();
|
|
|
|
+} catch (KafkaException e) {
|
|
|
|
+ producer.abortTransaction();
|
|
|
|
+}
|
|
|
|
+```
|
|
|
|
+
|
|
|
|
+## 事务基本流程
|
|
|
|
+
|
|
|
|
+1. **存储对应关系,通过请求增加分区**
|
|
|
|
+ * Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
|
|
|
|
+2. **生产者发送消息**
|
|
|
|
+ * 基本与普通的发送消息相同,生产者调用 `producer.send()` 方法,发送数据到分区;
|
|
|
|
+ * 发送的请求中,包含 pid, epoch, sequence number 字段;
|
|
|
|
+3. **增加消费 offset 到事务**
|
|
|
|
+ * 生产者通过 `producer.senOffsetsToTransaction()` 接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;
|
|
|
|
+4. **事务提交位移**
|
|
|
|
+ * 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。
|
|
|
|
+ * 只有当事务提交之后,offset 才会对外可见。
|
|
|
|
+5. **提交或回滚事务**
|
|
|
|
+ * 用户调用 `producer.commitTransaction()` 或 `abortTransaction()` 方法,提交或回滚事务;
|
|
|
|
+ * 生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。
|
|
|
|
+
|
|
|
|
+> **需要注意的是:如果事务性生产者(Transactional Producer)发送的消息没有被提交,消费者是不会读取该消息之后的数据的。**
|