18人参与 • 2025-07-16 • Redis
在我们公司的电商平台中,存在大量异步事件需要实时处理,例如用户下单、库存更新、支付回调等。这些事件对消息的可靠性、顺序性和高吞吐量有较高要求。传统的消息中间件(如kafka、rabbitmq)在运维成本或部署复杂度上存在一定挑战,在部分场景下难以满足“轻量、低延迟、易集成” 的需求。
经过调研和验证,redis 6.0+ 提供的 streams 特性在嵌入式部署、快速上手方面具有显著优势。本篇文章将分享我们在生产环境中基于 redis streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案。
基于以上考量,最终选型 redis streams,落地于现有 redis 集群,无需额外独立中间件部署。
maven 依赖(以 lettuce 客户端为例):
<dependencies> <dependency> <groupid>io.lettuce</groupid> <artifactid>lettuce-core</artifactid> <version>6.1.5.release</version> </dependency> <dependency> <groupid>org.slf4j</groupid> <artifactid>slf4j-api</artifactid> <version>1.7.30</version> </dependency> <dependency> <groupid>ch.qos.logback</groupid> <artifactid>logback-classic</artifactid> <version>1.2.3</version> </dependency> </dependencies>
springboot 配置(application.yml):
spring: redis: host: redis-cluster-host port: 6379 password: your_password timeout: 2000ms
import io.lettuce.core.redisclient; import io.lettuce.core.api.statefulredisconnection; import io.lettuce.core.api.sync.rediscommands; import java.util.hashmap; import java.util.map; public class redisstreamproducer { private redisclient client; private statefulredisconnection<string, string> connection; private rediscommands<string, string> commands; private static final string stream_key = "orderstream"; public redisstreamproducer(string uri) { client = redisclient.create(uri); connection = client.connect(); commands = connection.sync(); } public string sendmessage(map<string, string> message) { // xadd key * field value [field value ...] return commands.xadd(stream_key, message); } public void shutdown() { connection.close(); client.shutdown(); } public static void main(string[] args) { redisstreamproducer producer = new redisstreamproducer("redis://:your_password@redis-host:6379/0"); map<string, string> order = new hashmap<>(); order.put("orderid", "123456"); order.put("userid", "u7890"); order.put("amount", "258.50"); string messageid = producer.sendmessage(order); system.out.println("消息发送成功, id=" + messageid); producer.shutdown(); } }
import io.lettuce.core.redisclient; import io.lettuce.core.streammessage; import io.lettuce.core.api.statefulredisconnection; import io.lettuce.core.api.sync.rediscommands; import io.lettuce.core.models.stream.consumer; import io.lettuce.core.models.stream.pendingmessage; import java.time.duration; import java.util.list; import java.util.map; public class redisstreamconsumer { private redisclient client; private statefulredisconnection<string, string> connection; private rediscommands<string, string> commands; private static final string stream_key = "orderstream"; private static final string group_name = "ordergroup"; private static final string consumer_name = "consumer-1"; public redisstreamconsumer(string uri) { client = redisclient.create(uri); connection = client.connect(); commands = connection.sync(); // 创建消费者组, 如果已创建可 ignore try { commands.xgroupcreate(stream_key, group_name, "$", true); } catch (exception e) { // group exists } } public void consume() { while (true) { // 从 pending list 先处理未 ack 的消息 list<pendingmessage> pending = commands.xpending(stream_key, group_name, range.unbounded(), limit.from(10)); for (pendingmessage pm : pending) { // 重新消费 streammessage<string, string> msg = commands.xclaim( stream_key, group_name, consumer_name, 5000, pm.getid()); process(msg.getbody()); commands.xack(stream_key, group_name, pm.getid()); } // 正常读取新消息 list<streammessage<string, string>> messages = commands.xreadgroup( consumer.from(group_name, consumer_name), xreadargs.streamoffset.lastconsumed(stream_key)); if (messages != null) { for (streammessage<string, string> msg : messages) { process(msg.getbody()); commands.xack(stream_key, group_name, msg.getid()); } } // 轮询间隔 try { thread.sleep(200); } catch (interruptedexception e) { thread.currentthread().interrupt(); break; } } } private void process(map<string, string> body) { // 业务处理逻辑 system.out.println("处理订单: " + body); } public void shutdown() { connection.close(); client.shutdown(); } public static void main(string[] args) { redisstreamconsumer consumer = new redisstreamconsumer("redis://:your_password@redis-host:6379/0"); consumer.consume(); consumer.shutdown(); } }
1.消息重复消费
2.消息积压与内存压力
xtrim maxlen ~ n
对流进行修剪,结合业务保留时间策略,定期分批清理历史消息。3.消费者实例重启后状态丢失
到此这篇关于基于redis streams的实时消息处理实战指南的文章就介绍到这了,更多相关redis streams消息处理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论