JAVA造轮子之zookeeper节点操作工具类

编程

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
ZooKeeper的基本运转流程:
1、选举Leader。
2、同步数据。
3、选举Leader过程中算法有很多,但要达到的选举标准是一致的。
4、Leader要具有最高的执行ID,类似root权限。
5、集群中大多数的机器得到响应并接受选出的Leader。

以下示例主要以zk中的node节点进行管理与事件推送为主:基于java对zookeeper数据node操作封装成工具类;

pom.xml

<!-- 集成zookeeper -->

<dependency>

<groupId>org.apache.zookeeper</groupId>

<artifactId>zookeeper</artifactId>

<version>3.4.10</version>

</dependency>

ZkClientUtils.java

import org.apache.zookeeper.*;

import java.io.IOException;

import java.util.ArrayList;

import java.util.List;

/**

* @Description zookeeper节点工具管理类

* @Version V1.0

*/

public class ZkClientUtils implements Watcher {

/**

CreateMode类型分为4种

1.PERSISTENT--持久型

2.PERSISTENT_SEQUENTIAL--持久顺序型

3.EPHEMERAL--临时型

4.EPHEMERAL_SEQUENTIAL--临时顺序型

注:

1、2种类型客户端断开后不会消失

3、4种类型客户端断开后超时时间内没有新的连接节点将会消息

*/

private volatile static ZkClientUtils zkClient = null;

private static String hostPort = "192.168.1.101:2181";

private ZooKeeper zk = null;

private NodeDataFunction nodeDataFunction = null;

private ZkClientUtils(){

this.initZk();

}

/**

* 初始化对象

* @return

*/

public static ZkClientUtils getInstance(){

if (zkClient == null){

synchronized (ZkClientUtils.class) {

if (zkClient == null)

zkClient = new ZkClientUtils();

}

}

return zkClient;

}

/**

* 初始化zk连接对象

*/

private void initZk(){

try {

zk = new ZooKeeper(hostPort, 2000, this);

}catch(IOException e){

e.printStackTrace();

}

}

/**

* 初始化zk节点

* @param path

*/

public void initZkNode(String path){

this.run(()->{

if (zk.exists(path, this) == null) {

zk.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

}

List<String> zooChildren = listNodes(path);

if (zooChildren != null){

System.out.println("Znodes of ""+path+"": ");

zooChildren.forEach(o-> System.out.println(o));

}

});

}

/**

* 显示当前path下所有子节点

* @param path

* @return

*/

public List<String> listNodes(String path){

List<String> lists = new ArrayList<>();

this.run(()->lists.addAll(zk.getChildren(path, this, null)));

return lists;

}

/**

* 创建节点

* @param path

* @param data

*/

public void createNode(String path, String data){

this.run(()->{

if (zk.exists(path, this) == null) {

zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//持久保存

} else {

zk.setData(path, data.getBytes(), -1);

}

});

}

/**

* 获取节点下的数据

* @param path

* @return

*/

public String getData(String path){

String data = "";

this.run(()->{

if (zk.exists(path, this) != null) {

data.concat(new String(zk.getData(path, this, null)));

}

});

return data;

}

/**

* 设置节点数据

* @param path

* @param data

*/

public void setData(String path, String data){

createNode(path, data);

}

/**

* 删除节点

* @param path

*/

public void deleteNode(String path){

this.run(()->zk.delete(path, -1));

}

/**

* 关闭ZK连接(建议在服务需停止时,才去关闭,否则应一直保持zk的连接是open的)

*/

public void close(){

if (zk != null){

try{

zk.close();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

zkClient = null;

}

/**

* 执行节点逻辑

* @param interFunction

*/

private void run(InterFunction interFunction){

try{

interFunction.execute();

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

/**

* 注册业务对象

* @param nodeDataFunction

*/

public void register(NodeDataFunction nodeDataFunction){

this.nodeDataFunction = nodeDataFunction;

}

/**

* 该方法由Zk服务事件主动回调

* @param watchedEvent

*/

@Override

public void process(WatchedEvent watchedEvent) {

//默认只有注册了nodeDataFunction接口的方法,由外部的调用代码块中才能执行,否则得单独实现事件逻辑

if (nodeDataFunction != null) {

try {

nodeDataFunction.execute(watchedEvent, zk, this);

} catch (KeeperException e) {

e.printStackTrace();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

@FunctionalInterface

interface InterFunction{

void execute() throws KeeperException, InterruptedException;

}

@FunctionalInterface

interface NodeDataFunction{

void execute(WatchedEvent watchedEvent, ZooKeeper zk, Watcher watcher) throws KeeperException, InterruptedException;

}

/**

* 测试,事件推送

* @param args

*/

public static void main(String[] args) {

ZkClientUtils zkClientUtils = ZkClientUtils.getInstance();

String path = "/zk_test_node";

zkClientUtils.initZkNode(path);

NodeDataFunction nodeData = (watchedEvent, monitorZk, watcher) -> {

System.out.println(watchedEvent.getType());

if (watchedEvent.getType() == Event.EventType.NodeChildrenChanged) {

System.out.println("发生节点变更...");

List<String> zooChildren = monitorZk.getChildren(path, watcher, null);

System.out.println("Znodes of ""+path+"": ");

for (String child: zooChildren) {

System.out.println(child);

}

}else if (watchedEvent.getType() == Event.EventType.NodeDataChanged) {

System.out.println("发生数据变更...");

byte[] data = monitorZk.getData(path, watcher, null);

System.out.printf("Current Data @ ZK Path %s: %s

", path, new String(data));

} else if (watchedEvent.getType() == Event.EventType.NodeDeleted) {

System.out.println("发生数据删除...");

}

System.out.println();

};

zkClientUtils.register(nodeData);

// List<String> lists = zkClientUtils.listNodes(path);

// lists.forEach(o-> System.out.println(o));

while(true){

try{

Thread.sleep(1000);

}catch (Exception e){

}

}

}

}

说明:

做过项目的人都知道,很多写过的可重复利用的代码块或有用的工具类没有怎么整理,当需要的时候,又得打开项目查找一翻,虽然功能开发不难,但是又得花时间成本去写去测试,这样的重复造轮子的事情太多次了;因此不如把轮子保留,供大家一起使用;

1.这个轮子可以有:需要使用的时候确实还不存在这个组件。
2.我需要的时候轮子不在:每一种技术或工具产生都有它的项目背景,当代码写在项目里的时候,我知道有这个东西,当换了一个项目或公司后,没有备份也没有记录,这个时候你不在了,又得花时间手打一遍;
3.我不知道是不是造轮子:大多数情况下初学者很难分清楚自己是不是在重复造轮子,事实上造轮子不是我目的。我的目的是完成工作任务,任务完成的速度越快越好,质量越高越好。而不是去判断自己在不在造轮子。
4.不想重复花时间造轮子:有时候还会碰到一些并不困难但是很占时间的东西,当然有现成的轮子是花时间最少的;
5.我就是想学习轮子:初学者的并不是在重复造轮子,而是学习后以提高为自己的知识与技能。

轮子有过测试,但难免有失误,如有错误处,还敬请指出;

以上是 JAVA造轮子之zookeeper节点操作工具类 的全部内容, 来源链接: utcz.com/z/519211.html

回到顶部