首页 > 其他 > 详细

SparkStreaming 1st Demo

时间:2020-02-17 13:29:21      阅读:65      评论:0      收藏:0      [点我收藏+]

---

通过spark-shell启动StreamingContext,实时监控文件夹

1 打开terminal 1,输入如下:

import org.apache.spark.streaming._
// SparkStreaming将输入流数据按照5秒钟进行数据切分
val ssc = new StreamingContext(sc,Seconds(5))

val linesDS = ssc.textFileStream("file:///Users/walker/learn/mycode/spark/test_data/log_file")

val wordsCountDS = linesDS.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

wordsCountDS.print()

ssc.start()

// 在spark-shell中,不用写这个,直接ctrl+c即可退出程序
ssc.awaitTermination() 

 

2 打开terminal 2,创建 logfile文件夹,在上一步的程序运行起来之后,往logfile中添加新的文件

$ cat word3.txt 
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark

 

3 添加完新文件之后,等5秒钟即可得到程序的实时统计结果。

-------------------------------------------
Time: 1581905830000 ms
-------------------------------------------

-------------------------------------------
Time: 1581905835000 ms
-------------------------------------------
(hadoop,8)
(spark,8)
(huahua,8)

-------------------------------------------
Time: 1581905840000 ms
-------------------------------------------

-------------------------------------------

 

---

SparkStreaming 1st Demo

原文:https://www.cnblogs.com/wooluwalker/p/12320703.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!