跳至主要內容

黑马点评5

项目实战黑马点评项目实战黑马点评大约 9 分钟约 2775 字全民制作人ikun

黑马点评5

秒杀优化

用户模拟

模拟1000个用户同时发送请求,也就是需要使用jmeter进行压力测试,需要编写代码获取1000个用户的token

生成1000个登录用户并生成token


@SpringBootTest
class HmDianPingApplicationTests {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private IUserService userService;

    @Test
    @Transactional
    public void insertUser() {
        final String filePath = "src/main/resources/user.txt";
        final int count = 1000;
        BufferedWriter writer;
        try {
            writer = new BufferedWriter(new FileWriter(filePath));
            for (int i = 0; i < count; i++) {
                String phone = "13" + RandomUtil.randomNumbers(9);
                String token = this.login(phone);
                writer.write(token);
                writer.newLine();
            }
            writer.close();

        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            System.out.println("生成用户token完毕");
        }
    }

    public String login(String phone) {
        User user = new User();
        user.setPhone(phone);
        user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
        userService.save(user);
        //生成token
        String token = UUID.randomUUID().toString();
        //将User对象转为HashMap存储
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
        Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
                CopyOptions.create()
                        .setIgnoreNullValue(true)
                        .setFieldValueEditor((fieldName, fieldvalue) -> fieldvalue.toString())
        );
        //存储
        String tokenKey = LOGIN_USER_KEY + token;
        stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
        //设置有效期
        stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
        return token;
    }


}

image.png
image.png

jmeter设置:

设置请求:
image.png

设置token
image.png

这里指定刚才生成token的位置
image.png
测试得到结果:
image.png

Redis优化秒杀

我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息。

1653561657295.png
1653561657295.png

当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
1653562234886.png

需求:

  • 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
  • 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  • 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  • 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

新增秒杀券的时候保存到redis中

    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存秒杀到redis
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }

创建lua脚本

--1.参数列表
--优惠卷id
local voucherId = ARGV[1]
--用户id
local userId = ARGV[2]

--数据库key
--库存key
local stockKey = "seckill:stock:" .. voucherId
--订单key
local orderKey = "seckill:order:" .. voucherId

--业务脚本
--判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
    --库存 不足
    return 1
end
--判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
    --存在,说明是重复下单
    return 2
end
--扣库存
redis.call('incrby', stockKey, -1)
--下单,保存用户
redis.call('sadd', orderKey, userId)
return 0

初步修改抢优惠券 逻辑:

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    @Override
    public Result seckillVoucher(Long voucherId) {
        //执行lua脚本,得到购买资格
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
                voucherId.toString(), userId.toString());
        //判断结果是否为0
        int r = result.intValue();
        //0,没有购买资格
        if (r != 0) {
            return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
        }
        //1,有购买资格,生成订单,保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");

        //返回订单id
        return Result.ok(orderId);
    }

使用阻塞队列优化秒杀

	private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);

    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    @PostConstruct
    private void init() {
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    VoucherOrder voucherOrder = orderTasks.take();
                    //6.生成订单
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    IVoucherOrderService proxy;

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
//        SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
        Long userId = voucherOrder.getUserId();
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        boolean isLock = lock.tryLock();
        if (!isLock) {
            //获取失败,返回错误或者 重试
            log.error("获取锁失败,用户id:{}", userId);
            return;
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }

创建订单:

    @Override
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        // 一人一单
        Long userId = voucherOrder.getUserId();
        int count = this.query().eq("user_id", userId)
                .eq("voucher_id", voucherOrder.getVoucherId()).count();
        if (count > 0) {
            log.error("用户已经抢购过了,用户id:{}", userId);
            return;
        }
        //5.扣减库存
        boolean update = seckillVoucherService.update()
                .setSql("stock=stock-1")
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0)
                .update();
        if (!update) {
            //扣减失败
            log.error("扣减库存失败,用户id:{}", userId);
            return;
        }
        //创建订单
        this.save(voucherOrder);
    }
    @Override
    public Result seckillVoucher(Long voucherId) {
        //执行lua脚本,得到购买资格
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
                voucherId.toString(), userId.toString());
        //判断结果是否为0
        int r = result.intValue();
        //0,没有购买资格
        if (r != 0) {
            return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
        }
        //1,有购买资格,生成订单,保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        //放到阻塞队列中
        orderTasks.add(voucherOrder);
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        //返回订单id
        return Result.ok(orderId);
    }

使用Redis消息队列

什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:

  • 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列获取消息并处理消息
1653574849336.png
1653574849336.png

也可以使用kafka,rabbitmq等等

基于List实现消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
1653575176451.png
两种用法:

  • LPUSH 结合 RPOP
  • RPUSH 结合 LPOP来实现。
    要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
    优点:
  • 利用Redis存储,不受限于JVM内存上限
  • 基于Redis的持久化机制,数据安全性有保证
  • 可以满足消息有序性
    缺点:
  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub顶消息队列

PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel] :订阅一个或多个频道
PUBLISH channel msg :向一个频道发送消息
PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

1653575506373.png
1653575506373.png

优点:

  • 采用发布订阅模型,支持多生产、多消费
    缺点:
  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失

基于Stream的消息队列

发送消息:
创建消息队列users,发送消息name=jack,age=18,redis会自动生成ID

xadd users * name jack age 18

读消息:
读第一个消息

xread count 1 streams users 0

XREAD阻塞方式,读取最新的消息:

xread count 1 block 1000 streams users $

STREAM类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

基于Stream的消息队列-消费者组

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
1653577801668.png
删除指定的消费者组
 XGROUP DESTORY key groupName
给指定的消费者组添加消费者
 XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者
 XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息
 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    对比:
1653578560691.png
1653578560691.png

使用Stream

需求

  • 创建一个Stream类型的消息队列,名为stream.orders
  • 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
  • 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单

修改lua脚本:

--1.参数列表
--优惠卷id
local voucherId = ARGV[1]
--用户id
local userId = ARGV[2]
--订单id
local orderId = ARGV[3]

--数据库key
--库存key
local stockKey = "seckill:stock:" .. voucherId
--订单key
local orderKey = "seckill:order:" .. voucherId

--业务脚本
--判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
    --库存 不足
    return 1
end
--判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
    --存在,说明是重复下单
    return 2
end
--扣库存
redis.call('incrby', stockKey, -1)
--下单,保存用户
redis.call('sadd', orderKey, userId)
--发送消息到队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0

发送消息

    @Override
    public Result seckillVoucher(Long voucherId) {
        //执行lua脚本,得到购买资格
        Long userId = UserHolder.getUser().getId();
        long orderId = redisIdWorker.nextId("order");
        Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId));
        //判断结果是否为0
        int r = result.intValue();
        //0,没有购买资格
        if (r != 0) {
            return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
        }
        //1,有购买资格,生成订单,保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        //放到阻塞队列中
        orderTasks.add(voucherOrder);
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        //返回订单id
        return Result.ok(orderId);
    }

修改后业务:

private class VoucherOrderHandler implements Runnable {
        final String queueName = "stream.orders";

        @Override
        public void run() {
            while (true) {
                try {
                    //获取 消息队列中的订单信息   xreadgroup group g1  c1 count 1 block 2000 streams streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //判断是否获取成功
                    if (list == null || list.isEmpty()) {
                        //失败,重试
                        continue;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    //成功,生成订单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认 sack stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理订单失败", e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true) {
                try {
                    //获取 消息队列中的订单信息   xreadgroup group g1  c1 count 1 block 2000 streams streams.order >
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    //判断是否获取成功
                    if (list == null || list.isEmpty()) {
                        //失败,结束
                        break;
                    }
                    //解析消息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> value = record.getValue();
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                    //成功,生成订单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认 sack stream.orders g1 id
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pendinglist异常", e);
                }
            }
        }
    }
上次编辑于:
贡献者: yunfeidog