首页 > 其他 > 详细

spark将计算结果写入到hdfs的两种方法

时间:2016-03-08 07:00:43      阅读:4352      评论:0      收藏:0      [点我收藏+]

spark将计算结果写入到hdfs的两种方法
第一种方法:

rdd.saveAsTextFile(path, classOf[com.hadoop.compression.lzo.LzopCodec])

这种方法有这么几个问题

1、需要运行MapRreduce,输出目录必须不存在才可以

2、生成的lzo文件不会创建index文件,需要手动进行创建。

3、每个文件的名称不能自定义。


第二种方法是直接调用LzopOutputstream的接口和hdfs的api,直接操作hdfs文件。可以规避以上两个问题。

def main(args: Array[String]) {

    //保存的路径
    val basePath = "/tmp/kuan"

    //设置日志级别
    //    Example.setStreamingLogLevels()
    //创建sparkConf
    val sparkConf = new SparkConf().setAppName("runJob")
    //设置master,此处设置本地执行
    sparkConf.setMaster("local[*]")
    //创建SparkContext
    val sc = new SparkContext(sparkConf)

    //创建3个分区的RDD
    val rdd = sc.makeRDD(List("a", "b", "c", "d", "e", "f", "g", "宽"), 3).map(_ * 10)

    //在每个executor上执行的函数
    //此处定义的是,针对每个分区,我们把计算好的结果写入到本地目录中
    val func = (tc: TaskContext, it: Iterator[String]) => {

      //输出文件路径
      val outFilePath = s"${basePath}/${tc.partitionId()}.lzo"
      val outIndexFilePath = s"${basePath}/${tc.partitionId()}.index"

      //****************开始往文件中写数据********************//
      //得到文件系统
      val fs = FileSystem.get(new Configuration)
      //目标路径
      val dstPath = new Path(outFilePath);
      val dstIndexPath = new Path(outIndexFilePath);
      //打开一个输出流
      //      val pw = new PrintWriter(new OutputStreamWriter(LzoUtil.getLzopOutputStream(new File(outFilePath), new File(outIndexFilePath)), Charset.forName("UTF-8")))
      val pw = new PrintWriter(new OutputStreamWriter(LzoUtil.getLzopOutputStream(fs.create(dstPath), fs.create(dstIndexPath)), Charset.forName("UTF-8")))

      try {
        var count = 0
        while (it.hasNext) {
          //写数据
          pw.println(it.next())
          //增加计数
          count = count + 1
          //判断是否需要将数据写到硬盘中
          if (count >= 1000) {

            //强制写入到存储中
            pw.flush()
            //数量重新计算
            count = 0
          }
        }
      } finally {
        //关闭数据流
        pw.close();
        fs.close();
      }
      
      //此处单机测试,所有的输出本机文件,如果分布式运行,那么输出文件还是放到hdfs吧
      //测试输出
      s"I Am Partition ${tc.partitionId()}"
    }

    //开始执行函数
    val res = sc.runJob(rdd, func)
    //输出各个partition的执行结果.如果返回结果比较小,直接返回到driver
    res.foreach(println)
  }

创建com.hadoop.compression.lzo包,创建LzoUtil类

 public class LzoUtil {
    private static int lzoBufferSize = 256 * 1024;
    /**
     * @param lzoOutFile   数据文件
     * @param lzoIndexFile 索引文件
     * @return LzopOutputStream
     * @throws Exception io异常
     */
    public static LzopOutputStream getLzopOutputStream(File lzoOutFile, File lzoIndexFile) throws Exception {
        return getLzopOutputStream(new FileOutputStream(lzoOutFile),new FileOutputStream(lzoIndexFile));
    }

    public static LzopOutputStream getLzopOutputStream(OutputStream lzoOutputStream, OutputStream lzoIndexOutputStream) throws Exception {
        LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1;
        LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize);
        return new LzopOutputStream(lzoOutputStream, new DataOutputStream(lzoIndexOutputStream), lzoCompressor, lzoBufferSize, strategy);
    }
}

每个task输出的文件的文件名可以自定义,同时可以生成索引文件

输出的目录如果不存在,可以在执行job之前进行创建。如果存在也不影响运行,生成的文件会进行覆盖。

spark将计算结果写入到hdfs的两种方法

原文:http://www.cnblogs.com/luckuan/p/5252580.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!