1. socket消息发送
import java.net.ServerSocket
import java.io.PrintWriter
import scala.collection.mutable.ListBuffer
import java.util.Random

/**
 * Created by zzy on 8/28/15.
 */

/**
 * Simulated socket message sender.
 *
 * Listens on a TCP port and, for every client that connects, starts a thread
 * that periodically writes one randomly chosen upper-case letter per line.
 */
object SparkSoketSender {

  /** The letters 'A' to 'Z', built once instead of on every call. */
  private val Letters: Array[Char] = ('A' to 'Z').toArray

  /** Shared random source (java.util.Random is safe to use from several threads). */
  private val Rng = new Random

  /**
   * Entry point.
   *
   * @param args exactly two arguments: `<port>` (TCP port to listen on) and
   *             `<time>` (pause between two messages, in milliseconds)
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 2) { // argument validation
      System.err.println("usage: <port> <time>") // port, interval (milliseconds)
      System.exit(1)
    }
    // Parse both arguments up front so a malformed value fails at start-up
    // rather than later inside a client thread.
    val port = args(0).toInt
    val intervalMillis = args(1).toLong

    val listener = new ServerSocket(port)
    while (true) {
      val socket = listener.accept()
      new Thread() {
        override def run(): Unit = {
          println("find connected from : " + socket.getInetAddress())
          try {
            val out = new PrintWriter(socket.getOutputStream(), true)
            // PrintWriter never throws on a failed write; checkError() is the
            // only way to notice that the client went away. Without this check
            // the loop would spin forever on a dead connection.
            while (!out.checkError()) {
              Thread.sleep(intervalMillis)
              val context = createContext(index)
              println(context)
              out.write(context + "\n")
              out.flush()
            }
          } finally {
            // Always release the connection, including on interruption.
            socket.close()
          }
        }
      }.start()
    }
  }

  /**
   * Builds the message to send.
   *
   * @param index zero-based position in the alphabet (0 = "A" ... 25 = "Z")
   * @return the single-letter string at that position
   * @throws ArrayIndexOutOfBoundsException if `index` is outside 0..25
   */
  def createContext(index: Int): String =
    Letters(index).toString

  /**
   * Produces a random alphabet position.
   *
   * @return a uniformly distributed value in 0..4 (the upper bound 5 is
   *         exclusive), so only the letters A..E are ever sent
   */
  def index: Int =
    Rng.nextInt(5) // 0 1 2 3 4
}
2. Spark Streaming接收消息并统计词频
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.PairDStreamFunctions
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
/**
 * Created by zzy on 8/28/15.
 *
 * Streaming word count: reads text lines from a TCP socket, splits them on
 * single spaces and prints the per-batch count of every word.
 */
object SparkStreaming {

  /**
   * Entry point.
   *
   * @param args exactly three arguments: `<hostname>` (machine running the
   *             socket sender), `<port>` (port the sender listens on) and
   *             `<seconds>` (batch interval of the streaming context)
   */
  def main(args: Array[String]): Unit = {
    val expectedArgCount = 3
    if (args.length != expectedArgCount) {
      // host of the socket sender, port of the socket sender, batch interval
      System.err.println("usage: <hostname> <port> <seconds>")
      System.exit(1)
    }

    val sparkConf = new SparkConf
    val batchInterval = Seconds(args(2).toInt)
    val streamingContext = new StreamingContext(sparkConf, batchInterval)

    // Input source; many other kinds of source are possible.
    // NOTE(review): the original comment said the received network data is
    // stored in two copies, but MEMORY_ONLY_SER appears to be the unreplicated
    // level (the replicated one is MEMORY_ONLY_SER_2) - confirm which was intended.
    val host = args(0)
    val port = args(1).toInt
    val incomingLines =
      streamingContext.socketTextStream(host, port, StorageLevel.MEMORY_ONLY_SER)

    // Yields a DStream; checkpoint(interval) must be given an explicit interval.
    val tokens = incomingLines.flatMap(line => line.split(" "))

    // Per-batch-interval operation: count each word within the current batch.
    val wordCounts = tokens
      .map(word => (word, 1))
      .reduceByKey((left, right) => left + right)

    /* Window operation */
    // val wc = words.map((_,1)).reduceByKeyAndWindow(_+_, _-_, windowDuration, slideDuration, numPartitions, filterFunc)

    /* Stateful operation, using updateStateByKey
    val sDstream = words.map((_,1)).updateStateByKey(updateFunc) // pass in the state-keeping function
    val updateFunc = (currValues: Seq[Int], state: Option[Int]) => {
      val currentCount = currValues.foldLeft(0)(_ + _)
      // the total accumulated so far
      val previousCount = state.getOrElse(0)
      // return the new accumulated total, as an Option[Int]
      Some(currentCount + previousCount)
    }*/

    wordCounts.print()
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
3.提交任务
Streaming
spark-submit --class cn.crxy.SparkSoketSender original-testSpark-1.0-SNAPSHOT.jar 2015 1000
spark-submit --class cn.crxy.SparkStreaming original-testSpark-1.0-SNAPSHOT.jar crxy164 2015 10
原文:http://www.cnblogs.com/chaoren399/p/4768018.html