zookeeper分布式锁及zookeeper集群单数原因

什么是分布式锁

分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多个tomcat应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务。

数据库锁

1、基于MySQL锁表
完全依靠数据库唯一索引来实现,当想要获得锁时,即向数据库中插入一条记录,释放锁时就删除这条记录。
这种方式存在以下问题:

  • 锁没有失效时间,解锁失败会导致死锁,其他线程无法再获取到锁,因为唯一索引insert都会返回失败
  • 只能是非阻塞锁,insert失败直接就报错了,无法进入队列进行重试
  • 不可重入,同一线程在没有释放锁之前无法再获取到锁

2、采用乐观锁
增加版本号,根据版本号来判断更新之前有没有其他线程更新过,如果被更新过,则获取锁失败。

缓存锁

采用setnx(),get(),getset()
(1) 线程A setnx,值为超时的时间戳(t1),如果返回true,获得锁。
(2) 线程B用get 命令获取t1,与当前时间戳比较,判断是否超时,没超时false,如果已超时执行步骤3
(3) 计算新的超时时间t2,使用getset命令返回t3(这个值可能其他线程已经修改过),如果t1==t3,获得锁,如果t1!=t3说明锁被其他线程获取了
(4) 获取锁后,处理完业务逻辑,再去判断锁是否超时,如果没超时删除锁,如果已超时,不用处理(防止删除其他线程的锁)

zk分布式锁

   在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。客户端调用createNode方法在locker下创建临时顺序节点,然后调用getChildren(“locker”)来获取locker下面的所有子节点,注意此时不用设置任何Watcher。客户端获取到所有的子节点path之后,如果发现自己在之前创建的子节点序号最小,那么就认为该客户端获取到了锁。如果发现自己创建的节点并非locker所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时对其注册事件监听器。之后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locker子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* zk实现分布式锁
*/
@Service
public class ZookeeperImproveLock implements Lock {
private static final String LOCK_PATH="/LOCK";
private static final String ZOOKEEPER_IP_PORT="localhost:2181";
private ZkClient client = new ZkClient(ZOOKEEPER_IP_PORT,10001000new SerializableSerializer());
private CountDownLatch cdl;
private String beforePath; //当前请求的节点前一个节点
private String currentPath; //当前请求的节点

// 判断有没有LOCK_PATH目录,没有则创建
public ZookeeperImproveLock(){
if(!this.client.exists(LOCK_PATH)){
this.client.createPersistent(LOCK_PATH);
}
}

@Override
//非阻塞时加锁
public boolean tryLock() {
try {
// 如果currentpath为空则为第一次尝试加锁,第一次加锁赋值currentpath
if(currentPath==null||this.client.getChildren(LOCK_PATH).size() <=0){
// 创建一个临时顺序节点
currentPath = this.client.createEphemeralSequential (LOCK_PATH + '/'"lock");
System.out.println("创建一个临时节点--->"+currentPath);
}
//获取所有临时节点并排序,临时节点名称为自增长的字符串,如:00000000400
List<String> children = this.client.getChildren(LOCK_PATH);
Collections.sort(children);
System.out.println(children.toString());
System.out.println(Thread.currentThread().getName()+ "get0--->"+ children.get(0));
System.out.println(Thread.currentThread().getName()+"currentPath"+ currentPath);
String realPath = LOCK_PATH+'/'+children.get(0);
System.out.println(Thread.currentThread().getName()+ "real---"+ realPath);
if(currentPath.equals(realPath)){
System.out.println(Thread.currentThread().getName()+"成功啦--->" + currentPath);
return true;
}else{
//如果当前节点在所有节点中排名不是第一,则获取前面的节点名称,并赋值给beforepath
// int wz = 0;
// if(children.contains(currentPath)){
// wz = children.indexOf(currentPath);
// }
int wz = Collections.binarySearch(children,currentPath.substring(6));
beforePath = LOCK_PATH + '/' + children.get(wz-1);
System.out.println(Thread.currentThread().getName()+"beforePath"+ beforePath);
}
} catch (RuntimeException e) {
e.printStackTrace();
}
return false;
}

@Override
public void lock() {
if(!tryLock()){
System.out.println("获取锁不成功--->" + currentPath);
waitForLock();
lock();
}else{
System.out.println("获得分布式锁--->"+currentPath);
}
}

private void waitForLock() {
IZkDataListener listener = new IZkDataListener() {
@Override
public void handleDataChange(String s, Object o) throws Exception {

}

@Override
public void handleDataDeleted(String s) throws Exception {
if(cdl != null){
System.out.println("countdown" + currentPath);
cdl.countDown();
}
}
};

//给排在前面的节点增加数据删除的watcher
this.client.subscribeDataChanges(beforePath,listener);

if(this.client.exists(beforePath)){
cdl = new CountDownLatch(1);
try {
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.client.unsubscribeDataChanges(beforePath,listener);
}

@Override
public void lockInterruptibly() throws InterruptedException {

}



@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}

@Override
public void unlock() {
//删除当前临时节点
this.client.delete(currentPath);
System.out.println("删除当前锁---->" + currentPath);
}

@Override
public Condition newCondition() {
return null;
}
}

Zookeeper集群为什么要是单数

在zookeeper集群中,会有三种角色,leader、 follower、 observer分别对应着总统、议员、观察者。

半数以上投票通过:客户端的增删改操作无论访问到了哪台zookeeper服务器,最终都会被转发给leader服务器,再由leader服务器分给zookeeper集群中所有follower服务器去投票(投票指的是在内存中做增删改操作),半数投票通过就被认为操作可执行(commit),否则不可执行。

observer观察者服务器是针对于查询操作做负载的,observer与follower服务器最大的不同在于observer没有投票权,在客户端发起的增删改操中,leader服务器是不会把消息传递给observer服务器让其投票的。但是查询操作跟follower一样,客户端的查询到了observer服务器节点,observer服务器去访问leader服务器取最新的数据然后返回给客户端。

原因

1、容错
由于在增删改操作中需要半数以上服务器通过,来分析以下情况。

2台服务器,至少2台正常运行才行(2的半数为1,半数以上最少为2),正常运行1台服务器都不允许挂掉

3台服务器,至少2台正常运行才行(3的半数为1.5,半数以上最少为2),正常运行可以允许1台服务器挂掉

4台服务器,至少3台正常运行才行(4的半数为2,半数以上最少为3),正常运行可以允许1台服务器挂掉

5台服务器,至少3台正常运行才行(5的半数为2.5,半数以上最少为3),正常运行可以允许2台服务器挂掉

6台服务器,至少3台正常运行才行(6的半数为3,半数以上最少为4),正常运行可以允许2台服务器挂掉

通过以上可以发现,3台服务器和4台服务器都最多允许1台服务器挂掉,5台服务器和6台服务器都最多允许2台服务器挂掉

但是明显4台服务器成本高于3台服务器成本,6台服务器成本高于5服务器成本。这是由于半数以上投票通过决定的。

2、防脑裂
一个zookeeper集群中,可以有多个follower、observer服务器,但是必需只能有一个leader服务器。

如果leader服务器挂掉了,剩下的服务器集群会通过半数以上投票选出一个新的leader服务器(这是脑裂的大前提)。

集群互不通讯情况:

一个集群3台服务器,全部运行正常,但是其中1台裂开了,和另外2台无法通讯。3台机器里面2台正常运行过半票可以选出一个leader。

一个集群4台服务器,全部运行正常,但是其中2台裂开了,和另外2台无法通讯。4台机器里面2台正常工作没有过半票以上达到3,无法选出leader正常运行。

一个集群5台服务器,全部运行正常,但是其中2台裂开了,和另外3台无法通讯。5台机器里面3台正常运行过半票可以选出一个leader。

一个集群6台服务器,全部运行正常,但是其中3台裂开了,和另外3台无法通讯。6台机器里面3台正常工作没有过半票以上达到4,无法选出leader正常运行。

通可以上分析可以看出,为什么zookeeper集群数量总是单出现,主要原因还是在于第2点,防脑裂,对于第1点,无非是成本控制,但是不影响集群正常运行。但是出现第2种裂的情况,zookeeper集群就无法正常运行了。