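The simplest way is to follow the official Flink website and run curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0 on the command line. With this approach, however, you still have to add some dependencies yourself, so you can copy my pom.xml (given at the end of this article) instead. I have put the commonly used dependencies in it and excluded the conflicting ones. Note that when testing locally you should comment out the <scope>provided</scope> lines, otherwise classes will be missing at runtime. Alternatively, in IntelliJ IDEA, go to Run -> Edit Configurations and check Include dependencies with "Provided" scope. It is also a good idea to drop a log4j configuration file into the resources directory, which makes it easier to track down problems from the logs.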
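If the provided scope is left in place when running from the IDE, the job usually fails immediately with NoClassDefFoundError or ClassNotFoundException. The following is a minimal plain-Java sketch (no Flink dependency) that checks whether some classes are loadable before you start debugging anything else. ClasspathCheck and the class list in main are my own hypothetical choices, not part of the original project.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reports which of the given class names cannot be loaded
// from the current classpath (e.g. because their dependency is still "provided").
final class ClasspathCheck {

    static List<String> missing(String... classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize = false: only check that the class can be found, don't run static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> missing = missing(
                "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
                "org.apache.flink.table.api.java.StreamTableEnvironment");
        if (missing.isEmpty()) {
            System.out.println("All listed Flink classes were found");
        } else {
            System.out.println("Missing from classpath: " + missing);
        }
    }
}
```

Run it with the same run configuration as your job; if anything is reported missing, check the scope of the corresponding dependency.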
After creating the project, the first thing to do is, naturally, to write a Flink version of Hello World. So create a test class and enter the following code:
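import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HelloWorldTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = env.fromElements("Hello World");
        dataStream.print();
        env.execute("test");
    }
}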
Take a look at the console:
4> Hello World
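We got exactly the result we wanted. But what is this 4>? It indicates that the line was printed by the 4th parallel instance (subtask) of the print operator. You may wonder: I never set any parallelism, so how did the data end up in the 4th instance? The reason is that in local mode Flink uses the number of CPU cores as the default parallelism and starts that many parallel instances. If we call setParallelism(1) after an operator (or once on the environment with env.setParallelism(1)), it runs as a single instance, and the print output no longer carries the prefix. That completes the DataStream version of Hello World. Its main purpose was to verify that the environment is set up correctly; now we come to today's actual topic, going from Kafka to MySQL. If you'd like to learn more about DataStream, you're welcome to follow my other series, Flink DataStream (which I haven't started writing yet).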
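To make the 4> concrete, here is a small plain-Java model of the prefix rule as I understand Flink's print sink to apply it: the 1-based subtask index is prepended only when the sink runs with parallelism greater than 1. This is an illustration, not Flink code; PrintPrefixDemo and format are hypothetical names, and it ignores the optional identifier you can pass to print(String).

```java
// Hypothetical illustration (not Flink code) of the print-sink prefix rule:
// "<subtaskIndex + 1>> " is prepended only when the sink runs with parallelism > 1.
final class PrintPrefixDemo {

    static String format(int subtaskIndex, int parallelism, String record) {
        String prefix = parallelism > 1 ? (subtaskIndex + 1) + "> " : "";
        return prefix + record;
    }

    public static void main(String[] args) {
        // Flink's local environment defaults its parallelism to the number of CPU cores
        System.out.println("CPU cores: " + Runtime.getRuntime().availableProcessors());
        // e.g. on an 8-core machine, subtask index 3 is the 4th instance: prints "4> Hello World"
        System.out.println(format(3, 8, "Hello World"));
        // after setParallelism(1) there is a single instance and no prefix: prints "Hello World"
        System.out.println(format(0, 1, "Hello World"));
    }
}
```

In the real job, either env.setParallelism(1) or dataStream.print().setParallelism(1) gives you the single-instance case.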
Without further ado, here's the code:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSql02 {
    public static final String KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka', -- connector type: kafka\n" +
            "    'connector.version' = '0.11', -- must match the Kafka version we installed with Docker earlier\n" +
            "    'connector.topic' = 'mykafka', -- the topic we created earlier\n" +
            "    'connector.properties.group.id' = 'flink-test-0', -- consumer group (look it up if the concept is new to you)\n" +
            "    'connector.startup-mode' = 'earliest-offset', -- start consuming from the earliest offset\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181', -- ZooKeeper address\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker address\n" +
            "    'format.type' = 'json' -- JSON, matching the format of the messages in the topic\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // register the Kafka source table via DDL
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);
        // run the query
        Table table = tEnv.sqlQuery("select * from user_behavior");
        // convert back to a DataStream and print it with a single print instance
        tEnv.toAppendStream(table, Row.class).print().setParallelism(1);
        // start the job; this line is required!
        env.execute("test");
    }
}
Now for the exciting part, the test: right-click and Run! Check the console:
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
543462,1715,1464116,pv,2017-11-26T01:00
Yes, it's the same data I put into Kafka earlier. All good!
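With 'format.type' = 'json', the keys of each Kafka message have to match the column names in the DDL. Below is a minimal sketch that builds one such message with plain Java string formatting. The ts layout 2017-11-26T01:00:00Z is my assumption of the RFC 3339 form that flink-json 1.10 parses for TIMESTAMP(3); compare it with the data you produced in the previous chapter. UserBehaviorJson is a hypothetical name.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

// Hypothetical helper: builds a JSON message whose keys match the user_behavior DDL columns.
final class UserBehaviorJson {

    // Assumed RFC 3339 layout with a literal 'Z' (UTC), e.g. 2017-11-26T01:00:00Z
    private static final DateTimeFormatter TS_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT);

    static String toJson(long userId, long itemId, long categoryId, String behavior, LocalDateTime ts) {
        // behavior is assumed to need no JSON escaping (values like "pv", "buy")
        return String.format(Locale.ROOT,
                "{\"user_id\":%d,\"item_id\":%d,\"category_id\":%d,\"behavior\":\"%s\",\"ts\":\"%s\"}",
                userId, itemId, categoryId, behavior, TS_FORMAT.format(ts));
    }

    public static void main(String[] args) {
        // The record from the console output above; prints:
        // {"user_id":543462,"item_id":1715,"category_id":1464116,"behavior":"pv","ts":"2017-11-26T01:00:00Z"}
        System.out.println(toJson(543462L, 1715L, 1464116L, "pv",
                LocalDateTime.of(2017, 11, 26, 1, 0)));
    }
}
```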
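If you run into an exception such as Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath, check your DDL carefully for missing or misspelled options, and compare it with the connector options in the official Flink documentation. The same exception is also thrown when the jar providing the connector or format (here flink-connector-kafka-0.11 or flink-json) is not on the classpath. You're also welcome to leave a comment below so we can discuss it.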
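Before digging through the documentation, a quick look at the WITH clause can catch typos. The following is a hypothetical plain-Java helper (DdlOptionCheck, not part of Flink) that extracts the 'key' = 'value' pairs with a regular expression and reports which keys from a list you supply are missing. The key list in main is my own selection for the Kafka source in this article, not an authoritative list from Flink. It also flags typographic quotes such as ‘ ’, which often sneak in when copying DDL from web pages and which the SQL parser does not accept as string delimiters.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sanity check for the WITH (...) options of a legacy Flink 1.10 DDL.
final class DdlOptionCheck {

    // Matches 'key' = 'value' with straight single quotes (values without embedded quotes)
    private static final Pattern OPTION = Pattern.compile("'([^']+)'\\s*=\\s*'([^']*)'");

    static Map<String, String> options(String ddl) {
        Map<String, String> result = new LinkedHashMap<>();
        Matcher m = OPTION.matcher(ddl);
        while (m.find()) {
            result.put(m.group(1), m.group(2));
        }
        return result;
    }

    static List<String> problems(String ddl, String... requiredKeys) {
        List<String> problems = new ArrayList<>();
        // U+2018 / U+2019 are typographic quotes, not valid SQL string delimiters
        if (ddl.indexOf('\u2018') >= 0 || ddl.indexOf('\u2019') >= 0) {
            problems.add("typographic quotes found; use ' instead");
        }
        Map<String, String> opts = options(ddl);
        for (String key : requiredKeys) {
            if (!opts.containsKey(key)) {
                problems.add("missing option: " + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String ddl = "CREATE TABLE t (a BIGINT) WITH ("
                + "'connector.type' = 'kafka', 'connector.version' = '0.11', 'format.type' = 'json')";
        // prints [missing option: connector.topic, missing option: connector.properties.bootstrap.servers]
        System.out.println(problems(ddl, "connector.type", "connector.version",
                "connector.topic", "connector.properties.bootstrap.servers", "format.type"));
    }
}
```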
Next, create the result table in MySQL:
CREATE TABLE `user_behavior` (
  `user_id` bigint(20) DEFAULT NULL,
  `item_id` bigint(20) DEFAULT NULL,
  `behavior` varchar(255) DEFAULT NULL,
  `category_id` bigint(20) DEFAULT NULL,
  `ts` timestamp(6) NULL DEFAULT NULL
);
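Once the table has been created in MySQL, go back to our code, register the MySQL result table, and insert the data read from Kafka into it. Here is the complete code, including the definition of the Kafka source table: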
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class FlinkSql02 {
    public static final String KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka', -- connector type: kafka\n" +
            "    'connector.version' = '0.11', -- must match the Kafka version we installed with Docker earlier\n" +
            "    'connector.topic' = 'mykafka', -- the topic we created earlier\n" +
            "    'connector.properties.group.id' = 'flink-test-0', -- consumer group (look it up if the concept is new to you)\n" +
            "    'connector.startup-mode' = 'earliest-offset', -- start consuming from the earliest offset\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181', -- ZooKeeper address\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker address\n" +
            "    'format.type' = 'json' -- JSON, matching the format of the messages in the topic\n" +
            ")";

    public static final String MYSQL_TABLE_SINK_DDL = "" +
            "CREATE TABLE `user_behavior_mysql` (\n" +
            "    `user_id` bigint,\n" +
            "    `item_id` bigint,\n" +
            "    `behavior` varchar,\n" +
            "    `category_id` bigint,\n" +
            "    `ts` timestamp(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'jdbc', -- connector type\n" +
            "    'connector.url' = 'jdbc:mysql://localhost:3306/mysql', -- JDBC URL\n" +
            "    'connector.table' = 'user_behavior', -- table name in MySQL\n" +
            "    'connector.driver' = 'com.mysql.jdbc.Driver', -- driver class; optional, derived from the JDBC URL if omitted\n" +
            "    'connector.username' = 'root', -- user name\n" +
            "    'connector.password' = '123456', -- password\n" +
            "    'connector.write.flush.max-rows' = '5000', -- flush once this many rows have been buffered\n" +
            "    'connector.write.flush.interval' = '2s' -- flush at least this often; a write happens as soon as either condition is met\n" +
            ")";

    public static void main(String[] args) throws Exception {
        // build the StreamExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // build the EnvironmentSettings and select the Blink planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        // build the StreamTableEnvironment
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        // register the Kafka source table via DDL
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);
        // register the MySQL result table via DDL
        tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);
        // insert the data read from Kafka into MySQL
        tEnv.sqlUpdate("insert into user_behavior_mysql select user_id, item_id, behavior, category_id, ts from user_behavior");
        // start the job; this line is required!
        env.execute("test");
    }
}
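The two flush options in the sink DDL work on a "whichever comes first" basis. The class below is a hypothetical, simplified plain-Java model of that behaviour, not Flink's JDBC writer: it buffers rows and flushes either when the buffer reaches maxRows or when flushIntervalMillis has passed since the last flush. Time is passed in explicitly to keep it deterministic; as far as I know, Flink drives the interval with a background timer instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "flush on max rows OR on interval, whichever comes first".
final class BatchFlushModel<T> {
    private final int maxRows;
    private final long flushIntervalMillis;
    private final List<T> buffer = new ArrayList<>();
    private final List<List<T>> flushedBatches = new ArrayList<>();
    private long lastFlushMillis;

    BatchFlushModel(int maxRows, long flushIntervalMillis, long startMillis) {
        this.maxRows = maxRows;
        this.flushIntervalMillis = flushIntervalMillis;
        this.lastFlushMillis = startMillis;
    }

    // Buffers a row and flushes immediately if the row-count threshold is reached
    void add(T row, long nowMillis) {
        buffer.add(row);
        if (buffer.size() >= maxRows) {
            flush(nowMillis);
        }
    }

    // Meant to be called periodically; flushes pending rows once the interval has elapsed
    void tick(long nowMillis) {
        if (!buffer.isEmpty() && nowMillis - lastFlushMillis >= flushIntervalMillis) {
            flush(nowMillis);
        }
    }

    private void flush(long nowMillis) {
        flushedBatches.add(new ArrayList<>(buffer)); // stands in for the batched JDBC write
        buffer.clear();
        lastFlushMillis = nowMillis;
    }

    List<List<T>> flushedBatches() {
        return flushedBatches;
    }

    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        // max-rows = 5000 and interval = 2s, as in the sink DDL above
        BatchFlushModel<String> sink = new BatchFlushModel<>(5000, 2000L, 0L);
        for (int i = 0; i < 4; i++) {
            sink.add("row-" + i, 100L * i); // only 4 rows: the row threshold is never reached
        }
        sink.tick(1000L); // 1 s elapsed: nothing written yet
        sink.tick(2000L); // 2 s elapsed: the 4 buffered rows are written
        System.out.println(sink.flushedBatches()); // prints [[row-0, row-1, row-2, row-3]]
    }
}
```

With only four test records, the row limit of 5000 is never reached, so in this model it is the 2-second interval that gets them into MySQL.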
Open Navicat and check whether the data was written into MySQL correctly:
| user_id | item_id | behavior | category_id | ts |
|---|---|---|---|---|
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
| 543462 | 1715 | pv | 1464116 | 2017-11-26 01:00:00.000 |
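Success! The data also matches what's in Kafka. You can additionally use the Java Kafka client from the previous chapter to cross-check the data; I won't repeat that here. That wraps up this leg of our Flink SQL journey. In the next chapter, building on this one, we'll cover common aggregation queries and the dimension table join that Flink SQL currently supports natively. Also, some readers told me they'd like to understand why things are done a certain way rather than just how; we'll cover Flink's internals in a separate series later.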
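For the cross-check mentioned above, comparing the two sides as multisets shows missing or duplicated rows rather than just a different total. Below is a hypothetical plain-Java helper (RecordDiff); fetching the records from Kafka and MySQL and normalising them to the same text form is left to you and assumed here. Note, for example, that the console prints 2017-11-26T01:00 while Navicat shows 2017-11-26 01:00:00.000.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: compares two collections of records as multisets.
final class RecordDiff {

    // Returns record -> (count on the left - count on the right), keeping only non-zero entries
    static Map<String, Integer> diff(List<String> left, List<String> right) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String r : left) {
            counts.merge(r, 1, Integer::sum);
        }
        for (String r : right) {
            counts.merge(r, -1, Integer::sum);
        }
        counts.values().removeIf(c -> c == 0);
        return counts;
    }

    public static void main(String[] args) {
        List<String> fromKafka = Arrays.asList("543462,1715,1464116,pv", "543462,1715,1464116,pv");
        List<String> fromMysql = Arrays.asList("543462,1715,1464116,pv");
        // prints {543462,1715,1464116,pv=1}: one Kafka record has no counterpart in MySQL
        System.out.println(diff(fromKafka, fromMysql));
    }
}
```

An empty result means both sides contain exactly the same records with the same multiplicities.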
Here is the pom.xml mentioned at the beginning (the relevant sections):
<properties>
<flink.version>1.10.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependencies>
<!-- Flink modules -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>scala-library</artifactId>
<groupId>org.scala-lang</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- CLI dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>javassist</artifactId>
<groupId>org.javassist</groupId>
</exclusion>
<exclusion>
<artifactId>scala-parser-combinators_2.11</artifactId>
<groupId>org.scala-lang.modules</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>snappy-java</artifactId>
<groupId>org.xerial.snappy</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<artifactId>kafka-clients</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
<exclusions>
<exclusion>
<artifactId>force-shading</artifactId>
<groupId>org.apache.flink</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.5</version>
</dependency>
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.5.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.4.Final</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<artifactSet>
<excludes>
<exclude>junit:junit</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
It's a bit messy and I haven't bothered to tidy it up; just copy it as-is. Below is the log4j configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="[%d{dd HH:mm:ss,SSS} %-5p] [%t] %c{2} - %m%n"/>
        </layout>
        <!-- filter: only output levels from info up to error -->
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="info"/>
            <param name="levelMax" value="error"/>
            <param name="AcceptOnMatch" value="true"/>
        </filter>
    </appender>
    <!-- settings for a specific logger; additivity controls whether its events also go to the appenders of its ancestors -->
    <logger name="com.runway.bssp.activeXdemo" additivity="false">
        <appender-ref ref="myConsole"/>
    </logger>
    <!-- root logger settings -->
    <root>
        <priority value="debug"/>
        <appender-ref ref="myConsole"/>
    </root>
</log4j:configuration>
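Remember to put it in the resources directory (src/main/resources), not anywhere else; named log4j.xml, it is picked up automatically by log4j 1.x. If it seems to be ignored, check that a log4j 1.x SLF4J binding such as slf4j-log4j12 is on the classpath; the official quickstart pom adds one explicitly.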
Project Practice: Learning Flink from 0 to 1 (28) Flink SQL Tutorial (Part 2)
Original: https://www.cnblogs.com/huanghanyu/p/13913241.html