springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

一、背景

    因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)

二、核心代码

2.1、开启批量、并发消费

kafka:    bootstrap-servers: 10.1.*.*:9092     #服务器的ip及端口,可以写多个,服务器之间用“:”间隔    producer: #生产者配置       key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer: #消费者配置      #指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名      group-id: myGroup                 #设置消费者的组id defaultGroup      enable-auto-commit: true  #设置自动提交offset      auto-commit-interval: 2000  #默认值为5000      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      #值的反序列化方式      value-serializer: org.apache.kafka.common.serialization.StringSerializer      auto-offset-reset: latest      max-poll-records: 2000  #批量一次最大拉取数据量 默认500    listener:      # poll-timeout: 1000      type: batch  # 开启批量消费      concurrency: 3  #指定listener 容器中的线程数,用于提高并发量    properties:      session:        timeout:          ms: 120000  #默认10000        max:          poll:            interval:              ms: 600000  #默认300000(5分钟)

     说明:type: batch  # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3  #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。

2.2、多线程异步配置

    具体配置参加前面文章:SpringBoot使用@Async实现多线程异步

    注意:在启动类上需要加上注解@EnableAsync,开启异步。

2.3、redis相关配置

1、yml相关配置:

spring:  redis:    # 地址    host: 127.0.0.1    # 端口,默认为6379    port: 6379    # 密码    # 连接超时时间    timeout: 10s    lettuce:      pool:        # 连接池中的最小空闲连接        min-idle: 0        # 连接池中的最大空闲连接        max-idle: 8        # 连接池的最大数据库连接数        max-active: 8        # #连接池最大阻塞等待时间(使用负值表示没有限制)        max-wait: -1ms

2、RedisConfig配置

package com.wonders.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;import com.fasterxml.jackson.annotation.PropertyAccessor;import com.fasterxml.jackson.databind.ObjectMapper;import org.springframework.cache.annotation.CachingConfigurerSupport;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer;
/** * 〈自定义redis序列化方式〉 * @author yangyalin * @create 2018/11/1 * @since 1.0.0 */@Configurationpublic class RedisConfig extends CachingConfigurerSupport {    /**     * @Author yangyalin     * @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类     * 用于存储可视化内容     * @Date 15:07 2018/11/1     * @Param [redisConnectionFactory]     * @return org.springframework.data.redis.core.RedisTemplate     **/    @Bean    public RedisTemplate        RedisTemplate        redisTemplate.setConnectionFactory(redisConnectionFactory);        //使用jackson2JsonRedisSerializer替换默认序列化        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);        ObjectMapper objectMapper=new ObjectMapper();        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);        //设置key和value的序列化规则        redisTemplate.setKeySerializer(new StringRedisSerializer());        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);        redisTemplate.afterPropertiesSet();        return redisTemplate;    }}

2.4、动态表名

        "insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">      INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2",           "PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE",           "CREATE_TIME","MSG_PRODUCE_TIME")      VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},            #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},            #{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',            #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},            #{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})    

    说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。

2.5、sql批量提交

    public void batchInsert(List kafkaMsgInfoList) throws Exception{        //如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交        // 创建session实列        SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");        // 开启批量处理模式 BATCH 、关闭自动提交事务 false        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);        KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);        int BATCH = 1000;        for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {            //循环插入 + 开启批处理模式            KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));            if (i != 0 && i % BATCH == 0) {                sqlSession .commit();            }        }        // 一次性提交事务        sqlSession.commit();        // 关闭资源        sqlSession.close();    }

 

2.6、业务代码

    @KafkaListener(topics = {"${mykafka.topics:mytopic}"})    public void myMQConsumer(List msgList){        log.info("接收到的消息条数size:"+msgList.size());        //计算程序耗时时间        StopWatch stopWatch = new StopWatch();        // 开始计时        stopWatch.start();        this.getKafkaMsgAndDel(msgList);  //2、接收kafka日志并解析        stopWatch.stop();        log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");    }

三、测试结果

序号
kafka数量(万条)
消耗(秒)
1 1 3
2 10 13
3 100 120

阅读全文
下载说明:
1、本站所有资源均从互联网上收集整理而来,仅供学习交流之用,因此不包含技术服务请大家谅解!
2、本站不提供任何实质性的付费和支付资源,所有需要积分下载的资源均为网站运营赞助费用或者线下劳务费用!
3、本站所有资源仅用于学习及研究使用,您必须在下载后的24小时内删除所下载资源,切勿用于商业用途,否则由此引发的法律纠纷及连带责任本站和发布者概不承担!
4、本站站内提供的所有可下载资源,本站保证未做任何负面改动(不包含修复bug和完善功能等正面优化或二次开发),但本站不保证资源的准确性、安全性和完整性,用户下载后自行斟酌,我们以交流学习为目的,并不是所有的源码都100%无错或无bug!如有链接无法下载、失效或广告,请联系客服处理!
5、本站资源除标明原创外均来自网络整理,版权归原作者或本站特约原创作者所有,如侵犯到您的合法权益,请立即告知本站,本站将及时予与删除并致以最深的歉意!
6、如果您也有好的资源或教程,您可以投稿发布,成功分享后有站币奖励和额外收入!
7、如果您喜欢该资源,请支持官方正版资源,以得到更好的正版服务!
8、请您认真阅读上述内容,注册本站用户或下载本站资源即您同意上述内容!
原文链接:https://www.shuli.cc/?p=14763,转载请注明出处。
0

评论0

显示验证码
没有账号?注册  忘记密码?