首页>>后端>>SpringBoot->SpringBoot 集成 Apache Kafka

SpringBoot 集成 Apache Kafka

时间:2023-11-29 本站 点击:0

前言

项目从0到1,没有什么基础建设,前段时间刚整合完nacos配置中心,nacos对于区分环境变量各种配置真是太友好了,之前可能写死在代码里面每次发布修改配置,对于开发和运维同学来说都是巨大的工作量,公司的kafka集群,Hbase集群通过vip区分不同的环境。结合nacos动态配置,简化配置的工作量,让开发同学专注与业务和工程性能上面来。

官网: https://spring.io/projects/spring-kafka

版本选择

其实官网也介绍了很多除了SpringBoot其他方式对于版本选择的方式。

SpringBoot很简单直接引入Pom文件依赖。

<!--SpringKafka依赖--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

Nacos 配置

#kafka配置spring.kafka.bootstrap-servers=127.0.0.1:9092#producer配置spring.kafka.producer.retries=0#每次批量发送消息的数量,produce积累到一定数据,一次发送spring.kafka.producer.batch-size=16384#produce积累数据一次发送,缓存大小达到buffer.memory就发送数据spring.kafka.producer.buffer-memory=33554432#ack机制:all,-1,0,1spring.kafka.producer.acks=1#指定消息key和消息体的编解码方式spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer#consumer#指定默认消费者groupid-->由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名spring.kafka.consumer.group-id=dzlog-smp-tiny-url-info-group#earliest和latest才有效,如果earliest重新0开始读取,如果是latest从logfile的offset读取。一般情况下我们都是设置earliestspring.kafka.consumer.auto-offset-reset=earliest#enable.auto.commit:true-->设置自动提交offsetspring.kafka.consumer.enable-auto-commit=true#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。spring.kafka.consumer.auto-commit-interval=100#指定消息key和消息体的编解码方式spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

Kafka Producer 生产者

用kafkaTemplate生产的就很简单了,和引用SpringRedisTemplate一样直接@Autoware进来就可以了。

@ComponentpublicclassTestProducer{@ResourceprivateKafkaTemplatekafkaTemplate;publicvoidsend(Stringmsg){kafkaTemplate.send("test_topic",msg);}}

Kafka Consumer

@ComponentpublicclassConsumer{@KafkaListener(topics="test_topic")publicvoidonMessage1(Stringmessage){//处理System.out.println(message);}}

单元测试

@SpringBootTest@ActiveProfiles("dev")publicclassTestProducerTest{@AutowiredprivateTestProducertestProducer;@Testpublicvoidsend(){for(inti=0;i<30;i++){RecordDTOrecordDTO=newRecordDTO();recordDTO.setTinyCode("SX03").setTinyUrl("z.dz.cn").setPhoneNo(18795969912L).setLongUrl("www.baidu.com").setTaskId(100L).setRecordId(101L);tinyUrlInfoProducer.send(JSON.toJSONString(recordDTO));}}}


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/SpringBoot/389.html