39人参与 • 2025-05-03 • Redis
redis作为一款高性能的内存数据库,已经成为缓存层的首选解决方案。然而,使用缓存时最大的挑战在于保证缓存数据与底层数据源的一致性。缓存更新策略直接影响系统的性能、可靠性和数据一致性,选择合适的策略至关重要。
本文将介绍redis中6种缓存更新策略。
cache-aside是最常用的缓存模式,由应用层负责缓存和数据库的交互逻辑:
@service public class userservicecacheaside { @autowired private redistemplate<string, user> redistemplate; @autowired private userrepository userrepository; private static final string cache_key_prefix = "user:"; private static final long cache_expiration = 30; // 缓存过期时间(分钟) public user getuserbyid(long userid) { string cachekey = cache_key_prefix + userid; // 1. 查询缓存 user user = redistemplate.opsforvalue().get(cachekey); // 2. 缓存命中,直接返回 if (user != null) { return user; } // 3. 缓存未命中,查询数据库 user = userrepository.findbyid(userid).orelse(null); // 4. 将数据库结果写入缓存(设置过期时间) if (user != null) { redistemplate.opsforvalue().set(cachekey, user, cache_expiration, timeunit.minutes); } return user; } public void updateuser(user user) { // 1. 先更新数据库 userrepository.save(user); // 2. 再删除缓存 string cachekey = cache_key_prefix + user.getid(); redistemplate.delete(cachekey); // 或者选择更新缓存 // redistemplate.opsforvalue().set(cachekey, user, cache_expiration, timeunit.minutes); } }
优点
缺点
read-through策略将缓存作为主要数据源的代理,由缓存层负责数据加载:
首先定义缓存加载器接口:
public interface cacheloader<k, v> { v load(k key); }
实现read-through缓存管理器:
@component public class readthroughcachemanager<k, v> { @autowired private redistemplate<string, v> redistemplate; private final concurrenthashmap<string, cacheloader<k, v>> loaders = new concurrenthashmap<>(); public void registerloader(string cacheprefix, cacheloader<k, v> loader) { loaders.put(cacheprefix, loader); } public v get(string cacheprefix, k key, long expiration, timeunit timeunit) { string cachekey = cacheprefix + key; // 1. 查询缓存 v value = redistemplate.opsforvalue().get(cachekey); // 2. 缓存命中,直接返回 if (value != null) { return value; } // 3. 缓存未命中,通过加载器获取数据 cacheloader<k, v> loader = loaders.get(cacheprefix); if (loader == null) { throw new illegalstateexception("no cache loader registered for prefix: " + cacheprefix); } // 使用加载器从数据源加载数据 value = loader.load(key); // 4. 将加载的数据存入缓存 if (value != null) { redistemplate.opsforvalue().set(cachekey, value, expiration, timeunit); } return value; } }
使用示例:
@service public class userservicereadthrough { private static final string cache_prefix = "user:"; private static final long cache_expiration = 30; @autowired private readthroughcachemanager<long, user> cachemanager; @autowired private userrepository userrepository; @postconstruct public void init() { // 注册用户数据加载器 cachemanager.registerloader(cache_prefix, this::loaduserfromdb); } private user loaduserfromdb(long userid) { return userrepository.findbyid(userid).orelse(null); } public user getuserbyid(long userid) { // 直接通过缓存管理器获取数据,缓存逻辑由管理器处理 return cachemanager.get(cache_prefix, userid, cache_expiration, timeunit.minutes); } }
优点
缺点
write-through策略由缓存层同步更新底层数据源:
首先定义写入接口:
public interface cachewriter<k, v> { void write(k key, v value); }
实现write-through缓存管理器:
@component public class writethroughcachemanager<k, v> { @autowired private redistemplate<string, v> redistemplate; private final concurrenthashmap<string, cachewriter<k, v>> writers = new concurrenthashmap<>(); public void registerwriter(string cacheprefix, cachewriter<k, v> writer) { writers.put(cacheprefix, writer); } public void put(string cacheprefix, k key, v value, long expiration, timeunit timeunit) { string cachekey = cacheprefix + key; // 1. 获取对应的缓存写入器 cachewriter<k, v> writer = writers.get(cacheprefix); if (writer == null) { throw new illegalstateexception("no cache writer registered for prefix: " + cacheprefix); } // 2. 同步写入数据库 writer.write(key, value); // 3. 更新缓存 redistemplate.opsforvalue().set(cachekey, value, expiration, timeunit); } }
使用示例:
@service public class userservicewritethrough { private static final string cache_prefix = "user:"; private static final long cache_expiration = 30; @autowired private writethroughcachemanager<long, user> cachemanager; @autowired private userrepository userrepository; @postconstruct public void init() { // 注册用户数据写入器 cachemanager.registerwriter(cache_prefix, this::saveusertodb); } private void saveusertodb(long userid, user user) { userrepository.save(user); } public void updateuser(user user) { // 通过缓存管理器更新数据,会同步更新数据库和缓存 cachemanager.put(cache_prefix, user.getid(), user, cache_expiration, timeunit.minutes); } }
优点
缺点
write-behind策略将写操作异步化处理:
实现异步写入队列和处理器:
@component public class writebehindcachemanager<k, v> { @autowired private redistemplate<string, v> redistemplate; private final blockingqueue<cacheupdate<k, v>> updatequeue = new linkedblockingqueue<>(); private final concurrenthashmap<string, cachewriter<k, v>> writers = new concurrenthashmap<>(); public void registerwriter(string cacheprefix, cachewriter<k, v> writer) { writers.put(cacheprefix, writer); } @postconstruct public void init() { // 启动异步写入线程 thread writerthread = new thread(this::processwritebehindqueue); writerthread.setdaemon(true); writerthread.start(); } public void put(string cacheprefix, k key, v value, long expiration, timeunit timeunit) { string cachekey = cacheprefix + key; // 1. 更新缓存 redistemplate.opsforvalue().set(cachekey, value, expiration, timeunit); // 2. 将更新放入队列,等待异步写入数据库 updatequeue.offer(new cacheupdate<>(cacheprefix, key, value)); } private void processwritebehindqueue() { list<cacheupdate<k, v>> batch = new arraylist<>(100); while (true) { try { // 获取队列中的更新,最多等待100ms cacheupdate<k, v> update = updatequeue.poll(100, timeunit.milliseconds); if (update != null) { batch.add(update); } // 继续收集队列中可用的更新,最多收集100个或等待200ms updatequeue.drainto(batch, 100 - batch.size()); if (!batch.isempty()) { // 按缓存前缀分组批量处理 map<string, list<cacheupdate<k, v>>> groupedupdates = batch.stream() .collect(collectors.groupingby(cacheupdate::getcacheprefix)); for (map.entry<string, list<cacheupdate<k, v>>> entry : groupedupdates.entryset()) { string cacheprefix = entry.getkey(); list<cacheupdate<k, v>> updates = entry.getvalue(); cachewriter<k, v> writer = writers.get(cacheprefix); if (writer != null) { // 批量写入数据库 for (cacheupdate<k, v> u : updates) { try { writer.write(u.getkey(), u.getvalue()); } catch (exception e) { // 处理异常,可以重试或记录日志 log.error("failed to write-behind for key {}: {}", u.getkey(), e.getmessage()); } } } } batch.clear(); } } catch (interruptedexception e) { thread.currentthread().interrupt(); break; } catch (exception e) { log.error("error in write-behind process", e); } } } @data @allargsconstructor private static class cacheupdate<k, v> { private string cacheprefix; private k key; private v value; } }
使用示例:
@service public class userservicewritebehind { private static final string cache_prefix = "user:"; private static final long cache_expiration = 30; @autowired private writebehindcachemanager<long, user> cachemanager; @autowired private userrepository userrepository; @postconstruct public void init() { // 注册用户数据写入器 cachemanager.registerwriter(cache_prefix, this::saveusertodb); } private void saveusertodb(long userid, user user) { userrepository.save(user); } public void updateuser(user user) { // 更新仅写入缓存,异步写入数据库 cachemanager.put(cache_prefix, user.getid(), user, cache_expiration, timeunit.minutes); } }
优点
缺点
refresh-ahead策略预测性地在缓存过期前进行更新:
@component public class refreshaheadcachemanager<k, v> { @autowired private redistemplate<string, object> redistemplate; @autowired private threadpooltaskexecutor refreshexecutor; private final concurrenthashmap<string, cacheloader<k, v>> loaders = new concurrenthashmap<>(); // 刷新阈值,当过期时间剩余不足阈值比例时触发刷新 private final double refreshthreshold = 0.75; // 75% public void registerloader(string cacheprefix, cacheloader<k, v> loader) { loaders.put(cacheprefix, loader); } @suppresswarnings("unchecked") public v get(string cacheprefix, k key, long expiration, timeunit timeunit) { string cachekey = cacheprefix + key; // 1. 获取缓存项和其ttl v value = (v) redistemplate.opsforvalue().get(cachekey); long ttl = redistemplate.getexpire(cachekey, timeunit.milliseconds); if (value != null) { // 2. 如果缓存存在但接近过期,触发异步刷新 if (ttl != null && ttl > 0) { long expirationms = timeunit.tomillis(expiration); if (ttl < expirationms * (1 - refreshthreshold)) { refreshasync(cacheprefix, key, cachekey, expiration, timeunit); } } return value; } // 3. 缓存不存在,同步加载 return loadandcache(cacheprefix, key, cachekey, expiration, timeunit); } private void refreshasync(string cacheprefix, k key, string cachekey, long expiration, timeunit timeunit) { refreshexecutor.execute(() -> { try { loadandcache(cacheprefix, key, cachekey, expiration, timeunit); } catch (exception e) { // 异步刷新失败,记录日志但不影响当前请求 log.error("failed to refresh cache for key {}: {}", cachekey, e.getmessage()); } }); } private v loadandcache(string cacheprefix, k key, string cachekey, long expiration, timeunit timeunit) { cacheloader<k, v> loader = loaders.get(cacheprefix); if (loader == null) { throw new illegalstateexception("no cache loader registered for prefix: " + cacheprefix); } // 从数据源加载 v value = loader.load(key); // 更新缓存 if (value != null) { redistemplate.opsforvalue().set(cachekey, value, expiration, timeunit); } return value; } }
使用示例:
@service public class productservicerefreshahead { private static final string cache_prefix = "product:"; private static final long cache_expiration = 60; // 1小时 @autowired private refreshaheadcachemanager<string, product> cachemanager; @autowired private productrepository productrepository; @postconstruct public void init() { // 注册产品数据加载器 cachemanager.registerloader(cache_prefix, this::loadproductfromdb); } private product loadproductfromdb(string productid) { return productrepository.findbyid(productid).orelse(null); } public product getproduct(string productid) { return cachemanager.get(cache_prefix, productid, cache_expiration, timeunit.minutes); } }
@configuration public class threadpoolconfig { @bean public threadpooltaskexecutor refreshexecutor() { threadpooltaskexecutor executor = new threadpooltaskexecutor(); executor.setcorepoolsize(5); executor.setmaxpoolsize(20); executor.setqueuecapacity(100); executor.setthreadnameprefix("cache-refresh-"); executor.setrejectedexecutionhandler(new threadpoolexecutor.callerrunspolicy()); executor.initialize(); return executor; } }
优点
缺点
最终一致性策略基于分布式事件系统实现数据同步:
首先定义数据变更事件:
@data @allargsconstructor public class datachangeevent { private string entitytype; private string entityid; private string operation; // create, update, delete private string payload; // json格式的实体数据 }
实现事件发布者:
@component public class datachangepublisher { @autowired private kafkatemplate<string, datachangeevent> kafkatemplate; private static final string topic = "data-changes"; public void publishchange(string entitytype, string entityid, string operation, object entity) { try { // 将实体序列化为json string payload = new objectmapper().writevalueasstring(entity); // 创建事件 datachangeevent event = new datachangeevent(entitytype, entityid, operation, payload); // 发布到kafka kafkatemplate.send(topic, entityid, event); } catch (exception e) { log.error("failed to publish data change event", e); throw new runtimeexception("failed to publish event", e); } } }
实现事件消费者更新缓存:
@component @slf4j public class cacheupdateconsumer { @autowired private redistemplate<string, object> redistemplate; private static final long cache_expiration = 30; @kafkalistener(topics = "data-changes") public void handledatachangeevent(datachangeevent event) { try { string cachekey = buildcachekey(event.getentitytype(), event.getentityid()); switch (event.getoperation()) { case "create": case "update": // 解析json数据 object entity = parseentity(event.getpayload(), event.getentitytype()); // 更新缓存 redistemplate.opsforvalue().set( cachekey, entity, cache_expiration, timeunit.minutes); log.info("updated cache for {}: {}", cachekey, event.getoperation()); break; case "delete": // 删除缓存 redistemplate.delete(cachekey); log.info("deleted cache for {}", cachekey); break; default: log.warn("unknown operation: {}", event.getoperation()); } } catch (exception e) { log.error("error handling data change event: {}", e.getmessage(), e); // 失败处理:可以将失败事件放入死信队列等 } } private string buildcachekey(string entitytype, string entityid) { return entitytype.tolowercase() + ":" + entityid; } private object parseentity(string payload, string entitytype) throws jsonprocessingexception { // 根据实体类型选择反序列化目标类 class<?> targetclass = getclassforentitytype(entitytype); return new objectmapper().readvalue(payload, targetclass); } private class<?> getclassforentitytype(string entitytype) { switch (entitytype) { case "user": return user.class; case "product": return product.class; // 其他实体类型 default: throw new illegalargumentexception("unknown entity type: " + entitytype); } } }
使用示例:
@service @transactional public class userserviceeventdriven { @autowired private userrepository userrepository; @autowired private datachangepublisher publisher; public user createuser(user user) { // 1. 保存用户到数据库 user saveduser = userrepository.save(user); // 2. 发布创建事件 publisher.publishchange("user", saveduser.getid().tostring(), "create", saveduser); return saveduser; } public user updateuser(user user) { // 1. 更新用户到数据库 user updateduser = userrepository.save(user); // 2. 发布更新事件 publisher.publishchange("user", updateduser.getid().tostring(), "update", updateduser); return updateduser; } public void deleteuser(long userid) { // 1. 从数据库删除用户 userrepository.deletebyid(userid); // 2. 发布删除事件 publisher.publishchange("user", userid.tostring(), "delete", null); } }
优点
缺点
选择合适的缓存更新策略需要考虑以下因素:
业务特征 | 推荐策略 |
---|---|
读多写少 | cache-aside 或 read-through |
写密集型 | write-behind |
高一致性需求 | write-through |
响应时间敏感 | refresh-ahead |
分布式系统 | 最终一致性 |
资源约束 | 推荐策略 |
---|---|
内存限制 | cache-aside(按需缓存) |
数据库负载高 | write-behind(减轻写压力) |
网络带宽受限 | write-behind 或 refresh-ahead |
复杂度要求 | 推荐策略 |
---|---|
简单实现 | cache-aside |
中等复杂度 | read-through 或 write-through |
高复杂度但高性能 | write-behind 或 最终一致性 |
缓存更新是redis应用设计中的核心挑战,没有万能的策略适用于所有场景。根据业务需求、数据特性和系统资源,选择合适的缓存更新策略或组合多种策略才是最佳实践。
在实际应用中,可以根据不同数据的特性选择不同的缓存策略,甚至在同一个系统中组合多种策略,以达到性能和一致性的最佳平衡。
以上就是redis中6种缓存更新策略详解的详细内容,更多关于redis缓存更新的资料请关注代码网其它相关文章!
您想发表意见!!点此发布评论
版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。
发表评论