/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.expressions.Aggregator;
import org.joda.time.Instant;
import scala.Tuple2;

/**
 * Spark {@link Aggregator} that applies a Beam {@link Combine.CombineFn} to windowed key/value
 * inputs.
 *
 * <p>The aggregation buffer is an {@code Iterable} of windowed accumulators. Each call to
 * {@link #merge} groups the accumulators by their (possibly merged) window and combines the
 * accumulators of each group into one, so the returned buffer holds one accumulator per resulting
 * window.
 *
 * @param <K> key type of the input {@link KV}; the key is not read by this class
 * @param <InputT> value type fed to the combine function
 * @param <AccumT> accumulator type of the combine function
 * @param <OutputT> output type of the combine function
 * @param <W> window type of the windowing strategy
 */
class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
        extends Aggregator<
                WindowedValue<KV<K, InputT>>,
                Iterable<WindowedValue<AccumT>>,
                Iterable<WindowedValue<OutputT>>> {
    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
    private final WindowingStrategy<InputT, W> windowingStrategy;
    private final TimestampCombiner timestampCombiner;
    private final IterableCoder<WindowedValue<AccumT>> accumulatorCoder;
    private final IterableCoder<WindowedValue<OutputT>> outputCoder;

    /**
     * Creates the aggregator.
     *
     * @param combineFn combine function used to create, fill, merge and extract accumulators
     * @param windowingStrategy strategy providing the window function, window coder and timestamp
     *     combiner; it is cast (unchecked) to {@code WindowingStrategy<InputT, W>}
     * @param accumulatorCoder coder for a single accumulator value
     * @param outputCoder coder for a single output value
     */
    public AggregatorCombiner(
            Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
            WindowingStrategy<?, ?> windowingStrategy,
            Coder<AccumT> accumulatorCoder,
            Coder<OutputT> outputCoder) {
        // The caller hands over a wildcard-typed strategy; the type parameters of this class are
        // what the rest of the code relies on, so the cast is done once, here.
        @SuppressWarnings("unchecked")
        WindowingStrategy<InputT, W> typedStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;

        this.combineFn = combineFn;
        this.windowingStrategy = typedStrategy;
        this.timestampCombiner = typedStrategy.getTimestampCombiner();
        this.accumulatorCoder =
                IterableCoder.of(
                        WindowedValue.FullWindowedValueCoder.of(
                                accumulatorCoder, typedStrategy.getWindowFn().windowCoder()));
        this.outputCoder =
                IterableCoder.of(
                        WindowedValue.FullWindowedValueCoder.of(
                                outputCoder, typedStrategy.getWindowFn().windowCoder()));
    }

    /** Returns the initial buffer: a new, empty, mutable list of windowed accumulators. */
    @Override
    public Iterable<WindowedValue<AccumT>> zero() {
        return new ArrayList<>();
    }

    /**
     * Builds a one-element buffer for a single input: a fresh accumulator receives the input value
     * and is wrapped with the input's timestamp, windows and pane.
     */
    private Iterable<WindowedValue<AccumT>> createAccumulator(WindowedValue<KV<K, InputT>> inputWv) {
        AccumT accumulator =
                combineFn.addInput(combineFn.createAccumulator(), inputWv.getValue().getValue());
        List<WindowedValue<AccumT>> single = new ArrayList<>(1);
        single.add(
                WindowedValue.of(
                        accumulator, inputWv.getTimestamp(), inputWv.getWindows(), inputWv.getPane()));
        return single;
    }

    /**
     * Adds one input to the buffer by turning it into a one-element buffer and delegating to
     * {@link #merge}.
     */
    @Override
    public Iterable<WindowedValue<AccumT>> reduce(
            Iterable<WindowedValue<AccumT>> accumulators, WindowedValue<KV<K, InputT>> inputWv) {
        return merge(accumulators, createAccumulator(inputWv));
    }

    /**
     * Merges two buffers.
     *
     * <p>Steps: collect every window referenced by the accumulators, ask the window function which
     * windows merge into which, group the accumulators by their resulting window, then combine each
     * group into a single accumulator. Each result is emitted with {@link PaneInfo#NO_FIRING} and a
     * timestamp computed by the timestamp combiner.
     *
     * @return a new list with one windowed accumulator per resulting window; iteration order
     *     follows a {@link HashMap} and is therefore unspecified
     * @throws RuntimeException wrapping any exception raised while merging windows
     */
    @Override
    public Iterable<WindowedValue<AccumT>> merge(
            Iterable<WindowedValue<AccumT>> accumulators1,
            Iterable<WindowedValue<AccumT>> accumulators2) {
        Iterable<WindowedValue<AccumT>> accumulators = Iterables.concat(accumulators1, accumulators2);
        Set<W> accumulatorsWindows = collectAccumulatorsWindows(accumulators);
        Map<W, W> windowToMergeResult;
        try {
            windowToMergeResult = mergeWindows(windowingStrategy, accumulatorsWindows);
        } catch (Exception e) {
            throw new RuntimeException("Unable to merge accumulators windows", e);
        }

        // Group (accumulator, timestamp) pairs by the window they end up in.
        Map<W, List<Tuple2<AccumT, Instant>>> mergedWindowToAccumulators = new HashMap<>();
        for (WindowedValue<AccumT> accumulatorWv : accumulators) {
            for (BoundedWindow accumulatorWindow : accumulatorWv.getWindows()) {
                W mergedWindow = resolveMergedWindow(windowToMergeResult, accumulatorWindow);
                Instant outputTime =
                        windowingStrategy
                                .getWindowFn()
                                .getOutputTime(accumulatorWv.getTimestamp(), mergedWindow);
                Instant assignedTimestamp = timestampCombiner.assign(mergedWindow, outputTime);
                mergedWindowToAccumulators
                        .computeIfAbsent(mergedWindow, unused -> new ArrayList<>())
                        .add(new Tuple2<>(accumulatorWv.getValue(), assignedTimestamp));
            }
        }

        // Combine the accumulators of each resulting window into one.
        List<WindowedValue<AccumT>> result = new ArrayList<>(mergedWindowToAccumulators.size());
        for (Map.Entry<W, List<Tuple2<AccumT, Instant>>> entry :
                mergedWindowToAccumulators.entrySet()) {
            List<Tuple2<AccumT, Instant>> accumsAndInstants = entry.getValue();
            List<AccumT> accumulatorsToMerge = new ArrayList<>(accumsAndInstants.size() + 1);
            List<Instant> timestamps = new ArrayList<>(accumsAndInstants.size());

            // A freshly created accumulator always goes first. NOTE(review): presumably because
            // CombineFn#mergeAccumulators may mutate its first argument -- confirm against the
            // CombineFn contract.
            accumulatorsToMerge.add(combineFn.createAccumulator());
            for (Tuple2<AccumT, Instant> accumAndInstant : accumsAndInstants) {
                accumulatorsToMerge.add(accumAndInstant._1());
                timestamps.add(accumAndInstant._2());
            }

            result.add(
                    WindowedValue.of(
                            combineFn.mergeAccumulators(accumulatorsToMerge),
                            timestampCombiner.combine(timestamps),
                            entry.getKey(),
                            PaneInfo.NO_FIRING));
        }
        return result;
    }

    /**
     * Returns the window an accumulator window was merged into, or the window itself when the
     * merge result has no entry for it.
     */
    private W resolveMergedWindow(Map<W, W> windowToMergeResult, BoundedWindow accumulatorWindow) {
        W merged = windowToMergeResult.get(accumulatorWindow);
        if (merged != null) {
            return merged;
        }
        // WindowedValue only exposes its windows as BoundedWindow, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        W unmerged = (W) accumulatorWindow;
        return unmerged;
    }

    /**
     * Converts the final buffer to outputs by applying {@code extractOutput} to each accumulator
     * while keeping the rest of each windowed value as produced by {@code withValue}.
     */
    @Override
    public Iterable<WindowedValue<OutputT>> finish(Iterable<WindowedValue<AccumT>> reduction) {
        List<WindowedValue<OutputT>> result = new ArrayList<>();
        for (WindowedValue<AccumT> windowedAccumulator : reduction) {
            result.add(
                    windowedAccumulator.withValue(
                            combineFn.extractOutput(windowedAccumulator.getValue())));
        }
        return result;
    }

    /** Returns the Spark encoder for the buffer, derived from the accumulator coder. */
    @Override
    public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
        return EncoderHelpers.fromBeamCoder(accumulatorCoder);
    }

    /** Returns the Spark encoder for the output, derived from the output coder. */
    @Override
    public Encoder<Iterable<WindowedValue<OutputT>>> outputEncoder() {
        return EncoderHelpers.fromBeamCoder(outputCoder);
    }

    /** Collects the distinct windows referenced by the given accumulators. */
    private Set<W> collectAccumulatorsWindows(Iterable<WindowedValue<AccumT>> accumulators) {
        Set<W> windows = new HashSet<>();
        for (WindowedValue<AccumT> accumulator : accumulators) {
            for (BoundedWindow untypedWindow : accumulator.getWindows()) {
                // WindowedValue only exposes its windows as BoundedWindow, hence the unchecked cast.
                @SuppressWarnings("unchecked")
                W window = (W) untypedWindow;
                windows.add(window);
            }
        }
        return windows;
    }

    /**
     * Asks the window function to merge the given windows.
     *
     * @return a map from each window that was merged to its merge result; empty when the window
     *     function is non-merging
     * @throws Exception whatever the window function's {@code mergeWindows} throws
     */
    private Map<W, W> mergeWindows(WindowingStrategy<InputT, W> windowingStrategy, Set<W> windows)
            throws Exception {
        WindowFn<InputT, W> windowFn = windowingStrategy.getWindowFn();
        if (windowFn.isNonMerging()) {
            return Collections.emptyMap();
        }
        Map<W, W> windowToMergeResult = new HashMap<>();
        windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult));
        return windowToMergeResult;
    }

    /**
     * Merge context that exposes a fixed set of windows and records every merge decision into the
     * supplied map (merged window -> merge result).
     */
    private class MergeContextImpl extends WindowFn<InputT, W>.MergeContext {
        private final Set<W> windows;
        private final Map<W, W> windowToMergeResult;

        MergeContextImpl(WindowFn<InputT, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) {
            // MergeContext is an inner class of WindowFn, so the enclosing WindowFn instance is
            // supplied through a qualified superclass constructor call.
            windowFn.super();
            this.windows = windows;
            this.windowToMergeResult = windowToMergeResult;
        }

        /** Returns the windows offered for merging. */
        @Override
        public Collection<W> windows() {
            return windows;
        }

        /** Records that each window in {@code toBeMerged} is replaced by {@code mergeResult}. */
        @Override
        public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            for (W window : toBeMerged) {
                windowToMergeResult.put(window, mergeResult);
            }
        }
    }
}

