前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

部署安装准备工作:

关闭防火墙,关闭selinux,安装jdk,更改主机名,更改主机名与IP地址的映射关系,ssh免密码登录等

1、Flink的local模式部署安装

在local模式下,不需要启动任何的进程,仅仅是使用本地线程来模拟flink的进程,适用于测试开发调试等,这种模式下,不用更改任何配置,只需要保证jdk8安装正常即可

第一步:上传安装包并解压

将编译之后的压缩包,上传到node01服务器的/opt/bigdata/soft路径下,然后进行解压

cd /opt/bigdata/soft/
tar -zxf flink-1.8.1.tar.gz  -C /opt/bigdata/install/

编译后的压缩包下载链接

第二步:直接使用脚本启动

flink在处于local模式下,不需要更改任何配置,直接解压之后启动即可
执行以下命令直接启动local模式

cd /opt/bigdata/install/flink-1.8.1
bin/start-cluster.sh

启动成功之后,执行jps就能查看到启动了两个进程

18180 StandaloneSessionClusterEntrypoint
18614 TaskManagerRunner

第三步:webUI界面访问

启动两个进程成功之后,访问8081端口号即可访问到flink的web管理界面
http://node01:8081/#/overview

第四步:运行flink自带的测试

node01使用linux的nc命令来向socket当中发送一些单词

sudo yum -y install nc
nc -lk 9000

node01启动flink的自带的单词统计程序,接受输入的socket数据并进行统计

cd /opt/bigdata/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar   --hostname localhost  --port 9000

查看统计结果:
flink自带的测试用例统计结果在log文件夹下面
node01执行以下命令查看统计结果

cd /opt/bigdata/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out

local模式运行成功之后,关闭local模式,我们接下来运行standAlone模式

cd /opt/bigdata/install/flink-1.8.1
bin/stop-cluster.sh

2、Flink的standAlone模式环境安装

请参考我的这篇博客完成三节点伪分布式大数据环境的安装
三节点大数据环境安装详细教程

使用standalone模式,需要启动flink的主节点JobManager以及从节点taskManager

服务以及ip 192.168.52.100 192.168.52.110 192.168.52.120
JobManager
TaskManager

第一步:更改配置文件

停止node01服务器上面local模式下的两个进程,然后修改node01服务器配置文件

node01服务器更改flink-conf.yaml配置文件文件
node01服务器执行以下命令更改flink配置文件

cd /opt/bigdata/install/flink-1.8.1/conf/
vim flink-conf.yaml

更改这个配置,指定jobmanager所在的服务器为node01

jobmanager.rpc.address: node01

node01服务器更改slaves配置文件
node01执行以下命令更改从节点slaves配置文件

cd /opt/bigdata/install/flink-1.8.1/conf
vim slaves

node01
node02
node03

第二步:安装包分发

将node01服务器的flink安装包分发到其他机器上面去
node01服务器执行以下命令分发安装包

cd /opt/bigdata/install
scp -r flink-1.8.1/ node02:$PWD
scp -r flink-1.8.1/ node03:$PWD

第三步:启动flink集群

node01执行以下命令启动flink集群

cd /opt/bigdata/install/flink-1.8.1
bin/start-cluster.sh

第四步:页面访问

http://node01:8081/#/overview

第五步:运行flink自带的测试用例

node01执行以下命令启动socket服务,输入单词

nc -lk 9000

node01启动flink的自带的单词统计程序,接受输入的socket数据并进行统计

cd /opt/bigdata/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar   --hostname node01  --port 9000

node01服务器执行以下命令查看统计结果

cd /opt/bigdata/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out

3、Flink的standAlone模式的HA环境

在上一节当中,我们实现了flink的standAlone模式的环境安装,并且能够正常提交任务到集群上面去,我们的主节点是jobManager,但是唯一的问题是jobmanager是单节点的,必然会有单节点故障问题的产生,所以我们也可以在standAlone模式下,借助于zk,将我们的jobManager实现成为高可用的模式

首先停止Flink的standAlone模式,并启动zk和hadoop集群服务

第一步:修改配置文件

node01执行以下命令修改Flink的配置文件
node01修改flink-conf.yaml配置文件

cd /opt/bigdata/install/flink-1.8.1/conf

vim flink-conf.yaml
jobmanager.rpc.address: node01
high-availability: zookeeper
high-availability.storageDir: hdfs://node01:8020/flink
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:218

node01修改masters配置文件
node01执行以下命令修改master配置文件

cd /opt/bigdata/install/flink-1.8.1/conf
vim masters

node01:8081
node02:8081

node01修改slaves配置文件
node01执行以下命令修改slaves配置文件

cd /opt/bigdata/install/flink-1.8.1/conf
vim slaves

node01
node02
node03

第二步:hdfs上面创建flink对应的文件夹

node01执行以下命令,在hdfs上面创建文件夹

hdfs dfs -mkdir -p /flink

第三步:拷贝配置文件

将node01服务器修改后的配置文件拷贝到其他服务器上面去
node01执行以下命令拷贝配置文件

cd /opt/bigdata/install/flink-1.8.1/conf
scp flink-conf.yaml  masters  slaves  node02:$PWD
scp flink-conf.yaml  masters  slaves  node03:$PWD

第四步:启动flink集群

node01执行以下命令启动flink集群

cd /opt/bigdata/install/flink-1.8.1
bin/start-cluster.sh

第五步:页面访问

访问node01服务器的web界面
http://node01:8081/#/overview
访问node02服务器的web界面
http://node02:8081/#/overview

注意:一旦访问node02的web界面,会发现我们的web界面会自动跳转到node01的web界面上,因为此时,我们的node01服务器才是真正的active状态的节点

第六步:模拟故障宕机实现自动切换

将node01服务器的jobManager进程杀死,然后过一段时间之后查看node02的jobManager是否能够访问

注意: JobManager发生切换时,TaskManager也会跟着发生重启,这其实是一个隐患问题

第七步:flink的standAlone模式在HA下提交任务

在HA这种模式下,提交任务与standAlone单节点模式提交任务是一样的,即使JobManager服务器宕机了也没有关系,会自动进行切换
node01执行以下命令启动socket服务,输入单词

nc -lk 9000

node01启动flink的自带的单词统计程序,接受输入的socket数据并进
行统计

cd /opt/bigdata/install/flink-1.8.1
bin/flink run examples/streaming/SocketWindowWordCount.jar   --hostname node01  --port 9000

node01服务器执行以下命令查看统计结果

cd /opt/bigdata/install/flink-1.8.1/log
tail -200f flink-hadoop-taskexecutor-0-node01.kaikeba.com.out

在这里插入图片描述

flink的任务也可以运行在yarn上面,将flnk的任务提交到yarn平台,通过yarn平台来实现我们的任务统一的资源调度管理,方便我们管理集群当中的CPU和内存等资源
依赖环境说明:
至少hadoop2.2版本及以上
hdfs以及yarn服务正常启动
flink on yarn又分为两种模式:

1、第一种模式:单个yarn session模式

这种方式需要先启动集群,然后在提交作业,接着会向yarn申请一块资源空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到yarn中的其中一个作业执行完成后,释放了资源,那下一个作业才会正常提交,实际工作当中一般不会使用这种模式
这种模式,不需要做任何配置,直接将任务提价到yarn集群上面去,我们需要提前启动hdfs以及yarn集群即可
启动单个Yarn Session模式

第一步:修改yarn-site.xml配置为文件

node01执行以下命令修改yarn-site.xml,添加以下配置属性

cd /opt/bigdata/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop
vim yarn-site.xml
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>

然后将修改后的配置文件拷贝到node02与node03服务器
node01执行以下命令进行拷贝配置文件

cd /opt/bigdata/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop
scp yarn-site.xml  node02:$PWD
scp yarn-site.xml  node03:$PWD

然后重新启动yarn集群即可

第二步:修改flink配置文件

node01执行以下命令更改flink配置文件

cd /opt/bigdata/install/flink-1.8.1/conf

vim flink-conf.yaml
high-availability: zookeeper
high-availability.storageDir: hdfs://node01:8020/flink_yarn_ha
high-availability.zookeeper.path.root: /flink-yarn
high-availability.zookeeper.quorum: node01:2181,node02:2181,node03:2181
yarn.application-attempts: 10

hdfs上面创建文件夹
node01执行以下命令创建hdfs文件夹

hdfs dfs -mkdir -p /flink_yarn_ha

第三步:在yarn当中启动flink集群

直接在node01执行以下命令,在yarn当中启动一个全新的flink集群,可以直接使用yarn-session.sh这个脚本来进行启动

cd /opt/bigdata/install/flink-1.8.1/
bin/yarn-session.sh -n 2 -jm 1024 -tm 1024 [-d]

我们也可以使用 --help 来查看更多参数设置

bin/yarn-session.sh –help

Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode

注意:如果在启动的时候,yarn的内存太小,可能会报以下错误
Diagnostics: Container [] is running beyond virtual memory limits. Current usage: 250.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing containerpid=6386,containerID=container_1521277661809_0006_01_000001
我们需要修改yarn-site.xml添加以下配置,然后重启yarn即可

<property>  
    <name>yarn.nodemanager.vmem-check-enabled</name>  
    <value>false</value>  
</property>  

第四步:查看yarn管理界面8088

访问yarn的8088管理界面,发现yarn当中有一个应用
http://node01:8088/cluster
yarn当中会存在一个常驻的application,就是为我们flink单独启动的一个session

第五步:提交任务

使用flink自带的jar包,实现单词计数统计功能
node01准备文件并上传hdfs

cd /opt/bigdata
vim wordcount.txt

内容如下

hello world
flink hadoop
hive spark

hdfs上面创建文件夹并上传文件

hdfs dfs -mkdir -p /flink_input
hdfs dfs -put wordcount.txt  /flink_input

node01执行以下命令,提交任务到flink集群

cd /opt/bigdata/install/flink-1.8.1
bin/flink run ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/flink_output/wordcount-result.txt 

第六步:验证Yarn Session的高可用

通过node01:8088这个界面,查看yarn session启动在哪一台机器上,然后杀死yarn session进程,我们会发现yarn session会重新启动在另外一台机器上面
找到YarnSessionClusterEntrypoint所在的服务器,然后杀死该进程

[hadoop@node02 ~]$ jps
10065 QuorumPeerMain
10547 YarnSessionClusterEntrypoint
10134 DataNode
10234 NodeManager
10652 Jps
[hadoop@node02 ~]$ kill -9 10547

杀死YarnSessionClusterEntrypoint进程之后,会发现,yarn集群会重新启动一个YarnSessionClusterEntrypoint进程在其他机器上面

2、第二种模式:多个yarn session模式

这种方式的好处是一个任务会对应一个job,即每提交一个作业会根据自身的情况,向yarn申请资源,直到作业执行完成,并不会影响下一个作业的正常运行,除非是yarn上面没有任何资源的情况下。
注意:client端必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败
不需要在yarn当中启动任何集群,直接提交任务即可

第一步:直接执行命令提交任务

cd /opt/bigdata/install/flink-1.8.1/
bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 ./examples/batch/WordCount.jar -input hdfs://node01:8020/flink_input -output hdfs://node01:8020/out_result/out_count.txt

第二步:查看输出结果

hdfs执行以下命令查看输出结果

hdfs dfs -text hdfs://node01:8020/out_result/out_count.txt

我们可以使用–help 来查看帮助文档可以添加哪些参数

cd /opt/bigdata/install/flink-1.8.1/     
bin/flink run --help

得到结果内容如下

Action "run" compiles and runs a program.

  Syntax: run [OPTIONS] <jar-file> <arguments>
  "run" action options:
     -c,--class <classname>               Class with the program entry point
                                          ("main" method or "getPlan()" method.
                                          Only needed if the JAR file does not
                                          specify the class in its manifest.
     -C,--classpath <url>                 Adds a URL to each user code
                                          classloader  on all nodes in the
                                          cluster. The paths must specify a
                                          protocol (e.g. file://) and be
                                          accessible on all nodes (e.g. by means
                                          of a NFS share). You can use this
                                          option multiple times for specifying
                                          more than one URL. The protocol must
                                          be supported by the {@link
                                          java.net.URLClassLoader}.
     -d,--detached                        If present, runs the job in detached
                                          mode
     -n,--allowNonRestoredState           Allow to skip savepoint state that
                                          cannot be restored. You need to allow
                                          this if you removed an operator from
                                          your program that was part of the
                                          program when the savepoint was
                                          triggered.
     -p,--parallelism <parallelism>       The parallelism with which to run the
                                          program. Optional flag to override the
                                          default value specified in the
                                          configuration.
     -q,--sysoutLogging                   If present, suppress logging output to
                                          standard out.
     -s,--fromSavepoint <savepointPath>   Path to a savepoint to restore the job
                                          from (for example
                                          hdfs:///flink/savepoint-1537).
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                         interrupt, such as typing Ctrl + C.
  Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -sae,--shutdownOnAttachedExit        If the job is submitted in attached
                                          mode, perform a best-effort cluster
                                          shutdown when the CLI is terminated
                                          abruptly, e.g., in response to a user
                                          interrupt, such as typing Ctrl + C.
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -yn,--yarncontainer <arg>            Number of YARN container to allocate
                                          (=Number of Task Managers)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yst,--yarnstreaming                 Start Flink in streaming mode
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

  Options for default mode:
     -m,--jobmanager <arg>           Address of the JobManager (master) to which
                                     to connect. Use this flag to connect to a
                                     different JobManager than the one specified
                                     in the configuration.
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths
                                     for high availability mode

我们提交flink任务的时候,可以加以下这些参数
1、默认查找当前yarn集群中已有的yarn-session信息中的jobmanager【/tmp/.yarn-properties-root】:

bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

2、连接指定host和port的jobmanager:

bin/flink run -m node01:8081 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

3、启动一个新的yarn-session:

bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar -input hdfs://hostname:port/hello.txt -output hdfs://hostname:port/result1

注意:yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀
例如:

bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 
上一篇 下一篇