
flink kafka source
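A minimal Flink streaming job in Scala: it reads strings from a Kafka topic with FlinkKafkaConsumer, splits them into words, and prints a running word count.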

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object FlinkDemo05_KafkaSource {

    // Kafka consumer properties
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "linux01:9092")
    prop.setProperty("group.id", "flink-grp")
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("auto.offset.reset", "latest")

    def main(args: Array[String]): Unit = {
        // 1. Create the execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // 2. Build the Kafka source stream
        import org.apache.flink.api.scala._
        val topic = "flink-topic"
        val schema = new SimpleStringSchema()
        val dStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topic, schema, prop))
        // 3. Word count: split on whitespace, drop empty tokens, key by word, sum the counts
        val result = dStream.flatMap(_.split("\\s")).filter(_.nonEmpty).map((_, 1)).keyBy(0).sum(1)
        result.print()

        // 4. Execute the job
        env.execute("kafka source job")
    }
}
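
In production, the consumer's start position and offset committing are usually configured explicitly together with checkpointing, so offsets are stored in Flink state and committed back to Kafka when a checkpoint completes. A minimal sketch, assuming the same Flink 1.x universal Kafka connector as above (the checkpoint interval, topic name, and broker address are illustrative):

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaSourceWithCheckpointing {
    def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Checkpoint every 5 seconds; Kafka offsets become part of Flink's checkpointed state
        env.enableCheckpointing(5000)

        val prop = new Properties()
        prop.setProperty("bootstrap.servers", "linux01:9092")
        prop.setProperty("group.id", "flink-grp")

        val consumer = new FlinkKafkaConsumer[String]("flink-topic", new SimpleStringSchema(), prop)
        // Start from the committed group offsets (the default); setStartFromEarliest()
        // or setStartFromLatest() would override this explicitly.
        consumer.setStartFromGroupOffsets()
        // Commit offsets back to Kafka only when a checkpoint completes
        consumer.setCommitOffsetsOnCheckpoints(true)

        import org.apache.flink.api.scala._
        env.addSource(consumer).print()
        env.execute("kafka source with checkpointing")
    }
}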

Original post: https://www.cnblogs.com/xiefeichn/p/13174975.html
