springboot项目中zookeeper分布式锁的实现

编程

导入开发包

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>

<version>2.2.1.RELEASE</version>

</dependency>

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.6.0</version>

<exclusions>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-log4j12</artifactId>

</exclusion>

<exclusion>

<groupId>org.slf4j</groupId>

<artifactId>slf4j-api</artifactId>

</exclusion>

</exclusions>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>

<version>4.3.0</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-client</artifactId>

<version>4.3.0</version>

</dependency>​

添加配置项

spring.cloud.zookeeper.session-timeout-ms=6000

spring.cloud.zookeeper.connection-timeout-ms=6000

#spring.cloud.zookeeper.discovery.register=true

#spring.cloud.zookeeper.discovery.root=/

#spring.cloud.zookeeper.discovery.enabled=true

#spring.cloud.zookeeper.discovery.instanceHost=127.0.0.1

#spring.cloud.zookeeper.prefix=

spring.cloud.zookeeper.enabled=true

spring.cloud.zookeeper.connect-string=127.0.0.1:2181

spring.cloud.zookeeper.max-retries=3

spring.cloud.zookeeper.base-sleep-time-ms=1000

spring.cloud.zookeeper.max-sleep-ms=500

编写配置类

import org.springframework.beans.factory.annotation.Value;

import org.springframework.cloud.zookeeper.ZookeeperProperties;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class ZkConfiguration {

@Value("${spring.cloud.zookeeper.connect-string}")

private String zookeeperServer;

@Value("${spring.cloud.zookeeper.session-timeout-ms}")

private int sessionTimeoutMs;

@Value("${spring.cloud.zookeeper.connection-timeout-ms}")

private int connectionTimeoutMs;

@Value("${spring.cloud.zookeeper.max-retries}")

private int maxRetries;

@Value("${spring.cloud.zookeeper.base-sleep-time-ms}")

private int baseSleepTimeMs;

@Bean(initMethod = "init", destroyMethod = "stop")

public ZkClientUtil zkClient(ZookeeperProperties zookeeperProperties) {

ZkClientUtil zkClient = new ZkClientUtil(zookeeperProperties);

zkClient.setZookeeperServer(zookeeperProperties.getConnectString());

zkClient.setSessionTimeoutMs(sessionTimeoutMs);

zkClient.setConnectionTimeoutMs(connectionTimeoutMs);

zkClient.setMaxRetries(zookeeperProperties.getMaxRetries());

zkClient.setBaseSleepTimeMs(zookeeperProperties.getBaseSleepTimeMs());

return zkClient;

}

}

编写工具类

import org.apache.commons.lang.StringUtils;

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.cache.ChildData;

import org.apache.curator.framework.recipes.cache.TreeCache;

import org.apache.curator.framework.recipes.cache.TreeCacheListener;

import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.cloud.zookeeper.ZookeeperProperties;

import java.io.File;

import java.net.InetAddress;

import java.util.*;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class ZkClientUtil {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private CuratorFramework client;

private ZookeeperProperties zookeeperProperties;

public TreeCache cache;

private String zookeeperServer;

private int sessionTimeoutMs;

private int connectionTimeoutMs;

private int baseSleepTimeMs;

private int maxRetries;

public ZkClientUtil(ZookeeperProperties zookeeperProperties) {

this.zookeeperProperties = zookeeperProperties;

}

public void setZookeeperServer(String zookeeperServer) {

this.zookeeperServer = zookeeperServer;

}

public String getZookeeperServer() {

return zookeeperServer;

}

public void setSessionTimeoutMs(int sessionTimeoutMs) {

this.sessionTimeoutMs = sessionTimeoutMs;

}

public int getSessionTimeoutMs() {

return sessionTimeoutMs;

}

public void setConnectionTimeoutMs(int connectionTimeoutMs) {

this.connectionTimeoutMs = connectionTimeoutMs;

}

public int getConnectionTimeoutMs() {

return connectionTimeoutMs;

}

public void setBaseSleepTimeMs(int baseSleepTimeMs) {

this.baseSleepTimeMs = baseSleepTimeMs;

}

public int getBaseSleepTimeMs() {

return baseSleepTimeMs;

}

public void setMaxRetries(int maxRetries) {

this.maxRetries = maxRetries;

}

public int getMaxRetries() {

return maxRetries;

}

public void init() {

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);

client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)

.sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();

client.start();

}

public void stop() {

client.close();

}

public CuratorFramework getClient() {

return client;

}

public String register() {

String forPath = "";

try {

String rootPath = "/" + "services";

String hostAddress = InetAddress.getLocalHost().getHostAddress();

String serviceInstance = "prometheus" + "-" + hostAddress + "-";

forPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(rootPath + "/" + serviceInstance);

} catch (Exception e) {

logger.error("注册出错", e);

}

return forPath;

}

public List<String> getChildren(String path) {

List<String> childrenList = new ArrayList<>();

try {

childrenList = client.getChildren().forPath(path);

} catch (Exception e) {

logger.error("获取子节点出错", e);

}

return childrenList;

}

public int getChildrenCount(String path) {

return getChildren(path).size();

}

public List<String> getInstances() {

return getChildren("/services");

}

public int getInstancesCount() {

return getInstances().size();

}

/**

* 初始化本地缓存

*

* @param watchRootPath

* @throws Exception

*/

private void initLocalCache(String watchRootPath) throws Exception {

cache = new TreeCache(client, watchRootPath);

TreeCacheListener listener = (client1, event) -> {

logger.info("event:" + event.getType() +

" |path:" + (null != event.getData() ? event.getData().getPath() : null));

if (event.getData() != null && event.getData().getData() != null) {

logger.info("发生变化的节点内容为:" + new String(event.getData().getData()));

}

};

cache.getListenable().addListener(listener);

cache.start();

}

/**

* 创建节点

*

* @param mode 节点类型

* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。

* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失

* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除

* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

* @param path 节点名称

* @param nodeData 节点数据

*/

public String createNode(CreateMode mode, String path, String nodeData) {

String forPath = "";

try {

if (StringUtils.isBlank(path)){

path = File.pathSeparatorChar + "";

} else {

if (!(path.startsWith("/") || path.startsWith("\"))) {

path = File.pathSeparatorChar + path;

}

}

//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点

forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path, nodeData.getBytes("UTF-8"));

} catch (Exception e) {

logger.error("注册出错", e);

}

return forPath;

}

/**

* 创建节点

*

* @param mode 节点类型

* 1、PERSISTENT 持久化目录节点,存储的数据不会丢失。

* 2、PERSISTENT_SEQUENTIAL顺序自动编号的持久化目录节点,存储的数据不会丢失

* 3、EPHEMERAL临时目录节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除

* 4、EPHEMERAL_SEQUENTIAL临时自动编号节点,一旦创建这个节点的客户端与服务器端口也就是session 超时,这种节点会被自动删除,并且根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名。

* @param path 节点名称

*/

public String createNode(CreateMode mode, String path) {

String forPath = "";

try {

if (StringUtils.isBlank(path)){

path = File.pathSeparatorChar + "";

} else {

if (!(path.startsWith("/") || path.startsWith("\"))) {

path = File.pathSeparatorChar + path;

}

}

//使用creatingParentContainersIfNeeded()之后Curator能够自动递归创建所有所需的父节点

forPath = client.create().creatingParentsIfNeeded().withMode(mode).forPath(path);

} catch (Exception e) {

logger.error("注册出错", e);

}

return forPath;

}

/**

* 删除节点数据

*

* @param path

*/

public void deleteNode(final String path) {

try {

deleteNode(path, true);

} catch (Exception ex) {

logger.error("", ex);

}

}

/**

* 删除节点数据

*

* @param path

* @param deleteChildre 是否删除子节点

*/

public void deleteNode(final String path, Boolean deleteChildre) {

try {

if (deleteChildre) {

//guaranteed()删除一个节点,强制保证删除,

// 只要客户端会话有效,那么Curator会在后台持续进行删除操作,直到删除节点成功

client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);

} else {

client.delete().guaranteed().forPath(path);

}

} catch (Exception e) {

logger.error("", e);

}

}

/**

* 设置指定节点的数据

*

* @param path

* @param datas

*/

public void setNodeData(String path, byte[] datas) {

try {

client.setData().forPath(path, datas);

} catch (Exception ex) {

logger.error("", ex);

}

}

/**

* 获取指定节点的数据

*

* @param path

* @return

*/

public byte[] getNodeData(String path) {

Byte[] bytes = null;

try {

if (cache != null) {

ChildData data = cache.getCurrentData(path);

if (data != null) {

return data.getData();

}

}

client.getData().forPath(path);

return client.getData().forPath(path);

} catch (Exception ex) {

logger.error("", ex);

}

return null;

}

/**

* 获取数据时先同步

*

* @param path

* @return

*/

public byte[] synNodeData(String path) {

client.sync();

return getNodeData(path);

}

/**

* 判断路径是否存在

*

* @param path

* @return

*/

public boolean isExistNode(final String path) {

client.sync();

try {

return null != client.checkExists().forPath(path);

} catch (Exception ex) {

logger.error("", ex);

return false;

}

}

/**

* 随机读取一个path子路径, "/"为根节点对应该namespace

* 先从cache中读取,如果没有,再从zookeeper中查询

*

* @param path

* @return

*/

public String getRandomData(String path) {

try {

Map<String, ChildData> cacheMap = cache.getCurrentChildren(path);

if (cacheMap != null && cacheMap.size() > 0) {

logger.debug("get random value from cache,path=" + path);

Collection<ChildData> values = cacheMap.values();

List<ChildData> list = new ArrayList<>(values);

Random rand = new Random();

byte[] b = list.get(rand.nextInt(list.size())).getData();

return new String(b, "utf-8");

}

if (isExistNode(path)) {

logger.debug("path [{}] is not exists,return null", path);

return null;

} else {

logger.debug("read random from zookeeper,path=" + path);

List<String> list = client.getChildren().forPath(path);

if (list == null || list.size() == 0) {

logger.debug("path [{}] has no children return null", path);

return null;

}

Random rand = new Random();

String child = list.get(rand.nextInt(list.size()));

path = path + "/" + child;

byte[] b = client.getData().forPath(path);

return new String(b, "utf-8");

}

} catch (Exception e) {

logger.error("", e);

}

return null;

}

/**

* 可重入共享锁 -- Shared Reentrant Lock

* @param lockPath

* @param time

* @param dealWork 获取

* @return

*/

/*public Object getSRLock(String lockPath,long time, SRLockDealCallback<?> dealWork){

InterProcessMutex lock = new InterProcessMutex(client, lockPath);

try {

if (!lock.acquire(time, TimeUnit.SECONDS)) {

logger.error("get lock fail:{}", " could not acquire the lock");

return null;

}

logger.debug("{} get the lock",lockPath);

Object b = dealWork.deal();

return b;

}catch(Exception e){

logger.error("", e);

}finally{

try {

lock.release();

} catch (Exception e) {

//log.error("",e);

}

}

return null;

}*/

/**

* 获取读写锁

*

* @param path

* @return

*/

public InterProcessReadWriteLock getReadWriteLock(String path) {

return new InterProcessReadWriteLock(client, path);

}

/**

* 在注册监听器的时候,如果传入此参数,当事件触发时,逻辑由线程池处理

*/

private ExecutorService pool = Executors.newFixedThreadPool(2);

/**

* 监听数据节点的变化情况

*

* @param watchPath

* @param listener

*/

public void watchPath(String watchPath, TreeCacheListener listener) {

// NodeCache nodeCache = new NodeCache(client, watchPath, false);

TreeCache cache = new TreeCache(client, watchPath);

cache.getListenable().addListener(listener, pool);

try {

cache.start();

} catch (Exception e) {

logger.error("", e);

}

}

}

测试

    @Test

public void lockTest1() {

final CuratorFramework client = zkClient.getClient();

//client.start();

final InterProcessMutex lock = new InterProcessMutex(client, "/curator/lock");

final CountDownLatch down = new CountDownLatch(1);

for (int i = 0; i < 10; i++) {

new Thread(new Runnable() {

@Override

public void run() {

try {

down.await();

lock.acquire();

} catch (Exception e) {

}

SimpleDateFormat format = new SimpleDateFormat("HH:mm:ss.SSS");

String orderNo = format.format(new Date());

System.out.println(">>>>>>生成的订单号是: " + orderNo);

try {

lock.release();

} catch (Exception e) {

}

}

}).start();

}

down.countDown();

zkClient.stop();

}

InterProcessMutex类通过CuratorFramework和path构造,当执行了acquire方法的时候会在/curator/lock节点下生成尾号带顺序的节点.
zookeeper服务会获取/curator/lock节点下所有的等待锁的临时节点,最小的节点获得锁,并非最小的节点会等待上一个节点释放锁,释放锁的时候会删除临时节点,所以并非最小的节点会在小1的节点注册事件,一旦监听到事件,会立马唤醒所有等待的线程

 

以上是 springboot项目中zookeeper分布式锁的实现 的全部内容, 来源链接: utcz.com/z/516043.html

回到顶部