/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.MergingStateAccessor;
import org.apache.beam.runners.core.ReduceFn;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

class WatermarkHold<W extends BoundedWindow>
implements Serializable {
    @VisibleForTesting
    public static final StateTag<WatermarkHoldState> EXTRA_HOLD_TAG = StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("extra", TimestampCombiner.EARLIEST));
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final TimerInternals timerInternals;
    private final WindowingStrategy<?, W> windowingStrategy;
    private final StateTag<WatermarkHoldState> elementHoldTag;

    public static <W extends BoundedWindow> StateTag<WatermarkHoldState> watermarkHoldTagForTimestampCombiner(TimestampCombiner timestampCombiner) {
        return StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal("hold", timestampCombiner));
    }

    public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) {
        this.timerInternals = timerInternals;
        this.windowingStrategy = windowingStrategy;
        this.elementHoldTag = WatermarkHold.watermarkHoldTagForTimestampCombiner(windowingStrategy.getTimestampCombiner());
    }

    public @Nullable Instant addHolds(ReduceFn.ProcessValueContext context) {
        Instant hold = this.addElementHold(context.timestamp(), context);
        if (hold == null) {
            hold = this.addGarbageCollectionHold(context, false);
        }
        return hold;
    }

    private Instant shift(Instant timestamp, W window) {
        Instant shifted = this.windowingStrategy.getTimestampCombiner().assign((BoundedWindow)window, timestamp);
        if (shifted.isBefore(timestamp)) {
            throw new IllegalStateException(String.format("TimestampCombiner moved element from %s to earlier time %s for window %s", BoundedWindow.formatTimestamp(timestamp), BoundedWindow.formatTimestamp(shifted), window));
        }
        Preconditions.checkState(timestamp.isAfter(((BoundedWindow)window).maxTimestamp()) || !shifted.isAfter(((BoundedWindow)window).maxTimestamp()), "TimestampCombiner moved element from %s to %s which is beyond end of window %s", (Object)timestamp, (Object)shifted, window);
        return shifted;
    }

    private @Nullable Instant addElementHold(Instant timestamp, ReduceFn.Context context) {
        boolean tooLate;
        String which;
        Instant elementHold = this.shift(timestamp, context.window());
        Instant outputWM = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        if (outputWM != null && elementHold.isBefore(outputWM)) {
            which = "too late to effect output watermark";
            tooLate = true;
        } else if (((BoundedWindow)context.window()).maxTimestamp().isBefore(inputWM)) {
            which = "too late for end-of-window timer";
            tooLate = true;
        } else {
            which = "on time";
            tooLate = false;
            Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Element hold %s is beyond end-of-time", (Object)elementHold);
            context.state().access(this.elementHoldTag).add(elementHold);
        }
        WindowTracing.trace("WatermarkHold.addHolds: element hold at {} is {} for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", elementHold, which, context.key(), context.window(), inputWM, outputWM);
        return tooLate ? null : elementHold;
    }

    private @Nullable Instant addGarbageCollectionHold(ReduceFn.Context context, boolean paneIsEmpty) {
        Instant outputWM = this.timerInternals.currentOutputWatermarkTime();
        Instant inputWM = this.timerInternals.currentInputWatermarkTime();
        Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), this.windowingStrategy);
        if (gcHold.isBefore(inputWM)) {
            WindowTracing.trace("{}.addGarbageCollectionHold: gc hold would be before the input watermark for key:{}; window: {}; inputWatermark: {}; outputWatermark: {}", this.getClass().getSimpleName(), context.key(), context.window(), inputWM, outputWM);
            return null;
        }
        if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() == Window.ClosingBehavior.FIRE_IF_NON_EMPTY) {
            WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM);
            return null;
        }
        if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
            gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
        }
        Preconditions.checkState(!gcHold.isBefore(inputWM), "Garbage collection hold %s cannot be before input watermark %s", (Object)gcHold, (Object)inputWM);
        Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), "Garbage collection hold %s is beyond end-of-time", (Object)gcHold);
        context.state().access(EXTRA_HOLD_TAG).add(gcHold);
        WindowTracing.trace("WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM);
        return gcHold;
    }

    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
    public void prefetchOnMerge(MergingStateAccessor<?, W> context) {
        Map<W, WatermarkHoldState> map = context.accessInEachMergingWindow(this.elementHoldTag);
        WatermarkHoldState result = context.access(this.elementHoldTag);
        if (map.isEmpty()) {
            return;
        }
        if (map.size() == 1 && map.values().contains(result) && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp()) {
            return;
        }
        if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
            return;
        }
        for (WatermarkHoldState source : map.values()) {
            source.readLater();
        }
    }

    @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"})
    public void onMerge(ReduceFn.OnMergeContext context) {
        WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        Collection<WatermarkHoldState> sources = context.state().accessInEachMergingWindow(this.elementHoldTag).values();
        WatermarkHoldState result = context.state().access(this.elementHoldTag);
        if (!(sources.isEmpty() || sources.size() == 1 && sources.contains(result) && result.getTimestampCombiner().dependsOnlyOnEarliestTimestamp())) {
            if (result.getTimestampCombiner().dependsOnlyOnWindow()) {
                for (WatermarkHoldState source : sources) {
                    source.clear();
                }
                this.addElementHold(BoundedWindow.TIMESTAMP_MIN_VALUE, context);
            } else {
                for (WatermarkHoldState source : sources) {
                    source.readLater();
                }
                Instant mergedHold = null;
                for (ReadableState readableState : sources) {
                    Instant sourceOutputTime = (Instant)readableState.read();
                    if (sourceOutputTime == null) continue;
                    if (mergedHold == null) {
                        mergedHold = sourceOutputTime;
                        continue;
                    }
                    mergedHold = result.getTimestampCombiner().merge((BoundedWindow)context.window(), mergedHold, sourceOutputTime);
                }
                for (WatermarkHoldState watermarkHoldState : sources) {
                    watermarkHoldState.clear();
                }
                if (mergedHold != null) {
                    result.add(mergedHold);
                }
            }
        }
        StateMerging.clear(context.state(), EXTRA_HOLD_TAG);
        this.addGarbageCollectionHold(context, false);
    }

    public void prefetchExtract(ReduceFn.Context context) {
        context.state().access(this.elementHoldTag).readLater();
        context.state().access(EXTRA_HOLD_TAG).readLater();
    }

    public ReadableState<OldAndNewHolds> extractAndRelease(final ReduceFn.Context context, final boolean isFinished) {
        WindowTracing.debug("WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        final WatermarkHoldState elementHoldState = context.state().access(this.elementHoldTag);
        final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG);
        return new ReadableState<OldAndNewHolds>(){

            @Override
            public ReadableState<OldAndNewHolds> readLater() {
                elementHoldState.readLater();
                extraHoldState.readLater();
                return this;
            }

            @Override
            public OldAndNewHolds read() {
                @Nullable Instant elementHold = (Instant)elementHoldState.read();
                @Nullable Instant extraHold = (Instant)extraHoldState.read();
                Instant oldHold = elementHold == null ? extraHold : (extraHold == null ? elementHold : (elementHold.isBefore(extraHold) ? elementHold : extraHold));
                if (oldHold == null || oldHold.isAfter(((BoundedWindow)context.window()).maxTimestamp())) {
                    WindowTracing.debug("WatermarkHold.extractAndRelease.read: clipping from {} to end of window for key:{}; window:{}", oldHold, context.key(), context.window());
                    oldHold = ((BoundedWindow)context.window()).maxTimestamp();
                }
                WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window());
                elementHoldState.clear();
                extraHoldState.clear();
                Instant newHold = null;
                if (!isFinished) {
                    newHold = WatermarkHold.this.addGarbageCollectionHold(context, true);
                }
                return new OldAndNewHolds(oldHold, newHold);
            }
        };
    }

    public void clearHolds(ReduceFn.Context context) {
        WindowTracing.debug("WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", context.key(), context.window(), this.timerInternals.currentInputWatermarkTime(), this.timerInternals.currentOutputWatermarkTime());
        context.state().access(this.elementHoldTag).clear();
        context.state().access(EXTRA_HOLD_TAG).clear();
    }

    public @Nullable Instant getDataCurrent(ReduceFn.Context context) {
        return (Instant)context.state().access(this.elementHoldTag).read();
    }

    public static class OldAndNewHolds {
        public final Instant oldHold;
        public final @Nullable Instant newHold;

        public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
            this.oldHold = oldHold;
            this.newHold = newHold;
        }
    }
}

