/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.apache.beam.fn.harness.GroupingTable;
import org.apache.beam.runners.core.GlobalCombineFnRunner;
import org.apache.beam.runners.core.GlobalCombineFnRunners;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
import org.joda.time.Instant;

public class PrecombineGroupingTable<K, InputT, AccumT>
implements GroupingTable<K, InputT, AccumT> {
    // Size of one JVM word in bytes, as computed once by getBytesPerJvmWord().
    private static final int BYTES_PER_JVM_WORD = PrecombineGroupingTable.getBytesPerJvmWord();
    // Fixed bookkeeping charge of 24 JVM words that put() adds to `size` for every table entry.
    private static final int PER_KEY_OVERHEAD = 24 * BYTES_PER_JVM_WORD;
    // Fraction of maxSize that put() flushes the table down to once `size` reaches maxSize.
    private static final double TARGET_LOAD = 0.9;
    // Upper bound on `size` before put() starts flushing; non-final because setMaxSize() may change it.
    private long maxSize;
    // Maps a key to the object used as the lookup key in `table`.
    private final GroupingKeyCreator<? super K> groupingKeyCreator;
    // Splits an input pair into key and value and builds the output pair handed to the receiver.
    private final PairInfo pairInfo;
    // Creates, updates and compacts the per-key accumulators.
    private final Combiner<? super K, InputT, AccumT, ?> combiner;
    // Estimates the size of a key; evaluated once when a table entry is created.
    private final SizeEstimator<? super K> keySizer;
    // Estimates the size of an accumulator; re-evaluated by an entry after each add and after a compaction
    // that yields a different accumulator object.
    private final SizeEstimator<? super AccumT> accumulatorSizer;
    // Running size estimate of the table: per-entry sizes plus PER_KEY_OVERHEAD per entry.
    private long size = 0L;
    // Buffered entries, indexed by the grouping key produced by groupingKeyCreator.
    private Map<Object, GroupingTableEntry<K, InputT, AccumT>> table;

    /**
     * Reads the grouping-table size limit from the harness options and converts it to bytes.
     *
     * @param options pipeline options, viewed as {@link SdkHarnessOptions}
     * @return the configured megabyte limit multiplied by 1024 * 1024
     */
    private static long getGroupingTableSizeBytes(PipelineOptions options) {
        SdkHarnessOptions harnessOptions = options.as(SdkHarnessOptions.class);
        // Widen to long before multiplying so the byte count cannot overflow int arithmetic.
        long maxSizeMb = harnessOptions.getGroupingTableMaxSizeMb();
        return maxSizeMb * 1024L * 1024L;
    }

    /**
     * Builds a grouping table that pre-combines values per windowed key using {@code combineFn}.
     *
     * <p>Key and accumulator sizes are measured on every call through {@link CoderSizeEstimator}.
     * Generic arguments are inferred with the diamond operator: the key sizer operates on
     * {@code WindowedValue<K>} (not {@code K}) and the accumulator coder is a
     * {@code Coder<? super AccumT>}, so spelling the type arguments out as {@code <K>} /
     * {@code <AccumT>} does not type-check.
     *
     * @param options pipeline options; also supplies the maximum table size
     * @param combineFn combine function applied to the values of each key
     * @param keyCoder coder used to derive structural grouping keys and to size keys
     * @param accumulatorCoder coder used to size accumulators
     * @return a new, empty grouping table
     */
    public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT, AccumT> combining(PipelineOptions options, Combine.CombineFn<InputT, AccumT, ?> combineFn, Coder<K> keyCoder, Coder<? super AccumT> accumulatorCoder) {
        Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
            new ValueCombiner<>(GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
        return new PrecombineGroupingTable<>(
            PrecombineGroupingTable.getGroupingTableSizeBytes(options),
            new WindowingCoderGroupingKeyCreator<>(keyCoder),
            WindowedPairInfo.create(),
            valueCombiner,
            new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)),
            new CoderSizeEstimator<>(accumulatorCoder));
    }

    /**
     * Builds a grouping table like {@code combining}, but wraps both size estimators in a
     * {@link SamplingSizeEstimator} so that only a sample of elements is actually measured.
     *
     * <p>Generic arguments are inferred with the diamond operator: the key sizer operates on
     * {@code WindowedValue<K>} (not {@code K}) and the accumulator coder is a
     * {@code Coder<? super AccumT>}, so explicit {@code <K>} / {@code <AccumT>} arguments do not
     * type-check.
     *
     * @param options pipeline options; also supplies the maximum table size
     * @param combineFn combine function applied to the values of each key
     * @param keyCoder coder used to derive structural grouping keys and to size keys
     * @param accumulatorCoder coder used to size accumulators
     * @param sizeEstimatorSampleRate minimum sampling rate handed to both sampling estimators
     *     (the maximum rate is fixed at 1.0)
     * @return a new, empty grouping table
     */
    public static <K, InputT, AccumT> GroupingTable<WindowedValue<K>, InputT, AccumT> combiningAndSampling(PipelineOptions options, Combine.CombineFn<InputT, AccumT, ?> combineFn, Coder<K> keyCoder, Coder<? super AccumT> accumulatorCoder, double sizeEstimatorSampleRate) {
        Combiner<WindowedValue<K>, InputT, AccumT, ?> valueCombiner =
            new ValueCombiner<>(GlobalCombineFnRunners.create(combineFn), NullSideInputReader.empty(), options);
        return new PrecombineGroupingTable<>(
            PrecombineGroupingTable.getGroupingTableSizeBytes(options),
            new WindowingCoderGroupingKeyCreator<>(keyCoder),
            WindowedPairInfo.create(),
            valueCombiner,
            new SamplingSizeEstimator<>(
                new CoderSizeEstimator<>(WindowedValue.getValueOnlyCoder(keyCoder)), sizeEstimatorSampleRate, 1.0),
            new SamplingSizeEstimator<>(
                new CoderSizeEstimator<>(accumulatorCoder), sizeEstimatorSampleRate, 1.0));
    }

    /**
     * Creates an empty table from its collaborating strategies.
     *
     * @param maxSize size estimate at which {@code put} starts flushing entries
     * @param groupingKeyCreator derives the map key used to group equal keys
     * @param pairInfo splits input pairs and assembles output pairs
     * @param combineFn creates, updates and compacts accumulators
     * @param keySizer estimates the size of a key
     * @param accumulatorSizer estimates the size of an accumulator
     */
    PrecombineGroupingTable(long maxSize, GroupingKeyCreator<? super K> groupingKeyCreator, PairInfo pairInfo, Combiner<? super K, InputT, AccumT, ?> combineFn, SizeEstimator<? super K> keySizer, SizeEstimator<? super AccumT> accumulatorSizer) {
        // Storage first, then limits, then the strategy objects.
        this.table = new HashMap<>();
        this.maxSize = maxSize;

        this.groupingKeyCreator = groupingKeyCreator;
        this.pairInfo = pairInfo;
        this.combiner = combineFn;

        this.keySizer = keySizer;
        this.accumulatorSizer = accumulatorSizer;
    }

    /**
     * Creates the table entry for {@code key}, holding a freshly created accumulator.
     *
     * <p>The key size is estimated once at creation. The accumulator size starts at zero and is
     * re-estimated after every {@code add}, and after a {@code compact} that produced a different
     * accumulator object.
     *
     * @param key the key the entry belongs to
     * @return a new entry with an empty accumulator
     * @throws Exception if the key sizer fails
     */
    private GroupingTableEntry<K, InputT, AccumT> createTableEntry(final K key) throws Exception {
        return new GroupingTableEntry<K, InputT, AccumT>() {
            final long keySize = keySizer.estimateSize(key);
            AccumT accumulator = combiner.createAccumulator(key);
            long accumulatorSize = 0L;

            @Override
            public K getKey() {
                return key;
            }

            @Override
            public AccumT getValue() {
                return accumulator;
            }

            @Override
            public long getSize() {
                return keySize + accumulatorSize;
            }

            @Override
            public void compact() throws Exception {
                // Typed as AccumT (not Object) so it can be stored back and passed to the sizer.
                AccumT newAccumulator = combiner.compact(key, accumulator);
                // Identity comparison: only re-estimate when the combiner handed back another object.
                if (newAccumulator != accumulator) {
                    accumulator = newAccumulator;
                    accumulatorSize = accumulatorSizer.estimateSize(newAccumulator);
                }
            }

            @Override
            public void add(InputT value) throws Exception {
                accumulator = combiner.add(key, accumulator, value);
                accumulatorSize = accumulatorSizer.estimateSize(accumulator);
            }
        };
    }

    /**
     * Adds an untyped input pair to the table.
     *
     * <p>The pair is split by {@code pairInfo}, whose accessors return {@code Object}; the results
     * are cast to {@code K} and {@code InputT} before delegating to the typed overload. The casts are
     * unchecked, so a pair of the wrong shape surfaces as a {@link ClassCastException} later, where
     * the value is actually used.
     *
     * @param pair the input pair, in the representation understood by {@code pairInfo}
     * @param receiver receives any entries flushed as a consequence of this insertion
     * @throws Exception if combining, sizing or the receiver fails
     */
    @Override
    @SuppressWarnings("unchecked")
    public void put(Object pair, GroupingTable.Receiver receiver) throws Exception {
        K key = (K) this.pairInfo.getKeyFromInputPair(pair);
        InputT value = (InputT) this.pairInfo.getValueFromInputPair(pair);
        this.put(key, value, receiver);
    }

    /**
     * Combines {@code value} into the accumulator for {@code key}, creating the entry if needed,
     * and flushes entries to {@code receiver} when the table has grown too large.
     *
     * <p>Once the running size estimate reaches {@code maxSize}, entries are removed in the table's
     * iteration order and emitted until the estimate drops below {@code TARGET_LOAD * maxSize}.
     *
     * @param key key of the element
     * @param value value to fold into the key's accumulator
     * @param receiver receives flushed entries
     * @throws Exception if key creation, combining, sizing or the receiver fails
     */
    public void put(K key, InputT value, GroupingTable.Receiver receiver) throws Exception {
        Object groupingKey = this.groupingKeyCreator.createGroupingKey(key);
        GroupingTableEntry<K, InputT, AccumT> entry = this.table.get(groupingKey);
        if (entry == null) {
            entry = this.createTableEntry(key);
            this.table.put(groupingKey, entry);
            this.size += PER_KEY_OVERHEAD;
        } else {
            // Take the entry's old size out; its new size is added back after the update below.
            this.size -= entry.getSize();
        }
        entry.add(value);
        this.size += entry.getSize();

        if (this.size < this.maxSize) {
            return;
        }

        long targetSize = (long) (TARGET_LOAD * (double) this.maxSize);
        Iterator<GroupingTableEntry<K, InputT, AccumT>> entries = this.table.values().iterator();
        while (this.size >= targetSize) {
            if (!entries.hasNext()) {
                // The table is empty but the running estimate is still above target;
                // reset the estimate so it matches the empty table.
                this.size = 0L;
                break;
            }
            GroupingTableEntry<K, InputT, AccumT> toFlush = entries.next();
            entries.remove();
            // Read the size before output(), which compacts the entry and may change it.
            this.size -= toFlush.getSize() + PER_KEY_OVERHEAD;
            this.output(toFlush, receiver);
        }
    }

    /**
     * Compacts {@code entry} and hands its key/accumulator pair to {@code receiver}.
     *
     * @param entry the entry to emit
     * @param receiver destination for the output pair built by {@code pairInfo}
     * @throws Exception if compaction or the receiver fails
     */
    private void output(GroupingTableEntry<K, InputT, AccumT> entry, GroupingTable.Receiver receiver) throws Exception {
        entry.compact();
        // Build the pair only after compaction so the compacted accumulator is what gets emitted.
        Object outputPair = pairInfo.makeOutputPair(entry.getKey(), entry.getValue());
        receiver.process(outputPair);
    }

    /**
     * Emits every buffered entry to {@code output}, then empties the table and resets the size
     * estimate to zero.
     *
     * @param output receives all entries currently in the table
     * @throws Exception if compaction or the receiver fails; the table is not cleared in that case
     */
    @Override
    public void flush(GroupingTable.Receiver output) throws Exception {
        Iterator<GroupingTableEntry<K, InputT, AccumT>> remaining = table.values().iterator();
        while (remaining.hasNext()) {
            this.output(remaining.next(), output);
        }
        size = 0L;
        table.clear();
    }

    /**
     * Replaces the size threshold at which {@code put} starts flushing entries.
     *
     * @param maxSize the new threshold, in the same unit as the size estimate
     */
    @VisibleForTesting
    public void setMaxSize(long maxSize) {
        // Takes effect on the next put(); entries already buffered are not flushed here.
        this.maxSize = maxSize;
    }

    /**
     * Returns the table's current running size estimate.
     *
     * @return the sum of entry sizes and per-key overhead tracked by {@code put} and {@code flush}
     */
    @VisibleForTesting
    public long size() {
        return size;
    }

    /**
     * Determines the JVM word size in bytes from the {@code sun.arch.data.model} system property.
     *
     * @return the property value divided by 8, or 8 when the property is missing or not a number
     */
    private static int getBytesPerJvmWord() {
        int bytesPerWord;
        try {
            String dataModelBits = System.getProperty("sun.arch.data.model");
            bytesPerWord = Integer.parseInt(dataModelBits) / 8;
        }
        catch (NumberFormatException ignored) {
            // Property absent (null) or not numeric: fall back to 8 bytes per word.
            bytesPerWord = 8;
        }
        return bytesPerWord;
    }

    /**
     * A {@link SizeEstimator} that measures only a sample of the elements with an underlying
     * estimator and answers all other calls with the running mean of the sampled sizes.
     *
     * <p>Every element is measured until at least {@code minSampled} samples exist and the sample
     * count reaches the size computed by {@code desiredSampleSize()}. After that the gap to the
     * next measured element is drawn at random from the current sampling rate.
     */
    @VisibleForTesting
    static class SamplingSizeEstimator<T>
    implements SizeEstimator<T> {
        /** Multiplier applied to the sample standard deviation in {@code desiredSampleSize()}. */
        static final double CONFIDENCE_INTERVAL_SIGMA = 3.0;
        /** Multiplier applied to the sample mean in {@code desiredSampleSize()}. */
        static final double CONFIDENCE_INTERVAL_SIZE = 0.25;
        /** Number of samples always taken before sampling may slow down. */
        static final long DEFAULT_MIN_SAMPLED = 20L;
        private final SizeEstimator<T> underlying;
        private final double minSampleRate;
        private final double maxSampleRate;
        private final long minSampled;
        private final Random random;
        private long totalElements = 0L;
        private long sampledElements = 0L;
        private long sampledSum = 0L;
        private double sampledSumSquares = 0.0;
        private long estimate;
        // Number of elements still to skip before the next one is measured.
        private long nextSample = 0L;

        private SamplingSizeEstimator(SizeEstimator<T> underlying, double minSampleRate, double maxSampleRate) {
            this(underlying, minSampleRate, maxSampleRate, DEFAULT_MIN_SAMPLED, new Random());
        }

        /**
         * @param underlying estimator used for the elements that are actually measured
         * @param minSampleRate lower bound of the sampling rate
         * @param maxSampleRate upper bound of the sampling rate
         * @param minSampled number of samples always taken before sampling may slow down
         * @param random source of randomness for the gap between samples
         */
        @VisibleForTesting
        SamplingSizeEstimator(SizeEstimator<T> underlying, double minSampleRate, double maxSampleRate, long minSampled, Random random) {
            this.underlying = underlying;
            this.minSampleRate = minSampleRate;
            this.maxSampleRate = maxSampleRate;
            this.minSampled = minSampled;
            this.random = random;
        }

        /**
         * Returns the measured size when this element is sampled, otherwise the current mean of
         * the sampled sizes (rounded up).
         */
        @Override
        public long estimateSize(T element) throws Exception {
            if (sampleNow()) {
                return recordSample(underlying.estimateSize(element));
            }
            return estimate;
        }

        /** Counts the element and reports whether the skip counter has run out. */
        private boolean sampleNow() {
            ++totalElements;
            return --nextSample < 0L;
        }

        /** Folds a measured size into the statistics and schedules the next sample. */
        private long recordSample(long value) {
            ++sampledElements;
            sampledSum += value;
            // Square in double arithmetic: as a long product, value * value overflows once
            // value exceeds roughly 3.04e9 and would corrupt the variance.
            double valueAsDouble = (double) value;
            sampledSumSquares += valueAsDouble * valueAsDouble;
            estimate = (long) Math.ceil((double) sampledSum / (double) sampledElements);

            long target = desiredSampleSize();
            if (sampledElements < minSampled || sampledElements < target) {
                // Not enough samples yet: measure the very next element as well.
                nextSample = 0L;
            } else {
                double rate = cap(
                    minSampleRate,
                    maxSampleRate,
                    Math.max(
                        1.0 / (double) (totalElements - minSampled + 1L),
                        (double) target / (double) totalElements));
                // Geometric draw: number of elements skipped before the next sample when each
                // element is sampled independently with probability `rate`.
                nextSample = rate == 1.0
                    ? 0L
                    : (long) Math.floor(Math.log(random.nextDouble()) / Math.log(1.0 - rate));
            }
            return value;
        }

        /** Clamps {@code value} into the range {@code [min, max]}. */
        private static double cap(double min, double max, double value) {
            return Math.min(max, Math.max(min, value));
        }

        /**
         * Computes {@code ceil((SIGMA * stddev / (SIZE * mean))^2)} from the samples seen so far.
         * A NaN intermediate (for example with a single sample, where the variance is 0/0)
         * becomes 0 through the cast to {@code long}.
         */
        private long desiredSampleSize() {
            double mean = (double) sampledSum / (double) sampledElements;
            double sumSquareDiff = sampledSumSquares
                - 2.0 * mean * (double) sampledSum
                + (double) sampledElements * mean * mean;
            double stddev = Math.sqrt(sumSquareDiff / (double) (sampledElements - 1L));
            double sqrtDesiredSamples = CONFIDENCE_INTERVAL_SIGMA * stddev / (CONFIDENCE_INTERVAL_SIZE * mean);
            return (long) Math.ceil(sqrtDesiredSamples * sqrtDesiredSamples);
        }
    }

    /** One slot of the grouping table: a key together with its current accumulator. */
    static interface GroupingTableEntry<K, InputT, AccumT> {
        /** Returns the key this entry was created for. */
        K getKey();

        /** Returns the current accumulator. */
        AccumT getValue();

        /** Folds {@code value} into the accumulator. */
        void add(InputT value) throws Exception;

        /** Returns the entry's current size estimate. */
        long getSize();

        /** Gives the combiner a chance to compact the accumulator. */
        void compact() throws Exception;
    }

    /**
     * {@link Combiner} over windowed keys that forwards every operation to a
     * {@link GlobalCombineFnRunner}, passing along the pipeline options, the side-input reader and
     * the windows of the key.
     */
    public static class ValueCombiner<K, InputT, AccumT, OutputT>
    implements Combiner<WindowedValue<K>, InputT, AccumT, OutputT> {
        private final GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn;
        private final SideInputReader sideInputReader;
        private final PipelineOptions options;

        private ValueCombiner(GlobalCombineFnRunner<InputT, AccumT, OutputT> combineFn, SideInputReader sideInputReader, PipelineOptions options) {
            this.options = options;
            this.sideInputReader = sideInputReader;
            this.combineFn = combineFn;
        }

        /** Delegates to {@code createAccumulator} of the runner for the key's windows. */
        @Override
        public AccumT createAccumulator(WindowedValue<K> windowedKey) {
            return combineFn.createAccumulator(options, sideInputReader, windowedKey.getWindows());
        }

        /** Delegates to {@code addInput} of the runner for the key's windows. */
        @Override
        public AccumT add(WindowedValue<K> windowedKey, AccumT accumulator, InputT value) {
            return combineFn.addInput(accumulator, value, options, sideInputReader, windowedKey.getWindows());
        }

        /** Delegates to {@code mergeAccumulators} of the runner for the key's windows. */
        @Override
        public AccumT merge(WindowedValue<K> windowedKey, Iterable<AccumT> accumulators) {
            return combineFn.mergeAccumulators(accumulators, options, sideInputReader, windowedKey.getWindows());
        }

        /** Delegates to {@code compact} of the runner for the key's windows. */
        @Override
        public AccumT compact(WindowedValue<K> windowedKey, AccumT accumulator) {
            return combineFn.compact(accumulator, options, sideInputReader, windowedKey.getWindows());
        }

        /** Delegates to {@code extractOutput} of the runner for the key's windows. */
        @Override
        public OutputT extract(WindowedValue<K> windowedKey, AccumT accumulator) {
            return combineFn.extractOutput(accumulator, options, sideInputReader, windowedKey.getWindows());
        }
    }

    /** Per-key combining operations used by the grouping table. */
    public static interface Combiner<K, InputT, AccumT, OutputT> {
        /** Returns the initial accumulator for {@code key}. */
        AccumT createAccumulator(K key);

        /** Returns the accumulator that results from adding {@code value} to {@code accumulator}. */
        AccumT add(K key, AccumT accumulator, InputT value);

        /** Returns an accumulator combining all of {@code accumulators}. */
        AccumT merge(K key, Iterable<AccumT> accumulators);

        /** Returns a compacted form of {@code accumulator}; may be the same object. */
        AccumT compact(K key, AccumT accumulator);

        /** Returns the output value for {@code accumulator}. */
        OutputT extract(K key, AccumT accumulator);
    }

    /**
     * {@link PairInfo} for inputs of the form {@code WindowedValue<KV<K, V>>}.
     *
     * <p>Keys are represented as the input's {@code WindowedValue} with the key as its value; output
     * pairs are the windowed key with a {@code KV} of key and combined values as its value. The
     * class is stateless and exposed as a single shared instance.
     */
    public static class WindowedPairInfo
    implements PairInfo {
        // Immutable singleton; final so the shared instance can never be swapped out.
        private static final WindowedPairInfo INSTANCE = new WindowedPairInfo();

        /** Returns the shared instance. */
        public static WindowedPairInfo create() {
            return INSTANCE;
        }

        private WindowedPairInfo() {
        }

        /**
         * Returns the input's windowed value with the {@code KV}'s key as its value.
         *
         * @throws ClassCastException if {@code pair} is not a {@code WindowedValue} holding a {@code KV}
         */
        @Override
        public Object getKeyFromInputPair(Object pair) {
            @SuppressWarnings("unchecked")
            WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
            return windowedKv.withValue(windowedKv.getValue().getKey());
        }

        /**
         * Returns the value half of the {@code KV} carried by the input.
         *
         * @throws ClassCastException if {@code pair} is not a {@code WindowedValue} holding a {@code KV}
         */
        @Override
        public Object getValueFromInputPair(Object pair) {
            @SuppressWarnings("unchecked")
            WindowedValue<KV<?, ?>> windowedKv = (WindowedValue<KV<?, ?>>) pair;
            return windowedKv.getValue().getValue();
        }

        /**
         * Returns the windowed key with {@code KV.of(key, values)} as its value.
         *
         * @throws ClassCastException if {@code key} is not a {@code WindowedValue}
         */
        @Override
        public Object makeOutputPair(Object key, Object values) {
            WindowedValue<?> windowedKey = (WindowedValue<?>) key;
            return windowedKey.withValue(KV.of(windowedKey.getValue(), values));
        }
    }

    /** Knows how to take input pairs apart and how to assemble output pairs. */
    public static interface PairInfo {
        /** Returns the key part of the input {@code pair}. */
        Object getKeyFromInputPair(Object pair);

        /** Returns the value part of the input {@code pair}. */
        Object getValueFromInputPair(Object pair);

        /** Builds the output pair for {@code key} and its combined {@code values}. */
        Object makeOutputPair(Object key, Object values);
    }

    /** {@link SizeEstimator} that derives the size of a value from its {@link Coder}. */
    public static class CoderSizeEstimator<T>
    implements SizeEstimator<T> {
        final Coder<T> coder;

        CoderSizeEstimator(Coder<T> coder) {
            this.coder = coder;
        }

        /**
         * Returns the size the coder reports to a byte-size observer, or, when the observer is
         * marked lazy, the number of bytes written when the value is encoded to a discarding
         * stream.
         */
        @Override
        public long estimateSize(T value) throws Exception {
            Observer sizeObserver = new Observer();
            coder.registerByteSizeObserver(value, sizeObserver);
            if (sizeObserver.getIsLazy()) {
                // Lazy observation: encode for real and count the bytes produced.
                CountingOutputStream countingSink = new CountingOutputStream(ByteStreams.nullOutputStream());
                coder.encode(value, countingSink);
                return countingSink.getCount();
            }
            sizeObserver.advance();
            return sizeObserver.observedSize;
        }

        /** Observer that sums up every element size reported to it. */
        private static class Observer
        extends ElementByteSizeObserver {
            private long observedSize = 0L;

            private Observer() {
            }

            @Override
            protected void reportElementSize(long elementSize) {
                observedSize += elementSize;
            }
        }
    }

    /** Estimates the size of values of type {@code T}. */
    public static interface SizeEstimator<T> {
        /**
         * Returns the size estimate for {@code element}.
         *
         * @throws Exception if the estimate cannot be computed
         */
        long estimateSize(T element) throws Exception;
    }

    /**
     * {@link GroupingKeyCreator} for windowed keys: the grouping key is a {@code WindowedValue}
     * built from the coder's structural value of the key, the key's windows and pane, and a fixed
     * instant in place of the key's own.
     */
    public static class WindowingCoderGroupingKeyCreator<K>
    implements GroupingKeyCreator<WindowedValue<K>> {
        // Constant instant used for every grouping key, so the key's own instant plays no part.
        private static final Instant ignored = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private final Coder<K> coder;

        WindowingCoderGroupingKeyCreator(Coder<K> coder) {
            this.coder = coder;
        }

        @Override
        public Object createGroupingKey(WindowedValue<K> key) {
            Object structuralKey = coder.structuralValue(key.getValue());
            return WindowedValue.of(structuralKey, ignored, key.getWindows(), key.getPane());
        }
    }

    /** Produces the object under which a key is stored in the grouping table. */
    public static interface GroupingKeyCreator<K> {
        /**
         * Returns the grouping key for {@code key}.
         *
         * @throws Exception if the grouping key cannot be created
         */
        Object createGroupingKey(K key) throws Exception;
    }
}

