---
Starting a StreamingContext from spark-shell to monitor a directory in real time
1. Open terminal 1 and enter the following:
```scala
import org.apache.spark.streaming._

// Spark Streaming slices the input stream into 5-second batches
val ssc = new StreamingContext(sc, Seconds(5))
val linesDS = ssc.textFileStream("file:///Users/walker/learn/mycode/spark/test_data/log_file")
val wordsCountDS = linesDS.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordsCountDS.print()
ssc.start()
// In spark-shell this line can be omitted; just press Ctrl+C to exit the program
ssc.awaitTermination()
```
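The snippet above relies on the `sc` that spark-shell provides implicitly. For comparison, here is a minimal sketch of the same job packaged as a standalone application; the object name, app name, and `local[2]` master are illustrative assumptions, not part of the original post.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWordCount {
  def main(args: Array[String]): Unit = {
    // Outside spark-shell there is no implicit `sc`, so build the context
    // from a SparkConf; app name and master URL are placeholders.
    val conf = new SparkConf()
      .setAppName("FileStreamWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.textFileStream("file:///Users/walker/learn/mycode/spark/test_data/log_file")
    val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    // In a packaged application we do want to block until termination
    ssc.awaitTermination()
  }
}
```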
2. Open terminal 2 and create the monitored directory (`log_file` in the code above). Once the program from the previous step is running, add a new file to that directory (see the note after the listing on how Spark detects new files):
```
$ cat word3.txt
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
huahua hadoop spark
```
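A detail worth knowing about `textFileStream`: it only picks up files that newly appear in the monitored directory after the context starts, and the Spark documentation recommends writing the file elsewhere and then moving it in, so it appears atomically; files appended to in place may be missed. A hypothetical way to stage the file (the `/tmp` path is an assumption for illustration):

```
# Stage the file outside the watched directory, then move it in atomically
$ for i in $(seq 8); do echo "huahua hadoop spark"; done > /tmp/word3.txt
$ mv /tmp/word3.txt /Users/walker/learn/mycode/spark/test_data/log_file/
```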
3. After the new file is added, wait up to 5 seconds for the program's real-time count. In the output below, the batch timestamps are 5000 ms apart, matching `Seconds(5)`; batches that saw no new data print only an empty header.
```
-------------------------------------------
Time: 1581905830000 ms
-------------------------------------------

-------------------------------------------
Time: 1581905835000 ms
-------------------------------------------
(hadoop,8)
(spark,8)
(huahua,8)

-------------------------------------------
Time: 1581905840000 ms
-------------------------------------------

-------------------------------------------
```
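To stop the stream from inside spark-shell without killing the whole shell session, you can stop the StreamingContext while keeping the underlying SparkContext alive, instead of pressing Ctrl+C (which ends the shell):

```scala
// Stop the streaming computation but keep `sc` usable for further work;
// a stopped StreamingContext cannot be restarted, so create a new one if needed
ssc.stop(stopSparkContext = false)
```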
---
Original post: https://www.cnblogs.com/wooluwalker/p/12320703.html