/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.ReduceFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SystemReduceFn;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.RehydratedComponents;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TriggerTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.fnexecution.wire.WireCoders;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.StructuralKey;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.portable.BundleFactory;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.CopyOnAccessInMemoryStateInternals;
import org.apache.beam.runners.direct.portable.DirectGroupByKey;
import org.apache.beam.runners.direct.portable.DirectTimerInternals;
import org.apache.beam.runners.direct.portable.StepStateAndTimers;
import org.apache.beam.runners.direct.portable.StepTransformResult;
import org.apache.beam.runners.direct.portable.TransformEvaluator;
import org.apache.beam.runners.direct.portable.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.runners.direct.portable.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * A {@link TransformEvaluatorFactory} whose evaluators consume {@link KeyedWorkItem}s and emit
 * {@code KV<K, Iterable<V>>} panes, driving a buffering {@link SystemReduceFn} through a
 * {@link ReduceFnRunner} for the key of the input bundle.
 */
class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
    private final BundleFactory bundleFactory;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;
    private final RunnerApi.Components components;
    private final StepStateAndTimers.Provider stp;

    /**
     * Creates a factory.
     *
     * @param graph the executable graph used to look up the inputs and outputs of a transform
     * @param components the pipeline components used to rehydrate windowing strategies and coders
     * @param bundleFactory the factory used to create output bundles
     * @param stp the provider of per-step, per-key state and timers
     */
    GroupAlsoByWindowEvaluatorFactory(
            ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph,
            RunnerApi.Components components,
            BundleFactory bundleFactory,
            StepStateAndTimers.Provider stp) {
        this.bundleFactory = bundleFactory;
        this.graph = graph;
        this.components = components;
        this.stp = stp;
    }

    /**
     * Returns an evaluator for {@code application} bound to the key of {@code inputBundle}.
     *
     * <p>The {@link TransformEvaluatorFactory} signature does not expose the element type, so the
     * bundle is passed on as a raw type and the result is cast back to the caller's
     * {@code InputT}.
     */
    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(
            PipelineNode.PTransformNode application, CommittedBundle<?> inputBundle) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TransformEvaluator<InputT> evaluator =
                (TransformEvaluator<InputT>)
                        createEvaluator(application, (CommittedBundle) inputBundle);
        return evaluator;
    }

    /** No resources are held by this factory, so there is nothing to clean up. */
    @Override
    public void cleanup() {
    }

    /**
     * Builds the evaluator for {@code application}, using the input bundle's key both for the
     * evaluator itself and to look up the step's state and timers.
     */
    private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
            PipelineNode.PTransformNode application,
            CommittedBundle<KeyedWorkItem<K, V>> inputBundle) {
        // The bundle only exposes its key as StructuralKey<?>; K is fixed by the bundle's
        // element type, which the compiler cannot see through the wildcard.
        @SuppressWarnings("unchecked")
        StructuralKey<K> key = (StructuralKey<K>) inputBundle.getKey();
        return new GroupAlsoByWindowEvaluator<>(
                this.bundleFactory,
                key,
                application,
                this.graph,
                this.components,
                this.stp.forStepAndKey(application, key));
    }

    /**
     * An {@link OutputWindowedValue} that appends every main output to an
     * {@link UncommittedBundle} and rejects tagged (additional) outputs.
     */
    private static class OutputWindowedValueToBundle<K, V>
            implements OutputWindowedValue<KV<K, Iterable<V>>> {
        private final UncommittedBundle<KV<K, Iterable<V>>> bundle;

        private OutputWindowedValueToBundle(UncommittedBundle<KV<K, Iterable<V>>> bundle) {
            this.bundle = bundle;
        }

        /** Adds {@code output} to the wrapped bundle with the given timestamp, windows and pane. */
        @Override
        public void outputWindowedValue(
                KV<K, Iterable<V>> output,
                Instant timestamp,
                Collection<? extends BoundedWindow> windows,
                PaneInfo pane) {
            this.bundle.add(WindowedValue.of(output, timestamp, windows, pane));
        }

        /**
         * Always fails: tagged outputs are not supported here.
         *
         * @throws UnsupportedOperationException on every call
         */
        @Override
        public <AdditionalOutputT> void outputWindowedValue(
                TupleTag<AdditionalOutputT> tag,
                AdditionalOutputT output,
                Instant timestamp,
                Collection<? extends BoundedWindow> windows,
                PaneInfo pane) {
            throw new UnsupportedOperationException(
                    String.format("%s should not use tagged outputs", "DirectGroupAlsoByWindow"));
        }
    }

    /**
     * Evaluator that feeds each {@link KeyedWorkItem} through a {@link ReduceFnRunner}, writing
     * the resulting panes to a fresh keyed output bundle per processed work item.
     */
    private static class GroupAlsoByWindowEvaluator<K, V>
            implements TransformEvaluator<KeyedWorkItem<K, V>> {
        private final BundleFactory bundleFactory;
        private final PipelineNode.PTransformNode application;
        private final PipelineNode.PCollectionNode outputCollection;
        private final StructuralKey<?> key;
        private final CopyOnAccessInMemoryStateInternals<K> stateInternals;
        private final DirectTimerInternals timerInternals;
        private final WindowingStrategy<?, BoundedWindow> windowingStrategy;
        private final Collection<UncommittedBundle<?>> outputBundles;
        private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn;
        private final Counter droppedDueToLateness;

        /**
         * Resolves the single output collection and single per-element input of
         * {@code application}, rehydrates the input's windowing strategy, and derives the element
         * coder for the buffering reduce function from the output collection's wire coder.
         *
         * @throws RuntimeException wrapping any {@link IOException} raised while rehydrating the
         *     windowing strategy or instantiating the wire coder
         * @throws IllegalArgumentException if the wire coder is not a windowed-value coder of a
         *     {@link KvCoder} whose value coder is an {@link IterableLikeCoder}
         */
        private GroupAlsoByWindowEvaluator(
                BundleFactory bundleFactory,
                StructuralKey<K> key,
                PipelineNode.PTransformNode application,
                ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph,
                RunnerApi.Components components,
                StepStateAndTimers<K> stp) {
            this.bundleFactory = bundleFactory;
            this.application = application;
            this.outputCollection = Iterables.getOnlyElement(graph.getProduced(application));
            this.key = key;
            this.stateInternals = stp.stateInternals();
            this.timerInternals = stp.timerInternals();

            PipelineNode.PCollectionNode inputCollection =
                    Iterables.getOnlyElement(graph.getPerElementInputs(application));
            try {
                // The rehydrated strategy's window type is not statically known; every window
                // type is a BoundedWindow, which is all this evaluator relies on.
                @SuppressWarnings("unchecked")
                WindowingStrategy<?, BoundedWindow> rehydrated =
                        (WindowingStrategy<?, BoundedWindow>)
                                RehydratedComponents.forComponents(components)
                                        .getWindowingStrategy(
                                                inputCollection
                                                        .getPCollection()
                                                        .getWindowingStrategyId());
                this.windowingStrategy = rehydrated;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            this.outputBundles = new ArrayList<>();

            // Unwrap WindowedValue<KV<K, Iterable<V>>> layer by layer to reach the coder for V.
            Coder<V> valueCoder;
            try {
                Coder<?> windowedValueCoder =
                        WireCoders.instantiateRunnerWireCoder(this.outputCollection, components);
                Preconditions.checkArgument(
                        windowedValueCoder instanceof WindowedValue.WindowedValueCoder,
                        "Expected a WindowedValueCoder for the output collection but got %s",
                        windowedValueCoder);
                Coder<?> outputKvCoder =
                        ((WindowedValue.WindowedValueCoder<?>) windowedValueCoder).getValueCoder();
                Preconditions.checkArgument(
                        outputKvCoder instanceof KvCoder,
                        "Expected a KvCoder for the output elements but got %s",
                        outputKvCoder);
                Coder<?> iterVCoder = ((KvCoder<?, ?>) outputKvCoder).getValueCoder();
                Preconditions.checkArgument(
                        iterVCoder instanceof IterableLikeCoder,
                        "Expected an IterableLikeCoder for the grouped values but got %s",
                        iterVCoder);
                @SuppressWarnings("unchecked")
                Coder<V> elemCoder =
                        (Coder<V>) ((IterableLikeCoder<?, ?>) iterVCoder).getElemCoder();
                valueCoder = elemCoder;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            this.reduceFn = SystemReduceFn.buffering(valueCoder);
            this.droppedDueToLateness =
                    Metrics.counter(GroupAlsoByWindowEvaluator.class, "DroppedDueToLateness");
        }

        /**
         * Processes one work item: creates and records a new keyed output bundle, runs the work
         * item's non-expired elements and then its timers through a {@link ReduceFnRunner} built
         * from the windowing strategy's trigger, and persists the runner's state.
         */
        @Override
        public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
            KeyedWorkItem<K, V> workItem = element.getValue();

            UncommittedBundle<KV<K, Iterable<V>>> bundle =
                    this.bundleFactory.createKeyedBundle(this.key, this.outputCollection);
            this.outputBundles.add(bundle);

            RunnerApi.Trigger runnerApiTrigger =
                    TriggerTranslation.toProto(this.windowingStrategy.getTrigger());
            ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
                    new ReduceFnRunner<>(
                            workItem.key(),
                            this.windowingStrategy,
                            ExecutableTriggerStateMachine.create(
                                    TriggerStateMachines.stateMachineForTrigger(runnerApiTrigger)),
                            this.stateInternals,
                            this.timerInternals,
                            new OutputWindowedValueToBundle<>(bundle),
                            null,
                            this.reduceFn,
                            null);

            // Elements in expired windows are dropped before they reach the runner.
            reduceFnRunner.processElements(
                    this.dropExpiredWindows(
                            workItem.key(), workItem.elementsIterable(), this.timerInternals));
            reduceFnRunner.onTimers(workItem.timersIterable());
            reduceFnRunner.persist();
        }

        /**
         * Commits the state internals and returns a result carrying the committed state, its
         * earliest watermark hold, every output bundle created so far, and the timer update.
         */
        @Override
        public TransformResult<KeyedWorkItem<K, V>> finishBundle() throws Exception {
            CopyOnAccessInMemoryStateInternals<K> state = this.stateInternals.commit();
            return StepTransformResult.<KeyedWorkItem<K, V>>withHold(
                            this.application, state.getEarliestWatermarkHold())
                    .withState(state)
                    .addOutput(this.outputBundles)
                    .withTimerUpdate(this.timerInternals.getTimerUpdate())
                    .build();
        }

        /**
         * Explodes every element into one value per window and keeps only the values whose
         * window has not expired.
         *
         * <p>A window counts as expired when its maximum timestamp plus the windowing strategy's
         * allowed lateness is before the current input watermark. Each dropped value increments
         * the {@code DroppedDueToLateness} counter and is reported through {@link WindowTracing}.
         *
         * @param key the key the elements belong to; used only in the trace message
         * @param elements the elements to filter
         * @param timerInternals the source of the current input watermark
         * @return the retained single-window values, collected eagerly into a list
         */
        Iterable<WindowedValue<V>> dropExpiredWindows(
                K key, Iterable<WindowedValue<V>> elements, TimerInternals timerInternals) {
            return StreamSupport.stream(elements.spliterator(), false)
                    .flatMap(wv -> StreamSupport.stream(wv.explodeWindows().spliterator(), false))
                    .filter(input -> !this.dropIfExpired(key, input, timerInternals))
                    .collect(Collectors.toList());
        }

        /**
         * Returns whether the single window of {@code input} has expired, recording the drop in
         * the counter and the trace log when it has.
         */
        private boolean dropIfExpired(
                K key, WindowedValue<V> input, TimerInternals timerInternals) {
            BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
            boolean expired =
                    window.maxTimestamp()
                            .plus(this.windowingStrategy.getAllowedLateness())
                            .isBefore(timerInternals.currentInputWatermarkTime());
            if (expired) {
                this.droppedDueToLateness.inc();
                WindowTracing.debug(
                        "{}: Dropping element at {} for key: {}; window: {} since it is too far behind inputWatermark: {}",
                        DirectGroupByKey.DirectGroupAlsoByWindow.class.getSimpleName(),
                        input.getTimestamp(),
                        key,
                        window,
                        timerInternals.currentInputWatermarkTime());
            }
            return expired;
        }
    }
}

