/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import com.google.auto.value.AutoValue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItem;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.KeyedWorkItems;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.ExecutionDriver;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.local.PipelineMessageReceiver;
import org.apache.beam.runners.direct.ExecutableGraph;
import org.apache.beam.runners.direct.WatermarkManager;
import org.apache.beam.runners.direct.portable.AutoValue_QuiescenceDriver_WorkUpdate;
import org.apache.beam.runners.direct.portable.BundleProcessor;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.CommittedResult;
import org.apache.beam.runners.direct.portable.CompletionCallback;
import org.apache.beam.runners.direct.portable.EvaluationContext;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class QuiescenceDriver
implements ExecutionDriver {
    private static final Logger LOG = LoggerFactory.getLogger(QuiescenceDriver.class);
    private final EvaluationContext evaluationContext;
    private final ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph;
    private final BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor;
    private final PipelineMessageReceiver pipelineMessageReceiver;
    private final CompletionCallback defaultCompletionCallback = new TimerIterableCompletionCallback(Collections.emptyList());
    private final Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles;
    private final Queue<WorkUpdate> pendingWork = new ConcurrentLinkedQueue<WorkUpdate>();
    private final AtomicReference<ExecutorState> state = new AtomicReference<ExecutorState>(ExecutorState.QUIESCENT);
    private final AtomicLong outstandingWork = new AtomicLong(0L);
    private boolean exceptionThrown = false;

    public static ExecutionDriver create(EvaluationContext context, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph, BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor, PipelineMessageReceiver messageReceiver, Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> initialBundles) {
        return new QuiescenceDriver(context, graph, bundleProcessor, messageReceiver, initialBundles);
    }

    private QuiescenceDriver(EvaluationContext evaluationContext, ExecutableGraph<PipelineNode.PTransformNode, PipelineNode.PCollectionNode> graph, BundleProcessor<PipelineNode.PCollectionNode, CommittedBundle<?>, PipelineNode.PTransformNode> bundleProcessor, PipelineMessageReceiver pipelineMessageReceiver, Map<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootBundles) {
        this.evaluationContext = evaluationContext;
        this.graph = graph;
        this.bundleProcessor = bundleProcessor;
        this.pipelineMessageReceiver = pipelineMessageReceiver;
        this.pendingRootBundles = pendingRootBundles;
    }

    @Override
    public ExecutionDriver.DriverState drive() {
        boolean noWorkOutstanding = this.outstandingWork.get() == 0L;
        ExecutorState startingState = this.state.get();
        if (startingState == ExecutorState.ACTIVE) {
            this.state.compareAndSet(ExecutorState.ACTIVE, ExecutorState.PROCESSING);
        } else if (startingState == ExecutorState.PROCESSING && noWorkOutstanding) {
            this.state.compareAndSet(ExecutorState.PROCESSING, ExecutorState.QUIESCING);
        } else if (startingState == ExecutorState.QUIESCING && noWorkOutstanding) {
            this.state.compareAndSet(ExecutorState.QUIESCING, ExecutorState.QUIESCENT);
        }
        this.fireTimers();
        ArrayList<WorkUpdate> updates = new ArrayList<WorkUpdate>();
        WorkUpdate pendingUpdate = this.pendingWork.poll();
        while (pendingUpdate != null) {
            updates.add(pendingUpdate);
            pendingUpdate = this.pendingWork.poll();
        }
        for (WorkUpdate update : updates) {
            this.applyUpdate(noWorkOutstanding, startingState, update);
        }
        this.addWorkIfNecessary();
        if (this.exceptionThrown) {
            return ExecutionDriver.DriverState.FAILED;
        }
        if (this.evaluationContext.isDone()) {
            return ExecutionDriver.DriverState.SHUTDOWN;
        }
        return ExecutionDriver.DriverState.CONTINUE;
    }

    private void applyUpdate(boolean noWorkOutstanding, ExecutorState startingState, WorkUpdate update) {
        LOG.debug("Executor Update: {}", (Object)update);
        if (update.getBundle().isPresent()) {
            if (ExecutorState.ACTIVE == startingState || ExecutorState.PROCESSING == startingState && noWorkOutstanding) {
                CommittedBundle bundle = (CommittedBundle)update.getBundle().get();
                for (PipelineNode.PTransformNode consumer : update.getConsumers()) {
                    this.outstandingWork.incrementAndGet();
                    this.bundleProcessor.process(bundle, consumer, this.defaultCompletionCallback);
                }
            } else {
                this.pendingWork.offer(update);
            }
        } else if (update.getException().isPresent()) {
            this.pipelineMessageReceiver.failed((Exception)update.getException().get());
            this.exceptionThrown = true;
        }
    }

    private void fireTimers() {
        try {
            for (WatermarkManager.FiredTimers<PipelineNode.PTransformNode> transformTimers : this.evaluationContext.extractFiredTimers()) {
                Collection<TimerInternals.TimerData> delivery = transformTimers.getTimers();
                KeyedWorkItem work = KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
                PipelineNode.PCollectionNode inputPCollection = (PipelineNode.PCollectionNode)Iterables.getOnlyElement(this.graph.getPerElementInputs(transformTimers.getExecutable()));
                CommittedBundle bundle = this.evaluationContext.createKeyedBundle(transformTimers.getKey(), inputPCollection).add(WindowedValue.valueInGlobalWindow(work)).commit(this.evaluationContext.now());
                this.outstandingWork.incrementAndGet();
                this.bundleProcessor.process(bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
                this.state.set(ExecutorState.ACTIVE);
            }
        }
        catch (Exception e) {
            LOG.error("Internal Error while delivering timers", (Throwable)e);
            this.pipelineMessageReceiver.failed(e);
            this.exceptionThrown = true;
        }
    }

    private void addWorkIfNecessary() {
        if (this.state.get() == ExecutorState.QUIESCENT) {
            for (Map.Entry<PipelineNode.PTransformNode, ConcurrentLinkedQueue<CommittedBundle<?>>> pendingRootEntry : this.pendingRootBundles.entrySet()) {
                ArrayList bundles = new ArrayList();
                while (!pendingRootEntry.getValue().isEmpty()) {
                    CommittedBundle<?> bundle = pendingRootEntry.getValue().poll();
                    bundles.add(bundle);
                }
                for (CommittedBundle committedBundle : bundles) {
                    this.outstandingWork.incrementAndGet();
                    this.bundleProcessor.process(committedBundle, pendingRootEntry.getKey(), this.defaultCompletionCallback);
                    this.state.set(ExecutorState.ACTIVE);
                }
            }
        }
    }

    @AutoValue
    static abstract class WorkUpdate {
        WorkUpdate() {
        }

        private static WorkUpdate fromBundle(CommittedBundle<?> bundle, Collection<PipelineNode.PTransformNode> consumers) {
            return new AutoValue_QuiescenceDriver_WorkUpdate(Optional.of(bundle), consumers, (Optional<? extends Exception>)Optional.absent());
        }

        private static WorkUpdate fromException(Exception e) {
            return new AutoValue_QuiescenceDriver_WorkUpdate(Optional.absent(), Collections.emptyList(), (Optional<? extends Exception>)Optional.of((Object)e));
        }

        public abstract Optional<? extends CommittedBundle<?>> getBundle();

        public abstract Collection<PipelineNode.PTransformNode> getConsumers();

        public abstract Optional<? extends Exception> getException();
    }

    private class TimerIterableCompletionCallback
    implements CompletionCallback {
        private final Iterable<TimerInternals.TimerData> timers;

        TimerIterableCompletionCallback(Iterable<TimerInternals.TimerData> timers) {
            this.timers = timers;
        }

        @Override
        public final CommittedResult handleResult(CommittedBundle<?> inputBundle, TransformResult<?> result) {
            CommittedResult<PipelineNode.PTransformNode> committedResult = QuiescenceDriver.this.evaluationContext.handleResult(inputBundle, this.timers, result);
            for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
                QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromBundle(outputBundle, QuiescenceDriver.this.graph.getPerElementConsumers(outputBundle.getPCollection())));
            }
            Optional<CommittedBundle<?>> unprocessedInputs = committedResult.getUnprocessedInputs();
            if (unprocessedInputs.isPresent()) {
                if (inputBundle.getPCollection() == null) {
                    ((ConcurrentLinkedQueue)QuiescenceDriver.this.pendingRootBundles.get(result.getTransform())).offer((CommittedBundle)unprocessedInputs.get());
                } else {
                    QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromBundle((CommittedBundle)unprocessedInputs.get(), Collections.singleton(committedResult.getExecutable())));
                }
            }
            if (!committedResult.getProducedOutputTypes().isEmpty()) {
                QuiescenceDriver.this.state.set(ExecutorState.ACTIVE);
            }
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
            return committedResult;
        }

        @Override
        public void handleEmpty(PipelineNode.PTransformNode transform) {
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
        }

        @Override
        public final void handleException(CommittedBundle<?> inputBundle, Exception e) {
            QuiescenceDriver.this.pendingWork.offer(WorkUpdate.fromException(e));
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
        }

        @Override
        public void handleError(Error err) {
            QuiescenceDriver.this.outstandingWork.decrementAndGet();
            QuiescenceDriver.this.pipelineMessageReceiver.failed(err);
        }
    }

    private static enum ExecutorState {
        ACTIVE,
        PROCESSING,
        QUIESCING,
        QUIESCENT;

    }
}

