Transformation(转换):根据数据集创建一个新的 数据集,计算后返回一个新的RDD。例如,一个RDD进行map操作后,生成了新的RDD。
Action(动作):对RDD结果计算返回一个数值value给驱动程序,或者把结果存储到外部存储系统中;
例如:collect算子将数据集的所有元数据收集完成返回给驱动程序。
RDD中的所有转换都是延迟加载的,也就是说,他们并不会直接计算结果。相反的,他们只是记住这些应用到基础数据集(例如一个文件)上转换动作。只有当发生一个要求返回结果给Driver的动作或者将结果写入到外部存储中,这写转换才会真正的运行,这种设计让Spark更加有效率的运行。
Action算子返回结果或保存结果,如count,collect,save等,Action操作是返回结果或将结果写入存储的操作,Action是Spark应用程序真正执行的触发动作 .
说明:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24 scala> source.collect() res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> var source2 = source.map(_+1) source2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:25 scala> source2.collect() res5: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)

l类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]。
假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区
scala> val rdd = sc.parallelize(List(("kpop","female"),("zorro","male"),("mobin","male"),("lucy","female")))
rdd: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[16] at parallelize at <console>:24
scala> :paste
// Entering paste mode (ctrl-D to finish)
def partitionsFun(iter : Iterator[(String,String)]) : Iterator[String] = {
var woman = List[String]()
while (iter.hasNext){
val next = iter.next()
next match {
case (_,"female") => woman = next._1 :: woman
case _ =>
}
}
woman.iterator
}
// Exiting paste mode, now interpreting.
partitionsFun: (iter: Iterator[(String, String)])Iterator[String]
scala> val result = rdd.mapPartitions(partitionsFun)
result: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at mapPartitions at <console>:28
scala> result.collect()
res13: Array[String] = Array(kpop, lucy)


将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]
scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24 scala> rdd.glom().collect() res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))

类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24 scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26 scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi")) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24 scala> val filter = sourceFilter.filter(_.contains("xiao")) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

原文:https://www.cnblogs.com/phy2020/p/12747593.html