
Spark Streaming (Part 1)

Posted: 2020-05-12 14:09:49

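Micro-batch processing: Spark Streaming splits the incoming stream into small batches at a fixed interval and processes each batch as a job.
Pseudo-real-time processing: results therefore arrive with a delay of about one batch interval, rather than as each record arrives.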
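The micro-batch idea can be sketched without Spark. Incoming records are grouped by arrival time into fixed intervals, and each interval is then processed as one small batch. The example uses 3 seconds, the same interval as the StreamingContext example in this post. This is a minimal illustration under stated assumptions:

- MicroBatchSketch and Event are hypothetical names.
- Spark Streaming runs a job for every interval, even an empty one. This sketch only returns intervals that contain events.

```scala
// Hypothetical, Spark-free model of micro-batching (not Spark's implementation)
object MicroBatchSketch {
  /** A record tagged with its arrival time in milliseconds. */
  final case class Event(timeMs: Long, text: String)

  /** Assigns each event to the interval [start, start + intervalMs) containing it and
    * returns (batchStart, texts in arrival order) for every non-empty interval. */
  def toBatches(events: Seq[Event], intervalMs: Long): Seq[(Long, Seq[String])] =
    events
      .groupBy(e => e.timeMs / intervalMs * intervalMs) // start time of the event's batch
      .toSeq
      .sortBy(_._1)                                      // handle batches in time order
      .map { case (start, batch) => (start, batch.sortBy(_.timeMs).map(_.text)) }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event(0L, "a"), Event(1000L, "b"), Event(3500L, "c"), Event(7000L, "d"))
    // Each batch is handled as a whole, like one small job per interval
    toBatches(events, 3000L).foreach { case (start, batch) =>
      println(s"batch @ ${start}ms: ${batch.mkString(", ")}")
    }
  }
}
```

The sketch only models when records are grouped. In Spark, each interval's data becomes an RDD that is processed by the operators defined on the DStream.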

Data sources

1. Built-in (non-custom) data sources

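    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // A socket receiver occupies one core, so local mode needs at least 2 threads (local[*] or local[2])
    val conf: SparkConf = new SparkConf().setAppName("111").setMaster("local[*]")

    // Batch interval of 3 seconds: each micro-batch holds 3 seconds of input
    val context = new StreamingContext(conf, Seconds(3))

    // Listen for text data on the specified host and port
    val line = context.socketTextStream("xxx.xxx.xxx.xxx", 9999)

    // Collect data from a specified directory; of limited use, Flume does this better
//    val file = context.textFileStream("path")
    // Process the data: word count within each batch
    val value = line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    value.print()

    // Start the receivers and the streaming computation
    context.start()
    // Block the driver until the streaming context is stopped
    context.awaitTermination()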
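The word count above can be illustrated for a single batch with ordinary Scala collections. Here groupBy plus a sum stands in for reduceByKey. This is a hypothetical, Spark-free model: the object name BatchWordCount is made up for illustration.

In Spark Streaming these counts cover one batch only. They do not accumulate across batches unless a stateful operator such as updateStateByKey is used. Like the original split(" "), consecutive spaces produce empty-string tokens that are counted too.

```scala
// Hypothetical plain-Scala model of what one batch of
// line.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) computes
object BatchWordCount {
  def countWords(batch: Seq[String]): Map[String, Int] =
    batch
      .flatMap(_.split(" "))  // split each line into words
      .map((_, 1))            // pair each word with a count of 1
      .groupBy(_._1)          // gather pairs by word, as reduceByKey does per key
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

  def main(args: Array[String]): Unit =
    println(countWords(Seq("hello spark", "hello streaming")))
}
```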

2. Custom data sources

  • Define a custom receiver by extending Receiver
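import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MyReceiver(hostname: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  @volatile private var socket: Socket = null

  // Read lines from the socket and hand each one to Spark via store()
  private def receive(): Unit = {
    socket = new Socket(hostname, port)
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))

    // In Scala an assignment evaluates to Unit, so `(line = reader.readLine()) != null`
    // is always true and never detects end of stream; read first, then test the result
    var line = reader.readLine()
    while (line != null && !isStopped()) {
      if ("END" == line) {
        return // a line reading "END" ends reception
      }
      store(line)
      line = reader.readLine()
    }
  }

  // onStart() must not block, so receive on a separate thread
  override def onStart(): Unit = {
    new Thread("MyReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  // Close the socket when the receiver is stopped
  override def onStop(): Unit = {
    if (socket != null) {
      socket.close()
      socket = null
    }
  }
}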
  • Use the custom receiver
val line = context.receiverStream(new MyReceiver("xxx.xxx.xxx.xxx", 9999))
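To try the custom receiver locally, a server must listen on the host and port passed to MyReceiver. It must also follow the receiver's convention that a line reading END stops reception.

The helper below is a hypothetical test server, assumed for illustration:

- LineFeeder is not part of Spark.
- Port 9999 is assumed to match the call above.

It accepts a single connection, sends a few lines, then sends END. Start it before the streaming job so that MyReceiver's connection attempt succeeds.

```scala
import java.io.{OutputStreamWriter, PrintWriter}
import java.net.ServerSocket
import java.nio.charset.StandardCharsets

// Hypothetical test helper (not part of Spark): a one-shot line server for MyReceiver.
// Each line sent becomes one record passed to store() on the receiver side.
object LineFeeder {
  /** Accepts one client on `server`, sends `lines`, then the "END" sentinel, and closes the connection. */
  def serveOnce(server: ServerSocket, lines: Seq[String]): Unit = {
    val client = server.accept() // blocks until the receiver connects
    try {
      // autoFlush = true flushes after every println
      val out = new PrintWriter(new OutputStreamWriter(client.getOutputStream, StandardCharsets.UTF_8), true)
      lines.foreach(l => out.println(l))
      out.println("END") // MyReceiver stops reading when it sees this line
    } finally {
      client.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // Port 9999 is an assumption chosen to match receiverStream(new MyReceiver(..., 9999))
    val server = new ServerSocket(9999)
    try serveOnce(server, Seq("hello spark", "hello streaming"))
    finally server.close()
  }
}
```

A one-shot server keeps the sketch short. Because MyReceiver returns on END and never reconnects, a single connection is all it will use.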


Original article: https://www.cnblogs.com/zqzhen/p/12871853.html
