Building an index with Spark is straightforward, because Spark provides a higher-level abstraction, the RDD (resilient distributed dataset). Compared with building large-scale indexes with Hadoop MapReduce, Spark offers a more flexible API, better performance, and more concise syntax.
First, let's look at the overall topology:

[Topology diagram: Spark tasks indexing data in parallel into a sharded search cluster (SolrCloud, or an Elasticsearch cluster), one shard per machine]
Next, let's look at the Spark program, written in Scala:
package com.easy.build.index

import java.util

import org.apache.solr.client.solrj.beans.Field
import org.apache.solr.client.solrj.impl.HttpSolrClient
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.annotation.meta.field

// One indexed document; each constructor parameter is mapped to a Solr field via the SolrJ @Field annotation
case class Record(
  @(Field@field)("rowkey") rowkey: String,
  @(Field@field)("title") title: String,
  @(Field@field)("content") content: String,
  @(Field@field)("isdel") isdel: String,
  @(Field@field)("t1") t1: String,
  @(Field@field)("t2") t2: String,
  @(Field@field)("t3") t3: String,
  @(Field@field)("dtime") dtime: String
)

object SparkIndex {

  // Solr client; because it lives in an object, it is created once per JVM (driver and each executor)
  val client = new HttpSolrClient("http://192.168.1.188:8984/solr/monitor")

  // Number of documents to buffer before each commit to Solr
  val batchCount = 10000

  // Small local test that indexes three hand-built records
  def main2(args: Array[String]) {
    val d1 = new Record("row1", "title", "content", "1", "01", "57", "58", "3")
    val d2 = new Record("row2", "title", "content", "1", "01", "57", "58", "45")
    val d3 = new Record("row3", "title", "content", "1", "01", "57", "58", null)
    client.addBean(d1)
    client.addBean(d2)
    client.addBean(d3)
    client.commit()
    println("Commit succeeded!")
  }

  // Index all lines of one RDD partition, committing whatever remains in the buffer at the end
  def indexPartition(lines: scala.Iterator[String]): Unit = {
    val datas = new util.ArrayList[Record]()
    lines.foreach(line => indexLineToModel(line, datas))
    commitSolr(datas, true)
  }

  // Send the buffered documents to Solr when the batch is full,
  // or when the partition has been fully consumed (isEnd == true)
  def commitSolr(datas: util.ArrayList[Record], isEnd: Boolean): Unit = {
    if ((datas.size() > 0 && isEnd) || datas.size() == batchCount) {
      client.addBeans(datas)
      client.commit()
      datas.clear()
    }
  }

  // Parse one input line (fields separated by the \u0001 control character) into a Record
  // and add it to the current batch
  def indexLineToModel(line: String, datas: util.ArrayList[Record]): Unit = {
    val fields = line.split("\1", -1).map(field => etl_field(field))
    val tuple = buildTuple(fields)
    val record = Record.tupled(tuple)
    datas.add(record)
    commitSolr(datas, false)
  }

  // Convert the eight split fields into a tuple matching the Record constructor
  def buildTuple(array: Array[String]): (String, String, String, String, String, String, String, String) = {
    array match {
      case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7, s8)
    }
  }

  // Normalize empty strings to null so they are not indexed as empty values
  def etl_field(field: String): String = {
    field match {
      case "" => null
      case _  => field
    }
  }

  // Delete documents matching the given query and commit the deletion
  def deleteSolrByQuery(query: String): Unit = {
    client.deleteByQuery(query)
    client.commit()
    println("Delete succeeded!")
  }

  def main(args: Array[String]) {
    // Clear any previously indexed documents matching this query
    deleteSolrByQuery("t1:03")

    // Jar containing this program, shipped to the executors
    val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar"

    System.setProperty("user.name", "webmaster")

    val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index")

    // Dependencies needed on the executors (SolrJ and its HTTP client libraries)
    val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"
    conf.setJars(seq)

    val sc = new SparkContext(conf)

    // Read the raw data to be indexed from HDFS
    val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/")

    indexRDD(rdd)

    client.close()
    sc.stop()
  }

  // Index each partition of the RDD in parallel on the executors
  def indexRDD(rdd: RDD[String]): Unit = {
    rdd.foreachPartition(line => indexPartition(line))
  }

}
OK, that completes our index-building program. This example uses the remote-submit (standalone) mode, but the program also supports spark on yarn (cluster or client) mode. In that case, do not set the master explicitly with setMaster; instead, specify the run mode with --master when submitting the job, and ship the dependent jars to the cluster with the --jars parameter, otherwise the job will throw an exception at runtime. Finally, note that the Solr instance in this example runs in single-node mode, so building the index with Spark does not yet reach its maximum speedup. The real gains come from a multi-machine search cluster, as shown in the architecture diagram above, with one shard per machine. That is the SolrCloud mode (or a sharded cluster in Elasticsearch), and only then do you get truly efficient, high-volume index building.
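To make the yarn submission concrete, here is a minimal sketch of how the driver setup changes. It is an illustration under the assumptions of this example (the package, jar, and HDFS path are taken from the listing above; the object name SparkIndexOnYarn is hypothetical), not the author's original code, and the spark-submit flags shown in the comment should be checked against your Spark version:

// Sketch: SparkConf for spark on yarn; the master is NOT set in code.
// An illustrative submit command (flag details may vary by Spark version):
//   spark-submit --master yarn --deploy-mode cluster \
//     --class com.easy.build.index.SparkIndexOnYarn \
//     --jars noggit-0.6.jar,httpclient-4.3.1.jar,httpcore-4.3.jar,httpmime-4.3.1.jar,solr-solrj-5.1.0.jar \
//     spark-build-index-1.0-SNAPSHOT.jar
package com.easy.build.index

import org.apache.spark.{SparkConf, SparkContext}

object SparkIndexOnYarn {
  def main(args: Array[String]): Unit = {
    // Only the application name is set here; --master and --jars are supplied by spark-submit
    val conf = new SparkConf().setAppName("build index")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/")
    SparkIndex.indexRDD(rdd) // reuse the indexing logic defined above
    sc.stop()
  }
}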
How to Use Spark to Build Indexes in Parallel at Scale
Original article: http://www.cnblogs.com/qindongliang/p/5175189.html