前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

通过唯一一个 Sink 作为接收器来接收后续需要的数据,有时候会出现当前 Sink 故障或者数据收集请求量较大的情况,这时候单一的 Sink 配置可能就无法保证 Flume 开发的可靠性。

为此, Flume 提供了 Flume Sink Processors ( FlumeSink 处理器)来解决上述问题。

Sink 处理器允许开发者定义一个 Sink groups (接收器组),将多个 Sink 分组到一个实体中,

这样 Sink 处理器就可以通过组内的多个 Sink 为服务提供负载均衡功能,或者是在某个 Sink 出现短暂故障的时候实现从一个 Sink 到另一个 Sink 的故障转移

负载均衡

负载均衡接收器处理器( Load balancing sink processor )提供了在多个 Sink 上进行负载均衡流量的功能,它维护了一个活跃的 Sink 索引列表,必须在其上分配负载。

Load balancing sink processor 支持使用 round_robin (轮询)和 random (随机)选择机制进行流量分配,其默认选择机制为 round_robin ,但可以通过配置进行覆盖,还支持继承 AbstractSinkSelector 的自定义类来自定义选择机制。

在使用时,选择器( selector )会根据配置的选择机制挑选下一个可用的 Sink 并进行调用。

对于 round robin 和 random 两种选择机制,如果所选 Sink 无法收集 event ,则处理器会通过其配置的选择机制选择下一个可用 Sink 。

这种实现方案不会将失败的 Sink 列入黑名单,而是继续乐观地尝试每个可用的 Sink 。

如果所有 Sink 都调用失败,则选择器将故障传播到接收器运行器( sink runner )。

如果启用了 backoff 属性,则 Sink 处理器会将失败的 Sink 列入黑名单。

当超时结束时,如果 Sink 仍然没有响应,则超时会呈指数级増加,以避免在无响应的 Sink 上长时间等待时卡住。

在禁用 backoff 功能的情况下,在 round robin 机制下,所有失败的 Sink 将被传递到 Sink 队列中的下一个 Sink 后,因此不再均衡。

Load balancing sink processor 提供的配置属性,如表所示(加粗部分为必须属性)。

属性名称 默认值 说明
sinks - 以空格分隔的参与sink组的sink列表
processor.type default 组件类型名必须是 load_ba1ance
processor.backoff fa1se 设置失败的sink进入黑名单
processor.selector round_robin 选择机制必须是 round_robin、 random 或是继承自 AbstractSinkSelector 的自定义选择机制类全路径名
processor.selector.maxTimeout 30000 失败sink放置在黑名单的超时时间,失败sink在指定时间后仍无法启用,则超时时间呈指数增加

从表中可以看出, processor.type 属性的默认值为 default,这是因为Sink处理器的 processor.type 提供了3种处理机制: default(默认值)、 failover和load_ba1ance。

其中, default表示配置单独一个sink配置和使用非常简单,同时也不强制要求使用 sink group进行封装;

另外的 failover 和load_ba1ance 就分别代表故障转移和负载均衡情况下的配置属性。

故障转移

故障转移接收器处理器( Failover Sink Processor )维护一个具有优先级的 Sink 列表,保证在处理 event 只要有一个可用的 Sink 即可。

故障转移机制的工作原理

将故障的 Sink 降级到故障池中,在池中为它们分配一个冷却期,在重试之前冷却时间会增加,当 Sink 成功发送 event 后,它将恢复到活跃池中。

Sink 具有与之相关的优先级,数值越大,优先级越高。

如果在发送 event 时 Sink 发生故障,则会尝试下一个具有最高优先级的 Sink 来继续发送 event 。

如果未指定优先级,则根据配置文件中指定 Sink 的顺序确定优先级。

FailoverSinkProcessor 提供的配置属性,如下表所示(加粗部分为必须属性)

属性名称 默认值 说明
sinks - 以空格分隔的参与sink组的sink列表
processor.type default 组件类型名必须是 failover
processor.priority.< sinkname > - 设置sink的优先级取值
processor.maxPenalty 30000 失败sink的最大退避时间

对比

Failover Sink Processor 与 Load balancing sink processor 的 Flume 结构图基本一样。

而这两种处理器的主要区别在于,Load balancing sink processor 中会让每一个活跃的 Sink 轮流/随机地处理 event ;

而 Failover Sink Processor 只允许一个活跃的且优先级高的 Sink 来处理 event ,只有在当前 Sink 故障后才会向下继续选择另一个活跃的且优先级高的 Sink 来处理 event 。

实践

  1. 使用 Load balancing sink processor 配置一个名称为 a1 的 Agent 示例如下。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=random
  1. 使用 Failover Sink Processor 配置一个名称为 a1 的 Agent示例如下。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.maxpenalty=10000
上一篇 下一篇