package stuSpark.com;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
public class JavaSparkJDBCSQL {
public static void main(String[] args)throws IOException{
System.out.println("begin");
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkJDBCSQL").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(sc);
//设置数据库连接参数
Map<String,String> dBConOption = new HashMap<String,String>();
dBConOption.put("url", "jdbc:oracle:thin:@127.0.0.1:1521:ORCL");
dBConOption.put("user", "Xho");
dBConOption.put("password", "sys");
dBConOption.put("driver", "oracle.jdbc.driver.OracleDriver");
dBConOption.put("dbtable", "NUMB");
DataFrameReader dfRead = sqlContext.read().format("jdbc").options(dBConOption);
DataFrame df=dfRead.load();
//注册为表,然后在SQL语句中使用
df.registerTempTable("lk");
// SQL可以在已注册为表的RDDS上运行
DataFrame df2 = sqlContext.sql("select * from lk");
df2.show();
/*+---+---+-----+----+
|ONE|TWO|THREE|FOUR|
+---+---+-----+----+
| a| b| c| d|
| a| a| b| b|
| c| c| a| d|
| a| a| c| s|
| m| s| b| j|
| a| l| o| k|
+---+---+-----+----+*/
List<String> list = df2.toJavaRDD().map(new Function<Row, String>(){
public String call(Row row){
return row.getString(0);
}
}).collect();
JavaRDD<String> words = df2.toJavaRDD().flatMap(new FlatMapFunction<Row,String>(){
public Iterable<String> call(Row row){
List<String> ll = new ArrayList<String>();
for (int i = 0; i < row.length(); i++) {
ll.add(row.getString(i));
}
return ll;
}
});
//maptopair 将集合数据存为key value
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
//reduceBykey 根据key聚集,对value进行操作
JavaPairRDD<String, Integer> counts = ones
.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
//collect封装返回一个数组
List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());
}
/*d: 2
s: 2
a: 7
k: 1
b: 4
o: 1
j: 1
l: 1
m: 1
c: 4*/
sc.stop();
System.out.println("end");
}
}
// Original article: https://www.cnblogs.com/ToDoNow/p/9542823.html