软件编程
位置:首页>> 软件编程>> java编程>> springboot项目配置多个kafka的示例代码

springboot项目配置多个kafka的示例代码

作者:CccccDi  发布时间:2023-11-23 23:15:29 

标签:springboot,kafka

1.引入spring-kafka依赖

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.3.5.RELEASE</version>
</dependency>

2.配置文件相关信息

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#并发消费的线程数(不应超过topic的partition数量,超出的线程分配不到分区,只会空闲)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false

kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false

3.kafka配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

@Value("${kafka.consumer.group.id}")
   private String groupId;

@Value("${kafka.consumer.concurrency}")
   private int concurrency;

@Value("${kafka.consumer.enable.auto.commit}")
   private String autoCommit;

@Value("${kafka.bootstrap-servers}")
   private String bootstrapServer;

@Value("${kafka.consumer.group.id.pic}")
   private String groupIdPic;

@Value("${kafka.consumer.concurrency.pic}")
   private int concurrencyPic;

@Value("${kafka.consumer.enable.auto.commit.pic}")
   private String autoCommitPic;

@Value("${kafka.bootstrap-servers.pic}")
   private String bootstrapServerPic;

@Bean
   public ConsumerFactory<String, String> consumerFactory() {
       String bootstrapServers = bootstrapServer;
       Map<String, Object> configProps = new HashMap<>(16);
       configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
       configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
       return new DefaultKafkaConsumerFactory<>(configProps);
   }

@Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory =
               new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactory());
       factory.setConcurrency(concurrency);
       factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
       return factory;
   }

@Bean
   public ConsumerFactory<String, String> consumerFactoryPic() {
       String bootstrapServers = bootstrapServerPic;
       Map<String, Object> configProps = new HashMap<>(16);
       configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
       configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
       configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
       configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
       return new DefaultKafkaConsumerFactory<>(configProps);
   }

@Bean
   public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
       ConcurrentKafkaListenerContainerFactory<String, String> factory =
               new ConcurrentKafkaListenerContainerFactory<>();
       factory.setConsumerFactory(consumerFactoryPic());
       factory.setConcurrency(concurrencyPic);
       factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
       return factory;
   }
}

4.消费主题消息(下面两个监听方法的名称和参数完全相同,需要放在不同的类中,或者改用不同的方法名)

@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
   public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
       try {
           String jsonString = message.value();
           if (StringUtils.isNoneBlank(jsonString)) {
               log.info("消费:{}",jsonString);
               //TODO ....
           }
       } catch (Exception e) {
           log.error(" receive topic error ", e);
       } finally {
           ack.acknowledge();
       }
   }

@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
   public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
       try {
           if (StringUtils.isNoneBlank(message.value())) {
                 //TODO ....
           }
       } catch (Exception e) {
           logger.error(" receive topic error ", e);
       } finally {
           ack.acknowledge();
       }
   }

来源:https://www.cnblogs.com/128-cdy/p/17293059.html

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com