黑马点评笔记

0 准备工作及前置知识

导出初始化文件并上传到gitee

CMD137/hm-dianping

1 短信登录

1.1 发送验证码

public Result sendcode(String phone, HttpSession session) {
        //1.校验手机号码
        if(RegexUtils.isPhoneInvalid(phone))
            return Result.fail("手机号码格式错误");
        //2.生成验证码
        String code = RandomUtil.randomNumbers(6);
        //3.存入session并返回
        session.setAttribute("code",code);
        //4.调用第三方api向手机发送验证码
        log.info("发送验证码:{}",code);

        return Result.ok();
    }

1.2 用户登录校验

注意此处使用UserDTO类型存入session:

  1. 数据脱敏
  2. 减轻ThraedLocal和Session的内存压力。
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone))
return Result.fail("手机号码格式错误");

//2.校验验证码
String trueCode = (String) session.getAttribute("code");
if (trueCode == null) {
return Result.fail("验证码已失效,请重新获取");
}
if (!trueCode.equals(loginForm.getCode()))
return Result.fail("验证码错误");

//3.查询phone,如果不存在,直接创建新用户
User user = query().eq("phone", phone).one();
if (user==null){
user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
save(user);
}
//4.将用户信息存入session
session.setAttribute("user", BeanUtil.copyProperties(user, UserDTO.class));

return Result.ok();
}

知识点:

session过期后如果你从 Session 中获取某个属性(如验证码或用户信息),会返回 null,前端得到的sessionid发过来找不到对应的session。

1.3 登录校验拦截器

设置拦截器

public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取用户
Object user =request.getSession().getAttribute("user");
//2.若用户为空,进行拦截
if (user==null){
response.setStatus(401);
return false;
}
//3.不空,存入threadLocal,放行
UserHolder.saveUser((UserDTO) user);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
}
}

注册拦截器

@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/cod e",
"/user/login"
);
}
}

知识点:

每一个tomcat的请求都是一个独立的线程。

1.4 基于 Redis 解决 Session 共享问题

在集群环境下,Session 的集群共享主要要解决以下问题:

  • 数据一致性问题:确保在多个服务器之间,Session 数据能够保持一致,避免出现数据冲突或不一致的情况,以免导致用户体验受损或业务逻辑出现错误。
  • 分布式存储与访问问题:需要找到合适的方式将 Session 数据分布式存储在集群中的多个节点上,并能让各个节点快速、高效地访问和更新这些数据,以提高系统的性能和可扩展性。
  • 负载均衡问题:当用户请求在不同服务器之间分发时,要保证无论请求被路由到哪个服务器,都能正确地获取和处理该用户的 Session 数据,实现负载均衡的同时不影响 Session 的正常使用。
  • 故障转移问题:如果集群中的某个服务器出现故障,要能够将 Session 数据无缝地转移到其他健康的服务器上,确保用户的操作不会中断,保证系统的高可用性。
  • 并发访问控制问题:多个用户同时访问和修改同一个 Session 数据时,需要进行有效的并发控制,防止数据被破坏或出现不一致的状态,确保数据的完整性和准确性。

解决方案:

Redis代替session的业务流程

key的结构
使用hash,既可以节约内存,还便于单字段的crud


设计key,需要满足两点:
1、key要具有唯一性
2、key要方便携带

  • 使用phone: 敏感数据不宜带到前端
  • 使用随机token

修改后:

public Result sendcode(String phone, HttpSession session) {
//1.校验手机号码
if(RegexUtils.isPhoneInvalid(phone))
return Result.fail("手机号码格式错误");
//2.生成验证码
String code = RandomUtil.randomNumbers(6);
//3.存入redis,有效期2分钟
redisTemplate.opsForValue().set(LOGIN_CODE_KEY+phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
//4.调用第三方api向手机发送验证码
log.info("发送验证码:{}",code);

return Result.ok();
}
public Result login(LoginFormDTO loginForm, HttpSession session) {
//1.校验手机号
String phone = loginForm.getPhone();
if(RegexUtils.isPhoneInvalid(phone))
return Result.fail("手机号码格式错误");

//2.校验验证码
String trueCode = (String) redisTemplate.opsForValue().get(LOGIN_CODE_KEY+phone);
if (trueCode == null) {
return Result.fail("验证码已失效,请重新获取");
}
if (!trueCode.equals(loginForm.getCode()))
return Result.fail("验证码错误");

//3.查询phone,如果不存在,直接创建新用户
User user = query().eq("phone", phone).one();
if (user==null){
user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX+RandomUtil.randomString(10));
save(user);
}

//4.生成token,作为key
String token = UUID.randomUUID().toString();

//5.将用户信息存入redis
UserDTO userForRedis =new UserDTO();
BeanUtil.copyProperties(user,userForRedis);

Map<String, Object> map=BeanUtil.beanToMap(userForRedis);

redisTemplate.opsForHash().putAll(LOGIN_USER_KEY+token,map);
redisTemplate.expire(LOGIN_USER_KEY+token,30,TimeUnit.MINUTES);

//6.传回作为登录凭证
return Result.ok(token);
}

因redis替代了session,对拦截器也要修改:

之前的拦截器无法对不需要拦截的路径生效,那么我们可以添加一个拦截器,在第一个拦截器中拦截所有的路径,把第二个拦截器做的事情放入到第一个拦截器中,同时刷新令牌,因为第一个拦截器有了threadLocal的数据,所以此时第二个拦截器只需要判断拦截器中的user对象是否存在即可,完成整体刷新功能。

第一个拦截器:(注意redisTemplate的注入问题)

public class RefreshTokenInterceptor implements HandlerInterceptor {

//通过构造函数注入redisTemplate,因为RefreshTokenInterceptor并没有被spring容器管理
//另外,直接加上@component注解也不行,因为RefreshTokenInterceptor是被手动new出来的
private final RedisTemplate<String, Object> redisTemplate;

public RefreshTokenInterceptor(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 1.获取请求头中的token
String token = request.getHeader("authorization");
if (token==null) {
return true;
}
// 2.基于TOKEN获取redis中的用户
String key = LOGIN_USER_KEY + token;
Map<Object, Object> userMap = redisTemplate.opsForHash().entries(key);
// 3.判断用户是否存在
if (userMap.isEmpty()) {
return true;
}
// 5.将查询到的hash数据转为UserDTO
UserDTO userDTO = BeanUtil.fillBeanWithMap(userMap, new UserDTO(), false);
// 6.存在,保存用户信息到 ThreadLocal
UserHolder.saveUser(userDTO);
// 7.刷新token有效期
redisTemplate.expire(key, LOGIN_USER_TTL, TimeUnit.MINUTES);
// 8.放行
return true;
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
// 移除用户
UserHolder.removeUser();
}
}

第二个拦截器:

public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1.获取用户
Object user =UserHolder.getUser();
//2.若用户为空,进行拦截
if (user==null){
response.setStatus(401);
return false;
}
//3.不空,放行
return true;
}

}

拦截器注册:

@Configuration
public class MvcConfig implements WebMvcConfigurer {
@Autowired
RedisTemplate redisTemplate;
@Override
public void addInterceptors(InterceptorRegistry registry) {

//1
registry.addInterceptor(new RefreshTokenInterceptor(redisTemplate));
//2
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/user/code",
"/user/login"
);
}
}

2 店铺数据查询缓存

2.1 缓存:

类比理解:CPU — Cache — 内存 ; tomcat — Redis —Mysql

  • 缓存作用:
    • 降低后端(数据库)负载
    • 提高读写效率,降低响应时间
  • 缓存成本:
    • 数据一致性
    • 代码维护、运维

2.2 添加Redis缓存:

public Result queryById(Long id) {
        String key = CACHE_SHOP_KEY+id;
        //1.在缓存中寻找
        String shopJson = stringRedisTemplate.opsForValue().get(key);
        //2.没找到,查数据库
        if (StrUtil.isNotBlank(shopJson)){
            //找到就直接返回
            Shop shop = JSONUtil.toBean(shopJson,Shop.class);
            return Result.ok(shop);
        }

        Shop shop= getById(id);
        //3.若不存在,返回错误
        if (shop==null){
            return  Result.fail("店铺不存在!");
        }

        //4.写入缓存
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop));

        //5.返回
        return Result.ok(shop);
    }

注意发生缓存未命中,查完数据库要再次写入缓存。

练习:给商铺类型添加缓存:

public List<ShopType> queryList() {
String key = "cache:shopTypeList";
//1.在缓存中查找
String typeListJson = stringRedisTemplate.opsForValue().get(key);

//2.没找到从数据库找,找到直接返回
if (StrUtil.isNotBlank(typeListJson)){
return JSONUtil.toList(typeListJson, ShopType.class);
}

List<ShopType> typeList=query().orderByAsc("sort").list();
//3.写入缓存并返回
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(typeList));

return typeList;
}

2.3 缓存更新策略:

内存淘汰超时剔除主动更新
说明不用自己维护,利用Redis的内存淘汰机制,当内存不足时自动淘汰部分数据。下次查询时更新缓存。给缓存数据添加TTL时间,到期后自动删除缓存。下次查询时更新缓存。编写业务逻辑,在修改数据库的同时,更新缓存。
一致性一般
维护成本


主动更新的类型:

策略类型实现方式一致性维护成本适用场景
Cache Aside
旁路缓存模式
应用代码中显式处理:1. 读时先查缓存,未命中则读DB并回填 2. 写时更新DB后删除缓存最好通用场景
Write Through
写穿法
缓存与DB绑定:所有写操作必须同时更新缓存和DB(通常通过存储层插件实现)非常高写密集型系统
Write Behind
写回法
先更新缓存,异步批量写DB(需维护操作日志防丢失)一般极高超高吞吐量系统(如电商秒杀)

Cache Aside 关注:

  • 删除缓存:更新数据库时让缓存失效,查询时再写入缓存。
  • 缓存与数据库操作原子性:
    • 单体系统:放在一个事物
    • 分布式:TCC等分布式事务方案
  • 先删缓存还是先改数据库?
    • 先操作数据库,再删除缓存。(一致性问题可能性低)

一般应用:

  1. 低一致性需求:使用Redis自带的内存淘汰机制
  2. 高一致性需求:主动更新的Cache-Aside 为主,主动删除缓存为核心,过期时间为兜底,特殊场景加锁 / 异步补偿
    • 读操作:
      • 缓存命中直接返回。
      • 缓存未命中则查询数据库,并写入缓存,设定超时时间。
    • 写操作:
      • 先写数据库,在删除缓存。
      • 要确保数据库与缓存操作的原子性

练习:给查询、修改商铺应用主动更新+超时剔除的缓存更新策略:

queryById中:
//4.写入缓存
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);

修改商铺:

public Result updateShopById(Shop shop) {
if (shop.getId()==null){
return Result.fail("shopId不存在");
}
String key = CACHE_SHOP_KEY+shop.getId();
//1.更新数据库
updateById(shop);
//2.删除缓存
stringRedisTemplate.delete(key);

return Result.ok();
}

2.4 缓存穿透

📌 场景描述:

用户请求一个 数据库中根本不存在 的数据,例如 GET /user?id=999999。由于 Redis 中也没有缓存,系统会继续查数据库,发现也没有,就返回空。

这本没问题,但如果恶意请求者频繁请求大量不存在的数据,Redis 缓存每次都 miss,而数据库要一次次“白忙活”处理这些请求,等于缓存完全没用!

这就是缓存穿透:请求绕过缓存直击数据库,形成穿透攻击或资源浪费。

常见的几种解决方式

方案描述优缺点
缓存 空对象:null 值在数据库中未找到时在缓存中存储null优点:实现、维护简单
缺点:额外内存消耗、短期不一致
布隆过滤器事先把所有合法 key 放进去,非法的直接过滤掉,本质是 多个hash+bitmap优点:内存占用少,没有多余key
缺点:实现复杂,存在误判可能

练习:店铺查询应用缓存空对象以防止缓存穿透

public Result queryById(Long id) {
String key = CACHE_SHOP_KEY+id;
//1.在缓存中寻找
String shopJson = stringRedisTemplate.opsForValue().get(key);



//2.判断缓存中是否存在
if (StrUtil.isNotBlank(shopJson)){
//找到就直接返回
Shop shop = JSONUtil.toBean(shopJson,Shop.class);
return Result.ok(shop);
}

//3.处理命中缓存穿透的“”空值
if (shopJson!=null&&shopJson.equals("")){
//返回错误信息
return Result.fail("店铺不存在!");
}

//4.缓存中不存在,查找数据库
Shop shop= getById(id);

//5.若根本不存在,返回错误
if (shop==null){
//将空值写入redis,防止缓存穿透
stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
//返回错误
return Result.fail("店铺不存在!");
}

//6.写入缓存
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_SHOP_TTL, TimeUnit.MINUTES);

//7.返回
return Result.ok(shop);
}

2.5 缓存雪崩

❄️ 场景描述:

“本来一切安好,突然 Redis 挂了或某一类缓存同一时间全部失效,大量请求同时涌入数据库,压力直接雪崩!”

大量缓存同一时刻失效,导致数据库压力骤增、服务可能崩溃的现象,称为 缓存雪崩(Cache Avalanche)

问题分类:

  • 大量 Key 同时过期:
    开发时所有缓存设置了统一过期时间,导致某一时刻大面积失效
  • 热点数据集中失效:
    热点数据(如首页推荐、热门商品)缓存突然过期,访问瞬时打满数据库。
  • Redis 服务宕机
    Redis 整体不可用,所有缓存层失效,请求全部打到数据库。

解决方案:

  • 给不同的 Key 的 TTL 添加随机值
  • 利用 Redis 集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

2.6 缓存击穿

缓存击穿问题也叫热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的打击。

两种方案:

对比项加锁方案逻辑过期方案
核心思想高并发时只有一个线程查询数据库并写入缓存,其他线程等待缓存数据设定“逻辑过期时间”,过期后异步更新,旧值仍可返回
实现方式使用互斥锁(如分布式锁:Redis setnx + TTL)缓存中包含数据与过期时间,后台线程定期异步刷新
是否阻塞请求是,可能会短暂阻塞其他线程否,请求永远能返回旧缓存,更新异步进行
适合数据类型小规模、访问频率集中、对一致性要求高的数据访问频繁、对时效性要求不高、允许短时间使用旧数据
数据库压力较低(只一个线程访问 DB)非高峰期后台异步刷新,不集中访问 DB,压力更平稳
一致性保证强一致性(缓存更新前其他线程阻塞)最终一致性(旧数据短时间内可能存在)
实现复杂度较简单(加锁处理即可)略复杂(需维护过期时间、后台刷新线程或任务)
热点数据表现有效避免缓存击穿有效避免缓存击穿
示例应用场景商品详情页、秒杀库存、用户信息等首页推荐、排行榜、店铺评分等允许数据短暂过期的场景

加锁方案:

这个互斥锁更适合使用非阻塞的自旋锁逻辑(如 Redis SETNX 实现的分布式锁)而非java自带的synchronized 阻塞式锁

锁怎么释放?主动释放+添加有效期

逻辑过期方案:

“过期”是在业务层中而非redis中实现的,这个值会提前加载到缓存中并一直存在于redis中。若在redis中未命中,直接返回null。

练习:利用互斥锁解决缓存击穿

示例:修改基于id查询店铺的业务、基于互斥锁方式解决缓存击穿问题:

获取锁:

private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        //不直接返回是为了防止自动解包时出现异样导致返回null
        return BooleanUtil.isTrue(flag);
    }

释放锁:

private void unLock(String key){
        stringRedisTemplate.delete(key);
    }

业务实现:

//互斥锁解决缓存击穿(带了缓存穿透)
public Shop queryByIdWithMutex(Long id){
    String key = CACHE_SHOP_KEY+id;
    //1.在缓存中寻找
    String shopJson = stringRedisTemplate.opsForValue().get(key);

    //2.判断缓存中是否存在
    if (StrUtil.isNotBlank(shopJson)){
        //找到就直接返回
        Shop shop = JSONUtil.toBean(shopJson,Shop.class);
        return shop;
    }

    //处理命中缓存穿透的“”空值(缓存穿透)
    if (shopJson!=null&&shopJson.equals("")){
        //返回错误信息
        return  null;
    }

    //3.没有找到,开始缓存重建
    //4获取互斥锁
    String lockKey=LOCK_SHOP_KEY+id;

    Shop shop= null;
    try {
        if (!tryLock(lockKey)){
            //5.1没有获取到,休眠一段时间再次查询缓存(自旋)
            //自旋实现建议使用循环而非递归
            while (true){
                Thread.sleep(50);
                //1.在缓存中寻找
                shopJson = stringRedisTemplate.opsForValue().get(key);

                //2.判断缓存中是否存在
                if (StrUtil.isNotBlank(shopJson)){
                    //找到就直接返回
                    shop = JSONUtil.toBean(shopJson,Shop.class);
                    return shop;
                }
            }
        }

        //5.2.获取到了lock,查数据库
        shop = getById(id);
        if (shop==null){
            //将空值写入redis,防止缓存穿透
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            //返回错误
            return  null;
        }
        //6.写入缓存
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),LOCK_SHOP_TTL, TimeUnit.MINUTES);


    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {
        //7.释放锁
        unLock(lockKey);
    }

    //8.返回
    return shop;
}

练习:利用逻辑过期解决缓存击穿

示例:修改基于id查询店铺的业务、基于逻辑过期方式解决缓存击穿问题:

由于需要保存时间,有工具类:

@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

业务代码:

...
//缓存池,注意:此处不符合《阿里巴巴 Java 开发手册》的推荐(一般使用 ThreadPoolExecutor 明确参数)
private ExecutorService cacheRebuildExecutor = Executors.newFixedThreadPool(10);
...

//逻辑过期解决缓存击穿
    private Shop queryByIdWithLogicExpire(Long id) {
        String key = CACHE_SHOP_KEY+id;
        //1.在缓存中寻找
        String shopJson = stringRedisTemplate.opsForValue().get(key);

        //2.判断缓存中是否存在
        if (!StrUtil.isNotBlank(shopJson)){
            //没找到就直接返回null
            return null;
        }

        //3.检查是否逻辑过期
        RedisData shopData = JSONUtil.toBean(shopJson, RedisData.class);
        Shop shop = (Shop) shopData.getData();

        if (LocalDateTime.now().isBefore(shopData.getExpireTime())){
            //没过期,直接返回
            return shop;
        }

        //4.过期了,尝试获取锁
        String lockKey= LOCK_SHOP_KEY+id;
        boolean isLock = tryLock(lockKey);

        //没获取到,直接返回旧数据
        if (!isLock){
            return shop;
        }
        //获取到锁,异步开启新线程重建缓存,也先获取返回旧数据
        cacheRebuildExecutor.submit(() -> {
            try {
                // double check:重新取一次缓存,看是否已被其他线程更新
                String latestJson = stringRedisTemplate.opsForValue().get(key);
                RedisData latestData = JSONUtil.toBean(latestJson, RedisData.class);
                if (latestData.getExpireTime().isAfter(LocalDateTime.now())) {
                    return; // 已更新,无需重复构建
                }

                // 查询数据库并构建新缓存
                this.saveShop2Redis(id,CACHE_SHOP_TTL);
            } finally {
                unLock(lockKey);
            }
        });

        return shop;
    }

//逻辑过期缓击穿加载数据
    public void  saveShop2Redis(Long id,Long expireMinutes){
        Shop shop = getById(id);

        RedisData redisData = new RedisData();
        redisData.setData(shop);
        redisData.setExpireTime(LocalDateTime.now().plusMinutes(expireMinutes));

        stringRedisTemplate.opsForValue().set(CACHE_SHOP_KEY+id,JSONUtil.toJsonStr(redisData));
    }

2.7 Redis缓存工具类的封装

涉及知识点较多,泛型、、lambda、回调函数设计。

/**
 * 缓存工具类
 */
@Component
@Slf4j
public class CacheClient {
    private StringRedisTemplate stringRedisTemplate;

    public CacheClient(StringRedisTemplate stringRedisTemplate){
        this.stringRedisTemplate = stringRedisTemplate;
    }

    //对象转 JSON 存 string 类型 key,可设 TTL 过期时间
    public void set(String key, Object value, Long time, TimeUnit unit){
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
    }

    //对象转 JSON 存 string 类型 key,设逻辑过期时间处理缓存击穿。
    public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
        //设置逻辑过期
        RedisData redisData = new RedisData();
        redisData.setData(value);
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));

        //写入Redis
        stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(redisData));
    }

    //按 key 查缓存并反序列化为指定类型,用缓存空值解决缓存穿透
    public <R,ID> R queryWithPassThrough(
            String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit unit
    ){
        String key = keyPrefix+id;
        //1.在缓存中寻找
        String json = stringRedisTemplate.opsForValue().get(key);


        //2.判断缓存中是否存在
        if (StrUtil.isNotBlank(json)){
            //找到就直接返回
            R r = JSONUtil.toBean(json,type);
            return r;
        }

        //3.处理命中缓存穿透的“”空值
        if (json!=null&&json.equals("")){
            //返回错误信息
            return  null;
        }

        //4.缓存中不存在,查找数据库
        R r = dbFallback.apply(id);

        //5.若根本不存在,返回错误
        if (r==null){
            //将空值写入redis,防止缓存穿透
            stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
            //返回错误
            return  null;
        }

        //6.写入缓存
        this.set(key,r,time,unit);

        //7.返回
        return r;
    }

    //按 key 查缓存并反序列化为指定类型,用逻辑过期解决缓存击穿
    private ExecutorService cacheRebuildExecutor = Executors.newFixedThreadPool(10);
    public <R,ID> R queryWithLogicExpire(
            String keyPrefix,String lockKeyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback,Long time, TimeUnit unit
    ){
        String key = keyPrefix+id;
        //1.在缓存中寻找
        String json = stringRedisTemplate.opsForValue().get(key);

        //2.判断缓存中是否存在
        if (!StrUtil.isNotBlank(json)){
            //没找到就直接返回null
            return null;
        }

        //3.检查是否逻辑过期
        RedisData redisData = JSONUtil.toBean(json, RedisData.class);
        R r = (R) redisData.getData();

        if (LocalDateTime.now().isBefore(redisData.getExpireTime())){
            //没过期,直接返回
            return r;
        }

        //4.过期了,尝试获取锁
        String lockKey= lockKeyPrefix+id;
        boolean isLock = tryLock(lockKey);

        //没获取到,直接返回旧数据
        if (!isLock){
            return r;
        }
        //获取到锁,异步开启新线程重建缓存,也先获取返回旧数据
        cacheRebuildExecutor.submit(() -> {
            try {
                // double check:重新取一次缓存,看是否已被其他线程更新
                String latestJson = stringRedisTemplate.opsForValue().get(key);
                RedisData latestData = JSONUtil.toBean(latestJson, RedisData.class);
                if (latestData.getExpireTime().isAfter(LocalDateTime.now())) {
                    return; // 已更新,无需重复构建
                }

                // 查询数据库并构建新缓存
                R newR=dbFallback.apply(id);
                setWithLogicalExpire(key,newR,time,unit);

            } finally {
                unLock(lockKey);
            }
        });

        return r;
    }

    private boolean tryLock(String key){
        Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
        //不直接返回是为了防止自动解包时出现异样导致返回null
        return BooleanUtil.isTrue(flag);
    }
    private void unLock(String key){
        stringRedisTemplate.delete(key);
    }
}

3 优惠券秒杀

3.1全局唯一ID

数据库自增ID的问题:

  • id规律性明显
  • 受单表数据量的限制

分布式系统下生成全局唯一ID的工具,特性:

  • 唯一性
  • 高可用
  • 高性能
  • 单调递增性
  • 安全性

全局唯一ID生成策略:

  • UUID
  • 雪花算法
  • Redis自增
  • 数据库自增

基于redis自增的全局ID生成器:

设计:

long型:64位=1位符号位+31位时间戳(以秒为单位)+32位秒内自增计数器

序列号回滚以天为单位(即一天一个key),方便统计订单量

/**
* 全局唯一ID生成器
*/
@Component
public class RedisIdWorker {
private static final long BEGIN_TIMESTAMP = 1640995200L;
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;

public RedisIdWorker(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}

public long nextId(String keyPrefix){
//生成时间戳
long timeStamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)-BEGIN_TIMESTAMP;
//生成序列号
//序列号回滚以天为单位(即一天一个key)
String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//redis自增长
long count = stringRedisTemplate.opsForValue().increment("icr:"+keyPrefix+":"+date);

//拼接返回
return timeStamp<<COUNT_BITS|count;
}
}

3.2 优惠券秒杀下单

下单时判断:

  • 秒杀是否开始、结束
  • 库存是否充足

最基础的秒杀券下单实现:

@Transactional
public Result seckillVoucher(Long voucherId) {
    //1.查询秒杀类优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    if (voucher == null){
        return Result.fail("秒杀券不存在");
    }

    //2.判断是否在活动时间内
    LocalDateTime now = LocalDateTime.now();
    if (voucher.getBeginTime().isAfter(now)||voucher.getEndTime().isBefore(now)){
        return Result.fail("秒杀券不在活动时间");
    }

    //3.是否有库存
    if (voucher.getStock() < 1){
        return Result.fail("库存不足");
    }

    //4.扣减库存
    boolean success = seckillVoucherService.update()
        .eq("voucher_id", voucherId)
        .setSql("stock = stock - 1")
        .update();

    if (!success){
            return Result.fail("库存不足");
        }

    //5.创建订单
    idWorker=new RedisIdWorker(stringRedisTemplate);
    VoucherOrder order = new VoucherOrder();
    Long orderId = idWorker.nextId("order");
    //订单id
    order.setId(orderId);
    //用户id
    order.setUserId(UserHolder.getUser().getId());
    //秒杀券id
    order.setVoucherId(voucherId);

    save(order);

    //6.返回订单id
    return Result.ok(orderId);
}

3.3 库存超卖问题

典型的多线程安全问题,常见解决方案:加锁。

悲观锁:

  • 核心思想:先获取锁,再操作
  • Synchronized、lock…

乐观锁:

  • 只在更新数据时去判断有没有其他线程对数据做了修改
    • 如果没有修改,继续执行
    • 发现被修改,重试或异常
  • 两种常见方式:
    • 版本号法:
      • 在数据表中添加一个 version 字段,用于记录数据的版本号。每次更新数据时,版本号递增。
      • 流程
        • 读取数据时,同时获取当前的版本号。
        • 更新数据时,检查版本号是否与读取时一致。
        • 如果一致,则执行更新并递增版本号;否则,表示数据已被其他事务修改,操作失败。
    • CAS法(Compare And Swap):
      • 以该超卖场景为例,就是把库存数据同时当版本号使用

乐观锁CAS解决超卖问题示例:

//4.扣减库存
boolean success = seckillVoucherService.update()
    .eq("voucher_id", voucherId)
    .setSql("stock = stock - 1")
    .update();

改为:

//4.扣减库存
boolean success = seckillVoucherService.update()
    .eq("voucher_id", voucherId).eq("stock",voucher.getStock())//cas乐观锁
    .setSql("stock = stock - 1")
    .update();

此时会出现多个线程同时失效:

可修改为:

//4.扣减库存
boolean success = seckillVoucherService.update()
    .eq("voucher_id", voucherId).eq("stock",voucher.getStock())//cas乐观锁
    .setSql("stock = stock - 1")
    .update();

这样也是乐观锁,但不再是cas或版本号式。

也可以加上重式机制:

boolean success = false;
        int retryTimes = 3;
        while (retryTimes-- > 0) {
            success = seckillVoucherService.update()
                    .setSql("stock = stock - 1")
                    .eq("voucher_id", voucherId)
                    .gt("stock", 0) // 确保库存大于0
                    .update();
            if (success) {
                break;
            }
            // 每次失败可以考虑 sleep 一点时间缓解写冲突(可选)

        }

超卖问题解决方案:

  1. 悲观锁:添加同步锁,让线程串行执行
    ・优点:简单粗暴
    ・缺点:性能一般
  2. 乐观锁:不加锁,在更新时判断是否有其它线程在修改
    ・优点:性能好
    ・缺点:存在成功率低的问题
  3. Redis 原子脚本(Lua脚本)库存扣减 + 异步落库
    ・优点:性能最好、减轻数据库压力
    ・缺点:开发难度较大、维护困难,Redis 故障风险、复杂性高

3.4 一人一单问题

场景描述:限制同一用户只能下单一次(秒杀只买一次)

可以用乐观锁吗?

乐观锁本质是基于版本号或条件更新的并发控制,它对“库存扣减”这种“数值更新”非常适用。
对“一人一单”这种“用户唯一性限制”问题,乐观锁不是最直接的解决方式。

所以此处选择悲观锁解决:

@Override
public Result seckillVoucher(Long voucherId) {
    //1.查询秒杀类优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    if (voucher == null){
        return Result.fail("秒杀券不存在");
    }

    //2.判断是否在活动时间内
    LocalDateTime now = LocalDateTime.now();
    if (voucher.getBeginTime().isAfter(now)||voucher.getEndTime().isBefore(now)){
        return Result.fail("秒杀券不在活动时间");
    }

    //3.是否有库存
    if (voucher.getStock() < 1){
        return Result.fail("库存不足");
    }



    //6.返回订单id
    return createOrder(voucher);
}

@Transactional
public  Result createOrder(SeckillVoucher voucher){
    //一人一单用悲观锁:synchronized
    Long userId = UserHolder.getUser().getId();

    //userId.toString().intern():userId.toString().intern() 是为了确保相同用户在多线程下使用同一把锁,实现用户级互斥,而不是全局锁或失效锁。
    synchronized (userId.toString().intern()){
        int count = query().eq("user_id", userId).eq("voucher_id", voucher.getVoucherId()).count();
        if (count > 0){
            return Result.fail("同一用户只能购买一次");
        }

        //4.扣减库存
        boolean success = seckillVoucherService.update()
                .eq("voucher_id", voucher.getVoucherId()).eq("stock",voucher.getStock())//cas乐观锁
                .setSql("stock = stock - 1")
                .update();

        if (!success){
            return Result.fail("库存不足");
        }

        //5.创建订单
        idWorker=new RedisIdWorker(stringRedisTemplate);

        VoucherOrder order = new VoucherOrder();
        Long orderId = idWorker.nextId("order");
        //订单id
        order.setId(orderId);
        //用户id
        order.setUserId(UserHolder.getUser().getId());
        //秒杀券id
        order.setVoucherId(voucher.getVoucherId());

        save(order);

        return Result.ok(orderId);
    }
}

注意锁:userId.toString().intern()

锁对象是否唯一对象是否适合并发控制说明
this所有人共用一把锁,性能差
userId可能是不同对象
userId.toString()每次都是新对象
userId
.toString().intern()
字符串常量池,确保同用户同锁

新问题:

由于“Spring 事务是提交时生效,在方法最后才提交;”。这里锁了create方法内,会导致:先释放了锁,再提交事务的中间时期可能有其他线程拿到锁进行了修改。造成并发安全问题。

修改:

@Override
public Result seckillVoucher(Long voucherId) {
//1.查询秒杀类优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if (voucher == null){
return Result.fail("秒杀券不存在");
}

//2.判断是否在活动时间内
LocalDateTime now = LocalDateTime.now();
if (voucher.getBeginTime().isAfter(now)||voucher.getEndTime().isBefore(now)){
return Result.fail("秒杀券不在活动时间");
}

//3.是否有库存
if (voucher.getStock() < 1){
return Result.fail("库存不足");
}

//6.返回订单id
Long userId = UserHolder.getUser().getId();
//userId.toString().intern()解释:userId.toString().intern() 是为了确保相同用户在多线程下使用同一把锁,实现用户级互斥,而不是全局锁或失效锁。

synchronized (userId.toString().intern()) {//获取锁
return createOrder(voucher);//提交事务
}//释放锁
}

@Transactional
public Result createOrder(SeckillVoucher voucher){
//一人一单用悲观锁:synchronized
Long userId = UserHolder.getUser().getId();

int count = query().eq("user_id", userId).eq("voucher_id", voucher.getVoucherId()).count();
if (count > 0){
return Result.fail("同一用户只能购买一次");
}

//4.扣减库存
boolean success = seckillVoucherService.update()
.eq("voucher_id", voucher.getVoucherId()).eq("stock",voucher.getStock())//cas乐观锁
.setSql("stock = stock - 1")
.update();

if (!success){
return Result.fail("库存不足");
}

//5.创建订单
idWorker=new RedisIdWorker(stringRedisTemplate);

VoucherOrder order = new VoucherOrder();
Long orderId = idWorker.nextId("order");
//订单id
order.setId(orderId);
//用户id
order.setUserId(UserHolder.getUser().getId());
//秒杀券id
order.setVoucherId(voucher.getVoucherId());
save(order);
return Result.ok(orderId);
}

还是有问题:事务失效!!!

Spring 事务的本质:靠代理对象实现。当你调用 @Transactional 方法时,其实调用的是代理对象的方法

seckillVoucher() 调用的是当前类的 createOrder() 方法,而非有事务管理能力的当前类的代理类的方法,所以事务失效了。

部分解决方法:

解决方法简述特点
方法一:锁与事务写在同一个 @Transactional 方法中在一个方法中同时加锁并执行下单逻辑,避免内部调用最简单,事务生效,适合单体应用
方法二:将事务方法抽取到另一个 Spring Bean 中createOrder() 拆出去,通过 Spring 注入调用推荐做法,解耦清晰,事务生效
方法三:使用 AopContext.currentProxy() 获取代理对象通过 Spring 暴露的代理对象间接调用自身方法原理清晰,但写法别扭,不推荐生产用
方法四:自己注入自己的代理对象在类中注入自己,通过代理对象调用事务方法可行但不优雅,推荐用接口注入方式

最终:

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private VoucherOrderServiceImpl proxySelf;

    private RedisIdWorker idWorker;
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询秒杀类优惠券
        SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
        if (voucher == null){
            return Result.fail("秒杀券不存在");
        }

        //2.判断是否在活动时间内
        LocalDateTime now = LocalDateTime.now();
        if (voucher.getBeginTime().isAfter(now)||voucher.getEndTime().isBefore(now)){
            return Result.fail("秒杀券不在活动时间");
        }

        //3.是否有库存
        if (voucher.getStock() < 1){
            return Result.fail("库存不足");
        }

        //6.返回订单id
        Long userId = UserHolder.getUser().getId();
        //userId.toString().intern()解释:userId.toString().intern() 是为了确保相同用户在多线程下使用同一把锁,实现用户级互斥,而不是全局锁或失效锁。

        synchronized (userId.toString().intern()) {//获取锁
            //提交事务
            //return createOrder(voucher);,在类内部“自己调用自己”,就绕过了代理,导致事务不生效
            // 调用的是被代理对象,事务生效
            return proxySelf.createOrder(voucher);
        }//释放锁
    }

    @Transactional
    public  Result createOrder(SeckillVoucher voucher){
        //一人一单用悲观锁:synchronized
        Long userId = UserHolder.getUser().getId();

        int count = query().eq("user_id", userId).eq("voucher_id", voucher.getVoucherId()).count();
        if (count > 0){
            return Result.fail("同一用户只能购买一次");
        }

        //4.扣减库存
        boolean success = seckillVoucherService.update()
                .eq("voucher_id", voucher.getVoucherId()).eq("stock",voucher.getStock())//cas乐观锁
                .setSql("stock = stock - 1")
                .update();

        if (!success){
            return Result.fail("库存不足");
        }

        //5.创建订单
        idWorker=new RedisIdWorker(stringRedisTemplate);

        VoucherOrder order = new VoucherOrder();
        Long orderId = idWorker.nextId("order");
        //订单id
        order.setId(orderId);
        //用户id
        order.setUserId(UserHolder.getUser().getId());
        //秒杀券id
        order.setVoucherId(voucher.getVoucherId());
        save(order);
        return Result.ok(orderId);
    }
}

至此,解决了单体架构的“一人一单”问题,但是若是集群模式,仍然因为锁的问题存在并发安全问题。

在集群环境中,需要使用 分布式锁(如 Redis Redisson、Zookeeper 等) 来保证锁的全局唯一性和跨节点互斥,从而彻底解决“一人一单”的并发安全问题。

4.分布式锁

满足分布式系统或集群模式下多线程可见并且互斥的锁。

  • 多线程可见
  • 互斥
  • 高可用
  • 高性能
  • 安全性

基于Redis的分布式锁

分布式锁的两个基本方法:

  1. 获取锁
    • 互斥:SETNX
    • 需要保证获取锁与设定过期时间的原子性
    • 非阻塞:尝试一次,成功true,失败false
  • 释放锁
    • 手动释放:DEL
    • 超时释放:EXPIRE,兜底

4.1分布式锁实现初级版本:

分布式锁类:

public class SimpleRedisLock implements ILock{

public static final String KEY_PREFIX = "lock:";
private String name;
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

//获取锁
@Override
public boolean tryLock(long timeoutSec) {
String key = KEY_PREFIX + name;
long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId+"", timeoutSec, TimeUnit.SECONDS);

return BooleanUtil.isTrue(success);
}

//释放锁
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

业务修改:使用分布式锁而非原来的synchronized (userId.toString().intern()

SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);
boolean success = simpleRedisLock.tryLock(1500);
if (!success){
return Result.fail("请勿重复下单");
}

try {
//提交事务
//return createOrder(voucher);,在类内部“自己调用自己”,就绕过了代理,导致事务不生效
// 调用的是被代理对象,事务生效
return proxySelf.createOrder(voucher);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
simpleRedisLock.unlock();
}

4.2Redis分布式锁误删问题:

当线程1获取锁后,由于业务阻塞,线程1的锁超时释放了,这时候线程2趁虚而入拿到了锁,然后此时线程1业务完成了,然后把线程2刚刚获取的锁给释放了,这时候线程3又趁虚而入拿到了锁,出现并发安全问题。

解决思路:给锁加上唯一标识,主动释放时检查标再释放锁。不一致则不释放,

此处设计标识:JVM实例级UUID+线程ID。(同一JVM内的线程得到的UUID一样)。

public class SimpleRedisLock implements ILock{

public static final String KEY_PREFIX = "lock:";
public static final String ID_PREFIX= UUID.randomUUID().toString(true)+"_";
private String name;
private StringRedisTemplate stringRedisTemplate;

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

//获取锁
@Override
public boolean tryLock(long timeoutSec) {
String key = KEY_PREFIX + name;
//线程标识:JVM实例级UUID+线程ID
String uniqueLockId = ID_PREFIX+Thread.currentThread().getId();

long threadId = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, uniqueLockId, timeoutSec, TimeUnit.SECONDS);

return BooleanUtil.isTrue(success);
}

//释放锁
@Override
public void unlock() {
//获取标识
String lockId = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name)
String uniqueLockId = ID_PREFIX+Thread.currentThread().getId();

if (lockId.equals(uniqueLockId)) {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
}

4.3Redis分布式锁原子性问题:

仍存在的问题:

到此处,即使加上了唯一标识用于控制释放锁,但是仍然没有彻底解决误删问题:
线程A在执行业务后,拿到了锁的标识,检查一致后,阻塞了,此时超时释放锁。线程B就可以拿到锁,执行业务,在这期间,A又被唤醒,因为已经检查过一致性,就在B执行业务过程中释放了实际属于B的锁。此时线程C又可以拿到锁…

而造成以上问题的原因就是“判断标识”和“释放锁”两个操作之间没有原子性,此处选择使用Lua脚本,在一个脚本中编写多条Redis命令来实现多条命令的原子性。

Lua脚本:

-- 正确的释放锁脚本:校验持有者身份后再删除
-- KEYS[1]:锁的键(如lock:order)
-- ARGV[1]:当前客户端的唯一标识(如clientA:123)
if redis.call('get', KEYS[1]) == ARGV[1] then
    redis.call('del', KEYS[1])  -- 只有持有者匹配,才删除锁
    return 1  -- 释放成功
else
    return 0  -- 锁不属于自己,不删除
end

java调用Lua脚本:

RedisTemplate 调用 Lua 脚本的关键 API

  • execute 方法
<T> T execute(RedisScript<T> script, List<K> keys, Object... args)
  • 参数说明
    • script:封装 Lua 脚本的RedisScript对象
    • keys:传递给 Lua 脚本的键列表(对应 Lua 中的KEYS数组)
    • args:传递给 Lua 脚本的参数列表(对应 Lua 中的ARGV数组)
  • 返回值:执行结果,类型由RedisScript指定
//初始化Lua脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        UNLOCK_SCRIPT=new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("./unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }

...............

//释放锁
    @Override
    public void unlock() {
        stringRedisTemplate.execute(
                UNLOCK_SCRIPT,
                Collections.singletonList(KEY_PREFIX + name),
                ID_PREFIX+Thread.currentThread().getId()
                );
    }

仍存在的问题:

到此为止,分布式锁已经先对完善了,但是仍然有并发安全问题:

线程A拿到锁,进入到临界区:createOrder,结果被阻塞(数据库慢、GC 抖动、Redis 响应慢…),超时释放了锁。此时B拿到了锁,也进入了临界区:createOrder。就可能交替执行createOrder内部代码,产生了并发安全问题。

4.4 Redission

再来讨论下上述自己实现的分布式锁的问题:

  • 不可重入:同一线程无法多次获取同一把锁。
    • 同一线程内方法A拿到了锁,执行方法B,但是方法B也要这把锁,就死锁了。
  • 不可重试:获取锁失败只尝试一次就返回false,没有重试机制
  • 超时释放:超时释放虽然可以避免死锁,但是业务执行过长,就会导致锁释放,造成并发安全问题。(具体场景见上)
  • 主从一致性:(读写分离(主写读存))。主从同步出现延迟时,主节点宕机,可能有多个线程拿到锁。

为了解决这些问题,使用Redisson:

Redisson 是基于 Redis 的 Java 驻内存数据网格,提供分布式和可扩展的 Java 数据结构,简化 Redis 使用并增强分布式系统能力。以下是其核心功能:

1. 分布式锁与同步器

  • 可重入锁(RLock):支持自动续期的分布式锁,避免业务超时导致锁提前释放。
  • 读写锁(RReadWriteLock):允许多个读操作或单个写操作,提升并发性能。
  • 信号量(RSemaphore)、闭锁(RLatch):控制分布式环境下的资源访问和线程协作。

2. 分布式集合与对象

  • 集合框架:分布式 MapListSetQueue 等,支持集群环境下的数据共享。
  • 特殊结构LocalCachedMap(本地缓存 + 远程同步)、Geo(地理位置操作)、BloomFilter(布隆过滤器)。

3. 高可用与集群支持

  • 适配 Redis 单机主从哨兵集群模式,自动故障转移。
  • 连接池管理优化性能,减少频繁连接开销。

4. 异步与响应式编程

  • 支持 CompletableFuture 和 RxJava,非阻塞调用提升吞吐量。

5. 分布式服务

  • 远程服务(RRemoteService):跨节点的 Java 方法调用。
  • 分布式执行器(RExecutorService):集群环境下的任务调度与执行。

6. 其他特性

  • 缓存策略:TTL 过期、LRU 淘汰、本地二级缓存。
  • 事件监听:监听分布式对象的变更事件(如锁释放、Map 更新)。

核心优势

  • API 简洁:与 Java 原生接口高度一致,学习成本低。
  • 可靠性强:解决分布式锁常见问题(如死锁、误释放)。
  • 性能优化:本地缓存、批量操作减少 Redis 访问。

典型场景

  • 分布式锁(秒杀、库存扣减)
  • 分布式缓存(热点数据共享)
  • 任务编排与协作(分布式计算)
  • 数据共享(分布式会话管理)

4.4.1 Redission快速入门:

Maven依赖:

<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>

配置类:

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        //配置
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379").setPassword("123456");
        //创建RedissonClient对象
        return Redisson.create(config);
    }
}

修改业务:

//SimpleRedisLock simpleRedisLock = new SimpleRedisLock("order:"+userId, stringRedisTemplate);
RLock lock = redissonClient.getLock("lock:order:" + userId);

boolean success = lock.tryLock();
if (!success){
return Result.fail("请勿重复下单");
}

4.4.2 Redission可重入锁原理:

添加字段value表示进入临界区\拿到锁的方法个数(同一线程的),标识使用线程级标识,使得同一线程内的方法可以重入锁。拿锁时先根据标识判断是否是同一线程的,是则value+1,进入临界区;释放锁时先让value-1,value为0时才真正释放,否则只是延长下锁的有效期。

4.4.3 Redission的锁重试和看门狗:

  1. 锁重试机制
    使用看门狗机制配合公平锁队列。获取失败时线程进入等待队列,通过订阅机制接收锁释放通知,自动重试获取锁,避免忙等待。
  2. 超时续约
    默认锁持有时间 30 秒,后台看门狗线程每 10 秒(锁超时时间的 1/3)自动续约。业务未执行完时会不断延长锁的过期时间,防止提前释放。

注意:显式指定锁的有效期(如lock.lock(10, TimeUnit.SECONDS))时,Redisson 会禁用看门狗自动续约机制。而看门狗的默认时间变量可以在配置类里配置。

4.4.4 Redission解决中从一致性:mutiLock:

Redisson 的 RedissonMultiLock :多个独立的Redis节点,必须在所有节点(和RedLock的“超过半数”比较)都获取重入锁。才算获取锁成功。

创建 MultiLock 实例:

// 方式1:通过构造函数传入多个 RLock 实例
RedissonMultiLock multiLock = new RedissonMultiLock(lock1, lock2, lock3); 

加锁方法:

// 阻塞式加锁:一直等待直到获取所有锁
 void lock();
 
// 带超时的阻塞式加锁:等待指定时间仍未获取所有锁则返回
boolean tryLock(long waitTime, TimeUnit unit);

// 带超时和锁过期时间的加锁:自动释放锁,防止死锁 
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);

4.5 秒杀优化

4.5.1 异步秒杀

当前业务逻辑:

  1. 查询优惠券
  2. 判断秒杀库存
  3. 查询订单
  4. 校验一人一单
  5. 减库存
  6. 创建订单

以上步骤串行执行,并且使用了分布式锁等工具,导致效率很低。所以将无需查库的2,4拆出来交给redis(Lua)通过条件后将{优惠券id、用户id、订单id}交给阻塞队列,然后直接向前端返回订单id。异步将其他任务由消息队列交给另一个线程。

lua脚本内的“减库存”“添加userId”都是预操作。

4.5.2 基于Redis和阻塞队列完成秒杀资格判断:

需求:
① 新增秒杀优惠券的同时,将优惠券信息保存到 Redis 中
② 基于 Lua 脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
③ 如果抢购成功,将优惠券 id 和用户 id 封装后存入阻塞队列
④ 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

lua脚本:

-- 参数定义
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderKey = 'seckill:order:' .. voucherId
local stockKey = 'seckill:stock:' .. voucherId

-- 库存检查
if (tonumber(redis.call('get', stockKey)) <= 0) then
return 1
end

-- 一人一单校验
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end

-- 扣减库存并记录订单
redis.call('decr', stockKey)
redis.call('sadd', orderKey, userId)
return 0

业务修改:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    @Lazy
    private VoucherOrderServiceImpl proxySelf;

    @Autowired
    private RedissonClient redissonClient;

    private RedisIdWorker idWorker;

    //初始化Lua脚本
    private static final DefaultRedisScript<Long> SECKILL_SCRIPTS;
    static {
        SECKILL_SCRIPTS =new DefaultRedisScript<>();
        SECKILL_SCRIPTS.setLocation(new ClassPathResource("./scripts/seckill.lua"));
        SECKILL_SCRIPTS.setResultType(Long.class);
    }

    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
    private ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    private class VoucherOrderTask implements Runnable{
        @Override
        public void run() {
            while (true){
                try {
                    //1.从阻塞队列里获取任务
                    VoucherOrder order = orderTasks.take();
                    //2.数据库操作:减库存+添加订单
                    proxySelf.asyncCreateOrder(order);//依然使用自己注入自己的动态代理对象的方防止事务失效
                } catch (InterruptedException e) {
                    log.info("异步任务失败");
                }

            }
        }
    }

    //服务一启动就要开启线程池任务,所以
    @PostConstruct
    private void ExecutorServiceInit(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderTask());
    }



    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.lua脚本判断秒杀资格(原子性本身也保证线程安全)
        Long flag = stringRedisTemplate.execute(
                SECKILL_SCRIPTS,
                Collections.emptyList(),
                voucherId.toString(),
                UserHolder.getUser().getId().toString()
        );
        //2.处理判断结果
        if (flag == 1){
            return Result.fail("库存不足");
        }else if (flag == 2){
            return Result.fail("不能重复下单");
        }
        //3.若有资格,把下单信息保存到阻塞队列
        idWorker=new RedisIdWorker(stringRedisTemplate);
        Long orderId = idWorker.nextId("order");
        VoucherOrder order = new VoucherOrder();
        //订单id
        order.setId(orderId);
        //用户id
        order.setUserId(UserHolder.getUser().getId());
        //秒杀券id
        order.setVoucherId(voucherId);

        orderTasks.add(order);
        //4.返回订单id
        return Result.ok(orderId);
    }


    @Transactional
    public void asyncCreateOrder(VoucherOrder order){
        //这里由于Redis就解决的超卖、一人一单问题,所以不需要再加锁、判断什么的(只要redis不崩)
        //1.扣减库存
        boolean success = seckillVoucherService.update()
                .eq("voucher_id", order.getVoucherId())
                .setSql("stock = stock - 1")
                .update();

        if (!success){
            log.info("扣减库存失败");
        }

        //2.添加订单
        save(order);
    }
........

基于阻塞队列的异步秒杀存在的问题:

  • 内存限制问题:目前的阻塞队列是JDK的,存储在jvm中,容易爆内存。
  • 数据安全问题:订单信息可能因服务宕机丢失

4.6 基于redis消息队列实现异步秒杀:

Redis 实现消息队列的三种方式:

  • list 结构:基于 List 结构模拟消息队列
  • PubSub:基本的点对点消息模型
  • Stream:比较完善的消息队列模型

没什么意义,后面学Rocket/Rabit MQ。这里直接跳过。

想参考可以去:黑马点评项目学习笔记(15w字详解,堪称史上最详细,欢迎收藏)-CSDN博客

5 达人探店:

查看探店笔记、都是简单的CRUD,不多说。

5.1 完善点赞功能:

需求:

  • 同一个用户只能点赞一次,再次点击则取消点赞
  • 如果当前用户已经点赞,则点赞按钮高亮显示(前端已实现,判断字段 Blog 类的 isLike 属性)

实现步骤:
① 给 Blog 类中添加一个 isLike 字段,标示是否被当前用户点赞
② 修改点赞功能,利用 Redis 的 set 集合判断是否点赞过,未点赞过则点赞数 + 1,已点赞过则点赞数 – 1
③ 修改根据 id 查询 Blog 的业务,判断当前登录用户是否点赞过,赋值给 isLike 字段
④ 修改分页查询 Blog 业务,判断当前登录用户是否点赞过,赋值给 isLike 字段

使用Set,因为Set类型的数据结构具有

  1. 不重复,符合业务的特点,一个用户只能点赞一次
  2. 高性能,Set集合内部实现了高效的数据结构(Hash表)
  3. 灵活性,Set集合可以实现一对多,一个博客可以看到有多个用户点赞
@Override
public Result queryHotBlog(Integer current) {
    // 根据用户查询
    Page<Blog> page = query()
            .orderByDesc("liked")
            .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
    // 获取当前页数据
    List<Blog> records = page.getRecords();
    // 查询用户
    records.forEach(blog ->{
        queryBlogUser(blog);
        isBlogLiked(blog);
    });
    return Result.ok(records);
}

@Override
public Result queryBlogById(Long id) {
    //1.查询blog
    Blog blog = getById(id);
    if (blog == null){
        return Result.fail("数据不存在");
    }

    //2.查询blog有关用户
    queryBlogUser(blog);
    //3.查询用户点赞该BLOG
    isBlogLiked(blog);
    return Result.ok(blog);
}

@Override
public Result likeBlog(Long id) {
    //1.获取用户
    Long userId = UserHolder.getUser().getId();
    //2.在redis内判断当前用户是否已经点过赞
    String key = BLOG_LIKED_KEY+id;
    Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());

    if (BooleanUtil.isFalse(isMember)){
        //2.1如果没有点过,可以点赞
        boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
        if (isSuccess){
            //更新成功
            //保存用户到redis
            stringRedisTemplate.opsForSet().add(key, userId.toString());
        }

    }else {
        //2.2如果点过,取消点赞
        boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
        if (isSuccess){
            //更新成功
            //redis内的集合删除该用户
            stringRedisTemplate.opsForSet().remove(key, userId.toString());
        }
    }

    return Result.ok();
}

private void queryBlogUser(Blog blog) {
    Long userId = blog.getUserId();
    User user = userService.getById(userId);
    blog.setName(user.getNickName());
    blog.setIcon(user.getIcon());
}

/**
 * 判断当前用户是否点赞该博客
 */
private void isBlogLiked(Blog blog) {
        UserDTO user = UserHolder.getUser();
        if (user==null){
            //用户未登录,无需查询是否点过赞
            return;
        }
        Long userId = UserHolder.getUser().getId();
        String key = BLOG_LIKED_KEY + blog.getId();
        Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        blog.setIsLike(score!=null);
    }

5.2 点赞排行榜:

需求:按照点赞时间先后顺序,找到前5名用户。

想到zset/sorted set ,既保证唯一性,也保证有序性(根据score值排序)。
改造业务代码:
在存储时使用zset,使用时间戳作为score:

Double score = stringRedisTemplate.opsForZSet().score(key, userId.toString());

        if (score==null){
            //2.1如果没有点过,可以点赞
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            if (isSuccess){
                //更新成功
                //保存用户到redis
                stringRedisTemplate.opsForZSet().add(key, userId.toString(),System.currentTimeMillis());
            }

        }else {
            //2.2如果点过,取消点赞
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            if (isSuccess){
                //更新成功
                //redis内的集合删除该用户
                stringRedisTemplate.opsForZSet().remove(key, userId.toString());
            }
        }

查询点赞排名:

public Result queryBlogLikedRankById(Long id) {
//查询top5的点赞用户
String key = BLOG_LIKED_KEY + id;
Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);

if (top5==null ||top5.isEmpty()){
return Result.ok(Collections.emptyList());
}

//解析id
Set<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toSet());
//根据用户id查询用户并封装为DTO(去除敏感信息)
List<User> users = userService.listByIds(ids);
List<UserDTO> userDTOList = users.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
//返回
return Result.ok(userDTOList);
}

此时有个关于mysql的问题:

List<User> users = userService.listByIds(ids);
这一句的SQL:

SELECT id, phone, nick_name, icon, create_time, update_time 
FROM tb_user 
WHERE id IN (5, 1);
ORDER BY FIELD(id,id1,id2,id3...)

这里使用了in,SQL查询时不按参数的顺序,而是会自动按从小到大的顺序,导致排名出错。
解决:使用 ORDER BY FIELD(id1,id2,id3…)手动强制指定查询顺序:

业务修正:

public Result queryBlogLikedRankById(Long id) {
        //查询top5的点赞用户
        String key = BLOG_LIKED_KEY + id;
        Set<String> top5 = stringRedisTemplate.opsForZSet().range(key, 0, 4);

        if (top5==null ||top5.isEmpty()){
            return Result.ok(Collections.emptyList());
        }

        //解析id
        List<Long> ids = top5.stream().map(Long::valueOf).collect(Collectors.toList());
        //根据用户id查询用户并封装为DTO(去除敏感信息)
        StringBuilder stringBuilder= new StringBuilder();
        ids.forEach(tid -> stringBuilder.append(tid).append(","));
        String idsStr = stringBuilder.toString();

        /*
        SELECT id, phone, nick_name, icon, create_time, update_time
        FROM tb_user
        WHERE id IN (5, 1);
        ORDER BY FIELD(id,id1,id2,id3...)
         */
        List<User> users = userService
                .query()
                .in("id",ids)
                .last("ORDER BY FIELD(id,"+idsStr+")")
                .list();
        List<UserDTO> userDTOList = users.stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        //返回
        return Result.ok(userDTOList);
    }

6 好友关注:

6.1 关注和取关:

两个接口:
关注和取关的接口;
判断是否关注的接口。

由于关注者和被关注者是多对多的关系,使用中间表。

关注就是新增,取关就是删除。

在此节顺便复习下mybatis

service:

/**
 * 设置关注(True)/取关(False)
 * @param followUserId
 * @param isFollow
 * @return
 */
@Override
public Result follow(Long followUserId, Boolean isFollow) {
    Long userId = UserHolder.getUser().getId();

    //1.判断是关注还是取关
    if (isFollow){
        //1.1 关注:新增
        Follow follow = new Follow();
        follow.setFollowUserId(followUserId);
        follow.setUserId(userId);
        follow.setCreateTime(LocalDateTime.now());
        //save(follow);复习下mybatis
        followMapper.myInsert(follow);
    }else{
        //1.2 取关:删除
        //remove(new QueryWrapper<Follow>().eq("follow_user_id", followUserId).eq("user_id", id));
        followMapper.deleteByfollowUserIdAndUserId(followUserId,userId);
    }
    return Result.ok();
}

/**
 * 查询是否关注
 * @param followUserId
 * @return
 */
@Override
public Result isFollow(Long followUserId) {
    Long userId = UserHolder.getUser().getId();
    int count = followMapper.isFollowed(userId, followUserId);
    return Result.ok(count>0);
}

mapper:

@Insert("INSERT INTO tb_follow (user_id, follow_user_id, create_time)" +
" VALUES (#{userId},#{followUserId},#{createTime})")
void myInsert(Follow follow);

@Delete("DELETE FROM tb_follow WHERE follow_user_id=#{followUserId} AND user_id = #{userId}")
void deleteByfollowUserIdAndUserId(@Param("followUserId")Long followUserId,
@Param("userId")Long userId);

@Select("SELECT COUNT(1) FROM tb_follow WHERE user_id = #{userId} AND follow_user_id = #{followUserId}")
int isFollowed(@Param("userId") Long userId, @Param("followUserId") Long followUserId);

6.2 共同关注:

需求

利用 Redis 中恰当的数据结构,实现共同关注功能。在博主个人页面展示当前用户与博主的共同好友。

由于要使用redis的set,所以需要改造之前关注的接口。

            followMapper.myInsert(follow);
            stringRedisTemplate.opsForSet().add(key,followUserId.toString());
        ...
            followMapper.deleteByfollowUserIdAndUserId(followUserId,userId);
            stringRedisTemplate.opsForSet().remove(key,followUserId.toString());

共同关注接口:

public Result followCommons(Long followUserId) {
Long userId = UserHolder.getUser().getId();
String myKey = "follows:"+userId.toString();
String targetKey = "follows:"+followUserId.toString();
//求交集
Set<String> stringSet = stringRedisTemplate.opsForSet().intersect(myKey, targetKey);

if (stringSet==null||stringSet.isEmpty()){
//没有共同关注
return Result.ok(Collections.emptyList());
}

//解析ID集合
List<Long> ids = stringSet.stream()
.map(id -> Long.valueOf(id))
.collect(Collectors.toList());

List<UserDTO> userDTOList = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(userDTOList);
}

6.3 关注推送

关注推送也叫Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

6.3.1 Feed流基本模式

1. Pull模式(拉取式)

用户访问时从后台实时查询生成 Feed。

  • 优点
    • 数据新鲜、实时性好。
    • 写入轻,不需推送或维护缓存。
  • 缺点
    • 查询压力大,尤其是大V或粉丝众多用户。
    • 用户体验差,可能遇到加载慢、重复内容等。

2. Push模式(推送式)

内容生产者发布内容时,系统将其推送到所有关注者的 Feed 中。

  • 优点
    • 读取快,用户访问时可直接获取。
  • 缺点
    • 写入压力大。
    • 存储冗余,内容需要复制到多个用户。
    • 不适合大V(百万粉丝)内容分发。

3. Hybrid模式(混合式)

常见的方案是:

  • 对普通用户采用 Push。
  • 对大V(如粉丝 > 10w)采用 Pull。
  • 热门内容(全网推荐)采用定时 Push 或广播下发。

此节使用推模式来实现:

6.3.2 基于推模式实现关注推送功能:

功能需求:

  1. 新增探店笔记业务流程:在保存 blog 数据到数据库的操作同步执行时,需将该笔记推送至粉丝的收件箱 。
  2. 收件箱数据结构:基于 Redis 实现,且要支持按时间戳对收件箱内容进行排序 。
  3. 数据查询能力:支持对收件箱数据执行分页查询操作 。

Timeline 推荐用 ZSet实现feed的滚动分页。

在保存blog时推送到收件箱:

public Result saveBlog(Blog blog) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
blog.setUserId(user.getId());
// 保存探店博文
save(blog);
//获取所有粉丝列表
List<Follow> followers = followService.query().eq("follow_user_id", user.getId()).list();
//发送到粉丝的收件箱
followers.forEach(follow -> {
Long followId = follow.getUserId();
String key = "feed:" + followId;
stringRedisTemplate.opsForZSet().add(key, blog.getId().toString(), System.currentTimeMillis());
});
// 返回id
return Result.ok(blog.getId());
}

滚动分页基于以时间戳为score的zset实现。有四个查询参数,两种情况:

  1. 第一次查询:
    • max:当前时间戳
    • min:0
    • offset:0
    • count:CONST LIMIT
  2. 之后的查询:
    • max:上一次查询的最小时间戳
    • min:0
    • offset:已查查询结果时间戳与最小时间戳相等的数量(注意是已经被查的所有结果(不只是上一次)里的相同的)
    • count:CONST LIMIT

此处视频内讲的有bug,关于黑马点评P87的滚动查询,为什么要加:os = minTime == max ? os + offset : os;_os = mintime == max ? os : os + offset;-CSDN博客

public Result queryBlogOfFollow(Long max, Integer offset) {
//1.获取当前用户
Long userId = UserHolder.getUser().getId();
//2.查询收件箱
String key = FEED_KEY + userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples =
stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, max, offset, 2);

if(typedTuples==null || typedTuples.isEmpty()){
return Result.ok();
}
//3.解析收件箱
long minTime=max;
int os=1;
List<Long> ids = new ArrayList<>(typedTuples.size());
for (ZSetOperations.TypedTuple<String> typedTuple : typedTuples){
//获取blogId
ids.add(Long.valueOf(typedTuple.getValue()));
//获取score
long time = typedTuple.getScore().longValue();
if (time==minTime){
os++;
}else {
minTime=time;
os = 1;
}
}

os = minTime == max ? os + offset : os;
//4.根据id查询blog
String idsStr = ids.stream()
.map(Object::toString)
.collect(Collectors.joining(","));
List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id,"+idsStr+")").list();

//5.填充非blog数据库字段
for (Blog blog : blogs) {
queryBlogUser(blog);
isBlogLiked(blog);
}
//6.返回
ScrollResult r = new ScrollResult();
r.setList(blogs);
r.setOffset(os);
r.setMinTime(minTime);
return Result.ok(r);
}

7 用户签到 : BitMap

附近商铺有关的Redis GEO没什么用,直接跳过。

7.1 BitMap用法:

Bitmap 简要介绍

定义:用比特位(bit)表示元素状态(0/1)的数据结构,每个比特对应一个整数元素。

核心特点

  1. 空间效率高
    存储 n 个元素仅需 n/8 字节(如存储 1 亿个 ID 仅需 12MB)。
  2. 操作速度快
    位运算由 CPU 直接支持,时间复杂度通常为 O(1) 或 O(n/64)
  3. 支持集合运算
    通过位运算实现高效交集(&)、并集(|)、差集(& ~)等操作。

典型应用场景

  1. 去重与判重
    • 统计独立用户数(如 UV)
    • 检测重复数据(如日志 ID 去重)
  2. 范围查询与过滤
    • 筛选年龄在 18-30 岁的用户
    • 权限快速匹配(如判断用户是否有读写权限)
  3. 大数据排序
    • 对 0-100 万的整数排序(时间复杂度 O(n)
  4. 布隆过滤器底层实现
    • 快速判断元素是否 “可能存在”(有一定误判率)

局限性

  • 仅支持整数映射,无法直接处理字符串等类型
  • 若元素范围大但实际数量少(如最大值 1 亿但仅存 10 个元素),会浪费空间
  • 无法记录元素重复次数(需变种如 Counting Bitmap)

在Redis中底层是String。
Redis 字符串的最大字节长度为 512MB,因此 Bitmap 的最大 bit 数为 512MB × 8 = 4,294,967,296 bit(约 42 亿 bit)

Redis 命令RedisTemplate 方法描述示例
SETBITopsForValue().setBit(key, offset, value)设置指定位置的位值(true/false)redisTemplate.opsForValue().setBit("user:sign", 0, true);
GETBITopsForValue().getBit(key, offset)获取指定位置的位值Boolean isSigned = redisTemplate.opsForValue().getBit("user:sign", 0);
BITCOUNTredisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitCount(key.getBytes()) )统计位图中值为 1 的位数量Long count = redisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitCount("user:sign".getBytes()) );
BITOPredisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitOp(operation, destKey.getBytes(), srcKeys.getBytes()) )对多个位图执行位运算(AND/OR/XOR/NOT)redisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitOp(RedisStringCommands.BitOperation.AND, "result".getBytes(), "key1".getBytes(), "key2".getBytes()) );
BITPOSredisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitPos(key.getBytes(), bit, start, end) )查找第一个值为指定位的位置Long firstOne = redisTemplate.execute( (RedisCallback<Long>) conn -> conn.bitPos("user:sign".getBytes(), true, 0, -1) );

7.2 实现签到功能:

public Result sign() {
//1.获取用户
Long id = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
int dayOfMonth = now.getDayOfMonth();
//3.setBitMap
String key = USER_SIGN_KEY+id+":"+now.format(DateTimeFormatter.ofPattern("yyyyMM"));
redisTemplate.opsForValue().setBit(key,dayOfMonth-1,true);
//4.返回
return Result.ok();
}

7.3 统计连续签到:

连续签到次数:从最后一次签到开始向前统计到第一次未签到为止的连续次数。

public Result countContinuedSignedDay() {
//1.获取用户
Long id = UserHolder.getUser().getId();
//2.获取日期
LocalDateTime now = LocalDateTime.now();
int dayOfMonth = now.getDayOfMonth();
//3.获取key
String key = USER_SIGN_KEY+id+":"+now.format(DateTimeFormatter.ofPattern("yyyyMM"));
//4.获取本月截止今天为止的所有的签到记录,,返回的是一个十进制的数字
List<Long> bit = redisTemplate.opsForValue().bitField(
key,
BitFieldSubCommands.create()
.get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth))
.valueAt(0)
);
//没签到
if (bit==null||bit.size()==0)
return Result.ok(0);

Long num = bit.get(0);
//没签到
if (num==0)
return Result.ok(0);

//5.右移遍历得到连续签到天数
//5.1今天没签到直接返回0
if ((num & 1)==0)
return Result.ok(0);
//5.2遍历得到连续天数
int count=0;
while ((num&1)==1){
count++;
num=num>>1;
}

return Result.ok(count);
}

8.UV统计

  • UV:全称Unique Visitor,也叫独立访客量,是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站,只记录1次。
  • PV:全称Page View,也叫页面访问量或点击量,用户每访问网站的一个页面,记录1次PV,用户多次打开页面,则记录多次PV。往往用来衡量网站的流量。

8.1HyperLogLog用法

Hyperloglog(HLL)是从Loglog算法派生的概率算法,用于确定非常大的集合的基数,而不需要存储其所有值。

Redis中的HLL是基于string结构实现的,单个HLL的内存永远小于16kb。但其测量结果是概率性的,有小于0.81%的误差。不过对于UV统计来说,这完全可以忽略。

8.2 实现UV统计

实现方案:

  • 使用后端拦截器统计(如Spring的 HandlerInterceptor)。
  • 提取 userId / IP / Token / UA, HyperLogLog 实现去重统计。

public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
    String uid = getUserIdOrIp(request);
    stringRedisTemplate.opsForHyperLogLog().add("uv:" + LocalDate.now(), uid);
    return true;
}

9 使用Rocket MQ代替原生阻塞队列:

9.0 准备工作:

maven配置:

<!--RocketMQ-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>

yaml:(注意不在spring下,(GPT浪费了2小时时间,watch out!))

rocketmq:
name-server: 127.0.0.1:9876
consumer:
group: VOUCHER_ORDER_GROUP

本地启动服务:

此处使用docker(有个大坑,见下conf文件中解释):

docker-compose.yml:

version: '3.8'
services:
  namesrv:
    image: apache/rocketmq:4.9.6
    container_name: rocketmq-namesrv
    ports:
      - "9876:9876"
    command: sh mqnamesrv

  broker:
    image: apache/rocketmq:4.9.6
    container_name: rocketmq-broker
    depends_on:
      - namesrv
    ports:
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./conf/broker.conf:/home/rocketmq/rocketmq-4.9.6/conf/broker.conf
    command: sh mqbroker -n namesrv:9876 -c /home/rocketmq/rocketmq-4.9.6/conf/broker.conf

  dashboard:
    image: styletang/rocketmq-dashboard
    container_name: rocketmq-dashboard
    ports:
      - "8079:8080"  # 宿主机 8079 → 容器 8080
    environment:
      - ROCKETMQ_NAMESRV_ADDR=namesrv:9876

conf/broker.conf:

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

# ⚠️ 关键:指定 Broker 的 IP 地址,必须使用宿主机可访问的 IP
# 如果不设置,Spring Boot Consumer 就会连接docker内部的ip地址(注册到nameserver的),无法连接到 Broker。
brokerIP1=127.0.0.1

docker-compose up -d 启动

9.1 代码实现:

可以先删去不用的阻塞队列相关以及用于消费任务的线程池及任务类。

9.1.0:向redis初始化库存:

@Component
public class StockInitializer implements ApplicationRunner {

    // 注入秒杀券服务,用来查询数据库中的秒杀券信息
    @Autowired
    private ISeckillVoucherService seckillVoucherService;

    // 注入 Redis 模板,用于操作 Redis
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * run 方法会在 Spring Boot 启动完成后自动执行
     * @param args 启动参数,一般不需要使用
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 查询数据库中所有的秒杀券
        List<SeckillVoucher> vouchers = seckillVoucherService.list();

        // 遍历每一个秒杀券,初始化库存到 Redis
        for (SeckillVoucher voucher : vouchers) {
            // 构建 Redis key,比如 "seckill:stock:1"
            String stockKey = "seckill:stock:" + voucher.getVoucherId();

            // 如果 Redis 中不存在这个 key,则设置库存值
            // setIfAbsent 相当于 "SETNX",保证不会覆盖已经存在的库存
            stringRedisTemplate.opsForValue().setIfAbsent(stockKey, voucher.getStock().toString());
        }
    }
}

9.1.1 生产者:

使用RocketMQTemplate这个核心工具类作为生产者对象:

VoucherOrderServiceImpl中:

@Autowired
    private RocketMQTemplate rocketMQTemplate;

.........

@Override
    public Result seckillVoucher(Long voucherId) {
        //1.lua脚本判断秒杀资格(原子性本身也保证线程安全)
        Long flag = stringRedisTemplate.execute(
                SECKILL_SCRIPTS,
                Collections.emptyList(),
                voucherId.toString(),
                UserHolder.getUser().getId().toString()
        );
        //2.处理判断结果
        if (flag == 1){
            return Result.fail("库存不足");
        }else if (flag == 2){
            return Result.fail("不能重复下单");
        }
        //3.若有资格,把下单信息保存到阻塞队列
        idWorker=new RedisIdWorker(stringRedisTemplate);
        Long orderId = idWorker.nextId("order");
        VoucherOrder order = new VoucherOrder();
        //订单id
        order.setId(orderId);
        //用户id
        order.setUserId(UserHolder.getUser().getId());
        //秒杀券id
        order.setVoucherId(voucherId);

        //4. 发送订单消息到 RocketMQ
        rocketMQTemplate.convertAndSend("seckill-order-topic", order);

        //5.返回订单id
        return Result.ok(orderId);
    }

9.1.2 消费者:

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "seckill-order-topic",   // 订阅的 Topic:生产者发到这里的消息会被收到
        consumerGroup = "VOUCHER_ORDER_GROUP" // 消费组:保证消息只被一台消费者处理
)
public class VoucherOrderConsumer implements RocketMQListener<VoucherOrder> {
    @Autowired
    private VoucherOrderServiceImpl voucherOrderService;

    // 当 MQ 有消息到达时,自动回调这个方法
    @Override
    public void onMessage(VoucherOrder order) {
        log.info("接收到订单消息, orderId={}", order.getId());
        voucherOrderService.asyncCreateOrder(order);
    }
}

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇