在单机部署的系统中,使用线程锁来解决高并发的问题,多线程访问共享变量的问题达到数据一致性,如使用synchornized、ReentrantLock等;
但是在后端集群部署的系统中,程序在不同的JVM虚拟机中运行,且因为synchronized或ReentrantLock都只能保证同一个JVM进程中保证有效,所以这时就需要使用分布式锁了。
建立三张表:商品表、订单表、订单商品表
CREATE TABLE `product` (`id` int(11) NOT NULL,`product_name` varchar(255) DEFAULT NULL COMMENT '商品名字',`price` decimal(10,2) DEFAULT NULL COMMENT '商品价格',`count` bigint(50) unsigned DEFAULT NULL COMMENT '库存',`product_desc` varchar(255) DEFAULT NULL COMMENT '商品描述',`version` int(255) DEFAULT NULL COMMENT '乐观锁',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;CREATE TABLE `t_order` (`id` varchar(255) NOT NULL,`order_status` int(1) DEFAULT NULL COMMENT '订单状态 1 待支付 2已支付',`receiver_name` varchar(255) DEFAULT NULL COMMENT '收货人名字',`receiver_mobile` varchar(255) DEFAULT NULL COMMENT '收货人手机',`order_amount` decimal(10,2) DEFAULT NULL COMMENT '订单价格',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;CREATE TABLE `order_item` (`id` varchar(255) NOT NULL,`order_id` varchar(36) DEFAULT NULL COMMENT '订单ID',`produce_id` int(11) DEFAULT NULL COMMENT '商品ID',`purchase_price` decimal(10,2) DEFAULT NULL COMMENT '购买价格',`purchase_num` int(11) DEFAULT NULL COMMENT '购买数量',PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
@Transactional(rollbackFor = Exception.class)@Overridepublic synchronized String createOrder0(Integer produceId, Integer purchaseNum) {// 1、根据商品id获取商品信息Product product = productMapper.selectById(produceId);// 2、判断商品是否存在if (product == null) {throw new RuntimeException("购买商品不存在");}log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());// 3、校验库存if (purchaseNum > product.getCount()) {throw new RuntimeException("库存不足");}// 4、更新库存操作int count = product.getCount() - purchaseNum;product.setCount(count);productMapper.updateById(product);// 5、创建订单TOrder order = new TOrder();//订单状态order.setOrderStatus(1);order.setReceiverName("张三");order.setReceiverMobile("18587781058");//订单价格order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseNum)));baseMapper.insert(order);// 6、创建订单和商品关联OrderItem orderItem = new OrderItem();//订单idorderItem.setOrderId(order.getId());// 商品idorderItem.setProduceId(product.getId());// 购买价格orderItem.setPurchasePrice(product.getPrice());// 购买数量orderItem.setPurchaseNum(purchaseNum);orderItemMapper.insert(orderItem);return order.getId();}
结果、商品表中有5个,但是订单表中却有9个订单,原因是在这个进程执行完后,锁就放开了,但是不能保证事务提交了。故采用下面这种方案来手动提交事务。
Spring进行了统一的抽象,形成了 PlatformTransactionManager事务管理器接口 ,事务的 提交、回滚等操作 全部交给它来实现
事务功能的总体接口设计
@Autowiredprivate PlatformTransactionManager platformTransactionManager;@Autowiredprivate TransactionDefinition transactionDefinition;@Overridepublic synchronized String createOrder(Integer produceId, Integer purchaseNum) {TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition);// 1、根据商品id获取商品信息Product product = productMapper.selectById(produceId);// 2、判断商品是否存在if (product == null) {platformTransactionManager.rollback(transaction);throw new RuntimeException("购买商品不存在");}log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());// 3、校验库存if (purchaseNum > product.getCount()) {platformTransactionManager.rollback(transaction);throw new RuntimeException("库存不足");}// 4、更新库存操作int count = product.getCount() - purchaseNum;product.setCount(count);productMapper.updateById(product);// 5、创建订单TOrder order = new TOrder();//订单状态order.setOrderStatus(1);order.setReceiverName("张三");order.setReceiverMobile("18587781058");//订单价格order.setOrderAmount(product.getPrice().multiply(new BigDecimal(purchaseNum)));baseMapper.insert(order);// 6、创建订单和商品关联OrderItem orderItem = new OrderItem();//订单idorderItem.setOrderId(order.getId());// 商品idorderItem.setProduceId(product.getId());// 购买价格orderItem.setPurchasePrice(product.getPrice());// 购买数量orderItem.setPurchaseNum(purchaseNum);orderItemMapper.insert(orderItem);//提交事务platformTransactionManager.commit(transaction);return order.getId();}
分布式的CAP理论告诉我们:“任何一个系统都无法满足一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)”,最多满足两项。故设计之初就要进行取舍。互联网的绝大多数场景中都是牺牲系统的强一致性来换取系统的高可用性。系统往往只保证最终一致性,只要这个时间在可接受范围内。
基于数据库实现的分布式锁主要利用数据库的唯一索引来实现,唯一索引泰然具有排他性。
1、使用Redis实现分布式锁效率最高,加锁速度最快,因为Redis几乎都是纯内存操作,而数据库和zookeeper都会涉及到磁盘文件IO,效率相对低下。
2、一般Redis实现分布式锁都是利用Redis的SETNX key value这个命令,当key不存在时才执行成功,如果key已经存在则命令执行失败。
Zookeeper一般用作配置中心,其实现分布式锁的原理与Redis类似,我们在Zookeeper中创建临时顺序节点,利用节点不能重复创建来保证排他性。
1、是否可重入
2、锁释放机制
3、分布式锁服务单点问题
悲观锁每次拿数据的时候都会上锁。
1、取出记录,获取当前version
2、更新时,带上这个version
3、执行更新时候,set version = newversion where version = oldVersion
4、如果version不对,就更新失败
MySQL默认隔离级别是可重复度,
update product set count = count - #{purchaseNum},version = version+1 where id = #{id} and version = #{version}
代码逻辑主要写了关键部分,其余和上面的代码类似。
@Override@Transactional(rollbackFor = Exception.class)public String createOrderOptimisticlock(Integer produceId, Integer purchaseNum) {//更新商品数量重试次数int retryCount = 0;//更新结果int update = 0;// 1、根据商品id获取商品信息Product product = productMapper.findById(produceId);// 2、判断商品是否存在if (product == null) {throw new RuntimeException("购买商品不存在");}log.info(Thread.currentThread().getName() + "库存数量" + product.getCount());// 3、校验库存if (purchaseNum > product.getCount()) {throw new RuntimeException("库存不足");}/*** 乐观锁更新库存* 更新失败 说明其他线程已经更新过数据,本地扣减库存失败,可以重试,最多重试3次*/while (retryCount < 3 && update == 0){update = this.reduceCount(produceId, purchaseNum);retryCount ++;}if (update == 0) {throw new RuntimeException("库存不够");}// 5、创建订单// 6、创建订单和商品关联}
更新库存方法
/**减库存* mysql 默认事务隔离级别可重复读,* 会导致在同一个事务里面查询3次 * productMapper.selectById(produceId);* 得到的数据始终都是相同的。 所以我们就提供一个reduceCount * 每次循环都启动一个新的事务扣减库存操作 。* @param produceId* @param purchaseNum* @return*/@Transactional(rollbackFor = Exception.class)public int reduceCount(Integer produceId, Integer purchaseNum) {int result = 0;// 1. 查询商品库存Product product = productMapper.selectById(produceId);// 2. 判断库存if (product.getCount() >= purchaseNum) {//3. 减库存 乐观锁更新库存result = productMapper.updateProduct(product.getId(), purchaseNum, product.getVersion());}return result;}
获取锁
互斥:确保只有一个线程获得锁
setnx lock thread1
释放锁
1、手动释放
2、超时释放
#手动释放,删除即可
del lock
超时释放
setnx lock thread1
expire lock 5
ttl lock# 第二种方式SET key value [EX seconds] [PX milliseconds] [NX|XX]
set lock k1 ex 5 nx
org.springframework.boot spring-boot-starter-data-redis
spring:redis:host: localhostport: 6379
@Override@Transactional(rollbackFor = Exception.class)public String createOrderRedis(Integer produceId, Integer purchaseNum) {String key = "lock";Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key + produceId, Thread.currentThread().getId() + "", 10, TimeUnit.SECONDS);if (!result){return "不允许重复下单!";}try {// 1、根据商品id获取商品信息// 2、判断商品是否存在// 3、校验库存// 4、更新库存操作// 5、创建订单// 6、创建订单和商品关联}catch (Exception e){e.printStackTrace();}finally {// 1、获取锁标识String threadIdFlag = stringRedisTemplate.opsForValue().get(key + produceId);//2、获取当前线程idString id = Thread.currentThread().getId()+"";if (id.equals(threadIdFlag)){stringRedisTemplate.delete(key+produceId);}}return "创建失败!";}
在释放锁的时候判断锁标识,是自己线程的锁才可以释放锁,防止锁被误删除。如下图:
#1、配置锁标识
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-","");
#2、获取锁//1、获取线程标识String threadId = ID_PREFIX + Thread.currentThread().getId();// 2、获得锁 setnx key value time typeBoolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+produceId, threadId, 30, TimeUnit.SECONDS);
# 3、释放锁
// 获取锁标识String s = stringRedisTemplate.opsForValue().get(KEY_PREFIX + produceId);// 判断标识是否一致if (s.equals(threadId)){// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + produceId);}
org.redisson redisson-spring-boot-starter 3.17.3
创建工具类
package com.tqq.lock.utils;import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** 分布式锁*/
@Slf4j
@Component
public class DistributedRedisLock {@Autowiredprivate RedissonClient redissonClient;/*** 加锁** @param lockName* @return*/public Boolean lock(String lockName) {// 1. 判断 redisclient是否为空if (redissonClient == null) {log.info("DistributedRedisLock redission client is null");return false;}try {// 2. 加锁RLock lock = redissonClient.getLock(lockName);// 3. 添加过期时间// 加锁以后10秒钟自动解锁// 无需调用unlock方法手动解锁lock.lock(10, TimeUnit.SECONDS);return true;} catch (Exception e) {e.printStackTrace();return false;}}/*** 释放锁** @param lockName* @return*/public Boolean unlock(String lockName) {// 1. 判断 redisclient是否为空if (redissonClient == null) {log.info("DistributedRedisLock redission client is null");return false;}try {RLock lock = redissonClient.getLock(lockName);lock.unlock();// 释放锁return true;} catch (Exception e) {e.printStackTrace();return false;}}
}
业务方法
@Override@Transactional(rollbackFor = Exception.class)public String createOrderRedission(Integer produceId, Integer purchaseNum) {// 1. 加锁String key = "lock:";Boolean lock = distributedRedisLock.lock(key.concat(produceId + ""));// 2. 判断是否获取到锁if (!lock) {return "失败";}try {// 1、根据商品id获取商品信息// 2、判断商品是否存在// 3、校验库存// 4、更新库存操作// 5、创建订单// 6、创建订单和商品关联}catch (Exception e){e.printStackTrace();}finally {distributedRedisLock.unlock(key.concat(produceId + ""));}return "失败";}
Zookeeper的结点Znode有四种类型
1、持久节点:默认的结点类型,创建结点的客户端与zookeeper断开连接后,该节点依旧存在;
2、持久节点顺序节点:所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号,持久节点顺序节点就是有顺序的持久结点;
3、临时结点:和持久结点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除;
4、顺序节点临时节点:有顺序的临时结点
创建临时顺序节点:
create -e -s /test123
-e :临时结点
-s:顺序节点
当第一个客户端请求过来时,zookeeper客户端会创建一个持久节点locks,若它(client1)想要获取锁,需要在locks节点下创建一个顺序节点lock1;然后客户端client1会查找locks下的所有临时顺序节点,看自己是不是最小的那一个,如果是,则获取锁。
这时如果又来了一个客户端client2前来尝试获取锁,他会在locks下创建一个临时顺序节点lock2,它会和client1一样在locks下查找,发现lock1才是最小的,获取锁失败。client2会想它靠前的节点lock1注册watch时间,用来监听lock1是否存在。即client2抢锁失败进入等待状态。
如果在来client3,也会向client2一样创建、查找、监听。
如果任务完成,client1会显式调用删除lock1的指令;
如果客户端故障了,根据临时结点的特性,lock1会自动删除的;
lock1删除,client2会立刻接收到通知,它会再在locks下进行扫描,发现lock2是最小的,获得锁。
Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化ZooKeeper的操作。
org.apache.curator curator-framework 5.2.0 org.apache.curator curator-recipes 5.2.0 org.apache.curator curator-client 5.2.0
@Configuration
public class ZookeeperConfig {/*** 创建Cuator客户端* @return*/@Beanpublic CuratorFramework getZkClient(){// 1. 创建cuator客户端CuratorFramework client = CuratorFrameworkFactory.builder()// zk链接地址.connectString("localhost:2181")// 回话超时时间.sessionTimeoutMs(5000)// 链接创建超时时间.connectionTimeoutMs(5000)// 重试策略.retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();client.start();return client;}}
@Transactional(rollbackFor = Exception.class)@Overridepublic String createOrderZooKeeper(Integer produceId, Integer purchaseNum) throws Exception {//InterProcessMutex公平锁//client cruator 中zk客户端对象 path强锁路径同一个锁path需要一致InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/lockPath");//尝试获取锁,第一个参数 时间数字 第二个参数 时间单位if (lock.acquire(5,TimeUnit.SECONDS)){try {// 1、根据商品id获取商品信息// 2、判断商品是否存在// 3、校验库存// 4、更新库存操作// 6、创建订单和商品关联}catch (Exception e){e.printStackTrace();}finally {lock.release();}}return "下单失败";}
优点:简单、使用方便;不需要引入Redis、Zookeeper等中间件;
缺点:不适合高并发场景、db操作性能较差。
优点:性能好,适合高并发场景;较轻量级;有较好的框架支持,如Redisson
缺点:过期时间不好控制;需要考虑锁被其他献策会给你误删的场景
优点:有较好的性能和可靠性,有封装较好的框架,如Curator;
缺点:性能不如Redis实现的分布式锁;比较重的分布式锁;
性能角度:Redis>Zookeeper>数据库
实现复杂程度:ZooKeeper>Redis>数据库
可靠性:ZooKeeper>Redis>数据库
原码:https://gitee.com/hellotqq/distributed/tree/master/lock/lock