当前位置:首页 » 引流推广 » 正文

网上抢单平台是干嘛的,派单好还是抢单好,秒杀-用户下单实现

5887 人参与  2024年04月16日 01:00  分类 : 引流推广  评论

商品分为热点商品抢单和非热点商品抢单,因此此系统中抢单模式并非一种。

3.1 抢单分析


如上图,用户登录后经过nginx,进行抢单,此时会先判断商品是否是热点商品,如果是非热点商品,则直接调用订单系统进行下单操作,如果是热点商品,则向Kafka生产消息进行排队下单,订单系统会订阅排队下单信息,这样可以降低服务器所直接承受的抢单压力,这种操作也叫队列削峰。

3.2 非热点商品抢单

我们在订单系统中实现非热点商品抢单操作,非热点商品只用在订单系统中实现抢单即可,但抢单的时候要注意这么几个问题:

1.先递减库存2.库存递减成功后,执行下单3.下单失败,需要实现分布式事务4.下单成功后,要记录用户抢单信息,在24小时内不允许再抢该商品5.抢单中,有可能存在抢购的商品正好变成了热点商品,此时应该走排队的方式抢单,否则商品数量会发生不精准问题

3.2.1 库存递减

库存递减我们需要

/***
* 递减库存
* @param id
* @param count
* @return
*/@Update("UPDATE tb_sku SET seckill_num=seckill_num-#{count} WHERE id=#{id} AND islock=1 AND seckill_num>=#{count}")
int dcount(@Param("id") String id,@Param("count") Integer count);

这里我们需要控制数据的原子性,因此不能在内存中进行操作,需要用SQL语句在数据库中执行。

1)库存递减

修改seckill-goods的
com.seckill.goods.service.SkuService添加库存递减方法,代码如下:

/***
 * 库存递减
 * @param id
 * @param count
 * @return
 */int dcount(String id, Integer count);

修改seckill-goods的
com.seckill.goods.service.impl.SkuServiceImpl添加库存递减实现方法,代码如下:

/***
 * 库存递减
 * @param id
 * @param count
 * @return
 */@Overridepublic int dcount(String id, Integer count) {    int dcount = skuMapper.dcount(id,count);    if(dcount<=0){        //递减失败,查询状态
        Sku sku = skuMapper.selectByPrimaryKey(id);        if(sku.getIslock()==2){            return 205;
        }        if(sku.getSeckillNum()<count){            return 405;
        }
    }    return dcount;
}

修改
com.seckill.goods.controller.SkuController添加库存递减调用方法,代码如下:

/***
 * Sku数量递减
 * @return
 */@PutMapping(value = "/dcount/{id}/{count}" )public Result<Sku> dcount(@PathVariable(value = "id")String id,@PathVariable(value = "count")Integer count){    int code = skuService.dcount(id,count);
    String message = "";
    Sku sku = null;    switch (code){        case 1:
            message="库存削减成功";
            sku = skuService.findById(id);            break;        case 405:
            message="库存不足";            break;        case 205:
            message="该商品为热点商品";            break;            default:
    }    return new Result<Sku>(true,code,message,sku);
}

2)SkuFeign配置

修改
com.seckill.goods.feign.SkuFeign,添加库存递减方法,代码如下:

/***
 * Sku数量递减
 * @return
 */@PutMapping(value = "/sku/dcount/{id}/{count}" )
Result<Sku> dcount(@PathVariable(value = "id")String id, @PathVariable(value = "count")Integer count);

3.2.2 抢单实现

当库存递减成功后,需要给用户直接下单,如果递减不成功,会出现商品变成热卖商品的现象,我们需要向Kafka发送队列数据,所以需要引入Kafka配置。

bootstrap.yml配置kafka:

  kafka:    producer:      acks: all #acks:消息的确认机制,默认值是0, acks=0:如果设置为0,生产者不会等待kafka的响应。 acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应。 acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证。      retries: 0 #发送失败重试次数,配置为大于0的值的话,客户端会在消息发送失败时重新发送。      batch-size: 16384 #当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。      buffer-memory: 33554432 #即32MB的批处理缓冲区      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer      bootstrap-servers: kafka-server:9092 #如果kafka启动错误,打开debug级别日志,出现Can't resolve address: flink:9092 的错误,需要在 windows下修改IP映射即可, C:\Windows\System32\drivers\etc\hosts, 192.168.234.128 flink。    consumer:      group-id: test      auto-offset-reset: latest #(1)earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费;(2)latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 ;(3)none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常      enable-auto-commit:  true  #如果为true,消费者的偏移量将在后台定期提交。      auto-commit-interval: 1000 #消费者偏移自动提交给Kafka的频率 (以毫秒为单位),默认值为5000      max-poll-records: 5 #一次拉起的条数      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      bootstrap-servers: kafka-server:9092

修改seckill-order的
com.seckill.order.service.impl.OrderServiceImpl的add方法,代码如下:

/**
 * 增加Order
 * @param order
 */@Overridepublic int add(Order order){    //1.库存递减
    Result<Sku> result = skuFeign.dcount(order.getId(), 1);    //2.递减成功,下单    if(result.getCode()==1){        //商品秒杀价格
        Sku sku = result.getData();
        order.setName(sku.getName());
        order.setPrice(sku.getSeckillPrice());
        order.setId("No"+idWorker.nextId());
        order.setOrderStatus("0");
        order.setPayStatus("0");
        order.setConsignStatus("0");
        orderMapper.insertSelective(order);
    }else if(result.getCode()==205){//3.商品转入了热点商品,排队        //检查用户是否在排队        String key = "SKU_"+order.getSkuId();        String userKey = "USER"+order.getUsername()+"ID"+order.getSkuId();        //如果为true,则表示用户正在排队        Boolean bo = redisTemplate.boundHashOps(key).hasKey(userKey);        if(!bo){
            Map<String,String> orderMap = new HashMap<String,String>();
            orderMap.put("username",order.getUsername());
            orderMap.put("id",order.getSkuId());            //抢单排队
            kafkaTemplate.send("neworder", JSON.toJSONString(orderMap));
        }        return 202;
    }    return result.getCode();
}

修改
com.seckill.order.controller.OrderController的add方法,代码如下:

/***
 * 新增Order数据
 * @return
 */@GetMapping(value = "/{id}")public Result add(@RequestHeader(value = "Authorization")String authorization,//令牌                  @PathVariable(value = "id")String id) //商品ID{    //解析令牌
    Map<String,Object> tokenMap = JwtTokenUtil.parseToken(authorization);
    Order order = new Order();
    order.setCreateTime(new Date());
    order.setUpdateTime(order.getCreateTime());
    order.setSkuId(id);
    order.setUsername(tokenMap.get("username").toString());    //调用OrderService实现添加Order
    int code = orderService.add(order);    String message = "";    switch (code){        case 405:
            message="库存不足";            break;        case 200:
            message="抢购成功";            break;        case 202:
            message="正在排队";            break;        case 1:
            message="抢单成功";            break;        default:
    }    return new Result(true, StatusCode.OK,message);
}

3.2.3 抢单测试

编写lua脚本控制抢单,当用户处于已登录状态,则执行下单,创建脚本seckill-order-add.lua,脚本如下:

ngx.header.content_type="application/json;charset=utf8"--引入json库local cjson = require "cjson"--引入jwt模块local jwttoken = require "token"--获取请求头中的令牌数据local auth_header = ngx.var.http_Authorization--调用令牌校验local result = jwttoken.check(auth_header)--如果code==200表示令牌校验通过if result.code==200 then        --获取id        local uri_args = ngx.req.get_uri_args()        local id = uri_args["id"]        --拼接url        local url = "/api/order/"..id        --执行请求
        ngx.exec(url)else        -- 输出结果
        ngx.say(cjson.encode(result))
        ngx.exit(result.code)end

在nginx.conf中添加api以及抢单请求路径的路由,配置如下:

#抢单location /lua/order/add {    content_by_lua_file /usr/local/openresty/nginx/lua/seckill-order-add.lua;
}#微服务网关location /api/ {    proxy_pass http://192.168.1.5:8001;
}

注意如果是POST提交,Lua脚本中需要读取请求体数据:

1
2
3ngx.req.read_body()ngx.req.get_post_args()

3.3 热点商品抢单

上面我们完成了非热点商品抢单,接着我们实现以下热点商品抢单。热点商品和非热点商品不一样,热点商品已经隔离出来,在Redis缓存中,并且热点商品抢单要实现高效率操作而且还能抗压,Nginx的并发能力远远超越tomcat,因此热点商品抢单我们可以使用Lua+Nginx。

3.3.1 抢单流程分析


用户进入抢单流程,通过Lua脚本判断令牌是否有效,如果有效,则进入抢单环节,抢单环节执行过程我们做一个分析:

1.判断该商品用户是否在24小时内购买过2.如果购买了,直接提示用户24小时内无法购买3.如果用户没有购买过该商品,则判断该商品是否属于热点商品4.如果是非热点商品,则走非热点商品抢单流程5.如果是热点商品,则走热点商品抢单流程6.判断该商品用户是否已经排队,如果没有排队,则进入排队,如果已经排队,则提示用户正在排队7.下单过程交给订单系统,订单系统通过队列订阅读取用户下单信息,并进行下单

3.3.2 Lua实现Redis操作

判断用户是否在24小时内抢购过该商品,我们可以将用户抢单信息存入到Redis缓存中,定时24小时过期即可,此时需要在Lua里面实现Redis集群操作,需要第三方库的支持lua-resty-redis-cluster。

我们需要安装lua-resty-redis-cluster,下载地址:<
https://github.com/cuiweixie/lua-resty-redis-cluster>,下载该文件配置后即可实现Redis集群操作。

3.3.2.1 配置lua-resty-redis-cluster

1)lua-resty-redis-cluster配置

将下载好的文件上传到服务器的`/usr/local/openresty目录下,并解压,我们只需要用到包中2个文件rediscluster.lua和redis_slot.c。


lua-resty-redis-cluster/lib/redis_slot.c 拷贝到 openresty/lualib 目录下,将
lua-resty-redis-cluster/lib/resty/rediscluster.lua 拷贝到 openresty/lualib/resty 目录下。

拷贝redis_slot.c:

cd /usr/local/openresty/lua-resty-redis-cluster-master/lib/

cp redis_slot.c /usr/local/openresty/lualib/

拷贝rediscluster.lua:

cd /usr/local/openresty/lua-resty-redis-cluster-master/lib/resty

cp rediscluster.lua /usr/local/openresty/lualib/resty/

编译:

cd /usr/local/openresty/lualib

gcc redis_slot.c -fPIC -shared -o libredis_slot.so

编译的时候有可能会发生如下错误


错误解决:

解决:应该是lua版本不对,自带的lua应该不好使
方式一:删除自带的lua,一般是/usr/lua和/usr/luac ,删除这两个文件
方式二:yum install lua-devel 下载一个依赖方式三:自己重新再lua官网下载一个lua,重新安装一个lua(这个很好使)

这里我们可以选择第二种方式解决,再次进行编译就没问题了。

2)指令配置

lua-resty-redis-cluster中有部分redis指令并未开放,我们可以手动修改,开放相关指令,我们这里开放过期指令,因为后面会用到该指令。

修改
/usr/local/openresty/lualib/resty/rediscluster.lua文件,添加相关指令,如下图:


3.3.2.2 操作Redis集群实现

以后别的地方也有可能会用到redis,我们可以写个工具类redis-cluster.lua,实现redis的操作,这里主要实现了根据key获取缓存数据、根据key设置缓存过期时间、根据key从hash类型中获取数据、往hash类型中添加数据,代码如下:

--redis连接配置local config = {
    name = "test",
    serv_list = {
        {ip="192.168.211.137", port = 7001},
        {ip="192.168.211.137", port = 7002},
        {ip="192.168.211.137", port = 7003},
        {ip="192.168.211.137", port = 7004},
        {ip="192.168.211.137", port = 7005},
        {ip="192.168.211.137", port = 7006},
    },
    idle_timeout    = 1000,
    pool_size       = 10000,
}--引入redis集群配置local redis_cluster = require "resty.rediscluster"--定义一个对象local lredis = {}--根据key查询function lredis.get(key)        --创建链接        local red = redis_cluster:new(config)
        red:init_pipeline()        --根据key获取数据
        red:get(key)        local rresult = red:commit_pipeline()        --关闭链接
        red:close()        return rresultend--添加带过期的数据function lredis.setexp(key,value,time)        --创建链接        local red = redis_cluster:new(config)
        red:init_pipeline()        --添加key,同时设置过期时间
        red:set(key,value)
        red:expire(key,time)        local rresult = red:commit_pipeline()end--根据key查询hashfunction lredis.hget(key1,key2)        --创建链接        local red = redis_cluster:new(config)
        red:init_pipeline()        --根据key获取数据
        red:hmget(key1,key2)        local rresult = red:commit_pipeline()        --关闭链接
        red:close()        return rresult[1]end--hash数据添加function lredis.hset(key1,key2,value)        --创建链接        local red = redis_cluster:new(config)
        red:init_pipeline()        --添加hash数据
        red:hmset(key1,key2,value)        local rresult = red:commit_pipeline()        --关闭链接
        red:close()        return rresultend--hash中指定的key自增function lredis.hincrby(key1,key2,value)        --创建链接        local red = redis_cluster:new(config)
        red:init_pipeline()        --添加hash数据
        red:hincrby(key1,key2,value)        local rresult = red:commit_pipeline()        --关闭链接
        red:close()        return rresultendreturn lredis

我们接着来测试一次集群操作,修改nginx.conf,配置一个location节点,如下:

#redislocation /test/redis {
    content_by_lua '
        ngx.header.content_type="application/json;charset=utf8"
        --引入redis
        local rredis = require "redis-cluster"
        --从redis中查询hash类型数据
        local sku = rredis.hget("SKU_S1235433012716498944","num")[1]
        ngx.say(sku)
    ';
}

测试效果如下:


3.3.3 Lua实现Kafka操作

用户抢单的时候,如果是热点商品,这时候需要实现用户排队,用户排队我们需要向kafka发送抢单信息,因此需要使用Lua脚本操作kafka,我们需要依赖lua-restry-kafka库,该库我们也已经配置使用过了,所以这里无需再配置了。

以后使用kafka的地方也有可能会有很多,所以针对kafka我们也可以单独抽取出一个配置脚本,创建一个脚本名字叫’kafka.lua’,用于配置kafka的操作,代码如下:

--创建对象local kafka={}--kafka依赖库local client = require "resty.kafka.client"local producer = require "resty.kafka.producer"--配置kafka的链接地址local broker_list = {
      { host = "192.168.211.137", port = 9092 }
}--发送消息--queuename:队列名字--content:发送的内容function kafka.send(queuename,content)        --创建生产者        local pro = producer:new(broker_list,{ producer_type="async"})        --发送消息        local offset, err = pro:send(queuename, nil, content)        --返回结果        return offsetendreturn kafka

编写一段代码向kafka发送信息,修改nginx.conf,添加如下代码:

#kafkalocation /test/kafka {
    content_by_lua '
        ngx.header.content_type="application/json;charset=utf8"
         --引入kafka
        local kafka = require "kafka"
        --发送消息
        local offset = kafka.send("demo","hello")
        ngx.say(offset)
    ';
}

测试效果如下:



3.3.4 抢单实现


抢单这里分为2部分,首先需要向Kafka发送抢单信息实现排队,排队后,订单系统订阅抢单信息实现下单操作,所有的数据操作一律在Redis中完成,降低程序对服务器的压力。

3.3.4.1 排队

排队抢单需要引入redis和kafka,我们的实现思路如下:

1.校验用户令牌,如果不通过直接结束程序提示用户2.令牌校验通过,从Redis中获取用户在24小时内是抢购过该商品,如果抢购过直接结束程序并提示用户3.如果符合购买该商品条件,则校验该商品是否是热点商品,如果不是,直接请求后台下单4.如果是热点商品,并且库存>0,校验用户是否已经在排队,使用redis的incr自增判断排队次数可以去除重复排队5.如果没有排队,则向Kafka发送消息实现排队

创建seckill-order-add.lua,实现代码如下:

ngx.header.content_type="application/json;charset=utf8"--引入json库local cjson = require "cjson"--引入jwt模块local jwttoken = require "token"--引入redislocal redis = require "redis-cluster"--引入kafkalocal kafka = require "kafka"--获取请求头中的令牌数据local auth_header = ngx.var.http_Authorization--调用令牌校验local result = jwttoken.check(auth_header)--如果code==200表示令牌校验通过if result.code==200 then--响应结果local response = {}--获取idlocal uri_args = ngx.req.get_uri_args()local id = uri_args["id"]--判断该商品用户是否已经在指定时间内购买过local username = result["body"]["payload"]["username"]local userKey= "USER"..username.."ID"..idlocal hasbuy = redis.get(userKey)--如果没有购买,则判断该商品是否是热点商品if hasbuy==nil or hasbuy[1]==nil or hasbuy[1]==ngx.null then--从redis中获取该商品信息local num = redis.hget("SKU_"..id,"num")[1]--如果不是热点商品,则走普通抢单流程if num==nil or num==ngx.null then--拼接urllocal url = "/api/order/"..id--执行请求
			ngx.exec(url)returnelse--热点商品
			num = tonumber(num)--如果有库存,才允许抢单if num<=0 then                --库存不足,无法排队
		        response["code"]=405
                response["message"]="当前商品库存不足,无法抢购"
		        ngx.say(cjson.encode(response))returnelse--递增排队local incrresult = redis.hincrby("SKU_"..id,userKey,1)
				incrresult=tonumber(incrresult)                
				if incrresult==1 then                    --热点数据,发送MQ排队                    local userorder = {}
                    userorder["username"]=username
                    userorder["id"]=id--排队抢单
					kafka.send("neworder",cjson.encode(userorder))

                    response["code"]=202
                    response["message"]="您正在排队抢购该商品"
					ngx.say(cjson.encode(response))returnelse--响应用户正在排队抢购该商品
                	response["code"]=202
        	        response["message"]="您正在排队抢购该商品"
	                ngx.say(cjson.encode(response))returnendendendelse--24小时内购买过该商品
		response["code"]=415
        response["message"]="您24小时内已经抢购了该商品,不能重复抢购"
        ngx.say(cjson.encode(response))returnendelse-- 输出结果
	ngx.say(cjson.encode(result))
	ngx.exit(result.code)end

3.3.4.2 下单实现

创建
com.seckill.order.config.KafkaOrderListener,用于读取排队信息,并调用下单操作,代码如下:

@Componentpublic class KafkaOrderListener {    @Autowired    private OrderService orderService;    /***
     * 监听消息
     * 创建订单
     * @param message
     */    @KafkaListener(topics = {"neworder"})    public void receive(String message){        //将消息转成Map
        Map<String,String> orderMap = JSON.parseObject(message,Map.class);        //创建订单
        orderService.addHotOrder(orderMap);
    }
}

修改
com.seckill.order.service.OrderService添加热点数据下单方法,代码如下:

/***
 * 热点数据下单
 * @param orderMap
 */void addHotOrder(Map<String, String> orderMap);

修改
com.seckill.order.service.impl.OrderServiceImpl添加热点数据下单实现方法,代码如下:

/***
 * 秒杀下单
 * @param orderMap
 */@Override
public void addHotOrder(Map<String, String> orderMap) {    String id = orderMap.get("id");    String username = orderMap.get("username");    //key    String key = "SKU_" + id;    //用户购买的key    String userKey = "USER" + username + "ID" + id;    if (redisTemplate.hasKey(key)) {        //数量
        Integer num = Integer.parseInt(redisTemplate.boundHashOps(key).get("num").toString());        //拥有库存,执行递减操作        if (num > 0) {            //查询商品
            Result<Sku> result = skuFeign.findById(id);
            Sku sku = result.getData();
            Order order = new Order();
            order.setCreateTime(new Date());
            order.setUpdateTime(order.getCreateTime());
            order.setUsername(username);
            order.setSkuId(id);
            order.setName(sku.getName());
            order.setPrice(sku.getSeckillPrice());
            order.setId("No" + idWorker.nextId());
            order.setOrderStatus("0");
            order.setPayStatus("0");
            order.setConsignStatus("0");
            orderMapper.insertSelective(order);            //库存递减
            num--;            if (num == 0) {                //同步数据到数据库,秒杀数量归零
                skuFeign.zero(id);
            }            //更新数据            Map<String, Object> dataMap = new HashMap<String, Object>();
            dataMap.put("num", num);
            dataMap.put(userKey, 0);            //存数据
            redisTemplate.boundHashOps(key).putAll(dataMap);
        }        //记录该商品用户24小时内无法再次购买,测试环境,我们只配置成1分钟
        redisTemplate.boundValueOps(userKey).set("");
        redisTemplate.expire(userKey, 1, TimeUnit.MINUTES);
    }
}

本文链接:https://www.woshiqian.com/post/226968.html

<< 上一篇 到底啦 >>

  • 评论(0)
  • 赞助本站

       

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。