首页 > 其他 > 详细

基于tcp的订阅推送服务实现

时间:2014-03-05 18:05:43      阅读:1093      评论:0      收藏:0      [点我收藏+]

1、业务背景  

     服务后台实时收集着全国近100个城市的出租车、手机和pad等移动终端的位置点gps信息,然后根据gps所在城市区域,分发给不同的有权限合作商。

     其业务逻辑图如下:

                                  bubuko.com,布布扣

 

    

    参考hadoop的RPC模块,实现一套订阅发布实时推送服务

 

2、架构图   

说明:

3、类说明

 

4、性能优化

 4.1 异步发送数据

    异步发送逻辑如下: 

bubuko.com,布布扣
 1 private int channelIO(WritableByteChannel writeCh, ByteBuffer buf)
 2             throws IOException {
 3         int originalLimit = buf.limit();
 4         int initialRemaining = buf.remaining();
 5         int ret = 0;
 6         while (buf.remaining() > 0) {
 7             try {
 8                 int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
 9                 buf.limit(buf.position() + ioSize);
10                 ret = writeCh.write(buf);
11                 if (ret < ioSize) {
12                     break;
13                 }
14             } finally {
15                 buf.limit(originalLimit);
16             }
17         }
18         int nBytes = initialRemaining - buf.remaining(); 
19         return (nBytes > 0) ? nBytes : ret;
20     }
bubuko.com,布布扣

     这里主要有两优化点:①为防止待写的数据量过大导致独占线程时间片过长,在8行代码对ByteBuffer进行切分发送,②在11行代码,通道没法写完数据时,应让出线程,立刻返回注册到selector,待下次writeCh通道变成writable可写状态时,再进行channelIO写操作(这是niobio的最大区别)。

   4.2、使用多selector机制,分离网络读、写操作

      ReadSelector负责监听用户的请求和鉴权响应,若用户请求为合法,则把相应连接注册给WriteSelector;WriteSelector负责将接收的实时gps点数据推送给已鉴权成功的注册用户连接        

   4.3、使用多selector机制,进行异步写数据

     可以根据客户端的端口hash到不同的Selector上去执行写的操作,如下:

    private Responder selectResponder(int remotePort){
        int index = Math.abs(remotePort % responderCount);
        return responders[index];
    }

 

5 容错健壮性

      最后还得考虑实时数据流大和频率高的特征,当存在网络不好或带宽不足时,服务会存在数据发送不赢而导致堆积的潜在风险。所以为每个连接增加个队列ResponseQueue,来维护待发送数据集。只有数据队列中存在数据时,就将相应连接注册到WriteSelector。如下图:

                                        bubuko.com,布布扣

        这里主要用到两个trick:  

    5.1、避免服务数据堆积

     当网络状况不好对方接收较慢或者发送数据量比较大时,这两种情况下,都会造成服务数据堆积。因此,引入参数连接的缓冲数据队列大小限制maxAllowedQueueSize如果数据批次队列大于maxAllowedQueueSize,则直接丢弃,避免数据无上限增长,如下代码:

bubuko.com,布布扣
void doRespond(Call call) throws IOException {
        try {
                    synchronized (call.connection.responseQueue) {
                        if (call.connection.responseQueue.size() < maxAllowedQueueSize) {
                            call.connection.responseQueue.addLast(call);
                            if (call.connection.responseQueue.size() == 1) {
                                processResponse(call.connection.responseQueue, true);
                            }
                        } else {
                            logger.warn(
                                    "incoming data discarded from connection {}",
                                    call.connection);
                        }
                    }
                } catch (NullPointerException e) {
                    logger.error(e.getMessage(), e);
                }
            }
bubuko.com,布布扣


     5.2 、定期清理关掉坏掉的连接资源

     这里的坏掉是指数据在一段时间内一直停留在连接connection的数据队列里,则认为该连接已失效而直接清理队列数据后关掉连接。代码如下:

bubuko.com,布布扣
            private void doPurge(Call call, long now) throws IOException {
              if(call.connection == null || call.connection.responseQueue == null){
                  return ;
              }
              LinkedList<Call> responseQueue = call.connection.responseQueue;
              synchronized (responseQueue) {
                Iterator<Call> iter = responseQueue.listIterator(0);
                while (iter.hasNext()) {
                  call = iter.next();
                  if (now > call.timestamp + PURGE_INTERVAL) {
                    logger.info("dalay of current connection {}  exceeds 10 mins",call.connection);
                    closeConnection(call.connection);
                    
} } } }
bubuko.com,布布扣

     最后希望本文对有类似需求的网友能提供参考。

基于tcp的订阅推送服务实现,布布扣,bubuko.com

基于tcp的订阅推送服务实现

原文:http://www.cnblogs.com/gisorange/p/3581493.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!