什么是分布式锁
      分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务。
        
          数据库锁
      1、基于MySQL锁表
完全依靠数据库唯一索引来实现,当想要获得锁时,即向数据库中插入一条记录,释放锁时就删除这条记录。
这种方式存在以下问题:
- 锁没有失效时间,解锁失败会导致死锁,其他线程无法再获取到锁,因为唯一索引insert都会返回失败
- 只能是非阻塞锁,insert失败直接就报错了,无法进入队列进行重试
- 不可重入,同一线程在没有释放锁之前无法再获取到锁
2、采用乐观锁
增加版本号,根据版本号来判断更新之前有没有其他线程更新过,如果被更新过,则获取锁失败。
        
          缓存锁
      采用setnx(),get(),getset()
(1) 线程A setnx,值为超时的时间戳(t1),如果返回true,获得锁。
(2) 线程B用get 命令获取t1,与当前时间戳比较,判断是否超时,没超时false,如果已超时执行步骤3
(3) 计算新的超时时间t2,使用getset命令返回t3(这个值可能其他线程已经修改过),如果t1==t3,获得锁,如果t1!=t3说明锁被其他线程获取了
(4) 获取锁后,处理完业务逻辑,再去判断锁是否超时,如果没超时删除锁,如果已超时,不用处理(防止删除其他线程的锁)
        
          zk分布式锁
         在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。客户端调用createNode方法在locker下创建临时顺序节点,然后调用getChildren(“locker”)来获取locker下面的所有子节点,注意此时不用设置任何Watcher。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小,那么就认为该客户端获取到了锁。如果发现自己创建的节点并非locker所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时对其注册事件监听器。之后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 
 | 
 
 @Service
 public class ZookeeperImproveLock implements Lock {
 private static final String LOCK_PATH="/LOCK";
 private static final String ZOOKEEPER_IP_PORT="localhost:2181";
 private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT,1000,1000,new SerializableSerializer());
 private CountDownLatch cdl;
 private String beforePath;
 private String currentPath;
 
 
 public ZookeeperImproveLock(){
 if(!this.client.exists(LOCK_PATH)){
 this.client.createPersistent(LOCK_PATH);
 }
 }
 
 @Override
 
 public boolean tryLock() {
 try {
 
 if(currentPath==null||this.client.getChildren(LOCK_PATH).size() <=0){
 
 currentPath = this.client.createEphemeralSequential (LOCK_PATH + '/',"lock");
 System.out.println("创建一个临时节点--->"+currentPath);
 }
 
 List<String> children = this.client.getChildren(LOCK_PATH);
 Collections.sort(children);
 System.out.println(children.toString());
 System.out.println(Thread.currentThread().getName()+ "get0--->"+ children.get(0));
 System.out.println(Thread.currentThread().getName()+"currentPath"+ currentPath);
 String realPath = LOCK_PATH+'/'+children.get(0);
 System.out.println(Thread.currentThread().getName()+ "real---"+ realPath);
 if(currentPath.equals(realPath)){
 System.out.println(Thread.currentThread().getName()+"成功啦--->" + currentPath);
 return true;
 }else{
 
 
 
 
 
 int wz = Collections.binarySearch(children,currentPath.substring(6));
 beforePath = LOCK_PATH + '/' + children.get(wz-1);
 System.out.println(Thread.currentThread().getName()+"beforePath"+ beforePath);
 }
 } catch (RuntimeException e) {
 e.printStackTrace();
 }
 return false;
 }
 
 @Override
 public void lock() {
 if(!tryLock()){
 System.out.println("获取锁不成功--->" + currentPath);
 waitForLock();
 lock();
 }else{
 System.out.println("获得分布式锁--->"+currentPath);
 }
 }
 
 private void waitForLock() {
 IZkDataListener listener = new IZkDataListener() {
 @Override
 public void handleDataChange(String s, Object o) throws Exception {
 
 }
 
 @Override
 public void handleDataDeleted(String s) throws Exception {
 if(cdl != null){
 System.out.println("countdown" + currentPath);
 cdl.countDown();
 }
 }
 };
 
 
 this.client.subscribeDataChanges(beforePath,listener);
 
 if(this.client.exists(beforePath)){
 cdl = new CountDownLatch(1);
 try {
 cdl.await();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 this.client.unsubscribeDataChanges(beforePath,listener);
 }
 
 @Override
 public void lockInterruptibly() throws InterruptedException {
 
 }
 
 
 
 @Override
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
 return false;
 }
 
 @Override
 public void unlock() {
 
 this.client.delete(currentPath);
 System.out.println("删除当前锁---->" + currentPath);
 }
 
 @Override
 public Condition newCondition() {
 return null;
 }
 }
 
 
 | 
          Zookeeper集群为什么要是单数
      在zookeeper集群中,会有三种角色,leader、 follower、 observer分别对应着总统、议员、观察者。
半数以上投票通过:客户端的增删改操作无论访问到了哪台zookeeper服务器,最终都会被转发给leader服务器,再由leader服务器分给zookeeper集群中所有follower服务器去投票(投票指的是在内存中做增删改操作),半数投票通过就被认为操作可执行(commit),否则不可执行。
observer观察者服务器是针对于查询操作做负载的,observer与follower服务器最大的不同在于observer没有投票权,在客户端发起的增删改操中,leader服务器是不会把消息传递给observer服务器让其投票的。但是查询操作跟follower一样,客户端的查询到了observer服务器节点,observer服务器去访问leader服务器取最新的数据然后返回给客户端。
        
          原因
      1、容错
由于在增删改操作中需要半数以上服务器通过,来分析以下情况。
2台服务器,至少2台正常运行才行(2的半数为1,半数以上最少为2),正常运行1台服务器都不允许挂掉
3台服务器,至少2台正常运行才行(3的半数为1.5,半数以上最少为2),正常运行可以允许1台服务器挂掉
4台服务器,至少3台正常运行才行(4的半数为2,半数以上最少为3),正常运行可以允许1台服务器挂掉
5台服务器,至少3台正常运行才行(5的半数为2.5,半数以上最少为3),正常运行可以允许2台服务器挂掉
6台服务器,至少3台正常运行才行(6的半数为3,半数以上最少为4),正常运行可以允许2台服务器挂掉
通过以上可以发现,3台服务器和4台服务器都最多允许1台服务器挂掉,5台服务器和6台服务器都最多允许2台服务器挂掉
但是明显4台服务器成本高于3台服务器成本,6台服务器成本高于5服务器成本。这是由于半数以上投票通过决定的。
2、防脑裂
一个zookeeper集群中,可以有多个follower、observer服务器,但是必需只能有一个leader服务器。
如果leader服务器挂掉了,剩下的服务器集群会通过半数以上投票选出一个新的leader服务器(这是脑裂的大前提)。
集群互不通讯情况:
一个集群3台服务器,全部运行正常,但是其中1台裂开了,和另外2台无法通讯。3台机器里面2台正常运行过半票可以选出一个leader。
一个集群4台服务器,全部运行正常,但是其中2台裂开了,和另外2台无法通讯。4台机器里面2台正常工作没有过半票以上达到3,无法选出leader正常运行。
一个集群5台服务器,全部运行正常,但是其中2台裂开了,和另外3台无法通讯。5台机器里面3台正常运行过半票可以选出一个leader。
一个集群6台服务器,全部运行正常,但是其中3台裂开了,和另外3台无法通讯。6台机器里面3台正常工作没有过半票以上达到4,无法选出leader正常运行。
通可以上分析可以看出,为什么zookeeper集群数量总是单出现,主要原因还是在于第2点,防脑裂,对于第1点,无非是成本控制,但是不影响集群正常运行。但是出现第2种裂的情况,zookeeper集群就无法正常运行了。