spark将计算结果写入到hdfs的两种方法
第一种方法:
rdd.saveAsTextFile(path, classOf[com.hadoop.compression.lzo.LzopCodec])
这种方法有这么几个问题
1、需要运行MapRreduce,输出目录必须不存在才可以
2、生成的lzo文件不会创建index文件,需要手动进行创建。
3、每个文件的名称不能自定义。
第二种方法是直接调用LzopOutputstream的接口和hdfs的api,直接操作hdfs文件。可以规避以上两个问题。
def main(args: Array[String]) { //保存的路径 val basePath = "/tmp/kuan" //设置日志级别 // Example.setStreamingLogLevels() //创建sparkConf val sparkConf = new SparkConf().setAppName("runJob") //设置master,此处设置本地执行 sparkConf.setMaster("local[*]") //创建SparkContext val sc = new SparkContext(sparkConf) //创建3个分区的RDD val rdd = sc.makeRDD(List("a", "b", "c", "d", "e", "f", "g", "宽"), 3).map(_ * 10) //在每个executor上执行的函数 //此处定义的是,针对每个分区,我们把计算好的结果写入到本地目录中 val func = (tc: TaskContext, it: Iterator[String]) => { //输出文件路径 val outFilePath = s"${basePath}/${tc.partitionId()}.lzo" val outIndexFilePath = s"${basePath}/${tc.partitionId()}.index" //****************开始往文件中写数据********************// //得到文件系统 val fs = FileSystem.get(new Configuration) //目标路径 val dstPath = new Path(outFilePath); val dstIndexPath = new Path(outIndexFilePath); //打开一个输出流 // val pw = new PrintWriter(new OutputStreamWriter(LzoUtil.getLzopOutputStream(new File(outFilePath), new File(outIndexFilePath)), Charset.forName("UTF-8"))) val pw = new PrintWriter(new OutputStreamWriter(LzoUtil.getLzopOutputStream(fs.create(dstPath), fs.create(dstIndexPath)), Charset.forName("UTF-8"))) try { var count = 0 while (it.hasNext) { //写数据 pw.println(it.next()) //增加计数 count = count + 1 //判断是否需要将数据写到硬盘中 if (count >= 1000) { //强制写入到存储中 pw.flush() //数量重新计算 count = 0 } } } finally { //关闭数据流 pw.close(); fs.close(); } //此处单机测试,所有的输出本机文件,如果分布式运行,那么输出文件还是放到hdfs吧 //测试输出 s"I Am Partition ${tc.partitionId()}" } //开始执行函数 val res = sc.runJob(rdd, func) //输出各个partition的执行结果.如果返回结果比较小,直接返回到driver res.foreach(println) }
创建com.hadoop.compression.lzo包,创建LzoUtil类
public class LzoUtil { private static int lzoBufferSize = 256 * 1024; /** * @param lzoOutFile 数据文件 * @param lzoIndexFile 索引文件 * @return LzopOutputStream * @throws Exception io异常 */ public static LzopOutputStream getLzopOutputStream(File lzoOutFile, File lzoIndexFile) throws Exception { return getLzopOutputStream(new FileOutputStream(lzoOutFile),new FileOutputStream(lzoIndexFile)); } public static LzopOutputStream getLzopOutputStream(OutputStream lzoOutputStream, OutputStream lzoIndexOutputStream) throws Exception { LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1; LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize); return new LzopOutputStream(lzoOutputStream, new DataOutputStream(lzoIndexOutputStream), lzoCompressor, lzoBufferSize, strategy); } }
每个task输出的文件的文件名可以自定义,同时可以生成索引文件
输出的目录如果不存在,可以在执行job之前进行创建。如果存在也不影响运行,生成的文件会进行覆盖。
原文:http://www.cnblogs.com/luckuan/p/5252580.html