23人参与 • 2025-04-22 • Redis
消息队列在分布式系统中非常重要,能够有效解耦系统的各个模块,提供异步处理能力和缓冲能力。redis作为一个高性能的内存数据库,除了缓存和持久化存储,它还能充当轻量级的消息队列。使用redis处理消息队列有助于提高系统的吞吐量和可扩展性。
消息队列的应用场景非常广泛,包括:
redis提供了几种不同的机制来实现消息队列,包括list和pub/sub。
redis的list数据结构是实现队列的基础。常见的操作包括:
lpush
:将消息推入队列的左端。rpush
:将消息推入队列的右端。rpop
:从队列的右端弹出消息(相当于先进先出,即fifo)。blpop
:阻塞式弹出消息,当队列为空时会等待直到有新的消息。redis的**发布/订阅(pub/sub)**是一种不同的消息队列实现方式,支持消息广播。它的机制如下:
但pub/sub的特点是消息不持久化,它更适用于实时消息传递,如果没有订阅者,消息会丢失。
我们的项目基于spring boot ,包括以下模块:
在pom.xml
中添加redis和web的依赖:
<dependencies> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-data-redis</artifactid> </dependency> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-web</artifactid> </dependency> </dependencies>
在application.yml
中配置redis:
spring: redis: host: localhost port: 6379
配置redistemplate
用于与redis进行交互:
@configuration public class redisconfig { @bean public redistemplate<string, object> redistemplate(redisconnectionfactory redisconnectionfactory) { redistemplate<string, object> template = new redistemplate<>(); template.setconnectionfactory(redisconnectionfactory); return template; } }
producer(消息生产者)
生产者将消息推入队列中,使用lpush
或rpush
操作:
@service public class messageproducer { @autowired private redistemplate<string, object> redistemplate; private static final string message_queue = "message:queue"; public void produce(string message) { redistemplate.opsforlist().leftpush(message_queue, message); } }
consumer(消息消费者)
消费者从队列中阻塞式地弹出消息,并进行处理:
@service public class messageconsumer { @autowired private redistemplate<string, object> redistemplate; private static final string message_queue = "message:queue"; @scheduled(fixedrate = 5000) // 每5秒检查一次队列 public void consume() { string message = (string) redistemplate.opsforlist().rightpop(message_queue); if (message != null) { system.out.println("consumed message: " + message); // 模拟处理消息 } } }
通过@scheduled
注解,消费者可以定期从redis队列中拉取消息进行处理。
producer(发布者)
发布者将消息发布到指定频道:
@service public class pubsubproducer { @autowired private redistemplate<string, object> redistemplate; public void publishmessage(string channel, string message) { redistemplate.convertandsend(channel, message); } }
consumer(订阅者)
订阅者监听频道的消息并处理:
@service public class pubsubconsumer implements messagelistener { @override public void onmessage(message message, byte[] pattern) { system.out.println("received message: " + new string(message.getbody())); } }
redis配置订阅监听器
配置订阅器并注册频道:
@configuration public class redispubsubconfig { @bean public messagelisteneradapter messagelistener() { return new messagelisteneradapter(new pubsubconsumer()); } @bean public redismessagelistenercontainer rediscontainer(redisconnectionfactory connectionfactory, messagelisteneradapter listeneradapter) { redismessagelistenercontainer container = new redismessagelistenercontainer(); container.setconnectionfactory(connectionfactory); container.addmessagelistener(listeneradapter, new patterntopic("pubsub:channel")); return container; } }
为生产者提供api接口:
@restcontroller @requestmapping("/queue") public class queuecontroller { @autowired private messageproducer messageproducer; @autowired private pubsubproducer pubsubproducer; // 将消息放入队列 @postmapping("/produce") public responseentity<string> producemessage(@requestparam string message) { messageproducer.produce(message); return responseentity.ok("message produced"); } // 发布消息 @postmapping("/publish") public responseentity<string> publishmessage(@requestparam string message) { pubsubproducer.publishmessage("pubsub:channel", message); return responseentity.ok("message published"); } }
基于list的消息队列:
/queue/produce
message=helloqueue
基于pub/sub的消息队列:
/queue/publish
message=hellopubsub
redis虽然不是专门的消息队列工具,但在轻量级、实时性要求高的场景下非常适合使用。通过list实现简单的任务队列,通过pub/sub可以实现消息广播。生产环境中,建议使用如下优化措施:
到此这篇关于基于redis实现消息队列的示例代码的文章就介绍到这了,更多相关redis 消息队列内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论