单位想把那套ZooKeeper集群用起来.
作为配置中心,一旦它出现问题,所有依赖它的服务都会中断.
尤其又涉及ACL
想来想去,我觉得这个客户端还是自己封装,安全系数大些.
好长时间也没有写过代码了,确实感觉很生疏.写的逻辑稍微有点乱.
别到时候上线了,因为自己的客户端引发问题.那就尴尬了.
要求:
1.在客户端实现负载均衡
2.客户端ACL密码加密
3.在客户端实现缓存,如果ZK挂了,还能继续提供服务
4.如果服务提供方在ZK正常注册,但是服务调用方出现调用异常,需要在缓存中给这个服务做一个标识.在一段时间内(默认5分钟),不再提供这个服务地址.
5.这段代码别出事儿..心中默念一百遍..
代码结构:

zoo.properties
server=192.168.1.105:2181,192.168.1.106:2181,192.168.1.107:2181,192.168.1.108:2181,192.168.1.109:2181,192.168.1.110:2181
invoker.vdfs=/service/vdfs/upload/dx
r=pqrMCxdhQUhKEgMFZoJG3vM2tDdIGqbA/rlZt9RkL8s=
rw=MqnMrPsX3c8RX7b+NES4mQ==
provider./service/vdfs/upload/dx=http://192.168.1.111
nodename=192.168.16.114:8080
Metadata,负责读取配置文件zoo.properties中的配置项,并提供其中ACL密码的加解密
-
package com.vv.zkClient;
-
-
import java.io.IOException;
-
import java.io.InputStream;
-
import java.security.SecureRandom;
-
import java.util.Enumeration;
-
import java.util.Map;
-
import java.util.Properties;
-
-
import javax.crypto.Cipher;
-
import javax.crypto.SecretKey;
-
import javax.crypto.SecretKeyFactory;
-
import javax.crypto.spec.DESKeySpec;
-
-
import org.apache.commons.codec.binary.Base64;
-
import org.jboss.netty.util.internal.ConcurrentHashMap;
-
-
public class Metadata {
-
private static volatile Metadata META = null;
-
public static String decrypt(byte[] content, String key) {
-
try {
-
SecureRandom random = new SecureRandom();
-
DESKeySpec desKey = new DESKeySpec(key.getBytes());
-
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
-
SecretKey securekey = keyFactory.generateSecret(desKey);
-
Cipher cipher = Cipher.getInstance("DES");
-
cipher.init(Cipher.DECRYPT_MODE, securekey, random);
-
byte[] result = cipher.doFinal(content);
-
return new String(result);
-
} catch (Throwable e) {
-
e.printStackTrace();
-
}
-
return null;
-
}
-
public static byte[] encrypt(String content, String key) {
-
try {
-
SecureRandom random = new SecureRandom();
-
DESKeySpec desKey = new DESKeySpec(key.getBytes());
-
SecretKeyFactory keyFactory = SecretKeyFactory.getInstance("DES");
-
SecretKey securekey = keyFactory.generateSecret(desKey);
-
Cipher cipher = Cipher.getInstance("DES");
-
cipher.init(Cipher.ENCRYPT_MODE, securekey, random);
-
byte[] result = cipher.doFinal(content.getBytes());
-
return result;
-
} catch (Throwable e) {
-
e.printStackTrace();
-
}
-
return null;
-
}
-
public static Metadata getInstance() {
-
if (META == null) {
-
synchronized (Metadata.class) {
-
-
if (META == null) {
-
META = new Metadata();
-
}
-
}
-
}
-
return META;
-
}
-
-
private String connectionString = null;
-
private Map<String, String> invokerMap = new ConcurrentHashMap<String, String>();
-
private String key = "12344321";
-
-
private String nodename = null;
-
-
private Properties p = new Properties();
-
-
private Map<String, String> providerMap = new ConcurrentHashMap<String, String>();
-
-
private String readOnlyPassword = null;
-
-
private String readwritePassword = null;
-
-
public Metadata() {
-
InputStream in = Metadata.class.getClassLoader().getResourceAsStream("zoo.properties");
-
init(in);
-
}
-
-
public String getConnectionString() {
-
return connectionString;
-
}
-
-
public Map<String, String> getInvokerMap() {
-
return invokerMap;
-
}
-
-
public String getLocal() {
-
return nodename;
-
}
-
-
public Map<String, String> getProviderMap() {
-
return providerMap;
-
}
-
-
public String getReadOnlyPassword() {
-
String password = new String(decrypt(Base64.decodeBase64(this.readOnlyPassword.getBytes()), this.key));
-
-
return password;
-
}
-
-
public String getReadwritePassword() {
-
String password = new String(decrypt(Base64.decodeBase64(this.readwritePassword.getBytes()), this.key));
-
-
return password;
-
}
-
-
private void init(InputStream in) {
-
try {
-
p.load(in);
-
connectionString = p.getProperty("server", "");
-
readOnlyPassword = p.getProperty("r", "");
-
readwritePassword = p.getProperty("rw", "");
-
nodename = p.getProperty("nodename", "");
-
Enumeration<Object> enums = p.keys();
-
while (enums.hasMoreElements()) {
-
String key = (String) enums.nextElement();
-
if (key.startsWith("invoker.")) {
-
invokerMap.put(key.replace("invoker.", ""), p.getProperty(key));
-
} else if (key.startsWith("provider.")) {
-
providerMap.put(key.replace("provider.", ""), p.getProperty(key));
-
}
-
}
-
-
in.close();
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
-
-
public static void main(String[] args) {
-
-
}
-
}
ServiceProvider 服务提供方调用,将自己注册到配置中心
-
package com.vv.zkClient;
-
-
import java.text.SimpleDateFormat;
-
import java.util.Date;
-
import java.util.concurrent.ScheduledThreadPoolExecutor;
-
import java.util.concurrent.TimeUnit;
-
-
import org.apache.curator.framework.CuratorFramework;
-
import org.apache.curator.framework.CuratorFrameworkFactory;
-
import org.apache.curator.retry.RetryUntilElapsed;
-
import org.apache.zookeeper.CreateMode;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
public class ServiceProvider implements Runnable {
-
private static Logger LOGGER = LoggerFactory.getLogger(ServiceProvider.class);
-
-
private static volatile ServiceProvider SERVICEPROVIDER = null;
-
public static ServiceProvider getInstance() {
-
if (SERVICEPROVIDER == null) {
-
synchronized (ServiceProvider.class) {
-
-
if (SERVICEPROVIDER == null) {
-
SERVICEPROVIDER = new ServiceProvider();
-
}
-
}
-
}
-
return SERVICEPROVIDER;
-
}
-
-
private CuratorFramework client;
-
-
private Metadata meta;
-
-
private ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(1);
-
-
private ServiceProvider() {
-
this.meta = Metadata.getInstance();
-
connection();
-
threadPool.scheduleAtFixedRate(this, 1, 5, TimeUnit.SECONDS);
-
}
-
-
private void connection() {
-
try {
-
this.client = CuratorFrameworkFactory.newClient(meta.getConnectionString(),
-
new RetryUntilElapsed(2000, 1000));
-
client.start();
-
client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", meta.getReadwritePassword().getBytes());
-
} catch (Exception e) {
-
e.printStackTrace();
-
LOGGER.error(e.getMessage());
-
}
-
}
-
-
public void run() {
-
try {
-
for (String serviceNode : meta.getProviderMap().keySet()) {
-
String serviceURL = meta.getProviderMap().get(serviceNode);
-
String serviceName = serviceNode + "/" + meta.getLocal();
-
if (client.checkExists().forPath(serviceName) == null) {
-
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "||" + serviceURL;
-
client.create().withMode(CreateMode.EPHEMERAL).forPath(serviceName, date.getBytes());
-
LOGGER.info("Created Node->\"{}\",Node Data->\"{}\"", serviceName,date);
-
}
-
}
-
} catch (Exception e) {
-
e.printStackTrace();
-
client.close();
-
connection();
-
LOGGER.error(e.getMessage());
-
}
-
}
-
-
public static void main(String[] args) throws InterruptedException {
-
ServiceProvider.getInstance();
-
Thread.sleep(Integer.MAX_VALUE);
-
}
-
}
ServiceInvoker 服务调用方使用,获取服务连接地址
-
package com.vv.zkClient;
-
-
import java.util.Iterator;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.concurrent.Callable;
-
import java.util.concurrent.ConcurrentHashMap;
-
import java.util.concurrent.CopyOnWriteArrayList;
-
import java.util.concurrent.CountDownLatch;
-
import java.util.concurrent.ExecutionException;
-
import java.util.concurrent.ExecutorService;
-
import java.util.concurrent.Executors;
-
-
import org.apache.curator.framework.CuratorFramework;
-
import org.apache.curator.framework.CuratorFrameworkFactory;
-
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
-
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
-
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
-
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
-
import org.apache.curator.retry.RetryUntilElapsed;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
class Service {
-
private long lastErrorTime = -1;
-
private String name = null;
-
private String url = null;
-
-
@Override
-
public boolean equals(Object obj) {
-
Service s = (Service) obj;
-
return (this.getName() + this.getUrl()).equals(s.getName() + s.getUrl());
-
}
-
-
public long getLastErrorTime() {
-
return lastErrorTime;
-
}
-
-
public String getName() {
-
return name;
-
}
-
-
public String getUrl() {
-
return url;
-
}
-
-
public void setLastErrorTime(long lastErrorTime) {
-
this.lastErrorTime = lastErrorTime;
-
}
-
-
public void setName(String name) {
-
this.name = name;
-
}
-
-
public void setUrl(String url) {
-
this.url = url;
-
}
-
-
}
-
-
public class ServiceInvoker {
-
private static ServiceInvoker INVOKER = null;
-
-
private static Logger LOGGER = LoggerFactory.getLogger(ServiceInvoker.class);
-
public static ServiceInvoker getInstance() {
-
if (INVOKER == null) {
-
synchronized (ServiceInvoker.class) {
-
-
if (INVOKER == null) {
-
INVOKER = new ServiceInvoker();
-
}
-
}
-
}
-
return INVOKER;
-
}
-
-
private CuratorFramework client;
-
private CountDownLatch isInitialized = new CountDownLatch(1);
-
private Map<String, Iterator<Service>> itMap = new ConcurrentHashMap<String, Iterator<Service>>();
-
-
private Metadata meta;
-
-
private Map<String, List<Service>> providerMap = new ConcurrentHashMap<String, List<Service>>();
-
-
private ExecutorService threadPool = Executors.newSingleThreadExecutor();
-
-
private ServiceInvoker() {
-
this.meta = Metadata.getInstance();
-
connection();
-
listener();
-
}
-
-
private void connection() {
-
try {
-
this.client = CuratorFrameworkFactory.newClient(meta.getConnectionString(),
-
new RetryUntilElapsed(2000, 1000));
-
client.start();
-
client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", meta.getReadOnlyPassword().getBytes());
-
} catch (Exception e) {
-
e.printStackTrace();
-
LOGGER.error(e.getMessage());
-
}
-
}
-
-
public String get(final String serviceNode) {
-
-
Callable<String> c = new Callable<String>() {
-
-
public String call() throws Exception {
-
List<Service> list = providerMap.get(serviceNode);
-
if (list == null) {
-
list = new CopyOnWriteArrayList<Service>();
-
providerMap.put(serviceNode, list);
-
}
-
-
Iterator<Service> it = itMap.get(serviceNode);
-
if (it == null || !it.hasNext()) {
-
it = list.iterator();
-
itMap.put(serviceNode, it);
-
}
-
if (!it.hasNext()) {
-
LOGGER.error("节点:\"{}\",没有任何可用服务", serviceNode);
-
return "";
-
}
-
Service service = it.next();
-
long now = System.currentTimeMillis();
-
int retryCount = 5;
-
while (service.getLastErrorTime() != -1 && (now - service.getLastErrorTime()) < 1000 * 60 * 5) {
-
retryCount--;
-
if (retryCount == 0) {
-
LOGGER.error("节点:\"{}\",没有任何可用服务", serviceNode);
-
return "";
-
}
-
if (it.hasNext()) {
-
service = it.next();
-
} else {
-
it = providerMap.get(serviceNode).iterator();
-
itMap.put(serviceNode, it);
-
}
-
}
-
return service.getUrl();
-
}
-
-
};
-
-
String serviceUrl = "";
-
-
try {
-
isInitialized.await();
-
serviceUrl = threadPool.submit(c).get();
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
} catch (ExecutionException e) {
-
e.printStackTrace();
-
}
-
return serviceUrl;
-
}
-
-
private void listener() {
-
for (String serviceNode : meta.getInvokerMap().values()) {
-
-
PathChildrenCache cache = new PathChildrenCache(client, serviceNode, true);
-
try {
-
cache.start(StartMode.POST_INITIALIZED_EVENT);
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
cache.getListenable().addListener(new PathChildrenCacheListener() {
-
-
public void childEvent(CuratorFramework arg0, final PathChildrenCacheEvent event) throws Exception {
-
-
final String type = event.getType().name();
-
if (type.equals("INITIALIZED")) {
-
LOGGER.info("ZooKeeper数据初始化完成:INITIALIZED");
-
isInitialized.countDown();
-
return;
-
}
-
final String data = event.getData().getPath();
-
final String serviceNode = data.substring(0, data.lastIndexOf("/"));
-
-
final String serviceUrl = new String(event.getData().getData()).split("\\|\\|")[1];
-
Runnable r = new Runnable() {
-
public void run() {
-
List<Service> list = providerMap.get(serviceNode);
-
if (list == null) {
-
list = new CopyOnWriteArrayList<Service>();
-
providerMap.put(serviceNode, list);
-
}
-
Service s = new Service();
-
s.setName(serviceNode);
-
s.setUrl(serviceUrl);
-
if (type.equals("CHILD_ADDED")) {
-
list.add(s);
-
LOGGER.info("新增节点:\"{}\",服务地址:\"{}\"", data, serviceUrl);
-
} else if (type.equals("CHILD_REMOVED")) {
-
for (int i = 0; i < list.size(); i++) {
-
Service service = list.get(i);
-
if (service.equals(s)) {
-
list.remove(i);
-
LOGGER.info("删除节点:\"{}\",服务地址:\"{}\"", data, serviceUrl);
-
}
-
-
}
-
}
-
}
-
};
-
threadPool.submit(r);
-
}
-
});
-
-
}
-
}
-
-
public void setLastErrorTime(final String url) {
-
Runnable r = new Runnable() {
-
public void run() {
-
for (List<Service> list : providerMap.values()) {
-
Iterator<Service> it = list.iterator();
-
while (it.hasNext()) {
-
Service service = it.next();
-
if (service.getUrl().equals(url)) {
-
service.setLastErrorTime(System.currentTimeMillis());
-
LOGGER.error("节点:\"{}\",调用URL:\"{}\"异常,该节点停止服务5分钟", service.getName(), service.getUrl());
-
}
-
}
-
}
-
}
-
};
-
threadPool.submit(r);
-
}
-
-
public static void main(String[] args) throws Exception {
-
ServiceInvoker s = ServiceInvoker.getInstance();
-
while (true) {
-
String str = s.get("/service/vdfs/upload/dx");
-
s.setLastErrorTime(str);
-
Thread.sleep(5000);
-
}
-
}
-
}
MAVEN配置:
-
<!-- POM fragment for the ZooKeeper client: source encoding, dependencies and resource directories. -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- Curator client; the log4j binding is excluded here and log4j-over-slf4j is declared below. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.4.2</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Provides PathChildrenCache. -->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.4.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <!-- Provides org.apache.commons.codec.binary.Base64. -->
    <!-- NOTE(review): this is a date-stamped version string; consider a numbered release - confirm before changing. -->
    <dependency>
        <groupId>commons-codec</groupId>
        <artifactId>commons-codec</artifactId>
        <version>20041127.091804</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.7</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.7</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>

<build>
    <resources>
        <!-- Also package .properties files that live next to the Java sources. -->
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.properties</include>
            </includes>
        </resource>
        <resource>
            <directory>src/main/resources</directory>
        </resource>
    </resources>
</build>
ZooKeeper服务发现客户端
原文:http://blog.itpub.net/29254281/viewspace-2108223/