zookeeper生产最广泛使用java客户端curator介绍及其它客户端比较

java

  关于zookeeper的原理解析,可以参见zookeeper核心原理详解,本文所述大多数实践基于对zookeeper原理的首先理解。

  Curator是Netflix公司开源的一个Zookeeper客户端,目前是apache顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,相当于netty之于socket编程。提供了一套易用性和可读性更强的Fluent风格的客户端API框架。官网为http://curator.apache.org/

  除此之外,Curator中还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计算器等)的抽象封装。所以说啊,不管是做底层库还是应用,用户体验真的很重要。

关于zookeeper的java客户端

  Zookeeper的官方客户端提供了基本的操作,比如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。但对于开发人员来说,Zookeeper提供的基本操纵还是有一些不足之处。典型的缺点为:

(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
(2)Session超时之后没有实现重连机制;
(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
(6)删除节点无法实现级联删除;

  因此,产生了两款主流的三方zk客户端,ZkClient和Curator。第一个主流的三方zk客户端是ZkClient,由Datameer的工程师开发,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。

  虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:

  • 几乎没有参考文档;
  • 异常处理简化(抛出RuntimeException);
  • 重试机制比较难用;
  • 没有提供各种使用场景的实现;

  注:除此之外,很多依赖zookeeper的中间件或大数据组件都配备了与之相适应的zookeeper客户端,例如hbase、hadoop、fabric8等。

  因此,除了早期集成外,目前新的框架和系统很少使用ZkClient,因此本文详细解析curator。如果读者对zkclient感兴趣,可以参考https://www.jianshu.com/p/d6de2d21d744去,其官网为https://github.com/sgroschupf/zkclient,已经基本不活跃了、更新极少且star不过千。

curator依赖添加

  Curator的Maven依赖如下:一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包。client是低级api。

            <dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).-->

<version>${apache-curator.version}</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. -->

<version>${apache-curator.version}</version>

</dependency>

<dependency>

<groupId>org.apache.curator</groupId>

<artifactId>curator-client</artifactId>
<!-- Low-level API -->

<version>${apache-curator.version}</version>

</dependency>

  最新版本可以从https://mvnrepository.com/artifact/org.apache.curator/curator-client查阅,不过需要注意的是,curator和zookeeper本身的依赖(尤其是zookeeper 3.4和3.5不兼容,导致的客户端也是不一样)对应关系。目前绝大多数使用2.x的版本。

典型的zk场景

Client操作

  利用Curator提供的客户端API,可以完全实现在zkCli.sh原生客户端的各种功能。值得注意的是,Curator采用流式风格API。准确的说是类似JPA化。由于针对zk/redis等的操作都相当简单,因此这种模式在这种场景下是比较合适的。如下:

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.retry.RetryNTimes;

/**

* Curator framework's client test.

* Output:

* $ create /zktest hello

* $ ls /

* [zktest, zookeeper]

* $ get /zktest

* hello

* $ set /zktest world

* $ get /zktest

* world

* $ delete /zktest

* $ ls /

* [zookeeper]

*/

public class CuratorClientTest {

/** Zookeeper info */

private static final String ZK_ADDRESS = "10.20.30.17:2181";

private static final String ZK_PATH = "/zktest";

public static void main(String[] args) throws Exception {

// 1.Connect to zk

CuratorFramework client = CuratorFrameworkFactory.newClient(

ZK_ADDRESS,

new RetryNTimes(10, 5000)

);

client.start();

System.out.println("zk client start successfully!");

// 2.Client API test

// 2.1 Create node

String data1 = "hello";

print("create", ZK_PATH, data1);

client.create().

creatingParentsIfNeeded().

forPath(ZK_PATH, data1.getBytes());

// 2.2 Get node and data

print("ls", "/");

print(client.getChildren().forPath("/"));

print("get", ZK_PATH);

print(client.getData().forPath(ZK_PATH));

// 2.3 Modify data

String data2 = "world";

print("set", ZK_PATH, data2);

client.setData().forPath(ZK_PATH, data2.getBytes());

print("get", ZK_PATH);

print(client.getData().forPath(ZK_PATH));

// 2.4 Remove node

print("delete", ZK_PATH);

client.delete().forPath(ZK_PATH);

print("ls", "/");

print(client.getChildren().forPath("/"));

}

private static void print(String... cmds) {

StringBuilder text = new StringBuilder("$ ");

for (String cmd : cmds) {

text.append(cmd).append(" ");

}

System.out.println(text.toString());

}

private static void print(Object result) {

System.out.println(

result instanceof byte[]

? new String((byte[]) result)

: result);

}

}

  详细的CuratorFramework功能及使用说明可参见https://curator.apache.org/curator-framework/index.html。

监听器

  Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.cache.ChildData;

import org.apache.curator.framework.recipes.cache.PathChildrenCache;

import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;

import org.apache.curator.retry.RetryNTimes;

/**

* Curator framework watch test.

*/

public class CuratorWatcherTest {

/** Zookeeper info */

private static final String ZK_ADDRESS = "192.168.1.100:2181";

private static final String ZK_PATH = "/zktest";

public static void main(String[] args) throws Exception {

// 1.Connect to zk

CuratorFramework client = CuratorFrameworkFactory.newClient(

ZK_ADDRESS,

new RetryNTimes(10, 5000)

);

client.start();

System.out.println("zk client start successfully!");

// 2.Register watcher

PathChildrenCache watcher = new PathChildrenCache(

client,

ZK_PATH,

true // if cache data

);

watcher.getListenable().addListener((client1, event) -> {

ChildData data = event.getData();

if (data == null) {

System.out.println("No data in event[" + event + "]");

} else {

System.out.println("Receive event: "

+ "type=[" + event.getType() + "]"

+ ", path=[" + data.getPath() + "]"

+ ", data=[" + new String(data.getData()) + "]"

+ ", stat=[" + data.getStat() + "]");

}

});

watcher.start(StartMode.BUILD_INITIAL_CACHE);

System.out.println("Register zk watcher successfully!");

Thread.sleep(Integer.MAX_VALUE);

}

}

输出如下:

Java: zk client start successfully!

Java: Register zk watcher successfully!

zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata

Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata

Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello

Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]

 下列两个系列称为Recipe(专题,关于这个recipe应该如何翻译,LZ做了研究,直译是菜谱,肯定不对,也有叫做攻略的,貌似也不正确,所以叫专题可能确实更合适),完整的curator recipe实现可参见https://curator.apache.org/curator-recipes/index.html。

分布式协调

  一般我们称分布式锁的时候,指的是短时的分布式锁,因此一般采用redis实现,而zk下的称之为分布式协调更合理,因为它通常时间更长。比如分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。大多数的分布式协调采用临时节点+watch机制实现。除了直接采用原始的监听器自己实现外,curator实现了分布式的IPM(进程间锁)。Curator的机制为:使用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。由于内部采用zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点,redis则只能等待超时。

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import org.apache.curator.retry.RetryNTimes;

import java.util.concurrent.TimeUnit;

/**

* Curator framework's distributed lock test.

*/

public class CuratorDistrLockTest {

/** Zookeeper info */

private static final String ZK_ADDRESS = "192.168.1.100:2181";

private static final String ZK_LOCK_PATH = "/zktest";

public static void main(String[] args) throws InterruptedException {

// 1.Connect to zk

CuratorFramework client = CuratorFrameworkFactory.newClient(

ZK_ADDRESS,

new RetryNTimes(10, 5000)

);

client.start();

System.out.println("zk client start successfully!");

Thread t1 = new Thread(() -> {

doWithLock(client);

}, "t1");

Thread t2 = new Thread(() -> {

doWithLock(client);

}, "t2");

t1.start();

t2.start();

}

private static void doWithLock(CuratorFramework client) {

InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);

try {

if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {

System.out.println(Thread.currentThread().getName() + " hold lock");

Thread.sleep(5000L);

System.out.println(Thread.currentThread().getName() + " release lock");

}

} catch (Exception e) {

e.printStackTrace();

} finally {

try {

lock.release();

} catch (Exception e) {

e.printStackTrace();

}

}

}

}

   当然实际中会更加复杂,比如只是某些接口需要全局单点,但是服务的粒度又没有拆分到独立的微服务。另外,客户端宕机后锁是否自动释放也是要考虑的,否则其他节点就无法接管。InterProcessMutex的实现分析可以参考:https://www.jianshu.com/p/5fa6a1464076

Leader选举

  在分布式系统中,不少系统也采用和zk本身一样的leader/follower架构,因此存在leader选举的问题,例如es/kafka(注:在一般分布式系统中,并不会使用到该特性)。curator就包含了对应的解决方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.recipes.leader.LeaderSelector;

import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;

import org.apache.curator.framework.state.ConnectionState;

import org.apache.curator.retry.RetryNTimes;

import org.apache.curator.utils.EnsurePath;

/**

* Curator framework's leader election test.

* Output:

* LeaderSelector-2 take leadership!

* LeaderSelector-2 relinquish leadership!

* LeaderSelector-1 take leadership!

* LeaderSelector-1 relinquish leadership!

* LeaderSelector-0 take leadership!

* LeaderSelector-0 relinquish leadership!

* ...

*/

public class CuratorLeaderTest {

/** Zookeeper info */

private static final String ZK_ADDRESS = "192.168.1.100:2181";

private static final String ZK_PATH = "/zktest";

public static void main(String[] args) throws InterruptedException {

LeaderSelectorListener listener = new LeaderSelectorListener() {

@Override

public void takeLeadership(CuratorFramework client) throws Exception {

System.out.println(Thread.currentThread().getName() + " take leadership!");

// takeLeadership() method should only return when leadership is being relinquished.

Thread.sleep(5000L);

System.out.println(Thread.currentThread().getName() + " relinquish leadership!");

}

@Override

public void stateChanged(CuratorFramework client, ConnectionState state) {

}

};

new Thread(() -> {

registerListener(listener);

}).start();

new Thread(() -> {

registerListener(listener);

}).start();

new Thread(() -> {

registerListener(listener);

}).start();

Thread.sleep(Integer.MAX_VALUE);

}

private static void registerListener(LeaderSelectorListener listener) {

// 1.Connect to zk

CuratorFramework client = CuratorFrameworkFactory.newClient(

ZK_ADDRESS,

new RetryNTimes(10, 5000)

);

client.start();

// 2.Ensure path

try {

new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());

} catch (Exception e) {

e.printStackTrace();

}

// 3.Register listener

LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);

selector.autoRequeue();

selector.start();

}

}

以上是 zookeeper生产最广泛使用java客户端curator介绍及其它客户端比较 的全部内容, 来源链接: utcz.com/z/392721.html

回到顶部