在上一个版本中,实现了使用HBase的协处理器将HBase的二级索引同步到Solr中,但是仍旧有几个缺陷:
针对上面的三个主要问题,我们一一解决
Map[表名->List[(Collection1,List[Columns]),(Collection2,List[Columns])...]]这样的类型,根据表名获取所有的Collection和Column。e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString()其中e是ObserverContext使用typesafe的config组件读取morphlines.conf文件,将内容转换为 Map<String,List<HBaseIndexerMappin>>。具体代码如下
public class ConfigManager {private static SourceConfig sourceConfig = new SourceConfig();public static Config config;static {sourceConfig.setConfigFiles("morphlines.conf");config = sourceConfig.getConfig();}public static Map<String,List<HBaseIndexerMappin>> getHBaseIndexerMappin(){Map<String,List<HBaseIndexerMappin>> mappin = new HashMap<String, List<HBaseIndexerMappin>>();Config mappinConf = config.getConfig("Mappin");List<String> tables = mappinConf.getStringList("HBaseTables");for (String table :tables){List<Config> confList = (List<Config>) mappinConf.getConfigList(table);List<HBaseIndexerMappin> maps = new LinkedList<HBaseIndexerMappin>();for(Config tmp :confList){HBaseIndexerMappin map = new HBaseIndexerMappin();map.solrConnetion = tmp.getString("SolrCollection");map.columns = tmp.getStringList("Columns");maps.add(map);}mappin.put(table,maps);}return mappin;}}
因为目前我使用的环境是Solr和HBase公用的同一套Zookeeper,因此我们完全可以借助HBase的Zookeeper信息。HBase的协处理器是运行在HBase的环境中的,自然可以通过HBase的Configuration获取当前的Zookeeper节点和端口,然后轻松的获取到Solr的地址。
public class SolrServerManager implements LogManager {static Configuration conf = HBaseConfiguration.create();public static String ZKHost = conf.get("hbase.zookeeper.quorum","bqdpm1,bqdpm2,bqdps2");public static String ZKPort = conf.get("hbase.zookeeper.property.clientPort","2181");public static String SolrUrl = ZKHost + ":" + ZKPort + "/" + "solr";public static int zkClientTimeout = 1800000;// 心跳public static int zkConnectTimeout = 1800000;// 连接时间public static CloudSolrServer create(String defaultCollection){log.info("Create SolrCloudeServer .This collection is " + defaultCollection);CloudSolrServer solrServer = new CloudSolrServer(SolrUrl);solrServer.setDefaultCollection(defaultCollection);solrServer.setZkClientTimeout(zkClientTimeout);solrServer.setZkConnectTimeout(zkConnectTimeout);return solrServer;}}
理想状态下,我们时时刻刻都需要提交数据到Solr中,但是事实上我们数据写入的时间是比较分散的,可能集中再每一天的某几个时间点。因此我们必须保证在高并发下能达到一定数据量自动提交,在低并发的情况下能隔一段时间写入一次。只有两种机制并存的情况下才能保证数据能即时写入。
public class SolrCommitTimer extends TimerTask implements LogManager {public Map<String,List<SolrInputDocument>> putCache = new HashMap<String, List<SolrInputDocument>>();//Collection名字->更新(插入)操作缓存public Map<String,List<String>> deleteCache = new HashMap<String, List<String>>();//Collection名字->删除操作缓存Map<String,CloudSolrServer> solrServers = new HashMap<String, CloudSolrServer>();//Collection名字->SolrServersint maxCache = ConfigManager.config.getInt("MaxCommitSize");// 任何时候,保证只能有一个线程在提交索引,并清空集合final static Semaphore semp = new Semaphore(1);//添加Collection和SolrServerpublic void addCollecttion(String collection,CloudSolrServer server){this.solrServers.put(collection,server);}//往Solr添加(更新)数据public UpdateResponse put(CloudSolrServer server,SolrInputDocument doc) throws IOException, SolrServerException {server.add(doc);return server.commit(false, false);}//往Solr添加(更新)数据public UpdateResponse put(CloudSolrServer server,List<SolrInputDocument> docs) throws IOException, SolrServerException {server.add(docs);return server.commit(false, false);}//根据ID删除Solr数据public UpdateResponse delete(CloudSolrServer server,String rowkey) throws IOException, SolrServerException {server.deleteById(rowkey);return server.commit(false, false);}//根据ID删除Solr数据public UpdateResponse delete(CloudSolrServer server,List<String> rowkeys) throws IOException, SolrServerException {server.deleteById(rowkeys);return server.commit(false, false);}//将doc添加到缓存public void addPutDocToCache(String collection, SolrInputDocument doc) throws IOException, SolrServerException, InterruptedException {semp.acquire();log.debug("addPutDocToCache:" + "collection=" + collection + "data=" + doc.toString());if(!putCache.containsKey(collection)){List<SolrInputDocument> docs = new LinkedList<SolrInputDocument>();docs.add(doc);putCache.put(collection,docs);}else {List<SolrInputDocument> cache = putCache.get(collection);cache.add(doc);if (cache.size() >= maxCache) {try {this.put(solrServers.get(collection), cache);} finally {putCache.get(collection).clear();}}}semp.release();//释放信号量}//添加删除操作到缓存public void addDeleteIdCache(String collection,String rowkey) throws IOException, SolrServerException, InterruptedException {semp.acquire();log.debug("addDeleteIdCache:" + "collection=" + collection + "rowkey=" + rowkey);if(!deleteCache.containsKey(collection)){List<String> rowkeys = new LinkedList<String>();rowkeys.add(rowkey);deleteCache.put(collection,rowkeys);}else{List<String> cache = deleteCache.get(collection);cache.add(rowkey);if (cache.size() >= maxCache) {try{this.delete(solrServers.get(collection),cache);}finally {putCache.get(collection).clear();}}}semp.release();//释放信号量}@Overridepublic void run() {try {semp.acquire();log.debug("开始插入....");Set<String> collections = solrServers.keySet();for(String collection:collections){if(putCache.containsKey(collection) && (!putCache.get(collection).isEmpty()) ){this.put(solrServers.get(collection),putCache.get(collection));putCache.get(collection).clear();}if(deleteCache.containsKey(collection) && (!deleteCache.get(collection).isEmpty())){this.delete(solrServers.get(collection),deleteCache.get(collection));deleteCache.get(collection).clear();}}} catch (InterruptedException e) {e.printStackTrace();} catch (Exception e) {log.error("Commit putCache to Solr error!Because :" + e.getMessage());}finally {semp.release();//释放信号量}}}
在每个prePut和preDelete中拦截操作信息,记录表名、列名、值。将这些信息根据表名和Collection名进行分类写入缓存。
public class HBaseIndexerToSolrObserver extends BaseRegionObserver implements LogManager{Map<String,List<HBaseIndexerMappin>> mappins = ConfigManager.getHBaseIndexerMappin();Timer timer = new Timer();int maxCommitTime = ConfigManager.config.getInt("MaxCommitTime"); //最大提交时间,sSolrCommitTimer solrCommit = new SolrCommitTimer();public HBaseIndexerToSolrObserver(){log.info("Initialization HBaseIndexerToSolrObserver ...");for(Map.Entry<String,List<HBaseIndexerMappin>> entry : mappins.entrySet() ){List<HBaseIndexerMappin> solrmappin = entry.getValue();for(HBaseIndexerMappin map:solrmappin){String collection = map.solrConnetion;//获取Collection名字log.info("Create Solr Server connection .The collection is " + collection);CloudSolrServer solrserver = SolrServerManager.create(collection);//根据Collection初始化SolrServer连接solrCommit.addCollecttion(collection,solrserver);}}timer.schedule(solrCommit, 10 * 1000L, maxCommitTime * 1000L);}@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> e,Put put, WALEdit edit, Durability durability) throws IOException {String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();//获取表名String rowkey= Bytes.toString(put.getRow());//获取主键SolrInputDocument doc = new SolrInputDocument();List<HBaseIndexerMappin> mappin = mappins.get(table);for(HBaseIndexerMappin mapp : mappin){for(String column : mapp.columns){String[] tmp = column.split(":");String cf = tmp[0];String cq = tmp[1];if(put.has(Bytes.toBytes(cf),Bytes.toBytes(cq))){Cell cell = put.get(Bytes.toBytes(cf),Bytes.toBytes(cq)).get(0);//获取制定列的数据Map<String, String > operation = new HashMap<String,String>();operation.put("set",Bytes.toString(CellUtil.cloneValue(cell)));doc.setField(cq,operation);//使用原子更新的方式将HBase二级索引写入Solr}}doc.addField("id",rowkey);try {solrCommit.addPutDocToCache(mapp.solrConnetion,doc);//添加doc到缓存} catch (SolrServerException e1) {e1.printStackTrace();} catch (InterruptedException e1) {e1.printStackTrace();}}}@Overridepublic void postDelete(ObserverContext<RegionCoprocessorEnvironment> e,Delete delete,WALEdit edit,Durability durability) throws IOException{String table = e.getEnvironment().getRegion().getTableDesc().getTableName().getNameAsString();String rowkey= Bytes.toString(delete.getRow());List<HBaseIndexerMappin> mappin = mappins.get(table);for(HBaseIndexerMappin mapp : mappin){try {solrCommit.addDeleteIdCache(mapp.solrConnetion,rowkey);//添加删除操作到缓存} catch (SolrServerException e1) {e1.printStackTrace();} catch (InterruptedException e1) {e1.printStackTrace();}}}}
首先需要添加morphlines.conf文件。里面包含了需要同步数据到Solr的HBase表名、对应的Solr Collection的名字、要同步的列、多久提交一次、最大批次容量的相关信息。具体配置如下:
#最大提交时间(单位:秒)MaxCommitTime = 30#最大批次容量MaxCommitSize = 10000Mappin {HBaseTables: ["HBASE_OBSERVER_TEST"] #需要同步的HBase表名"HBASE_OBSERVER_TEST": [{SolrCollection: "bqjr" #Solr Collection名字Columns: ["cf1:test_age", #需要同步的列,格式<列族:列>"cf1:test_name"]},]}
该配置文件默认放在各个节点的/etc/hbase/conf/下。如果你希望将配置文件路径修改为其他路径,请修改com.bqjr.bigdata.HBaseObserver.comm.config.SourceConfig类中的configHome路径。
然后将代码打包,上传到HDFS中,将协处理器添加到对应的表中。
#先禁用这张表disable ‘HBASE_OBSERVER_TEST‘#为这张表添加协处理器,设置的参数具体为: jar文件路径|类名|优先级(SYSTEM或者USER)alter ‘HBASE_OBSERVER_TEST‘,‘coprocessor‘=>‘hdfs://hostname:8020/ext_lib/HBaseObserver-1.0.0.jar|com.bqjr.bigdata.HBaseObserver.server.HBaseIndexerToSolrObserver||‘#启用这张表enable ‘HBASE_OBSERVER_TEST‘#删除某个协处理器,"$<bumber>"后面跟的ID号与desc里面的ID号相同alter ‘HBASE_OBSERVER_TEST‘,METHOD=>‘table_att_unset‘,NAME => ‘coprocessor$1‘
如果需要新增一张表同步到Solr。只需要修改morphlines.conf文件,分发倒各个节点。然后将协处理器添加到HBase表中,这样就不用再次修改代码了。
原文:http://www.cnblogs.com/kekukekro/p/6478794.html