|
@@ -1,16 +1,123 @@
|
|
# kafka模块
|
|
# kafka模块
|
|
|
|
|
|
-煤矿大数据平台作为智能矿山基础服务平台的一部分,承担智能矿山服务平台中煤矿数据的采集、传输、存储、分发、计算等任务,致力于达到打通煤矿数据壁垒、整合数据资源、规范煤矿数据等目标。我主要负责数据采集和该模块监控这个两部分的功能。主要应用的组件是kafka,为了与下层数据传感器或者叫收集器解耦,也为了缓解系统IO压力,我们引入了kafka模块做数据采集的主要组件,为了保证顺序消费,因为采集的数据将按照时间顺序存入到hbase中。在生产者那一段主要是使用对象池技术进行缓存kafka producer。在消费者那一端则采用线程池对消费的数据进行处理,主要采取策略模式,按照选择的不同策略进行计算,例如区间或者压强的简单计算,或者是数值替换,也支持三方jar包,然后通过futuer收集数据。最后存储到hbase中。为了保证消息的等幂性,消费者端我们采取redis进行缓存数据,缓存超时时间默认为3分钟,将ACK设置为1保证消息顺序消费,如果消息发送失败,则默认重试一次,因为这个系统更多的是在意吞吐量,对于消息是否丢失不是很在意。
|
|
|
|
|
|
+煤矿大数据平台作为智能矿山基础服务平台的一部分,承担智能矿山服务平台中煤矿数据的采集、传输、存储、分发、计算等任务,致力于达到打通煤矿数据壁垒、整合数据资源、规范煤矿数据等目标。我主要负责数据采集和管道监控这个两部分的功能。主要应用的组件是kafka,为了与下层数据收集器解耦,同时也为了缓解系统IO压力,引入了kafka模块做数据采集的主要组件。
|
|
|
|
+
|
|
|
|
+该模块主要采集传感器生成的实时数据,为了保证数据的有序,所以要保证数据的顺序消费,因为采集的数据将按照时间顺序存入到hbase中。在生产者那一端可以从mysql或者kafka管道中收集数据实时数据,为了保证顺序消费,综合考虑将重试次数设置为0,因为这个系统更多的是在意吞吐量,对于消息是否丢失不是很在意。在消费者那一端则采用线程池对消费的数据进行多线程处理,主要采取策略模式,按照选择的不同策略进行计算,例如区间或者压强的简单计算,或者是数值替换,也支持三方jar包,然后通过futuer收集数据。最后存储到hbase中。为了保证消息的等幂性,消费者端我们采取redis进行缓存分区偏移量,缓存超时时间默认为3分钟,手动提交ACK。等方式保证消息顺序消费。
|
|
|
|
+
|
|
|
|
+我完成的第二个模块就是kafka监控模块,主要监控kafka集群的状态,与kafka-egale类似。主要操作就是调用kafka提供的kafka-admin-client接口和JMX进行操作,这里进行了相应的优化,主要包括复用同一个连接,使用工厂模式,生产一个admin-client,这里主要加了饿汉式单例来保证线程安全,利用redis缓存topic的速率。最后对于该模块,采用热启动的方式优化,例如自动建表,提前获取相关admin-client链接等
|
|
|
|
+
|
|
|
|
+```java
|
|
|
|
+import redis.clients.jedis.Jedis;
|
|
|
|
+import org.apache.kafka.clients.consumer.*;
|
|
|
|
+import org.apache.kafka.common.TopicPartition;
|
|
|
|
+
|
|
|
|
+import java.time.Duration;
|
|
|
|
+import java.util.*;
|
|
|
|
+
|
|
|
|
+public class KafkaConsumerExample {
|
|
|
|
+ private static final String LOCK_PREFIX = "KAFKA_LOCK_";
|
|
|
|
+ private static final int LOCK_EXPIRE_TIME = 10000; //锁过期时间
|
|
|
|
+ private static final String OFFSET_SET_NAME = "KAFKA_OFFSETS"; //已处理记录的偏移量集合名称
|
|
|
|
|
|
-我完成的第二个模块就是kafka监控模块,主要监控kafka集群的状态,与kafka-egale类似。主要操作就是调用kafka提供的kafka-admin-client接口和JMX进行操作,这里进行了相应的优化,主要包括复用同一个连接,多个用户同时调用接口,都会通过工厂模式,单例的产生一个admin-client,这里主要加了两段锁和volatile关键字保证线程安全,利用redis缓存topic的速率。最后对于该模块,采用热启动的方式优化,例如自动建表,提前获取相关admin-client链接等
|
|
|
|
|
|
+ private Consumer<String, String> consumer;
|
|
|
|
+ private Jedis jedis;
|
|
|
|
|
|
|
|
+ public KafkaConsumerExample(Properties props, String redisHost, int redisPort) {
|
|
|
|
+ consumer = new KafkaConsumer<>(props);
|
|
|
|
+ jedis = new Jedis(redisHost, redisPort);
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ public void consume() {
|
|
|
|
+ Map<TopicPartition, Long> lastProcessedOffsets = readLastProcessedOffsets(); //读取上次处理的偏移量
|
|
|
|
|
|
-我参与了一个煤矿大数据平台的开发项目,旨在为智能矿山服务平台提供支持并优化业务流程。在项目中,我负责设计和实现了数据采集和处理模块。我使用了Kafka作为数据采集工具,通过Kafka监控模块对数据进行实时监控和处理,利用Redis进行缓存以提高数据查询效率。
|
|
|
|
|
|
+ while (true) {
|
|
|
|
+ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); //批量拉取消息
|
|
|
|
+ //遍历每个分区的消息
|
|
|
|
+ for (TopicPartition partition : records.partitions()) {
|
|
|
|
+ List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
|
|
|
|
+
|
|
|
|
+ //获取分区的最新偏移量,并检查是否已经处理过该记录
|
|
|
|
+ long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
|
|
|
|
+ long lastProcessedOffset = lastProcessedOffsets.getOrDefault(partition, -1L);
|
|
|
|
+
|
|
|
|
+ if (lastProcessedOffset >= lastOffset) { //如果已经处理过,则跳过该分区
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //尝试获取 Redis 锁
|
|
|
|
+ String lockName = LOCK_PREFIX + partition.topic() + "_" + partition.partition();
|
|
|
|
+ boolean locked = false;
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ while (!locked) {
|
|
|
|
+ Long result = jedis.setnx(lockName, "1"); //使用 SETNX 命令尝试获取锁
|
|
|
|
+
|
|
|
|
+ if (result != null && result == 1) { //如果成功获取锁,则设置超时时间并跳出循环
|
|
|
|
+ jedis.expire(lockName, LOCK_EXPIRE_TIME);
|
|
|
|
+ locked = true;
|
|
|
|
+ } else { //否则等待一段时间后重试
|
|
|
|
+ Thread.sleep(50);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //处理消息
|
|
|
|
+ for (ConsumerRecord<String, String> record : partitionRecords) {
|
|
|
|
+ if (record.offset() <= lastProcessedOffset) { //如果已经处理过,则跳过该记录
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // TODO: 处理消息的业务逻辑
|
|
|
|
+
|
|
|
|
+ lastProcessedOffsets.put(partition, record.offset()); //将偏移量记录到最新处理偏移量表中
|
|
|
|
+ }
|
|
|
|
+ consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset))); //手动提交偏移量
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ e.printStackTrace();
|
|
|
|
+ } finally {
|
|
|
|
+ if (locked) { //释放 Redis 锁
|
|
|
|
+ jedis.del(lockName);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (!lastProcessedOffsets.isEmpty()) { //保存已处理的偏移量至 Redis
|
|
|
|
+ saveLastProcessedOffsets(lastProcessedOffsets);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private Map<TopicPartition, Long> readLastProcessedOffsets() {
|
|
|
|
+ Set<String> offsetKeys = jedis.smembers(OFFSET_SET_NAME);
|
|
|
|
+ Map<TopicPartition, Long> lastProcessedOffsets = new HashMap<>();
|
|
|
|
+
|
|
|
|
+ for (String key : offsetKeys) {
|
|
|
|
+ String[] parts = key.split("_");
|
|
|
|
+ String topic = parts[0];
|
|
|
|
+ int partition = Integer.parseInt(parts[1]);
|
|
|
|
+ long offset = Long.parseLong(jedis.get(key));
|
|
|
|
+
|
|
|
|
+ lastProcessedOffsets.put(new TopicPartition(topic, partition), offset);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return lastProcessedOffsets;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void saveLastProcessedOffsets(Map<TopicPartition, Long> lastProcessedOffsets) {
|
|
|
|
+ jedis.multi();
|
|
|
|
+
|
|
|
|
+ for (Map.Entry<TopicPartition, Long> entry : lastProcessedOffsets.entrySet()) {
|
|
|
|
+ String key = OFFSET_SET_NAME + "_" + entry.getKey().topic() + "_" + entry.getKey().partition();
|
|
|
|
+ jedis.set(key, String.valueOf(entry.getValue()));
|
|
|
|
+ jedis.sadd(OFFSET_SET_NAME, key);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ jedis.exec();
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+```
|
|
|
|
|
|
-为了保证项目的性能和稳定性,我针对Kafka监控模块进行了优化,包括对消费者组进行调优、调整分区和副本数量、设置合理的消息大小和缓冲区大小等。这些优化措施显著提升了系统的数据处理速度和稳定性。
|
|
|
|
|
|
|
|
-通过我在数据采集和处理模块的设计和实现,项目成功地实现了高效、可靠的数据采集和处理流程,为智能矿山服务平台提供了稳定的数据支持,并在业务流程优化方面取得了显著的成果。项目上线后,数据处理速度提升了30%,用户体验得到了明显的改善。此外,通过合理的资源管理和团队协作,项目按计划完成,并且得到了用户和业务部门的高度认可。
|
|
|
|
|
|
|
|
## kafka监控
|
|
## kafka监控
|
|
|
|
|
|
@@ -48,4 +155,6 @@ ack = 1
|
|
* 策略模式
|
|
* 策略模式
|
|
* 责任链模式
|
|
* 责任链模式
|
|
|
|
|
|
-流量分析项目是六所网络安全部门的一个子项目,它主要功能是识别工控设备异常流量,达到防范攻击和攻击溯源的功能,我主要负责的是工控协议流量解析部分,负责12种工控协议的解析。协议解析主要参照各个协议的官网进行翻译,主要通过策略模式进行协议选择,和通过责任链模式进行协议判断,然后将该jar包通过SPI注入到服务方的服务中去
|
|
|
|
|
|
+流量分析项目是六所网络安全部门的一个子项目,它主要功能是识别工控设备异常流量,达到防范攻击和攻击溯源的功能,我主要负责的是工控协议流量解析部分,负责12种工控协议的解析。协议解析主要参照各个协议的官网进行翻译,主要通过策略模式进行协议选择,选择协议之后通过责任链模式进行协议判断,主要实现了对方提供的接口,输出的是解析成功的json格式的对象,通过SPI机制生成一个三方jar包。
|
|
|
|
+
|
|
|
|
+该项目的主要难点就是协议需要参照协议官网进行一个字节一个字节的分析解析,并对其错误情况进行判断。
|