/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct.portable;

import java.io.Closeable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.graph.PipelineNode;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.MetricUpdates;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.direct.portable.CommittedBundle;
import org.apache.beam.runners.direct.portable.CompletionCallback;
import org.apache.beam.runners.direct.portable.EvaluationContext;
import org.apache.beam.runners.direct.portable.TransformEvaluator;
import org.apache.beam.runners.direct.portable.TransformEvaluatorRegistry;
import org.apache.beam.runners.direct.portable.TransformExecutor;
import org.apache.beam.runners.direct.portable.TransformExecutorFactory;
import org.apache.beam.runners.direct.portable.TransformExecutorService;
import org.apache.beam.runners.direct.portable.TransformResult;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TransformExecutor} that evaluates one {@link PipelineNode.PTransformNode} against one
 * input {@link CommittedBundle}.
 *
 * <p>When run, the executor obtains a {@link TransformEvaluator} from the
 * {@link TransformEvaluatorRegistry}, feeds it every element of the input bundle, finishes the
 * bundle, and reports the outcome through the supplied {@link CompletionCallback}. Metrics are
 * collected in a {@link MetricsContainerImpl} named after the transform id and pushed to the
 * {@link EvaluationContext}'s metrics.
 *
 * @param <T> the element type of the input bundle
 */
class DirectTransformExecutor<T> implements TransformExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(DirectTransformExecutor.class);
    private final TransformEvaluatorRegistry evaluatorRegistry;
    private final PipelineNode.PTransformNode transform;
    private final CommittedBundle<T> inputBundle;
    private final CompletionCallback onComplete;
    private final TransformExecutorService transformEvaluationState;
    private final EvaluationContext context;

    /**
     * Creates an executor for a single (transform, bundle) pair.
     *
     * @param context evaluation context whose metrics receive physical updates
     * @param factory registry used to look up the evaluator for {@code transform}
     * @param inputBundle bundle whose elements are processed; {@link #processElements} tolerates
     *     {@code null} and then processes no elements
     * @param transform the transform to execute
     * @param completionCallback notified of the result, an empty evaluation, or a failure
     * @param transformEvaluationState service notified via {@code complete(this)} when
     *     {@link #run()} ends
     */
    @VisibleForTesting
    DirectTransformExecutor(EvaluationContext context, TransformEvaluatorRegistry factory, CommittedBundle<T> inputBundle, PipelineNode.PTransformNode transform, CompletionCallback completionCallback, TransformExecutorService transformEvaluationState) {
        this.evaluatorRegistry = factory;
        this.inputBundle = inputBundle;
        this.transform = transform;
        this.onComplete = completionCallback;
        this.transformEvaluationState = transformEvaluationState;
        this.context = context;
    }

    /**
     * Evaluates the transform over the input bundle.
     *
     * <p>If the registry yields no evaluator, the callback's {@code handleEmpty} is invoked and
     * nothing else is processed. Any {@link Exception} is reported via {@code handleException}
     * and rethrown (checked exceptions are wrapped in a {@link RuntimeException} with the original
     * as cause). Any {@link Error} is logged, reported via {@code handleError}, and rethrown.
     *
     * <p>Regardless of outcome, the cumulative metrics are committed as physical metrics and the
     * executor service is told this executor is complete.
     */
    @Override
    public void run() {
        MetricsContainerImpl metricsContainer = new MetricsContainerImpl(this.transform.getId());
        try (Closeable metricsScope = MetricsEnvironment.scopedMetricsContainer((MetricsContainer) metricsContainer)) {
            TransformEvaluator<T> evaluator = this.evaluatorRegistry.forApplication(this.transform, this.inputBundle);
            if (evaluator == null) {
                // Nothing to evaluate for this transform/bundle pair.
                this.onComplete.handleEmpty(this.transform);
                return;
            }
            this.processElements(evaluator, metricsContainer);
            this.finishBundle(evaluator, metricsContainer);
        } catch (Exception e) {
            this.onComplete.handleException(this.inputBundle, e);
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new RuntimeException(e);
        } catch (Error err) {
            LOG.error("Error occurred within {}", this, err);
            this.onComplete.handleError(err);
            throw err;
        } finally {
            try {
                // Report the physical metrics accumulated up to the end of this step.
                this.context.getMetrics().commitPhysical(this.inputBundle, metricsContainer.getCumulative());
            } finally {
                // Must run even if the metrics commit fails; otherwise the executor service
                // is never informed that this executor has finished.
                this.transformEvaluationState.complete(this);
            }
        }
    }

    /**
     * Passes every element of the input bundle to {@code evaluator}, publishing metric deltas
     * after each element.
     *
     * <p>Does nothing when the input bundle is {@code null}.
     *
     * @param evaluator evaluator that consumes the elements
     * @param metricsContainer container the per-element metric updates are read from
     * @throws Exception if the evaluator fails on an element
     */
    private void processElements(TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer) throws Exception {
        if (this.inputBundle == null) {
            return;
        }
        for (WindowedValue<T> value : this.inputBundle.getElements()) {
            evaluator.processElement(value);
            MetricUpdates deltas = metricsContainer.getUpdates();
            if (deltas != null) {
                // Publish the deltas first, then mark them as committed in the container.
                this.context.getMetrics().updatePhysical(this.inputBundle, deltas);
                metricsContainer.commitUpdates();
            }
        }
    }

    /**
     * Finishes the bundle, attaches the cumulative metrics as logical metric updates, and hands
     * the result to the completion callback.
     *
     * @param evaluator evaluator whose bundle is finished
     * @param metricsContainer container supplying the cumulative metric updates
     * @return the result that was passed to the completion callback
     * @throws Exception if finishing the bundle fails
     */
    private TransformResult<T> finishBundle(TransformEvaluator<T> evaluator, MetricsContainerImpl metricsContainer) throws Exception {
        TransformResult<T> result = evaluator.finishBundle().withLogicalMetricUpdates(metricsContainer.getCumulative());
        this.onComplete.handleResult(this.inputBundle, result);
        return result;
    }

    /**
     * A {@link TransformExecutorFactory} that produces {@link DirectTransformExecutor} instances
     * sharing one {@link EvaluationContext} and one {@link TransformEvaluatorRegistry}.
     */
    static class Factory implements TransformExecutorFactory {
        private final EvaluationContext context;
        private final TransformEvaluatorRegistry registry;

        /**
         * @param context evaluation context handed to every created executor
         * @param registry evaluator registry handed to every created executor
         */
        Factory(EvaluationContext context, TransformEvaluatorRegistry registry) {
            this.context = context;
            this.registry = registry;
        }

        /**
         * Creates a new executor for the given bundle and transform.
         *
         * @param bundle input bundle for the executor
         * @param transform transform the executor will evaluate
         * @param onComplete callback notified of the executor's outcome
         * @param executorService service notified when the executor completes
         * @return a new {@link DirectTransformExecutor}
         */
        @Override
        public TransformExecutor create(CommittedBundle<?> bundle, PipelineNode.PTransformNode transform, CompletionCallback onComplete, TransformExecutorService executorService) {
            return new DirectTransformExecutor<>(this.context, this.registry, bundle, transform, onComplete, executorService);
        }
    }
}

