软件编程
位置:首页>> 软件编程>> java编程>> Redisson延迟队列执行流程源码解析

Redisson延迟队列执行流程源码解析

作者:老程不秃  发布时间:2021-12-07 12:03:46 

标签:Redisson,延迟队列,执行流程

引言

在实际分布式项目中延迟任务一般不会使用JDK自带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务。

在项目中可以使用MQ死信队列或redisson延迟队列进行处理延迟任务,本篇文章将讲述redisson延迟队列的使用demo和其执行源码。

demo示例

通过脚手架创建一个简易springboot项目,引入redisson的maven依赖,并简单配置redisson连接属性。

<!-- redisson引用 -->
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson</artifactId>
       <version>3.16.6</version>
   </dependency>
@Configuration
public class RedissonConfig {
   @Value("${spring.redis.host}")
   private String host;
   @Value("${spring.redis.port}")
   private String port;
   /**
    * 获取redissonClient实例
    *
    * @return
    * @throws Exception
    */
   @Bean
   public RedissonClient getRedisson() {
       Config config = new Config();
       String address = "redis://" + host + ":" + port;
       config.useSingleServer().setAddress(address);
       return Redisson.create(config);
   }
}

定义一个redisson延迟队列插入和获取任务处理类RedissonQueueHandle,通过控制spring的bean加载周期开启独立线程获取延迟任务。这里获取延迟任务使用了三种方法,除了第一种阻塞式获取任务方法外,其他两种方法都不是百分比按照延迟参数获取到任务,因为是时间间隔定时循环获取延迟任务。

/**
* redisson延迟队列处理器
*
* @author zrh
*/
@Slf4j
@Component
public class RedissonQueueHandle implements InitializingBean {
   private final RBlockingQueue<RedisDataEntity<?>> queue;
   private final RDelayedQueue<RedisDataEntity<?>> delayedQueue;
   public RedissonQueueHandle (RedissonClient client) {
       this.queue = client.getBlockingQueue("redisson:queue");
       this.delayedQueue = client.getDelayedQueue(queue);
   }
   @Override
   public void afterPropertiesSet () {
       // 开一个线程阻塞式获取任务
       thread();
       // 使用netty时间轮循环获取任务
//        watchDog(new HashedWheelTimer());
       // 使用线程池定时获取任务
//        schedule();
   }
   private void thread () {
       new Thread(() -> {
           while (true) {
               try {
                   RedisDataEntity entity = queue.take();
                   log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
               } catch (Exception e) {
               }
           }
       }, "zrh").start();
   }
   private void watchDog (final HashedWheelTimer timer) {
       timer.newTimeout(timeout -> {
           RedisDataEntity entity = queue.poll();
           if (null != entity) {
               log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
           }
           watchDog(timer);
       }, 3, TimeUnit.SECONDS);
   }
   private void schedule () {
       Executors.newScheduledThreadPool(1).scheduleWithFixedDelay(() -> {
           RedisDataEntity entity = queue.poll();
           if (null != entity) {
               log.info("本次获取数据:{},耗时:{}", entity, System.currentTimeMillis() - entity.getTime());
           }
       }, 5, 5, TimeUnit.SECONDS);
   }
   /**
    * 放入redis,定时过期
    *
    * @param entity
    */
   public void offer (RedisDataEntity entity) {
       try {
           delayedQueue.offer(entity, entity.getExpire(), TimeUnit.MILLISECONDS);
       } catch (Exception e) {
           log.error("放入redis延迟队列异常", e);
       }
   }
}

放入redisson延迟队列可以是字符串也可以是对象RedisDataEntity,因为有进行IO磁盘存储操作,所以必须实现Serializable序列化接口。

/**
* @Author: ZRH
* @Date: 2022/1/10 11:54
*/
@Data
public class RedisDataEntity<T> implements Serializable {
   /**
    * 数据
    */
   private final T data;
   /**
    * 过期时间(单位:毫秒)
    */
   private final Long expire;
   /**
    * 添加时间
    */
   private final Long time;
   public RedisDataEntity (T data, Long expire, Long time) {
       this.data = data;
       this.expire = expire;
       this.time = time;
   }
}

然后开一个插入数据接口:

/**
* @Author: ZRH
* @Date: 2022/1/10 11:45
*/
@Slf4j
@RestController
public class IndexController {
   private final RedissonQueueHandle redisHandle;
   public IndexController (RedissonQueueHandle redisHandle) {
       this.redisHandle = redisHandle;
   }
   @PostMapping("redissonQueue")
   public String redissonQueue (@RequestParam String data, @RequestParam Long expire) {
       RedisDataEntity entity = new RedisDataEntity(data, expire, System.currentTimeMillis());
       log.info("本次添加数据:{}", entity);
       redisHandle.offer(entity);
       return "ok";
   }
}
访问接口设置延迟30秒:http://localhost:8802/redissonQueue?data=a&expire=30000,打印结果如下
2022-01-14 14:21:52.140  INFO 10808 --- [nio-8802-exec-1] c.r.web.controller.IndexController       : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312135)
2022-01-14 14:21:52.887  INFO 10808 --- [nio-8802-exec-2] c.r.web.controller.IndexController       : 本次添加数据:RedisDataEntity(data=a, expire=30000, time=1642141312887)
2022-01-14 14:22:22.240  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312135),耗时:30105
2022-01-14 14:22:22.914  INFO 10808 --- [            zrh] c.r.web.redis.RedissonQueueHandle        : 本次获取数据:RedisDataEntity(data=a, expire=30000, time=1642141312887),耗时:30027

初始执行流程源码解析 redisson延迟队列最终都是和redis服务进行交互的,那可以使用monitor命令查看redis中执行了哪些命令,这样对了解其执行流程有很大帮助。

Redisson延迟队列执行流程源码解析

上图是项目启动时,对redis发送的几个指令

"SUBSCRIBE":订阅队列"redisson_delay_queue_channel:{redisson:queue}",里面有个定时任务通过该队列获取数据

"zrangebyscore":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中排序score值在0到1642148406748(当前时间戳)内的前100元素

"zrange":获取"redisson_delay_queue_timeout:{redisson:queue}"集合中第一个元素,用于获取下一个元素的到期时间

"BLPOP":取出并移除"redisson:queue"列表里的第一个元素,如果没有元素就一直等待阻塞。所以这里会阻塞着

"rpush":如果指令"zrangebyscore"获取到了元素,那就将元素推送到队列redisson:queue内

"lrem":如果指令"zrangebyscore"获取到了元素,那就删除队列"redisson_delay_queue:{redisson:queue}内元素为v的第一个元素

SUBSCRIBE指令

进入RedissonDelayedQueue延迟队列的构造函数,里面就有上述执行指令的lua脚本命令(为了不影响篇幅删了一部分代码,下同):

......
   protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) {
       super(codec, commandExecutor, name);
       // list结构,用于延迟队列的订阅发布
       channelName = prefixName("redisson_delay_queue_channel", getRawName());
       // list结构,存放元素原始顺序
       queueName = prefixName("redisson_delay_queue", getRawName());
       // zset结构,存放未到期元素,并按照过期时间进行排好序
       timeoutSetName = prefixName("redisson_delay_queue_timeout", getRawName());
       QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {
           @Override
           protected RFuture<Long> pushTaskAsync() {
               return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                       "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "
                     + "if #expiredValues > 0 then "
                         + "for i, v in ipairs(expiredValues) do "
                             + "local randomId, value = struct.unpack('dLc0', v);"
                             + "redis.call('rpush', KEYS[1], value);"
                             + "redis.call('lrem', KEYS[3], 1, v);"
                         + "end; "
                         + "redis.call('zrem', KEYS[2], unpack(expiredValues));"
                     + "end; "
                       // get startTime from scheduler queue head task
                     + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "
                     + "if v[1] ~= nil then "
                        + "return v[2]; "
                     + "end "
                     + "return nil;",
                     Arrays.<Object>asList(getRawName(), timeoutSetName, queueName),
                     System.currentTimeMillis(), 100);
           }
           @Override
           protected RTopic getTopic() {
               return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
           }
       };
       queueTransferService.schedule(queueName, task);        
       this.queueTransferService = queueTransferService;
   }

继续跟进queueTransferService.schedule(queueName, task)方法,因为第一次进入tasks集合,所以最后执行start()方法:

......
   private final ConcurrentMap<String, QueueTransferTask> tasks = new ConcurrentHashMap<>();
   public synchronized void schedule(String name, QueueTransferTask task) {
       QueueTransferTask oldTask = tasks.putIfAbsent(name, task);
       if (oldTask == null) {
           task.start();
       } else {
           oldTask.incUsage();
       }
   }

进入QueueTransferTask,继续跟进schedulerTopic.addListener(...)方法:

......
   private int messageListenerId;
   private int statusListenerId;
   public void start() {
       RTopic schedulerTopic = getTopic();
       statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
           @Override
           public void onSubscribe(String channel) {
               pushTask();
           }
       });
       messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
           @Override
           public void onMessage(CharSequence channel, Long startTime) {
               scheduleTask(startTime);
           }
       });
   }

然后会进入PublishSubscribeService.subscribe(...)方法:

注意:这里继续调用重载方法subscribe(...)时设置了参数:PubSubType.SUBSCRIBE

......
   public RFuture<PubSubConnectionEntry> subscribe(Codec codec, ChannelName channelName, RedisPubSubListener<?>... listeners) {
       return subscribe(PubSubType.SUBSCRIBE, codec, channelName, getEntry(channelName), listeners);
   }
   private RFuture<PubSubConnectionEntry> subscribe(PubSubType type, Codec codec, ChannelName channelName, MasterSlaveEntry entry, RedisPubSubListener<?>... listeners) {
       RPromise<PubSubConnectionEntry> promise = new RedissonPromise<>();
       AsyncSemaphore lock = getSemaphore(channelName);
       // 创建一个线程任务放入lock对象
       lock.acquire(() -> {
           if (promise.isDone()) {
               lock.release();
               return;
           }
           subscribe(codec, channelName, entry, promise, type, lock, listeners);
       });
       return promise;
   }

AsyncSemaphore对象的acquire(...)方法会把线程任务放入自身队列listeners里,然后依次读取执行线程任务;

public class AsyncSemaphore {
   private final AtomicInteger counter;
   private final Queue<Runnable> listeners = new ConcurrentLinkedQueue<>();
   public void acquire(Runnable listener) {
       listeners.add(listener);
       tryRun();
   }
   private void tryRun() {
       if (counter.decrementAndGet() >= 0) {
           Runnable listener = listeners.poll();
           if (listener == null) {
               counter.incrementAndGet();
               return;
           }
           listener.run();
       } else {
           if (counter.incrementAndGet() > 0) {
               tryRun();
           }
       }
   }  
}

然后继续跟进方法subscribe(codec, channelName, entry, promise, type, lock, listeners):

.....
   private void subscribe(Codec codec, ChannelName channelName, MasterSlaveEntry entry,
                           RPromise<PubSubConnectionEntry> promise, PubSubType type,
                           AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
       PubSubConnectionEntry connEntry = name2PubSubConnection.get(new PubSubKey(channelName, entry));
       if (connEntry != null) {
           addListeners(channelName, promise, type, lock, connEntry, listeners);
           return;
       }
       freePubSubLock.acquire(() -> {
           if (promise.isDone()) {
               lock.release();
               freePubSubLock.release();
               return;
           }
           MasterSlaveEntry msEntry = Optional.ofNullable(connectionManager.getEntry(entry.getClient())).orElse(entry);
           // 第一次进入entry2PubSubConnection集合为null,所以使用默认值,最后 freeEntry == null
           PubSubEntry freePubSubConnections = entry2PubSubConnection.getOrDefault(msEntry, new PubSubEntry());
           PubSubConnectionEntry freeEntry = freePubSubConnections.getEntries().peek();
           if (freeEntry == null) {
               freePubSubLock.release();
               connect(codec, channelName, msEntry, promise, type, lock, listeners);
               return;
           }
           ......
       });
   }

继续跟进方法connect(codec, channelName, msEntry, promise, type, lock, listeners):

......
   private void connect(Codec codec, ChannelName channelName,
                        MasterSlaveEntry msEntry, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, RedisPubSubListener<?>... listeners) {
       RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(msEntry, channelName);
       promise.onComplete((res, e) -> {...});
       connFuture.onComplete((conn, ex) -> {
           if (ex != null) {...}
           freePubSubLock.acquire(() -> {
               PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
               int remainFreeAmount = entry.tryAcquire();
               PubSubKey key = new PubSubKey(channelName, msEntry);
               PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(key, entry);
               if (oldEntry != null) {...}
               if (remainFreeAmount > 0) {
                   addFreeConnectionEntry(channelName, entry);
               }
               freePubSubLock.release();
               RFuture<Void> subscribeFuture = addListeners(channelName, promise, type, lock, entry, listeners);
               ChannelFuture future;
               // 这里通过上述重载方法传递的参数可知,最后走else逻辑
               if (PubSubType.PSUBSCRIBE == type) {
                   future = entry.psubscribe(codec, channelName);
               } else {
                   future = entry.subscribe(codec, channelName);
               }
               future.addListener((ChannelFutureListener) future1 -> {
                   if (!future1.isSuccess()) {...}
                   connectionManager.newTimeout(timeout ->
                           subscribeFuture.cancel(false),
                           config.getTimeout(), TimeUnit.MILLISECONDS);
               });
           });
       });
   }

该方法中支线内容不表述,主要看方法 entry.subscribe(codec, channelName),最后进入RedisPubSubConnection.async(...)方法,就是发送SUBSCRIBE指令的流程:

Redisson延迟队列执行流程源码解析

zrangebyscore和zrange指令

订阅指令SUBSCRIBE发出后,在QueueTransferTask.start()方法里添加的 * 触发了,就会执行pushTask()

pushTaskAsync()方法执行完(lua脚本执行完),就会开启一个定时任务scheduleTask()

......
   protected abstract RTopic getTopic();
   protected abstract RFuture<Long> pushTaskAsync();
   private void pushTask() {
       // 这个抽象方法在之前构建RedissonDelayedQueue对象的构造函数里有实现,最后返回元素过期时间
       RFuture<Long> startTimeFuture = pushTaskAsync();
       startTimeFuture.onComplete((res, e) -> {
           if (e != null) {
               if (e instanceof RedissonShutdownException) {
                   return;
               }
               log.error(e.getMessage(), e);
               scheduleTask(System.currentTimeMillis() + 5 * 1000L);
               return;
           }
           if (res != null) {
               scheduleTask(res);
           }
       });
   }

BLPOP指令

当RedissonDelayedQueue延迟队列构造完成后,会调用延迟队列的take()方法获取延迟任务,然后会进入RedissonBlockingQueue.takeAsync()方法:

......
   @Override
   public RFuture<V> takeAsync() {
       return commandExecutor.writeAsync(getRawName(), codec, RedisCommands.BLPOP_VALUE, getRawName(), 0);
   }
   /*
    * (non-Javadoc)
    * @see java.util.concurrent.BlockingQueue#take()
    */
   @Override
   public V take() throws InterruptedException {
       return commandExecutor.getInterrupted(takeAsync());
   }
   ......

注意这里的参数其值为 BLPOP,很明显这里就是和我们要找的BLPOP指令有关,所以这里其实就是客户端通过BLPOP指令阻塞式获取值。在客户端开个线程一直循环阻塞获取元素即可;

看下源码继续向下进入CommandAsyncService.writeAsync(...)方法,然后继续向下进入RedisExecutor.execute()方法:

......
   public void execute() {
       if (mainPromise.isCancelled()) {...}
       if (!connectionManager.getShutdownLatch().acquire()) {...}
       codec = getCodec(codec);
       // 获取连接
       RFuture<RedisConnection> connectionFuture = getConnection();
       RPromise<R> attemptPromise = new RedissonPromise<>();
       mainPromiseListener = (r, e) -> {...};
       if (attempt == 0) {...}
       scheduleRetryTimeout(connectionFuture, attemptPromise);
       connectionFuture.onComplete((connection, e) -> {
           if (connectionFuture.isCancelled()) {...}
           if (!connectionFuture.isSuccess()) {...}
           // 连接获取成功就执行当前方法
           sendCommand(attemptPromise, connection);
           writeFuture.addListener(new ChannelFutureListener() {
               @Override
               public void operationComplete(ChannelFuture future) throws Exception {
                   checkWriteFuture(writeFuture, attemptPromise, connection);
               }
           });
       });
       attemptPromise.onComplete((r, e) -> {...});
   }

该方法里一些支线方法按下不表。中间有个超时重试机制,使用netty的时间轮,不是重点也就不表述了。

先获取写入操作连接对象任务,然后进入方法sendCommand(attemptPromise, connection)发送

指令指令:"BLPOP",参数:"redisson:queue" "0"

Redisson延迟队列执行流程源码解析

offer添加任务流程源码解析 项目启动完成后,添加一个延迟任务到redis中,查看redis中所执行的指令:

Redisson延迟队列执行流程源码解析

然后跟进插入元素offer方法,进入RedissonDelayedQueue.offerAsync()方法内,如下所示:

......
   @Override
   public void offer(V e, long delay, TimeUnit timeUnit) {
       get(offerAsync(e, delay, timeUnit));
   }
   @Override
   public RFuture<Void> offerAsync(V e, long delay, TimeUnit timeUnit) {
       if (delay < 0) {
           throw new IllegalArgumentException("Delay can't be negative");
       }
       long delayInMs = timeUnit.toMillis(delay);
       long timeout = System.currentTimeMillis() + delayInMs;
       long randomId = ThreadLocalRandom.current().nextLong();
       return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
               "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);"
             + "redis.call('zadd', KEYS[2], ARGV[1], value);"
             + "redis.call('rpush', KEYS[3], value);"
             // if new object added to queue head when publish its startTime
             // to all scheduler workers
             + "local v = redis.call('zrange', KEYS[2], 0, 0); "
             + "if v[1] == value then "
                + "redis.call('publish', KEYS[4], ARGV[1]); "
             + "end;",
             Arrays.<Object>asList(getRawName(), timeoutSetName, queueName, channelName),
             timeout, randomId, encode(e));
   }

其中很明显一长串的脚本命令就是在redis中执行的指令,基本流程比较简单:

"zadd":这是向zset集合"redisson_delay_queue_timeout:{redisson:queue}"里添加元素数据(此数据被处理过,不用管其结构),排序值为当前时间戳+延迟时间

"rpush":把元素数据推送到list队列"redisson:queue"

"zrange":获取zset集合"redisson_delay_queue_timeout:{redisson:queue}"中排好序的第一个元素

"publish":如果上述获取的元素是本次插入的元素,那就发布通知队列"redisson_delay_queue_channel:{redisson:queue}",内容为当前元素的过期时间,这样做是为了减少本次元素到期的时间差。

最后定时器源码解析

定时器任务主要是通过 * 监听到了有新的客户端订阅或元素通知发布出来时,就会执行pushTask()和scheduleTask(...)方法:

......
   private int messageListenerId;
   private int statusListenerId;
   public void start() {
       RTopic schedulerTopic = getTopic();
       // 当有新的客户端订阅schedulerTopic,就是触发执行pushTask()方法
       statusListenerId = schedulerTopic.addListener(new BaseStatusListener() {
           @Override
           public void onSubscribe(String channel) {
               pushTask();
           }
       });
       // 当redis有新的消息通知,就会触发scheduleTask(...)方法,startTime为上述中publish通知的元素过期时间
       messageListenerId = schedulerTopic.addListener(Long.class, new MessageListener<Long>() {
           @Override
           public void onMessage(CharSequence channel, Long startTime) {
               scheduleTask(startTime);
           }
       });
   }

pushTask()方法是对redis延迟队列进行操作的方法,scheduleTask(...)是netty时间轮来控制调用pushTask()方法,所以pushTask()和scheduleTask()互相调用。

......
   private void scheduleTask(final Long startTime) {
       TimeoutTask oldTimeout = lastTimeout.get();
       if (startTime == null) {...}
       if (oldTimeout != null) {...}
       long delay = startTime - System.currentTimeMillis();
       if (delay > 10) {
           Timeout timeout = connectionManager.newTimeout(new TimerTask() {                    
               @Override
               public void run(Timeout timeout) throws Exception {
                   pushTask();
                   TimeoutTask currentTimeout = lastTimeout.get();
                   if (currentTimeout.getTask() == timeout) {
                       lastTimeout.compareAndSet(currentTimeout, null);
                   }
               }
           }, delay, TimeUnit.MILLISECONDS);
           if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) {
               timeout.cancel();
           }
       } else {
           pushTask();
       }
   }
   protected abstract RTopic getTopic();
   protected abstract RFuture<Long> pushTaskAsync();
   private void pushTask() {
       RFuture<Long> startTimeFuture = pushTaskAsync();
       startTimeFuture.onComplete((res, e) -> {
           if (e != null) {
               if (e instanceof RedissonShutdownException) {
                   return;
               }
               log.error(e.getMessage(), e);
               scheduleTask(System.currentTimeMillis() + 5 * 1000L);
               return;
           }
           if (res != null) {
               scheduleTask(res);
           }
       });
   }

总结:

当有新的客户端进行订阅,就调用pushTask()方法拉取数据放入阻塞队列。当有信的消息进行发布,就调用scheduleTask(...)方法,并根据其过期时间判断是通过时间轮延迟调用还是立即调用pushTask()方法。最后 redisson延迟队列的源码相对而言其实是比较抽象复杂的,感觉没有其分布式锁这块源码容易解析。但仔细用心去看,跟着主要方法走还是可以了解其执行流程。

来源:https://blog.csdn.net/weixin_64314555/article/details/122578703

0
投稿

猜你喜欢

手机版 软件编程 asp之家 www.aspxhome.com