首页 > 数据库技术 > 详细

sparksql系列(四) sparksql 操作数据库

时间:2019-10-20 14:31:18      阅读:78      评论:0      收藏:0      [点我收藏+]

一:SparkSql操作mysql

老规矩:先抽出来公共的方法:

import java.util.Arrays

import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
import org.apache.spark.sql.functions.concat
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SaveMode
import java.util.ArrayList


object WordCount {

  def dataAndJdbcoption() = {  

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD1 = javasc.parallelize(Arrays.asList("{‘id‘:‘7‘}","{‘id‘:‘8‘}","{‘id‘:‘9‘}"));
    val nameRDD1df = sparkSession.read.json(nameRDD1)

    val prop = new java.util.Properties
    prop.setProperty("user","root")
    prop.setProperty("password","123456")
    prop.setProperty("driver","com.mysql.jdbc.Driver")
    prop.setProperty("dbtable","blog")
    prop.setProperty("url","jdbc:mysql://127.0.0.1:3306/test")

    (nameRDD1df,prop)

  }

}

读mysql

    val df = dataAndJdbcoption()._1
    val prop = dataAndJdbcoption()._2

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val data = sparkSession.read.format("jdbc").option("user","root").option("password","123456")
      .option("driver","com.mysql.jdbc.Driver")
.      option("url","jdbc:mysql://127.0.0.1:3306/test").option("dbtable", "blog")
      .load()
    data.show(100)

写mysql

  val df = dataAndJdbcoption()._1
  val prop = dataAndJdbcoption()._2
  df.write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)

二:SparkSql操作Hive

公司读Hive数据

                     其实是读Hive表的location的文件,生成最终的文件。

公司写Hive数据

                     生成文件后将数据load进Hive

直接使用Sql操作Hive的数据       

    val conf = new SparkConf().setAppName("WordCount")
    //合并小文件,sparksql默认有200个task执行文件,会生成很多小文件。其实有很多参数可以优化详见sparkSession.sql("SET -v")

    conf.set("mapreduce.input.fileinputformat.split.minsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.rack","1024000000")
    val sparkSession= SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

      sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

 

    除了上述方法可以合并文件之外,还有一种方法可以合并文件:

    val dataFrame = sparkSession.sql("select aa from table ").coalesce(3);//日志看task数量3
    dataFrame.createOrReplaceTempView("sparksqlTempTable")
    sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")

 

    但是这种方法并不实用,因为大部分操作的Sql操作是需要insert。

 

    网上说还有第三种方法:

    即在Sql中插入一个REPARTITION(4),但是我在实验过程中并没有作用,可能这种语法只是针对于HiveSql本身,使用SparkSql并没有作用。

    栗子:select /*+ REPARTITION(4) */ aa from table 

sparksql系列(四) sparksql 操作数据库

原文:https://www.cnblogs.com/wuxiaolong4/p/11707341.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!