软件编程
位置:首页>> 软件编程>> java编程>> SpringBoot+Nacos+Kafka微服务流编排的简单实现

SpringBoot+Nacos+Kafka微服务流编排的简单实现

作者:热黄油啤酒  发布时间:2023-03-21 10:34:53 

标签:SpringBoot,Nacos,Kafka,流编排
目录
  • 前言

  • 准备工作

    • Nacos安装及使用入门

    • 准备三个SpringBoot服务,引入Nacos及Kafka

    • 业务解读

    • Nacos配置

      • 创建配置

    • 读取配置

      • 监听配置改变

      • 总结

        前言

        最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排,学习了SpringCloud Data Flow等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能。

        简单的说,我们希望自己的流编排就是微服务可插拔,微服务数据入口及输出可不停机修改。

        准备工作

        Nacos安装及使用入门

        自己学习的话推荐使用docker安装,命令如下

        • 拉取镜像 docker pull nacos/nacos-server

        • 创建服务 docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

        然后在浏览器输入 ip:8848/nacos 账号nacos 密码nacos

        SpringBoot+Nacos+Kafka微服务流编排的简单实现

        docker能够帮助我们快速安装服务,减少再环境准备花的时间

        准备三个SpringBoot服务,引入Nacos及Kafka


        <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.1.0.RELEASE</version>
        </parent>

        <dependency>
          <groupId>org.springframework.kafka</groupId>
          <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
          <groupId>com.alibaba.boot</groupId>
          <artifactId>nacos-config-spring-boot-starter</artifactId>
          <version>0.2.1</version>
        </dependency>

        配置文件


        spring:
         kafka:
           bootstrap-servers: kafka-server:9092
           producer:
             acks: all
           consumer:
             group-id: node1-group #三个服务分别为node1 node2 node3
             enable-auto-commit: false
        # 部署的nacos服务
        nacos:
         config:
           server-addr: nacos-server:8848

        建议配置本机host 就可以填写xxx-server 不用填写服务ip

        业务解读

        我们现在需要对三个服务进行编排,保障每个服务可以插拔,也可以调整服务的位子示意图如下:

        SpringBoot+Nacos+Kafka微服务流编排的简单实现

        • node1服务监听前置服务发送的数据流,输入的topic为前置数据服务输出topic

        • node2监听node1处理后的数据,所以node2监听的topic为node1输出的topic,node3同理,最终node3处理完成后将数据发送到数据流终点

        • 我们现在要调整流程 移除node2-server,我们只需要把node1-sink改变成node2-sink即可,这样我们这几个服务就可以灵活的嵌入的不同项目的数据流处理业务中,做到即插即用(当然,数据格式这些业务层面的都是需要约定好的)

        • 动态可调还可以保证服务某一节点出现问题时候,即时改变数据流向,比如发送到数暂存服务,避免Kafka中积累太多数据,吞吐不平衡

        Nacos配置

        创建配置

        通常流编排里面每个服务都有一个输入及输出,分别为input及sink,所以每个服务我们需要配置两个topic,分别是input-topic output-topic,我们就在nacos里面添加输入输出配置

        nacos配置项需要配置groupId,dataId,通常我们用服务名称作为groupId,配置项的名称作为dataId,如node1-server服务有一个input配置项,配置如下:

        SpringBoot+Nacos+Kafka微服务流编排的简单实现

        完成其中一个服务的配置,其它服务参考下图配置即可

        SpringBoot+Nacos+Kafka微服务流编排的简单实现

        读取配置


        @Configuration
        @NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
        // autoRefreshed=true指的是nacos中配置发生改变后会刷新,false代表只会使用服务启动时候读取到的值
        @NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
        public class NacosConfig {

        @NacosValue(value = "${input:}", autoRefreshed = true)
           private String input;

        @NacosValue(value = "${sink:}", autoRefreshed = true)
           private String sink;

        public String getInput() {
               return input;
           }

        public String getSink() {
               return sink;
           }
        }

        监听配置改变

        服务的输入需要在服务启动时候创建消费者,在topic发生改变时候重新创建消费者,移除旧topic的消费者,输出是业务驱动的,无需监听改变,在每次发送时候读取到的都是最新配置的topic,因为在上面的配置类中autoRefreshed = true,这个只会刷新nacosConfig中的配置值,服务需要知道配置改变去驱动消费的创建业务,需要创建nacos配置监听


        /**
        * 监听Nacos配置改变,创建消费者,更新消费
        */
        @Component
        public class ConsumerManager {

        @Value("${spring.kafka.bootstrap-servers}")
           private String servers;

        @Value("${spring.kafka.consumer.enable-auto-commit}")
           private boolean enableAutoCommit;

        @Value("${spring.kafka.consumer.group-id}")
           private boolean groupId;

        @Autowired
           private NacosConfig nacosConfig;

        @Autowired
           private KafkaTemplate kafkaTemplate;

        // 用于存放当前消费者使用的topic
           private String topic;

        // 用于执行消费者线程
           private ExecutorService executorService;

        /**
            * 监听input
            */
           @NacosConfigListener(dataId = "node1-server", groupId = "input")
           public void inputListener(String input) {
               // 这个监听触发的时候 实际NacosConfig中input的值已经是最新的值了 我们只是需要这个监听触发我们更新消费者的业务
               String inputTopic = nacosConfig.getInput();
               // 我使用nacosConfig中读取的原因是因为监听到内容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的内容框架会处理好,大家看一下第一张图的配置内容就明白了
               // 先检查当前局部变量topic是否有值,有值代表是更新消费者,没有值只需要创建即可
               if(topic != null) {
                   // 停止旧的消费者线程
                   executorService.shutdownNow();
                   executorService == null;
               }
               // 根据为新的topic创建消费者
               topic = inputTopic;
               ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
               executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2), threadFactory);
               // 执行消费业务
               executorService.execute(() -> consumer(topic));
           }

        /**
            * 创建消费者
            */
           public void consumer(String topic) {
               Properties properties = new Properties();
               properties.put("bootstrap.servers", servers);
               properties.put("enable.auto.commit", enableAutoCommit);
               properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
               properties.put("group.id", groupId);
               KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
               consumer.subscribe(Arrays.asList(topic));
               try {
                   while (!Thread.currentThread().isInterrupted()) {
                       Duration duration = Duration.ofSeconds(1L);
                       ConsumerRecords<String, String> records = consumer.poll(duration);
                       for (ConsumerRecord<String, String> record : records) {
                           String message = record.value();
                           // 执行数据处理业务 省略业务实现
                           String handleMessage =  handle(message);
                           // 处理完成后发送到下一个节点
                           kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
                       }
                   }
                   consumer.commitAsync();
               }
               } catch (Exception e) {
                   LOGGER.error(e.getMessage(), e);
               } finally {
                   try {
                       consumer.commitSync();
                   } finally {
                       consumer.close();
                   }
               }
           }
        }

        来源:https://juejin.cn/post/6997704312835047438

        0
        投稿

        猜你喜欢

        • JPA介绍JPA(Java Persistence API)是Sun官方提出的Java持久化规范。它为Java开发人员提供了一种对象/关联映
        • 本文实例为大家分享了java实现在线聊天系统的具体代码,供大家参考,具体内容如下本博客是博主在观看相关视频后写下的代码,希望能够帮助大家掌握
        • 有时候,根据业务逻辑的需求,我们想要获取到某个接口的所有实现类。在这里大致介绍两种方式:1.借助Spring容器实现Spring作为一个容器
        • 一、死锁简介在多道程序设计环境下,多个进程可能竞争一定数量的资源,。一个进程申请资源,如果资源不可用,那么进程进入等待状态。如果所申请的资源
        • 队列简介队列是一个有序列表,可以用数组或是链表来实现。遵循先入先出的原则。即先存入队列的数据,先取出,后存入的后取出。示意图:(使用数组模拟
        • 有时候我们在使用java编程的时候,想启动线程,怎么启动呢,下面来分享一下方法第一步在我们的电脑上打开eclipse,创建一个java项目,
        • 面向对象的程序是由对象组成的,每个对象包含对用户公开的特定功能部分和隐藏的实现部分。在面向对象程序设计(OOP)中,不必关心对象的具体实现。
        • TimeSpan 结构  表示一个时间间隔。命名空间:System 程序集:mscorlib(在 mscorlib.dll 中)说
        • RecyclerView上拉加载,先看效果:网上有很多这类得框架,不过在自己的项目只用到上拉加载的功能,所以自己封装一个简单点的。主要依赖B
        • Mybatis与Ibatis的区别: 1、Mybatis实现了接口绑定,使用更加方便 在ibatis2.x中我们需要在DAO的实现类中指定具
        • 首先需要在 AndroidManifest.xml 文件中添加「获取模拟定位信息」权限。<uses-permission androi
        • 在并发多线程的情况下,为了保证数据安全性,一般我们会对数据进行加锁,通常使用Synchronized或者ReentrantLock同步锁。S
        • 在我们实现某些功能时,可能会有倒计时的需求。比如发送短信验证码,发送成功后可能要求用户一段时间内不能再次发送,这时候我们就需要进行倒计时,时
        • 前言日志模块是每个项目中必须的,用来记录程序运行中的相关信息。一般在开发环境下使用DEBUG级别的日志输出,为了方便查看问题,而在线上一般都
        • SpringMVC支持的数据校验是JSR303的标准,通过在bean的属性上打上@NotNull、@Max等进行验证。JSR303提供有很多
        • 内存泄露内存泄漏就是在当前应用周期内不再使用的对象被GC Roots引用,导致不能回收,使实际可使用内存变小,通俗点讲,就是无法回收无用对象
        • 背景后台系统需要接入 企业微信登入,满足企业员工快速登入系统流程图简单代码说明自定义一套 springsecurity 认证逻辑主要就是 根
        • 目录1、通过session中的token验证步骤1:创建自定义注解步骤2:创建自定义 * (@slf4j是lombok的注解)步骤3:将自定
        • 近期发现C盘空闲空间剩余不多了,经过检查发现在C:\Users\<电脑用户名>\的目录下,有这两个文件夹空间比较大,这两文件夹分
        • 什么是枚举?枚举是JDK5引入的新特性。在某些情况下,一个类的对象是固定的,就可以定义为枚举。在实际使用中,枚举类型也可以作为一种规范,保障
        手机版 软件编程 asp之家 www.aspxhome.com