缓存雪崩
定义
缓存雪崩是指在短时间内,大量缓存键集中过期或缓存服务崩溃,导致大量请求直接访问数据库,使数据库瞬间压力激增,可能引起整个系统崩溃的现象。
产生原因
- 大量缓存同时过期:系统中大量缓存使用了相同的过期时间
- 缓存服务器宕机:Redis实例发生故障或重启
- 高并发请求:在缓存失效的同时有大量请求涌入
解决方案
过期时间差异化
1
2
|
int expireTime = baseExpireTime + new Random().nextInt(RANDOM_RANGE);
redisTemplate.opsForValue().set(key, value, expireTime, TimeUnit.SECONDS);
|
缓存高可用
- Redis集群:使用Redis Sentinel或Redis Cluster确保高可用
- 多级缓存:本地缓存 + 分布式缓存,降低Redis压力
1
2
3
4
5
6
7
8
9
|
// 使用Caffeine作为本地缓存
@Bean
public CacheManager cacheManager() {
CaffeineCacheManager cacheManager = new CaffeineCacheManager();
cacheManager.setCaffeine(Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(10000));
return cacheManager;
}
|
熔断降级
- 限流:使用令牌桶或漏桶算法限制请求流量
- 熔断机制:当检测到异常时,暂停部分服务
1
2
3
4
5
6
7
8
9
10
11
12
|
// 使用Sentinel实现熔断
@SentinelResource(value = "getProductInfo", fallback = "getProductInfoFallback")
public Product getProductInfo(Long productId) {
String key = "product:" + productId;
// 查询缓存...
// 缓存未命中查询数据库...
}
public Product getProductInfoFallback(Long productId) {
// 返回默认值或基础数据
return new Product(productId, "默认商品", 0);
}
|
预热机制
- 系统启动时加载热点数据到缓存
- 定时刷新即将过期的缓存
适用场景
- 电商秒杀:提前预热商品数据,设置差异化过期时间
- 促销活动:活动开始前预加载数据,避免活动开始时缓存雪崩
- 系统重启:实现缓存预热机制,避免重启后请求全部落到数据库
缓存穿透
定义
缓存穿透是指查询一个根本不存在的数据,缓存中没有,数据库中也没有,导致请求每次都要穿透到数据库,增加数据库压力。
产生原因
- 业务误操作:查询不存在的数据
- 恶意攻击:专门查询不存在的数据,绕过缓存压垮数据库
解决方案
空值缓存
- 缓存空结果:对不存在的数据也进行缓存,但过期时间较短
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
public Product getProduct(Long id) {
String key = "product:" + id;
// 查询缓存
String productJson = redisTemplate.opsForValue().get(key);
// 判断是否为空值标记
if ("__NULL__".equals(productJson)) {
return null;
}
if (productJson != null) {
return JSON.parseObject(productJson, Product.class);
}
// 查询数据库
Product product = productMapper.selectById(id);
// 数据库中不存在,缓存空值
if (product == null) {
redisTemplate.opsForValue().set(key, "__NULL__", 5, TimeUnit.MINUTES);
return null;
}
// 数据库中存在,缓存结果
redisTemplate.opsForValue().set(key, JSON.toJSONString(product), 30, TimeUnit.MINUTES);
return product;
}
|
布隆过滤器
- 原理:使用布隆过滤器快速判断数据是否存在
- 实现:将所有可能存在的数据哈希到布隆过滤器中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
// 使用Redisson实现布隆过滤器
@Bean
public RBloomFilter<String> bloomFilter(RedissonClient redissonClient) {
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("productBloomFilter");
// 初始化布隆过滤器,预计元素数量为100万,误判率为0.01
bloomFilter.tryInit(1000000L, 0.01);
return bloomFilter;
}
// 使用布隆过滤器判断商品是否存在
public Product getProduct(Long id) {
String key = "product:" + id;
String idStr = id.toString();
// 通过布隆过滤器判断是否存在
if (!bloomFilter.contains(idStr)) {
return null; // 布隆过滤器中不存在,直接返回
}
// 查询缓存和数据库的逻辑...
}
|
请求参数校验
- 接口层校验:对参数进行合法性校验
- 限流策略:对同一用户频繁访问进行限制
适用场景
- 用户信息查询:使用布隆过滤器预先加载所有用户ID
- 商品目录:对不存在的商品ID进行空值缓存
- API接口防护:对外部接口使用参数校验和限流保护
缓存击穿
定义
缓存击穿是指一个热点key在过期的瞬间,同时有大量请求并发访问该key,导致所有请求都落到数据库上,造成数据库瞬间压力激增。
产生原因
- 热点数据过期:高访问量的热点数据在某一时刻过期
- 并发请求:大量并发请求同时到达
解决方案
互斥锁(分布式锁)
- 原理:获取锁的线程负责查询数据库并更新缓存,其他线程等待或重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public Product getProduct(Long id) {
String key = "product:" + id;
String lockKey = "lock:product:" + id;
// 查询缓存
String productJson = redisTemplate.opsForValue().get(key);
if (productJson != null) {
return JSON.parseObject(productJson, Product.class);
}
// 获取分布式锁
boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
try {
if (locked) {
// 双重检查
productJson = redisTemplate.opsForValue().get(key);
if (productJson != null) {
return JSON.parseObject(productJson, Product.class);
}
// 查询数据库
Product product = productMapper.selectById(id);
if (product != null) {
// 更新缓存,设置较长的过期时间
redisTemplate.opsForValue().set(key, JSON.toJSONString(product), 1, TimeUnit.HOURS);
}
return product;
} else {
// 未获取到锁,短暂休眠后重试
Thread.sleep(50);
return getProduct(id);
}
} catch (Exception e) {
log.error("获取商品信息异常", e);
return null;
} finally {
// 释放锁
if (locked) {
redisTemplate.delete(lockKey);
}
}
}
|
永不过期策略
- 逻辑过期:不设置实际过期时间,而是在value中维护一个逻辑过期时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
public Product getProduct(Long id) {
String key = "product:" + id;
// 查询缓存
String productJsonWithExpire = redisTemplate.opsForValue().get(key);
if (productJsonWithExpire != null) {
ProductWithExpire productWithExpire = JSON.parseObject(productJsonWithExpire, ProductWithExpire.class);
// 判断是否逻辑过期
if (productWithExpire.getExpireTime() > System.currentTimeMillis()) {
// 未过期,直接返回
return productWithExpire.getProduct();
}
// 已过期,尝试获取锁进行更新
String lockKey = "lock:product:" + id;
boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (locked) {
try {
// 异步更新缓存
threadPool.submit(() -> {
// 查询数据库
Product newProduct = productMapper.selectById(id);
if (newProduct != null) {
// 设置新的过期时间
ProductWithExpire newProductWithExpire = new ProductWithExpire();
newProductWithExpire.setProduct(newProduct);
newProductWithExpire.setExpireTime(System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1));
// 更新缓存
redisTemplate.opsForValue().set(key, JSON.toJSONString(newProductWithExpire));
}
});
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
// 返回过期的数据
return productWithExpire.getProduct();
}
// 缓存未命中,查询数据库并设置缓存
// ...
}
|
提前刷新缓存
- 定时任务:对热点数据定时刷新,避免过期
- 异步更新:在即将过期前异步更新缓存
适用场景
- 商品详情页:高流量商品使用互斥锁或永不过期策略
- 首页数据:使用定时任务提前刷新缓存
- 热门活动:活动期间对热点数据设置永不过期
缓存预热
定义
缓存预热是指在系统启动或者预计流量高峰前,提前将热点数据加载到缓存中,避免用户请求时再加载导致的性能问题。
实现方案
系统启动预热
- 启动时加载:系统启动时,主动查询数据库并加载热点数据到缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@Component
public class CachePreheater implements ApplicationRunner {
@Autowired
private ProductService productService;
@Override
public void run(ApplicationArguments args) {
log.info("开始预热商品缓存...");
// 加载热门商品
List<Long> hotProductIds = productService.getHotProductIds();
for (Long id : hotProductIds) {
productService.preloadProductCache(id);
}
log.info("商品缓存预热完成,共预热{}个商品", hotProductIds.size());
}
}
|
定时刷新
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
@Component
@EnableScheduling
public class CacheRefresher {
@Autowired
private ProductService productService;
// 每天凌晨2点执行
@Scheduled(cron = "0 0 2 * * ?")
public void refreshHotProductCache() {
log.info("开始刷新热门商品缓存...");
List<Long> hotProductIds = productService.getHotProductIds();
for (Long id : hotProductIds) {
productService.preloadProductCache(id);
}
log.info("热门商品缓存刷新完成");
}
}
|
手动触发
- 管理接口:提供管理接口,允许运维人员手动触发缓存预热
适用场景
- 电商秒杀:活动开始前预热商品数据
- 系统重启:重启后立即预热核心数据
- 促销活动:活动前预热相关商品和活动规则
缓存降级
定义
缓存降级是指在Redis缓存异常或者流量剧增的情况下,暂时屏蔽部分功能或返回默认值,保证核心业务的正常运行。
实现方案
返回默认值
- 异常时返回兜底数据:当缓存和数据库都无法访问时,返回预设的默认值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
public List<Product> getRecommendProducts(Long userId) {
try {
// 尝试从缓存获取
String key = "recommend:user:" + userId;
String productsJson = redisTemplate.opsForValue().get(key);
if (productsJson != null) {
return JSON.parseArray(productsJson, Product.class);
}
// 缓存未命中,查询推荐系统
List<Product> products = recommendService.getRecommendProducts(userId);
if (!products.isEmpty()) {
redisTemplate.opsForValue().set(key, JSON.toJSONString(products), 1, TimeUnit.HOURS);
return products;
}
// 推荐系统未返回结果,查询默认推荐
return getDefaultRecommendProducts();
} catch (Exception e) {
log.error("获取推荐商品异常,返回默认推荐", e);
return getDefaultRecommendProducts();
}
}
private List<Product> getDefaultRecommendProducts() {
// 返回预设的热门商品列表
return productService.getHotProducts(10);
}
|
功能降级
- 关闭非核心功能:在系统压力大时,暂时关闭一些非核心功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@Service
public class ProductServiceWithDegradation {
@Autowired
private ConfigService configService;
public ProductDetail getProductDetail(Long productId) {
ProductDetail detail = new ProductDetail();
// 基础信息(核心功能,必须保留)
Product product = getProductBasicInfo(productId);
detail.setProduct(product);
// 判断是否开启了降级
boolean isDegraded = configService.isFeatureDegraded("product_detail");
if (!isDegraded) {
// 非降级状态,加载完整信息
detail.setComments(getProductComments(productId));
detail.setRecommendations(getRelatedProducts(productId));
detail.setDetailImages(getProductDetailImages(productId));
} else {
// 降级状态,只保留核心功能
detail.setComments(Collections.emptyList());
detail.setRecommendations(Collections.emptyList());
detail.setDetailImages(Collections.singletonList(product.getMainImage()));
}
return detail;
}
}
|
本地缓存兜底
- 使用本地缓存:当Redis不可用时,使用本地缓存提供服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
@Service
public class CacheDegradationService {
@Autowired
private StringRedisTemplate redisTemplate;
// 本地缓存
private LoadingCache<String, String> localCache = CacheBuilder.newBuilder()
.maximumSize(1000)
.expireAfterWrite(5, TimeUnit.MINUTES)
.build(new CacheLoader<String, String>() {
@Override
public String load(String key) {
// 从数据库加载数据
return loadFromDatabase(key);
}
});
public String getData(String key) {
try {
// 优先从Redis获取
String value = redisTemplate.opsForValue().get(key);
if (value != null) {
// 同步更新本地缓存
localCache.put(key, value);
return value;
}
} catch (Exception e) {
log.warn("Redis访问异常,降级使用本地缓存", e);
// Redis异常,使用本地缓存
}
// 从本地缓存获取
try {
return localCache.get(key);
} catch (ExecutionException e) {
log.error("本地缓存获取数据异常", e);
return null;
}
}
}
|
适用场景
- 促销活动:大促期间对非核心功能进行降级
- 系统故障:Redis故障时使用本地缓存提供基础服务
- 流量高峰:返回静态默认推荐,减轻推荐系统压力
缓存并发竞争
定义
缓存并发竞争是指多个线程或进程同时操作缓存中的同一个key,可能导致数据不一致或更新丢失的问题。
解决方案
分布式锁
- 互斥访问:使用Redis分布式锁确保同一时间只有一个线程能更新缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public boolean updateProductStock(Long productId, int deductAmount) {
String lockKey = "lock:product_stock:" + productId;
String stockKey = "product_stock:" + productId;
// 获取分布式锁
boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if (!locked) {
// 未获取到锁,稍后重试
return false;
}
try {
// 获取当前库存
String stockStr = redisTemplate.opsForValue().get(stockKey);
int currentStock = stockStr != null ? Integer.parseInt(stockStr) : 0;
// 判断库存是否足够
if (currentStock < deductAmount) {
return false;
}
// 更新缓存中的库存
int newStock = currentStock - deductAmount;
redisTemplate.opsForValue().set(stockKey, String.valueOf(newStock));
// 异步更新数据库
threadPool.submit(() -> updateProductStockInDb(productId, newStock));
return true;
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
}
|
原子操作
- 使用Redis原子操作:利用Redis的原子操作如INCR、DECR等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
public boolean deductStock(Long productId, int amount) {
String stockKey = "product_stock:" + productId;
// 使用Redis的原子操作扣减库存
Long result = redisTemplate.opsForValue().decrement(stockKey, amount);
// 判断扣减后的库存是否合法
if (result != null && result >= 0) {
// 异步更新数据库
threadPool.submit(() -> updateProductStockInDb(productId, result.intValue()));
return true;
} else if (result != null && result < 0) {
// 库存不足,恢复缓存中的库存
redisTemplate.opsForValue().increment(stockKey, amount);
return false;
}
return false;
}
|
乐观锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public boolean updateProductInfo(Product product) {
String productKey = "product:" + product.getId();
String versionKey = "product_version:" + product.getId();
// 获取当前版本号
String versionStr = redisTemplate.opsForValue().get(versionKey);
int currentVersion = versionStr != null ? Integer.parseInt(versionStr) : 0;
// 设置新版本号
int newVersion = currentVersion + 1;
product.setVersion(newVersion);
// 使用Lua脚本实现原子操作
String script =
"if redis.call('get', KEYS[2]) == ARGV[1] then " +
" redis.call('set', KEYS[1], ARGV[2]); " +
" redis.call('set', KEYS[2], ARGV[3]); " +
" return 1; " +
"else " +
" return 0; " +
"end";
List<String> keys = Arrays.asList(productKey, versionKey);
List<String> args = Arrays.asList(
String.valueOf(currentVersion),
JSON.toJSONString(product),
String.valueOf(newVersion)
);
Long result = (Long) redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), keys, args.toArray());
return result != null && result == 1;
}
|
适用场景
- 库存管理:使用原子操作或分布式锁控制库存扣减
- 计数器服务:点赞、评论计数使用原子操作
- 配置更新:使用乐观锁控制配置信息更新
缓存更新策略
定义
缓存更新策略是指在数据发生变化时,如何保证缓存数据与数据库数据的一致性。
常见策略
Cache-Aside(旁路缓存)
- 读操作:先查缓存,缓存没有则查数据库,并将结果放入缓存
- 写操作:先更新数据库,再删除缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
// 读取数据
public Product getProduct(Long id) {
String key = "product:" + id;
// 查询缓存
String productJson = redisTemplate.opsForValue().get(key);
if (productJson != null) {
return JSON.parseObject(productJson, Product.class);
}
// 缓存未命中,查询数据库
Product product = productMapper.selectById(id);
if (product != null) {
// 将结果放入缓存
redisTemplate.opsForValue().set(key, JSON.toJSONString(product), 1, TimeUnit.HOURS);
}
return product;
}
// 更新数据
public void updateProduct(Product product) {
// 先更新数据库
productMapper.updateById(product);
// 再删除缓存
String key = "product:" + product.getId();
redisTemplate.delete(key);
}
|
Read-Through/Write-Through(读写穿透)
- 读操作:应用程序从缓存读取,缓存负责从数据库加载
- 写操作:应用程序写入缓存,缓存负责更新数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
// 使用Spring Cache实现Read-Through/Write-Through
@Service
public class ProductServiceImpl implements ProductService {
@Autowired
private ProductMapper productMapper;
@Cacheable(value = "products", key = "#id")
public Product getProduct(Long id) {
// 缓存未命中时自动调用此方法加载数据
return productMapper.selectById(id);
}
@CachePut(value = "products", key = "#product.id")
public Product updateProduct(Product product) {
// 更新数据库
productMapper.updateById(product);
// 返回的对象会被自动放入缓存
return product;
}
@CacheEvict(value = "products", key = "#id")
public void deleteProduct(Long id) {
// 删除数据库中的数据
productMapper.deleteById(id);
// 缓存会被自动删除
}
}
|
Write-Behind(异步写入)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
@Service
public class ProductWriteBehindService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ProductMapper productMapper;
// 写入队列
private BlockingQueue<Product> writeQueue = new LinkedBlockingQueue<>(1000);
@PostConstruct
public void init() {
// 启动异步写入线程
new Thread(() -> {
List<Product> batch = new ArrayList<>();
while (true) {
try {
// 收集批量更新的数据
Product product = writeQueue.poll(100, TimeUnit.MILLISECONDS);
if (product != null) {
batch.add(product);
}
// 达到批量大小或等待超时,执行批量更新
if (batch.size() >= 100 || (!batch.isEmpty() && product == null)) {
updateBatch(batch);
batch.clear();
}
} catch (Exception e) {
log.error("异步写入数据库异常", e);
}
}
}).start();
}
// 更新数据
public void updateProduct(Product product) {
// 先更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product), 1, TimeUnit.HOURS);
// 加入异步写入队列
writeQueue.offer(product);
}
// 批量更新数据库
private void updateBatch(List<Product> products) {
if (products.isEmpty()) {
return;
}
try {
productMapper.batchUpdate(products);
} catch (Exception e) {
log.error("批量更新数据库异常", e);
// 异常处理:记录失败的更新,定时重试等
}
}
}
|
适用场景
- 读多写少:Cache-Aside适合读多写少的场景
- 高并发写入:Write-Behind适合高并发写入场景,可以减轻数据库压力
- 框架集成:Read-Through/Write-Through适合与缓存框架集成的场景
缓存淘汰策略
定义
缓存淘汰策略是指当缓存空间不足时,如何选择删除哪些数据,为新数据腾出空间。
Redis支持的淘汰策略
volatile-lru(默认)
- 策略:从设置了过期时间的键中,删除最近最少使用的键
- 适用场景:希望只淘汰有过期时间的键,且希望留下最常用的数据
allkeys-lru
- 策略:从所有键中,删除最近最少使用的键
- 适用场景:缓存访问符合幂律分布(少数键被频繁访问)
volatile-lfu(Redis 0+)
- 策略:从设置了过期时间的键中,删除使用频率最少的键
- 适用场景:有些键虽然最近被访问,但访问频率很低
allkeys-lfu(Redis 0+)
- 策略:从所有键中,删除使用频率最少的键
- 适用场景:希望留下被访问次数最多的数据
volatile-random
- 策略:从设置了过期时间的键中,随机删除
- 适用场景:键的访问概率相同
allkeys-random
- 策略:从所有键中,随机删除
- 适用场景:键的访问概率相同
volatile-ttl
- 策略:从设置了过期时间的键中,删除即将过期的键
- 适用场景:希望留下过期时间更长的数据
noeviction
- 策略:不删除键,当内存不足时,新写入操作会报错
- 适用场景:不允许丢失数据,宁可写入失败
配置方法
配置文件设置
1
2
3
|
# 在redis.conf中设置
maxmemory 2gb
maxmemory-policy allkeys-lru
|
命令行设置
1
2
|
CONFIG SET maxmemory 2gb
CONFIG SET maxmemory-policy allkeys-lru
|
最佳实践
业务场景选择
- 热点数据:使用LRU或LFU策略
- 统计数据:使用LFU策略
- 时效性数据:使用TTL策略
内存规划
- 预留内存:maxmemory设置为总内存的70%-80%
- 监控内存:设置内存使用率告警
过期时间设置
- 差异化过期:根据数据重要性设置不同过期时间
- 避免同时过期:添加随机时间
缓存数据一致性
定义
缓存数据一致性是指缓存中的数据与数据库中的数据保持一致的程度。根据业务需求,可以分为强一致性、最终一致性和弱一致性。
一致性级别
强一致性
- 特点:缓存与数据库中的数据始终保持一致
- 实现方式:双写操作放在一个事务中,或使用分布式事务
- 代价:性能较低,可用性降低
1
2
3
4
5
6
7
8
9
10
11
|
@Transactional
public void updateProductWithStrongConsistency(Product product) {
// 更新数据库
productMapper.updateById(product);
// 更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
// 如果缓存更新失败,事务回滚
}
|
最终一致性
- 特点:缓存与数据库的数据在一定时间后达到一致
- 实现方式:异步更新、定时同步、消息队列
- 代价:存在短暂的不一致窗口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// 使用消息队列实现最终一致性
public void updateProductWithEventualConsistency(Product product) {
// 更新数据库
productMapper.updateById(product);
// 发送消息到队列
CacheUpdateMessage message = new CacheUpdateMessage();
message.setKey("product:" + product.getId());
message.setOperation("update");
message.setData(JSON.toJSONString(product));
kafkaTemplate.send("cache-update-topic", JSON.toJSONString(message));
}
// 消费者处理缓存更新
@KafkaListener(topics = "cache-update-topic")
public void handleCacheUpdate(String messageJson) {
CacheUpdateMessage message = JSON.parseObject(messageJson, CacheUpdateMessage.class);
if ("update".equals(message.getOperation())) {
redisTemplate.opsForValue().set(message.getKey(), message.getData());
} else if ("delete".equals(message.getOperation())) {
redisTemplate.delete(message.getKey());
}
}
|
弱一致性
- 特点:允许缓存与数据库的数据存在一定程度的不一致
- 实现方式:设置缓存过期时间,过期后自动刷新
- 代价:可能读取到旧数据
1
2
3
4
5
6
7
8
9
10
|
// 设置较短的过期时间,接受一定程度的不一致
public void updateProductWithWeakConsistency(Product product) {
// 更新数据库
productMapper.updateById(product);
// 不主动更新缓存,等待缓存自动过期
// 或者设置较短的过期时间
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product), 5, TimeUnit.MINUTES);
}
|
常见问题及解决方案
缓存与数据库双写不一致
- 问题:更新数据库成功,但更新缓存失败
- 解决方案:
- 重试机制:缓存更新失败时进行重试
- 消息队列:使用消息队列保证最终一致性
- 延迟双删:更新数据库后删除缓存,并在一定延迟后再次删除缓存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// 延迟双删策略
public void updateProductWithDelayedDoubleDelete(Product product) {
String key = "product:" + product.getId();
// 先删除缓存
redisTemplate.delete(key);
// 更新数据库
productMapper.updateById(product);
// 延迟一段时间后再次删除缓存
threadPool.schedule(() -> {
redisTemplate.delete(key);
}, 500, TimeUnit.MILLISECONDS);
}
|
读写并发导致的不一致
- 问题:写操作和读操作并发执行,可能导致读取到旧数据
- 解决方案:
- 读写锁:对同一资源的读写操作加锁
- 先更新数据库,再删除缓存:减少不一致窗口
缓存穿透导致的不一致
- 问题:缓存未命中,多个请求同时查询数据库并更新缓存
- 解决方案:
- 分布式锁:获取锁的线程负责查询数据库并更新缓存
- 布隆过滤器:过滤不存在的数据
适用场景
- 强一致性:订单、支付等对一致性要求高的核心业务
- 最终一致性:商品详情、用户信息等允许短暂不一致的业务
- 弱一致性:推荐列表、热门商品等对实时性要求不高的业务
缓存性能优化
定义
缓存性能优化是指通过合理配置和使用Redis,提高缓存的响应速度、吞吐量和资源利用率。
优化方向
数据结构优化
- 合理选择数据类型:根据业务场景选择合适的Redis数据类型
- String:简单键值对,如用户信息
- Hash:对象存储,如商品详情
- List:有序列表,如消息队列
- Set:无序集合,如用户标签
- Sorted Set:有序集合,如排行榜
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
// 使用Hash存储对象,减少内存占用
public void saveProductUsingHash(Product product) {
String key = "product:" + product.getId();
Map<String, String> fields = new HashMap<>();
fields.put("id", product.getId().toString());
fields.put("name", product.getName());
fields.put("price", product.getPrice().toString());
fields.put("stock", product.getStock().toString());
// 其他字段...
redisTemplate.opsForHash().putAll(key, fields);
}
// 获取部分字段,减少网络传输
public Product getProductBasicInfo(Long id) {
String key = "product:" + id;
List<String> fields = Arrays.asList("id", "name", "price");
List<Object> values = redisTemplate.opsForHash().multiGet(key, fields);
Product product = new Product();
product.setId(Long.valueOf(values.get(0).toString()));
product.setName(values.get(1).toString());
product.setPrice(new BigDecimal(values.get(2).toString()));
return product;
}
|
内存优化
- 压缩数据:使用压缩算法减少内存占用
- 共享对象池:启用Redis的对象共享池
- 合理设置过期时间:避免长期占用内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 使用压缩算法减少内存占用
public void saveCompressedData(String key, Object data) {
try {
// 序列化对象
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(data);
byte[] bytes = baos.toByteArray();
// 压缩数据
ByteArrayOutputStream compressedBaos = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedBaos);
gzipOutputStream.write(bytes);
gzipOutputStream.close();
byte[] compressedBytes = compressedBaos.toByteArray();
// 存储压缩后的数据
redisTemplate.opsForValue().set(key, Base6getEncoder().encodeToString(compressedBytes));
} catch (Exception e) {
log.error("压缩数据异常", e);
}
}
|
连接优化
- 连接池配置:合理设置连接池大小和超时时间
- Pipeline批量操作:使用Pipeline批量执行命令,减少网络往返
- Lua脚本:使用Lua脚本将多个操作合并为一个原子操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// 使用Pipeline批量获取数据
public List<Product> batchGetProducts(List<Long> ids) {
List<Product> products = new ArrayList<>();
// 使用Pipeline批量查询
List<Object> results = redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
// 批量发送命令
for (Long id : ids) {
stringRedisConn.get("product:" + id);
}
return null; // 返回值由Pipeline处理
}
});
// 处理结果
for (int i = 0; i < results.size(); i++) {
String productJson = (String) results.get(i);
if (productJson != null) {
products.add(JSON.parseObject(productJson, Product.class));
} else {
// 缓存未命中,从数据库加载
Product product = productMapper.selectById(ids.get(i));
if (product != null) {
products.add(product);
// 异步更新缓存
final Long id = ids.get(i);
final Product finalProduct = product;
threadPool.submit(() -> {
redisTemplate.opsForValue().set("product:" + id, JSON.toJSONString(finalProduct));
});
}
}
}
return products;
}
|
读写分离
- 主从复制:读操作访问从节点,写操作访问主节点
- 读写分离:使用不同的连接池连接主从节点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, String> masterRedisTemplate() {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(masterConnectionFactory());
// 配置序列化器等
return redisTemplate;
}
@Bean
public RedisTemplate<String, String> slaveRedisTemplate() {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(slaveConnectionFactory());
// 配置序列化器等
return redisTemplate;
}
// 读操作使用从节点
public Product getProduct(Long id) {
String key = "product:" + id;
String productJson = slaveRedisTemplate.opsForValue().get(key);
// ...
}
// 写操作使用主节点
public void updateProduct(Product product) {
String key = "product:" + id;
masterRedisTemplate.opsForValue().set(key, JSON.toJSONString(product));
// ...
}
}
|
性能监控与调优
监控指标
- 命中率:缓存命中率是衡量缓存效率的重要指标
- 延迟:操作的响应时间
- 内存使用:内存使用率和碎片率
- 连接数:当前连接数和连接峰值
常见问题及解决方案
- 大key问题:拆分大key,使用Hash存储
- 热点key问题:本地缓存 + 分布式缓存
- 缓存穿透:布隆过滤器,空值缓存
- 缓存雪崩:随机过期时间,多级缓存
适用场景
- 高并发读取:商品详情、用户信息等
- 计数器服务:点赞、评论计数等
- 排行榜:使用Sorted Set实现实时排行
- 分布式锁:秒杀、库存控制等
缓存与数据库双写一致性
定义
缓存与数据库双写一致性是指在同时更新缓存和数据库时,如何保证两者数据的一致性,避免出现数据不一致的情况。
常见更新模式
先更新数据库,再更新缓存
- 优点:数据库作为数据源,保证数据的可靠性
- 缺点:如果更新缓存失败,会导致数据不一致
1
2
3
4
5
6
7
8
|
public void updateProduct(Product product) {
// 先更新数据库
productMapper.updateById(product);
// 再更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
}
|
先更新缓存,再更新数据库
- 优点:用户可以立即看到更新后的数据
- 缺点:如果更新数据库失败,会导致数据不一致
1
2
3
4
5
6
7
8
|
public void updateProduct(Product product) {
// 先更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
// 再更新数据库
productMapper.updateById(product);
}
|
先删除缓存,再更新数据库
- 优点:避免缓存和数据库不一致的时间窗口
- 缺点:可能导致缓存穿透
1
2
3
4
5
6
7
8
|
public void updateProduct(Product product) {
// 先删除缓存
String key = "product:" + product.getId();
redisTemplate.delete(key);
// 再更新数据库
productMapper.updateById(product);
}
|
先更新数据库,再删除缓存(推荐)
- 优点:不一致窗口较小,实现简单
- 缺点:在高并发下仍可能出现不一致
1
2
3
4
5
6
7
8
|
public void updateProduct(Product product) {
// 先更新数据库
productMapper.updateById(product);
// 再删除缓存
String key = "product:" + product.getId();
redisTemplate.delete(key);
}
|
一致性保障方案
延迟双删策略
- 原理:更新数据库后删除缓存,并在一定延迟后再次删除缓存
- 目的:解决并发读写导致的数据不一致问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public void updateProductWithDelayedDoubleDelete(Product product) {
String key = "product:" + product.getId();
// 先删除缓存
redisTemplate.delete(key);
// 更新数据库
productMapper.updateById(product);
// 延迟一段时间后再次删除缓存
threadPool.schedule(() -> {
redisTemplate.delete(key);
}, 500, TimeUnit.MILLISECONDS);
}
|
消息队列保证最终一致性
- 原理:使用消息队列异步更新缓存,保证最终一致性
- 目的:解耦数据库和缓存操作,提高系统可用性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public void updateProductWithMessageQueue(Product product) {
// 更新数据库
productMapper.updateById(product);
// 发送消息到队列
CacheUpdateMessage message = new CacheUpdateMessage();
message.setKey("product:" + product.getId());
message.setOperation("delete"); // 或者"update"
message.setData(JSON.toJSONString(product));
kafkaTemplate.send("cache-update-topic", JSON.toJSONString(message));
}
// 消费者处理缓存更新
@KafkaListener(topics = "cache-update-topic")
public void handleCacheUpdate(String messageJson) {
CacheUpdateMessage message = JSON.parseObject(messageJson, CacheUpdateMessage.class);
if ("update".equals(message.getOperation())) {
redisTemplate.opsForValue().set(message.getKey(), message.getData());
} else if ("delete".equals(message.getOperation())) {
redisTemplate.delete(message.getKey());
}
}
|
分布式事务
- 原理:使用分布式事务保证缓存和数据库操作的原子性
- 目的:强一致性保障,但性能较低
1
2
3
4
5
6
7
8
|
@Transactional
public void updateProductWithTransaction(Product product) {
// 在事务中更新数据库和缓存
productMapper.updateById(product);
String key = "product:" + product.getId();
redisTemplate.delete(key);
}
|
适用场景
- 核心业务:订单、支付等对一致性要求高的业务使用分布式事务
- 一般业务:商品、用户等允许短暂不一致的业务使用延迟双删或消息队列
- 非核心业务:推荐、统计等对一致性要求低的业务使用简单的更新策略
缓存与数据库异步双写一致性
定义
缓存与数据库异步双写一致性是指通过异步方式更新缓存和数据库,保证两者数据最终一致的方案。
实现方案
基于消息队列的异步更新
- 原理:写操作发送消息到队列,消费者异步更新缓存或数据库
- 优点:解耦系统,提高吞吐量,保证最终一致性
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 更新商品信息
public void updateProduct(Product product) {
// 直接更新缓存,提供快速响应
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
// 发送消息到队列,异步更新数据库
kafkaTemplate.send("db-update-topic", JSON.toJSONString(product));
}
// 消费者异步更新数据库
@KafkaListener(topics = "db-update-topic")
public void handleDatabaseUpdate(String productJson) {
Product product = JSON.parseObject(productJson, Product.class);
try {
productMapper.updateById(product);
} catch (Exception e) {
log.error("更新数据库失败", e);
// 重试机制或补偿机制
retryService.addRetryTask("db-update", productJson);
}
}
|
基于Binlog的异步同步
- 原理:监听数据库Binlog变更,异步更新缓存
- 优点:数据库作为单一数据源,减少不一致风险
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
// 使用Canal监听MySQL Binlog
@Component
public class BinlogCacheUpdater {
@Autowired
private StringRedisTemplate redisTemplate;
@PostConstruct
public void init() {
// 配置Canal客户端
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("120.0.1", 11111),
"example", "", "");
// 启动监听线程
new Thread(() -> {
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
Message message = connector.getWithoutAck(100);
long batchId = message.getId();
try {
List<Entry> entries = message.getEntries();
for (Entry entry : entries) {
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
// 处理数据变更
processRowChange(entry.getHeader().getTableName(), rowChange);
}
}
connector.ack(batchId);
} catch (Exception e) {
log.error("处理Binlog异常", e);
connector.rollback(batchId);
}
}
}).start();
}
private void processRowChange(String tableName, RowChange rowChange) {
if ("product".equals(tableName)) {
for (RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == EventType.UPDATE ||
rowChange.getEventType() == EventType.INSERT) {
// 获取变更后的数据
List<Column> columns = rowData.getAfterColumnsList();
Map<String, Object> data = new HashMap<>();
Long id = null;
for (Column column : columns) {
data.put(column.getName(), column.getValue());
if ("id".equals(column.getName())) {
id = Long.valueOf(column.getValue());
}
}
// 更新缓存
if (id != null) {
String key = "product:" + id;
redisTemplate.opsForValue().set(key, JSON.toJSONString(data));
}
} else if (rowChange.getEventType() == EventType.DELETE) {
// 处理删除操作
List<Column> columns = rowData.getBeforeColumnsList();
Long id = null;
for (Column column : columns) {
if ("id".equals(column.getName())) {
id = Long.valueOf(column.getValue());
break;
}
}
// 删除缓存
if (id != null) {
String key = "product:" + id;
redisTemplate.delete(key);
}
}
}
}
}
}
|
定时任务同步
- 原理:定时从数据库加载数据更新缓存
- 优点:实现简单,适合对实时性要求不高的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
@Component
@EnableScheduling
public class CacheSyncTask {
@Autowired
private ProductMapper productMapper;
@Autowired
private StringRedisTemplate redisTemplate;
// 每天凌晨2点执行全量同步
@Scheduled(cron = "0 0 2 * * ?")
public void fullSync() {
log.info("开始全量同步商品缓存...");
// 分页查询所有商品
int pageSize = 1000;
int pageNum = 1;
Page<Product> page;
do {
page = productMapper.selectPage(new Page<>(pageNum, pageSize), null);
List<Product> products = page.getRecords();
// 批量更新缓存
for (Product product : products) {
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
}
pageNum++;
} while (page.hasNext());
log.info("商品缓存全量同步完成");
}
// 每小时执行增量同步
@Scheduled(cron = "0 0 * * * ?")
public void incrementalSync() {
log.info("开始增量同步商品缓存...");
// 获取最近一小时更新的商品
Date oneHourAgo = new Date(System.currentTimeMillis() - 3600 * 1000);
List<Product> products = productMapper.selectByUpdateTimeAfter(oneHourAgo);
// 批量更新缓存
for (Product product : products) {
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
}
log.info("商品缓存增量同步完成,共同步{}个商品", products.size());
}
}
|
异常处理
重试机制
- 原理:操作失败时进行重试,直到成功或达到最大重试次数
- 实现:使用重试框架或自定义重试逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
@Service
public class RetryService {
@Autowired
private StringRedisTemplate redisTemplate;
// 重试队列
private BlockingQueue<RetryTask> retryQueue = new LinkedBlockingQueue<>();
@PostConstruct
public void init() {
// 启动重试线程
new Thread(() -> {
while (true) {
try {
RetryTask task = retryQueue.take();
// 判断是否达到最大重试次数
if (task.getRetryCount() >= task.getMaxRetryCount()) {
log.error("任务重试次数达到上限,放弃重试:{}", task);
continue;
}
// 执行重试
boolean success = executeRetry(task);
if (!success) {
// 重试失败,增加重试次数并重新加入队列
task.setRetryCount(task.getRetryCount() + 1);
// 指数退避策略
long delay = (long) Math.pow(2, task.getRetryCount()) * 1000;
Thread.sleep(delay);
retryQueue.put(task);
}
} catch (Exception e) {
log.error("重试任务执行异常", e);
}
}
}).start();
}
// 添加重试任务
public void addRetryTask(String type, String data) {
RetryTask task = new RetryTask();
task.setType(type);
task.setData(data);
task.setRetryCount(0);
task.setMaxRetryCount(5);
task.setCreateTime(new Date());
retryQueue.offer(task);
}
// 执行重试
private boolean executeRetry(RetryTask task) {
try {
if ("cache-update".equals(task.getType())) {
// 重试更新缓存
CacheUpdateMessage message = JSON.parseObject(task.getData(), CacheUpdateMessage.class);
if ("update".equals(message.getOperation())) {
redisTemplate.opsForValue().set(message.getKey(), message.getData());
} else if ("delete".equals(message.getOperation())) {
redisTemplate.delete(message.getKey());
}
} else if ("db-update".equals(task.getType())) {
// 重试更新数据库
Product product = JSON.parseObject(task.getData(), Product.class);
productMapper.updateById(product);
}
return true;
} catch (Exception e) {
log.error("执行重试任务失败:{}", task, e);
return false;
}
}
}
|
补偿机制
- 原理:定期检查数据一致性,发现不一致时进行修复
- 实现:定时任务或专门的补偿服务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
@Component
@EnableScheduling
public class CacheCompensationTask {
@Autowired
private ProductMapper productMapper;
@Autowired
private StringRedisTemplate redisTemplate;
// 每天执行一次补偿任务
@Scheduled(cron = "0 0 3 * * ?")
public void compensate() {
log.info("开始执行缓存补偿任务...");
// 获取所有缓存的商品ID
Set<String> keys = redisTemplate.keys("product:*");
for (String key : keys) {
try {
// 提取商品ID
Long productId = Long.valueOf(key.substring("product:".length()));
// 获取缓存中的商品数据
String productJson = redisTemplate.opsForValue().get(key);
Product cacheProduct = JSON.parseObject(productJson, Product.class);
// 获取数据库中的商品数据
Product dbProduct = productMapper.selectById(productId);
// 比较缓存和数据库中的数据
if (dbProduct == null) {
// 数据库中不存在,删除缓存
redisTemplate.delete(key);
log.info("补偿删除不存在的商品缓存:{}", key);
} else if (!isProductEqual(cacheProduct, dbProduct)) {
// 数据不一致,更新缓存
redisTemplate.opsForValue().set(key, JSON.toJSONString(dbProduct));
log.info("补偿更新不一致的商品缓存:{}", key);
}
} catch (Exception e) {
log.error("处理缓存补偿异常:{}", key, e);
}
}
log.info("缓存补偿任务执行完成");
}
// 比较两个商品对象是否相等
private boolean isProductEqual(Product p1, Product p2) {
if (p1 == null || p2 == null) {
return p1 == p2;
}
return Objects.equals(pgetId(), pgetId()) &&
Objects.equals(pgetName(), pgetName()) &&
Objects.equals(pgetPrice(), pgetPrice()) &&
Objects.equals(pgetStock(), pgetStock()) &&
Objects.equals(pgetVersion(), pgetVersion());
}
}
|
适用场景
- 高并发写入:使用消息队列异步更新,提高系统吞吐量
- 数据库为主:使用Binlog同步,保证数据库作为单一数据源
- 定时统计:使用定时任务同步,适合对实时性要求不高的场景
缓存与数据库同步双写一致性
定义
缓存与数据库同步双写一致性是指在同一个事务中同时更新缓存和数据库,保证两者数据强一致的方案。
实现方案
本地事务
- 原理:在同一个本地事务中更新数据库和缓存
- 局限性:只适用于数据库和缓存在同一个系统中的场景
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
@Service
public class ProductService {
@Autowired
private ProductMapper productMapper;
@Autowired
private StringRedisTemplate redisTemplate;
@Transactional
public void updateProduct(Product product) {
// 更新数据库
productMapper.updateById(product);
// 更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
// 如果缓存更新失败,事务会回滚
}
}
|
分布式事务
- 原理:使用分布式事务框架保证跨系统操作的原子性
- 实现:使用2PC、TCC、SAGA等分布式事务模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
// 使用Seata实现分布式事务
@GlobalTransactional
public void updateProductWithDistributedTransaction(Product product) {
// 更新数据库
productMapper.updateById(product);
// 更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, JSON.toJSONString(product));
}
// 使用TCC模式实现分布式事务
@Transactional
public void updateProductWithTCC(Product product) {
// Try阶段
// 锁定数据库记录
productMapper.lockById(product.getId());
// 准备缓存更新(设置临时标记)
String key = "product:" + product.getId();
String tempKey = "temp:product:" + product.getId();
redisTemplate.opsForValue().set(tempKey, JSON.toJSONString(product));
try {
// Confirm阶段
// 更新数据库
productMapper.updateById(product);
// 更新缓存
redisTemplate.rename(tempKey, key);
} catch (Exception e) {
// Cancel阶段
// 解锁数据库记录
productMapper.unlockById(product.getId());
// 删除临时缓存
redisTemplate.delete(tempKey);
throw e;
}
}
|
最终一致性方案
- 原理:使用补偿机制保证最终一致性
- 实现:使用可靠消息、定时任务等
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
// 使用可靠消息保证最终一致性
public void updateProductWithReliableMessage(Product product) {
// 发送预备消息
String messageId = UUID.randomUUID().toString();
Message message = new Message();
message.setId(messageId);
message.setStatus("PREPARING");
message.setContent(JSON.toJSONString(product));
messageMapper.insert(message);
try {
// 更新数据库
productMapper.updateById(product);
// 将消息状态改为已发送
message.setStatus("SENT");
messageMapper.updateById(message);
// 发送消息到队列
kafkaTemplate.send("cache-update-topic", JSON.toJSONString(product));
} catch (Exception e) {
// 异常处理:将消息状态改为失败
message.setStatus("FAILED");
messageMapper.updateById(message);
throw e;
}
}
// 消费者处理缓存更新
@KafkaListener(topics = "cache-update-topic")
public void handleCacheUpdate(String productJson) {
Product product = JSON.parseObject(productJson, Product.class);
try {
// 更新缓存
String key = "product:" + product.getId();
redisTemplate.opsForValue().set(key, productJson);
} catch (Exception e) {
log.error("更新缓存失败", e);
// 重试机制
retryService.addRetryTask("cache-update", productJson);
}
}
|
一致性保障挑战
网络延迟和分区
- 问题:网络延迟可能导致缓存和数据库操作的时序不确定
- 解决方案:使用延迟双删、消息队列等机制
并发读写
- 问题:并发读写可能导致数据不一致
- 解决方案:使用分布式锁、版本控制等机制
系统故障
- 问题:系统故障可能导致部分操作失败
- 解决方案:使用补偿机制、重试机制等
适用场景
- 金融交易:使用分布式事务保证强一致性
- 订单系统:使用可靠消息保证最终一致性
- 用户信息:使用本地事务或延迟双删策略
总结
Redis作为高性能的缓存系统,在提升系统性能的同时也带来了一系列挑战,如缓存雪崩、穿透、击穿等问题。本文详细介绍了这些问题的定义、产生原因和解决方案,并针对不同的业务场景提供了最佳实践。
在实际应用中,需要根据业务特点和一致性要求,选择合适的缓存策略和更新机制。对于核心业务,可能需要强一致性保障;而对于非核心业务,可以采用最终一致性方案,提高系统性能和可用性。
无论采用何种方案,都需要做好监控和异常处理,确保系统在各种情况下都能正常运行,并保持数据的一致性。