redis实现队列的阻塞、延时、发布和订阅
作者:morris131 发布时间:2021-07-02 10:56:19
Redis不仅可作为缓存服务器,还可以用作消息队列。它的列表类型天生支持用作消息队列。如下图所示:
由于Redis的列表是使用双向链表实现的,保存了头节点和尾节点,所以在列表的头部和尾部两边插入或获取元素都是非常快的,时间复杂度为O(1)。
普通队列
可以直接使用Redis的list数据类型实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。
lpush+rpop:左进右出的队列
rpush+lpop:左出右进的队列
下面使用redis的命令来模拟普通队列。
使用lpush命令生产消息:
>lpush queue:single 1
"1"
>lpush queue:single 2
"2"
>lpush queue:single 3
"3"
使用rpop命令消费消息:
>rpop queue:single
"1"
>rpop queue:single
"2"
>rpop queue:single
"3"
下面使用Java代码来实现普通队列。
生产者SingleProducer
package com.morris.redis.demo.queue.single;
import redis.clients.jedis.Jedis;
/**
?* 生产者
?*/
public class SingleProducer {
? ? public static final String SINGLE_QUEUE_NAME = "queue:single";
? ? public static void main(String[] args) {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? for (int i = 0; i < 100; i++) {
? ? ? ? ? ? jedis.lpush(SINGLE_QUEUE_NAME, "hello " + i);
? ? ? ? }
? ? ? ? jedis.close();
? ? }
}
消费者SingleConsumer:
package com.morris.redis.demo.queue.single;
import redis.clients.jedis.Jedis;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
?* 消费者
?*/
public class SingleConsumer {
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? while (true) {
? ? ? ? ? ? String message = jedis.rpop(SingleProducer.SINGLE_QUEUE_NAME);
? ? ? ? ? ? if(Objects.nonNull(message)) {
? ? ? ? ? ? ? ? System.out.println(message);
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
上面的代码已经基本实现了普通队列的生产与消费,但是上述的例子中消息的消费者存在两个问题:
消费者需要不停的调用rpop方法查看redis的list中是否有待处理的数据(消息)。每调用一次都会发起一次连接,有可能list中没有数据,造成大量的空轮询,导致造成不必要的浪费。也许你可以使用Thread.sleep()等方法让消费者线程隔一段时间再消费,如果睡眠时间过长,这样不能处理一些时效性要求高的消息,睡眠时间过短,也会在连接上造成比较大的开销。
如果生产者速度大于消费者消费速度,消息队列长度会一直增大,时间久了会占用大量内存空间。
阻塞队列
消费者可以使用brpop指令从redis的list中获取数据,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端就不需要休眠后获取数据了,这样就相当于实现了一个阻塞队列,
使用redis的brpop命令来模拟阻塞队列。
>brpop queue:single 30
可以看到命令行阻塞在了brpop这里了,30s后没数据就返回。
Java代码实现如下:
生产者与普通队列的生产者一致。
消费者BlockConsumer:
package com.morris.redis.demo.queue.block;
import redis.clients.jedis.Jedis;
import java.util.List;
/**
?* 消费者
?*/
public class BlockConsumer {
? ? public static void main(String[] args) {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? while (true) {
? ? ? ? ? ? // 超时时间为1s
? ? ? ? ? ? List<String> messageList = jedis.brpop(1, BlockProducer.BLOCK_QUEUE_NAME);
? ? ? ? ? ? if (null != messageList && !messageList.isEmpty()) {
? ? ? ? ? ? ? ? System.out.println(messageList);
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
缺点:无法实现一次生产多次消费。
发布订阅模式
Redis除了对消息队列提供支持外,还提供了一组命令用于支持发布/订阅模式。利用Redis的pub/sub模式可以实现一次生产多次消费的队列。
发布:PUBLISH指令可用于发布一条消息,格式:
PUBLISH channel message
返回值表示订阅了该消息的数量。
订阅:SUBSCRIBE指令用于接收一条消息,格式:
SUBSCRIBE channel
使用SUBSCRIBE指令后进入了订阅模式,但是不会接收到订阅之前publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。
回复分为三种类型:
如果为subscribe,第二个值表示订阅的频道,第三个值表示是已订阅的频道的数量
如果为message(消息),第二个值为产生该消息的频道,第三个值为消息
如果为unsubscribe,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。
下面使用redis的命令来模拟发布订阅模式。
生产者:
127.0.0.1:6379> publish queue hello
(integer) 1
127.0.0.1:6379> publish queue hi
(integer) 1
消费者:
127.0.0.1:6379> subscribe queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1
1) "message"
2) "queue"
3) "hello"
1) "message"
2) "queue"
3) "hi"
Java代码实现如下:
生产者PubsubProducer:
?
package com.morris.redis.demo.queue.pubsub;
import redis.clients.jedis.Jedis;
/**
?* 生产者
?*/
public class PubsubProducer {
? ? public static final String PUBSUB_QUEUE_NAME = "queue:pubsub";
? ? public static void main(String[] args) {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? for (int i = 0; i < 100; i++) {
? ? ? ? ? ? jedis.publish(PUBSUB_QUEUE_NAME, "hello " + i);
? ? ? ? }
? ? ? ? jedis.close();
? ? }
}
?
消费者PubsubConsumer:
package com.morris.redis.demo.queue.pubsub;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
/**
?* 消费者
?*/
public class PubsubConsumer {
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? JedisPubSub jedisPubSub = new JedisPubSub() {
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onMessage(String channel, String message) {
? ? ? ? ? ? ? ? System.out.println("receive message: " + message);
? ? ? ? ? ? ? ? if(message.indexOf("99") > -1) {
? ? ? ? ? ? ? ? ? ? this.unsubscribe();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onSubscribe(String channel, int subscribedChannels) {
? ? ? ? ? ? ? ? System.out.println("subscribe channel: " + channel);
? ? ? ? ? ? }
? ? ? ? ? ? @Override
? ? ? ? ? ? public void onUnsubscribe(String channel, int subscribedChannels) {
? ? ? ? ? ? ? ? System.out.println("unsubscribe channel " + channel);
? ? ? ? ? ? }
? ? ? ? };
? ? ? ? jedis.subscribe(jedisPubSub, PubsubProducer.PUBSUB_QUEUE_NAME);
? ? }
}
消费者可以启动多个,每个消费者都能收到所有的消息。
可以使用指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
Redis还支持基于通配符的消息订阅,使用指令PSUBSCRIBE (pattern subscribe),例如:
psubscribe channel.*
用PSUBSCRIBE指令订阅的频道也要使用指令PUNSUBSCRIBE指令退订,该指令无法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。
同时PUNSUBSCRIBE指令通配符不会展开。例如:PUNSUBSCRIBE \*不会匹配到channel.\*,所以要取消订阅channel.\*就要这样写PUBSUBSCRIBE channel.\*。
Redis的pub/sub也有其缺点,那就是如果消费者下线,生产者的消息会丢失。
延时队列和优先级队列
Redis中有个数据类型叫Zset,其本质就是在数据类型Set的基础上加了个排序的功能而已,除了保存原始的数据value之外,还提供另一个属性score,这一属性在添加修改元素时候可以进行指定,每次指定后,Zset会自动重新按新的score值进行排序。
如果score字段设置为消息的优先级,优先级最高的消息排在第一位,这样就能实现一个优先级队列。
如果score字段代表的是消息想要执行时间的时间戳,将它插入Zset集合中,便会按照时间戳大小进行排序,也就是对执行时间先后进行排序,集合中最先要执行的消息就会排在第一位,这样的话,只需要起一个死循环线程不断获取集合中的第一个元素,如果当前时间戳大于等于该元素的score就将它取出来进行消费删除,就可以达到延时执行的目的,注意不需要遍历整个Zset集合,以免造成性能浪费。
下面使用redis的zset来模拟延时队列。
生产者:
127.0.0.1:6379> zadd queue:delay 1 order1 2 order2 3 order3
(integer) 0
消费者:
127.0.0.1:6379> zrange queue:delay 0 0 withscores
1) "order1"
2) "1"
127.0.0.1:6379> zrem queue:delay order1
(integer) 1
Java代码如下:
生产者DelayProducer:
package com.morris.redis.demo.queue.delay;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Random;
/**
?* 生产者
?*/
public class DelayProducer {
? ? public static final String DELAY_QUEUE_NAME = "queue:delay";
? ? public static void main(String[] args) {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? long now = new Date().getTime();
? ? ? ? Random random = new Random();
? ? ? ? for (int i = 0; i < 10; i++) {
? ? ? ? ? ? int second = random.nextInt(30); // 随机订单失效时间
? ? ? ? ? ? jedis.zadd(DELAY_QUEUE_NAME, now + second * 1000, "order"+i);
? ? ? ? }
? ? ? ? jedis.close();
? ? }
}
消费者:
package com.morris.redis.demo.queue.delay;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
?* 消费者
?*/
public class DelayConsumer {
? ? public static void main(String[] args) throws InterruptedException {
? ? ? ? Jedis jedis = new Jedis();
? ? ? ? while (true) {
? ? ? ? ? ? long now = new Date().getTime();
? ? ? ? ? ? Set<Tuple> tupleSet = jedis.zrangeWithScores(DelayProducer.DELAY_QUEUE_NAME, 0, 0);
? ? ? ? ? ? if(tupleSet.isEmpty()) {
? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500);
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? for (Tuple tuple : tupleSet) {
? ? ? ? ? ? ? ? ? ? Double score = tuple.getScore();
? ? ? ? ? ? ? ? ? ? long time = score.longValue();
? ? ? ? ? ? ? ? ? ? if(time < now) {
? ? ? ? ? ? ? ? ? ? ? ? jedis.zrem(DelayProducer.DELAY_QUEUE_NAME, tuple.getElement());
? ? ? ? ? ? ? ? ? ? ? ? System.out.println("order[" + tuple.getElement() +"] is timeout at " + time);
? ? ? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS.sleep(500);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? break;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
应用场景
延时队列可用于订单超时失效的场景
二级缓存(local+redis)中,当有缓存需要更新时,可以使用发布订阅模式通知其他服务器使得本地缓存失效。
来源:https://blog.csdn.net/u022812849/article/details/124510513


猜你喜欢
- 浅谈先来说一下“this指针”:C++中通过引入this指针解决该问题,暨:C++编译器给每个“非静态的成员函数”增加了一个隐藏的指针参数,
- 前言 之前unity5.x在代码中写了debug.log..等等,打
- public static string Escape(string s) &nb
- GC简介何为GCGC(Garbage Collection)称之为垃圾回收,是对内存中的垃圾对象,采用一定的算法进行内存回收的一个动作。比方
- ExpandableListView介绍 ExpandableListView的引入 ExpandableListVie
- 编程是一门艺术,大批量的改动显然是非常丑陋的做法,用心的琢磨写的代码让它变的更美观。在有些情况下,一个客户不能或者不想直接访问另一个对象,这
- 之前在项目中会用到在Java在后台把数据填入Word文档的模板来提供前台下载,为了自己能随时查看当时的实现方案及方便他人学习我写了这篇博客,
- 想必大家都知道,国内的Android应用基本都是免费的
- 前提前面写过一篇关于Environment属性加载的源码分析和扩展,里面提到属性的占位符解析和类型转换是相对复杂的,这篇文章就是要分析和解读
- 在nginx.conf文件的http模块新增以下内容gzip &nbs
- Android为我们提供了竖直方向的滚动控件GridView,但如果我们想让它水平滚动起来,就需要自己实现了。以下使用的测试数据datas集
- 问题:在项目中,当保存数据超过数据库字段列长度限制时,如何解决?一种常见的解决办法是:截串存取。顾名思义,就是对大文本数据按指定长度进行截取
- 前导:发过程中经常会使用java将office系列文档转换为PDF, 一般都使用微软提供的openoffice+jodconverter 实
- 在Spring Cloud Netflix栈中,各个微服务都是以HTTP接口的形式暴露自身服务的,因此在调用远程服务时就必须使用HTTP客户
- 一:什么是Bitmap像素级的操作相信大家都知道一张jpg或png放大后会是一个个小格子,称为一个像素(px),而且一个小格子是一种颜色,也
- 一,我们使用两个域名互相访问的时候会提示跨域,原因在哪里呢?如下图跨域,我们探究下是什么原因导致浏览器报这个错呢?二,我们研究下看看请求是否
- 使用@Autowired注解有错误提示使用Spring boot +mybatis框架时,在service实现类中使用Mapper类,给Ma
- 1. Retrofit使用Retrofit是一个现在网络请求框架,先来说一下怎么使用网络权限(添加到AndroidManifest.xml)
- Remember me功能就是勾选"记住我"后,一次登录,后面在有效期内免登录。先看具体配置:pom文件:<dep
- 首先来看一下效果: 大体思路如下: 总体布局用了一个自定义的ViewGroup,里面包了两个View(top Vie