/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.runners.direct.BundleFactory;
import org.apache.beam.runners.direct.Clock;
import org.apache.beam.runners.direct.CloningBundleFactory;
import org.apache.beam.runners.direct.DirectGBKIntoKeyedWorkItemsOverrideFactory;
import org.apache.beam.runners.direct.DirectGraph;
import org.apache.beam.runners.direct.DirectGraphVisitor;
import org.apache.beam.runners.direct.DirectGroupByKeyOverrideFactory;
import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.runners.direct.DirectTestOptions;
import org.apache.beam.runners.direct.DisplayDataValidator;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.ExecutorServiceParallelExecutor;
import org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory;
import org.apache.beam.runners.direct.ImmutabilityEnforcementFactory;
import org.apache.beam.runners.direct.ImmutableListBundleFactory;
import org.apache.beam.runners.direct.KeyedPValueTrackingVisitor;
import org.apache.beam.runners.direct.ModelEnforcementFactory;
import org.apache.beam.runners.direct.MultiStepCombine;
import org.apache.beam.runners.direct.NanosOffsetClock;
import org.apache.beam.runners.direct.ParDoMultiOverrideFactory;
import org.apache.beam.runners.direct.PipelineExecutor;
import org.apache.beam.runners.direct.RootProviderRegistry;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory;
import org.apache.beam.runners.direct.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.ViewOverrideFactory;
import org.apache.beam.runners.direct.WriteWithShardingFactory;
import org.apache.beam.runners.direct.repackaged.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Supplier;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableCollection;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableSet;
import org.apache.beam.runners.direct.repackaged.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.direct.repackaged.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class DirectRunner
extends PipelineRunner<DirectPipelineResult> {
    private final DirectOptions options;
    private final Set<Enforcement> enabledEnforcements;
    private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();

    public static DirectRunner fromOptions(PipelineOptions options) {
        return new DirectRunner((DirectOptions)options.as(DirectOptions.class));
    }

    private DirectRunner(DirectOptions options) {
        this.options = options;
        this.enabledEnforcements = Enforcement.enabled(options);
    }

    public DirectOptions getPipelineOptions() {
        return this.options;
    }

    Supplier<Clock> getClockSupplier() {
        return this.clockSupplier;
    }

    void setClockSupplier(Supplier<Clock> supplier) {
        this.clockSupplier = supplier;
    }

    public DirectPipelineResult run(Pipeline originalPipeline) {
        Pipeline pipeline;
        if (this.getPipelineOptions().isProtoTranslation()) {
            try {
                pipeline = PipelineTranslation.fromProto(PipelineTranslation.toProto(originalPipeline));
            }
            catch (IOException exception) {
                throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
            }
        } else {
            pipeline = originalPipeline;
        }
        pipeline.replaceAll(this.defaultTransformOverrides());
        MetricsEnvironment.setMetricsSupported((boolean)true);
        DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
        pipeline.traverseTopologically((Pipeline.PipelineVisitor)graphVisitor);
        KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create();
        pipeline.traverseTopologically((Pipeline.PipelineVisitor)keyedPValueVisitor);
        DisplayDataValidator.validatePipeline(pipeline);
        DisplayDataValidator.validateOptions(this.getPipelineOptions());
        DirectGraph graph = graphVisitor.getGraph();
        EvaluationContext context = EvaluationContext.create(this.getPipelineOptions(), this.clockSupplier.get(), Enforcement.bundleFactoryFor(this.enabledEnforcements, graph), graph, keyedPValueVisitor.getKeyedPValues());
        TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
        ExecutorServiceParallelExecutor executor = ExecutorServiceParallelExecutor.create(this.options.getTargetParallelism(), registry, Enforcement.defaultModelEnforcements(this.enabledEnforcements), context);
        executor.start(graph, RootProviderRegistry.defaultRegistry(context));
        DirectPipelineResult result = new DirectPipelineResult(executor, context);
        if (this.options.isBlockOnRun()) {
            try {
                result.waitUntilFinish();
            }
            catch (UserCodeException userException) {
                throw new Pipeline.PipelineExecutionException(userException.getCause());
            }
            catch (Throwable t) {
                if (t instanceof RuntimeException) {
                    throw (RuntimeException)t;
                }
                throw new RuntimeException(t);
            }
        }
        return result;
    }

    @VisibleForTesting
    List<PTransformOverride> defaultTransformOverrides() {
        DirectTestOptions testOptions = (DirectTestOptions)this.options.as(DirectTestOptions.class);
        ImmutableCollection.ArrayBasedBuilder builder = ImmutableList.builder();
        if (testOptions.isRunnerDeterminedSharding()) {
            ((ImmutableList.Builder)builder).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.writeWithRunnerDeterminedSharding(), new WriteWithShardingFactory()));
        }
        builder = ((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)((ImmutableList.Builder)builder).add(PTransformOverride.of((PTransformMatcher)MultiStepCombine.matcher(), (PTransformOverrideFactory)MultiStepCombine.Factory.create()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo("urn:beam:transform:create_view:v1"), new ViewOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo("urn:beam:transform:teststream:v1"), new TestStreamEvaluatorFactory.DirectTestStreamFactory(this)))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.splittableParDo(), new ParDoMultiOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.stateOrTimerParDo(), new ParDoMultiOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo("urn:beam:runners_core:transforms:splittable_process_keyed_elements:v1"), new SplittableParDoViaKeyedWorkItems.OverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo("urn:beam:runners_core:transforms:splittable_gbkikwi:v1"), new DirectGBKIntoKeyedWorkItemsOverrideFactory()))).add(PTransformOverride.of((PTransformMatcher)PTransformMatchers.urnEqualTo("urn:beam:transform:groupbykey:v1"), new DirectGroupByKeyOverrideFactory()));
        return ((ImmutableList.Builder)builder).build();
    }

    private static class NanosOffsetClockSupplier
    implements Supplier<Clock> {
        private NanosOffsetClockSupplier() {
        }

        @Override
        public Clock get() {
            return NanosOffsetClock.create();
        }
    }

    public static class DirectPipelineResult
    implements PipelineResult {
        private final PipelineExecutor executor;
        private final EvaluationContext evaluationContext;
        private PipelineResult.State state;

        private DirectPipelineResult(PipelineExecutor executor, EvaluationContext evaluationContext) {
            this.executor = executor;
            this.evaluationContext = evaluationContext;
            this.state = PipelineResult.State.RUNNING;
        }

        public PipelineResult.State getState() {
            return this.state;
        }

        public MetricResults metrics() {
            return this.evaluationContext.getMetrics();
        }

        public PipelineResult.State waitUntilFinish() {
            return this.waitUntilFinish(Duration.ZERO);
        }

        public PipelineResult.State cancel() {
            this.state = this.executor.getPipelineState();
            if (!this.state.isTerminal()) {
                this.executor.stop();
                this.state = this.executor.getPipelineState();
            }
            return this.executor.getPipelineState();
        }

        public PipelineResult.State waitUntilFinish(Duration duration) {
            PipelineResult.State startState = this.state;
            if (!startState.isTerminal()) {
                try {
                    this.state = this.executor.waitUntilFinish(duration);
                }
                catch (UserCodeException uce) {
                    throw new Pipeline.PipelineExecutionException(uce.getCause());
                }
                catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    if (e instanceof RuntimeException) {
                        throw (RuntimeException)e;
                    }
                    throw new RuntimeException(e);
                }
            }
            return this.state;
        }
    }

    static enum Enforcement {
        ENCODABILITY{

            @Override
            public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
                return true;
            }
        }
        ,
        IMMUTABILITY{

            @Override
            public boolean appliesTo(PCollection<?> collection, DirectGraph graph) {
                return CONTAINS_UDF.contains(PTransformTranslation.urnForTransform(graph.getProducer(collection).getTransform()));
            }
        };

        private static final Set<String> CONTAINS_UDF;

        public abstract boolean appliesTo(PCollection<?> var1, DirectGraph var2);

        static Set<Enforcement> enabled(DirectOptions options) {
            EnumSet<Enforcement> enabled = EnumSet.noneOf(Enforcement.class);
            if (options.isEnforceEncodability()) {
                enabled.add(ENCODABILITY);
            }
            if (options.isEnforceImmutability()) {
                enabled.add(IMMUTABILITY);
            }
            return Collections.unmodifiableSet(enabled);
        }

        static BundleFactory bundleFactoryFor(Set<Enforcement> enforcements, DirectGraph graph) {
            BundleFactory bundleFactory;
            BundleFactory bundleFactory2 = bundleFactory = enforcements.contains((Object)ENCODABILITY) ? CloningBundleFactory.create() : ImmutableListBundleFactory.create();
            if (enforcements.contains((Object)IMMUTABILITY)) {
                bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory, graph);
            }
            return bundleFactory;
        }

        private static Map<String, Collection<ModelEnforcementFactory>> defaultModelEnforcements(Set<Enforcement> enabledEnforcements) {
            ImmutableMap.Builder<String, ImmutableCollection> enforcements = ImmutableMap.builder();
            ImmutableList.Builder enabledParDoEnforcements = ImmutableList.builder();
            if (enabledEnforcements.contains((Object)IMMUTABILITY)) {
                enabledParDoEnforcements.add(ImmutabilityEnforcementFactory.create());
            }
            ImmutableCollection parDoEnforcements = enabledParDoEnforcements.build();
            enforcements.put("urn:beam:transform:pardo:v1", parDoEnforcements);
            return enforcements.build();
        }

        static {
            CONTAINS_UDF = ImmutableSet.of("urn:beam:transform:read:v1", "urn:beam:transform:pardo:v1");
        }
    }
}

