/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.common.Abortable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableFactories;
import org.apache.hadoop.io.WritableFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.ShuffleConsumerPlugin;
import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapred.SortedRanges;
import org.apache.hadoop.mapred.Task;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskStatus;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.mapreduce.task.reduce.Shuffle;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class ReduceTask
extends Task {
    private static final Logger LOG;
    private int numMaps;
    private CompressionCodec codec;
    private Map<TaskAttemptID, MapOutputFile> localMapFiles;
    private Progress copyPhase;
    private Progress sortPhase;
    private Progress reducePhase;
    private Counters.Counter shuffledMapsCounter;
    private Counters.Counter reduceShuffleBytes;
    private Counters.Counter reduceInputKeyCounter;
    private Counters.Counter reduceInputValueCounter;
    private Counters.Counter reduceOutputCounter;
    private Counters.Counter reduceCombineInputCounter;
    private Counters.Counter reduceCombineOutputCounter;
    private Counters.Counter fileOutputByteCounter;
    private Comparator<FileStatus> mapOutputFileComparator;
    private final SortedSet<FileStatus> mapOutputFilesOnDisk;

    public ReduceTask() {
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.shuffledMapsCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
        this.reduceShuffleBytes = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineInputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        this.reduceCombineOutputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        this.fileOutputByteCounter = (Counters.Counter)this.getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN);
        this.mapOutputFileComparator = new Comparator<FileStatus>(){

            @Override
            public int compare(FileStatus a, FileStatus b) {
                if (a.getLen() < b.getLen()) {
                    return -1;
                }
                if (a.getLen() == b.getLen()) {
                    if (a.getPath().toString().equals(b.getPath().toString())) {
                        return 0;
                    }
                    return -1;
                }
                return 1;
            }
        };
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
    }

    public ReduceTask(String jobFile, TaskAttemptID taskId, int partition, int numMaps, int numSlotsRequired) {
        super(jobFile, taskId, partition, numSlotsRequired);
        this.getProgress().setStatus("reduce");
        this.setPhase(TaskStatus.Phase.SHUFFLE);
        this.shuffledMapsCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
        this.reduceShuffleBytes = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
        this.reduceInputKeyCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
        this.reduceInputValueCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
        this.reduceOutputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
        this.reduceCombineInputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        this.reduceCombineOutputCounter = (Counters.Counter)this.getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
        this.fileOutputByteCounter = (Counters.Counter)this.getCounters().findCounter(FileOutputFormatCounter.BYTES_WRITTEN);
        this.mapOutputFileComparator = new /* invalid duplicate definition of identical inner class */;
        this.mapOutputFilesOnDisk = new TreeSet<FileStatus>(this.mapOutputFileComparator);
        this.numMaps = numMaps;
    }

    public void setLocalMapFiles(Map<TaskAttemptID, MapOutputFile> mapFiles) {
        this.localMapFiles = mapFiles;
    }

    private CompressionCodec initCodec() {
        if (this.conf.getCompressMapOutput()) {
            Class<? extends CompressionCodec> codecClass = this.conf.getMapOutputCompressorClass(DefaultCodec.class);
            return ReflectionUtils.newInstance(codecClass, this.conf);
        }
        return null;
    }

    @Override
    public boolean isMapTask() {
        return false;
    }

    public int getNumMaps() {
        return this.numMaps;
    }

    @Override
    public void localizeConfiguration(JobConf conf) throws IOException {
        super.localizeConfiguration(conf);
        conf.setNumMapTasks(this.numMaps);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        super.write(out);
        out.writeInt(this.numMaps);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        this.numMaps = in.readInt();
    }

    private Path[] getMapFiles(FileSystem fs) throws IOException {
        ArrayList<Path> fileList = new ArrayList<Path>();
        for (int i = 0; i < this.numMaps; ++i) {
            fileList.add(this.mapOutputFile.getInputFile(i));
        }
        return fileList.toArray(new Path[0]);
    }

    @Override
    public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
        job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
        if (this.isMapOrReduce()) {
            this.copyPhase = this.getProgress().addPhase("copy");
            this.sortPhase = this.getProgress().addPhase("sort");
            this.reducePhase = this.getProgress().addPhase("reduce");
        }
        Task.TaskReporter reporter = this.startReporter(umbilical);
        boolean useNewApi = job.getUseNewReducer();
        this.initialize(job, this.getJobID(), reporter, useNewApi);
        if (this.jobCleanup) {
            this.runJobCleanupTask(umbilical, reporter);
            return;
        }
        if (this.jobSetup) {
            this.runJobSetupTask(umbilical, reporter);
            return;
        }
        if (this.taskCleanup) {
            this.runTaskCleanupTask(umbilical, reporter);
            return;
        }
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class<? extends Reducer> combinerClass = this.conf.getCombinerClass();
        Task.CombineOutputCollector combineCollector = null != combinerClass ? new Task.CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
        Class<ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        ShuffleConsumerPlugin.Context shuffleContext = new ShuffleConsumerPlugin.Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, this.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(TaskStatus.Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class<?> keyClass = job.getMapOutputKeyClass();
        Class<?> valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }
        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }

    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runOldReducer(JobConf job, TaskUmbilicalProtocol umbilical, final Task.TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException {
        OldTrackingRecordWriter out;
        Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job);
        String finalName = ReduceTask.getOutputName(this.getPartition());
        final OldTrackingRecordWriter finalOut = out = new OldTrackingRecordWriter(this, job, reporter, finalName);
        OutputCollector collector = new OutputCollector<OUTKEY, OUTVALUE>(){

            @Override
            public void collect(OUTKEY key, OUTVALUE value) throws IOException {
                finalOut.write(key, value);
                reporter.progress();
            }
        };
        try {
            boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job) > 0L && SkipBadRecords.getAutoIncrReducerProcCount(job);
            ReduceValuesIterator values = this.isSkipping() ? new SkippingReduceValuesIterator<INKEY, INVALUE>(rIter, comparator, keyClass, valueClass, job, reporter, umbilical) : new ReduceValuesIterator<INKEY, INVALUE>(rIter, comparator, keyClass, valueClass, job, reporter);
            values.informReduceProgress();
            while (values.more()) {
                this.reduceInputKeyCounter.increment(1L);
                reducer.reduce(values.getKey(), values, collector, reporter);
                if (incrProcCount) {
                    reporter.incrCounter("SkippingTaskCounters", "ReduceProcessedGroups", 1L);
                }
                values.nextKey();
                values.informReduceProgress();
            }
            reducer.close();
            reducer = null;
            out.close(reporter);
            out = null;
        }
        catch (IOException ioe) {
            try {
                if (out instanceof Abortable) {
                    try {
                        ((Abortable)((Object)out)).abort();
                    }
                    catch (IOException iOException) {
                        // empty catch block
                    }
                }
                throw ioe;
            }
            catch (Throwable throwable) {
                IOUtils.cleanupWithLogger(LOG, reducer);
                this.closeQuietly(out, reporter);
                throw throwable;
            }
        }
        IOUtils.cleanupWithLogger(LOG, reducer);
        this.closeQuietly(out, reporter);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final Task.TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException, ClassNotFoundException {
        final RawKeyValueIterator rawIter = rIter;
        rIter = new RawKeyValueIterator(){

            @Override
            public void close() throws IOException {
                rawIter.close();
            }

            @Override
            public DataInputBuffer getKey() throws IOException {
                return rawIter.getKey();
            }

            @Override
            public Progress getProgress() {
                return rawIter.getProgress();
            }

            @Override
            public DataInputBuffer getValue() throws IOException {
                return rawIter.getValue();
            }

            @Override
            public boolean next() throws IOException {
                boolean ret = rawIter.next();
                reporter.setProgress(rawIter.getProgress().getProgress());
                return ret;
            }
        };
        TaskAttemptContextImpl taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
        org.apache.hadoop.mapreduce.Reducer<?, ?, ?, ?> reducer = ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
        NewTrackingRecordWriter trackedRW = new NewTrackingRecordWriter(this, taskContext);
        job.setBoolean("mapred.skip.on", this.isSkipping());
        job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
        Reducer.Context reducerContext = ReduceTask.createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);
        try {
            reducer.run(reducerContext);
        }
        finally {
            ((RecordWriter)trackedRW).close(reducerContext);
        }
    }

    private <OUTKEY, OUTVALUE> void closeQuietly(org.apache.hadoop.mapred.RecordWriter<OUTKEY, OUTVALUE> c, Reporter r) {
        if (c != null) {
            try {
                c.close(r);
            }
            catch (Exception e) {
                LOG.info("Exception in closing " + c, (Throwable)e);
            }
        }
    }

    static {
        WritableFactories.setFactory(ReduceTask.class, new WritableFactory(){

            @Override
            public Writable newInstance() {
                return new ReduceTask();
            }
        });
        LOG = LoggerFactory.getLogger((String)ReduceTask.class.getName());
    }

    static class NewTrackingRecordWriter<K, V>
    extends RecordWriter<K, V> {
        private final RecordWriter<K, V> real;
        private final Counter outputRecordCounter;
        private final Counter fileOutputByteCounter;
        private final List<FileSystem.Statistics> fsStats;

        NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException {
            this.outputRecordCounter = reduce.reduceOutputCounter;
            this.fileOutputByteCounter = reduce.fileOutputByteCounter;
            List<FileSystem.Statistics> matchedStats = null;
            if (reduce.outputFormat instanceof FileOutputFormat) {
                matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
            }
            this.fsStats = matchedStats;
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real = reduce.outputFormat.getRecordWriter(taskContext);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real.close(context);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
        }

        @Override
        public void write(K key, V value) throws IOException, InterruptedException {
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real.write(key, value);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
            this.outputRecordCounter.increment(1L);
        }

        private long getOutputBytes(List<FileSystem.Statistics> stats) {
            if (stats == null) {
                return 0L;
            }
            long bytesWritten = 0L;
            for (FileSystem.Statistics stat : stats) {
                bytesWritten += stat.getBytesWritten();
            }
            return bytesWritten;
        }
    }

    static class OldTrackingRecordWriter<K, V>
    implements org.apache.hadoop.mapred.RecordWriter<K, V> {
        private final org.apache.hadoop.mapred.RecordWriter<K, V> real;
        private final Counters.Counter reduceOutputCounter;
        private final Counters.Counter fileOutputByteCounter;
        private final List<FileSystem.Statistics> fsStats;

        public OldTrackingRecordWriter(ReduceTask reduce, JobConf job, Task.TaskReporter reporter, String finalName) throws IOException {
            this.reduceOutputCounter = reduce.reduceOutputCounter;
            this.fileOutputByteCounter = reduce.fileOutputByteCounter;
            List<FileSystem.Statistics> matchedStats = null;
            if (job.getOutputFormat() instanceof org.apache.hadoop.mapred.FileOutputFormat) {
                matchedStats = Task.getFsStatistics(org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(job), job);
            }
            this.fsStats = matchedStats;
            FileSystem fs = FileSystem.get(job);
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
        }

        @Override
        public void write(K key, V value) throws IOException {
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real.write(key, value);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
            this.reduceOutputCounter.increment(1L);
        }

        @Override
        public void close(Reporter reporter) throws IOException {
            long bytesOutPrev = this.getOutputBytes(this.fsStats);
            this.real.close(reporter);
            long bytesOutCurr = this.getOutputBytes(this.fsStats);
            this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
        }

        private long getOutputBytes(List<FileSystem.Statistics> stats) {
            if (stats == null) {
                return 0L;
            }
            long bytesWritten = 0L;
            for (FileSystem.Statistics stat : stats) {
                bytesWritten += stat.getBytesWritten();
            }
            return bytesWritten;
        }
    }

    private class SkippingReduceValuesIterator<KEY, VALUE>
    extends ReduceValuesIterator<KEY, VALUE> {
        private SortedRanges.SkipRangeIterator skipIt;
        private TaskUmbilicalProtocol umbilical;
        private Counters.Counter skipGroupCounter;
        private Counters.Counter skipRecCounter;
        private long grpIndex;
        private Class<KEY> keyClass;
        private Class<VALUE> valClass;
        private SequenceFile.Writer skipWriter;
        private boolean toWriteSkipRecs;
        private boolean hasNext;
        private Task.TaskReporter reporter;

        public SkippingReduceValuesIterator(RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Task.TaskReporter reporter, TaskUmbilicalProtocol umbilical) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
            this.grpIndex = -1L;
            this.umbilical = umbilical;
            this.skipGroupCounter = reporter.getCounter((Enum)TaskCounter.REDUCE_SKIPPED_GROUPS);
            this.skipRecCounter = reporter.getCounter((Enum)TaskCounter.REDUCE_SKIPPED_RECORDS);
            this.toWriteSkipRecs = ReduceTask.this.toWriteSkipRecs() && SkipBadRecords.getSkipOutputPath(conf) != null;
            this.keyClass = keyClass;
            this.valClass = valClass;
            this.reporter = reporter;
            this.skipIt = ReduceTask.this.getSkipRanges().skipRangeIterator();
            this.mayBeSkip();
        }

        @Override
        public void nextKey() throws IOException {
            super.nextKey();
            this.mayBeSkip();
        }

        @Override
        public boolean more() {
            return super.more() && this.hasNext;
        }

        private void mayBeSkip() throws IOException {
            this.hasNext = this.skipIt.hasNext();
            if (!this.hasNext) {
                LOG.warn("Further groups got skipped.");
                return;
            }
            ++this.grpIndex;
            long nextGrpIndex = this.skipIt.next();
            long skip = 0L;
            long skipRec = 0L;
            while (this.grpIndex < nextGrpIndex && super.more()) {
                while (this.hasNext()) {
                    Object value = this.moveToNext();
                    if (this.toWriteSkipRecs) {
                        this.writeSkippedRec(this.getKey(), value);
                    }
                    ++skipRec;
                }
                super.nextKey();
                ++this.grpIndex;
                ++skip;
            }
            if (skip > 0L && this.skipIt.skippedAllRanges() && this.skipWriter != null) {
                this.skipWriter.close();
            }
            this.skipGroupCounter.increment(skip);
            this.skipRecCounter.increment(skipRec);
            ReduceTask.this.reportNextRecordRange(this.umbilical, this.grpIndex);
        }

        private void writeSkippedRec(KEY key, VALUE value) throws IOException {
            if (this.skipWriter == null) {
                Path skipDir = SkipBadRecords.getSkipOutputPath(ReduceTask.this.conf);
                Path skipFile = new Path(skipDir, ReduceTask.this.getTaskID().toString());
                this.skipWriter = SequenceFile.createWriter(skipFile.getFileSystem(ReduceTask.this.conf), (Configuration)ReduceTask.this.conf, skipFile, this.keyClass, this.valClass, SequenceFile.CompressionType.BLOCK, this.reporter);
            }
            this.skipWriter.append(key, value);
        }
    }

    private class ReduceValuesIterator<KEY, VALUE>
    extends Task.ValuesIterator<KEY, VALUE> {
        public ReduceValuesIterator(RawKeyValueIterator in, RawComparator<KEY> comparator, Class<KEY> keyClass, Class<VALUE> valClass, Configuration conf, Progressable reporter) throws IOException {
            super(in, comparator, keyClass, valClass, conf, reporter);
        }

        @Override
        public VALUE next() {
            ReduceTask.this.reduceInputValueCounter.increment(1L);
            return this.moveToNext();
        }

        protected VALUE moveToNext() {
            return super.next();
        }

        public void informReduceProgress() {
            ReduceTask.this.reducePhase.set(this.in.getProgress().getProgress());
            this.reporter.progress();
        }
    }
}

