/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.core;

import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * A {@link SplittableProcessElementInvoker} that asks the restriction tracker for a checkpoint
 * (via {@code trySplit(0.0)}) once the {@code @ProcessElement} call has produced at least
 * {@code maxNumOutputs} outputs, or once {@code maxDuration} has elapsed since the first
 * {@code onClaimed} notification, whichever happens first.
 *
 * <p>The time-based checkpoint is scheduled on the supplied {@link ScheduledExecutorService}, so
 * it may run concurrently with the {@code @ProcessElement} call; the checkpoint state is guarded
 * by {@code synchronized} methods on the per-call context.
 */
public class OutputAndTimeBoundedSplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT>
extends SplittableProcessElementInvoker<InputT, OutputT, RestrictionT, PositionT, WatermarkEstimatorStateT> {
    /** The user function; also the enclosing instance of the per-call {@code DoFn.ProcessContext}. */
    private final DoFn<InputT, OutputT> fn;
    private final PipelineOptions pipelineOptions;
    /** Sink that receives every value emitted by the user function. */
    private final OutputWindowedValue<OutputT> output;
    private final SideInputReader sideInputReader;
    /** Runs the time-based checkpoint task. */
    private final ScheduledExecutorService executor;
    /** Output count at which a checkpoint is requested. */
    private final int maxNumOutputs;
    /** Delay, measured from the first claim, after which a checkpoint is requested. */
    private final Duration maxDuration;
    private final Supplier<DoFn.BundleFinalizer> bundleFinalizer;

    /**
     * Creates an invoker for the given function.
     *
     * @param fn the function whose {@code @ProcessElement} method will be invoked
     * @param pipelineOptions options exposed to the function
     * @param output receiver for the function's outputs
     * @param sideInputReader source of side input values
     * @param executor executor on which the time-based checkpoint is scheduled
     * @param maxNumOutputs number of outputs after which a checkpoint is requested
     * @param maxDuration time after the first claim at which a checkpoint is requested
     * @param bundleFinalizer supplier queried each time the function asks for a bundle finalizer
     */
    public OutputAndTimeBoundedSplittableProcessElementInvoker(DoFn<InputT, OutputT> fn, PipelineOptions pipelineOptions, OutputWindowedValue<OutputT> output, SideInputReader sideInputReader, ScheduledExecutorService executor, int maxNumOutputs, Duration maxDuration, Supplier<DoFn.BundleFinalizer> bundleFinalizer) {
        this.fn = fn;
        this.pipelineOptions = pipelineOptions;
        this.output = output;
        this.sideInputReader = sideInputReader;
        this.executor = executor;
        this.maxNumOutputs = maxNumOutputs;
        this.maxDuration = maxDuration;
        this.bundleFinalizer = bundleFinalizer;
    }

    /**
     * Invokes {@code @ProcessElement} for one element/restriction pair and reports how to resume.
     *
     * @param invoker invoker for the user function
     * @param element the element being processed
     * @param tracker tracker for the restriction being processed
     * @param watermarkEstimator estimator whose watermark and state are captured at checkpoint time
     * @param sideInputMapping side input views keyed by tag id
     * @return a result carrying the continuation and, if a checkpoint was taken, the residual
     *     restriction together with the watermark and estimator state captured just before the split
     * @throws IllegalStateException if the call returned {@code resume()} after a failed claim
     */
    @Override
    public SplittableProcessElementInvoker.Result invokeProcessElement(DoFnInvoker<InputT, OutputT> invoker, WindowedValue<InputT> element, final RestrictionTracker<RestrictionT, PositionT> tracker, WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator, final Map<String, PCollectionView<?>> sideInputMapping) {
        final ProcessContext processContext = new ProcessContext(element, tracker, watermarkEstimator);
        DoFn.ProcessContinuation cont = invoker.invokeProcessElement(new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>(){

            @Override
            public String getErrorContext() {
                return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName();
            }

            @Override
            public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
                return processContext;
            }

            @Override
            public Object sideInput(String tagId) {
                PCollectionView<?> view = sideInputMapping.get(tagId);
                if (view == null) {
                    throw new IllegalArgumentException("calling getSideInput() with unknown view");
                }
                return processContext.sideInput(view);
            }

            @Override
            public Object restriction() {
                return tracker.currentRestriction();
            }

            @Override
            public InputT element(DoFn<InputT, OutputT> doFn) {
                return processContext.element();
            }

            @Override
            public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                return processContext.timestamp();
            }

            @Override
            public String timerId(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Cannot access timerId as parameter outside of @OnTimer method.");
            }

            @Override
            public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Access to time domain not supported in ProcessElement");
            }

            @Override
            public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
                return DoFnOutputReceivers.windowedReceiver(processContext, null);
            }

            @Override
            public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
                throw new UnsupportedOperationException("Not supported in SplittableDoFn");
            }

            @Override
            public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
                return DoFnOutputReceivers.windowedMultiReceiver(processContext, null);
            }

            @Override
            public RestrictionTracker<?, ?> restrictionTracker() {
                // The observing wrapper, so that claims made by the function are counted.
                return processContext.tracker;
            }

            @Override
            public WatermarkEstimator<?> watermarkEstimator() {
                return processContext.watermarkEstimator;
            }

            @Override
            public PipelineOptions pipelineOptions() {
                return pipelineOptions;
            }

            @Override
            public DoFn.BundleFinalizer bundleFinalizer() {
                return bundleFinalizer.get();
            }

            @Override
            public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException("Should not access startBundleContext() from @" + DoFn.ProcessElement.class.getSimpleName());
            }

            @Override
            public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
                throw new IllegalStateException("Should not access finishBundleContext() from @" + DoFn.ProcessElement.class.getSimpleName());
            }
        });
        // The call has returned: stop the timer-driven checkpoint before inspecting checkpoint state.
        processContext.cancelScheduledCheckpoint();
        @Nullable KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residual = processContext.getTakenCheckpoint();
        if (cont.shouldResume()) {
            Preconditions.checkState(!processContext.hasClaimFailed, "After tryClaim() returned false, @ProcessElement must return stop(), but returned resume()");
            if (residual == null) {
                // No checkpoint was taken while the call ran, yet the function wants to be resumed:
                // split now so that the remaining work is captured as the residual. This may still
                // yield null if the tracker declines to split.
                residual = processContext.takeCheckpointNow();
            }
        }
        // Validate the tracker on every path, after any checkpoint taken above.
        processContext.tracker.checkDone();
        if (residual == null) {
            return new SplittableProcessElementInvoker.Result(null, cont, null, null);
        }
        KV<Instant, WatermarkEstimatorStateT> watermarkAndState = residual.getValue();
        return new SplittableProcessElementInvoker.Result(residual.getKey(), cont, watermarkAndState.getKey(), watermarkAndState.getValue());
    }

    /**
     * Per-call process context. It forwards outputs to the enclosing invoker's receiver, counts
     * claims and outputs, and owns the (at most one) checkpoint taken during the call.
     */
    private class ProcessContext
    extends DoFn<InputT, OutputT>.ProcessContext
    implements RestrictionTrackers.ClaimObserver<PositionT> {
        private final WindowedValue<InputT> element;
        /** The caller's tracker wrapped so that this context observes every claim attempt. */
        private final RestrictionTracker<RestrictionT, PositionT> tracker;
        /** The caller's estimator wrapped by {@code WatermarkEstimators.threadSafe}. */
        private final WatermarkEstimators.WatermarkAndStateObserver<WatermarkEstimatorStateT> watermarkEstimator;
        /** Number of {@code onClaimed} notifications received so far. */
        private int numClaimedBlocks;
        /** Set once {@code onClaimFailed} has been received; no claims or outputs may follow. */
        private boolean hasClaimFailed;
        private int numOutputs;
        /** Residual restriction of the checkpoint, or null while no checkpoint has been taken. */
        private @Nullable RestrictionT checkpoint;
        /** Watermark and estimator state read immediately before the split was attempted. */
        private @Nullable KV<Instant, WatermarkEstimatorStateT> residualWatermarkAndState;
        /** Handle on the time-based checkpoint task, or null if no claim has happened yet. */
        private @Nullable Future<?> scheduledCheckpoint;

        public ProcessContext(WindowedValue<InputT> element, RestrictionTracker<RestrictionT, PositionT> tracker, WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator) {
            // DoFn.ProcessContext is an inner class of DoFn, so its enclosing DoFn instance must be
            // supplied through a qualified superclass constructor invocation.
            fn.super();
            this.element = element;
            this.tracker = RestrictionTrackers.observe(tracker, this);
            this.watermarkEstimator = WatermarkEstimators.threadSafe(watermarkEstimator);
        }

        @Override
        public void onClaimed(PositionT position) {
            Preconditions.checkState(!this.hasClaimFailed, "Must not call tryClaim() after it has previously returned false");
            if (this.numClaimedBlocks == 0) {
                // Start the clock only on the first claim. The explicit Runnable cast pins the
                // schedule(Runnable, ...) overload; the task's return value is not needed.
                this.scheduledCheckpoint = executor.schedule((Runnable) this::takeCheckpointNow, maxDuration.getMillis(), TimeUnit.MILLISECONDS);
            }
            ++this.numClaimedBlocks;
        }

        @Override
        public void onClaimFailed(PositionT position) {
            Preconditions.checkState(!this.hasClaimFailed, "Must not call tryClaim() after it has previously returned false");
            this.hasClaimFailed = true;
        }

        /**
         * Cancels the time-based checkpoint task, if one was scheduled, and waits for it to settle.
         * If the task had already run and failed, that failure propagates from here.
         */
        void cancelScheduledCheckpoint() {
            Future<?> scheduled = this.scheduledCheckpoint;
            if (scheduled == null) {
                return;
            }
            scheduled.cancel(true);
            try {
                Futures.getUnchecked(scheduled);
            } catch (CancellationException ignored) {
                // Expected when the cancel above took effect, i.e. the task had not already
                // completed; there is nothing to report in that case.
            }
        }

        /**
         * Takes the checkpoint unless one already exists, then returns the current checkpoint.
         *
         * <p>Called both from the output path and from the scheduled task; {@code synchronized}
         * together with the null check ensures the tracker is not split again once a checkpoint
         * has been recorded.
         *
         * @return the residual restriction paired with the captured watermark and state, or null
         *     if the tracker returned no split
         */
        synchronized @Nullable KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> takeCheckpointNow() {
            if (this.checkpoint == null) {
                // Read the watermark before splitting so it describes the state prior to the split.
                this.residualWatermarkAndState = this.watermarkEstimator.getWatermarkAndState();
                SplitResult<RestrictionT> split = this.tracker.trySplit(0.0);
                if (split != null) {
                    this.checkpoint = Preconditions.checkNotNull(split.getResidual());
                }
            }
            return this.getTakenCheckpoint();
        }

        /** Returns the checkpoint taken so far, or null if none has been taken. */
        synchronized @Nullable KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> getTakenCheckpoint() {
            return this.checkpoint == null ? null : KV.of(this.checkpoint, this.residualWatermarkAndState);
        }

        @Override
        public InputT element() {
            return this.element.getValue();
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
            // Iterables.getOnlyElement requires the element to be in exactly one window.
            return sideInputReader.get(view, view.getWindowMappingFn().getSideInputWindow(Iterables.getOnlyElement(this.element.getWindows())));
        }

        @Override
        public Instant timestamp() {
            return this.element.getTimestamp();
        }

        @Override
        public PaneInfo pane() {
            return this.element.getPane();
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return pipelineOptions;
        }

        @Override
        public void output(OutputT output) {
            this.outputWithTimestamp(output, this.element.getTimestamp());
        }

        @Override
        public void outputWithTimestamp(OutputT value, Instant timestamp) {
            this.noteOutput();
            this.observeOutputTimestamp(timestamp);
            OutputAndTimeBoundedSplittableProcessElementInvoker.this.output.outputWindowedValue(value, timestamp, this.element.getWindows(), this.element.getPane());
        }

        @Override
        public <T> void output(TupleTag<T> tag, T value) {
            this.outputWithTimestamp(tag, value, this.element.getTimestamp());
        }

        @Override
        public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
            this.noteOutput();
            this.observeOutputTimestamp(timestamp);
            OutputAndTimeBoundedSplittableProcessElementInvoker.this.output.outputWindowedValue(tag, value, timestamp, this.element.getWindows(), this.element.getPane());
        }

        /** Reports an output timestamp to the watermark estimator if it observes timestamps. */
        private void observeOutputTimestamp(Instant timestamp) {
            if (this.watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
                ((TimestampObservingWatermarkEstimator<?>) this.watermarkEstimator).observeTimestamp(timestamp);
            }
        }

        /**
         * Validates that output is currently allowed, counts it, and requests a checkpoint once the
         * count reaches {@code maxNumOutputs}.
         */
        private void noteOutput() {
            Preconditions.checkState(!this.hasClaimFailed, "Output is not allowed after a failed tryClaim()");
            Preconditions.checkState(this.numClaimedBlocks > 0, "Output is not allowed before tryClaim()");
            ++this.numOutputs;
            if (this.numOutputs >= maxNumOutputs) {
                this.takeCheckpointNow();
            }
        }
    }
}

