/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;
import scala.Tuple2;

class AggregatorCombiner<@UnknownKeyFor K, @UnknownKeyFor InputT, @UnknownKeyFor AccumT, @UnknownKeyFor OutputT, @UnknownKeyFor W extends @UnknownKeyFor @NonNull @Initialized BoundedWindow>
extends Aggregator<WindowedValue<KV<K, InputT>>, Iterable<WindowedValue<AccumT>>, Iterable<WindowedValue<OutputT>>> {
    private final // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
    private @UnknownKeyFor @NonNull @Initialized WindowingStrategy<InputT, W> windowingStrategy;
    private @UnknownKeyFor @NonNull @Initialized TimestampCombiner timestampCombiner;
    private @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumulatorCoder;
    private @UnknownKeyFor @NonNull @Initialized IterableCoder<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> bufferEncoder;
    private @UnknownKeyFor @NonNull @Initialized IterableCoder<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> outputCoder;

    public AggregatorCombiner(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Combine.CombineFn<InputT, AccumT, OutputT> combineFn, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized WindowingStrategy<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> windowingStrategy, @UnknownKeyFor @NonNull @Initialized Coder<AccumT> accumulatorCoder, @UnknownKeyFor @NonNull @Initialized Coder<OutputT> outputCoder) {
        this.combineFn = combineFn;
        this.windowingStrategy = windowingStrategy;
        this.timestampCombiner = windowingStrategy.getTimestampCombiner();
        this.accumulatorCoder = accumulatorCoder;
        this.bufferEncoder = IterableCoder.of((Coder)WindowedValue.FullWindowedValueCoder.of(accumulatorCoder, (Coder)windowingStrategy.getWindowFn().windowCoder()));
        this.outputCoder = IterableCoder.of((Coder)WindowedValue.FullWindowedValueCoder.of(outputCoder, (Coder)windowingStrategy.getWindowFn().windowCoder()));
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> zero() {
        return new ArrayList<WindowedValue<AccumT>>();
    }

    private @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> createAccumulator(@UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>> inputWv) {
        Object accumulator = this.combineFn.createAccumulator();
        Object accumT = this.combineFn.addInput(accumulator, ((KV)inputWv.getValue()).getValue());
        return Lists.newArrayList((Object[])new WindowedValue[]{WindowedValue.of((Object)accumT, (Instant)inputWv.getTimestamp(), (Collection)inputWv.getWindows(), (PaneInfo)inputWv.getPane())});
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> reduce(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> accumulators, @UnknownKeyFor @NonNull @Initialized WindowedValue<@UnknownKeyFor @NonNull @Initialized KV<K, InputT>> inputWv) {
        return this.merge(accumulators, this.createAccumulator(inputWv));
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> merge(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> accumulators1, @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> accumulators2) {
        Map<W, W> windowToMergeResult;
        Iterable accumulators = Iterables.concat(accumulators1, accumulators2);
        Set<W> accumulatorsWindows = this.collectAccumulatorsWindows(accumulators);
        try {
            windowToMergeResult = this.mergeWindows(this.windowingStrategy, accumulatorsWindows);
        }
        catch (Exception e) {
            throw new RuntimeException("Unable to merge accumulators windows", e);
        }
        HashMap<BoundedWindow, ArrayList> mergedWindowToAccumulators = new HashMap<BoundedWindow, ArrayList>();
        for (WindowedValue accumulatorWv : accumulators) {
            byte[] encodedAccumT = null;
            if (accumulatorWv.getWindows().size() > 1) {
                try {
                    encodedAccumT = CoderUtils.encodeToByteArray(this.accumulatorCoder, (Object)accumulatorWv.getValue());
                }
                catch (CoderException e) {
                    throw new RuntimeException(String.format("Unable to encode accumulator %s with coder %s.", accumulatorWv.getValue(), this.accumulatorCoder), e);
                }
            }
            for (BoundedWindow accumulatorWindow : accumulatorWv.getWindows()) {
                Object accumT;
                BoundedWindow mergedWindowForAccumulator = (BoundedWindow)windowToMergeResult.get(accumulatorWindow);
                BoundedWindow boundedWindow = mergedWindowForAccumulator = mergedWindowForAccumulator == null ? accumulatorWindow : mergedWindowForAccumulator;
                if (encodedAccumT != null) {
                    try {
                        accumT = CoderUtils.decodeFromByteArray(this.accumulatorCoder, (byte[])encodedAccumT);
                    }
                    catch (CoderException e) {
                        throw new RuntimeException(String.format("Unable to encode accumulator %s with coder %s.", accumulatorWv.getValue(), this.accumulatorCoder), e);
                    }
                } else {
                    accumT = accumulatorWv.getValue();
                }
                Tuple2 accumAndInstant = new Tuple2(accumT, (Object)this.timestampCombiner.assign(mergedWindowForAccumulator, accumulatorWv.getTimestamp()));
                if (mergedWindowToAccumulators.get(mergedWindowForAccumulator) == null) {
                    mergedWindowToAccumulators.put(mergedWindowForAccumulator, Lists.newArrayList((Object[])new Tuple2[]{accumAndInstant}));
                    continue;
                }
                ((List)mergedWindowToAccumulators.get(mergedWindowForAccumulator)).add(accumAndInstant);
            }
        }
        ArrayList<WindowedValue<AccumT>> result = new ArrayList<WindowedValue<AccumT>>();
        for (Map.Entry entry : mergedWindowToAccumulators.entrySet()) {
            BoundedWindow mergedWindow = (BoundedWindow)entry.getKey();
            List accumsAndInstantsForMergedWindow = (List)entry.getValue();
            Object first = this.combineFn.createAccumulator();
            Iterable accumulatorsToMerge = Iterables.concat(Collections.singleton(first), (Iterable)accumsAndInstantsForMergedWindow.stream().map(x -> x._1()).collect(Collectors.toList()));
            result.add(WindowedValue.of((Object)this.combineFn.mergeAccumulators(accumulatorsToMerge), (Instant)this.timestampCombiner.combine((Iterable)accumsAndInstantsForMergedWindow.stream().map(x -> (Instant)x._2()).collect(Collectors.toList())), (BoundedWindow)mergedWindow, (PaneInfo)PaneInfo.NO_FIRING));
        }
        return result;
    }

    public @UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>> finish(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> reduction) {
        ArrayList<WindowedValue<OutputT>> result = new ArrayList<WindowedValue<OutputT>>();
        for (WindowedValue<AccumT> windowedValue : reduction) {
            result.add(windowedValue.withValue(this.combineFn.extractOutput(windowedValue.getValue())));
        }
        return result;
    }

    public @UnknownKeyFor @NonNull @Initialized Encoder<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>>> bufferEncoder() {
        return EncoderHelpers.fromBeamCoder(this.bufferEncoder);
    }

    public @UnknownKeyFor @NonNull @Initialized Encoder<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<OutputT>>> outputEncoder() {
        return EncoderHelpers.fromBeamCoder(this.outputCoder);
    }

    private @UnknownKeyFor @NonNull @Initialized Set<W> collectAccumulatorsWindows(@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized WindowedValue<AccumT>> accumulators) {
        HashSet<BoundedWindow> windows = new HashSet<BoundedWindow>();
        for (WindowedValue<AccumT> accumulator : accumulators) {
            Iterator iterator = accumulator.getWindows().iterator();
            while (iterator.hasNext()) {
                BoundedWindow untypedWindow;
                BoundedWindow window = untypedWindow = (BoundedWindow)iterator.next();
                windows.add(window);
            }
        }
        return windows;
    }

    private @UnknownKeyFor @NonNull @Initialized Map<W, W> mergeWindows(@UnknownKeyFor @NonNull @Initialized WindowingStrategy<InputT, W> windowingStrategy, @UnknownKeyFor @NonNull @Initialized Set<W> windows) throws @UnknownKeyFor @NonNull @Initialized Exception {
        WindowFn windowFn = windowingStrategy.getWindowFn();
        if (!windowingStrategy.needsMerge()) {
            return Collections.emptyMap();
        }
        HashMap windowToMergeResult = new HashMap();
        windowFn.mergeWindows((WindowFn.MergeContext)new MergeContextImpl(windowFn, windows, windowToMergeResult));
        return windowToMergeResult;
    }

    private class MergeContextImpl
    extends WindowFn.MergeContext {
        private @UnknownKeyFor @NonNull @Initialized Set<W> windows;
        private @UnknownKeyFor @NonNull @Initialized Map<W, W> windowToMergeResult;

        MergeContextImpl(@UnknownKeyFor @NonNull @Initialized WindowFn<InputT, W> windowFn, @UnknownKeyFor @NonNull @Initialized Set<W> windows, Map<W, W> windowToMergeResult) {
            super(windowFn);
            this.windows = windows;
            this.windowToMergeResult = windowToMergeResult;
        }

        public @UnknownKeyFor @NonNull @Initialized Collection<W> windows() {
            return this.windows;
        }

        public void merge(@UnknownKeyFor @NonNull @Initialized Collection<W> toBeMerged, W mergeResult) throws @UnknownKeyFor @NonNull @Initialized Exception {
            for (BoundedWindow w : toBeMerged) {
                this.windowToMergeResult.put(w, mergeResult);
            }
        }
    }
}

