首页 > 其他 > 详细

spark-wordcount

时间:2020-05-03 23:00:28      阅读:47      评论:0      收藏:0      [点我收藏+]
package cn.spark.study.core;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.sql.SQLOutput;
import java.util.Arrays;

/**
 * @author: yangchun
 * @description:
 * @date: Created in 2020-05-03 21:27
 */
public class WordCountLocal {
    public static void main(String[] args) {
        //第一步:创建SparkConf对象,设置Spark应用的配置信息
        //setMaster()可以设置Spark应用程序要连接的机器的master机器,设置为local表示本地运行
        SparkConf conf = new SparkConf().setAppName("WordCountLocal")
                .setMaster("local");
        //第二步:创建JavaSparkContext对象
        /*在Spark,SparkContext是所有Spark所有功能的一个入口,你无论是java,scala,还是python编写的
        都必须有一个SparkContext,它的主要作用,包括初始化Spark应用程序所需的一些核心组件,包括调度器
        (DAGSchedule,TaskSchedule),还会去Spark Master节点进行注册,等等。Spark Context是Spark中
        最重要的一个对象。不同类型的Spark应用程序,SparkContext不同
        Java的SparkContext,就是JavaSparkContext
        Spark SQL程序,SQLContext,HiveContext
        Spark Streaming SparkContext
        Scala 就是SparkContext
         */
        JavaSparkContext sc = new JavaSparkContext(conf);
        /*
        第三步,针对输入源创建R初始RDD,输入源中的数据会打散,分配到RDD的每个partition中,形成一个分布式数据集
        SparkContext根据本地文件创建RDD的方法叫做textFile(),Java中,创建的普通RDD,都叫做JavaRDD。RDD中有元素的
        概念,如果hdfs和本地文件,创建的RDD每一个元素相当于文件里面的一行
         */
        JavaRDD<String> lines = sc.textFile("E:\\spark\\spark.txt");
        /*
         第四步,对初始RDD进行transformation操作,通过创建function,并配合RDD的map、flatMap等算子
         来执行function,通常,如果比较简单,则创建指定的Function匿名内部类,如果比较复杂就定义一个类实现
         function接口。
          先将一行拆分成单词
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID= 1l;
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        /**
         * 将每一个单词映射成一个Tuple(单词,1),mapToPair与PairFunction配合使用。第一个参数是输入,第二个,第三个是
         * Tuple的组成
         */
        JavaPairRDD<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<>(word,1);
            }
        });
        /**
         * 接着对所有Tuple进行reduce操作,相当于将根据key对所有tuple进行值的累加
         */
        JavaPairRDD<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        /**
         * 最后Spark程序中光有transformation操作,是不行的必须哟action操作,不会执行,可以用foreach操作来触发程序执行
         */
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1+" appeared "+wordCount._2+" times");
            }
        });
        sc.close();
    }
}

 

spark-wordcount

原文:https://www.cnblogs.com/xiaofeiyang/p/12823910.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!