首页 > 其他 > 详细

Spark的dataframe转rdd通用工具类

时间:2020-11-05 08:38:00      阅读:39      评论:0      收藏:0      [点我收藏+]

需求解决问题

当每次读取hive表或者其他数据源,获取数据,相对其进行rdd操作,遇到任何类都需要df.rdd(row>row.getstring(0))去获取,就很麻烦,所以可以实现个通用的转换方式

1.dataframe转为rdd通用方法

 /**
   * 根据schema信息进行判断与封装
   *
   * @param dataType
   * @return
   */
  def squareRowkey2(dataType: (StructField, Int)): (Row) => Any = {
    val (structField, index) = dataType
    structField.dataType match {
      case StringType =>
        (row: Row) => row.getString(index)
      case LongType =>
        (row: Row) => row.getLong(index)
      case FloatType =>
        (row: Row) => row.getFloat(index)
      case DoubleType =>
        (row: Row) => row.getDouble(index)
      case IntegerType =>
        (row: Row) => row.getInt(index)
      case BooleanType =>
        (row: Row) => row.getBoolean(index)
      case DateType =>
        (row: Row) => row.getDate(index)
      case TimestampType =>
        (row: Row) => row.getTimestamp(index)
      case BinaryType =>
        (row: Row) => row.getAs[Array[Byte]](index)
//遇到array与json(struct)类型进行返回为json字符串更好一点,这样所以的结果都可以转为string case ArrayType(elementType, containsNull) => (row: Row) => { val value: mutable.WrappedArray[_ >: Integer with String <: io.Serializable with Comparable[_ >: Integer with String]] = elementType match { case IntegerType => { row.getAs[mutable.WrappedArray[Integer]](index) } case StringType => { row.getAs[mutable.WrappedArray[String]](index) } case _ => row.getAs[mutable.WrappedArray[String]](index) } //这儿必须转换为java的list 防止map转json字符串不符合要求 if (value == null) { util.Collections.emptyList() }
//这儿没转数组是应为我后续有用到 JavaConversions.bufferAsJavaList(value.toBuffer) } case StructType(fields) => (row: Row) => row.getAs[mutable.Map[String, String]](index) case _ => (row: Row) => row.getString(index) } }


 二、rdd转实体对象

大多数是都是讲数据分装为case calss或者对象

  def  dataFrameToEntity [U: ClassTag] (frame: DataFrame, clazz: Class[U], hiveRdd: RDD[Array[Any]]) = {
    val fields: Array[StructField] = frame.schema.toArray
    val rdd = hiveRdd.map(array => {
      val map = new util.HashMap[String, Any]()
      fields.map(_.name).zip(array)
        .foreach {
          case (k, v) => (map.put(k, v))
        }
      val str = GsonUtil.toJsonString(map)
//这边转换工具类 就是gson的转为对象的方法 val value: U = GsonUtil.GsonToBean(str, clazz) value }) rdd }

  

Spark的dataframe转rdd通用工具类

原文:https://www.cnblogs.com/hejunhong/p/13929463.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!