Reposted from http://blog.csdn.net/zhu_tianwei/article/details/44115667
http://blog.csdn.net/column/details/slimina-thrift.html
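Turning Thrift into a managed service mostly means changing the client side. The work can be split as follows:
1. Service registration on the server and automatic discovery on the client, with no manual configuration changes. We use zookeeper for this; because zookeeper's own client is fairly awkward to use, we handle registration and discovery with the curator-recipes library.
2. The client manages service calls through a connection pool to improve performance. We use commons-pool from the Apache Commons project, which greatly reduces code complexity.
3. Failover/LoadBalance: thanks to zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes the unavailable service node. There are many LoadBalance algorithms; here we use weighted random selection, a common choice. For the other algorithms, see the article "Common basic load-balancing algorithms".
4. Make Thrift service registration and discovery configurable through Spring, which is very convenient.
5. Other changes, such as:
1) Use a dynamic proxy to hide the details of the client/server interaction, so that users only call the interface provided by the service owner.
2) Thrift offers two ways to call a service: Client and Iface.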
 
(EchoService.Client)client.echo("hello lilei");  ---(1)
(EchoService.Iface)service.echo("hello lilei");  ---(2)
 
 
The Client API style (1) is not recommended; we recommend the service interface style (2), which is the service-oriented approach.
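To make the interface style concrete, here is a minimal sketch of a JDK dynamic proxy that exposes only the service interface to the caller. `EchoIface` and `EchoClientStub` are hypothetical stand-ins for the Thrift-generated `EchoService.Iface` and `EchoService.Client`; the real proxy factory appears later in this article and additionally borrows the client from a connection pool.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class IfaceProxyDemo {

    // Hypothetical stand-in for a Thrift-generated EchoService.Iface.
    interface EchoIface {
        String echo(String msg);
    }

    // Hypothetical stand-in for a generated EchoService.Client bound to one connection.
    static class EchoClientStub implements EchoIface {
        @Override
        public String echo(String msg) {
            return "echo:" + msg;
        }
    }

    // Wraps a concrete client behind the service interface; callers never see the client type.
    static EchoIface proxyFor(final EchoIface target) {
        InvocationHandler handler = (proxy, method, args) -> method.invoke(target, args);
        return (EchoIface) Proxy.newProxyInstance(
                EchoIface.class.getClassLoader(), new Class<?>[] { EchoIface.class }, handler);
    }

    public static void main(String[] args) {
        EchoIface service = proxyFor(new EchoClientStub());
        // The call goes through the proxy and is forwarded to the stub client.
        System.out.println(service.echo("hello lilei"));
    }
}
```

Because the caller depends only on the interface, the handler is free to pick a server, reuse a pooled connection, or retry, without any change to calling code.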
 
Let's implement these one by one:
I. Add the dependency JARs to pom.xml

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.2</version>
</dependency>
<dependency>
    <groupId>commons-pool</groupId>
    <artifactId>commons-pool</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.0.9.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.1</version>
</dependency>
 
 
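II. Managing service node configuration with zookeeper
As RPC services evolve toward a platform, more and more service details are hidden (the IP addresses of the cluster, cluster scale-out and migration) and only the service interface is exposed. This evolution fully decouples the server side from the client side; the two interact through a ConfigServer (MetaServer) that acts as the intermediary.

Note: the figure comes from the dubbo official website.
Here Zookeeper plays that role: the server acts as the publisher and the client as the subscriber.
Zookeeper is a coordination service for distributed applications. It achieves consistency through ZAB, its Paxos-like atomic broadcast protocol, and plays an important role in naming, configuration push, data synchronization, and master/standby switchover. Its data is organized like the directory tree of a file system.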

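Each node is called a znode. Depending on their properties, znodes fall into the following types:
  1). PERSISTENT: persistent node
  2). EPHEMERAL: ephemeral node, which disappears when its session ends (for example when the client disconnects)
  3). PERSISTENT_SEQUENTIAL: persistent node whose name carries a monotonically increasing sequence number
  4). EPHEMERAL_SEQUENTIAL: ephemeral node whose name carries a monotonically increasing sequence number
  Note: an ephemeral node cannot be a parent node
  Watcher mode: a client can register callbacks for changes to a node's state or content. Two attributes of the resulting Event deserve attention:
  1). KeeperState: Disconnected, SyncConnected, Expired
  2). EventType: None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged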
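The interplay of ephemeral nodes and watchers is what gives the client failover for free. The sketch below imitates it with a plain in-memory class; `TinyRegistry` and its methods are hypothetical and are not the ZooKeeper or Curator API. Entries are owned by a session, vanish when the session closes, and every change is pushed to subscribers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// In-memory imitation of the publisher/subscriber role; not the ZooKeeper API.
class TinyRegistry {
    private final Map<String, String> ownerByPath = new TreeMap<>(); // path -> session id
    private final List<Consumer<List<String>>> watchers = new ArrayList<>();

    // Server side: publish an address as an "ephemeral" entry owned by a session.
    synchronized void register(String sessionId, String path) {
        ownerByPath.put(path, sessionId);
        fire();
    }

    // Simulates session loss: every entry owned by the session disappears.
    synchronized void closeSession(String sessionId) {
        ownerByPath.values().removeIf(sessionId::equals);
        fire();
    }

    // Client side: subscribe to the set of live entries; the current set is delivered at once.
    synchronized void watch(Consumer<List<String>> watcher) {
        watchers.add(watcher);
        watcher.accept(children());
    }

    synchronized List<String> children() {
        return new ArrayList<>(ownerByPath.keySet());
    }

    private void fire() {
        List<String> snapshot = children();
        for (Consumer<List<String>> watcher : watchers) {
            watcher.accept(snapshot);
        }
    }

    public static void main(String[] args) {
        TinyRegistry registry = new TinyRegistry();
        registry.watch(live -> System.out.println("live servers: " + live));
        registry.register("session-A", "192.168.36.215:9001:1");
        registry.register("session-B", "192.168.36.215:9002:2");
        registry.closeSession("session-A"); // the server behind session-A went away
    }
}
```

In the real setup the session is the server's ZooKeeper session, so a crashed server drops out of the list once its session expires, without any explicit deregistration.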
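RPC server side:
  As the publisher of a concrete business service, the RPC server describes its service with the following elements.
  1). namespace: the namespace, used to tell different applications apart
  2). service: the service interface, identified by the publisher's fully qualified class name
  3). version: the version number
  This borrows Maven's GAV coordinates; a three-part coordinate fits a platform-style service environment better.
  *) Data model design
  A concrete RPC service is registered at /rpc/{namespace}/{service}/{version}; every node on this path is persistent.
  Each node of the RPC service cluster is registered at /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the final node is ephemeral.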
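The path layout can be sketched with a few string helpers. `RpcPaths` and its method names are hypothetical and exist only for illustration; the example values are taken from the Spring configuration shown later in this article.

```java
import java.net.InetSocketAddress;

class RpcPaths {

    // /rpc/{namespace}/{service}/{version}: the persistent part of the path.
    static String servicePath(String namespace, String service, String version) {
        return "/rpc/" + namespace + "/" + service + "/" + version;
    }

    // {ip:port:weight}: the name of the ephemeral leaf node for one server instance.
    static String addressNode(String ip, int port, int weight) {
        return ip + ":" + port + ":" + weight;
    }

    // Extracts ip and port from a leaf node name without doing a DNS lookup.
    static InetSocketAddress address(String node) {
        String[] parts = split(node);
        return InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1]));
    }

    // Extracts the weight from a leaf node name; defaults to 1 when it is omitted.
    static int weight(String node) {
        String[] parts = split(node);
        return parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
    }

    private static String[] split(String node) {
        String[] parts = node.split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected ip:port[:weight] but got " + node);
        }
        return parts;
    }

    public static void main(String[] args) {
        String parent = servicePath("cn.slimsmart.thrift.rpc.demo",
                "cn.slimsmart.thrift.rpc.demo.EchoSerivce", "1.0.0");
        String leaf = addressNode("192.168.36.215", 9000, 1);
        System.out.println(parent + "/" + leaf);
        System.out.println(address(leaf) + " weight=" + weight(leaf));
    }
}
```

In the code that follows, the /rpc/{namespace} prefix is supplied by the Curator client namespace, so the register and provider classes only build the /{service}/{version}/{ip:port:weight} part.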
1. Managing the Zookeeper client
ZookeeperFactory.java

package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/** Spring FactoryBean that creates (and optionally shares) the Curator client. */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // Session and connection timeouts, in milliseconds
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // Whether one shared client is handed out
    private boolean singleton = true;

    // Application namespace, placed under ROOT
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        // Non-singleton clients are returned unstarted; callers start them when LATENT
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        // Build the full namespace in a local variable so that repeated calls
        // do not keep prepending ROOT to the configured namespace
        String fullNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
        return create(zkHosts, sessionTimeout, connectionTimeout, fullNamespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        // Use the configured connection timeout rather than a hard-coded value
        return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout)
                .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null).build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}
 
 
 
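2. Registering the service on the server side
The server configuration needs the IP address of the local machine, so we define an interface for obtaining it.
ThriftServerIpResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;

/** Resolves the IP address under which the Thrift server is published. */
public interface ThriftServerIpResolve {

    String getServerIp() throws Exception;

    // Discards the cached IP so that the next call resolves it again
    void reset();

    // Callback that receives the new IP
    static interface IpRestCalllBack {
        public void rest(String newIp);
    }
}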
 
 
This interface can have different implementations. Below we obtain the IP address from the network interfaces; it can also be set explicitly through serverIp.
ThriftServerIpLocalNetworkResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Resolves the server IP from the local network interfaces unless one is configured. */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // Cached or explicitly configured IP
    private String serverIp;

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        if (serverIp != null) {
            return serverIp;
        }
        // Walk all network interfaces; the last site-local IPv4 address found wins
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if (address instanceof Inet6Address) {
                        continue; // skip IPv6 addresses
                    }
                    if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                        serverIp = address.getHostAddress();
                        logger.info("resolve server ip :" + serverIp);
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("failed to enumerate network interfaces", e);
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}
 
 
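Next we define the service publishing interface, together with an implementation that publishes the service information (service interface, version, IP, port, weight) to zookeeper.
ThriftServerAddressRegister.java

package cn.slimsmart.thrift.rpc.zookeeper;

public interface ThriftServerAddressRegister {

    // Publishes one server address (ip:port:weight) for the given service and version
    void register(String service, String version, String address);
}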
 
 
Implementation: ThriftServerAddressRegisterZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private CuratorFramework zkClient;

    public ThriftServerAddressRegisterZookeeper() {}

    public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void register(String service, String version, String address) {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        if (StringUtils.isEmpty(version)) {
            version = "1.0.0";
        }
        // Ephemeral node: it is removed automatically when this server's session ends
        try {
            zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/" + service + "/" + version + "/" + address);
        } catch (Exception e) {
            // Pass the exception as the last argument so that slf4j logs the stack trace
            logger.error("register service address to zookeeper exception", e);
            throw new ThriftException("register service address to zookeeper exception", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}
 
 
 
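3. Discovering the service on the client side
Define the interface for obtaining service addresses.
ThriftServerAddressProvider.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

public interface ThriftServerAddressProvider {

    // Name of the service this provider looks up
    String getService();

    // All currently known server addresses
    List<InetSocketAddress> findServerAddressList();

    // Picks one server address for the next connection
    InetSocketAddress selector();

    void close();
}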
 
 
Automatic address discovery based on zookeeper: ThriftServerAddressProviderZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // Fully qualified name of the service to look up
    private String service;
    // Service version
    private String version = "1.0.0";

    private PathChildrenCache cachedPath;

    private CuratorFramework zkClient;

    // Every address this provider has ever seen; used as a fallback
    // when zookeeper reports no live servers
    private Set<String> trace = new HashSet<String>();

    // Live addresses, each repeated "weight" times
    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    // Queue that selector() consumes; refilled from container when empty
    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    // Guards trace, container and inner
    private Object lock = new Object();

    // Weight used when the node name does not carry one
    private static final Integer DEFAULT_WEIGHT = 1;

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public ThriftServerAddressProviderZookeeper() {
    }

    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // Start the client if nobody has done so yet
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                case CONNECTION_RECONNECTED:
                    logger.info("Connection is reconnected.");
                    break;
                case CONNECTION_SUSPENDED:
                    logger.info("Connection is suspended.");
                    break;
                case CONNECTION_LOST:
                    logger.warn("Connection error, waiting...");
                    return;
                default:
                    // Child and initialization events fall through to the rebuild below
                }
                // For every event except CONNECTION_LOST, refresh the cache and rebuild the address lists
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // No server node is registered (all servers down or a zookeeper problem):
                    // clear the live list; selector() can still fall back to trace
                    synchronized (lock) {
                        container.clear();
                    }
                    logger.error("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                List<String> addresses = new ArrayList<String>();
                for (ChildData data : children) {
                    String path = data.getPath();
                    logger.debug("get path:" + path);
                    // Strip the parent path, leaving the ip:port:weight node name
                    String address = path.substring(getServicePath().length() + 1);
                    logger.debug("get serviceAddress:" + address);
                    current.addAll(transfer(address));
                    addresses.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    trace.addAll(addresses);
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer weight = DEFAULT_WEIGHT;
        if (hostname.length == 3) {
            weight = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        // Repeat the address "weight" times so that heavier servers are picked more often
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public InetSocketAddress selector() {
        // Use the same lock as rebuild() so that polling never races with a refill
        synchronized (lock) {
            if (inner.isEmpty()) {
                if (!container.isEmpty()) {
                    inner.addAll(container);
                } else if (!trace.isEmpty()) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
            // Returns null when no address is known at all
            return inner.poll();
        }
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
            logger.warn("failed to close the address provider", e);
        }
    }

    @Override
    public String getService() {
        return service;
    }

}
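The weighting in the provider above works by repeating each address `weight` times, shuffling the expanded list, and polling it as a queue. The standalone sketch below isolates that idea; `WeightedPicker` is a hypothetical name, and the addresses are plain strings rather than `InetSocketAddress` objects to keep it short.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WeightedPicker {
    private final List<String> expanded = new ArrayList<>(); // each address repeated "weight" times
    private final Deque<String> queue = new ArrayDeque<>();

    WeightedPicker(Map<String, Integer> weights) {
        for (Map.Entry<String, Integer> entry : weights.entrySet()) {
            for (int i = 0; i < entry.getValue(); i++) {
                expanded.add(entry.getKey());
            }
        }
        // Randomize the order once; the queue then walks through it.
        Collections.shuffle(expanded);
    }

    // Returns the next address, refilling the queue when a full cycle has been consumed.
    // Returns null only when no address was configured.
    synchronized String next() {
        if (queue.isEmpty()) {
            queue.addAll(expanded);
        }
        return queue.poll();
    }

    public static void main(String[] args) {
        Map<String, Integer> weights = new LinkedHashMap<>();
        weights.put("192.168.36.215:9001", 1);
        weights.put("192.168.36.215:9002", 2);
        weights.put("192.168.36.215:9003", 3);
        WeightedPicker picker = new WeightedPicker(weights);
        for (int i = 0; i < 6; i++) {
            System.out.println(picker.next());
        }
    }
}
```

One consequence of this design is that within each full cycle every address is returned exactly `weight` times, so the distribution is exact per cycle while only the order is random. A picker that draws independently on every call would match the weights only on average.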
 
 
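There is also a second implementation of this interface that reads the service addresses from configuration; see the attachment: FixedAddressProvider.java

III. Server-side service registration
ThriftServiceServerFactory.java


IV. Client-side service proxy and connection pool
Client connection pool: ThriftClientPoolFactory.java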
 
package cn.slimsmart.thrift.rpc;

import java.net.InetSocketAddress;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/** commons-pool factory that creates, validates and destroys Thrift clients. */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider serverAddressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
            PoolOperationCallBack callback) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    static interface PoolOperationCallBack {
        // Called before a client is destroyed
        void destroy(TServiceClient client);

        // Called after a client has been created
        void make(TServiceClient client);
    }

    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                // A failing callback must not prevent the transport from being closed
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
    }

    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        return pin.isOpen();
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        if (address == null) {
            // selector() returns null when no server address is known
            throw new IllegalStateException("No thrift server address available for " + serverAddressProvider.getService());
        }
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TTransport transport = new TFramedTransport(tsocket);
        TProtocol protocol = new TBinaryProtocol(transport);
        TServiceClient client = this.clientFactory.getClient(protocol);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                // A failing callback must not prevent the client from being handed out
            }
        }
        return client;
    }

}
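The factory above only supplies the three lifecycle hooks; the pool decides when to call them. As a rough illustration of that division of labour, here is a deliberately tiny pool written against plain functional interfaces. `TinyPool` and `FakeConnection` are hypothetical and are not part of commons-pool, whose `GenericObjectPool` adds size limits, idle eviction and blocking on top of this idea.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> make;       // plays the role of makeObject()
    private final Predicate<T> validate;  // plays the role of validateObject()
    private final Consumer<T> destroy;    // plays the role of destroyObject()

    TinyPool(Supplier<T> make, Predicate<T> validate, Consumer<T> destroy) {
        this.make = make;
        this.validate = validate;
        this.destroy = destroy;
    }

    // Reuses an idle object when it is still valid, otherwise creates a new one.
    synchronized T borrow() {
        T candidate;
        while ((candidate = idle.poll()) != null) {
            if (validate.test(candidate)) {
                return candidate;
            }
            destroy.accept(candidate); // stale object: drop it and try the next one
        }
        return make.get();
    }

    // Keeps valid objects for reuse and destroys broken ones.
    synchronized void giveBack(T obj) {
        if (validate.test(obj)) {
            idle.push(obj);
        } else {
            destroy.accept(obj);
        }
    }

    // Hypothetical connection type used only by the demo below.
    static class FakeConnection {
        boolean open = true;
    }

    public static void main(String[] args) {
        TinyPool<FakeConnection> pool = new TinyPool<FakeConnection>(
                FakeConnection::new, c -> c.open, c -> c.open = false);
        FakeConnection first = pool.borrow();
        pool.giveBack(first);
        System.out.println(pool.borrow() == first); // prints "true": the connection was reused
    }
}
```

Reusing an open connection skips the TCP connect and the transport setup on every call, which is where the performance gain of the pooled client comes from.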
 
 
Client service proxy factory: ThriftServiceClientProxyFactory.java

package cn.slimsmart.thrift.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.transport.TTransportException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

@SuppressWarnings({ "unchecked", "rawtypes" })
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

    // Maximum number of active connections
    private Integer maxActive = 32;

    // Idle time in milliseconds (default 3 minutes);
    // connections idle for longer than this become eligible for eviction
    private Integer idleTime = 180000;
    private ThriftServerAddressProvider serverAddressProvider;

    private Object proxyClient;
    private Class<?> objectClass;

    private GenericObjectPool<TServiceClient> pool;

    private PoolOperationCallBack callback = new PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
        this.serverAddressProvider = serverAddressProvider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // Load the generated Iface interface
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // Load the generated Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Borrow a client for the duration of this call
                TServiceClient client = pool.borrowObject();
                boolean broken = false;
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    // Rethrow the real exception; a transport failure means the connection is unusable
                    Throwable cause = e.getTargetException();
                    broken = cause instanceof TTransportException;
                    throw cause;
                } finally {
                    if (broken) {
                        pool.invalidateObject(client);
                    } else {
                        pool.returnObject(client);
                    }
                }
            }
        });
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void close() {
        if (pool != null) {
            try {
                pool.close(); // release pooled connections
            } catch (Exception e) {
                // Nothing more can be done while shutting down
            }
        }
        if (serverAddressProvider != null) {
            serverAddressProvider.close();
        }
    }
}
 
 
Now let's look at the server and client configuration.

Server: spring-context-thrift-server.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-lazy-init="false">

    <!-- zookeeper client -->
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
        destroy-method="close">
        <property name="zkHosts"
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
        <property name="connectionTimeout" value="3000" />
        <property name="sessionTimeout" value="3000" />
        <property name="singleton" value="true" />
    </bean>
    <bean id="sericeAddressRegister"
        class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
        destroy-method="close">
        <property name="zkClient" ref="thriftZookeeper" />
    </bean>
    <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />

    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9000" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>

    <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9001" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>

    <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
        destroy-method="close">
        <property name="service" ref="echoSerivceImpl" />
        <property name="port" value="9002" />
        <property name="version" value="1.0.0" />
        <property name="weight" value="1" />
        <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    </bean>
</beans>
 
 
Client: spring-context-thrift-client.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    default-lazy-init="false">

    <!-- Alternative: a fixed address list instead of zookeeper discovery (disabled) -->
    <!--
    <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
         <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
         <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
    </bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="10000" />
        <property name="serverAddressProvider" ref="fixedAddressProvider" />
    </bean>
   -->

    <!-- Address discovery through zookeeper -->
    <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
        destroy-method="close">
        <property name="zkHosts"
            value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
        <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
        <property name="connectionTimeout" value="3000" />
        <property name="sessionTimeout" value="3000" />
        <property name="singleton" value="true" />
    </bean>
    <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
        <property name="maxActive" value="5" />
        <property name="idleTime" value="1800000" />
        <property name="serverAddressProvider">
            <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
                <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
                <property name="version" value="1.0.0" />
                <property name="zkClient" ref="thriftZookeeper" />
            </bean>
        </property>
    </bean>
</beans>
 
 
After starting the server, we can see that several service addresses have been registered in zookeeper.
 

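The full example is not covered in detail here; please refer to the sample code: https://github.com/slimina/thrift-zookeeper-rpc
Document on Thrift design optimization:
Optimizing logging in the Thrift RPC service framework
[Repost] Turning Thrift into a managed service with zookeeper, a connection pool, and Failover/LoadBalance
Original: http://www.cnblogs.com/scott19820130/p/4919174.html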