首页 > 其他 > 详细

PYFLINK(一):PYFLINK安装与本地运行

时间:2021-06-10 12:12:18      阅读:70      评论:0      收藏:0      [点我收藏+]

来源:https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/python/table_api_tutorial/

一 安装环境与安装

您需要一台具有以下功能的计算机:

  • Java 8 or 11
  • Python 3.6, 3.7 or 3.8

使用Python Table API需要安装PyFlink,它已经被发布到 PyPi,您可以通过如下方式安装PyFlink:

$ python -m pip install apache-flink

安装PyFlink后,您便可以编写Python Table API作业了。

二 编写一个Flink Python Table API程序 

编写Flink Python Table API程序的第一步是创建TableEnvironment。这是Python Table API作业的入口类。

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

接下来,我们将介绍如何创建源表和结果表。

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
t_env.connect(FileSystem().path(/tmp/input))     .with_format(OldCsv()
                 .field(word, DataTypes.STRING()))     .with_schema(Schema()
                 .field(word, DataTypes.STRING()))     .create_temporary_table(mySource)

t_env.connect(FileSystem().path(/tmp/output))     .with_format(OldCsv()
                 .field_delimiter(\t)
                 .field(word, DataTypes.STRING())
                 .field(count, DataTypes.BIGINT()))     .with_schema(Schema()
                 .field(word, DataTypes.STRING())
                 .field(count, DataTypes.BIGINT()))     .create_temporary_table(mySink)

You can also use the TableEnvironment.sql_update() method to register a source/sink table defined in DDL:

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        ‘connector‘ = ‘filesystem‘,
        ‘format‘ = ‘csv‘,
        ‘path‘ = ‘/tmp/input‘
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        ‘connector‘ = ‘filesystem‘,
        ‘format‘ = ‘csv‘,
        ‘path‘ = ‘/tmp/output‘
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

上面的程序展示了如何创建及在ExecutionEnvironment中注册表名分别为mySourcemySink的表。 其中,源表mySource有一列: word,该表代表了从输入文件/tmp/input中读取的单词; 结果表mySink有两列: word和count,该表会将计算结果输出到文件/tmp/output中,字段之间使用\t作为分隔符。

接下来,我们介绍如何创建一个作业:该作业读取表mySource中的数据,进行一些变换,然后将结果写入表mySink

最后,需要做的就是启动Flink Python Table API作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当execute_insert(sink_name)被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

from pyflink.table.expressions import lit

tab = t_env.from_path(mySource)
tab.group_by(tab.word)    .select(tab.word, lit(1).count)    .execute_insert(mySink).wait()

该教程的完整代码如下:

from pyflink.table import DataTypes, TableEnvironment, EnvironmentSettings
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
t_env = TableEnvironment.create(settings)

# write all the data to one file
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
t_env.connect(FileSystem().path(/tmp/input))     .with_format(OldCsv()
                 .field(word, DataTypes.STRING()))     .with_schema(Schema()
                 .field(word, DataTypes.STRING()))     .create_temporary_table(mySource)

t_env.connect(FileSystem().path(/tmp/output))     .with_format(OldCsv()
                 .field_delimiter(\t)
                 .field(word, DataTypes.STRING())
                 .field(count, DataTypes.BIGINT()))     .with_schema(Schema()
                 .field(word, DataTypes.STRING())
                 .field(count, DataTypes.BIGINT()))     .create_temporary_table(mySink)

tab = t_env.from_path(mySource)
tab.group_by(tab.word)    .select(tab.word, lit(1).count)    .execute_insert(mySink).wait()

三 执行一个Flink Python Table API程序

首先,你需要在文件 “/tmp/input” 中准备好输入数据。你可以选择通过如下命令准备输入数据:

$ echo -e  "flink\npyflink\nflink" > /tmp/input

接下来,可以在命令行中运行作业(假设作业名为WordCount.py)(注意:如果输出结果文件“/tmp/output”已经存在,你需要先删除文件,否则程序将无法正确运行起来):

$ python WordCount.py

上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例

最后,你可以通过如下命令查看你的运行结果:

$ cat /tmp/output
flink    2
pyflink    1

 

PYFLINK(一):PYFLINK安装与本地运行

原文:https://www.cnblogs.com/qiu-hua/p/14869332.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!