it编程 > 数据库 > Redis

一文带你搞懂Redis Stream的6种消息处理模式

15人参与 2025-05-04 Redis

redis 5.0版本引入的stream数据类型,为redis生态带来了强大而灵活的消息队列功能,弥补了之前发布/订阅模式的不足,如消息持久化、消费者组、消息确认等特性。

redis stream结合了传统消息队列和时序数据库的特点,适用于日志收集、事件驱动应用、实时分析等多种场景。

本文将介绍redis stream的6种消息处理模式。

1. 简单消费模式(simple consumption)

基本概念

简单消费模式是redis stream最基础的使用方式,不使用消费者组,直接读取流中的消息。生产者将消息追加到流中,消费者通过指定起始id来读取消息。

核心命令

# 发布消息
xadd stream_name [id] field value [field value ...]

# 读取消息
xread [count count] [block milliseconds] streams stream_name start_id

实现示例

redis cli

# 添加消息到stream
> xadd mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# 从头开始读取所有消息
> xread streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 从指定id开始读取
> xread streams mystream 1647257548956-0
(empty list or set)

# 从最新的消息id之后开始读取(阻塞等待新消息)
> xread block 5000 streams mystream $
(nil)

java spring boot示例

@service
public class simplestreamservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 发布消息到stream
     */
    public string publishevent(string streamkey, map<string, object> eventdata) {
        stringrecord record = streamrecords.string(eventdata).withstreamkey(streamkey);
        return redistemplate.opsforstream().add(record).getvalue();
    }
    
    /**
     * 从指定位置开始读取消息
     */
    public list<maprecord<string, object, object>> readevents(string streamkey, string startid, int count) {
        streamreadoptions readoptions = streamreadoptions.empty().count(count);
        return redistemplate.opsforstream().read(readoptions, streamoffset.from(streamkey, readoffset.from(startid)));
    }
    
    /**
     * 阻塞式读取消息
     */
    public list<maprecord<string, object, object>> readeventsblocking(string streamkey, int timeoutmillis) {
        streamreadoptions readoptions = streamreadoptions.empty().count(10).block(duration.ofmillis(timeoutmillis));
        return redistemplate.opsforstream().read(readoptions, streamoffset.latest(streamkey));
    }
}

使用场景

优缺点

优点

缺点

2. 消费者组模式(consumer groups)

基本概念

消费者组允许多个消费者共同处理一个流的消息,实现负载均衡,并提供消息确认机制,确保消息至少被处理一次。每个消费者组维护自己的消费位置,不同消费者组之间互不干扰。

核心命令

# 创建消费者组
xgroup create stream_name group_name [id|$] [mkstream]

# 从消费者组读取消息
xreadgroup group group_name consumer_name [count count] [block milliseconds] streams stream_name [>|id]

# 确认消息处理完成
xack stream_name group_name message_id [message_id ...]

实现示例

redis cli

# 创建消费者组
> xgroup create mystream processing-group $ mkstream
ok

# 消费者1读取消息
> xreadgroup group processing-group consumer-1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# 确认消息已处理
> xack mystream processing-group 1647257548956-0
(integer) 1

# 消费者2读取消息(已无未处理消息)
> xreadgroup group processing-group consumer-2 count 1 streams mystream >
1) 1) "mystream"
   2) (empty list or set)

java spring boot示例

@service
public class consumergroupservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 创建消费者组
     */
    public void creategroup(string streamkey, string groupname) {
        try {
            redistemplate.opsforstream().creategroup(streamkey, groupname);
        } catch (redissystemexception e) {
            // 处理流不存在的情况
            if (e.getrootcause() instanceof rediscommandexecutionexception 
                    && e.getrootcause().getmessage().contains("nogroup")) {
                redistemplate.opsforstream().creategroup(readoffset.from("0"), streamkey, groupname);
            } else {
                throw e;
            }
        }
    }
    
    /**
     * 从消费者组读取消息
     */
    public list<maprecord<string, object, object>> readfromgroup(
            string streamkey, string groupname, string consumername, int count) {
        
        streamreadoptions options = streamreadoptions.empty().count(count);
        return redistemplate.opsforstream().read(
                consumer.from(groupname, consumername),
                options,
                streamoffset.create(streamkey, readoffset.lastconsumed())
        );
    }
    
    /**
     * 阻塞式从消费者组读取消息
     */
    public list<maprecord<string, object, object>> readfromgroupblocking(
            string streamkey, string groupname, string consumername, int count, duration timeout) {
        
        streamreadoptions options = streamreadoptions.empty().count(count).block(timeout);
        return redistemplate.opsforstream().read(
                consumer.from(groupname, consumername),
                options,
                streamoffset.create(streamkey, readoffset.lastconsumed())
        );
    }
    
    /**
     * 确认消息已处理
     */
    public long acknowledgemessage(string streamkey, string groupname, string... messageids) {
        return redistemplate.opsforstream().acknowledge(streamkey, groupname, messageids);
    }
}

使用场景

优缺点

优点

缺点

3. 阻塞式消费模式(blocking consumption)

基本概念

阻塞式消费允许消费者在没有新消息时保持连接,等待新消息到达。这种模式减少了轮询开销,提高了实时性,适合对消息处理时效性要求高的场景。

核心命令

# 阻塞式简单消费
xread block milliseconds streams stream_name id

# 阻塞式消费者组消费
xreadgroup group group_name consumer_name block milliseconds streams stream_name >

实现示例

redis cli

# 阻塞等待新消息(最多等待10秒)
> xread block 10000 streams mystream $
(nil)  # 如果10秒内没有新消息

# 使用消费者组的阻塞式消费
> xreadgroup group processing-group consumer-1 block 10000 streams mystream >
(nil)  # 如果10秒内没有新分配的消息

java spring boot示例

@service
public class blockingstreamconsumerservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 阻塞式消息消费者任务
     */
    @async
    public void startblockingconsumer(string streamkey, string lastid, duration timeout) {
        streamreadoptions options = streamreadoptions.empty()
                .count(1)
                .block(timeout);
        
        while (!thread.currentthread().isinterrupted()) {
            try {
                // 阻塞读取消息
                list<maprecord<string, object, object>> records = redistemplate.opsforstream()
                        .read(options, streamoffset.from(streamkey, readoffset.from(lastid)));
                
                if (records != null && !records.isempty()) {
                    for (maprecord<string, object, object> record : records) {
                        // 处理消息
                        processmessage(record);
                        
                        // 更新最后读取的id
                        lastid = record.getid().getvalue();
                    }
                } else {
                    // 超时未读取到消息,可以执行一些其他逻辑
                }
            } catch (exception e) {
                // 异常处理
                log.error("error reading from stream: {}", e.getmessage(), e);
                try {
                    thread.sleep(1000); // 出错后等待一段时间再重试
                } catch (interruptedexception ie) {
                    thread.currentthread().interrupt();
                    break;
                }
            }
        }
    }
    
    /**
     * 阻塞式消费者组消费
     */
    @async
    public void startgroupblockingconsumer(
            string streamkey, string groupname, string consumername, duration timeout) {
        
        streamreadoptions options = streamreadoptions.empty()
                .count(1)
                .block(timeout);
        
        while (!thread.currentthread().isinterrupted()) {
            try {
                // 阻塞读取消息
                list<maprecord<string, object, object>> records = redistemplate.opsforstream()
                        .read(consumer.from(groupname, consumername),
                               options,
                               streamoffset.create(streamkey, readoffset.lastconsumed()));
                
                if (records != null && !records.isempty()) {
                    for (maprecord<string, object, object> record : records) {
                        try {
                            // 处理消息
                            processmessage(record);
                            
                            // 确认消息
                            redistemplate.opsforstream()
                                    .acknowledge(streamkey, groupname, record.getid().getvalue());
                        } catch (exception e) {
                            // 处理失败,记录日志
                            log.error("error processing message: {}", e.getmessage(), e);
                        }
                    }
                }
            } catch (exception e) {
                log.error("error reading from stream group: {}", e.getmessage(), e);
                try {
                    thread.sleep(1000);
                } catch (interruptedexception ie) {
                    thread.currentthread().interrupt();
                    break;
                }
            }
        }
    }
    
    private void processmessage(maprecord<string, object, object> record) {
        // 实际消息处理逻辑
        log.info("processing message: {}", record);
        // ...处理消息的具体业务逻辑
    }
}

使用场景

优缺点

优点

缺点

4. 扇出模式(fan-out pattern)

基本概念

扇出模式允许多个独立的消费者组同时消费同一个流中的所有消息,类似于发布/订阅模式,但具有消息持久化和回溯能力。每个消费者组独立维护自己的消费位置。

核心命令

创建多个消费者组,它们都独立消费同一个流:

xgroup create stream_name group_name_1 $ mkstream
xgroup create stream_name group_name_2 $ mkstream
xgroup create stream_name group_name_3 $ mkstream

实现示例

redis cli

# 创建多个消费者组
> xgroup create notifications analytics-group $ mkstream
ok
> xgroup create notifications email-group $ mkstream
ok
> xgroup create notifications mobile-group $ mkstream
ok

# 添加一条消息
> xadd notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"

# 从各个消费者组读取(每个组都能收到所有消息)
> xreadgroup group analytics-group analytics-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> xreadgroup group email-group email-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

> xreadgroup group mobile-group mobile-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"

java spring boot示例

@service
public class fanoutservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 初始化扇出消费者组
     */
    public void initializefanoutgroups(string streamkey, list<string> groupnames) {
        // 确保流存在
        try {
            streaminfo.xinfostream info = redistemplate.opsforstream().info(streamkey);
        } catch (exception e) {
            // 流不存在,发送一个初始消息
            map<string, object> initialmessage = new hashmap<>();
            initialmessage.put("init", "true");
            redistemplate.opsforstream().add(streamkey, initialmessage);
        }
        
        // 创建所有消费者组
        for (string groupname : groupnames) {
            try {
                redistemplate.opsforstream().creategroup(streamkey, groupname);
            } catch (exception e) {
                // 忽略组已存在的错误
                log.info("group {} may already exist: {}", groupname, e.getmessage());
            }
        }
    }
    
    /**
     * 发布扇出消息
     */
    public string publishfanoutmessage(string streamkey, map<string, object> messagedata) {
        stringrecord record = streamrecords.string(messagedata).withstreamkey(streamkey);
        return redistemplate.opsforstream().add(record).getvalue();
    }
    
    /**
     * 为特定组启动消费者
     */
    @async
    public void startgroupconsumer(
            string streamkey, string groupname, string consumername, 
            consumer<maprecord<string, object, object>> messagehandler) {
        
        streamreadoptions options = streamreadoptions.empty().count(10).block(duration.ofseconds(2));
        
        while (!thread.currentthread().isinterrupted()) {
            try {
                list<maprecord<string, object, object>> messages = redistemplate.opsforstream().read(
                        consumer.from(groupname, consumername),
                        options,
                        streamoffset.create(streamkey, readoffset.lastconsumed())
                );
                
                if (messages != null && !messages.isempty()) {
                    for (maprecord<string, object, object> message : messages) {
                        try {
                            // 处理消息
                            messagehandler.accept(message);
                            
                            // 确认消息
                            redistemplate.opsforstream().acknowledge(
                                    streamkey, groupname, message.getid().getvalue());
                        } catch (exception e) {
                            log.error("error processing message in group {}: {}", 
                                    groupname, e.getmessage(), e);
                        }
                    }
                }
            } catch (exception e) {
                log.error("error reading from stream for group {}: {}", 
                        groupname, e.getmessage(), e);
                try {
                    thread.sleep(1000);
                } catch (interruptedexception ie) {
                    thread.currentthread().interrupt();
                    break;
                }
            }
        }
    }
}

使用示例

@service
public class notificationservice {
    
    @autowired
    private fanoutservice fanoutservice;
    
    @postconstruct
    public void init() {
        // 初始化扇出组
        list<string> groups = arrays.aslist("email-group", "sms-group", "analytics-group");
        fanoutservice.initializefanoutgroups("user-events", groups);
        
        // 启动各个消费者组的处理器
        fanoutservice.startgroupconsumer(
                "user-events", "email-group", "email-consumer", this::processemailnotification);
        
        fanoutservice.startgroupconsumer(
                "user-events", "sms-group", "sms-consumer", this::processsmsnotification);
        
        fanoutservice.startgroupconsumer(
                "user-events", "analytics-group", "analytics-consumer", this::processanalyticsevent);
    }
    
    private void processemailnotification(maprecord<string, object, object> message) {
        map<object, object> messagedata = message.getvalue();
        log.info("processing email notification: {}", messagedata);
        // 邮件发送逻辑
    }
    
    private void processsmsnotification(maprecord<string, object, object> message) {
        map<object, object> messagedata = message.getvalue();
        log.info("processing sms notification: {}", messagedata);
        // 短信发送逻辑
    }
    
    private void processanalyticsevent(maprecord<string, object, object> message) {
        map<object, object> messagedata = message.getvalue();
        log.info("processing analytics event: {}", messagedata);
        // 分析事件处理逻辑
    }
    
    public void publishuserevent(string eventtype, map<string, object> eventdata) {
        map<string, object> message = new hashmap<>(eventdata);
        message.put("event_type", eventtype);
        message.put("timestamp", system.currenttimemillis());
        
        fanoutservice.publishfanoutmessage("user-events", message);
    }
}

使用场景

优缺点

优点

缺点

5. 重试与恢复模式(retry and recovery)

基本概念

这种模式关注处理失败消息的恢复和重试机制。redis stream消费者组会跟踪每个消息的处理状态,允许查看和管理未确认(pel - pending entry list)的消息,实现可靠的消息处理。

核心命令

# 查看消费者组中未确认的消息
xpending stream_name group_name [start_id end_id count] [consumer_name]

# 查看消费者组中长时间未确认的消息详情
xpending stream_name group_name start_id end_id count [consumer_name]

# 认领处理超时的消息
xclaim stream_name group_name consumer_name min_idle_time message_id [message_id ...] [justid]

实现示例

redis cli

# 查看未确认的消息数量
> xpending mystream processing-group
1) (integer) 2         # 未确认消息数量
2) "1647257548956-0"   # 最小id
3) "1647257549123-0"   # 最大id
4) 1) 1) "consumer-1"  # 各个消费者的未确认消息数
      2) (integer) 1
   2) 1) "consumer-2"
      2) (integer) 1

# 查看特定消费者的未确认消息
> xpending mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"   # 消息id
   2) "consumer-1"         # 当前持有的消费者
   3) (integer) 120000     # 空闲时间(毫秒)
   4) (integer) 2          # 传递次数

# 认领超过2分钟未处理的消息
> xclaim mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"

java spring boot示例

@service
public class messagerecoveryservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 获取消费者组中的未确认消息
     */
    public pendingmessagessummary getpendingmessagessummary(string streamkey, string groupname) {
        return redistemplate.opsforstream().pending(streamkey, groupname);
    }
    
    /**
     * 获取指定消费者的详细未确认消息
     */
    public pendingmessages getpendingmessages(
            string streamkey, string groupname, string consumername, 
            range<string> idrange, long count) {
        
        return redistemplate.opsforstream().pending(
                streamkey, 
                consumer.from(groupname, consumername), 
                idrange, 
                count);
    }
    
    /**
     * 认领长时间未处理的消息
     */
    public list<maprecord<string, object, object>> claimmessages(
            string streamkey, string groupname, string newconsumername, 
            duration minidletime, string... messageids) {
        
        return redistemplate.opsforstream().claim(
                streamkey, 
                consumer.from(groupname, newconsumername), 
                minidletime, 
                messageids);
    }
    
    /**
     * 定时检查和恢复未处理的消息
     */
    @scheduled(fixedrate = 60000) // 每分钟执行一次
    public void recoverstalemessages() {
        // 配置参数
        string streamkey = "mystream";
        string groupname = "processing-group";
        string recoveryconsumer = "recovery-consumer";
        duration minidletime = duration.ofminutes(5); // 超过5分钟未处理的消息
        
        try {
            // 1. 获取所有未确认消息的摘要
            pendingmessagessummary summary = getpendingmessagessummary(streamkey, groupname);
            
            if (summary != null && summary.gettotalpendingmessages() > 0) {
                // 2. 遍历每个消费者的未确认消息
                for (consumer consumer : summary.getpendingmessagesperconsumer().keyset()) {
                    // 获取该消费者的详细未确认消息列表
                    pendingmessages pendingmessages = getpendingmessages(
                            streamkey, groupname, consumer.getname(), 
                            range.unbounded(), 50); // 每次最多处理50条
                    
                    if (pendingmessages != null) {
                        // 3. 筛选出空闲时间超过阈值的消息
                        list<string> stalemessageids = new arraylist<>();
                        
                        for (pendingmessage message : pendingmessages) {
                            if (message.getelapsedtimesincelastdelivery().compareto(minidletime) > 0) {
                                stalemessageids.add(message.getidasstring());
                            }
                        }
                        
                        // 4. 认领这些消息
                        if (!stalemessageids.isempty()) {
                            log.info("claiming {} stale messages from consumer {}", 
                                    stalemessageids.size(), consumer.getname());
                            
                            list<maprecord<string, object, object>> claimedmessages = claimmessages(
                                    streamkey, groupname, recoveryconsumer, minidletime, 
                                    stalemessageids.toarray(new string[0]));
                            
                            // 5. 处理这些被认领的消息
                            processclaimedmessages(streamkey, groupname, claimedmessages);
                        }
                    }
                }
            }
        } catch (exception e) {
            log.error("error recovering stale messages: {}", e.getmessage(), e);
        }
    }
    
    /**
     * 处理被认领的消息
     */
    private void processclaimedmessages(
            string streamkey, string groupname, 
            list<maprecord<string, object, object>> messages) {
        
        if (messages == null || messages.isempty()) {
            return;
        }
        
        for (maprecord<string, object, object> message : messages) {
            try {
                // 执行消息处理逻辑
                processmessage(message);
                
                // 确认消息
                redistemplate.opsforstream().acknowledge(
                        streamkey, groupname, message.getid().getvalue());
                
                log.info("successfully processed recovered message: {}", message.getid());
            } catch (exception e) {
                log.error("failed to process recovered message {}: {}", 
                        message.getid(), e.getmessage(), e);
                // 根据业务需求决定是否将消息加入死信队列
                movetodeadletterqueue(streamkey, message);
            }
        }
    }
    
    /**
     * 将消息移至死信队列
     */
    private void movetodeadletterqueue(string sourcestream, maprecord<string, object, object> message) {
        string deadletterstream = sourcestream + ":dead-letter";
        map<object, object> messagedata = message.getvalue();
        
        map<string, object> dlqmessage = new hashmap<>();
        messagedata.foreach((k, v) -> dlqmessage.put(k.tostring(), v));
        
        // 添加元数据
        dlqmessage.put("original_id", message.getid().getvalue());
        dlqmessage.put("error_time", system.currenttimemillis());
        
        redistemplate.opsforstream().add(deadletterstream, dlqmessage);
        
        // 可选:从原消费者组确认该消息
        // redistemplate.opsforstream().acknowledge(sourcestream, groupname, message.getid().getvalue());
    }
    
    private void processmessage(maprecord<string, object, object> message) {
        // 实际的消息处理逻辑
        log.info("processing recovered message: {}", message);
        // ...
    }
}

使用场景

优缺点

优点

缺点

6. 流处理窗口模式(streaming window processing)

基本概念

流处理窗口模式基于时间或消息计数划分数据流,在每个窗口内执行聚合或分析操作。这种模式适用于实时分析、趋势监测和时间序列处理。虽然redis stream本身不直接提供窗口操作,但可以结合redis的其他特性实现。

实现方式

主要通过以下几种方式实现:

1. 基于消息id的时间范围(redis消息id包含毫秒时间戳)

2. 结合redis的排序集合(sortedset)存储窗口数据

3. 使用redis的过期键实现滑动窗口

实现示例

redis cli

窗口数据收集与查询:

# 添加带时间戳的数据
> xadd temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> xadd temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> xadd temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# 查询特定时间范围的数据
> xrange temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"

java spring boot示例

@service
public class timewindowprocessingservice {
    
    @autowired
    private stringredistemplate redistemplate;
    
    /**
     * 添加数据点到流,并存储到相应的时间窗口
     */
    public string adddatapoint(string streamkey, string sensorid, double value) {
        long timestamp = system.currenttimemillis();
        
        // 1. 添加到原始数据流
        map<string, object> datapoint = new hashmap<>();
        datapoint.put("sensor_id", sensorid);
        datapoint.put("value", string.valueof(value));
        datapoint.put("timestamp", string.valueof(timestamp));
        
        stringrecord record = streamrecords.string(datapoint).withstreamkey(streamkey);
        recordid recordid = redistemplate.opsforstream().add(record);
        
        // 2. 计算所属的窗口(这里以5分钟为一个窗口)
        long windowstart = timestamp - (timestamp % (5 * 60 * 1000));
        string windowkey = streamkey + ":window:" + windowstart;
        
        // 3. 将数据点添加到窗口的有序集合中,分数为时间戳
        string datapointjson = new objectmapper().writevalueasstring(datapoint);
        redistemplate.opsforzset().add(windowkey, datapointjson, timestamp);
        
        // 4. 设置窗口键的过期时间(保留24小时)
        redistemplate.expire(windowkey, duration.ofhours(24));
        
        return recordid.getvalue();
    }
    
    /**
     * 获取指定时间窗口内的数据点
     */
    public list<map<string, object>> getwindowdata(
            string streamkey, long windowstarttime, long windowendtime) {
        
        // 计算可能的窗口键(每5分钟一个窗口)
        list<string> windowkeys = new arraylist<>();
        long current = windowstarttime - (windowstarttime % (5 * 60 * 1000));
        
        while (current <= windowendtime) {
            windowkeys.add(streamkey + ":window:" + current);
            current += (5 * 60 * 1000);
        }
        
        // 从各个窗口获取数据点
        list<map<string, object>> results = new arraylist<>();
        objectmapper mapper = new objectmapper();
        
        for (string windowkey : windowkeys) {
            set<string> datapoints = redistemplate.opsforzset().rangebyscore(
                    windowkey, windowstarttime, windowendtime);
            
            if (datapoints != null) {
                for (string datapointjson : datapoints) {
                    try {
                        map<string, object> datapoint = mapper.readvalue(
                                datapointjson, new typereference<map<string, object>>() {});
                        results.add(datapoint);
                    } catch (exception e) {
                        log.error("error parsing data point: {}", e.getmessage(), e);
                    }
                }
            }
        }
        
        // 按时间戳排序
        results.sort(comparator.comparing(dp -> long.parselong(dp.get("timestamp").tostring())));
        
        return results;
    }
    
    /**
     * 计算窗口内数据的聚合统计
     */
    public map<string, object> getwindowstats(
            string streamkey, string sensorid, long windowstarttime, long windowendtime) {
        
        list<map<string, object>> windowdata = getwindowdata(streamkey, windowstarttime, windowendtime);
        
        // 过滤特定传感器的数据
        list<double> values = windowdata.stream()
                .filter(dp -> sensorid.equals(dp.get("sensor_id").tostring()))
                .map(dp -> double.parsedouble(dp.get("value").tostring()))
                .collect(collectors.tolist());
        
        map<string, object> stats = new hashmap<>();
        stats.put("count", values.size());
        
        if (!values.isempty()) {
            doublesummarystatistics summarystats = values.stream().collect(collectors.summarizingdouble(v -> v));
            stats.put("min", summarystats.getmin());
            stats.put("max", summarystats.getmax());
            stats.put("avg", summarystats.getaverage());
            stats.put("sum", summarystats.getsum());
        }
        
        stats.put("start_time", windowstarttime);
        stats.put("end_time", windowendtime);
        stats.put("sensor_id", sensorid);
        
        return stats;
    }
    
    /**
     * 实现滑动窗口处理
     */
    @scheduled(fixedrate = 60000) // 每分钟执行一次
    public void processslidingwindows() {
        string streamkey = "temperature";
        long now = system.currenttimemillis();
        
        // 处理过去10分钟窗口的数据
        long windowendtime = now;
        long windowstarttime = now - (10 * 60 * 1000);
        
        list<string> sensorids = arrays.aslist("1", "2", "3"); // 示例传感器id
        
        for (string sensorid : sensorids) {
            try {
                // 获取窗口统计
                map<string, object> stats = getwindowstats(streamkey, sensorid, windowstarttime, windowendtime);
                
                // 根据统计结果执行业务逻辑
                if (stats.containskey("avg")) {
                    double avgtemp = (double) stats.get("avg");
                    if (avgtemp > 25.0) {
                        // 触发高温警报
                        log.warn("high temperature alert for sensor {}: {} °c", sensorid, avgtemp);
                        triggeralert(sensorid, "high_temp", avgtemp);
                    }
                }
                
                // 存储聚合结果用于历史趋势分析
                saveaggregatedresults(streamkey, sensorid, stats);
                
            } catch (exception e) {
                log.error("error processing sliding window for sensor {}: {}", 
                        sensorid, e.getmessage(), e);
            }
        }
    }
    
    /**
     * 触发警报
     */
    private void triggeralert(string sensorid, string alerttype, double value) {
        map<string, object> alertdata = new hashmap<>();
        alertdata.put("sensor_id", sensorid);
        alertdata.put("alert_type", alerttype);
        alertdata.put("value", value);
        alertdata.put("timestamp", system.currenttimemillis());
        
        redistemplate.opsforstream().add("alerts", alertdata);
    }
    
    /**
     * 保存聚合结果
     */
    private void saveaggregatedresults(string streamkey, string sensorid, map<string, object> stats) {
        long windowtime = (long) stats.get("end_time");
        string aggregatekey = streamkey + ":aggregate:" + sensorid;
        
        // 使用时间作为分数存储聚合结果
        redistemplate.opsforzset().add(
                aggregatekey, 
                new objectmapper().writevalueasstring(stats),
                windowtime);
        
        // 保留30天的聚合数据
        redistemplate.expire(aggregatekey, duration.ofdays(30));
    }
}

使用场景

优缺点

优点

缺点

结论

redis stream提供了强大而灵活的消息处理功能,通过组合这些模式,可以构建出高性能、可靠且灵活的消息处理系统,满足从简单的任务队列到复杂的实时数据处理等各种应用需求。

在选择和实现这些模式时,应充分考虑业务特性、性能需求、可靠性要求以及系统规模,结合redis stream的特性,打造最适合自己应用场景的消息处理解决方案。

以上就是一文带你搞懂redis stream的6种消息处理模式的详细内容,更多关于redis stream消息处理模式的资料请关注代码网其它相关文章!

(0)
打赏 微信扫一扫 微信扫一扫

您想发表意见!!点此发布评论

推荐阅读

Redis中6种缓存更新策略详解

05-03

Redis实现分布式锁全解析之从原理到实践过程

05-05

redis中使用lua脚本的原理与基本使用详解

04-29

Redis实现客户端缓存的4种方式

05-07

Redis 热 key 和大 key 问题小结

04-28

Redis中RedisSearch使用及应用场景

05-09

猜你喜欢

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论