java连接zookeeper实现zookeeper教程
作者:长河 发布时间:2022-09-19 03:04:35
标签:java,zookeeper
java连接zookeeper实现zookeeper
Java服务端连接Zookeeper,进行节点信息的获取,管理…整理成一个基本工具
添加依赖:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.6</version>
</dependency>
具体代码如下:
package com;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class BaseZookeeper implements Watcher{
private ZooKeeper zookeeper;
/**
* 超时时间
*/
private static final int SESSION_TIME_OUT = 2000;
private CountDownLatch countDownLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
System.out.println("Watch received event");
countDownLatch.countDown();
}
}
/**连接zookeeper
* @param host
* @throws Exception
*/
public void connectZookeeper(String host) throws Exception{
zookeeper = new ZooKeeper(host, SESSION_TIME_OUT, this);
countDownLatch.await();
System.out.println("zookeeper connection success");
}
/**
* 创建节点
* @param path
* @param data
* @throws Exception
*/
public String createNode(String path,String data) throws Exception{
return this.zookeeper.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
/**
* 获取路径下所有子节点
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public List<String> getChildren(String path) throws KeeperException, InterruptedException{
List<String> children = zookeeper.getChildren(path, false);
return children;
}
/**
* 获取节点上面的数据
* @param path 路径
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getData(String path) throws KeeperException, InterruptedException{
byte[] data = zookeeper.getData(path, false, null);
if (data == null) {
return "";
}
return new String(data);
}
/**
* 设置节点信息
* @param path 路径
* @param data 数据
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public Stat setData(String path,String data) throws KeeperException, InterruptedException{
Stat stat = zookeeper.setData(path, data.getBytes(), -1);
return stat;
}
/**
* 删除节点
* @param path
* @throws InterruptedException
* @throws KeeperException
*/
public void deleteNode(String path) throws InterruptedException, KeeperException{
zookeeper.delete(path, -1);
}
/**
* 获取创建时间
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String getCTime(String path) throws KeeperException, InterruptedException{
Stat stat = zookeeper.exists(path, false);
return String.valueOf(stat.getCtime());
}
/**
* 获取某个路径下孩子的数量
* @param path
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public Integer getChildrenNum(String path) throws KeeperException, InterruptedException{
int childenNum = zookeeper.getChildren(path, false).size();
return childenNum;
}
/**
* 关闭连接
* @throws InterruptedException
*/
public void closeConnection() throws InterruptedException{
if (zookeeper != null) {
zookeeper.close();
}
}
}
测试:
public class Demo {
public static void main(String[] args) throws Exception {
BaseZookeeper zookeeper = new BaseZookeeper();
zookeeper.connectZookeeper("192.168.0.1:2181");
List<String> children = zookeeper.getChildren("/");
System.out.println(children);
}
}
ZookeeperJavaAPI基本操作
Zookeeper官方提供了两种语言的API,Java和C,在这里只演示JavaAPI
操作API的类中的变量,一下方法都会使用到
static Logger logg = LoggerFactory.getLogger(ZKApi.class);
private static final String zkServerPath = "10.33.57.28:2181";
private static final String zkServerPath = "127.0.0.1:2181";
private static final Integer timeOut = 5000;
private static Stat stat = new Stat();
以及实现接口Watcher的实现方法process
public void process(WatchedEvent event) {
try {
if (event.getType() == Event.EventType.NodeDataChanged) {
ZooKeeper zk = null;
zk = ZKApi.getZkConnect();
byte[] resByt = new byte[0];
resByt = zk.getData("/test1", false, stat);
String resStr = new String(resByt);
System.out.println("更改后的值:" + resStr);
System.out.println("版本号的变化:" + stat.getVersion());
System.out.println("-------");
countDown.countDown();
}else if(event.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("NodeChildrenChanged");
ZooKeeper zk = null;
zk = ZKApi.getZkConnect();
List<String> srcChildList = zk.getChildren(event.getPath(), false);
for (String child:srcChildList){
System.out.println(child);
}
countDown.countDown();
}else if(event.getType() == Event.EventType.NodeCreated){
countDown.countDown();
}else if (event.getType() == Event.EventType.NodeCreated){
countDown.countDown();
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
1.连接客户端
创建客户端连接使用Zookeeper类的构造函数
Zookeeper构造函数总共四个如下:
/*
@param connectString zk连接地址以及端口号 格式如:127.0.0.1:2181,如果多个zk,则使用逗号分隔
@param sessionTimeout session超时时间 单位ms
@param watcher * ,使用watcher必须实现接口Watcher实现process方法
@sessionId session id 可以用作恢复回话的参数
@sessionPassword session password 可以用作恢复回话的参数
@canbeReadOnly zk3.4添加的 只读模式
* */
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd)
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
连接客户端代码
public static ZooKeeper getZkConnect() throws IOException {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
logg.debug("连接状态:{}", zk.getState());
return zk;
}
DEBUG [main] - zookeeper.disableAutoWatchReset is false
DEBUG [main] - 连接状态:CONNECTING
2.恢复回话
public static void recoveryConnect() throws IOException, InterruptedException {
ZooKeeper zooKeeper = new ZooKeeper(zkServerPath, timeOut, new ZKApi());
long sessionId = zooKeeper.getSessionId();
byte[] sessionPasswd = zooKeeper.getSessionPasswd();
logg.debug("开始连接服务器 . . .");
logg.debug("连接状态:{}",zooKeeper.getState());
new Thread().sleep(1000 );
logg.debug("开始重连 . . . ");
ZooKeeper zooSession = new ZooKeeper(zkServerPath, timeOut, new ZKApi(), sessionId, sessionPasswd);
logg.debug("重连状态:{}",zooSession.getState());
new Thread().sleep(200);
logg.debug("重连状态:{}",zooSession.getState());
}
DEBUG [main] - 开始连接服务器 . . .
DEBUG [main] - 连接状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0011, negotiated timeout = 5000
DEBUG [main] - 开始重连 . . .
INFO [main] - Initiating client connection, connectString=10.33.57.67:2181 sessionTimeout=5000 watcher=ZKApi@73a28541 sessionId=0 sessionPasswd=<hidden>
DEBUG [main] - 重连状态:CONNECTING
DEBUG [main-SendThread(hdfa67:2181)] - Canonicalized address to hdfa67
INFO [main-SendThread(hdfa67:2181)] - Opening socket connection to server hdfa67/10.33.57.67:2181. Will not attempt to authenticate using SASL (unknown error)
INFO [main-SendThread(hdfa67:2181)] - Socket connection established to hdfa67/10.33.57.67:2181, initiating session
DEBUG [main-SendThread(hdfa67:2181)] - Session establishment request sent on hdfa67/10.33.57.67:2181
INFO [main-SendThread(hdfa67:2181)] - Session establishment complete on server hdfa67/10.33.57.67:2181, sessionid = 0x10000ea59aa0012, negotiated timeout = 5000
DEBUG [main] - 重连状态:CONNECTED
3.创建节点
创建节点通过zk客户端对象的create方法进行创建,主要有两个方法:一种是同步,一种是异步,接下来的修改等方法同样如此,就不多加解释了
/**
@param path 创建的节点路径
@param data 节点数据
@param acl 权限列表,
@param createMode 指定之创建节点的类型
@param cb 异步调用方法
@param ctx 回调对象
*/
public String create(final String path, byte data[], List<ACL> acl,
CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,
CreateMode createMode, StringCallback cb, Object ctx)
public static void createZkNode1() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
String result = zk.create("/test1", "test-data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);//创建一个/test的持续节点
System.out.println(result);
//输出/test1
public static void createZkNode2() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
String ctx = "{'create': 'success'}";
zk.create("/test2", "test-data".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,new CreateCallBack() ,ctx);
new Thread().sleep(2000);//需要暂停一会,否则创建失败
}
4.修改节点
public Stat setData(final String path, byte data[], int version)
public void setData(final String path, byte data[], int version,
StatCallback cb, Object ctx)
public static void setZkNode1() throws IOException, KeeperException, InterruptedException{
ZooKeeper zk = getZkConnect();
Stat stat = zk.setData("/test1", "modifyed-data".getBytes(), 0);
System.out.println(stat.getVersion());
}
public static void setZkNode2() throws IOException, KeeperException, InterruptedException{
ZooKeeper zk = getZkConnect();
String ctx = "{'modify': 'success'}";
zk.setData("/test1", "modifyed-data".getBytes(),0,new ModifyCalback(),ctx);
new Thread().sleep(1000);//必须加上,否则回掉不成功
}
5.删除节点
public void delete(final String path, int version)
public void delete(final String path, int version, VoidCallback cb,
Object ctx)
public static void deleteZkNode1() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
zk.delete("/test1",1);//不能够删除子节点
}
public static void deleteZkNode2() throws IOException, InterruptedException {
ZooKeeper zk = getZkConnect();
String ctx = "{'delete': 'success'}";
zk.delete("/test2",0,new DeleteCallBack(),ctx);//不能够删除子节点
new Thread().sleep(1000);//必须加上,否则回掉不成功
}
6.查询节点
public byte[] getData(String path, boolean watch, Stat stat)
public byte[] getData(final String path, Watcher watcher, Stat stat)
public void getData(final String path, Watcher watcher,DataCallback cb, Object ctx)
public void getData(String path, boolean watch, DataCallback cb, Object ctx)
public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectData1() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
byte[] data = zk.getData("/test1", true, stat);
String s = new String(data);
System.out.println("value: "+s);
countDown.await();
}
if (event.getType() == Event.EventType.NodeDataChanged) {
ZooKeeper zk = null;
zk = ZKApi.getZkConnect();
byte[] resByt = new byte[0];
resByt = zk.getData("/test1", false, stat);
String resStr = new String(resByt);
System.out.println("更改后的值:" + resStr);
System.out.println("版本号的变化:" + stat.getVersion());
System.out.println("-------");
countDown.countDown();
由于更改之后,触发了 * ,再次在命令行中进行更改,出现了一下结果。
7.查询子节点
查询子节点的方法
public List<String> getChildren(final String path, Watcher watcher)
public List<String> getChildren(String path, boolean watch)
public void getChildren(final String path, Watcher watcher, ChildrenCallback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
public List<String> getChildren(final String path, Watcher watcher, Stat stat)
public List<String> getChildren(String path, boolean watch, Stat stat)
public void getChildren(final String path, Watcher watcher, Children2Callback cb, Object ctx)
public void getChildren(String path, boolean watch, Children2Callback cb, Object ctx)
代码实现
public static CountDownLatch countDown = new CountDownLatch(1);
public static void selectchildData1() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
List<String> srcChildList = zk.getChildren("/test", true, stat);
for (String child:srcChildList){
System.out.println(child);
}
countDown.await();
}
if(event.getType() == Event.EventType.NodeChildrenChanged){
System.out.println("NodeChildrenChanged");
ZooKeeper zk = null;
zk = ZKApi.getZkConnect();
List<String> srcChildList = zk.getChildren(event.getPath(), false);
for (String child:srcChildList){
System.out.println(child);
}
运行结果完成后,触 * ,再次删除test1
第二种异步方式实现
public static void selectchildData2() throws IOException, KeeperException, InterruptedException{
ZooKeeper zk = getZkConnect();
String ctx = "{'selectChild': 'success'}";
zk.getChildren("/test",false,new ChildrenCallback(),ctx);
new Thread().sleep(1000);
}
8.使用递归得到所有的节点
public static void selectchildData3() throws IOException, KeeperException, InterruptedException{
getChild("/");
}
public static void getChild(String path) throws IOException, KeeperException, InterruptedException {
System.out.println(path);
ZooKeeper zk = getZkConnect();
List<String> childrenList = zk.getChildren(path, false, stat);
if(childrenList.isEmpty() || childrenList ==null)
return;
for(String s:childrenList){
if(path.equals("/"))
getChild(path+s);
else {
getChild(path+"/"+s);
}
}
}
运行结果:
/zookeeper
/zookeeper/config
/zookeeper/quota
/ldd
/ldd/l
/loo
/t1
/test1
/seq
/seq/seq30000000002
/seq/seq20000000001
/seq/se0000000003
/seq/seq10000000000
9.判断节点是否存在
public static void existNode() throws IOException, KeeperException, InterruptedException {
ZooKeeper zk = getZkConnect();
Stat stat = zk.exists("/ff", true);
System.out.println(stat);
}
//输出null则不存在
10.自定义权限
public static void oneSelfACL() throws Exception {
ZooKeeper zk = getZkConnect();
ArrayList<ACL> acls = new ArrayList<ACL>();
// zk.create("/test1","test-data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); //所有人均可访问
Id id1 = new Id("digest", ACLUtils.getDigestUserPassword("id1:123456"));
Id id2 = new Id("digest", ACLUtils.getDigestUserPassword("id2:123456"));
// Id ipId = new Id("ip","127.0.0.1");ip设置
// acls.add(new ACL(ZooDefs.Perms.ALL,id1));
acls.add(new ACL(ZooDefs.Perms.ALL,id1));
acls.add(new ACL(ZooDefs.Perms.DELETE,id2));
//注册过的用户必须通过addAuthInfo才可以操作节点
zk.addAuthInfo("digest","id1:123456".getBytes());
zk.create("/test2","test2-data".getBytes(), acls,CreateMode.PERSISTENT);
}
结果如下:
直接登录id1由于在程序已经注册完成
来源:https://changhe.blog.csdn.net/article/details/82420504
0
投稿
猜你喜欢
- 导读Spring Boot应用可以使用spring-boot-maven-plugin快速打包,构建一个可执行jar。Spring Boot
- 本文实例为大家分享了C#实现航班预订的具体代码,供大家参考,具体内容如下连接数据库using System;using System.Col
- 本文实例为大家分享了Java实现坦克大战小游戏的具体代码,供大家参考,具体内容如下创作背景:n年前的学期末课题设计,从b站上学的,一个代码一
- 问题(1)自己动手写一个锁需要哪些知识?(2)自己动手写一个锁到底有多简单?(3)自己能不能写出来一个完美的锁?简介本篇文章的目标一是自己动
- 推送系统作为通用的组件,存在的价值主要有以下几点会被多个业务项目使用,推送系 * 立维护可降低维护成本推送系统一般都是调用三方api进行推送,
- JVM内部结构图Java虚拟机主要分为五个区域:方法区、堆、Java栈、PC寄存器、本地方法栈。下面来看一些关于JVM结构的重要问题。1.哪
- 1.封装什么是封装,谈谈自己对封装的理解,封装就是将类的信息(比如说类的属性)隐藏在类的内部,不允许外部程序直接访问。此时就要提到一个关键字
- 这篇文章主要介绍了Jmeter如何添加循环控制器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以
- 当jvm虚拟机被关闭的时候,可能我们需要做一些处理,比如对连接的关闭,或者对一些必要信息的存储等等操作,这里就可以借助于虚拟机提供的钩子函数
- ImageCacheconst int _kDefaultSize = 1000;const int _kDefaultSizeBytes
- 本文以一个实例简单实现了类的创建与初始化,实现代码如下所示:using System;using System.Collections.Ge
- java web返回中文乱码ajax返回中文乱码问题 在浏览器按F12查看数据包可以看到charset为 iso-8859-1,这是spri
- 前言今天看代码看到有牵扯到弱引用的东西,就先稍微补一补Java的四种引用类型吧。Java为引用类型专门定义了一个类Reference,它是引
- 一)Document介绍API来源:在JDK中javax.xml.*包下使用场景:1、需要知道XML文档所有结构2、需要把文档一些元素排序3
- 安装nodejs首先电脑中需要安装nodejs,这个就不多提了,windows就是下载node.exe,一步步安装就可以了。如需安装可参考一
- 在本博客中,可以找到一篇《c#实现输出的字符靠右对齐的示例》它有教大家怎样实现字符串输出进行左齐或者是右对齐。本篇的方法,超简单,是使用st
- 关于 swagger 本文不再赘述,网上文章很多。本文要讲的是Knife4j3.0.3 整合SpringBoot 2.6.4,因为 knif
- 用servlet实现一个注册的小功能 ,后台获取数据。注册页面:注册页面代码 :<!DOCTYPE html><html&
- 1、Swagger是啥Swagger 是一个用于生成、描述和调用 RESTful 接口的 Web 服务。通俗的来讲,Swagger
- 今天讲解一下Fragment的控制,主要是切换View和页面替换等操作。还有就是如何获取Fragment的管理对象,以及与Activity的