首先还是pom文件:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.11.12</scala.version>
    <spark.version>2.4.5</spark.version>
    <hadoop.version>2.7.7</hadoop.version>
    <scala.compat.version>2.11</scala.compat.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.45</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
代码:读mysql
import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Reads rows from a MySQL table through Spark's [[JdbcRDD]].
 *
 * Each partition opens its own JDBC connection via the connection factory;
 * JdbcRDD substitutes the partition's lower/upper bound into the two `?`
 * placeholders of the query and closes the connection itself.
 */
object MysqlRDD {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("readMysql").setMaster("local[*]")
    val sparkContext = new SparkContext(sparkConf)
    val jdbcrdd: JdbcRDD[String] = new JdbcRDD(
      sparkContext,
      // Connection factory: executed once per partition on the executors.
      () => {
        Class.forName("com.mysql.jdbc.Driver")
        DriverManager.getConnection("jdbc:mysql://hadoop01:3306/transaction", "root", "root")
      },
      // The two '?' are filled with the partition bounds (150, 151).
      "select * from orders where realTotalMoney>? and realTotalMoney<?",
      150, // lower bound
      151, // upper bound
      1,   // number of partitions
      // Row mapper: join the first five columns with commas.
      r => (1 to 5).map(r.getString).mkString(",")
    )
    // Cache: without this, the two actions below (foreach + count) would each
    // re-run the JDBC query against MySQL.
    jdbcrdd.cache()
    jdbcrdd.foreach(println)
    print(jdbcrdd.count())
    sparkContext.stop()
  }
}
写入mysql,这里有效率问题需要注意:
低效版本:
import java.sql.DriverManager
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Writes an RDD of (id, name, age) tuples into MySQL.
 *
 * Inefficient by design (one connection per record, for illustration), but
 * the JDBC resources must still be released: the original leaked the
 * Connection entirely and skipped Statement.close() on exception.
 */
object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)
    rdd.foreach { case (id, name, age) =>
      Class.forName("com.mysql.jdbc.Driver")
      // NOTE: one connection per record — intentionally the slow variant.
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        val preparedStatement =
          connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
        try {
          preparedStatement.setInt(1, id)
          preparedStatement.setString(2, name)
          preparedStatement.setInt(3, age)
          preparedStatement.executeUpdate()
        } finally {
          preparedStatement.close()
        }
      } finally {
        // Always release the connection, even when the insert throws.
        connection.close()
      }
    }
    sparkContext.stop()
  }
}
效率提升版本:
import java.sql.DriverManager
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Writes an RDD of (id, name, age) tuples into MySQL, one connection per
 * partition via foreachPartition.
 *
 * Improvements over the original: the PreparedStatement is created once per
 * partition (not once per record) and rows are sent with addBatch/executeBatch;
 * both the statement and the connection are closed in finally blocks (the
 * original never closed the connection). The erased-type pattern match on
 * Iterator[(Int,String,Int)] (unchecked at runtime) is removed.
 */
object RddToMysql {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("rddToMysql").setMaster("local[*]")
    val sparkContext: SparkContext = SparkContext.getOrCreate(sparkConf)
    val rdd: RDD[(Int, String, Int)] =
      sparkContext.parallelize(List((1, "yls", 31), (2, "byl", 27), (3, "yms", 29)), 1)
    rdd.foreachPartition { it =>
      Class.forName("com.mysql.jdbc.Driver")
      val connection = DriverManager.getConnection("jdbc:mysql://hadoop01:3306/test", "root", "root")
      try {
        // One statement reused for the whole partition.
        val preparedStatement =
          connection.prepareStatement("insert into student(id,name,age) values(?,?,?)")
        try {
          it.foreach { case (id, name, age) =>
            preparedStatement.setInt(1, id)
            preparedStatement.setString(2, name)
            preparedStatement.setInt(3, age)
            preparedStatement.addBatch()
          }
          // Single round trip for all rows in the partition.
          preparedStatement.executeBatch()
        } finally {
          preparedStatement.close()
        }
      } finally {
        connection.close()
      }
    }
    sparkContext.stop()
  }
}
原文:https://www.cnblogs.com/ls-oyang/p/13337863.html