中煤项目.md 7.4 KB

kafka模块

煤矿大数据平台作为智能矿山基础服务平台的一部分,承担智能矿山服务平台中煤矿数据的采集、传输、存储、分发、计算等任务,致力于达到打通煤矿数据壁垒、整合数据资源、规范煤矿数据等目标。我主要负责数据采集和管道监控这个两部分的功能。主要应用的组件是kafka,为了与下层数据收集器解耦,同时也为了缓解系统IO压力,引入了kafka模块做数据采集的主要组件。

该模块主要采集传感器生成的实时数据,为了保证数据的有序,所以要保证数据的顺序消费,因为采集的数据将按照时间顺序存入到hbase中。在生产者那一端可以从mysql或者kafka管道中收集数据实时数据,为了保证顺序消费,综合考虑将重试次数设置为0,因为这个系统更多的是在意吞吐量,对于消息是否丢失不是很在意。在消费者那一端则采用线程池对消费的数据进行多线程处理,主要采取策略模式,按照选择的不同策略进行计算,例如区间或者压强的简单计算,或者是数值替换,也支持三方jar包,然后通过futuer收集数据。最后存储到hbase中。为了保证消息的等幂性,消费者端我们采取redis进行缓存分区偏移量,缓存超时时间默认为3分钟,手动提交ACK。等方式保证消息顺序消费。

我完成的第二个模块就是kafka监控模块,主要监控kafka集群的状态,与kafka-egale类似。主要操作就是调用kafka提供的kafka-admin-client接口和JMX进行操作,这里进行了相应的优化,主要包括复用同一个连接,使用工厂模式,生产一个admin-client,这里主要加了懒汉式单例来保证线程安全,利用redis缓存topic的速率。最后对于该模块,采用热启动的方式优化,例如自动建表,提前获取相关admin-client链接等

import redis.clients.jedis.Jedis;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class KafkaConsumerExample {
    private static final String LOCK_PREFIX = "KAFKA_LOCK_";
    private static final int LOCK_EXPIRE_TIME = 10000; //锁过期时间
    private static final String OFFSET_SET_NAME = "KAFKA_OFFSETS"; //已处理记录的偏移量集合名称

    private Consumer<String, String> consumer;
    private Jedis jedis;

    public KafkaConsumerExample(Properties props, String redisHost, int redisPort) {
        consumer = new KafkaConsumer<>(props);
        jedis = new Jedis(redisHost, redisPort);
    }

    public void consume() {
        Map<TopicPartition, Long> lastProcessedOffsets = readLastProcessedOffsets(); //读取上次处理的偏移量

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); //批量拉取消息
            //遍历每个分区的消息
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                //获取分区的最新偏移量,并检查是否已经处理过该记录
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                long lastProcessedOffset = lastProcessedOffsets.getOrDefault(partition, -1L);

                if (lastProcessedOffset >= lastOffset) { //如果已经处理过,则跳过该分区
                    continue;
                }

                //尝试获取 Redis 锁
                String lockName = LOCK_PREFIX + partition.topic() + "_" + partition.partition();
                boolean locked = false;

                try {
                    while (!locked) {
                        Long result = jedis.setnx(lockName, "1"); //使用 SETNX 命令尝试获取锁

                        if (result != null && result == 1) { //如果成功获取锁,则设置超时时间并跳出循环
                            jedis.expire(lockName, LOCK_EXPIRE_TIME);
                            locked = true;
                        } else { //否则等待一段时间后重试
                            Thread.sleep(50);
                        }
                    }

                    //处理消息
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        if (record.offset() <= lastProcessedOffset) { //如果已经处理过,则跳过该记录
                            continue;
                        }

                        // TODO: 处理消息的业务逻辑

                        lastProcessedOffsets.put(partition, record.offset()); //将偏移量记录到最新处理偏移量表中
                    }
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastProcessedOffset))); //手动提交偏移量
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    if (locked) { //释放 Redis 锁
                        jedis.del(lockName);
                    }
                }
            }

            if (!lastProcessedOffsets.isEmpty()) { //保存已处理的偏移量至 Redis
                saveLastProcessedOffsets(lastProcessedOffsets);
            }
        }
    }

    private Map<TopicPartition, Long> readLastProcessedOffsets() {
        Set<String> offsetKeys = jedis.smembers(OFFSET_SET_NAME);
        Map<TopicPartition, Long> lastProcessedOffsets = new HashMap<>();

        for (String key : offsetKeys) {
            String[] parts = key.split("_");
            String topic = parts[0];
            int partition = Integer.parseInt(parts[1]);
            long offset = Long.parseLong(jedis.get(key));

            lastProcessedOffsets.put(new TopicPartition(topic, partition), offset);
        }

        return lastProcessedOffsets;
    }

    private void saveLastProcessedOffsets(Map<TopicPartition, Long> lastProcessedOffsets) {
        jedis.multi();

        for (Map.Entry<TopicPartition, Long> entry : lastProcessedOffsets.entrySet()) {
            String key = OFFSET_SET_NAME + "_" + entry.getKey().topic() + "_" + entry.getKey().partition();
            jedis.set(key, String.valueOf(entry.getValue()));
            jedis.sadd(OFFSET_SET_NAME, key);
        }

        jedis.exec();
    }
}

kafka监控

redis缓存

复用连接 --- 多个线程复用AdminClient

加锁

spi

jmx -- 多个端口

热启动 -- 自动化

kafka消费

线程池多线程消费 --- future CPU -N +1 IO 2N

幂等处理 -- redis缓存

hash计算多个分区????

ack = 1

顺序消费

采集任务合并

策略模式选择策略

流量分析项目

  • SPI
  • 策略模式
  • 责任链模式

流量分析项目是六所网络安全部门的一个子项目,它主要功能是识别工控设备异常流量,达到防范攻击和攻击溯源的功能,我主要负责的是工控协议流量解析部分,负责12种工控协议的解析。协议解析主要参照各个协议的官网进行翻译,主要通过策略模式进行协议选择,选择协议之后通过责任链模式进行协议判断,主要实现了对方提供的接口,输出的是解析成功的json格式的对象,通过SPI机制生成一个三方jar包。

该项目的主要难点就是协议需要参照协议官网进行一个字节一个字节的分析解析,并对其错误情况进行判断。