/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import com.google.auto.value.AutoValue;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Phaser;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.BeamFnDataReadRunner;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.AutoValue_ProcessBundleHandler_BundleProcessor;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.control.FinalizeBundleHandler;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.data.BeamFnTimerGrpcClient;
import org.apache.beam.fn.harness.data.PCollectionConsumerRegistry;
import org.apache.beam.fn.harness.data.PTransformFunctionRegistry;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.ExecutionStateSampler;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerStepMap;
import org.apache.beam.runners.core.metrics.ShortIdMap;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.DataEndpoint;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.TimerEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.TextFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SetMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessBundleHandler {
    private static final String DATA_INPUT_URN = "beam:runner:source:v1";
    private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
    public static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
    private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
    @VisibleForTesting
    static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
    private final PipelineOptions options;
    private final Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry;
    private final BeamFnDataClient beamFnDataClient;
    private final BeamFnStateGrpcClientCache beamFnStateGrpcClientCache;
    private final FinalizeBundleHandler finalizeBundleHandler;
    private final ShortIdMap shortIds;
    private final boolean runnerAcceptsShortIds;
    private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
    private final PTransformRunnerFactory defaultPTransformRunnerFactory;
    private final Cache<Object, Object> processWideCache;
    @VisibleForTesting
    final BundleProcessorCache bundleProcessorCache;

    public ProcessBundleHandler(PipelineOptions options, Set<String> runnerCapabilities, Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, FinalizeBundleHandler finalizeBundleHandler, ShortIdMap shortIds, Cache<Object, Object> processWideCache) {
        this(options, runnerCapabilities, fnApiRegistry, beamFnDataClient, beamFnStateGrpcClientCache, finalizeBundleHandler, shortIds, REGISTERED_RUNNER_FACTORIES, processWideCache, new BundleProcessorCache());
    }

    @VisibleForTesting
    ProcessBundleHandler(PipelineOptions options, Set<String> runnerCapabilities, Function<String, BeamFnApi.ProcessBundleDescriptor> fnApiRegistry, BeamFnDataClient beamFnDataClient, BeamFnStateGrpcClientCache beamFnStateGrpcClientCache, FinalizeBundleHandler finalizeBundleHandler, ShortIdMap shortIds, Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap, Cache<Object, Object> processWideCache, BundleProcessorCache bundleProcessorCache) {
        this.options = options;
        this.fnApiRegistry = fnApiRegistry;
        this.beamFnDataClient = beamFnDataClient;
        this.beamFnStateGrpcClientCache = beamFnStateGrpcClientCache;
        this.finalizeBundleHandler = finalizeBundleHandler;
        this.shortIds = shortIds;
        this.runnerAcceptsShortIds = runnerCapabilities.contains(BeamUrns.getUrn(RunnerApi.StandardRunnerProtocols.Enum.MONITORING_INFO_SHORT_IDS));
        this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
        this.defaultPTransformRunnerFactory = new UnknownPTransformRunnerFactory(urnToPTransformRunnerFactoryMap.keySet());
        this.processWideCache = processWideCache;
        this.bundleProcessorCache = bundleProcessorCache;
    }

    private void createRunnerAndConsumersForPTransformRecursively(final BeamFnStateClient beamFnStateClient, final BeamFnTimerClient beamFnTimerClient, final BeamFnDataClient queueingClient, final String pTransformId, final RunnerApi.PTransform pTransform, final Supplier<String> processBundleInstructionId, final Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens, final Supplier<Cache<?, ?>> bundleCache, final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, SetMultimap<String, String> pCollectionIdsToConsumingPTransforms, final PCollectionConsumerRegistry pCollectionConsumerRegistry, Set<String> processedPTransformIds, final PTransformFunctionRegistry startFunctionRegistry, final PTransformFunctionRegistry finishFunctionRegistry, final Consumer<ThrowingRunnable> addResetFunction, final Consumer<ThrowingRunnable> addTearDownFunction, final BiConsumer<Endpoints.ApiServiceDescriptor, DataEndpoint<?>> addDataEndpoint, final Consumer<TimerEndpoint<?>> addTimerEndpoint, final Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback, final BundleSplitListener splitListener, final DoFn.BundleFinalizer bundleFinalizer, Collection<BeamFnDataReadRunner> channelRoots) throws IOException {
        for (String pCollectionId : pTransform.getOutputsMap().values()) {
            for (String consumingPTransformId : pCollectionIdsToConsumingPTransforms.get((Object)pCollectionId)) {
                this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, beamFnTimerClient, queueingClient, consumingPTransformId, processBundleDescriptor.getTransformsMap().get(consumingPTransformId), processBundleInstructionId, cacheTokens, bundleCache, processBundleDescriptor, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, addResetFunction, addTearDownFunction, addDataEndpoint, addTimerEndpoint, addProgressRequestCallback, splitListener, bundleFinalizer, channelRoots);
            }
        }
        if (!pTransform.hasSpec()) {
            throw new IllegalArgumentException(String.format("Cannot process transform with no spec: %s", TextFormat.printToString(pTransform)));
        }
        if (pTransform.getSubtransformsCount() > 0) {
            throw new IllegalArgumentException(String.format("Cannot process composite transform: %s", TextFormat.printToString(pTransform)));
        }
        if (!processedPTransformIds.contains(pTransformId)) {
            Object runner = this.urnToPTransformRunnerFactoryMap.getOrDefault(pTransform.getSpec().getUrn(), this.defaultPTransformRunnerFactory).createRunnerForPTransform(new PTransformRunnerFactory.Context(){

                @Override
                public PipelineOptions getPipelineOptions() {
                    return ProcessBundleHandler.this.options;
                }

                @Override
                public BeamFnDataClient getBeamFnDataClient() {
                    return queueingClient;
                }

                @Override
                public BeamFnStateClient getBeamFnStateClient() {
                    return beamFnStateClient;
                }

                @Override
                public BeamFnTimerClient getBeamFnTimerClient() {
                    return beamFnTimerClient;
                }

                @Override
                public String getPTransformId() {
                    return pTransformId;
                }

                @Override
                public RunnerApi.PTransform getPTransform() {
                    return pTransform;
                }

                @Override
                public Supplier<String> getProcessBundleInstructionIdSupplier() {
                    return processBundleInstructionId;
                }

                @Override
                public Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> getCacheTokensSupplier() {
                    return cacheTokens;
                }

                @Override
                public Supplier<Cache<?, ?>> getBundleCacheSupplier() {
                    return bundleCache;
                }

                @Override
                public Cache<?, ?> getProcessWideCache() {
                    return ProcessBundleHandler.this.processWideCache;
                }

                @Override
                public Map<String, RunnerApi.PCollection> getPCollections() {
                    return processBundleDescriptor.getPcollectionsMap();
                }

                @Override
                public Map<String, RunnerApi.Coder> getCoders() {
                    return processBundleDescriptor.getCodersMap();
                }

                @Override
                public Map<String, RunnerApi.WindowingStrategy> getWindowingStrategies() {
                    return processBundleDescriptor.getWindowingStrategiesMap();
                }

                @Override
                public <T> void addPCollectionConsumer(String pCollectionId, FnDataReceiver<WindowedValue<T>> consumer, Coder<T> valueCoder) {
                    pCollectionConsumerRegistry.register(pCollectionId, pTransformId, consumer, valueCoder);
                }

                public FnDataReceiver<?> getPCollectionConsumer(String pCollectionId) {
                    return pCollectionConsumerRegistry.getMultiplexingConsumer(pCollectionId);
                }

                @Override
                public void addStartBundleFunction(ThrowingRunnable startFunction) {
                    startFunctionRegistry.register(pTransformId, startFunction);
                }

                @Override
                public void addFinishBundleFunction(ThrowingRunnable finishFunction) {
                    finishFunctionRegistry.register(pTransformId, finishFunction);
                }

                @Override
                public <T> void addIncomingDataEndpoint(Endpoints.ApiServiceDescriptor apiServiceDescriptor, Coder<T> coder, FnDataReceiver<T> receiver) {
                    addDataEndpoint.accept(apiServiceDescriptor, DataEndpoint.create(pTransformId, coder, receiver));
                }

                @Override
                public <T> void addIncomingTimerEndpoint(String timerFamilyId, Coder<Timer<T>> coder, FnDataReceiver<Timer<T>> receiver) {
                    addTimerEndpoint.accept(TimerEndpoint.create(pTransformId, timerFamilyId, coder, receiver));
                }

                @Override
                public void addResetFunction(ThrowingRunnable resetFunction) {
                    addResetFunction.accept(resetFunction);
                }

                @Override
                public void addTearDownFunction(ThrowingRunnable tearDownFunction) {
                    addTearDownFunction.accept(tearDownFunction);
                }

                @Override
                public void addProgressRequestCallback(PTransformRunnerFactory.ProgressRequestCallback progressRequestCallback) {
                    addProgressRequestCallback.accept(progressRequestCallback);
                }

                @Override
                public BundleSplitListener getSplitListener() {
                    return splitListener;
                }

                @Override
                public DoFn.BundleFinalizer getBundleFinalizer() {
                    return bundleFinalizer;
                }
            });
            if (runner instanceof BeamFnDataReadRunner) {
                channelRoots.add((BeamFnDataReadRunner)runner);
            }
            processedPTransformIds.add(pTransformId);
        }
    }

    public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request) throws Exception {
        BeamFnApi.ProcessBundleResponse.Builder response = BeamFnApi.ProcessBundleResponse.newBuilder();
        BundleProcessor bundleProcessor = this.bundleProcessorCache.get(request, () -> {
            try {
                return this.createBundleProcessor(request.getProcessBundle().getProcessBundleDescriptorId(), request.getProcessBundle());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        try {
            PTransformFunctionRegistry startFunctionRegistry = bundleProcessor.getStartFunctionRegistry();
            PTransformFunctionRegistry finishFunctionRegistry = bundleProcessor.getFinishFunctionRegistry();
            ExecutionStateTracker stateTracker = bundleProcessor.getStateTracker();
            try (HandleStateCallsForBundle beamFnStateClient = bundleProcessor.getBeamFnStateClient();){
                Closeable closeTracker = stateTracker.activate();
                Object object = null;
                try {
                    for (ThrowingRunnable startFunction : startFunctionRegistry.getFunctions()) {
                        LOG.debug("Starting function {}", (Object)startFunction);
                        startFunction.run();
                    }
                    if (request.getProcessBundle().hasElements()) {
                        boolean bl = bundleProcessor.getInboundObserver().multiplexElements(request.getProcessBundle().getElements());
                        if (!bl) {
                            throw new RuntimeException("Elements embedded in ProcessBundleRequest do not contain stream terminators for all data and timer inputs. Unterminated endpoints: " + bundleProcessor.getInboundObserver().getUnfinishedEndpoints());
                        }
                    } else if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().isEmpty()) {
                        BeamFnDataInboundObserver2 beamFnDataInboundObserver2 = bundleProcessor.getInboundObserver();
                        this.beamFnDataClient.registerReceiver(request.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors(), beamFnDataInboundObserver2);
                        beamFnDataInboundObserver2.awaitCompletion();
                        this.beamFnDataClient.unregisterReceiver(request.getInstructionId(), bundleProcessor.getInboundEndpointApiServiceDescriptors());
                    }
                    for (ThrowingRunnable finishFunction : Lists.reverse(finishFunctionRegistry.getFunctions())) {
                        LOG.debug("Finishing function {}", (Object)finishFunction);
                        finishFunction.run();
                    }
                }
                catch (Throwable throwable) {
                    object = throwable;
                    throw throwable;
                }
                finally {
                    if (closeTracker != null) {
                        ProcessBundleHandler.$closeResource((Throwable)object, closeTracker);
                    }
                }
                response.addAllResidualRoots(bundleProcessor.getSplitListener().getResidualRoots());
                ImmutableMap<String, ByteString> monitoringData = this.monitoringData(bundleProcessor);
                if (this.runnerAcceptsShortIds) {
                    response.putAllMonitoringData(monitoringData);
                } else {
                    for (Map.Entry entry : monitoringData.entrySet()) {
                        response.addMonitoringInfos(this.shortIds.get((String)entry.getKey()).toBuilder().setPayload((ByteString)entry.getValue()));
                    }
                }
                if (!bundleProcessor.getBundleFinalizationCallbackRegistrations().isEmpty()) {
                    this.finalizeBundleHandler.registerCallbacks(bundleProcessor.getInstructionId(), ImmutableList.copyOf(bundleProcessor.getBundleFinalizationCallbackRegistrations()));
                    response.setRequiresFinalization(true);
                }
            }
            this.bundleProcessorCache.release(request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
        }
        catch (Exception e) {
            this.bundleProcessorCache.discard(bundleProcessor);
            throw e;
        }
    }

    public BeamFnApi.InstructionResponse.Builder progress(BeamFnApi.InstructionRequest request) throws Exception {
        BundleProcessor bundleProcessor = this.bundleProcessorCache.find(request.getProcessBundleProgress().getInstructionId());
        BeamFnApi.ProcessBundleProgressResponse.Builder response = BeamFnApi.ProcessBundleProgressResponse.newBuilder();
        if (bundleProcessor == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(BeamFnApi.ProcessBundleProgressResponse.getDefaultInstance());
        }
        ImmutableMap<String, ByteString> monitoringData = this.monitoringData(bundleProcessor);
        if (this.runnerAcceptsShortIds) {
            response.putAllMonitoringData(monitoringData);
        } else {
            for (Map.Entry metric : monitoringData.entrySet()) {
                response.addMonitoringInfos(this.shortIds.get((String)metric.getKey()).toBuilder().setPayload((ByteString)metric.getValue()));
            }
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleProgress(response);
    }

    private ImmutableMap<String, ByteString> monitoringData(BundleProcessor bundleProcessor) throws Exception {
        ImmutableMap.Builder<String, ByteString> result = ImmutableMap.builder();
        result.putAll(bundleProcessor.getStartFunctionRegistry().getExecutionTimeMonitoringData(this.shortIds));
        result.putAll(bundleProcessor.getpCollectionConsumerRegistry().getExecutionTimeMonitoringData(this.shortIds));
        result.putAll(bundleProcessor.getFinishFunctionRegistry().getExecutionTimeMonitoringData(this.shortIds));
        result.putAll(bundleProcessor.getMetricsContainerRegistry().getMonitoringData(this.shortIds));
        for (PTransformRunnerFactory.ProgressRequestCallback progressRequestCallback : bundleProcessor.getProgressRequestCallbacks()) {
            for (MetricsApi.MonitoringInfo monitoringInfo : progressRequestCallback.getMonitoringInfos()) {
                ByteString payload = monitoringInfo.getPayload();
                String shortId = this.shortIds.getOrCreateShortId(monitoringInfo.toBuilder().clearPayload().build());
                result.put(shortId, payload);
            }
        }
        return result.build();
    }

    public BeamFnApi.InstructionResponse.Builder trySplit(BeamFnApi.InstructionRequest request) {
        BundleProcessor bundleProcessor = this.bundleProcessorCache.find(request.getProcessBundleSplit().getInstructionId());
        BeamFnApi.ProcessBundleSplitResponse.Builder response = BeamFnApi.ProcessBundleSplitResponse.newBuilder();
        if (bundleProcessor == null) {
            return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(BeamFnApi.ProcessBundleSplitResponse.getDefaultInstance());
        }
        for (BeamFnDataReadRunner channelRoot : bundleProcessor.getChannelRoots()) {
            channelRoot.trySplit(request.getProcessBundleSplit(), response);
        }
        return BeamFnApi.InstructionResponse.newBuilder().setProcessBundleSplit(response);
    }

    public void shutdown() throws Exception {
        this.bundleProcessorCache.shutdown();
    }

    private BundleProcessor createBundleProcessor(String bundleId, BeamFnApi.ProcessBundleRequest processBundleRequest) throws IOException {
        HandleStateCallsForBundle beamFnStateClient;
        BeamFnApi.ProcessBundleDescriptor bundleDescriptor = this.fnApiRegistry.apply(bundleId);
        HashMultimap<String, String> pCollectionIdsToConsumingPTransforms = HashMultimap.create();
        MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
        ExecutionStateTracker stateTracker = new ExecutionStateTracker(ExecutionStateSampler.instance());
        PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry(metricsContainerRegistry, stateTracker);
        HashSet<String> processedPTransformIds = new HashSet<String>();
        PTransformFunctionRegistry startFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, "start");
        PTransformFunctionRegistry finishFunctionRegistry = new PTransformFunctionRegistry(metricsContainerRegistry, stateTracker, "finish");
        ArrayList<ThrowingRunnable> resetFunctions = new ArrayList<ThrowingRunnable>();
        ArrayList<ThrowingRunnable> tearDownFunctions = new ArrayList<ThrowingRunnable>();
        ArrayList<PTransformRunnerFactory.ProgressRequestCallback> progressRequestCallbacks = new ArrayList<PTransformRunnerFactory.ProgressRequestCallback>();
        for (Map.Entry<String, RunnerApi.PTransform> entry : bundleDescriptor.getTransformsMap().entrySet()) {
            for (String pCollectionId : entry.getValue().getInputsMap().values()) {
                pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey());
            }
        }
        if (bundleDescriptor.hasStateApiServiceDescriptor()) {
            BeamFnStateClient underlyingClient = this.beamFnStateGrpcClientCache.forApiServiceDescriptor(bundleDescriptor.getStateApiServiceDescriptor());
            beamFnStateClient = new BlockTillStateCallsFinish(underlyingClient);
        } else {
            beamFnStateClient = new FailAllStateCallsForBundle(processBundleRequest);
        }
        BeamFnTimerClient beamFnTimerClient = bundleDescriptor.hasTimerApiServiceDescriptor() ? new BeamFnTimerGrpcClient(this.beamFnDataClient, bundleDescriptor.getTimerApiServiceDescriptor()) : new FailAllTimerRegistrations(processBundleRequest);
        BundleSplitListener.InMemory splitListener = BundleSplitListener.InMemory.create();
        final ArrayList<FinalizeBundleHandler.CallbackRegistration> bundleFinalizationCallbackRegistrations = new ArrayList<FinalizeBundleHandler.CallbackRegistration>();
        DoFn.BundleFinalizer bundleFinalizer = new DoFn.BundleFinalizer(){

            @Override
            public void afterBundleCommit(Instant callbackExpiry, DoFn.BundleFinalizer.Callback callback) {
                bundleFinalizationCallbackRegistrations.add(FinalizeBundleHandler.CallbackRegistration.create(callbackExpiry, callback));
            }
        };
        BundleProcessor bundleProcessor = BundleProcessor.create(this.processWideCache, bundleDescriptor, startFunctionRegistry, finishFunctionRegistry, resetFunctions, tearDownFunctions, progressRequestCallbacks, splitListener, pCollectionConsumerRegistry, metricsContainerRegistry, stateTracker, beamFnStateClient, bundleFinalizationCallbackRegistrations);
        for (Map.Entry<String, RunnerApi.PTransform> entry : bundleDescriptor.getTransformsMap().entrySet()) {
            if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn()) && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn()) && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn()) && !"beam:transform:read:v1".equals(entry.getValue().getSpec().getUrn())) continue;
            this.createRunnerAndConsumersForPTransformRecursively(beamFnStateClient, beamFnTimerClient, this.beamFnDataClient, entry.getKey(), entry.getValue(), bundleProcessor::getInstructionId, bundleProcessor::getCacheTokens, bundleProcessor::getBundleCache, bundleDescriptor, pCollectionIdsToConsumingPTransforms, pCollectionConsumerRegistry, processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, resetFunctions::add, tearDownFunctions::add, (apiServiceDescriptor, dataEndpoint) -> {
                if (!bundleProcessor.getInboundEndpointApiServiceDescriptors().contains(apiServiceDescriptor)) {
                    bundleProcessor.getInboundEndpointApiServiceDescriptors().add((Endpoints.ApiServiceDescriptor)apiServiceDescriptor);
                }
                bundleProcessor.getInboundDataEndpoints().add((DataEndpoint<?>)dataEndpoint);
            }, timerEndpoint -> {
                if (!bundleDescriptor.hasTimerApiServiceDescriptor()) {
                    throw FailAllTimerRegistrations.fail(processBundleRequest);
                }
                bundleProcessor.getTimerEndpoints().add((TimerEndpoint<?>)timerEndpoint);
            }, progressRequestCallbacks::add, splitListener, bundleFinalizer, bundleProcessor.getChannelRoots());
        }
        bundleProcessor.finish();
        return bundleProcessor;
    }

    public BundleProcessorCache getBundleProcessorCache() {
        return this.bundleProcessorCache;
    }

    static {
        TreeSet<Object> pipelineRunnerRegistrars = Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
        pipelineRunnerRegistrars.addAll(Lists.newArrayList(ServiceLoader.load(PTransformRunnerFactory.Registrar.class, ReflectHelpers.findClassLoader())));
        ImmutableMap.Builder<String, PTransformRunnerFactory> builder = ImmutableMap.builder();
        for (PTransformRunnerFactory.Registrar registrar : pipelineRunnerRegistrars) {
            builder.putAll(registrar.getPTransformRunnerFactories());
        }
        REGISTERED_RUNNER_FACTORIES = builder.build();
    }

    private static class UnknownPTransformRunnerFactory
    implements PTransformRunnerFactory<Object> {
        private final Set<String> knownUrns;

        private UnknownPTransformRunnerFactory(Set<String> knownUrns) {
            this.knownUrns = knownUrns;
        }

        @Override
        public Object createRunnerForPTransform(PTransformRunnerFactory.Context context) {
            String message = String.format("No factory registered for %s, known factories %s", context.getPTransform().getSpec().getUrn(), this.knownUrns);
            LOG.error(message);
            throw new IllegalStateException(message);
        }
    }

    static abstract class HandleStateCallsForBundle
    implements AutoCloseable,
    BeamFnStateClient {
        HandleStateCallsForBundle() {
        }
    }

    private static class FailAllTimerRegistrations
    implements BeamFnTimerClient {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllTimerRegistrations(BeamFnApi.ProcessBundleRequest request) {
            this.request = request;
        }

        public <T> CloseableFnDataReceiver<Timer<T>> register(LogicalEndpoint timerEndpoint, Coder<Timer<T>> coder) {
            throw FailAllTimerRegistrations.fail(this.request);
        }

        private static IllegalStateException fail(BeamFnApi.ProcessBundleRequest request) {
            throw new IllegalStateException(String.format("Timers are unsupported because the ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", request));
        }
    }

    private static class FailAllStateCallsForBundle
    extends HandleStateCallsForBundle {
        private final BeamFnApi.ProcessBundleRequest request;

        private FailAllStateCallsForBundle(BeamFnApi.ProcessBundleRequest request) {
            this.request = request;
        }

        @Override
        public void close() throws Exception {
        }

        @Override
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder requestBuilder) {
            throw new IllegalStateException(String.format("State API calls are unsupported because the ProcessBundleRequest %s does not support state.", this.request));
        }
    }

    private static class BlockTillStateCallsFinish
    extends HandleStateCallsForBundle {
        private final BeamFnStateClient beamFnStateClient;
        private final Phaser phaser;
        private int currentPhase;

        private BlockTillStateCallsFinish(BeamFnStateClient beamFnStateClient) {
            this.beamFnStateClient = beamFnStateClient;
            this.phaser = new Phaser(1);
            this.currentPhase = this.phaser.getPhase();
        }

        @Override
        public void close() throws Exception {
            int unarrivedParties = this.phaser.getUnarrivedParties();
            if (unarrivedParties > 0) {
                LOG.debug("Waiting for {} parties to arrive before closing, current phase {}.", (Object)unarrivedParties, (Object)this.currentPhase);
            }
            this.currentPhase = this.phaser.arriveAndAwaitAdvance();
        }

        @Override
        public CompletableFuture<BeamFnApi.StateResponse> handle(BeamFnApi.StateRequest.Builder requestBuilder) {
            CompletableFuture<BeamFnApi.StateResponse> response = this.beamFnStateClient.handle(requestBuilder);
            this.phaser.register();
            response.whenComplete((stateResponse, throwable) -> this.phaser.arriveAndDeregister());
            return response;
        }
    }

    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class BundleProcessor {
        private String instructionId;
        private List<BeamFnApi.ProcessBundleRequest.CacheToken> cacheTokens;
        private Caches.ClearableCache<Object, Object> bundleCache;
        private BeamFnDataInboundObserver2 inboundObserver2;

        public static BundleProcessor create(Cache<Object, Object> processWideCache, BeamFnApi.ProcessBundleDescriptor processBundleDescriptor, PTransformFunctionRegistry startFunctionRegistry, PTransformFunctionRegistry finishFunctionRegistry, List<ThrowingRunnable> resetFunctions, List<ThrowingRunnable> tearDownFunctions, List<PTransformRunnerFactory.ProgressRequestCallback> progressRequestCallbacks, BundleSplitListener.InMemory splitListener, PCollectionConsumerRegistry pCollectionConsumerRegistry, MetricsContainerStepMap metricsContainerRegistry, ExecutionStateTracker stateTracker, HandleStateCallsForBundle beamFnStateClient, Collection<FinalizeBundleHandler.CallbackRegistration> bundleFinalizationCallbackRegistrations) {
            return new AutoValue_ProcessBundleHandler_BundleProcessor(processWideCache, processBundleDescriptor, startFunctionRegistry, finishFunctionRegistry, resetFunctions, tearDownFunctions, progressRequestCallbacks, splitListener, pCollectionConsumerRegistry, metricsContainerRegistry, stateTracker, beamFnStateClient, new ArrayList<Endpoints.ApiServiceDescriptor>(), new ArrayList(), new ArrayList(), bundleFinalizationCallbackRegistrations, new ArrayList<BeamFnDataReadRunner>());
        }

        abstract Cache<?, ?> getProcessWideCache();

        abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        abstract PTransformFunctionRegistry getStartFunctionRegistry();

        abstract PTransformFunctionRegistry getFinishFunctionRegistry();

        abstract List<ThrowingRunnable> getResetFunctions();

        abstract List<ThrowingRunnable> getTearDownFunctions();

        abstract List<PTransformRunnerFactory.ProgressRequestCallback> getProgressRequestCallbacks();

        abstract BundleSplitListener.InMemory getSplitListener();

        abstract PCollectionConsumerRegistry getpCollectionConsumerRegistry();

        abstract MetricsContainerStepMap getMetricsContainerRegistry();

        public abstract ExecutionStateTracker getStateTracker();

        abstract HandleStateCallsForBundle getBeamFnStateClient();

        abstract List<Endpoints.ApiServiceDescriptor> getInboundEndpointApiServiceDescriptors();

        abstract List<DataEndpoint<?>> getInboundDataEndpoints();

        abstract List<TimerEndpoint<?>> getTimerEndpoints();

        abstract Collection<FinalizeBundleHandler.CallbackRegistration> getBundleFinalizationCallbackRegistrations();

        abstract Collection<BeamFnDataReadRunner> getChannelRoots();

        synchronized String getInstructionId() {
            return this.instructionId;
        }

        synchronized List<BeamFnApi.ProcessBundleRequest.CacheToken> getCacheTokens() {
            return this.cacheTokens;
        }

        synchronized Cache<Object, Object> getBundleCache() {
            if (this.bundleCache == null) {
                this.bundleCache = new Caches.ClearableCache(Caches.subCache(this.getProcessWideCache(), "Bundle", this.instructionId));
            }
            return this.bundleCache;
        }

        BeamFnDataInboundObserver2 getInboundObserver() {
            return this.inboundObserver2;
        }

        void finish() {
            this.inboundObserver2 = BeamFnDataInboundObserver2.forConsumers(this.getInboundDataEndpoints(), this.getTimerEndpoints());
        }

        synchronized void setupForProcessBundleRequest(BeamFnApi.InstructionRequest request) {
            this.instructionId = request.getInstructionId();
            this.cacheTokens = request.getProcessBundle().getCacheTokensList();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void reset() throws Exception {
            BundleProcessor bundleProcessor = this;
            synchronized (bundleProcessor) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                    this.bundleCache = null;
                }
            }
            this.getStartFunctionRegistry().reset();
            this.getFinishFunctionRegistry().reset();
            this.getSplitListener().clear();
            this.getpCollectionConsumerRegistry().reset();
            this.getMetricsContainerRegistry().reset();
            this.getStateTracker().reset();
            this.getBundleFinalizationCallbackRegistrations().clear();
            for (ThrowingRunnable resetFunction : this.getResetFunctions()) {
                resetFunction.run();
            }
            this.getInboundObserver().reset();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void discard() {
            BundleProcessor bundleProcessor = this;
            synchronized (bundleProcessor) {
                this.instructionId = null;
                this.cacheTokens = null;
                if (this.bundleCache != null) {
                    this.bundleCache.clear();
                }
            }
        }

        void shutdown() {
            for (ThrowingRunnable tearDownFunction : this.getTearDownFunctions()) {
                LOG.debug("Tearing down function {}", (Object)tearDownFunction);
                try {
                    tearDownFunction.run();
                }
                catch (Exception e) {
                    LOG.error("Exceptions are thrown from DoFn.teardown method. Note that it will not fail the pipeline execution,", e);
                }
            }
        }
    }

    public static class BundleProcessorCache {
        private final LoadingCache<String, ConcurrentLinkedQueue<BundleProcessor>> cachedBundleProcessors = CacheBuilder.newBuilder().expireAfterAccess(Duration.ofMinutes(1L)).removalListener(removalNotification -> ((ConcurrentLinkedQueue)removalNotification.getValue()).forEach(bundleProcessor -> bundleProcessor.shutdown())).build(new CacheLoader<String, ConcurrentLinkedQueue<BundleProcessor>>(){

            @Override
            public ConcurrentLinkedQueue<BundleProcessor> load(String s2) throws Exception {
                return new ConcurrentLinkedQueue<BundleProcessor>();
            }
        });
        private final Map<String, BundleProcessor> activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap());

        public int hashCode() {
            return super.hashCode();
        }

        BundleProcessorCache() {
        }

        @VisibleForTesting
        Map<String, ConcurrentLinkedQueue<BundleProcessor>> getCachedBundleProcessors() {
            return ImmutableMap.copyOf(this.cachedBundleProcessors.asMap());
        }

        public Map<String, BundleProcessor> getActiveBundleProcessors() {
            return ImmutableMap.copyOf(this.activeBundleProcessors);
        }

        BundleProcessor get(BeamFnApi.InstructionRequest processBundleRequest, Supplier<BundleProcessor> bundleProcessorSupplier) {
            ConcurrentLinkedQueue<BundleProcessor> bundleProcessors = this.cachedBundleProcessors.getUnchecked(processBundleRequest.getProcessBundle().getProcessBundleDescriptorId());
            BundleProcessor bundleProcessor = bundleProcessors.poll();
            if (bundleProcessor == null) {
                bundleProcessor = bundleProcessorSupplier.get();
            }
            bundleProcessor.setupForProcessBundleRequest(processBundleRequest);
            this.activeBundleProcessors.put(processBundleRequest.getInstructionId(), bundleProcessor);
            return bundleProcessor;
        }

        public BundleProcessor find(String instructionId) {
            return this.activeBundleProcessors.get(instructionId);
        }

        void release(String bundleDescriptorId, BundleProcessor bundleProcessor) {
            this.activeBundleProcessors.remove(bundleProcessor.getInstructionId());
            try {
                bundleProcessor.reset();
                this.cachedBundleProcessors.get(bundleDescriptorId).add(bundleProcessor);
            }
            catch (Exception e) {
                LOG.warn("Was unable to reset bundle processor safely. Bundle processor will be discarded and re-instantiated on next bundle for descriptor {}.", (Object)bundleDescriptorId, (Object)e);
            }
        }

        void discard(BundleProcessor bundleProcessor) {
            bundleProcessor.discard();
            this.activeBundleProcessors.remove(bundleProcessor.getInstructionId());
        }

        void shutdown() throws Exception {
            this.cachedBundleProcessors.invalidateAll();
        }
    }
}

