前面我们使用Spark Streaming监听了端口数据,接下来我们将使用Spark Streaming作为Kafka的消费者。
// sbt build definition for the Spark Streaming + Kafka word-count example.
name := "sbt-spark"

version := "0.1"

scalaVersion := "2.11.8"

// `%%` appends the Scala binary version (here `_2.11`) to each artifact name,
// so the suffix always follows `scalaVersion` instead of being hard-coded on
// some lines and derived on others.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.1.0"
)
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Word-count streaming job that consumes a Kafka topic through the
 * receiver-based (ZooKeeper) `KafkaUtils.createStream` API and prints
 * sliding-window word counts.
 */
object SparkStreamingAsKafkaConsumer {

  /**
   * Builds the streaming pipeline, starts it and blocks until it terminates.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // local[2]: the Kafka receiver occupies one thread, leaving one for processing.
    val conf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    // A new batch is produced every 10 seconds.
    val ssc = new StreamingContext(conf, Seconds(10))
    // Checkpointing must be enabled for the inverse-function window reduce below.
    ssc.checkpoint("file:///root/hadoop/checkpoint")

    val zkQuorum = "localhost:2181" // ZooKeeper connection string
    val group = "1"                 // Kafka consumer group id
    val topics = "spark"            // comma-separated topic names
    val numThreads = 1              // consumer threads the receiver uses per topic

    // topic name -> number of consumer threads for that topic
    val topicMap = topics.split(",").map((_, numThreads)).toMap

    // Each record is a (key, message) pair; only the message text is needed.
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    // Incremental count over a 2-minute window that slides every 10 seconds,
    // using 2 partitions: `_ + _` adds the batches entering the window and
    // `_ - _` subtracts the batches leaving it.
    val wordCounts = pairs.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
# Collect the Kafka jars in a sub-directory of Spark's jars folder.
cd /root/spark-2.2.1-bin-hadoop2.7/jars
# -p keeps this step re-runnable when the directory already exists.
mkdir -p kafka
cd kafka
cp /root/kafka_2.11-2.2.1/libs/* .
下载jar包:
http://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11/2.1.0
将此jar包上传至/root/spark-2.2.1-bin-hadoop2.7/jars/kafka
cd /root/kafka_2.11-2.2.1/bin
# Start the ZooKeeper server (in the background)
./zookeeper-server-start.sh ../config/zookeeper.properties &
# Start the Kafka broker (stays in the foreground of this terminal)
./kafka-server-start.sh ../config/server.properties
# Open a new terminal (e.g. another Xshell window) and connect to the Linux host again
cd /root/kafka_2.11-2.2.1/bin
# Create the topic; run in the foreground so it has finished before the producer starts
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark
# Start the console producer
./kafka-console-producer.sh --broker-list localhost:9092 --topic spark
cd /root/spark-2.2.1-bin-hadoop2.7/bin
# Quote the class path so the shell passes the `*` wildcards to the JVM unexpanded.
./spark-submit --driver-class-path "/root/spark-2.2.1-bin-hadoop2.7/jars/*:/root/spark-2.2.1-bin-hadoop2.7/jars/kafka/*" --class SparkStreamingAsKafkaConsumer /root/sbt-spark_2.11-0.1.jar
原文:https://www.cnblogs.com/alichengxuyuan/p/12576823.html