首页
 

通知公告

Kafka 批量消费

来源:欧亿体育点击:时间:2024-01-13 01:02

业务背景
项目有个需求需要统计IM聊天相关数据,原设计思想是在聊天产生时通过消息队列进行数据记录,利用rocketMQ实现。上线后发现由于内部IM活跃用户量级较大,MQ生产者生产消息过多,消费者实时消费会造成服务器CPU和硬盘读写压力,在改不了硬件配置的情况下,笔者通过了解到kafka批量消费的实现可解决这个问题,记录下该方案。

环境

kafka、Springboot、JDK8

依赖

使用的是Springboot v2.1.5.RELEASE版本,pom依赖如下:

        
        <dependency>
            <groupId>org.springframework.kafkagroupId>
            <artifactId>spring-kafkaartifactId>
            <version>2.2.6.RELEASEversion>
        dependency>

配置文件

生产者配置

核心配置是:

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=10000

单条消费和提交有时候会影响性能,spring-kafka提供了批量拉取数据和手动提交的策略

#设置是否批量消费,默认 single(单条),batch(批量)
spring.kafka.listener.type=batch
# 手动
spring.kafka.listener.ack-mode=manual_immediate
# 集群地址
spring.kafka.bootstrap-servers=192.168.2.135:9092
# 重试次数
spring.kafka.producer.retries=3
# 应答级别
# acks=0 把消息发送到kafka就认为发送成功
# acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
# acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
spring.kafka.producer.acks=all
# 批量处理的最大大小 单位 byte
spring.kafka.producer.batch-size=4096
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.buffer-memory=33554432
# 客户端ID
spring.kafka.producer.client-id=im-kafka
# Key 序列化类
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Value 序列化类
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 消息压缩:none、lz4、gzip、snappy,默认为 none。
spring.kafka.producer.compression-type=gzip
# 发送延时,当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka
spring.kafka.producer.properties.linger.ms=1000
# KafkaProducer.send()partitionsFor() 方法的最长阻塞时间 单位 ms
spring.kafka.producer.properties.max.block.ms=6000

消费者配置

核心配置是:

kafka:
    listener:
      # 手动
      ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: batch
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 批量消费最大数量
      max-poll-records: 100

在配置文件中关闭自动提交,开启手动提交和批量消费就可以批量消费了,但是最后需要手动提交offset

  kafka:
    listener:
      # 手动
      ack-mode: manual_immediate
      #设置是否批量消费,默认 single(单条),batch(批量)
      type: batch
      # 集群地址
    bootstrap-servers: 192.168.2.135:9092
      # 消费者配置
    consumer:
      # 默认消费者组
      group-id: imStatisticsConsumerGroup
      # 自动提交 offset 默认 true
      enable-auto-commit: false
      # 自动提交的频率 单位 ms
      auto-commit-interval: 1000
      # 批量消费最大数量
      max-poll-records: 100
      # Key 反序列化类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value 反序列化类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 当kafka中没有初始offset或offset超出范围时将自动重置offset
      # earliest:重置为分区中最小的offset
      # latest:重置为分区中最新的offset(消费分区中新产生的数据)
      # none:只要有一个分区不存在已提交的offset,就抛出异常
      auto-offset-reset: latest
      properties:
        session:
          timeout:
            # session超时,超过这个时间consumer没有发送心跳,就会触发rebalance操作
            ms: 120000
        request:
          timeout:
            # 请求超时
            ms: 120000

生产者端代码

    public void sendToImStatistics(List<ImChatStatistics> statistics) {
        kafkaTemplate.send(KAFKA_IM_CHAT_STATISTICS, JsonUtils.toString(statistics));
    }

消费者端代码

    @KafkaListener(topics = {"imChatStatistics"}, groupId = "{imStatisticsConsumerGroup}")
    public void listen(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment acknowledgment) {
        try {
            if (CollectionUtils.isEmpty(consumerRecords)) {
                return;
            }
            LogUtils.info("KafkaImStatisticsListener 处理推送消息[data大小: {}]", consumerRecords.size());
            List<ImChatStatistics> totalList = new ArrayList<>();
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                List<ImChatStatistics> list = JSON.parseArray(consumerRecord.value(), ImChatStatistics.class);
                list.stream().forEach(item -> {
                    item.setWeek(DateUtils.getWeek(item.getDate()));
                });
                totalList.addAll(list);
            }
            imChatStatisticsMapper.batchInsertOrUpdate(totalList);
            // 手动提交offset
            acknowledgment.acknowledge();
        } catch (Exception e) {
            LogUtils.error("ImChartConsumer 消息消费失败 :" + e.getMessage(), e);
        }
    }