/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.direct_java.runners.local.ExecutionDriver;
import org.apache.beam.repackaged.direct_java.runners.local.PipelineMessageReceiver;
import org.apache.beam.runners.direct.BundleProcessor;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.CompletionCallback;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.DirectTransformExecutor;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.PipelineExecutor;
import org.apache.beam.runners.direct.QuiescenceDriver;
import org.apache.beam.runners.direct.RootProviderRegistry;
import org.apache.beam.runners.direct.StepAndKey;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.TransformExecutor;
import org.apache.beam.runners.direct.TransformExecutorFactory;
import org.apache.beam.runners.direct.TransformExecutorService;
import org.apache.beam.runners.direct.TransformExecutorServices;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalListener;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Queues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.KeyForBottom;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ExecutorServiceParallelExecutor
implements PipelineExecutor,
BundleProcessor<PCollection<?>, CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {
    /** Logger used for executor lifecycle messages. */
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
    /** Size of the worker thread pool; {@code start} also uses it (with a floor of 3) as the split count requested from each root. */
    private final @UnknownKeyFor @NonNull @Initialized int targetParallelism;
    /** Fixed-size worker pool created in the constructor; runs the driver task and backs the transform executor services. */
    private final @UnknownKeyFor @NonNull @Initialized ExecutorService executorService;
    /** Evaluator registry handed to the executor factory and cleaned up during shutdown. */
    private final @UnknownKeyFor @NonNull @Initialized TransformEvaluatorRegistry registry;
    /** Evaluation context: initialised with the root bundles in {@code start} and consulted to decide whether a PValue is keyed. */
    private final @UnknownKeyFor @NonNull @Initialized EvaluationContext evaluationContext;
    /** Builds the {@code TransformExecutor} for each bundle that is handed to {@code process}. */
    private final @UnknownKeyFor @NonNull @Initialized TransformExecutorFactory executorFactory;
    /** Service obtained from {@code TransformExecutorServices.parallel}; receives bundles whose PCollection is not keyed. */
    private final @UnknownKeyFor @NonNull @Initialized TransformExecutorService parallelExecutorService;
    /** Weak-valued cache of services obtained from {@code TransformExecutorServices.serial}, one per step-and-key; removed entries are shut down. */
    private final @UnknownKeyFor @NonNull @Initialized LoadingCache<@UnknownKeyFor @NonNull @Initialized StepAndKey, @UnknownKeyFor @NonNull @Initialized TransformExecutorService> serialExecutorServices;
    /** Queue of failures and state changes that {@code waitUntilFinish} polls. */
    private final @UnknownKeyFor @NonNull @Initialized QueueMessageReceiver visibleUpdates;
    /** Caller-supplied executor; this class only shuts it down when the pipeline terminates. */
    private final @UnknownKeyFor @NonNull @Initialized ExecutorService metricsExecutor;
    /**
     * Current pipeline state. Starts as RUNNING and is moved to a terminal state by a
     * compare-and-set in {@code shutdownIfNecessary}. The reference itself is never reassigned,
     * so it is declared final like every other field.
     */
    private final @UnknownKeyFor @NonNull @Initialized AtomicReference<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PipelineResult.State> pipelineState = new AtomicReference<PipelineResult.State>(PipelineResult.State.RUNNING);

    /**
     * Static factory: forwards every argument, unchanged, to the private constructor.
     *
     * @param targetParallelism number of threads in the worker pool the executor creates
     * @param registry evaluator registry used to build transform executors and cleaned up at shutdown
     * @param transformEnforcements enforcement factories passed through to the transform executor factory
     * @param context evaluation context of the pipeline being run
     * @param metricsExecutor executor owned by the caller; it is shut down when the pipeline terminates
     * @return a new executor whose pipeline state is RUNNING
     */
    public static @UnknownKeyFor @NonNull @Initialized ExecutorServiceParallelExecutor create(@UnknownKeyFor @NonNull @Initialized int targetParallelism, @UnknownKeyFor @NonNull @Initialized TransformEvaluatorRegistry registry, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized ModelEnforcementFactory>> transformEnforcements, @UnknownKeyFor @NonNull @Initialized EvaluationContext context, @UnknownKeyFor @NonNull @Initialized ExecutorService metricsExecutor) {
        ExecutorServiceParallelExecutor executor = new ExecutorServiceParallelExecutor(targetParallelism, registry, transformEnforcements, context, metricsExecutor);
        return executor;
    }

    /**
     * Wires up the executor: stores the collaborators, creates the fixed-size worker pool and
     * derives the parallel and per-key serial executor services from it.
     *
     * @param targetParallelism number of threads in the worker pool
     * @param registry evaluator registry, also handed to the transform executor factory
     * @param transformEnforcements enforcement factories handed to the transform executor factory
     * @param context evaluation context, also handed to the transform executor factory
     * @param metricsExecutor caller-supplied executor that is shut down together with this executor
     */
    private ExecutorServiceParallelExecutor(@UnknownKeyFor @NonNull @Initialized int targetParallelism, @UnknownKeyFor @NonNull @Initialized TransformEvaluatorRegistry registry, @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Collection<@UnknownKeyFor @NonNull @Initialized ModelEnforcementFactory>> transformEnforcements, @UnknownKeyFor @NonNull @Initialized EvaluationContext context, @UnknownKeyFor @NonNull @Initialized ExecutorService metricsExecutor) {
        // Plain hand-offs from the caller.
        this.targetParallelism = targetParallelism;
        this.metricsExecutor = metricsExecutor;
        this.registry = registry;
        this.evaluationContext = context;

        // NOTE(review): the name format carries no %d placeholder, so the worker threads
        // presumably all share this one name - confirm whether that is intended.
        ThreadFactoryBuilder workerThreads = new ThreadFactoryBuilder()
                .setThreadFactory(MoreExecutors.platformThreadFactory())
                .setNameFormat("direct-runner-worker");
        this.executorService = Executors.newFixedThreadPool(targetParallelism, workerThreads.build());

        // Values are held weakly; whatever leaves the cache is shut down by the removal listener.
        this.serialExecutorServices = CacheBuilder.newBuilder()
                .weakValues()
                .removalListener(this.shutdownExecutorServiceListener())
                .build(this.serialTransformExecutorServiceCacheLoader());

        this.visibleUpdates = new QueueMessageReceiver();
        this.parallelExecutorService = TransformExecutorServices.parallel(this.executorService);
        this.executorFactory = new DirectTransformExecutor.Factory(context, registry, transformEnforcements);
    }

    /**
     * Builds the loader behind {@code serialExecutorServices}.
     *
     * @return a loader that answers every lookup with a fresh service from
     *     {@code TransformExecutorServices.serial}, backed by the shared worker pool; the
     *     requested key itself is not inspected
     */
    private @UnknownKeyFor @NonNull @Initialized CacheLoader<@UnknownKeyFor @NonNull @Initialized StepAndKey, @UnknownKeyFor @NonNull @Initialized TransformExecutorService> serialTransformExecutorServiceCacheLoader() {
        CacheLoader<StepAndKey, TransformExecutorService> loader = new CacheLoader<StepAndKey, TransformExecutorService>(){

            @Override
            public @UnknownKeyFor @NonNull @Initialized TransformExecutorService load(@UnknownKeyFor @NonNull @Initialized StepAndKey stepAndKey) throws @UnknownKeyFor @NonNull @Initialized Exception {
                // The pool is read when a value is loaded, not when the loader is created.
                return TransformExecutorServices.serial(ExecutorServiceParallelExecutor.this.executorService);
            }
        };
        return loader;
    }

    /**
     * Builds the removal listener installed on {@code serialExecutorServices}.
     *
     * @return a listener that shuts down the service carried by a removal notification, and does
     *     nothing when the notification carries no value
     */
    private @UnknownKeyFor @NonNull @Initialized RemovalListener<@UnknownKeyFor @NonNull @Initialized StepAndKey, @UnknownKeyFor @NonNull @Initialized TransformExecutorService> shutdownExecutorServiceListener() {
        return removal -> {
            TransformExecutorService evicted = (TransformExecutorService)removal.getValue();
            if (evicted == null) {
                // Presumably the weakly-held value was already collected; nothing to shut down.
                return;
            }
            evicted.shutdown();
        };
    }

    /**
     * Starts executing the pipeline described by {@code graph}.
     *
     * <p>Collects the initial input bundles of every root transform, initialises the evaluation
     * context with them, creates a {@code QuiescenceDriver} and submits a task to the worker pool
     * that calls the driver. The task resubmits itself until the driver reports a terminal state;
     * it then maps FAILED to {@code State.FAILED} and SHUTDOWN to {@code State.DONE} and shuts
     * the executor down.
     *
     * @param graph the pipeline graph whose root transforms are started
     * @param rootProviderRegistry source of the initial input bundles for each root transform
     * @throws UserCodeException wrapping any exception raised while fetching a root's initial inputs
     */
    @Override
    public void start(@UnknownKeyFor @NonNull @Initialized DirectGraph graph, @UnknownKeyFor @NonNull @Initialized RootProviderRegistry rootProviderRegistry) {
        // Ask each root for at least three splits, more when the worker pool is larger.
        int splitsPerRoot = Math.max(3, this.targetParallelism);
        ImmutableMap.Builder rootBundles = ImmutableMap.builder();
        for (AppliedPTransform rootTransform : graph.getRootTransforms()) {
            ArrayDeque queued = Queues.newArrayDeque();
            try {
                queued.addAll(rootProviderRegistry.getInitialInputs(rootTransform, splitsPerRoot));
            }
            catch (Exception e) {
                throw UserCodeException.wrap((Throwable)e);
            }
            rootBundles.put((Object)rootTransform, (Object)queued);
        }
        // Two separate maps are built; both refer to the same per-root queues.
        this.evaluationContext.initialize((Map<AppliedPTransform<?, ?, ?>, ? extends Iterable<CommittedBundle<?>>>)rootBundles.build());
        final ExecutionDriver driver = QuiescenceDriver.create(this.evaluationContext, graph, this, this.visibleUpdates, rootBundles.build());
        this.executorService.submit(new Runnable(){

            @Override
            public void run() {
                ExecutionDriver.DriverState outcome = driver.drive();
                if (!outcome.isTerminal()) {
                    // Not finished yet: queue another pass of this same task.
                    ExecutorServiceParallelExecutor.this.executorService.submit(this);
                    return;
                }
                PipelineResult.State finalState;
                if (outcome == ExecutionDriver.DriverState.FAILED) {
                    finalState = PipelineResult.State.FAILED;
                } else if (outcome == ExecutionDriver.DriverState.SHUTDOWN) {
                    finalState = PipelineResult.State.DONE;
                } else if (outcome == ExecutionDriver.DriverState.CONTINUE) {
                    throw new IllegalStateException(String.format("%s should not be a terminal state", ExecutionDriver.DriverState.CONTINUE));
                } else {
                    throw new IllegalArgumentException(String.format("Unknown %s %s", ExecutionDriver.DriverState.class.getSimpleName(), outcome));
                }
                ExecutorServiceParallelExecutor.this.shutdownIfNecessary(finalState);
            }
        });
    }

    /**
     * Hands a committed bundle to the given consuming transform by delegating to
     * {@code evaluateBundle}.
     *
     * <p>NOTE: the decompiler reported problems with the annotations on this signature, so they
     * may be inaccurate.
     *
     * @param bundle the bundle to process
     * @param consumer the transform that consumes the bundle
     * @param onComplete callback passed on to the transform executor that is created
     */
    @Override
    public void process(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized CommittedBundle<@UnknownKeyFor @UnknownKeyFor @Nullable @Initialized @NonNull @Initialized ?> bundle, /*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> consumer, @UnknownKeyFor @NonNull @Initialized CompletionCallback onComplete) {
        // Note the argument order: the helper takes the transform first, then the bundle.
        evaluateBundle(consumer, bundle, onComplete);
    }

    /**
     * Creates a transform executor for {@code bundle} and schedules it unless the pipeline has
     * already reached a terminal state.
     *
     * <p>A bundle whose PCollection is keyed goes to the serial service cached for its
     * (transform, key) pair; any other bundle goes to the shared parallel service.
     *
     * @param transform the transform that will consume the bundle
     * @param bundle the bundle to evaluate
     * @param onComplete callback passed to the created transform executor
     */
    private <T> void evaluateBundle(/*
     * Issues handling annotations - annotations may be inaccurate
     */
    @UnknownKeyFor @NonNull @Initialized AppliedPTransform<@UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?, @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized ?> transform, @UnknownKeyFor @NonNull @Initialized CommittedBundle<T> bundle, @UnknownKeyFor @NonNull @Initialized CompletionCallback onComplete) {
        boolean keyedInput = this.isKeyed((PValue)bundle.getPCollection());
        TransformExecutorService targetService = keyedInput
                ? (TransformExecutorService)this.serialExecutorServices.getUnchecked(StepAndKey.of(transform, bundle.getKey()))
                : this.parallelExecutorService;
        // The executor is built before the state check, exactly as in the original ordering.
        TransformExecutor work = this.executorFactory.create(bundle, transform, onComplete, targetService);
        if (this.pipelineState.get().isTerminal()) {
            // Pipeline already terminated: the freshly built executor is simply not scheduled.
            return;
        }
        targetService.schedule(work);
    }

    /**
     * Asks the evaluation context whether {@code pvalue} is keyed.
     *
     * @param pvalue the value to check
     * @return whatever {@code EvaluationContext.isKeyed} reports for {@code pvalue}
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isKeyed(@UnknownKeyFor @NonNull @Initialized PValue pvalue) {
        boolean keyed = evaluationContext.isKeyed(pvalue);
        return keyed;
    }

    /**
     * Blocks until the pipeline terminates, a failure is reported, or {@code duration} elapses.
     *
     * <p>Updates are polled from the queue in 25 ms slices. An update that carries a terminal
     * state ends the wait. An update that carries a throwable but no terminal state causes that
     * throwable to be rethrown on the calling thread.
     *
     * @param duration maximum time to wait; {@code Duration.ZERO} means wait without a deadline
     * @return the current pipeline state once termination is observed, or {@code null} when the
     *     deadline passes first (despite the decompiled {@code @NonNull} annotation)
     * @throws Exception the exception reported by an executor thread, an
     *     {@code InterruptedException} if interrupted while polling, or a wrapping
     *     {@code Exception} for a throwable that is neither an Exception nor an Error
     */
    @Override
    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PipelineResult.State waitUntilFinish(@UnknownKeyFor @NonNull @Initialized Duration duration) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Instant deadline;
        if (duration.equals(Duration.ZERO)) {
            deadline = new Instant(Long.MAX_VALUE);
        } else {
            deadline = Instant.now().plus(duration);
        }
        while (Instant.now().isBefore(deadline)) {
            VisibleExecutorUpdate next = this.visibleUpdates.tryNext(Duration.millis(25L));
            if (next == null && this.pipelineState.get().isTerminal()) {
                // Already terminal: one last short poll for an update published in the meantime.
                next = this.visibleUpdates.tryNext(Duration.millis(1L));
                if (next == null) {
                    return this.pipelineState.get();
                }
            }
            if (next == null) {
                continue;
            }
            if (this.isTerminalStateUpdate(next)) {
                return this.pipelineState.get();
            }
            if (next.thrown.isPresent()) {
                Throwable failure = next.thrown.get();
                if (failure instanceof Error) {
                    throw (Error)failure;
                }
                if (failure instanceof Exception) {
                    throw (Exception)failure;
                }
                throw new Exception("Unknown Type of Throwable", failure);
            }
        }
        // Deadline passed without observing termination.
        return null;
    }

    /**
     * Reads the current pipeline state without blocking.
     *
     * @return the value currently held by {@code pipelineState}
     */
    @Override
    public // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PipelineResult.State getPipelineState() {
        PipelineResult.State current = pipelineState.get();
        return current;
    }

    /**
     * Tells whether {@code update} announces a terminal pipeline state.
     *
     * @param update the update to inspect
     * @return {@code true} only when the update carries a non-null state that is terminal
     */
    private @UnknownKeyFor @NonNull @Initialized boolean isTerminalStateUpdate(@UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate update) {
        PipelineResult.State announced = update.getNewState();
        if (announced == null) {
            // Updates built from an exception carry no state.
            return false;
        }
        return announced.isTerminal();
    }

    /**
     * Cancels execution: shuts the executor down with state CANCELLED, then publishes a
     * cancellation update for any thread blocked in {@code waitUntilFinish}.
     *
     * <p>If the shutdown throws, the cancellation update is not published.
     */
    @Override
    public void stop() {
        PipelineResult.State cancelled = PipelineResult.State.CANCELLED;
        shutdownIfNecessary(cancelled);
        visibleUpdates.cancelled();
    }

    /**
     * Shuts down every owned resource when {@code newState} is terminal, then records that state.
     *
     * <p>Each shutdown step is attempted even if an earlier one failed. The steps run in this
     * order: invalidate and clean up the serial executor cache, shut down the parallel executor
     * service, the worker pool and the metrics executor, and finally clean up the registry.
     * Failures are collected. If any occurred, a single {@code IllegalStateException} listing
     * their messages is published to {@code visibleUpdates} and thrown; each original failure is
     * attached to it as a suppressed exception so its stack trace is not lost.
     *
     * @param newState the state the pipeline is moving to; a non-terminal state makes this a no-op
     * @throws IllegalStateException if at least one shutdown step failed
     */
    private void shutdownIfNecessary(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized PipelineResult.State newState) {
        if (!newState.isTerminal()) {
            return;
        }
        LOG.debug("Pipeline has terminated. Shutting down.");
        ArrayList<Exception> errors = new ArrayList<Exception>();
        this.runShutdownStep(errors, () -> this.serialExecutorServices.invalidateAll());
        this.runShutdownStep(errors, () -> this.serialExecutorServices.cleanUp());
        this.runShutdownStep(errors, () -> this.parallelExecutorService.shutdown());
        this.runShutdownStep(errors, () -> this.executorService.shutdown());
        this.runShutdownStep(errors, () -> this.metricsExecutor.shutdown());
        try {
            // Kept inline: unlike the steps above, any Exception (not only runtime ones) is collected here.
            this.registry.cleanup();
        }
        catch (Exception e) {
            errors.add(e);
        }
        // Only a RUNNING pipeline is moved; a terminal state recorded earlier is left untouched.
        this.pipelineState.compareAndSet(PipelineResult.State.RUNNING, newState);
        if (!errors.isEmpty()) {
            IllegalStateException exception = new IllegalStateException("Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n" + errors.stream().map(Throwable::getMessage).collect(Collectors.joining("\n- ", "- ", "")));
            // Keep the original failures (and their stack traces) reachable from the summary.
            for (Exception error : errors) {
                exception.addSuppressed(error);
            }
            this.visibleUpdates.failed(exception);
            throw exception;
        }
    }

    /** Runs one shutdown step and records a {@code RuntimeException} it throws instead of propagating it. */
    private void runShutdownStep(Collection<Exception> errors, Runnable step) {
        try {
            step.run();
        }
        catch (RuntimeException re) {
            errors.add(re);
        }
    }

    /**
     * {@code PipelineMessageReceiver} that turns every notification into a
     * {@code VisibleExecutorUpdate} and appends it to a blocking queue, from which
     * {@code tryNext} hands updates out one at a time.
     */
    private static class QueueMessageReceiver
    implements PipelineMessageReceiver {
        /** Pending updates; created without a capacity argument. */
        private final @UnknownKeyFor @NonNull @Initialized BlockingQueue<@UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate> updates = new LinkedBlockingQueue<VisibleExecutorUpdate>();

        private QueueMessageReceiver() {
        }

        /** Queues an update that carries {@code e} and no state. */
        @Override
        @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
        public void failed(@UnknownKeyFor @NonNull @Initialized Exception e) {
            this.publish(VisibleExecutorUpdate.fromException(e));
        }

        /** Queues an update that carries {@code e} together with the FAILED state. */
        @Override
        @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
        public void failed(@UnknownKeyFor @NonNull @Initialized Error e) {
            this.publish(VisibleExecutorUpdate.fromError(e));
        }

        /** Queues a CANCELLED update. */
        @Override
        @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
        public void cancelled() {
            this.publish(VisibleExecutorUpdate.cancelled());
        }

        /** Queues a DONE update. */
        @Override
        @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
        public void completed() {
            this.publish(VisibleExecutorUpdate.finished());
        }

        /** Appends {@code update} to the queue; the boolean result of {@code offer} is deliberately ignored. */
        @SuppressFBWarnings(value={"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
        private void publish(VisibleExecutorUpdate update) {
            this.updates.offer(update);
        }

        /**
         * Waits up to {@code timeout} for the next update.
         *
         * @param timeout maximum time to wait, converted to milliseconds
         * @return the next queued update, or {@code null} if none arrived in time
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        private @Nullable @UnknownKeyFor @Initialized VisibleExecutorUpdate tryNext(@UnknownKeyFor @NonNull @Initialized Duration timeout) throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
            long waitMillis = timeout.getMillis();
            return this.updates.poll(waitMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Immutable message placed on the update queue: an optional throwable plus an optional new
     * pipeline state. Instances are created only through the static factories.
     */
    private static class VisibleExecutorUpdate {
        /** Failure carried by this update; empty for updates made by {@code finished()} and {@code cancelled()}. */
        private final @UnknownKeyFor @NonNull @Initialized Optional<@KeyForBottom @NonNull @Initialized ? extends @UnknownKeyFor @NonNull @Initialized Throwable> thrown;
        /** State announced by this update; {@code null} for updates made by {@code fromException}. */
        private final // Could not load outer class - annotation placement on inner may be incorrect
         @Nullable @UnknownKeyFor @Initialized PipelineResult.State newState;

        /**
         * @param newState state to announce; {@code fromException} passes {@code null} here even
         *     though the decompiled annotation says {@code @NonNull}
         * @param exception failure to carry, or {@code null} for none
         */
        private VisibleExecutorUpdate(// Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized PipelineResult.State newState, @Nullable @UnknownKeyFor @Initialized Throwable exception) {
            this.newState = newState;
            this.thrown = Optional.ofNullable(exception);
        }

        /** Creates an update that carries {@code e} and no state. */
        public static @UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate fromException(@UnknownKeyFor @NonNull @Initialized Exception e) {
            return new VisibleExecutorUpdate(null, e);
        }

        /** Creates an update that carries {@code err} together with the FAILED state. */
        public static @UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate fromError(@UnknownKeyFor @NonNull @Initialized Error err) {
            return new VisibleExecutorUpdate(PipelineResult.State.FAILED, err);
        }

        /** Creates an update that announces the DONE state and carries no failure. */
        public static @UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate finished() {
            return new VisibleExecutorUpdate(PipelineResult.State.DONE, null);
        }

        /** Creates an update that announces the CANCELLED state and carries no failure. */
        public static @UnknownKeyFor @NonNull @Initialized VisibleExecutorUpdate cancelled() {
            return new VisibleExecutorUpdate(PipelineResult.State.CANCELLED, null);
        }

        /**
         * @return the announced state; may be {@code null} (see {@code fromException}) despite the
         *     decompiled {@code @NonNull} annotation
         */
        // Could not load outer class - annotation placement on inner may be incorrect
         @UnknownKeyFor @NonNull @Initialized PipelineResult.State getNewState() {
            return newState;
        }
    }
}

