The Stream data type, introduced in Redis 5.0, brought a powerful and flexible message-queue capability to the Redis ecosystem, filling gaps in the earlier Pub/Sub model with features such as message persistence, consumer groups, and message acknowledgment.

Redis Stream combines traits of traditional message queues and time-series databases, making it a fit for log collection, event-driven applications, real-time analytics, and many other scenarios.

This article walks through six message-processing patterns for Redis Stream.
Simple consumption is the most basic way to use Redis Stream: no consumer group is involved, and messages are read straight from the stream. The producer appends messages to the stream, and consumers read them by specifying a starting ID.
```
# Publish a message ("*" auto-generates the ID; an explicit ID may also be given)
XADD stream_name <*|id> field value [field value ...]

# Read messages
XREAD [COUNT count] [BLOCK milliseconds] STREAMS stream_name start_id
```
Redis CLI example
```
# Add a message to the stream
> xadd mystream * sensor_id 1234 temperature 19.8 humidity 56
"1647257548956-0"

# Read all messages from the beginning
> xread streams mystream 0
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Read messages with IDs greater than the given ID
> xread streams mystream 1647257548956-0
(empty list or set)

# Block waiting for messages newer than the latest ID ("$")
> xread block 5000 streams mystream $
(nil)
```
Java Spring Boot example
```java
import java.time.Duration;
import java.util.List;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class SimpleStreamService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Publish a message to the stream.
     */
    public String publishEvent(String streamKey, Map<String, Object> eventData) {
        return redisTemplate.opsForStream().add(streamKey, eventData).getValue();
    }

    /**
     * Read messages starting from a given ID.
     */
    public List<MapRecord<String, Object, Object>> readEvents(String streamKey, String startId, int count) {
        StreamReadOptions readOptions = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream()
                .read(readOptions, StreamOffset.create(streamKey, ReadOffset.from(startId)));
    }

    /**
     * Blocking read: wait up to timeoutMillis for new messages.
     */
    public List<MapRecord<String, Object, Object>> readEventsBlocking(String streamKey, int timeoutMillis) {
        StreamReadOptions readOptions = StreamReadOptions.empty()
                .count(10)
                .block(Duration.ofMillis(timeoutMillis));
        return redisTemplate.opsForStream().read(readOptions, StreamOffset.latest(streamKey));
    }
}
```
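A quick usage sketch of the service above (hypothetical wiring: `simpleStreamService` injected from the Spring context, `mystream` is just an example key):

```java
// Publish one reading, then re-read the stream from the beginning
Map<String, Object> reading = new HashMap<>();
reading.put("sensor_id", "1234");
reading.put("temperature", "19.8");

String id = simpleStreamService.publishEvent("mystream", reading);

List<MapRecord<String, Object, Object>> events =
        simpleStreamService.readEvents("mystream", "0", 10);
events.forEach(e -> System.out.println(e.getId() + " -> " + e.getValue()));
```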
Advantages: the simplest setup with minimal overhead; messages are persisted in the stream, so consumers can replay history from any position.

Disadvantages: no acknowledgment or delivery tracking, so each consumer must remember its own last-read ID; no load balancing across multiple consumers; the stream grows without bound unless trimmed (see the sketch below).
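One common mitigation for unbounded growth is to cap the stream's length; a minimal sketch using standard Redis trimming options (the `~` requests cheaper, approximate trimming; the 10000 threshold is arbitrary):

```
# Cap the stream while adding a new entry
> xadd mystream maxlen ~ 10000 * sensor_id 1234 temperature 19.8

# Or trim an existing stream explicitly
> xtrim mystream maxlen ~ 10000
```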
Consumer groups let multiple consumers share the work of processing one stream, providing load balancing plus an acknowledgment mechanism that ensures each message is processed at least once. Each consumer group maintains its own consumption position, and different groups do not interfere with one another.
```
# Create a consumer group
XGROUP CREATE stream_name group_name <id|$> [MKSTREAM]

# Read messages as a member of the group
XREADGROUP GROUP group_name consumer_name [COUNT count] [BLOCK milliseconds] STREAMS stream_name [>|id]

# Acknowledge that a message has been processed
XACK stream_name group_name message_id [message_id ...]
```
Redis CLI example
```
# Create the consumer group
> xgroup create mystream processing-group $ mkstream
OK

# Consumer 1 reads a message (">" = messages never delivered to anyone in this group)
> xreadgroup group processing-group consumer-1 count 1 streams mystream >
1) 1) "mystream"
   2) 1) 1) "1647257548956-0"
         2) 1) "sensor_id"
            2) "1234"
            3) "temperature"
            4) "19.8"
            5) "humidity"
            6) "56"

# Acknowledge the message
> xack mystream processing-group 1647257548956-0
(integer) 1

# Consumer 2 reads (no undelivered messages remain)
> xreadgroup group processing-group consumer-2 count 1 streams mystream >
1) 1) "mystream"
   2) (empty list or set)
```
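To inspect a group's state from the CLI, `XINFO GROUPS` is handy; a brief sketch against the session above (output abridged to the core fields):

```
> xinfo groups mystream
1) 1) "name"
   2) "processing-group"
   3) "consumers"
   4) (integer) 2
   5) "pending"
   6) (integer) 0
   7) "last-delivered-id"
   8) "1647257548956-0"
```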
Java Spring Boot example
```java
@Service
public class ConsumerGroupService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Create a consumer group. BUSYGROUP errors (group already exists) are
     * ignored; creating a group on a stream that does not exist yet requires
     * MKSTREAM (see the CLI example above) or an initial XADD.
     */
    public void createGroup(String streamKey, String groupName) {
        try {
            redisTemplate.opsForStream().createGroup(streamKey, groupName);
        } catch (RedisSystemException e) {
            Throwable cause = e.getRootCause();
            if (cause != null && cause.getMessage() != null
                    && cause.getMessage().contains("BUSYGROUP")) {
                // Group already exists - nothing to do
            } else {
                throw e;
            }
        }
    }

    /**
     * Read new messages on behalf of one consumer in the group.
     */
    public List<MapRecord<String, Object, Object>> readFromGroup(
            String streamKey, String groupName, String consumerName, int count) {
        StreamReadOptions options = StreamReadOptions.empty().count(count);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
    }

    /**
     * Blocking variant of the group read.
     */
    public List<MapRecord<String, Object, Object>> readFromGroupBlocking(
            String streamKey, String groupName, String consumerName, int count, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty().count(count).block(timeout);
        return redisTemplate.opsForStream().read(
                Consumer.from(groupName, consumerName),
                options,
                StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
    }

    /**
     * Acknowledge processed messages.
     */
    public Long acknowledgeMessage(String streamKey, String groupName, String... messageIds) {
        return redisTemplate.opsForStream().acknowledge(streamKey, groupName, messageIds);
    }
}
```
Advantages: load is balanced automatically across the consumers in a group; the pending entries list plus XACK gives at-least-once delivery; each group keeps its own independent position.

Disadvantages: more operational complexity (group creation, acknowledgments, recovery of pending messages); at-least-once semantics mean handlers must tolerate duplicate deliveries.
Blocking consumption lets a consumer hold its connection open when the stream has no new messages and wait for them to arrive. This eliminates polling overhead and improves latency, making it a good fit when message freshness matters (a BLOCK of 0 waits indefinitely).
```
# Blocking simple read
XREAD BLOCK milliseconds STREAMS stream_name id

# Blocking consumer-group read
XREADGROUP GROUP group_name consumer_name BLOCK milliseconds STREAMS stream_name >
```
Redis CLI example
```
# Block waiting for a new message (up to 10 seconds)
> xread block 10000 streams mystream $
(nil)    # no new message arrived within 10 seconds

# Blocking read through a consumer group
> xreadgroup group processing-group consumer-1 block 10000 streams mystream >
(nil)    # no message was assigned within 10 seconds
```
Java Spring Boot example
```java
@Slf4j
@Service
public class BlockingStreamConsumerService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Blocking consumer loop for plain (non-group) reads.
     */
    @Async
    public void startBlockingConsumer(String streamKey, String lastId, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Block until a message arrives or the timeout expires
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(options, StreamOffset.create(streamKey, ReadOffset.from(lastId)));

                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        processMessage(record);
                        // Remember the last ID we have seen
                        lastId = record.getId().getValue();
                    }
                }
                // On timeout, simply loop and block again
            } catch (Exception e) {
                log.error("Error reading from stream: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000); // back off before retrying
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Blocking consumer loop through a consumer group.
     */
    @Async
    public void startGroupBlockingConsumer(
            String streamKey, String groupName, String consumerName, Duration timeout) {
        StreamReadOptions options = StreamReadOptions.empty()
                .count(1)
                .block(timeout);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream()
                        .read(Consumer.from(groupName, consumerName),
                              options,
                              StreamOffset.create(streamKey, ReadOffset.lastConsumed()));

                if (records != null && !records.isEmpty()) {
                    for (MapRecord<String, Object, Object> record : records) {
                        try {
                            processMessage(record);
                            // Acknowledge only after successful processing
                            redisTemplate.opsForStream()
                                    .acknowledge(streamKey, groupName, record.getId().getValue());
                        } catch (Exception e) {
                            log.error("Error processing message: {}", e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream group: {}", e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    private void processMessage(MapRecord<String, Object, Object> record) {
        log.info("Processing message: {}", record);
        // ... actual business logic ...
    }
}
```
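The `@Async` loops above assume async execution is enabled somewhere in the application; a minimal sketch of that assumed configuration (pool sizes are illustrative):

```java
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig {

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // Each long-running consumer loop occupies a thread permanently,
        // so size the pool to the number of loops you start
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setThreadNamePrefix("stream-consumer-");
        executor.initialize();
        return executor;
    }
}
```

A dedicated pool keeps these永-running loops from starving other `@Async` work that shares the default executor.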
Advantages: near-real-time delivery without busy polling, and far less CPU and network overhead than tight read loops.

Disadvantages: every blocked consumer holds a connection open; the BLOCK timeout must stay below the Redis client's command timeout, or reads fail spuriously; long-running loops need dedicated threads (see the configuration sketch above).
Fan-out lets multiple independent consumer groups each consume every message in the same stream, much like Pub/Sub but with message persistence and the ability to replay history. Each consumer group maintains its own consumption position.
Create several consumer groups that each consume the same stream independently:
```
XGROUP CREATE stream_name group_name_1 $ MKSTREAM
XGROUP CREATE stream_name group_name_2 $ MKSTREAM
XGROUP CREATE stream_name group_name_3 $ MKSTREAM
```
Redis CLI example
```
# Create several consumer groups
> xgroup create notifications analytics-group $ mkstream
OK
> xgroup create notifications email-group $ mkstream
OK
> xgroup create notifications mobile-group $ mkstream
OK

# Add one message
> xadd notifications * type user_signup user_id 1001 email "user@example.com"
"1647345678912-0"

# Read from each group - every group receives all messages
> xreadgroup group analytics-group analytics-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"
> xreadgroup group email-group email-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"
> xreadgroup group mobile-group mobile-1 count 1 streams notifications >
1) 1) "notifications"
   2) 1) 1) "1647345678912-0"
         2) 1) "type"
            2) "user_signup"
            3) "user_id"
            4) "1001"
            5) "email"
            6) "user@example.com"
```
Java Spring Boot example
```java
@Slf4j
@Service
public class FanoutService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Initialize the consumer groups used for fan-out.
     */
    public void initializeFanoutGroups(String streamKey, List<String> groupNames) {
        // Make sure the stream exists
        try {
            redisTemplate.opsForStream().info(streamKey);
        } catch (Exception e) {
            // Stream does not exist yet - create it with an initial message
            Map<String, Object> initialMessage = new HashMap<>();
            initialMessage.put("init", "true");
            redisTemplate.opsForStream().add(streamKey, initialMessage);
        }

        // Create all groups, ignoring "group already exists" errors
        for (String groupName : groupNames) {
            try {
                redisTemplate.opsForStream().createGroup(streamKey, groupName);
            } catch (Exception e) {
                log.info("Group {} may already exist: {}", groupName, e.getMessage());
            }
        }
    }

    /**
     * Publish a message that every group will receive.
     */
    public String publishFanoutMessage(String streamKey, Map<String, Object> messageData) {
        return redisTemplate.opsForStream().add(streamKey, messageData).getValue();
    }

    /**
     * Start a consumer loop for one group.
     */
    @Async
    public void startGroupConsumer(
            String streamKey, String groupName, String consumerName,
            // Fully qualified to avoid clashing with Spring's stream Consumer
            java.util.function.Consumer<MapRecord<String, Object, Object>> messageHandler) {

        StreamReadOptions options = StreamReadOptions.empty()
                .count(10)
                .block(Duration.ofSeconds(2));

        while (!Thread.currentThread().isInterrupted()) {
            try {
                List<MapRecord<String, Object, Object>> messages = redisTemplate.opsForStream().read(
                        Consumer.from(groupName, consumerName),
                        options,
                        StreamOffset.create(streamKey, ReadOffset.lastConsumed()));

                if (messages != null && !messages.isEmpty()) {
                    for (MapRecord<String, Object, Object> message : messages) {
                        try {
                            messageHandler.accept(message);
                            // Acknowledge after successful handling
                            redisTemplate.opsForStream().acknowledge(
                                    streamKey, groupName, message.getId().getValue());
                        } catch (Exception e) {
                            log.error("Error processing message in group {}: {}",
                                    groupName, e.getMessage(), e);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("Error reading from stream for group {}: {}",
                        groupName, e.getMessage(), e);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
```
```java
@Slf4j
@Service
public class NotificationService {

    @Autowired
    private FanoutService fanoutService;

    @PostConstruct
    public void init() {
        // Initialize the fan-out groups
        List<String> groups = Arrays.asList("email-group", "sms-group", "analytics-group");
        fanoutService.initializeFanoutGroups("user-events", groups);

        // Start one consumer per group
        fanoutService.startGroupConsumer(
                "user-events", "email-group", "email-consumer", this::processEmailNotification);
        fanoutService.startGroupConsumer(
                "user-events", "sms-group", "sms-consumer", this::processSmsNotification);
        fanoutService.startGroupConsumer(
                "user-events", "analytics-group", "analytics-consumer", this::processAnalyticsEvent);
    }

    private void processEmailNotification(MapRecord<String, Object, Object> message) {
        log.info("Processing email notification: {}", message.getValue());
        // email-sending logic
    }

    private void processSmsNotification(MapRecord<String, Object, Object> message) {
        log.info("Processing SMS notification: {}", message.getValue());
        // SMS-sending logic
    }

    private void processAnalyticsEvent(MapRecord<String, Object, Object> message) {
        log.info("Processing analytics event: {}", message.getValue());
        // analytics logic
    }

    public void publishUserEvent(String eventType, Map<String, Object> eventData) {
        Map<String, Object> message = new HashMap<>(eventData);
        message.put("event_type", eventType);
        message.put("timestamp", System.currentTimeMillis());
        fanoutService.publishFanoutMessage("user-events", message);
    }
}
```
Advantages: every group receives every message, with the persistence and replay that plain Pub/Sub lacks; groups progress independently, so a slow consumer group does not hold back the others.

Disadvantages: per-group bookkeeping (positions and pending lists) grows with the number of groups; all groups share one stream, so trimming and retention must account for the slowest group.
This pattern is about recovering and retrying messages whose processing failed. A Redis Stream consumer group tracks the state of every delivered message, and the PEL (pending entries list) of unacknowledged messages can be inspected and managed, enabling reliable message processing.
```
# Summary of unacknowledged messages in a consumer group
XPENDING stream_name group_name

# Details of unacknowledged messages (optionally for one consumer)
XPENDING stream_name group_name start_id end_id count [consumer_name]

# Claim messages that have been idle too long
XCLAIM stream_name group_name consumer_name min_idle_time message_id [message_id ...] [JUSTID]
```
Redis CLI example
```
# Summary of unacknowledged messages
> xpending mystream processing-group
1) (integer) 2                  # number of pending messages
2) "1647257548956-0"            # smallest pending ID
3) "1647257549123-0"            # largest pending ID
4) 1) 1) "consumer-1"           # pending count per consumer
      2) "1"
   2) 1) "consumer-2"
      2) "1"

# Pending details for one consumer
> xpending mystream processing-group - + 10 consumer-1
1) 1) "1647257548956-0"         # message ID
   2) "consumer-1"              # current owner
   3) (integer) 120000          # idle time (ms)
   4) (integer) 2               # delivery count

# Claim messages idle for more than 2 minutes
> xclaim mystream processing-group consumer-2 120000 1647257548956-0
1) 1) "1647257548956-0"
   2) 1) "sensor_id"
      2) "1234"
      3) "temperature"
      4) "19.8"
      5) "humidity"
      6) "56"
```
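Since Redis 6.2, `XAUTOCLAIM` folds this scan-and-claim cycle (XPENDING plus XCLAIM) into a single command; a brief sketch against the same stream and group (reply layout abridged):

```
# Claim up to 10 messages idle for >= 2 minutes, scanning from cursor 0
> xautoclaim mystream processing-group consumer-2 120000 0 count 10
1) "0-0"                       # cursor to pass to the next call
2) 1) 1) "1647257548956-0"     # the claimed messages
      2) 1) "sensor_id"
         2) "1234"
```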
Java Spring Boot example
```java
@Slf4j
@Service
public class MessageRecoveryService {

    @Autowired
    private StringRedisTemplate redisTemplate;

    /**
     * Summary of unacknowledged messages in a consumer group.
     */
    public PendingMessagesSummary getPendingMessagesSummary(String streamKey, String groupName) {
        return redisTemplate.opsForStream().pending(streamKey, groupName);
    }

    /**
     * Detailed pending messages for one consumer.
     */
    public PendingMessages getPendingMessages(
            String streamKey, String groupName, String consumerName,
            Range<String> idRange, long count) {
        return redisTemplate.opsForStream().pending(
                streamKey, Consumer.from(groupName, consumerName), idRange, count);
    }

    /**
     * Claim messages that have been idle for too long.
     */
    public List<MapRecord<String, Object, Object>> claimMessages(
            String streamKey, String groupName, String newConsumerName,
            Duration minIdleTime, RecordId... messageIds) {
        return redisTemplate.opsForStream().claim(
                streamKey, groupName, newConsumerName, minIdleTime, messageIds);
    }

    /**
     * Periodically reclaim and reprocess stale messages.
     */
    @Scheduled(fixedRate = 60000) // every minute
    public void recoverStaleMessages() {
        String streamKey = "mystream";
        String groupName = "processing-group";
        String recoveryConsumer = "recovery-consumer";
        Duration minIdleTime = Duration.ofMinutes(5); // treat messages as stale after 5 minutes

        try {
            // 1. Summary of all pending messages
            PendingMessagesSummary summary = getPendingMessagesSummary(streamKey, groupName);
            if (summary == null || summary.getTotalPendingMessages() == 0) {
                return;
            }

            // 2. Walk the pending messages of each consumer
            for (String consumerName : summary.getPendingMessagesPerConsumer().keySet()) {
                PendingMessages pendingMessages = getPendingMessages(
                        streamKey, groupName, consumerName, Range.unbounded(), 50); // 50 at a time
                if (pendingMessages == null) {
                    continue;
                }

                // 3. Collect messages idle longer than the threshold
                List<RecordId> staleMessageIds = new ArrayList<>();
                for (PendingMessage message : pendingMessages) {
                    if (message.getElapsedTimeSinceLastDelivery().compareTo(minIdleTime) > 0) {
                        staleMessageIds.add(message.getId());
                    }
                }

                // 4. Claim them for the recovery consumer
                if (!staleMessageIds.isEmpty()) {
                    log.info("Claiming {} stale messages from consumer {}",
                            staleMessageIds.size(), consumerName);
                    List<MapRecord<String, Object, Object>> claimedMessages = claimMessages(
                            streamKey, groupName, recoveryConsumer, minIdleTime,
                            staleMessageIds.toArray(new RecordId[0]));

                    // 5. Process the claimed messages
                    processClaimedMessages(streamKey, groupName, claimedMessages);
                }
            }
        } catch (Exception e) {
            log.error("Error recovering stale messages: {}", e.getMessage(), e);
        }
    }

    /**
     * Process messages claimed from other consumers.
     */
    private void processClaimedMessages(
            String streamKey, String groupName,
            List<MapRecord<String, Object, Object>> messages) {
        if (messages == null || messages.isEmpty()) {
            return;
        }
        for (MapRecord<String, Object, Object> message : messages) {
            try {
                processMessage(message);
                redisTemplate.opsForStream().acknowledge(
                        streamKey, groupName, message.getId().getValue());
                log.info("Successfully processed recovered message: {}", message.getId());
            } catch (Exception e) {
                log.error("Failed to process recovered message {}: {}",
                        message.getId(), e.getMessage(), e);
                // Depending on business rules, push the message to a dead-letter stream
                moveToDeadLetterQueue(streamKey, groupName, message);
            }
        }
    }

    /**
     * Move a poisoned message to a dead-letter stream.
     */
    private void moveToDeadLetterQueue(String sourceStream, String groupName,
            MapRecord<String, Object, Object> message) {
        String deadLetterStream = sourceStream + ":dead-letter";

        Map<String, Object> dlqMessage = new HashMap<>();
        message.getValue().forEach((k, v) -> dlqMessage.put(k.toString(), v));

        // Attach metadata about the failure
        dlqMessage.put("original_id", message.getId().getValue());
        dlqMessage.put("error_time", System.currentTimeMillis());

        redisTemplate.opsForStream().add(deadLetterStream, dlqMessage);

        // Optionally acknowledge it in the source group so it stops being retried:
        // redisTemplate.opsForStream().acknowledge(sourceStream, groupName, message.getId().getValue());
    }

    private void processMessage(MapRecord<String, Object, Object> message) {
        log.info("Processing recovered message: {}", message);
        // ... actual business logic ...
    }
}
```
Advantages: messages held by crashed or stalled consumers can be reclaimed instead of being lost; the delivery counter makes poison messages detectable and routable to a dead-letter stream.

Disadvantages: requires an explicit recovery job and a dead-letter policy; reclaimed messages are redelivered, so processing must be idempotent.
The windowed stream-processing pattern divides the data stream by time or by message count and runs aggregation or analysis within each window. It suits real-time analytics, trend monitoring, and time-series processing. Redis Stream offers no window operator of its own, but windows can be built by combining it with other Redis features.
This is typically implemented in one of the following ways (a sketch of the first approach follows the list):
1. Time ranges derived from message IDs (a Redis stream ID embeds a millisecond timestamp)
2. Redis sorted sets (ZSETs) to store per-window data
3. Redis key expiration to implement sliding windows
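Because a stream entry ID has the form `<millisecond-timestamp>-<sequence>`, a time window maps directly onto an ID range. A minimal Java sketch of approach 1 (assuming the same `redisTemplate` as the examples below; the `temperature` key is illustrative):

```java
// Last five minutes of the stream, queried purely via ID bounds:
// "<millis>-0" is a valid lower/upper bound for XRANGE
long windowEnd = System.currentTimeMillis();
long windowStart = windowEnd - 5 * 60 * 1000;

List<MapRecord<String, Object, Object>> window = redisTemplate.opsForStream()
        .range("temperature", Range.closed(windowStart + "-0", windowEnd + "-0"));
```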
Redis CLI example
Collecting and querying window data:
```
# Add timestamped data points
> xadd temperature * sensor_id 1 value 21.5 timestamp 1647257548000
"1647257550123-0"
> xadd temperature * sensor_id 1 value 21.8 timestamp 1647257558000
"1647257560234-0"
> xadd temperature * sensor_id 1 value 22.1 timestamp 1647257568000
"1647257570345-0"

# Query a specific time range
> xrange temperature 1647257550000-0 1647257570000-0
1) 1) "1647257550123-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.5"
      5) "timestamp"
      6) "1647257548000"
2) 1) "1647257560234-0"
   2) 1) "sensor_id"
      2) "1"
      3) "value"
      4) "21.8"
      5) "timestamp"
      6) "1647257558000"
```
Java Spring Boot example
```java
@Slf4j
@Service
public class TimeWindowProcessingService {

    private static final long WINDOW_SIZE_MS = 5 * 60 * 1000; // 5-minute windows

    @Autowired
    private StringRedisTemplate redisTemplate;

    private final ObjectMapper mapper = new ObjectMapper();

    /**
     * Append a data point to the stream and index it into its time window.
     */
    public String addDataPoint(String streamKey, String sensorId, double value) {
        long timestamp = System.currentTimeMillis();

        // 1. Append to the raw stream
        Map<String, Object> dataPoint = new HashMap<>();
        dataPoint.put("sensor_id", sensorId);
        dataPoint.put("value", String.valueOf(value));
        dataPoint.put("timestamp", String.valueOf(timestamp));
        RecordId recordId = redisTemplate.opsForStream().add(streamKey, dataPoint);

        // 2. Compute the window this point belongs to
        long windowStart = timestamp - (timestamp % WINDOW_SIZE_MS);
        String windowKey = streamKey + ":window:" + windowStart;

        // 3. Store the point in the window's sorted set, scored by timestamp
        try {
            String dataPointJson = mapper.writeValueAsString(dataPoint);
            redisTemplate.opsForZSet().add(windowKey, dataPointJson, timestamp);
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Failed to serialize data point", e);
        }

        // 4. Keep window keys for 24 hours
        redisTemplate.expire(windowKey, Duration.ofHours(24));

        return recordId.getValue();
    }

    /**
     * Fetch the data points inside [windowStartTime, windowEndTime].
     */
    public List<Map<String, Object>> getWindowData(
            String streamKey, long windowStartTime, long windowEndTime) {
        // Enumerate the window keys that may overlap the range
        List<String> windowKeys = new ArrayList<>();
        long current = windowStartTime - (windowStartTime % WINDOW_SIZE_MS);
        while (current <= windowEndTime) {
            windowKeys.add(streamKey + ":window:" + current);
            current += WINDOW_SIZE_MS;
        }

        // Collect matching points from each window
        List<Map<String, Object>> results = new ArrayList<>();
        for (String windowKey : windowKeys) {
            Set<String> dataPoints = redisTemplate.opsForZSet()
                    .rangeByScore(windowKey, windowStartTime, windowEndTime);
            if (dataPoints == null) {
                continue;
            }
            for (String dataPointJson : dataPoints) {
                try {
                    results.add(mapper.readValue(
                            dataPointJson, new TypeReference<Map<String, Object>>() {}));
                } catch (Exception e) {
                    log.error("Error parsing data point: {}", e.getMessage(), e);
                }
            }
        }

        // Sort by timestamp
        results.sort(Comparator.comparing(dp -> Long.parseLong(dp.get("timestamp").toString())));
        return results;
    }

    /**
     * Aggregate statistics over one sensor's data in a window.
     */
    public Map<String, Object> getWindowStats(
            String streamKey, String sensorId, long windowStartTime, long windowEndTime) {
        List<Map<String, Object>> windowData =
                getWindowData(streamKey, windowStartTime, windowEndTime);

        // Keep only the requested sensor's values
        List<Double> values = windowData.stream()
                .filter(dp -> sensorId.equals(dp.get("sensor_id").toString()))
                .map(dp -> Double.parseDouble(dp.get("value").toString()))
                .collect(Collectors.toList());

        Map<String, Object> stats = new HashMap<>();
        stats.put("count", values.size());
        if (!values.isEmpty()) {
            DoubleSummaryStatistics summaryStats =
                    values.stream().collect(Collectors.summarizingDouble(v -> v));
            stats.put("min", summaryStats.getMin());
            stats.put("max", summaryStats.getMax());
            stats.put("avg", summaryStats.getAverage());
            stats.put("sum", summaryStats.getSum());
        }
        stats.put("start_time", windowStartTime);
        stats.put("end_time", windowEndTime);
        stats.put("sensor_id", sensorId);
        return stats;
    }

    /**
     * Sliding-window processing job.
     */
    @Scheduled(fixedRate = 60000) // every minute
    public void processSlidingWindows() {
        String streamKey = "temperature";
        long now = System.currentTimeMillis();

        // Look at the last 10 minutes
        long windowEndTime = now;
        long windowStartTime = now - (10 * 60 * 1000);

        List<String> sensorIds = Arrays.asList("1", "2", "3"); // example sensor IDs
        for (String sensorId : sensorIds) {
            try {
                Map<String, Object> stats =
                        getWindowStats(streamKey, sensorId, windowStartTime, windowEndTime);

                // Business logic driven by the aggregates
                if (stats.containsKey("avg")) {
                    double avgTemp = (double) stats.get("avg");
                    if (avgTemp > 25.0) {
                        // Raise a high-temperature alert
                        log.warn("High temperature alert for sensor {}: {} °C", sensorId, avgTemp);
                        triggerAlert(sensorId, "high_temp", avgTemp);
                    }
                }

                // Persist the aggregates for historical trend analysis
                saveAggregatedResults(streamKey, sensorId, stats);
            } catch (Exception e) {
                log.error("Error processing sliding window for sensor {}: {}",
                        sensorId, e.getMessage(), e);
            }
        }
    }

    /**
     * Raise an alert by publishing to an alerts stream.
     */
    private void triggerAlert(String sensorId, String alertType, double value) {
        Map<String, Object> alertData = new HashMap<>();
        alertData.put("sensor_id", sensorId);
        alertData.put("alert_type", alertType);
        alertData.put("value", value);
        alertData.put("timestamp", System.currentTimeMillis());
        redisTemplate.opsForStream().add("alerts", alertData);
    }

    /**
     * Save aggregated results, scored by window end time.
     */
    private void saveAggregatedResults(String streamKey, String sensorId, Map<String, Object> stats) {
        long windowTime = (long) stats.get("end_time");
        String aggregateKey = streamKey + ":aggregate:" + sensorId;

        try {
            redisTemplate.opsForZSet().add(aggregateKey, mapper.writeValueAsString(stats), windowTime);
        } catch (JsonProcessingException e) {
            log.error("Failed to serialize aggregate: {}", e.getMessage(), e);
            return;
        }

        // Keep 30 days of aggregates
        redisTemplate.expire(aggregateKey, Duration.ofDays(30));
    }
}
```
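A brief usage sketch (hypothetical wiring: `timeWindowProcessingService` injected from the Spring context):

```java
// Average temperature of sensor "1" over the last five minutes
long now = System.currentTimeMillis();
Map<String, Object> stats = timeWindowProcessingService.getWindowStats(
        "temperature", "1", now - 5 * 60 * 1000, now);
System.out.println("avg over last 5 min: " + stats.get("avg"));
```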
Advantages: enables real-time aggregation and trend detection using only Redis primitives; raw data, window indexes, and aggregates can each have their own retention.

Disadvantages: all windowing logic lives in application code; data is effectively stored twice (stream plus sorted sets), increasing memory use; results depend on server clocks and on scheduled jobs running on time.
Redis Stream offers powerful, flexible message processing. By combining the patterns above you can build high-performance, reliable, and adaptable messaging systems that cover everything from simple task queues to complex real-time data pipelines.

When choosing and implementing a pattern, weigh your business characteristics, performance needs, reliability requirements, and system scale against Redis Stream's capabilities to arrive at the message-processing design that best fits your scenario.