/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.beam_runners_direct_java.runners.core;

import java.util.Collection;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.DoFnRunners;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.OutputWindowedValue;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.ReduceFnRunner;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SideInputReader;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.StateInternalsFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.SystemReduceFn;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternalsFactory;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.TriggerTranslation;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/**
 * A {@link DoFn} that consumes {@link KeyedWorkItem}s and emits {@code KV<K, OutputT>} values by
 * driving a {@link ReduceFnRunner} for each incoming work item.
 *
 * <p>For every work item a fresh {@link ReduceFnRunner} is built for the item's key, the item's
 * elements are processed, its timers are delivered, and the runner's state is persisted. Output
 * is not emitted through the {@link DoFn.ProcessContext}; it is forwarded to the
 * {@link DoFnRunners.OutputManager} supplied at construction time.
 *
 * <p>NOTE(review): the factories, the side input reader and the output manager are held in
 * {@code transient} fields and nothing in this class re-populates them after Java
 * deserialization - confirm that instances are only used in the JVM that constructed them.
 *
 * @param <K> key type of the work items
 * @param <InputT> type of the elements carried by each work item
 * @param <OutputT> type of the value emitted per key
 * @param <W> window type of the windowing strategy
 * @param <RinT> concrete {@link KeyedWorkItem} type accepted as input
 */
@SystemDoFnInternal
public class GroupAlsoByWindowViaWindowSetNewDoFn<
        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
    extends DoFn<RinT, KV<K, OutputT>> {
    private static final long serialVersionUID = 1L;

    /** Windowing strategy handed to the {@link ReduceFnRunner}; its trigger is translated per work item. */
    private final WindowingStrategy<Object, W> windowingStrategy;
    /** Reduce function handed to every {@link ReduceFnRunner} created by this DoFn. */
    private final SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
    /** Supplies the per-key {@link StateInternals}; transient, so not serialized with the DoFn. */
    private transient StateInternalsFactory<K> stateInternalsFactory;
    /** Supplies the per-key {@link TimerInternals}; transient, so not serialized with the DoFn. */
    private transient TimerInternalsFactory<K> timerInternalsFactory;
    /** Side input reader passed through to the {@link ReduceFnRunner}; transient. */
    private transient SideInputReader sideInputReader;
    /** Sink that receives every value produced by the {@link ReduceFnRunner}; transient. */
    private transient DoFnRunners.OutputManager outputManager;
    /** Tag under which main-output values are handed to {@link #outputManager}. */
    private final TupleTag<KV<K, OutputT>> mainTag;

    /**
     * Static factory that builds the DoFn with {@code RinT} fixed to {@code KeyedWorkItem<K, InputT>}.
     *
     * @param strategy windowing strategy to run the reduce function under
     * @param stateInternalsFactory source of per-key state
     * @param timerInternalsFactory source of per-key timers
     * @param sideInputReader side input reader passed to the runner
     * @param reduceFn reduce function applied by the runner
     * @param outputManager receiver of all produced values
     * @param mainTag tag used for the main output
     * @return the newly constructed DoFn
     */
    public static <K, InputT, OutputT, W extends BoundedWindow> DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory, TimerInternalsFactory<K> timerInternalsFactory, SideInputReader sideInputReader, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn, DoFnRunners.OutputManager outputManager, TupleTag<KV<K, OutputT>> mainTag) {
        // Diamond instead of a raw instantiation: RinT is inferred from the declared return type.
        return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
            strategy,
            stateInternalsFactory,
            timerInternalsFactory,
            sideInputReader,
            reduceFn,
            outputManager,
            mainTag);
    }

    /**
     * Stores the collaborators used by {@link #processElement}.
     *
     * @param windowingStrategy windowing strategy to run the reduce function under
     * @param stateInternalsFactory source of per-key state
     * @param timerInternalsFactory source of per-key timers
     * @param sideInputReader side input reader passed to the runner
     * @param reduceFn reduce function applied by the runner
     * @param outputManager receiver of all produced values
     * @param mainTag tag used for the main output
     */
    public GroupAlsoByWindowViaWindowSetNewDoFn(WindowingStrategy<?, W> windowingStrategy, StateInternalsFactory<K> stateInternalsFactory, TimerInternalsFactory<K> timerInternalsFactory, SideInputReader sideInputReader, SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn, DoFnRunners.OutputManager outputManager, TupleTag<KV<K, OutputT>> mainTag) {
        this.timerInternalsFactory = timerInternalsFactory;
        this.sideInputReader = sideInputReader;
        this.outputManager = outputManager;
        this.mainTag = mainTag;
        // WindowingStrategy<?, W> is not assignable to WindowingStrategy<Object, W> without a cast.
        // Only the wildcard is replaced and generics are erased at runtime, so the cast itself
        // cannot fail; the suppression is scoped to this single declaration.
        @SuppressWarnings("unchecked")
        WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
        this.windowingStrategy = noWildcard;
        this.reduceFn = reduceFn;
        this.stateInternalsFactory = stateInternalsFactory;
    }

    /**
     * Builds the adapter through which the {@link ReduceFnRunner} emits values; both overloads wrap
     * the value in a {@link WindowedValue} and pass it to {@link #outputManager}.
     */
    private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
        return new OutputWindowedValue<KV<K, OutputT>>() {

            /** Emits a main-output value under {@code mainTag}. */
            @Override
            public void outputWindowedValue(KV<K, OutputT> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                outputManager.output(mainTag, WindowedValue.of(output, timestamp, windows, pane));
            }

            /** Emits a value under the caller-supplied {@code tag}. */
            @Override
            public <AdditionalOutputT> void outputWindowedValue(TupleTag<AdditionalOutputT> tag, AdditionalOutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
                outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
            }
        };
    }

    /**
     * Handles one {@link KeyedWorkItem}: creates a {@link ReduceFnRunner} for the item's key, feeds
     * it the item's elements, then its timers, and finally persists the runner.
     *
     * @param c context providing the work item and the pipeline options
     * @throws Exception propagated from the {@link ReduceFnRunner} calls
     */
    @DoFn.ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        // Typed access keeps the key as K, which the per-key factories below require.
        KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
        K key = keyedWorkItem.key();

        StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
        TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);

        // The trigger is converted to its proto form and turned into an executable state machine
        // for every work item.
        ExecutableTriggerStateMachine triggerStateMachine =
            ExecutableTriggerStateMachine.create(
                TriggerStateMachines.stateMachineForTrigger(
                    TriggerTranslation.toProto(windowingStrategy.getTrigger())));

        ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
            new ReduceFnRunner<>(
                key,
                windowingStrategy,
                triggerStateMachine,
                stateInternals,
                timerInternals,
                outputWindowedValue(),
                sideInputReader,
                reduceFn,
                c.getPipelineOptions());

        // Order matters: elements first, then timers, then persist.
        reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
        reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
        reduceFnRunner.persist();
    }
}

