import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD //map(func) object rdd2 { def main(args: Array[String]): Unit = { //本地模式 val conf: SparkConf = new SparkConf().setAppName("My scala word count").setMaster("local") //创建spark上下文对象 val sc = new SparkContext(conf) //1)map对所有数据进行操作 val listRDD: RDD[Int] = sc.makeRDD(1 to 10,2) val mapRDD: RDD[Int] = listRDD.map(data=>data*2) mapRDD.collect().foreach(println) //2)mapPartions对所有分区进行操作 //mapPartitions效率比map高 //mapPartitions可能会发生内存溢出 val mapPartitionsRDD: RDD[Int] = listRDD.mapPartitions(datas => { datas.map(data=>data*2) }) mapPartitionsRDD.collect().foreach(println) //3)mapPartitionsWithIndex算子,分区号 val tupleRDD: RDD[(Int, Int)] = listRDD.mapPartitionsWithIndex { case (num, datas) => { datas.map(data => (data, num)) } } tupleRDD.collect().foreach(println) } }
原文:https://www.cnblogs.com/hapyygril/p/13691433.html