前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

Flink 的状态保存和恢复.png

为什么需要 state 和 checkpoint ?

package com.shockang.study.bigdata.flink.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> text = env.readTextFile("the_path_for_input");

        DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                        .keyBy(0).sum(1);

        counts.writeAsText("the_path_for_output");

        // execute program
        env.execute("WordCount");
    }

    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

上面写的word count的例子,没有包含状态管理。如果一个task在处理过程中挂掉了,那么它在内存中的状态都会丢失,所有的数据都需要重新计算。从容错和消息处理的语义上(at least once, exactly once),Flink引入了state和checkpoint。

什么是 state 和 checkpoint ?

state一般指一个具体的task/operator的状态【state数据默认保存在java的堆内存中】

而checkpoint【可以理解为checkpoint是把state数据持久化存储了】,则表示了一个Flink Job在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态

注意:task是Flink中执行的基本单位。operator指算子(transformation)。

State

State可以被记录,在失败的情况下数据还可以恢复

Flink中有两种基本类型的State

  • Keyed State
  • Operator State

针对两种state,每种state都有两种方式存在

  • 原始状态(raw state)
  • 托管状态(managed state)

托管状态是由Flink框架管理的状态

而原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态

1、keyed state的托管状态

顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,都对应一个state。stream.keyBy(…)

保存state的数据结构

  1. ValueState:即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值
  2. ListState:即key上的状态值为一个列表。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值
  3. ReducingState:这种状态通过用户传入的reduceFunction,每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值
  4. MapState<UK, UV>:即状态值为一个map。用户通过put或putAll方法添加元素
    需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄

2、operator state托管状态

对于与key无关的dataStream可以进行状态托管,与算子进行绑定,对我们的数据进行处理
与Key无关的State,与Operator绑定的state,整个operator只对应一个state保存state的数据结构一般使用ListState
举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射

checkPoint

1、checkPoint的基本概念

为了保证state的容错性,Flink需要对state进行checkpoint

Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常

2、checkPoint的前提

Flink的checkpoint机制可以与(stream和state)的持久化存储交互的前提:

1、持久化的source,它需要支持在一定时间内重放事件。

这种sources的典型例子是持久化的消息队列(比如Apache Kafka,RabbitMQ等)或文件系统(比如HDFS,S3,GFS等)

2、用于state的持久化存储

例如分布式文件系统(比如HDFS,S3,GFS等)

checkpoint 流程

  1. 暂停新数据的输入
  2. 等待流中on-the-fly的数据被处理干净,此时得到flink graph的一个snapshot
  3. 将所有Task中的State拷贝到State Backend中,如HDFS。此动作由各个Task Manager完成
  4. 各个Task Manager将Task State的位置上报给Job Manager,完成checkpoint
  5. 恢复数据的输入
    如上所述,这里才需要“暂停输入+排干on-the-fly数据”的操作,这样才能拿到同一时刻下所有subtask的state

配置checkPoint

默认checkpoint功能是disabled的,想要使用的时候需要先启用

checkpoint开启之后,默认的checkPointMode是Exactly-once

checkpoint的checkPointMode有两种,Exactly-once和At-least-once

Exactly-once对于大多数应用来说是最合适的。At-least-once可能用在某些延迟超低的应用程序(始终延迟为几毫秒)

//默认checkpoint功能是disabled的,想要使用的时候需要先启用
// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】
environment.enableCheckpointing(1000);
// 高级选项:
// 设置模式为exactly-once (这是默认值)
environment.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确保检查点之间有至少500 ms的间隔【checkpoint最小间隔】
environment.getCheckpointConfig.setMinPauseBetweenCheckpoints(500);
// 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】
environment.getCheckpointConfig.setCheckpointTimeout(60000);
// 同一时间只允许进行一个检查点
environment.getCheckpointConfig.setMaxConcurrentCheckpoints(1);
// 表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint【详细解释见备注】

/**
  * ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink处理程序被cancel后,会保留Checkpoint数据,以便根据实际需要恢复到指定的Checkpoint
  * ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink处理程序被cancel后,会删除Checkpoint数据,只有job执行失败的时候才会保存checkpoint
  */
environment.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

State Backend

默认情况下,state会保存在taskmanager的内存中,checkpoint会存储在JobManager的内存中。

state 的store和checkpoint的位置取决于State Backend的配置

env.setStateBackend(…)

一共有三种State Backend

  1. MemoryStateBackend # 内存存储
  2. FsStateBackend # 文件系统存储
  3. RocksDBStateBackend # rocksDB是一个数据库

1、MemoryStateBackend

将数据持久化状态存储到内存当中,state数据保存在java堆内存中,执行checkpoint的时候,会把state的快照数据保存到jobmanager的内存中。基于内存的state backend在生产环境下不建议使用
代码配置:

// environment.setStateBackend(new MemoryStateBackend())

2、FsStateBackend

state数据保存在taskmanager的内存中,执行checkpoint的时候,会把state的快照数据保存到配置的文件系统中。可以使用hdfs等分布式文件系统
代码配置:

//environment.setStateBackend(new FsStateBackend("hdfs://node01:8020"))

3、RocksDBStateBackend

RocksDB介绍:RocksDB使用一套日志结构的数据库引擎,为了更好的性能,这套引擎是用C++编写的。 Key和value是任意大小的字节流。RocksDB跟上面的都略有不同,它会在本地文件系统中维护状态,state会直接写入本地rocksdb中。同时它需要配置一个远端的filesystem uri(一般是HDFS),在做checkpoint的时候,会把本地的数据直接复制到filesystem中。fail over的时候从filesystem中恢复到本地RocksDB克服了state受内存限制的缺点,同时又能够持久化到远端文件系统中,比较适合在生产中使用

代码配置:导入jar包然后配置代码

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.8.1</version>
</dependency>

配置代码

environment.setStateBackend(new RocksDBStateBackend("hdfs://node01:8020/flink/checkDir",true))

4、修改state-backend的两种方式

修改State Backend的两种方式

第一种:单任务调整

修改当前任务代码

env.setStateBackend(new FsStateBackend("hdfs://node01:8020/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依赖】

第二种:全局调整

修改flink-conf.yaml

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面几种:
jobmanager(MemoryStateBackend),
filesystem(FsStateBackend),
rocksdb(RocksDBStateBackend)

checkPoint保存多个历史版本

默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。但是,如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前

Flink可以支持保留多个Checkpoint,需要在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的个数

state.checkpoints.num-retained: 20

这样设置以后就查看对应的Checkpoint在HDFS上存储的文件目录

hdfs dfs -ls hdfs://node01:8020/flink/checkpoints

如果希望回退到某个Checkpoint点,只需要指定对应的某个Checkpoint路径即可实现

checkPoint恢复历史某个版本数据

如果Flink程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个Checkpoint点进行恢复

bin/flink run -s hdfs://node01:8020/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata flink-job.jar

程序正常运行后,还会按照Checkpoint配置进行运行,继续生成Checkpoint数据

savePoint

savePoint的介绍

  1. Flink通过Savepoint功能可以做到程序升级后,继续从升级前的那个点开始执行计算,保证数据不中断。
  2. 全局,一致性快照。可以保存数据源offset,operator操作状态等信息,可以从应用在过去任意做了savepoint的时刻开始继续消费
  3. 用户手动执行,是指向Checkpoint的指针,不会过期
  4. 在程序升级的情况下使用

注意:为了能够在作业的不同版本之间以及 Flink 的不同版本之间顺利升级,强烈推荐程序员通过 uid(String) 方法手动的给算子赋予 ID,这些 ID 将用于确定每一个算子的状态范围。如果不手动给各算子指定 ID,则会由 Flink 自动给每个算子生成一个 ID。只要这些 ID 没有改变就能从保存点(savepoint)将程序恢复回来。而这些自动生成的 ID 依赖于程序的结构,并且对代码的更改是很敏感的。因此,强烈建议用户手动的设置 ID。

savePoint的使用

1:在flink-conf.yaml中配置Savepoint存储位置
不是必须设置,但是设置后,后面创建指定Job的Savepoint时,可以不用在手动执行命令时指定Savepoint的位置

state.savepoints.dir: hdfs://node01:8020/flink/savepoints

2:触发一个savepoint【直接触发或者在cancel的时候触发】

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【针对on yarn模式需要指定-yid参数】

3:从指定的savepoint启动job

bin/flink run -s savepointPath [runArgs]
上一篇 下一篇