黑马点评5
黑马点评5
秒杀优化
用户模拟
模拟1000个用户同时发送请求,也就是需要使用jmeter进行压力测试,需要编写代码获取1000个用户的token
生成1000个登录用户并生成token
@SpringBootTest
class HmDianPingApplicationTests {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private IUserService userService;
@Test
@Transactional
public void insertUser() {
final String filePath = "src/main/resources/user.txt";
final int count = 1000;
BufferedWriter writer;
try {
writer = new BufferedWriter(new FileWriter(filePath));
for (int i = 0; i < count; i++) {
String phone = "13" + RandomUtil.randomNumbers(9);
String token = this.login(phone);
writer.write(token);
writer.newLine();
}
writer.close();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
System.out.println("生成用户token完毕");
}
}
public String login(String phone) {
User user = new User();
user.setPhone(phone);
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
userService.save(user);
//生成token
String token = UUID.randomUUID().toString();
//将User对象转为HashMap存储
UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);
Map<String, Object> userMap = BeanUtil.beanToMap(userDTO, new HashMap<>(),
CopyOptions.create()
.setIgnoreNullValue(true)
.setFieldValueEditor((fieldName, fieldvalue) -> fieldvalue.toString())
);
//存储
String tokenKey = LOGIN_USER_KEY + token;
stringRedisTemplate.opsForHash().putAll(tokenKey, userMap);
//设置有效期
stringRedisTemplate.expire(tokenKey, LOGIN_USER_TTL, TimeUnit.MINUTES);
return token;
}
}
jmeter设置:
设置请求:
设置token
这里指定刚才生成token的位置
测试得到结果:
Redis优化秒杀
我们将耗时比较短的逻辑判断放入到redis中,比如是否库存足够,比如是否一人一单,这样的操作,只要这种逻辑可以完成,就意味着我们是一定可以下单完成的,我们只需要进行快速的逻辑判断,根本就不用等下单逻辑走完,我们直接给用户返回成功, 再在后台开一个线程,后台线程慢慢的去执行queue里边的消息。
当用户下单之后,判断库存是否充足只需要导redis中去根据key找对应的value是否大于0即可,如果不充足,则直接结束,如果充足,继续在redis中判断用户是否可以下单,如果set集合中没有这条数据,说明他可以下单,如果set集合中没有这条记录,则将userId和优惠卷存入到redis中,并且返回0,整个过程需要保证是原子性的,我们可以使用lua来操作
当以上判断逻辑走完之后,我们可以判断当前redis中返回的结果是否是0 ,如果是0,则表示可以下单,则将之前说的信息存入到到queue中去,然后返回,然后再来个线程异步的下单,前端可以通过返回的订单id来判断是否下单成功。
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
新增秒杀券的时候保存到redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
//保存秒杀到redis
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
}
创建lua脚本
--1.参数列表
--优惠卷id
local voucherId = ARGV[1]
--用户id
local userId = ARGV[2]
--数据库key
--库存key
local stockKey = "seckill:stock:" .. voucherId
--订单key
local orderKey = "seckill:order:" .. voucherId
--业务脚本
--判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
--库存 不足
return 1
end
--判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
--存在,说明是重复下单
return 2
end
--扣库存
redis.call('incrby', stockKey, -1)
--下单,保存用户
redis.call('sadd', orderKey, userId)
return 0
初步修改抢优惠券 逻辑:
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Result seckillVoucher(Long voucherId) {
//执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString());
//判断结果是否为0
int r = result.intValue();
//0,没有购买资格
if (r != 0) {
return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
}
//1,有购买资格,生成订单,保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
//返回订单id
return Result.ok(orderId);
}
使用阻塞队列优化秒杀
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
VoucherOrder voucherOrder = orderTasks.take();
//6.生成订单
handleVoucherOrder(voucherOrder);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
IVoucherOrderService proxy;
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
//获取失败,返回错误或者 重试
log.error("获取锁失败,用户id:{}", userId);
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
lock.unlock();
}
}
创建订单:
@Override
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
// 一人一单
Long userId = voucherOrder.getUserId();
int count = this.query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
log.error("用户已经抢购过了,用户id:{}", userId);
return;
}
//5.扣减库存
boolean update = seckillVoucherService.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0)
.update();
if (!update) {
//扣减失败
log.error("扣减库存失败,用户id:{}", userId);
return;
}
//创建订单
this.save(voucherOrder);
}
@Override
public Result seckillVoucher(Long voucherId) {
//执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString());
//判断结果是否为0
int r = result.intValue();
//0,没有购买资格
if (r != 0) {
return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
}
//1,有购买资格,生成订单,保存到阻塞队列
long orderId = redisIdWorker.nextId("order");
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
//放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
//返回订单id
return Result.ok(orderId);
}
使用Redis消息队列
什么是消息队列:字面意思就是存放消息的队列。最简单的消息队列模型包括3个角色:
- 消息队列:存储和管理消息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列获取消息并处理消息
也可以使用kafka,rabbitmq等等
基于List实现消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果。
两种用法:
- LPUSH 结合 RPOP
- RPUSH 结合 LPOP来实现。
要注意的是,当队列中没有消息时RPOP或LPOP操作会返回null,并不像JVM的阻塞队列那样会阻塞并等待消息。因此这里应该使用BRPOP或者BLPOP来实现阻塞效果。
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis的持久化机制,数据安全性有保证
- 可以满足消息有序性
缺点: - 无法避免消息丢失
- 只支持单消费者
基于PubSub顶消息队列
PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
SUBSCRIBE channel [channel]
:订阅一个或多个频道 PUBLISH channel msg
:向一个频道发送消息PSUBSCRIBE pattern[pattern]
:订阅与pattern格式匹配的所有频道
优点:
- 采用发布订阅模型,支持多生产、多消费
缺点: - 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失
基于Stream的消息队列
发送消息:
创建消息队列users,发送消息name=jack,age=18
,redis会自动生成ID
xadd users * name jack age 18
读消息:
读第一个消息
xread count 1 streams users 0
XREAD阻塞方式,读取最新的消息:
xread count 1 block 1000 streams users $
STREAM类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
基于Stream的消息队列-消费者组
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
删除指定的消费者组
XGROUP DESTORY key groupName
给指定的消费者组添加消费者 XGROUP CREATECONSUMER key groupname consumername
删除消费者组中的指定消费者 XGROUP DELCONSUMER key groupname consumername
从消费者组读取消息: XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
对比:
使用Stream
需求
- 创建一个Stream类型的消息队列,名为stream.orders
- 修改之前的秒杀下单Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包含voucherId、userId、orderId
- 项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单
修改lua脚本:
--1.参数列表
--优惠卷id
local voucherId = ARGV[1]
--用户id
local userId = ARGV[2]
--订单id
local orderId = ARGV[3]
--数据库key
--库存key
local stockKey = "seckill:stock:" .. voucherId
--订单key
local orderKey = "seckill:order:" .. voucherId
--业务脚本
--判断库存是否充足
if tonumber(redis.call('get', stockKey)) <= 0 then
--库存 不足
return 1
end
--判断用户是否下单
if (redis.call("sismember", orderKey, userId) == 1) then
--存在,说明是重复下单
return 2
end
--扣库存
redis.call('incrby', stockKey, -1)
--下单,保存用户
redis.call('sadd', orderKey, userId)
--发送消息到队列中 xadd stream.orders * k1 v1 k2 v2
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
发送消息
@Override
public Result seckillVoucher(Long voucherId) {
//执行lua脚本,得到购买资格
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nextId("order");
Long result = stringRedisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId));
//判断结果是否为0
int r = result.intValue();
//0,没有购买资格
if (r != 0) {
return Result.fail((r == 1) ? "库存不足 " : "不能重复下单");
}
//1,有购买资格,生成订单,保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
//放到阻塞队列中
orderTasks.add(voucherOrder);
proxy = (IVoucherOrderService) AopContext.currentProxy();
//返回订单id
return Result.ok(orderId);
}
修改后业务:
private class VoucherOrderHandler implements Runnable {
final String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try {
//获取 消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//判断是否获取成功
if (list == null || list.isEmpty()) {
//失败,重试
continue;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//成功,生成订单
handleVoucherOrder(voucherOrder);
//ACK确认 sack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理订单失败", e);
handlePendingList();
}
}
}
private void handlePendingList() {
while (true) {
try {
//获取 消息队列中的订单信息 xreadgroup group g1 c1 count 1 block 2000 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//判断是否获取成功
if (list == null || list.isEmpty()) {
//失败,结束
break;
}
//解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
//成功,生成订单
handleVoucherOrder(voucherOrder);
//ACK确认 sack stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pendinglist异常", e);
}
}
}
}