前言

本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

正文

什么是计数器?

计数器(Counter)是 MapReduce 应用程序报告其统计数据的设施。

Mapper 和 Reducer 实现可以使用计数器报告统计数据。

Hadoop MapReduce 附带了一个非常有用的 mapper, reducer 和 partitioner 库。

举例

比如,用户可能想快速实现文件行数,以及其中错误记录的统计。

为了使用这样的特性,用户代码创建一个叫作 Counter 的对象,并且在适当的时候,Map 和 Reduce 函数中增加 Counter 的值。

这些 Counter 的值,会定时从各个单独的 Worker 机器上传递给 Master(通过 Ping 的应答包传递)。

Master 把执行成功的 Map 或者 Reduce 任务的 Counter 值进行累计,并且当 MapReduce 操作完成之后,返回给用户代码。

当前 Counter 值也会显示在 Master 的状态页面,这样用户可以看到计算现场的进度。

当累计 Counter 的值的时侯, Master 会检查是否有对同一个 Map 或者 Reduce 任务的相同累计,避免重复累计

Backup 任务或者机器失效导致的重新执行 Map 任务或者 Reduce 任务或导致这个 Counter 重复执行,所以需要检査,避免 Master 进行重复统计。

部分计数器的值是由 MapReduce 函数库进行自动维持的,比如经过处理后输入的 Key/Value 对的数量,或者输出的 Key/Value 键值对等 Counter 特性对于 MapReduce 操作的完整性检查非常有用。

实践

package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMapperWithCounter extends Mapper<LongWritable, Text, LongWritable, IntWritable> {
    /**
     * 定义一个枚举类型
     */
    public static enum FileRecorder {
        ErrorRecorder,
        TotalRecorder
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        if ("error".equals(value.toString())) {
            /**
             * 把counter实现累加
             */
            context.getCounter(FileRecorder.ErrorRecorder).increment(1);
        }
        /**
         * 把counter实现累加
         */
        context.getCounter(FileRecorder.TotalRecorder).increment(1);
    }
}
package com.shockang.study.bigdata.mapreduce.counter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        /**
         * 使NLineInputFormat来分割一个小文件,近而模拟分布式大文件的处理
         */
        configuration.setInt("mapreduce.input.lineinputformat.linespermap", 5);
        Job job = new Job(configuration, "counter-job");
        job.setInputFormatClass(NLineInputFormat.class);
        job.setJarByClass(JobMain.class);
        job.setMapperClass(MyMapperWithCounter.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputDir = new Path(args[1]);
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(outputDir)) {
            fs.delete(outputDir, true);
        }
        FileOutputFormat.setOutputPath(job, outputDir);
        if (job.waitForCompletion(true)) {
            System.out.println("Error num:" + job.getCounters().findCounter(MyMapperWithCounter.FileRecorder.ErrorRecorder).getValue());
            System.out.println("Total num:" + job.getCounters().findCounter(MyMapperWithCounter.FileRecorder.TotalRecorder).getValue());
        }
    }
}
上一篇 下一篇