it编程 > 数据库 > Redis

基于Redis Streams的实时消息处理实战指南

18人参与 2025-07-16 Redis

业务场景描述

在我们公司的电商平台中,存在大量异步事件需要实时处理,例如用户下单、库存更新、支付回调等。这些事件对消息的可靠性、顺序性和高吞吐量有较高要求。传统的消息中间件(如kafka、rabbitmq)在运维成本或部署复杂度上存在一定挑战,在部分场景下难以满足“轻量、低延迟、易集成” 的需求。

经过调研和验证,redis 6.0+ 提供的 streams 特性在嵌入式部署、快速上手方面具有显著优势。本篇文章将分享我们在生产环境中基于 redis streams 构建实时消息处理的完整经验,包括技术选型、核心代码示例、踩坑解决和优化方案。

技术选型过程

基于以上考量,最终选型 redis streams,落地于现有 redis 集群,无需额外独立中间件部署。

实现方案详解

环境与依赖

maven 依赖(以 lettuce 客户端为例):

<dependencies>
    <dependency>
        <groupid>io.lettuce</groupid>
        <artifactid>lettuce-core</artifactid>
        <version>6.1.5.release</version>
    </dependency>
    <dependency>
        <groupid>org.slf4j</groupid>
        <artifactid>slf4j-api</artifactid>
        <version>1.7.30</version>
    </dependency>
    <dependency>
        <groupid>ch.qos.logback</groupid>
        <artifactid>logback-classic</artifactid>
        <version>1.2.3</version>
    </dependency>
</dependencies>

springboot 配置(application.yml):

spring:
  redis:
    host: redis-cluster-host
    port: 6379
    password: your_password
    timeout: 2000ms

流程设计

生产者实现

import io.lettuce.core.redisclient;
import io.lettuce.core.api.statefulredisconnection;
import io.lettuce.core.api.sync.rediscommands;
import java.util.hashmap;
import java.util.map;

public class redisstreamproducer {
    private redisclient client;
    private statefulredisconnection<string, string> connection;
    private rediscommands<string, string> commands;
    private static final string stream_key = "orderstream";

    public redisstreamproducer(string uri) {
        client = redisclient.create(uri);
        connection = client.connect();
        commands = connection.sync();
    }

    public string sendmessage(map<string, string> message) {
        // xadd key * field value [field value ...]
        return commands.xadd(stream_key, message);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(string[] args) {
        redisstreamproducer producer = new redisstreamproducer("redis://:your_password@redis-host:6379/0");
        map<string, string> order = new hashmap<>();
        order.put("orderid", "123456");
        order.put("userid", "u7890");
        order.put("amount", "258.50");
        string messageid = producer.sendmessage(order);
        system.out.println("消息发送成功, id=" + messageid);
        producer.shutdown();
    }
}

消费者实现

import io.lettuce.core.redisclient;
import io.lettuce.core.streammessage;
import io.lettuce.core.api.statefulredisconnection;
import io.lettuce.core.api.sync.rediscommands;
import io.lettuce.core.models.stream.consumer;
import io.lettuce.core.models.stream.pendingmessage;

import java.time.duration;
import java.util.list;
import java.util.map;

public class redisstreamconsumer {
    private redisclient client;
    private statefulredisconnection<string, string> connection;
    private rediscommands<string, string> commands;

    private static final string stream_key = "orderstream";
    private static final string group_name = "ordergroup";
    private static final string consumer_name = "consumer-1";

    public redisstreamconsumer(string uri) {
        client = redisclient.create(uri);
        connection = client.connect();
        commands = connection.sync();
        // 创建消费者组, 如果已创建可 ignore
        try {
            commands.xgroupcreate(stream_key, group_name, "$", true);
        } catch (exception e) {
            // group exists
        }
    }

    public void consume() {
        while (true) {
            // 从 pending list 先处理未 ack 的消息
            list<pendingmessage> pending = commands.xpending(stream_key, group_name, range.unbounded(), limit.from(10));
            for (pendingmessage pm : pending) {
                // 重新消费
                streammessage<string, string> msg = commands.xclaim(
                    stream_key,
                    group_name,
                    consumer_name,
                    5000,
                    pm.getid());
                process(msg.getbody());
                commands.xack(stream_key, group_name, pm.getid());
            }

            // 正常读取新消息
            list<streammessage<string, string>> messages = commands.xreadgroup(
                consumer.from(group_name, consumer_name),
                xreadargs.streamoffset.lastconsumed(stream_key));
            if (messages != null) {
                for (streammessage<string, string> msg : messages) {
                    process(msg.getbody());
                    commands.xack(stream_key, group_name, msg.getid());
                }
            }

            // 轮询间隔
            try {
                thread.sleep(200);
            } catch (interruptedexception e) {
                thread.currentthread().interrupt();
                break;
            }
        }
    }

    private void process(map<string, string> body) {
        // 业务处理逻辑
        system.out.println("处理订单: " + body);
    }

    public void shutdown() {
        connection.close();
        client.shutdown();
    }

    public static void main(string[] args) {
        redisstreamconsumer consumer = new redisstreamconsumer("redis://:your_password@redis-host:6379/0");
        consumer.consume();
        consumer.shutdown();
    }
}

踩过的坑与解决方案

1.消息重复消费

2.消息积压与内存压力

3.消费者实例重启后状态丢失

总结与最佳实践

到此这篇关于基于redis streams的实时消息处理实战指南的文章就介绍到这了,更多相关redis streams消息处理内容请搜索代码网以前的文章或继续浏览下面的相关文章希望大家以后多多支持代码网!

(0)

您想发表意见!!点此发布评论

推荐阅读

Redis中BigKey的隐患问题小结

07-15

使用Redis作为异步队列之原理、实现及实践过程

07-17

Redis字符串String操作详解从基础到高级应用小结

07-14

Redis中Stream详解及应用小结

07-14

如何发现 Redis 中的 BigKey及什么是 BigKey

07-11

深度剖析Redis字符串操作指南从入门到实战应用

07-21

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论