/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.manual;

import java.io.Serializable;
import java.util.Iterator;
import java.util.Random;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.ReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.util.SplittableIterator;

public class ReducePerformance {
    /**
     * Entry point of the manual benchmark. Runs the reduce job six times: a small pass for each
     * combine hint whose timing is not printed (presumably a warm-up), followed by timed passes
     * for (Integer, Integer) tuples and then for (String, Integer) tuples.
     *
     * @param args command-line arguments; not read
     * @throws Exception if any of the jobs fails
     */
    public static void main(String[] args) throws Exception {
        final int numElements = 40000000;
        final int keyRange = 4000000;
        // Sizes of the short, unprinted passes.
        final int smallNumElements = 10000;
        final int smallKeyRange = 1000;
        // Every configuration is run once with each hint, SORT first.
        final ReduceOperatorBase.CombineHint[] hints = {
            ReduceOperatorBase.CombineHint.SORT,
            ReduceOperatorBase.CombineHint.HASH
        };

        for (ReduceOperatorBase.CombineHint hint : hints) {
            testReducePerformance(
                    new TupleIntIntIterator(smallKeyRange),
                    TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class),
                    hint,
                    smallNumElements,
                    false);
        }

        for (ReduceOperatorBase.CombineHint hint : hints) {
            testReducePerformance(
                    new TupleIntIntIterator(keyRange),
                    TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class),
                    hint,
                    numElements,
                    true);
        }

        for (ReduceOperatorBase.CombineHint hint : hints) {
            testReducePerformance(
                    new TupleStringIntIterator(keyRange),
                    TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class),
                    hint,
                    numElements,
                    true);
        }
    }

    /**
     * Builds and executes one grouped-reduce job over generated records and optionally prints
     * how long the execution took.
     *
     * <p>The job reads {@code numRecords} records from a {@link SplittableRandomIterator} that
     * wraps {@code iterator}, groups them by the field expression {@code "0"}, reduces each group
     * with a {@link SumReducer} using the given combine hint, and prints the resulting count.
     *
     * @param iterator source of generated records; also used to label the timing output
     * @param typeInfo type information for the generated records
     * @param hint combine strategy passed to the reduce operator
     * @param numRecords total number of records the source is asked to produce
     * @param print whether to print the elapsed time after the job finished
     * @param <T> record type
     * @param <B> concrete type of the record generator
     * @throws Exception if the job execution fails
     */
    private static <T, B extends CopyableIterator<T>> void testReducePerformance(B iterator, TypeInformation<T> typeInfo, ReduceOperatorBase.CombineHint hint, int numRecords, boolean print) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().enableObjectReuse();

        // SumReducer is created raw: T is an unconstrained type variable here, so there is no
        // way to name the key type K such that Tuple2<K, Integer> equals T. The callers in this
        // class only pass Tuple2<?, Integer> records.
        @SuppressWarnings({"unchecked", "rawtypes"})
        ReduceOperator<T> output = env
                .fromParallelCollection(new SplittableRandomIterator<T, B>(numRecords, iterator), typeInfo)
                .groupBy("0")
                .reduce(new SumReducer())
                .setCombineHint(hint);

        // nanoTime() is monotonic; currentTimeMillis() follows the wall clock and can jump when
        // the system time is adjusted, which would corrupt the measured duration.
        long startNanos = System.nanoTime();
        System.out.println(output.count());
        long elapsedMillis = (System.nanoTime() - startNanos) / 1000000L;

        if (print) {
            System.out.println("=== Time for " + iterator.getClass().getSimpleName() + " with hint " + hint.toString() + ": " + elapsedMillis + "ms ===");
        }
    }

    /**
     * Reduce function that merges two tuples with the same key by adding their {@code f1}
     * counters.
     *
     * @param <K> type of the key held in field {@code f0}
     */
    private static final class SumReducer<K>
    implements ReduceFunction<Tuple2<K, Integer>> {
        private SumReducer() {
        }

        /**
         * Adds {@code b.f1} onto {@code a.f1} and returns {@code a}; the first argument is
         * modified in place rather than allocating a new tuple.
         *
         * @param a first tuple, updated and returned
         * @param b second tuple, only read
         * @return {@code a}, now carrying the sum of both counters
         * @throws RuntimeException if the two tuples do not have equal {@code f0} keys
         */
        public Tuple2<K, Integer> reduce(Tuple2<K, Integer> a, Tuple2<K, Integer> b) throws Exception {
            final boolean sameKey = a.f0.equals(b.f0);
            if (sameKey) {
                final int total = a.f1 + b.f1;
                a.f1 = total;
                return a;
            }
            throw new RuntimeException("SumReducer was called with two record that have differing keys.");
        }
    }

    /**
     * Endless generator of (String, Integer) tuples. The key is the decimal string of a
     * pseudo-random integer in {@code [0, keyRange)}; the value is always 1. One tuple instance
     * is reused for every call to {@link #next()}.
     */
    private static final class TupleStringIntIterator
    implements CopyableIterator<Tuple2<String, Integer>>,
    Serializable {
        /** Seed used when the caller does not supply one. */
        private static final int DEFAULT_SEED = 11;

        private final int keyRange;
        private Tuple2<String, Integer> reuse = new Tuple2<>();
        private int rndSeed;
        private Random rnd;

        /**
         * Creates a generator seeded with the default seed.
         *
         * @param keyRange exclusive upper bound of the generated keys
         */
        public TupleStringIntIterator(int keyRange) {
            this(keyRange, DEFAULT_SEED);
        }

        /**
         * Creates a generator with an explicit seed.
         *
         * @param keyRange exclusive upper bound of the generated keys
         * @param rndSeed seed for the pseudo-random key sequence
         */
        public TupleStringIntIterator(int keyRange, int rndSeed) {
            this.keyRange = keyRange;
            this.rndSeed = rndSeed;
            this.rnd = new Random(rndSeed);
        }

        /** Always {@code true}: this generator never runs out of elements. */
        @Override
        public boolean hasNext() {
            return true;
        }

        /**
         * Overwrites the shared tuple with a fresh random key and the value 1.
         *
         * @return the shared, reused tuple instance
         */
        @Override
        public Tuple2<String, Integer> next() {
            final int key = this.rnd.nextInt(this.keyRange);
            this.reuse.f0 = String.valueOf(key);
            this.reuse.f1 = 1;
            return this.reuse;
        }

        /** Not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a generator over the same key range whose seed is this generator's seed plus
         * a random offset below 10000. Drawing the offset advances this generator's own random
         * sequence.
         */
        @Override
        public CopyableIterator<Tuple2<String, Integer>> copy() {
            final int derivedSeed = this.rndSeed + this.rnd.nextInt(10000);
            return new TupleStringIntIterator(this.keyRange, derivedSeed);
        }
    }

    /**
     * Endless generator of (Integer, Integer) tuples. The key is a pseudo-random integer in
     * {@code [0, keyRange)}; the value is always 1. One tuple instance is reused for every call
     * to {@link #next()}.
     */
    private static final class TupleIntIntIterator
    implements CopyableIterator<Tuple2<Integer, Integer>>,
    Serializable {
        /** Seed used when the caller does not supply one. */
        private static final int DEFAULT_SEED = 11;

        private final int keyRange;
        private Tuple2<Integer, Integer> reuse = new Tuple2<>();
        private int rndSeed;
        private Random rnd;

        /**
         * Creates a generator seeded with the default seed.
         *
         * @param keyRange exclusive upper bound of the generated keys
         */
        public TupleIntIntIterator(int keyRange) {
            this(keyRange, DEFAULT_SEED);
        }

        /**
         * Creates a generator with an explicit seed.
         *
         * @param keyRange exclusive upper bound of the generated keys
         * @param rndSeed seed for the pseudo-random key sequence
         */
        public TupleIntIntIterator(int keyRange, int rndSeed) {
            this.keyRange = keyRange;
            this.rndSeed = rndSeed;
            this.rnd = new Random(rndSeed);
        }

        /** Always {@code true}: this generator never runs out of elements. */
        @Override
        public boolean hasNext() {
            return true;
        }

        /**
         * Overwrites the shared tuple with a fresh random key and the value 1.
         *
         * @return the shared, reused tuple instance
         */
        @Override
        public Tuple2<Integer, Integer> next() {
            final int key = this.rnd.nextInt(this.keyRange);
            this.reuse.f0 = key;
            this.reuse.f1 = 1;
            return this.reuse;
        }

        /** Not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a generator over the same key range whose seed is this generator's seed plus
         * a random offset below 10000. Drawing the offset advances this generator's own random
         * sequence.
         */
        @Override
        public CopyableIterator<Tuple2<Integer, Integer>> copy() {
            final int derivedSeed = this.rndSeed + this.rnd.nextInt(10000);
            return new TupleIntIntIterator(this.keyRange, derivedSeed);
        }
    }

    /**
     * An iterator that can produce further iterators of the same element type via
     * {@link #copy()}.
     *
     * @param <T> element type
     */
    private static interface CopyableIterator<T>
    extends Iterator<T> {
        /**
         * Creates another iterator over elements of type {@code T}.
         *
         * @return the new iterator
         */
        CopyableIterator<T> copy();
    }

    /**
     * Splittable source that hands out a fixed number of elements taken from a base iterator.
     * Each call to {@link #next()} decrements the remaining budget; the base iterator itself is
     * never asked whether it has more elements.
     *
     * @param <T> element type
     * @param <B> concrete type of the base iterator
     */
    private static final class SplittableRandomIterator<T, B extends CopyableIterator<T>>
    extends SplittableIterator<T>
    implements Serializable {
        /** Number of elements this iterator will still return. */
        private int numElements;
        private final B baseIterator;

        /**
         * @param numElements number of elements to emit before {@link #hasNext()} turns false
         * @param baseIterator iterator that supplies the actual elements
         */
        public SplittableRandomIterator(int numElements, B baseIterator) {
            this.numElements = numElements;
            this.baseIterator = baseIterator;
        }

        /** Returns whether the element budget has not been used up yet. */
        public boolean hasNext() {
            return this.numElements > 0;
        }

        /** Consumes one unit of the budget and returns the base iterator's next element. */
        public T next() {
            --this.numElements;
            return this.baseIterator.next();
        }

        /**
         * Divides the remaining elements among {@code numPartitions} new iterators, each backed
         * by its own copy of the base iterator.
         *
         * <p>The sizes add up to exactly the remaining element count: the first
         * {@code numElements % numPartitions} partitions receive one element more than the rest.
         * (The earlier code had this condition inverted, which produced too many elements when
         * the remainder was small and too few when it was large.)
         *
         * @param numPartitions number of iterators to create
         * @return the partition iterators, in partition order
         */
        public SplittableRandomIterator<T, B>[] split(int numPartitions) {
            int splitSize = this.numElements / numPartitions;
            int rem = this.numElements % numPartitions;
            // Generic arrays cannot be instantiated directly; the raw array only ever holds
            // SplittableRandomIterator<T, B> instances created below.
            @SuppressWarnings({"unchecked", "rawtypes"})
            SplittableRandomIterator<T, B>[] res = new SplittableRandomIterator[numPartitions];
            for (int i = 0; i < numPartitions; ++i) {
                int partitionSize = i < rem ? splitSize + 1 : splitSize;
                // copy() is declared to return CopyableIterator<T>; the implementations in this
                // file return their own type, so the narrowing to B holds for them.
                @SuppressWarnings("unchecked")
                B baseCopy = (B) this.baseIterator.copy();
                res[i] = new SplittableRandomIterator<T, B>(partitionSize, baseCopy);
            }
            return res;
        }

        /** Returns the remaining element count as the upper bound on the number of splits. */
        public int getMaximumNumberOfSplits() {
            return this.numElements;
        }

        /** Not supported. */
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }
}

