商品分为热点商品抢单和非热点商品抢单,因此此系统中抢单模式并非一种。
3.1 抢单分析
如上图,用户登录后经过nginx,进行抢单,此时会先判断商品是否是热点商品,如果是非热点商品,则直接调用订单系统进行下单操作,如果是热点商品,则向Kafka生产消息进行排队下单,订单系统会订阅排队下单信息,这样可以降低服务器所直接承受的抢单压力,这种操作也叫队列削峰。
3.2 非热点商品抢单
我们在订单系统中实现非热点商品抢单操作,非热点商品只用在订单系统中实现抢单即可,但抢单的时候要注意这么几个问题:
1.先递减库存2.库存递减成功后,执行下单3.下单失败,需要实现分布式事务4.下单成功后,要记录用户抢单信息,在24小时内不允许再抢该商品5.抢单中,有可能存在抢购的商品正好变成了热点商品,此时应该走排队的方式抢单,否则商品数量会发生不精准问题
3.2.1 库存递减
库存递减我们需要
/*** * 递减库存 * @param id * @param count * @return */@Update("UPDATE tb_sku SET seckill_num=seckill_num-#{count} WHERE id=#{id} AND islock=1 AND seckill_num>=#{count}") int dcount(@Param("id") String id,@Param("count") Integer count);
这里我们需要控制数据的原子性,因此不能在内存中进行操作,需要用SQL语句在数据库中执行。
1)库存递减
修改seckill-goods的
com.seckill.goods.service.SkuService添加库存递减方法,代码如下:
/*** * 库存递减 * @param id * @param count * @return */int dcount(String id, Integer count);
修改seckill-goods的
com.seckill.goods.service.impl.SkuServiceImpl添加库存递减实现方法,代码如下:
/*** * 库存递减 * @param id * @param count * @return */@Overridepublic int dcount(String id, Integer count) { int dcount = skuMapper.dcount(id,count); if(dcount<=0){ //递减失败,查询状态 Sku sku = skuMapper.selectByPrimaryKey(id); if(sku.getIslock()==2){ return 205; } if(sku.getSeckillNum()<count){ return 405; } } return dcount; }
修改
com.seckill.goods.controller.SkuController添加库存递减调用方法,代码如下:
/*** * Sku数量递减 * @return */@PutMapping(value = "/dcount/{id}/{count}" )public Result<Sku> dcount(@PathVariable(value = "id")String id,@PathVariable(value = "count")Integer count){ int code = skuService.dcount(id,count); String message = ""; Sku sku = null; switch (code){ case 1: message="库存削减成功"; sku = skuService.findById(id); break; case 405: message="库存不足"; break; case 205: message="该商品为热点商品"; break; default: } return new Result<Sku>(true,code,message,sku); }
2)SkuFeign配置
修改
com.seckill.goods.feign.SkuFeign,添加库存递减方法,代码如下:
/*** * Sku数量递减 * @return */@PutMapping(value = "/sku/dcount/{id}/{count}" ) Result<Sku> dcount(@PathVariable(value = "id")String id, @PathVariable(value = "count")Integer count);
3.2.2 抢单实现
当库存递减成功后,需要给用户直接下单,如果递减不成功,会出现商品变成热卖商品的现象,我们需要向Kafka发送队列数据,所以需要引入Kafka配置。
bootstrap.yml配置kafka:
kafka: producer: acks: all #acks:消息的确认机制,默认值是0, acks=0:如果设置为0,生产者不会等待kafka的响应。 acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。 retries: 0 #发送失败重试次数,配置为大于0的值的话,客户端会在消息发送失败时重新发送。 batch-size: 16384 #当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。 buffer-memory: 33554432 #即32MB的批处理缓冲区 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: kafka-server:9092 #如果kafka启动错误,打开debug级别日志,出现Can't resolve address: flink:9092 的错误,需要在 windows下修改IP映射即可, C:\Windows\System32\drivers\etc\hosts, 192.168.234.128 flink。 consumer: group-id: test auto-offset-reset: latest #(1)earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费;(2)latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 ;(3)none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 enable-auto-commit: true #如果为true,消费者的偏移量将在后台定期提交。 auto-commit-interval: 1000 #消费者偏移自动提交给Kafka的频率 (以毫秒为单位),默认值为5000 max-poll-records: 5 #一次拉起的条数 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer bootstrap-servers: kafka-server:9092
修改seckill-order的
com.seckill.order.service.impl.OrderServiceImpl的add方法,代码如下:
/** * 增加Order * @param order */@Overridepublic int add(Order order){ //1.库存递减 Result<Sku> result = skuFeign.dcount(order.getId(), 1); //2.递减成功,下单 if(result.getCode()==1){ //商品秒杀价格 Sku sku = result.getData(); order.setName(sku.getName()); order.setPrice(sku.getSeckillPrice()); order.setId("No"+idWorker.nextId()); order.setOrderStatus("0"); order.setPayStatus("0"); order.setConsignStatus("0"); orderMapper.insertSelective(order); }else if(result.getCode()==205){//3.商品转入了热点商品,排队 //检查用户是否在排队 String key = "SKU_"+order.getSkuId(); String userKey = "USER"+order.getUsername()+"ID"+order.getSkuId(); //如果为true,则表示用户正在排队 Boolean bo = redisTemplate.boundHashOps(key).hasKey(userKey); if(!bo){ Map<String,String> orderMap = new HashMap<String,String>(); orderMap.put("username",order.getUsername()); orderMap.put("id",order.getSkuId()); //抢单排队 kafkaTemplate.send("neworder", JSON.toJSONString(orderMap)); } return 202; } return result.getCode(); }
修改
com.seckill.order.controller.OrderController的add方法,代码如下:
/*** * 新增Order数据 * @return */@GetMapping(value = "/{id}")public Result add(@RequestHeader(value = "Authorization")String authorization,//令牌 @PathVariable(value = "id")String id) //商品ID{ //解析令牌 Map<String,Object> tokenMap = JwtTokenUtil.parseToken(authorization); Order order = new Order(); order.setCreateTime(new Date()); order.setUpdateTime(order.getCreateTime()); order.setSkuId(id); order.setUsername(tokenMap.get("username").toString()); //调用OrderService实现添加Order int code = orderService.add(order); String message = ""; switch (code){ case 405: message="库存不足"; break; case 200: message="抢购成功"; break; case 202: message="正在排队"; break; case 1: message="抢单成功"; break; default: } return new Result(true, StatusCode.OK,message); }
3.2.3 抢单测试
编写lua脚本控制抢单,当用户处于已登录状态,则执行下单,创建脚本seckill-order-add.lua,脚本如下:
ngx.header.content_type="application/json;charset=utf8"--引入json库local cjson = require "cjson"--引入jwt模块local jwttoken = require "token"--获取请求头中的令牌数据local auth_header = ngx.var.http_Authorization--调用令牌校验local result = jwttoken.check(auth_header)--如果code==200表示令牌校验通过if result.code==200 then --获取id local uri_args = ngx.req.get_uri_args() local id = uri_args["id"] --拼接url local url = "/api/order/"..id --执行请求 ngx.exec(url)else -- 输出结果 ngx.say(cjson.encode(result)) ngx.exit(result.code)end
在nginx.conf中添加api以及抢单请求路径的路由,配置如下:
#抢单location /lua/order/add { content_by_lua_file /usr/local/openresty/nginx/lua/seckill-order-add.lua; }#微服务网关location /api/ { proxy_pass http://192.168.1.5:8001; }
注意如果是POST提交,Lua脚本中需要读取请求体数据:
1 2 3ngx.req.read_body()ngx.req.get_post_args()
3.3 热点商品抢单
上面我们完成了非热点商品抢单,接着我们实现以下热点商品抢单。热点商品和非热点商品不一样,热点商品已经隔离出来,在Redis缓存中,并且热点商品抢单要实现高效率操作而且还能抗压,Nginx的并发能力远远超越tomcat,因此热点商品抢单我们可以使用Lua+Nginx。
3.3.1 抢单流程分析
用户进入抢单流程,通过Lua脚本判断令牌是否有效,如果有效,则进入抢单环节,抢单环节执行过程我们做一个分析:
1.判断该商品用户是否在24小时内购买过2.如果购买了,直接提示用户24小时内无法购买3.如果用户没有购买过该商品,则判断该商品是否属于热点商品4.如果是非热点商品,则走非热点商品抢单流程5.如果是热点商品,则走热点商品抢单流程6.判断该商品用户是否已经排队,如果没有排队,则进入排队,如果已经排队,则提示用户正在排队7.下单过程交给订单系统,订单系统通过队列订阅读取用户下单信息,并进行下单
3.3.2 Lua实现Redis操作
判断用户是否在24小时内抢购过该商品,我们可以将用户抢单信息存入到Redis缓存中,定时24小时过期即可,此时需要在Lua里面实现Redis集群操作,需要第三方库的支持lua-resty-redis-cluster。
我们需要安装lua-resty-redis-cluster,下载地址:<
https://github.com/cuiweixie/lua-resty-redis-cluster>,下载该文件配置后即可实现Redis集群操作。
3.3.2.1 配置lua-resty-redis-cluster
1)lua-resty-redis-cluster配置
将下载好的文件上传到服务器的`/usr/local/openresty目录下,并解压,我们只需要用到包中2个文件rediscluster.lua和redis_slot.c。
将
lua-resty-redis-cluster/lib/redis_slot.c 拷贝到 openresty/lualib 目录下,将
lua-resty-redis-cluster/lib/resty/rediscluster.lua 拷贝到 openresty/lualib/resty 目录下。
拷贝redis_slot.c:
cd /usr/local/openresty/lua-resty-redis-cluster-master/lib/ cp redis_slot.c /usr/local/openresty/lualib/
拷贝rediscluster.lua:
cd /usr/local/openresty/lua-resty-redis-cluster-master/lib/resty cp rediscluster.lua /usr/local/openresty/lualib/resty/
编译:
cd /usr/local/openresty/lualib gcc redis_slot.c -fPIC -shared -o libredis_slot.so
编译的时候有可能会发生如下错误
错误解决:
解决:应该是lua版本不对,自带的lua应该不好使 方式一:删除自带的lua,一般是/usr/lua和/usr/luac ,删除这两个文件 方式二:yum install lua-devel 下载一个依赖方式三:自己重新再lua官网下载一个lua,重新安装一个lua(这个很好使)
这里我们可以选择第二种方式解决,再次进行编译就没问题了。
2)指令配置
lua-resty-redis-cluster中有部分redis指令并未开放,我们可以手动修改,开放相关指令,我们这里开放过期指令,因为后面会用到该指令。
修改
/usr/local/openresty/lualib/resty/rediscluster.lua文件,添加相关指令,如下图:
3.3.2.2 操作Redis集群实现
以后别的地方也有可能会用到redis,我们可以写个工具类redis-cluster.lua,实现redis的操作,这里主要实现了根据key获取缓存数据、根据key设置缓存过期时间、根据key从hash类型中获取数据、往hash类型中添加数据,代码如下:
--redis连接配置local config = { name = "test", serv_list = { {ip="192.168.211.137", port = 7001}, {ip="192.168.211.137", port = 7002}, {ip="192.168.211.137", port = 7003}, {ip="192.168.211.137", port = 7004}, {ip="192.168.211.137", port = 7005}, {ip="192.168.211.137", port = 7006}, }, idle_timeout = 1000, pool_size = 10000, }--引入redis集群配置local redis_cluster = require "resty.rediscluster"--定义一个对象local lredis = {}--根据key查询function lredis.get(key) --创建链接 local red = redis_cluster:new(config) red:init_pipeline() --根据key获取数据 red:get(key) local rresult = red:commit_pipeline() --关闭链接 red:close() return rresultend--添加带过期的数据function lredis.setexp(key,value,time) --创建链接 local red = redis_cluster:new(config) red:init_pipeline() --添加key,同时设置过期时间 red:set(key,value) red:expire(key,time) local rresult = red:commit_pipeline()end--根据key查询hashfunction lredis.hget(key1,key2) --创建链接 local red = redis_cluster:new(config) red:init_pipeline() --根据key获取数据 red:hmget(key1,key2) local rresult = red:commit_pipeline() --关闭链接 red:close() return rresult[1]end--hash数据添加function lredis.hset(key1,key2,value) --创建链接 local red = redis_cluster:new(config) red:init_pipeline() --添加hash数据 red:hmset(key1,key2,value) local rresult = red:commit_pipeline() --关闭链接 red:close() return rresultend--hash中指定的key自增function lredis.hincrby(key1,key2,value) --创建链接 local red = redis_cluster:new(config) red:init_pipeline() --添加hash数据 red:hincrby(key1,key2,value) local rresult = red:commit_pipeline() --关闭链接 red:close() return rresultendreturn lredis
我们接着来测试一次集群操作,修改nginx.conf,配置一个location节点,如下:
#redislocation /test/redis { content_by_lua ' ngx.header.content_type="application/json;charset=utf8" --引入redis local rredis = require "redis-cluster" --从redis中查询hash类型数据 local sku = rredis.hget("SKU_S1235433012716498944","num")[1] ngx.say(sku) '; }
测试效果如下:
3.3.3 Lua实现Kafka操作
用户抢单的时候,如果是热点商品,这时候需要实现用户排队,用户排队我们需要向kafka发送抢单信息,因此需要使用Lua脚本操作kafka,我们需要依赖lua-restry-kafka库,该库我们也已经配置使用过了,所以这里无需再配置了。
以后使用kafka的地方也有可能会有很多,所以针对kafka我们也可以单独抽取出一个配置脚本,创建一个脚本名字叫’kafka.lua’,用于配置kafka的操作,代码如下:
--创建对象local kafka={}--kafka依赖库local client = require "resty.kafka.client"local producer = require "resty.kafka.producer"--配置kafka的链接地址local broker_list = { { host = "192.168.211.137", port = 9092 } }--发送消息--queuename:队列名字--content:发送的内容function kafka.send(queuename,content) --创建生产者 local pro = producer:new(broker_list,{ producer_type="async"}) --发送消息 local offset, err = pro:send(queuename, nil, content) --返回结果 return offsetendreturn kafka
编写一段代码向kafka发送信息,修改nginx.conf,添加如下代码:
#kafkalocation /test/kafka { content_by_lua ' ngx.header.content_type="application/json;charset=utf8" --引入kafka local kafka = require "kafka" --发送消息 local offset = kafka.send("demo","hello") ngx.say(offset) '; }
测试效果如下:
3.3.4 抢单实现
抢单这里分为2部分,首先需要向Kafka发送抢单信息实现排队,排队后,订单系统订阅抢单信息实现下单操作,所有的数据操作一律在Redis中完成,降低程序对服务器的压力。
3.3.4.1 排队
排队抢单需要引入redis和kafka,我们的实现思路如下:
1.校验用户令牌,如果不通过直接结束程序提示用户2.令牌校验通过,从Redis中获取用户在24小时内是抢购过该商品,如果抢购过直接结束程序并提示用户3.如果符合购买该商品条件,则校验该商品是否是热点商品,如果不是,直接请求后台下单4.如果是热点商品,并且库存>0,校验用户是否已经在排队,使用redis的incr自增判断排队次数可以去除重复排队5.如果没有排队,则向Kafka发送消息实现排队
创建seckill-order-add.lua,实现代码如下:
ngx.header.content_type="application/json;charset=utf8"--引入json库local cjson = require "cjson"--引入jwt模块local jwttoken = require "token"--引入redislocal redis = require "redis-cluster"--引入kafkalocal kafka = require "kafka"--获取请求头中的令牌数据local auth_header = ngx.var.http_Authorization--调用令牌校验local result = jwttoken.check(auth_header)--如果code==200表示令牌校验通过if result.code==200 then--响应结果local response = {}--获取idlocal uri_args = ngx.req.get_uri_args()local id = uri_args["id"]--判断该商品用户是否已经在指定时间内购买过local username = result["body"]["payload"]["username"]local userKey= "USER"..username.."ID"..idlocal hasbuy = redis.get(userKey)--如果没有购买,则判断该商品是否是热点商品if hasbuy==nil or hasbuy[1]==nil or hasbuy[1]==ngx.null then--从redis中获取该商品信息local num = redis.hget("SKU_"..id,"num")[1]--如果不是热点商品,则走普通抢单流程if num==nil or num==ngx.null then--拼接urllocal url = "/api/order/"..id--执行请求 ngx.exec(url)returnelse--热点商品 num = tonumber(num)--如果有库存,才允许抢单if num<=0 then --库存不足,无法排队 response["code"]=405 response["message"]="当前商品库存不足,无法抢购" ngx.say(cjson.encode(response))returnelse--递增排队local incrresult = redis.hincrby("SKU_"..id,userKey,1) incrresult=tonumber(incrresult) if incrresult==1 then --热点数据,发送MQ排队 local userorder = {} userorder["username"]=username userorder["id"]=id--排队抢单 kafka.send("neworder",cjson.encode(userorder)) response["code"]=202 response["message"]="您正在排队抢购该商品" ngx.say(cjson.encode(response))returnelse--响应用户正在排队抢购该商品 response["code"]=202 response["message"]="您正在排队抢购该商品" ngx.say(cjson.encode(response))returnendendendelse--24小时内购买过该商品 response["code"]=415 response["message"]="您24小时内已经抢购了该商品,不能重复抢购" ngx.say(cjson.encode(response))returnendelse-- 输出结果 ngx.say(cjson.encode(result)) ngx.exit(result.code)end
3.3.4.2 下单实现
创建
com.seckill.order.config.KafkaOrderListener,用于读取排队信息,并调用下单操作,代码如下:
@Componentpublic class KafkaOrderListener { @Autowired private OrderService orderService; /*** * 监听消息 * 创建订单 * @param message */ @KafkaListener(topics = {"neworder"}) public void receive(String message){ //将消息转成Map Map<String,String> orderMap = JSON.parseObject(message,Map.class); //创建订单 orderService.addHotOrder(orderMap); } }
修改
com.seckill.order.service.OrderService添加热点数据下单方法,代码如下:
/*** * 热点数据下单 * @param orderMap */void addHotOrder(Map<String, String> orderMap);
修改
com.seckill.order.service.impl.OrderServiceImpl添加热点数据下单实现方法,代码如下:
/*** * 秒杀下单 * @param orderMap */@Override public void addHotOrder(Map<String, String> orderMap) { String id = orderMap.get("id"); String username = orderMap.get("username"); //key String key = "SKU_" + id; //用户购买的key String userKey = "USER" + username + "ID" + id; if (redisTemplate.hasKey(key)) { //数量 Integer num = Integer.parseInt(redisTemplate.boundHashOps(key).get("num").toString()); //拥有库存,执行递减操作 if (num > 0) { //查询商品 Result<Sku> result = skuFeign.findById(id); Sku sku = result.getData(); Order order = new Order(); order.setCreateTime(new Date()); order.setUpdateTime(order.getCreateTime()); order.setUsername(username); order.setSkuId(id); order.setName(sku.getName()); order.setPrice(sku.getSeckillPrice()); order.setId("No" + idWorker.nextId()); order.setOrderStatus("0"); order.setPayStatus("0"); order.setConsignStatus("0"); orderMapper.insertSelective(order); //库存递减 num--; if (num == 0) { //同步数据到数据库,秒杀数量归零 skuFeign.zero(id); } //更新数据 Map<String, Object> dataMap = new HashMap<String, Object>(); dataMap.put("num", num); dataMap.put(userKey, 0); //存数据 redisTemplate.boundHashOps(key).putAll(dataMap); } //记录该商品用户24小时内无法再次购买,测试环境,我们只配置成1分钟 redisTemplate.boundValueOps(userKey).set(""); redisTemplate.expire(userKey, 1, TimeUnit.MINUTES); } }