??Spark Streaming是Spark 核心API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。 数据可以从诸如Kafka,Flume,Kinesis或TCP套接字的许多来源中获取,并且可以使用由高级功能(如map,reduce,join和window)表达的复杂算法进行处理。 最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。 事实上,您可以在数据流上应用Spark的机器学习和图形处理算法。 
 
??在内部,它的工作原理如下。 Spark Streaming接收实时输入数据流,并将数据分成批,然后由Spark引擎对其进行处理,以批量生成最终的结果流。 

??Spark Streaming提供称为离散流或DStream的高级抽象,它表示连续的数据流。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。 在内部,DStream表示为一系列RDD。 
??本指南介绍如何开始使用DStreams编写Spark Streaming程序。 您可以在Scala,Java或Python(在Spark 1.2中引入)中编写Spark Streaming程序。
??DStream是一个抽象的概念, 表示一系列的RDD
        //使用两个工作线程和1秒的批量间隔创建本地StreamingContext
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);
lines的DStream表示将从数据服务器接收的数据流。 此流中的每条记录都是一行文本。 然后,我们要将空格划分为单词。        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        //Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        //将此DStream中生成的每个RDD的前十个元素打印到控制台
        wcs.print();
The words DStream is further mapped (one-to-one transformation) to a DStream of 
(word, 1) pairs, using a PairFunction object. Then, it is reduced to get the 
frequency of words in each batch of data, using a Function2 object. Finally, 
wcs.print() will print a few of the counts generated every second.
wordsDStream进一步映射(one-to-one transformation)到(word,1)pairs的DStream。 然后,使用Function2对象减少每批数据中的单词的频率。 最后,wcs.print()将打印每秒产生的几个计数。jsc.start();              // Start the computation
jsc.awaitTermination();   // Wait for the computation to terminate
yum install nc
nc -lk 9999
在控制台写入数据
package com.chb.spark.streaming;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class WordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Spark Streaming WordCount").setMaster("local[2]");
        // 创建该对象就类似于Spark Core中的JavaSparkContext,类似于Spark SQL中的SQLContext
        // 该对象除了接受SparkConf对象,还要接受一个Batch Interval参数,就是说,每收集多长时间数据划分一个batch去进行处理
        // 这里我们看Durations里面可以设置分钟、毫秒、秒,这里设置10秒
        JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(10));
        //首先创建输入的DStream, 代表一个数据源如:从Scoket或Kafka来持续不断的获取实时的数据流
        //此处创建一个监听端口的Scoket的数据流, 这里面就会每10秒生成一个RDD,RDD的元素类型为String,就是一行行的文本
        JavaDStream<String> lines = jsc.socketTextStream("192.168.1.224", 9999);
        //使用Spark Core提供的算子直接作用于DStreams上, 算子底层会应用在里面的每个RDD上面,RDD转换后的新RDD会作为新DStream中RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                // TODO Auto-generated method stub
                return new Tuple2<String, Integer>(word, 1);
            }
        });
        JavaPairDStream<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });
        // 最后每次计算完,都打印一下这10秒钟的单词计数情况,并休眠5秒钟,以便于我们测试和观察
        wcs.print();
        jsc.start();
        jsc.awaitTermination();
        jsc.close();
    }
}
package com.chb.scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
 * Counts words in UTF8 encoded, ‘\n‘ delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
    def main(args: Array[String]) {
        if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
        }
        // Create the context with a 1 second batch size
        val sparkConf = new SparkConf().setAppName("NetworkWordCount")
        val ssc = new StreamingContext(sparkConf, Seconds(1))
        // Create a socket stream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by ‘nc‘)
        // Note that no duplication in storage level only for running locally.
        // Replication necessary in distributed scenario for fault tolerance.
        val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
        val words = lines.flatMap(_.split(" "))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()
        ssc.start()
        ssc.awaitTermination()
    }
}
   JavaDStream<String> lines = jsc.textFileStream("hdfs://192.168.1.224:9000/user/root/");
原文:http://blog.csdn.net/wuxintdrh/article/details/71077098