一:Curator客户端
???? Curator是Netfix公司开源的基于Zookeeper的客户端框架,其封装了原生Zookeeper很多底层的操作,比如重试机制,watcher的反复注册等。
? maven项目引入curator很简单
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-test</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-x-discovery</artifactId> <version>2.7.0</version> </dependency>二:Curator操作Zookeeper
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy) { return builder(). connectString(connectString). sessionTimeoutMs(sessionTimeoutMs). connectionTimeoutMs(connectionTimeoutMs). retryPolicy(retryPolicy). build(); }??? 参数说明:
public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs) { super(validateMaxRetries(maxRetries)); this.baseSleepTimeMs = baseSleepTimeMs; this.maxSleepMs = maxSleepMs; }? 参数说明:
@Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { }??? 我们知道,zk的事件通知处理线程是EventThread,它是一个串行处理的线程,想象一下,如果某个事件处理比较耗时,势必会影响后面的事件处理,而Curator给我们提供的异步处理接口,在异步处理处理接口中,其中有如下的一个构造方法
public T inBackground(BackgroundCallback callback, Executor executor);? 也就是在程序中我们可以传入一个Executor实例,这样的话,我们就可以把某些处理比较耗时的事件单独交给线程池来处理了,看下面的例子:
public class Curator_Node_BackGroud_Example { static String path = "/zk-backGroud"; static String hosts = "ip:2181"; static CuratorFramework client; static { client = CuratorFrameworkFactory.newClient(hosts, 2000, 2000, new ExponentialBackoffRetry(1000, 3)); } public static void main(String[] args) { try { ExecutorService service = Executors.newFixedThreadPool(2); final CountDownLatch latch = new CountDownLatch(2); client.start(); client.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("创建节点返回的状态码:" + event.getResultCode() + ",返回的类型:" + event.getType()); System.out.println("运行创建该节点的线程为:" + Thread.currentThread().getName()); latch.countDown(); } }, service).forPath(path, "测试".getBytes()); Thread.sleep(2000); // 为了比较,再次在相同的节点下创建 client.create().creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { System.out.println("创建节点返回的状态码:" + event.getResultCode() + ",返回的类型:" + event.getType()); System.out.println("运行创建该节点的线程为:" + Thread.currentThread().getName()); latch.countDown(); } }).forPath(path, "测试2".getBytes()); latch.await(); service.shutdown(); } catch (Exception e) { e.printStackTrace(); } finally { client.close(); } } }???可能得运行结果如下:
创建节点返回的状态码:0,返回的类型:CREATE 运行创建该节点的线程为:pool-3-thread-1 创建节点返回的状态码:-110,返回的类型:CREATE 运行创建该节点的线程为:main-EventThread? ?可以看到,异步接口返回的状态码0表示节点创建成功,当节点已经存在再去创建的时候,返回-110表示节点创建失败了。
?curator还封装了很多原生zookeeper的操作,比如分布式锁,队列,屏障,事件监听,后面我们都会介绍,请持续关注。
原文:http://tanjie090508.iteye.com/blog/2288255