springboot项目中zookeeper分布式锁的实现
导入开发包
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
添加配置项
spring.cloud.zookeeper.session-timeout-ms=6000
spring.cloud.zookeeper.connection-timeout-ms=6000
#spring.cloud.zookeeper.discovery.register=true
#spring.cloud.zookeeper.discovery.root=/
#spring.cloud.zookeeper.discovery.enabled=true
#spring.cloud.zookeeper.discovery.instanceHost=127.0.0.1
#spring.cloud.zookeeper.prefix=
spring.cloud.zookeeper.enabled=true
spring.cloud.zookeeper.connect-string=127.0.0.1:2181
spring.cloud.zookeeper.max-retries=3
spring.cloud.zookeeper.base-sleep-time-ms=1000
spring.cloud.zookeeper.max-sleep-ms=500
编写配置类
import org.springframework.beans.factory.annotation.Value;import org.springframework.cloud.zookeeper.ZookeeperProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that publishes a {@link ZkClientUtil} bean.
 *
 * <p>The connect string and retry settings are taken from the injected
 * {@link ZookeeperProperties}; the two timeouts are read straight from the
 * environment through {@code @Value}. The container calls {@code init()} after
 * construction and {@code stop()} on shutdown.
 */
@Configuration
public class ZkConfiguration {

    /** ZooKeeper session timeout in milliseconds. */
    @Value("${spring.cloud.zookeeper.session-timeout-ms}")
    private int sessionTimeoutMs;

    /** ZooKeeper connection timeout in milliseconds. */
    @Value("${spring.cloud.zookeeper.connection-timeout-ms}")
    private int connectionTimeoutMs;

    /**
     * Builds the ZooKeeper helper bean.
     *
     * @param zookeeperProperties Spring Cloud ZooKeeper properties supplying the
     *                            connect string, max retries and base sleep time
     * @return a configured, not yet started client wrapper; Spring starts it via
     *         the declared {@code initMethod}
     */
    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClientUtil zkClient(ZookeeperProperties zookeeperProperties) {
        ZkClientUtil zkClient = new ZkClientUtil(zookeeperProperties);
        zkClient.setZookeeperServer(zookeeperProperties.getConnectString());
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(zookeeperProperties.getMaxRetries());
        zkClient.setBaseSleepTimeMs(zookeeperProperties.getBaseSleepTimeMs());
        return zkClient;
    }
}
编写工具类
import java.io.File;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.zookeeper.ZookeeperProperties;
public class ZkClientUtil {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private CuratorFramework client;
private ZookeeperProperties zookeeperProperties;
public TreeCache cache;
private String zookeeperServer;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private int baseSleepTimeMs;
private int maxRetries;
public ZkClientUtil(ZookeeperProperties zookeeperProperties) {
this.zookeeperProperties = zookeeperProperties;
}
public void setZookeeperServer(String zookeeperServer) {
this.zookeeperServer = zookeeperServer;
}
public String getZookeeperServer() {
return zookeeperServer;
}
public void setSessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
}
public int getSessionTimeoutMs() {
return sessionTimeoutMs;
}
public void setConnectionTimeoutMs(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
}
public int getConnectionTimeoutMs() {
return connectionTimeoutMs;
}
public void setBaseSleepTimeMs(int baseSleepTimeMs) {
this.baseSleepTimeMs = baseSleepTimeMs;
}
public int getBaseSleepTimeMs() {
return baseSleepTimeMs;
}
public void setMaxRetries(int maxRetries) {
this.maxRetries = maxRetries;
}
public int getMaxRetries() {
return maxRetries;
}
public void init() {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
.sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
client.start();
}
public void stop() {
client.close();
}
public CuratorFramework getClient() {
return client;
}
public String register() {
String forPath = "";
try {
String rootPath = "/" + "services";
String hostAddress = InetAddress.getLocalHost().getHostAddress();
String serviceInstance = "prometheus" + "-" + hostAddress + "-";
forPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);
} catch (Exception e) {
logger.error("注册出错", e);
}
return forPath;
}
public List<String> getChildren(String path) {
List<String> childrenList = new ArrayList<>();
try {
childrenList = client.getChildren().forPath(path);
} catch (Exception e) {
logger.error("获取子节点出错", e);
}
return childrenList;
}
public int getChildrenCount(String path) {
return getChildren(path).size();
}
public List<String> getInstances() {
return getChildren("/services");
}
public int getInstancesCount() {
return getInstances().size();
}
/**
* 初始化本地缓存
*
* @param watchRootPath
* @throws Exception
*/
private void initLocalCache(String watchRootPath) throws Exception {
cache = new TreeCache(client, watchRootPath);
TreeCacheListener listener = (client1, event) -> {
logger.info("event:" + event.getType() +
" |path:" + (null != event.getData() ? event.getData().getPath() : null));
if (event.getData() != null && event.getData().getData() != null) {
logger.info("发生变化的节点内容为:" + new String(event.getData().getData()));
}
};
cache.getListenable().addListener(listener);
cache.start();
}
/**
* 创建节点
*
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
* @param nodeData 节点数据
*/
public String createNode(CreateMode mode, String path, String nodeData) {
String forPath = "";
try {
if (StringUtils.isBlank(path)){
path = File.pathSeparatorChar + "";
} else {
if (!(path.startsWith("/") || path.startsWith("\"))) {
path = File.pathSeparatorChar + path;
}
}
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes("UTF-8"));
} catch (Exception e) {
logger.error("注册出错", e);
}
return forPath;
}
/**
* 创建节点
*
* @param mode 节点类型
* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。
* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失
* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除
* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。
* @param path 节点名称
*/
public String createNode(CreateMode mode, String path) {
String forPath = "";
try {
if (StringUtils.isBlank(path)){
path = File.pathSeparatorChar + "";
} else {
if (!(path.startsWith("/") || path.startsWith("\"))) {
path = File.pathSeparatorChar + path;
}
}
//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点
forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);
} catch (Exception e) {
logger.error("注册出错", e);
}
return forPath;
}
/**
* 删除节点数据
*
* @param path
*/
public void deleteNode(final String path) {
try {
deleteNode(path, true);
} catch (Exception ex) {
logger.error("", ex);
}
}
/**
* 删除节点数据
*
* @param path
* @param deleteChildre 是否删除子节点
*/
public void deleteNode(final String path, Boolean deleteChildre) {
try {
if (deleteChildre) {
//guaranteed()删除一个节点,强制保证删除,
// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功
client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);
} else {
client.delete().guaranteed().forPath(path);
}
} catch (Exception e) {
logger.error("", e);
}
}
/**
* 设置指定节点的数据
*
* @param path
* @param datas
*/
public void setNodeData(String path, byte[] datas) {
try {
client.setData().forPath(path, datas);
} catch (Exception ex) {
logger.error("", ex);
}
}
/**
* 获取指定节点的数据
*
* @param path
* @return
*/
public byte[] getNodeData(String path) {
Byte[] bytes = null;
try {
if (cache != null) {
ChildData data = cache.getCurrentData(path);
if (data != null) {
return data.getData();
}
}
client.getData().forPath(path);
return client.getData().forPath(path);
} catch (Exception ex) {
logger.error("", ex);
}
return null;
}
/**
* 获取数据时先同步
*
* @param path
* @return
*/
public byte[] synNodeData(String path) {
client.sync();
return getNodeData(path);
}
/**
* 判断路径是否存在
*
* @param path
* @return
*/
public boolean isExistNode(final String path) {
client.sync();
try {
return null != client.checkExists().forPath(path);
} catch (Exception ex) {
logger.error("", ex);
return false;
}
}
/**
* 随机读取一个path子路径, "/"为根节点对应该namespace
* 先从cache中读取,如果没有,再从zookeeper中查询
*
* @param path
* @return
*/
public String getRandomData(String path) {
try {
Map<String, ChildData> cacheMap = cache.getCurrentChildren(path);
if (cacheMap != null && cacheMap.size() > 0) {
logger.debug("get random value from cache,path=" + path);
Collection<ChildData> values = cacheMap.values();
List<ChildData> list = new ArrayList<>(values);
Random rand = new Random();
byte[] b = list.get(rand.nextInt(list.size())).getData();
return new String(b, "utf-8");
}
if (isExistNode(path)) {
logger.debug("path [{}] is not exists,return null", path);
return null;
} else {
logger.debug("read random from zookeeper,path=" + path);
List<String> list = client.getChildren().forPath(path);
if (list == null || list.size() == 0) {
logger.debug("path [{}] has no children return null", path);
return null;
}
Random rand = new Random();
String child = list.get(rand.nextInt(list.size()));
path = path + "/" + child;
byte[] b = client.getData().forPath(path);
return new String(b, "utf-8");
}
} catch (Exception e) {
logger.error("", e);
}
return null;
}
/**
* 可重入共享锁 -- Shared Reentrant Lock
* @param lockPath
* @param time
* @param dealWork 获取
* @return
*/
/*public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
if (!lock.acquire(time, TimeUnit.SECONDS)) {
logger.error("get lock fail:{}", " could not acquire the lock");
return null;
}
logger.debug("{} get the lock",lockPath);
Object b = dealWork.deal();
return b;
}catch(Exception e){
logger.error("", e);
}finally{
try {
lock.release();
} catch (Exception e) {
//log.error("",e);
}
}
return null;
}*/
/**
* 获取读写锁
*
* @param path
* @return
*/
public InterProcessReadWriteLock getReadWriteLock(String path) {
return new InterProcessReadWriteLock(client, path);
}
/**
* 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理
*/
private ExecutorService pool = Executors.newFixedThreadPool(2);
/**
* 监听数据节点的变化情况
*
* @param watchPath
* @param listener
*/
public void watchPath(String watchPath, TreeCacheListener listener) {
// NodeCache nodeCache = new NodeCache(client, watchPath, false);
TreeCache cache = new TreeCache(client, watchPath);
cache.getListenable().addListener(listener, pool);
try {
cache.start();
} catch (Exception e) {
logger.error("", e);
}
}
}
测试
/**
 * Starts ten threads that contend for the same {@code InterProcessMutex} and
 * print a timestamp-based order number while holding it.
 *
 * <p>The test waits (up to 30 seconds) for every worker to finish before it
 * stops the client, and fails if any worker could not take or release the lock.
 */
@Test
public void lockTest1() {
    final CuratorFramework client = zkClient.getClient();
    final InterProcessMutex lock = new InterProcessMutex(client, "/curator/lock");
    final int workerCount = 10;
    // Released once so that all workers start contending at the same moment.
    final CountDownLatch down = new CountDownLatch(1);
    // Counted down by each worker when it is done, successful or not.
    final CountDownLatch finished = new CountDownLatch(workerCount);
    final java.util.concurrent.atomic.AtomicReference<Exception> failure =
            new java.util.concurrent.atomic.AtomicReference<>();

    for (int i = 0; i < workerCount; i++) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    down.await();
                    lock.acquire();
                    try {
                        // Only reached while the lock is actually held.
                        SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");
                        String orderNo = format.format(new Date());
                        System.out.println(">>>>>>生成的订单号是: " + orderNo);
                    } finally {
                        lock.release();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failure.compareAndSet(null, e);
                } catch (Exception e) {
                    failure.compareAndSet(null, e);
                } finally {
                    finished.countDown();
                }
            }
        }).start();
    }

    down.countDown();
    boolean allFinished = false;
    try {
        allFinished = finished.await(30, java.util.concurrent.TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        // Close the client only after the workers are done with it.
        zkClient.stop();
    }
    if (failure.get() != null) {
        throw new AssertionError("a worker failed while using the lock", failure.get());
    }
    if (!allFinished) {
        throw new AssertionError("workers did not finish within 30 seconds");
    }
}
InterProcessMutex 类通过 CuratorFramework 和锁路径(path)构造。执行 acquire 方法时,会在 /curator/lock 节点下创建一个带顺序编号后缀的临时节点。
Curator 客户端会获取 /curator/lock 节点下所有等待锁的临时顺序节点,序号最小的节点获得锁;序号不是最小的节点则等待它的前一个节点释放锁。释放锁时会删除对应的临时节点,因此序号不是最小的节点会在比自己序号小 1 的节点(即前一个节点)上注册监听事件,一旦监听到该节点被删除,就会立刻唤醒所有等待的线程,让它们重新尝试获取锁。
以上是 springboot项目中zookeeper分布式锁的实现 的全部内容, 来源链接: utcz.com/z/516043.html