JAVA造轮子之zookeeper节点操作工具类
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的基本运转流程:
1、选举Leader。
2、同步数据。
3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。
4、Leader要具有最高的执行ID,类似root权限。
5、集群中大多数的机器得到响应并接受选出的Leader。
以下示例主要以zk中的node节点进行管理与事件推送为主:基于java对zookeeper数据node操作封装成工具类;
pom.xml
<!-- 集成zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
ZkClientUtils.java
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @Description zookeeper节点工具管理类
* @Version V1.0
*/
public class ZkClientUtils implements Watcher {
/**
CreateMode类型分为4种
1.PERSISTENT--持久型
2.PERSISTENT_SEQUENTIAL--持久顺序型
3.EPHEMERAL--临时型
4.EPHEMERAL_SEQUENTIAL--临时顺序型
注:
1、2种类型客户端断开后不会消失
3、4种类型客户端断开后超时时间内没有新的连接节点将会消息
*/
private volatile static ZkClientUtils zkClient = null;
private static String hostPort = "192.168.1.101:2181";
private ZooKeeper zk = null;
private NodeDataFunction nodeDataFunction = null;
private ZkClientUtils(){
this.initZk();
}
/**
* 初始化对象
* @return
*/
public static ZkClientUtils getInstance(){
if (zkClient == null){
synchronized (ZkClientUtils.class) {
if (zkClient == null)
zkClient = new ZkClientUtils();
}
}
return zkClient;
}
/**
* 初始化zk连接对象
*/
private void initZk(){
try {
zk = new ZooKeeper(hostPort, 2000, this);
}catch(IOException e){
e.printStackTrace();
}
}
/**
* 初始化zk节点
* @param path
*/
public void initZkNode(String path){
this.run(()->{
if (zk.exists(path, this) == null) {
zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> zooChildren = listNodes(path);
if (zooChildren != null){
System.out.println("Znodes of ""+path+"": ");
zooChildren.forEach(o-> System.out.println(o));
}
});
}
/**
* 显示当前path下所有子节点
* @param path
* @return
*/
public List<String> listNodes(String path){
List<String> lists = new ArrayList<>();
this.run(()->lists.addAll(zk.getChildren(path, this, null)));
return lists;
}
/**
* 创建节点
* @param path
* @param data
*/
public void createNode(String path, String data){
this.run(()->{
if (zk.exists(path, this) == null) {
zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久保存
} else {
zk.setData(path, data.getBytes(), -1);
}
});
}
/**
* 获取节点下的数据
* @param path
* @return
*/
public String getData(String path){
String data = "";
this.run(()->{
if (zk.exists(path, this) != null) {
data.concat(new String(zk.getData(path, this, null)));
}
});
return data;
}
/**
* 设置节点数据
* @param path
* @param data
*/
public void setData(String path, String data){
createNode(path, data);
}
/**
* 删除节点
* @param path
*/
public void deleteNode(String path){
this.run(()->zk.delete(path, -1));
}
/**
* 关闭ZK连接(建议在服务需停止时,才去关闭,否则应一直保持zk的连接是open的)
*/
public void close(){
if (zk != null){
try{
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
zkClient = null;
}
/**
* 执行节点逻辑
* @param interFunction
*/
private void run(InterFunction interFunction){
try{
interFunction.execute();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 注册业务对象
* @param nodeDataFunction
*/
public void register(NodeDataFunction nodeDataFunction){
this.nodeDataFunction = nodeDataFunction;
}
/**
* 该方法由Zk服务事件主动回调
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
//默认只有注册了nodeDataFunction接口的方法,由外部的调用代码块中才能执行,否则得单独实现事件逻辑
if (nodeDataFunction != null) {
try {
nodeDataFunction.execute(watchedEvent, zk, this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@FunctionalInterface
interface InterFunction{
void execute() throws KeeperException, InterruptedException;
}
@FunctionalInterface
interface NodeDataFunction{
void execute(WatchedEvent watchedEvent, ZooKeeper zk, Watcher watcher) throws KeeperException, InterruptedException;
}
/**
* 测试,事件推送
* @param args
*/
public static void main(String[] args) {
ZkClientUtils zkClientUtils = ZkClientUtils.getInstance();
String path = "/zk_test_node";
zkClientUtils.initZkNode(path);
NodeDataFunction nodeData = (watchedEvent, monitorZk, watcher) -> {
System.out.println(watchedEvent.getType());
if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {
System.out.println("发生节点变更...");
List<String> zooChildren = monitorZk.getChildren(path, watcher, null);
System.out.println("Znodes of ""+path+"": ");
for (String child: zooChildren) {
System.out.println(child);
}
}else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {
System.out.println("发生数据变更...");
byte[] data = monitorZk.getData(path, watcher, null);
System.out.printf("Current Data @ ZK Path %s: %s
", path, new String(data));
} else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
System.out.println("发生数据删除...");
}
System.out.println();
};
zkClientUtils.register(nodeData);
// List<String> lists = zkClientUtils.listNodes(path);
// lists.forEach(o-> System.out.println(o));
while(true){
try{
Thread.sleep(1000);
}catch (Exception e){
}
}
}
}
说明:
做过项目的人都知道,很多写过的可重复利用的代码块或有用的工具类没有怎么整理,当需要的时候,又得打开项目查找一翻,虽然功能开发不难,但是又得花时间成本去写去测试,这样的重复造轮子的事情太多次了;因此不如把轮子保留,供大家一起使用;
1.这个轮子可以有:需要使用的时候确实还不存在这个组件。
2.我需要的时候轮子不在:每一种技术或工具产生都有它的项目背景,当代码写在项目里的时候,我知道有这个东西,当换了一个项目或公司后,没有备份也没有记录,这个时候你不在了,又得花时间手打一遍;
3.我不知道是不是造轮子:大多数情况下初学者很难分清楚自己是不是在重复造轮子,事实上造轮子不是我目的。我的目的是完成工作任务,任务完成的速度越快越好,质量越高越好。而不是去判断自己在不在造轮子。
4.不想重复花时间造轮子:有时候还会碰到一些并不困难但是很占时间的东西,当然有现成的轮子是花时间最少的;
5.我就是想学习轮子:初学者的并不是在重复造轮子,而是学习后以提高为自己的知识与技能。
轮子有过测试,但难免有失误,如有错误处,还敬请指出;
以上是 JAVA造轮子之zookeeper节点操作工具类 的全部内容, 来源链接: utcz.com/z/519211.html