zookeeper生产最广泛使用java客户端curator介绍及其它客户端比较
关于zookeeper的原理解析,可以参见zookeeper核心原理详解,本文所述大多数实践基于对zookeeper原理的首先理解。
Curator是Netflix公司开源的一个Zookeeper客户端,目前是apache顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量,相当于netty之于socket编程。提供了一套易用性和可读性更强的Fluent风格的客户端API框架。官网为http://curator.apache.org/
除此之外,Curator中还提供了Zookeeper各种应用场景(Recipe,如共享锁服务、Master选举机制和分布式计算器等)的抽象封装。所以说啊,不管是做底层库还是应用,用户体验真的很重要。
关于zookeeper的java客户端
Zookeeper的官方客户端提供了基本的操作,比如,创建会话、创建节点、读取节点、更新数据、删除节点和检查节点是否存在等。但对于开发人员来说,Zookeeper提供的基本操纵还是有一些不足之处。典型的缺点为:
(1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
(2)Session超时之后没有实现重连机制;
(3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
(4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
(5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
(6)删除节点无法实现级联删除;
因此,产生了两款主流的三方zk客户端,ZkClient和Curator。第一个主流的三方zk客户端是ZkClient,由Datameer的工程师开发,对Zookeeper的原生API进行了包装,实现了超时重连、Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。
虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:
- 几乎没有参考文档;
- 异常处理简化(抛出RuntimeException);
- 重试机制比较难用;
- 没有提供各种使用场景的实现;
注:除此之外,很多依赖zookeeper的中间件或大数据组件都配备了与之相适应的zookeeper客户端,例如hbase、hadoop、fabric8等。
因此,除了早期集成外,目前新的框架和系统很少使用ZkClient,因此本文详细解析curator。如果读者对zkclient感兴趣,可以参考https://www.jianshu.com/p/d6de2d21d744去,其官网为https://github.com/sgroschupf/zkclient,已经基本不活跃了、更新极少且star不过千。
curator依赖添加
Curator的Maven依赖如下:一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包。client是低级api。
<dependency><groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<!--All of the recipes listed on the ZooKeeper recipes doc (except two phase commit).--><version>${apache-curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<!-- High-level API that greatly simplifies using ZooKeeper. --><version>${apache-curator.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<!-- Low-level API --><version>${apache-curator.version}</version>
</dependency>
最新版本可以从https://mvnrepository.com/artifact/org.apache.curator/curator-client查阅,不过需要注意的是,curator和zookeeper本身的依赖(尤其是zookeeper 3.4和3.5不兼容,导致的客户端也是不一样)对应关系。目前绝大多数使用2.x的版本。
典型的zk场景
Client操作
利用Curator提供的客户端API,可以完全实现在zkCli.sh原生客户端的各种功能。值得注意的是,Curator采用流式风格API。准确的说是类似JPA化。由于针对zk/redis等的操作都相当简单,因此这种模式在这种场景下是比较合适的。如下:
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
/**
* Curator framework's client test.
* Output:
* $ create /zktest hello
* $ ls /
* [zktest, zookeeper]
* $ get /zktest
* hello
* $ set /zktest world
* $ get /zktest
* world
* $ delete /zktest
* $ ls /
* [zookeeper]
*/
public class CuratorClientTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "10.20.30.17:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
// 2.Client API test
// 2.1 Create node
String data1 = "hello";
print("create", ZK_PATH, data1);
client.create().
creatingParentsIfNeeded().
forPath(ZK_PATH, data1.getBytes());
// 2.2 Get node and data
print("ls", "/");
print(client.getChildren().forPath("/"));
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
// 2.3 Modify data
String data2 = "world";
print("set", ZK_PATH, data2);
client.setData().forPath(ZK_PATH, data2.getBytes());
print("get", ZK_PATH);
print(client.getData().forPath(ZK_PATH));
// 2.4 Remove node
print("delete", ZK_PATH);
client.delete().forPath(ZK_PATH);
print("ls", "/");
print(client.getChildren().forPath("/"));
}
private static void print(String... cmds) {
StringBuilder text = new StringBuilder("$ ");
for (String cmd : cmds) {
text.append(cmd).append(" ");
}
System.out.println(text.toString());
}
private static void print(Object result) {
System.out.println(
result instanceof byte[]
? new String((byte[]) result)
: result);
}
}
详细的CuratorFramework功能及使用说明可参见https://curator.apache.org/curator-framework/index.html。
监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
- Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
- Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
- Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.RetryNTimes;
/**
* Curator framework watch test.
*/
public class CuratorWatcherTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws Exception {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
// 2.Register watcher
PathChildrenCache watcher = new PathChildrenCache(
client,
ZK_PATH,
true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(StartMode.BUILD_INITIAL_CACHE);
System.out.println("Register zk watcher successfully!");
Thread.sleep(Integer.MAX_VALUE);
}
}
输出如下:
Java: zk client start successfully!Java: Register zk watcher successfully!
zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata
Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121]
zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata
Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]
zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello
Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]
下列两个系列称为Recipe(专题,关于这个recipe应该如何翻译,LZ做了研究,直译是菜谱,肯定不对,也有叫做攻略的,貌似也不正确,所以叫专题可能确实更合适),完整的curator recipe实现可参见https://curator.apache.org/curator-recipes/index.html。
分布式协调
一般我们称分布式锁的时候,指的是短时的分布式锁,因此一般采用redis实现,而zk下的称之为分布式协调更合理,因为它通常时间更长。比如分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。大多数的分布式协调采用临时节点+watch机制实现。除了直接采用原始的监听器自己实现外,curator实现了分布式的IPM(进程间锁)。Curator的机制为:使用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。由于内部采用zookeeper的临时顺序节点特性,一旦客户端失去连接后,则就会自动清除该节点,redis则只能等待超时。
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes;
import java.util.concurrent.TimeUnit;
/**
* Curator framework's distributed lock test.
*/
public class CuratorDistrLockTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_LOCK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
System.out.println("zk client start successfully!");
Thread t1 = new Thread(() -> {
doWithLock(client);
}, "t1");
Thread t2 = new Thread(() -> {
doWithLock(client);
}, "t2");
t1.start();
t2.start();
}
private static void doWithLock(CuratorFramework client) {
InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH);
try {
if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + " hold lock");
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " release lock");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
lock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
当然实际中会更加复杂,比如只是某些接口需要全局单点,但是服务的粒度又没有拆分到独立的微服务。另外,客户端宕机后锁是否自动释放也是要考虑的,否则其他节点就无法接管。InterProcessMutex的实现分析可以参考:https://www.jianshu.com/p/5fa6a1464076
Leader选举
在分布式系统中,不少系统也采用和zk本身一样的leader/follower架构,因此存在leader选举的问题,例如es/kafka(注:在一般分布式系统中,并不会使用到该特性)。curator就包含了对应的解决方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
/**
* Curator framework's leader election test.
* Output:
* LeaderSelector-2 take leadership!
* LeaderSelector-2 relinquish leadership!
* LeaderSelector-1 take leadership!
* LeaderSelector-1 relinquish leadership!
* LeaderSelector-0 take leadership!
* LeaderSelector-0 relinquish leadership!
* ...
*/
public class CuratorLeaderTest {
/** Zookeeper info */
private static final String ZK_ADDRESS = "192.168.1.100:2181";
private static final String ZK_PATH = "/zktest";
public static void main(String[] args) throws InterruptedException {
LeaderSelectorListener listener = new LeaderSelectorListener() {
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(Thread.currentThread().getName() + " take leadership!");
// takeLeadership() method should only return when leadership is being relinquished.
Thread.sleep(5000L);
System.out.println(Thread.currentThread().getName() + " relinquish leadership!");
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
}
};
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
new Thread(() -> {
registerListener(listener);
}).start();
Thread.sleep(Integer.MAX_VALUE);
}
private static void registerListener(LeaderSelectorListener listener) {
// 1.Connect to zk
CuratorFramework client = CuratorFrameworkFactory.newClient(
ZK_ADDRESS,
new RetryNTimes(10, 5000)
);
client.start();
// 2.Ensure path
try {
new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());
} catch (Exception e) {
e.printStackTrace();
}
// 3.Register listener
LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);
selector.autoRequeue();
selector.start();
}
}
以上是 zookeeper生产最广泛使用java客户端curator介绍及其它客户端比较 的全部内容, 来源链接: utcz.com/z/392721.html