zk中实现分布式锁服务
什么场景下需要实现分布式锁" title="分布式锁">分布式锁?
Q: 例如多台客户端修改zk中配置文件,如何保证数据的一致性
解决方案
zk中分布式锁实现步骤 创建锁,获取锁,删除锁
具体实现
开始在zk中create 一个persistent类型znode,例如名字叫 /locks/write_lock
其他客户端需要在修改文件之前,执行如下步骤,看是否能够获取修改文件的锁,如果获取成功,可以修改文件内容
1客户端创建一个(临时顺序节点)sequence|ephemeral类型znode,名字是lockid开头,创建的znode是
/Locks/write_lock/lockid0000000001
2调用getChildren()不要设置watcher获取/locks/write_lock下的znode列表(这里不设置锁,是为了避免羊群效应,接收到一些无关通知,给server带来很大压力)
3判断创建的节点是不是排序后子节点列表中最小的那个节点,如果是,表示已经获取到了锁,业务完成之后,删除这个节点,如果不是,获取锁失败
4调用exists()方法判断比自己创建节点序列号小1的znode节点是否存在(也就是获取znode节点列表中最小的znode),并且监听该节点状态变化,如果exists()返回false,表示锁已经释放,从2步骤开始继续判断
5如果4中exists()返回true,等待zk通知,接收到通知后继续从2开始执行检查
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @date 2019/11/8 15:14
* description:
*/
public class ZookeeperLock implements Watcher {
public static final String XIE = "/";
private ZooKeeper zk = null;
private final String hosts = "127.0.0.1:2181";
private static final int SESSION_TIMEOUT = 5000;
private String root = "/locks";
private String waitNode;
private int waitTime = 100;
private String myzkNode;
private CountDownLatch latch;
public ZookeeperLock() throws IOException, KeeperException, InterruptedException {
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
Stat stat = zk.exists(root, false);
if (stat == null) {
//创建一个永久根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public boolean tryLock() {
String splitStr = "lock_";
//获取下面所有子节点
List<String> children = null;
try {
myzkNode = zk.create(root + XIE + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
children = zk.getChildren(root, false);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//对子节点排序
Collections.sort(children);
//只存在一个要获取锁的节点,没有竞争
if (myzkNode.equals(root + XIE + children.get(0))) {
return true;
}
//监控比自己小的节点
String childNode = myzkNode.substring(myzkNode.lastIndexOf(XIE) + 1);
waitNode = children.get(Collections.binarySearch(children, childNode) - 1);
return false;
}
private boolean waitLock(String waitNode, int waitTime) throws KeeperException, InterruptedException {
Stat stat = zk.exists(root + XIE + waitNode, true);
if (stat != null) {
this.latch = new CountDownLatch(1);
latch.await(waitTime, TimeUnit.MILLISECONDS);
this.latch = null;
}
return true;
}
/**
* 获取锁
*/
public void lock() {
if (tryLock()) {
System.out.println("Thread" + Thread.currentThread().getName() + "***************************************hold lock!");
return;
} else {
try {
waitLock(waitNode, waitTime);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 释放锁
*/
public void unlock() {
try {
zk.delete(myzkNode, -1);
zk.close();
System.out.println("Thread" + Thread.currentThread().getName() + "*********************************unlock success!" + myzkNode);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public void process(WatchedEvent event) {
if (this.latch != null) {
this.latch.countDown();
}
}
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
//模拟并发数为10
final Semaphore semaphore = new Semaphore(10);
for (int i = 0; i < 100; i++) {
Runnable runnable = () -> {
try {
semaphore.acquire();
ZookeeperLock zkLock = new ZookeeperLock();
zkLock.lock();
//do something
Thread.sleep(1000);
zkLock.unlock();
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
};
service.execute(runnable);
}
service.shutdown();
}
}
以上是 zk中实现分布式锁服务 的全部内容, 来源链接: utcz.com/z/510541.html