/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex;
import org.apache.beam.fn.harness.AutoValue_FnApiDoFnRunner_WindowedSplitResult;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.FnApiDoFnRunner;
import org.apache.beam.fn.harness.HandlesSplits;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnTimerClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
import org.apache.beam.fn.harness.state.FnApiTimerBundleTracker;
import org.apache.beam.fn.harness.state.SideInputSpec;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.MetricsApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.LateDataUtils;
import org.apache.beam.runners.core.construction.PCollectionViewTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.state.ReadableState;
import org.apache.beam.sdk.state.State;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;

public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT> {
    // ---- Configuration captured by the constructor ----
    // The first four fields are stored directly from the constructor arguments.
    private final PipelineOptions pipelineOptions;
    private final String pTransformId;
    private final RunnerApi.PTransform pTransform;
    private final Supplier<String> processBundleInstructionId;
    // Built in the constructor from the supplied coders, PCollections and windowing strategies.
    private final RehydratedComponents rehydratedComponents;
    // The user DoFn obtained from the ParDoPayload, and the signature derived from it.
    private final DoFn<InputT, OutputT> doFn;
    private final DoFnSignature doFnSignature;
    private final TupleTag<OutputT> mainOutputTag;
    // Coder of the main input; the constructor unwraps a WindowedValueCoder to its value coder.
    private final Coder<?> inputCoder;
    // Key coder of the main input when inputCoder is a KvCoder; otherwise null.
    private final Coder<?> keyCoder;
    // The main output's coder when it is a SchemaCoder; otherwise null.
    private final SchemaCoder<OutputT> mainOutputSchemaCoder;
    private final Coder<? extends BoundedWindow> windowCoder;
    private final WindowingStrategy<InputT, ?> windowingStrategy;
    private final Map<TupleTag<?>, SideInputSpec> tagToSideInputSpecMap;
    // Output coders keyed by output tag, each unwrapped from WindowedValueCoder where applicable.
    private final Map<TupleTag<?>, Coder<?>> outputCoders;
    private final BeamFnTimerClient beamFnTimerClient;
    // Keyed by the timer (family) id from the ParDoPayload; value is (time domain, timer coder).
    private final Map<String, KV<TimeDomain, Coder<Timer<Object>>>> timerFamilyInfos;
    private final RunnerApi.ParDoPayload parDoPayload;
    // Downstream consumers keyed by the transform-local output name.
    private final ListMultimap<String, FnDataReceiver<WindowedValue<?>>> localNameToConsumer;
    private final BundleSplitListener splitListener;
    private final DoFn.BundleFinalizer bundleFinalizer;
    // The consumers registered in localNameToConsumer under the main output tag's id.
    private final Collection<FnDataReceiver<WindowedValue<OutputT>>> mainOutputConsumers;
    private final String mainInputId;
    private final FnApiStateAccessor<?> stateAccessor;
    // ---- Per-bundle state: both fields are re-created by startBundle() ----
    private Map<String, CloseableFnDataReceiver<Timer<Object>>> outboundTimerReceivers;
    private FnApiTimerBundleTracker timerBundleTracker;
    private final DoFnInvoker<InputT, OutputT> doFnInvoker;
    // ---- Argument providers / contexts handed to the DoFnInvoker ----
    private final StartBundleArgumentProvider startBundleArgumentProvider;
    // Window-observing or non-window-observing variant, selected in the constructor.
    private final ProcessBundleContextBase processContext;
    private final OnTimerContext<?> onTimerContext;
    private final OnWindowExpirationContext<?> onWindowExpirationContext;
    private final FinishBundleArgumentProvider finishBundleArgumentProvider;
    private final Object splitLock = new Object();
    private final DoFnSchemaInformation doFnSchemaInformation;
    private final Map<String, PCollectionView<?>> sideInputMapping;
    // ---- Per-invocation state ----
    // The process* methods set these before invoking the DoFn and reset them to null in a
    // finally block; getCurrentKey() reads currentKey, currentElement and currentTimer.
    private WindowedValue<InputT> currentElement;
    private Object currentKey;
    private List<BoundedWindow> currentWindows;
    private int windowStopIndex;
    private int windowCurrentIndex;
    private RestrictionT currentRestriction;
    private WatermarkEstimatorStateT currentWatermarkEstimatorState;
    private Instant initialWatermark;
    private WatermarkEstimators.WatermarkAndStateObserver<WatermarkEstimatorStateT> currentWatermarkEstimator;
    private BoundedWindow currentWindow;
    private RestrictionTracker<RestrictionT, PositionT> currentTracker;
    private Timer<?> currentTimer;
    private TimeDomain currentTimeDomain;

    /**
     * Builds a runner for a single ParDo-like PTransform.
     *
     * <p>In order, the constructor:
     * <ol>
     *   <li>rehydrates the supplied coders, PCollections and windowing strategies and parses the
     *       {@code ParDoPayload} from the transform spec;</li>
     *   <li>resolves the main output tag, the main input coder, the key and window coders, the
     *       per-output coders, the side input specs and the timer family infos;</li>
     *   <li>collects the downstream consumers of every output;</li>
     *   <li>selects, based on the transform URN, the receiver and process context used for the
     *       main input and registers that receiver through {@code addPCollectionConsumer};</li>
     *   <li>registers the tear-down function and, depending on the URN, the start/finish bundle
     *       functions and a progress callback;</li>
     *   <li>creates the state accessor.</li>
     * </ol>
     *
     * <p>NOTE(review): this body is decompiler (CFR) output and is not valid Java as written. It
     * declares {@code void}-typed locals and switches on variables that are never assigned
     * (CFR's own marker here was "WARNING - void declaration"). The case indices of the second
     * switch in each hash-switch pair presumably correspond to the values the first switch
     * computes; confirm against the original source before relying on the details.
     *
     * @param getPCollectionConsumer looked up once per output PCollection id to obtain its consumer
     * @param addPCollectionConsumer receives the main input PCollection id, the selected receiver
     *     and the input coder
     * @param addStartFunction receives {@code this::startBundle} for some URNs
     * @param addFinishFunction receives {@code this::finishBundle} for some URNs
     * @param addTearDownFunction always receives {@code this::tearDown}
     * @param addProgressRequestCallback receives a callback reporting work completed/remaining
     * @throws IllegalArgumentException if an {@link IOException} occurs while rehydrating or
     *     parsing the payload ("Malformed ParDoPayload")
     * @throws IllegalStateException if the transform URN is not one of the five handled URNs
     * @throws RuntimeException if resolving the main input name throws an {@link IOException}
     */
    FnApiDoFnRunner(PipelineOptions pipelineOptions, BeamFnStateClient beamFnStateClient, BeamFnTimerClient beamFnTimerClient, final String pTransformId, RunnerApi.PTransform pTransform, Supplier<String> processBundleInstructionId, Supplier<List<BeamFnApi.ProcessBundleRequest.CacheToken>> cacheTokens, Supplier<Cache<?, ?>> bundleCache, Cache<?, ?> processWideCache, Map<String, RunnerApi.PCollection> pCollections, Map<String, RunnerApi.Coder> coders, Map<String, RunnerApi.WindowingStrategy> windowingStrategies, Consumer<ThrowingRunnable> addStartFunction, Consumer<ThrowingRunnable> addFinishFunction, Consumer<ThrowingRunnable> addTearDownFunction, Function<String, FnDataReceiver<WindowedValue<?>>> getPCollectionConsumer, TriFunction<String, FnDataReceiver<WindowedValue<?>>, Coder<?>> addPCollectionConsumer, Consumer<PTransformRunnerFactory.ProgressRequestCallback> addProgressRequestCallback, BundleSplitListener splitListener, DoFn.BundleFinalizer bundleFinalizer) {
        boolean bl;
        // NOTE(review): the `void`-typed locals below are decompiler artifacts; they are read by
        // the switches further down but never assigned anywhere in this body.
        void var26_70;
        void var24_52;
        void var26_63;
        void var24_36;
        Object mainInput;
        this.pipelineOptions = pipelineOptions;
        this.beamFnTimerClient = beamFnTimerClient;
        this.pTransformId = pTransformId;
        this.pTransform = pTransform;
        this.processBundleInstructionId = processBundleInstructionId;
        ImmutableMap.Builder tagToSideInputSpecMapBuilder = ImmutableMap.builder();
        try {
            // Rehydrate the components and decode the payload; everything below derives from them.
            this.rehydratedComponents = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).putAllPcollections(pCollections).putAllWindowingStrategies(windowingStrategies).build()).withPipeline(Pipeline.create());
            this.parDoPayload = RunnerApi.ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
            this.doFn = ParDoTranslation.getDoFn(this.parDoPayload);
            this.doFnSignature = DoFnSignatures.signatureForDoFn(this.doFn);
            // pardo / process-sized: the main output tag comes from the payload.
            // The other SDF transforms: it is the key of the transform's only output.
            switch (pTransform.getSpec().getUrn()) {
                case "beam:transform:sdf_process_sized_element_and_restrictions:v1": 
                case "beam:transform:pardo:v1": {
                    this.mainOutputTag = ParDoTranslation.getMainOutputTag(this.parDoPayload);
                    break;
                }
                case "beam:transform:sdf_pair_with_restriction:v1": 
                case "beam:transform:sdf_split_and_size_restrictions:v1": 
                case "beam:transform:sdf_truncate_sized_restrictions:v1": {
                    this.mainOutputTag = new TupleTag(Iterables.getOnlyElement(pTransform.getOutputsMap().keySet()));
                    break;
                }
                default: {
                    throw new IllegalStateException(String.format("Unknown urn: %s", pTransform.getSpec().getUrn()));
                }
            }
            // The main input is the single input that is not listed among the payload's side inputs.
            String mainInputTag = Iterables.getOnlyElement(Sets.difference(pTransform.getInputsMap().keySet(), this.parDoPayload.getSideInputsMap().keySet()));
            mainInput = pCollections.get(pTransform.getInputsOrThrow(mainInputTag));
            Coder<?> coder = this.rehydratedComponents.getCoder(((RunnerApi.PCollection)mainInput).getCoderId());
            // Strip the WindowedValueCoder wrapper, if present, to get the element coder.
            this.inputCoder = coder instanceof WindowedValue.WindowedValueCoder ? ((WindowedValue.WindowedValueCoder)coder).getValueCoder() : coder;
            // Only KV inputs have a key coder; it stays null otherwise.
            this.keyCoder = this.inputCoder instanceof KvCoder ? ((KvCoder)this.inputCoder).getKeyCoder() : null;
            this.windowingStrategy = this.rehydratedComponents.getWindowingStrategy(((RunnerApi.PCollection)mainInput).getWindowingStrategyId());
            this.windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
            // Record the (unwrapped) element coder of every output, keyed by its tag.
            this.outputCoders = Maps.newHashMap();
            for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
                TupleTag tupleTag = new TupleTag(entry.getKey());
                RunnerApi.PCollection outputPCollection = pCollections.get(entry.getValue());
                Coder<Object> outputCoder = this.rehydratedComponents.getCoder(outputPCollection.getCoderId());
                if (outputCoder instanceof WindowedValue.WindowedValueCoder) {
                    outputCoder = ((WindowedValue.WindowedValueCoder)outputCoder).getValueCoder();
                }
                this.outputCoders.put(tupleTag, outputCoder);
            }
            Coder<?> outputCoder = this.outputCoders.get(this.mainOutputTag);
            this.mainOutputSchemaCoder = outputCoder instanceof SchemaCoder ? (SchemaCoder)outputCoder : null;
            // Build one SideInputSpec per side input declared in the payload.
            for (Map.Entry<String, RunnerApi.SideInput> entry : this.parDoPayload.getSideInputsMap().entrySet()) {
                String sideInputTag = entry.getKey();
                RunnerApi.SideInput sideInput = entry.getValue();
                RunnerApi.PCollection sideInputPCollection = pCollections.get(pTransform.getInputsOrThrow(sideInputTag));
                WindowingStrategy<?, ?> sideInputWindowingStrategy = this.rehydratedComponents.getWindowingStrategy(sideInputPCollection.getWindowingStrategyId());
                tagToSideInputSpecMapBuilder.put(new TupleTag(entry.getKey()), SideInputSpec.create(sideInput.getAccessPattern().getUrn(), this.rehydratedComponents.getCoder(sideInputPCollection.getCoderId()), sideInputWindowingStrategy.getWindowFn().windowCoder(), PCollectionViewTranslation.viewFnFromProto(entry.getValue().getViewFn()), PCollectionViewTranslation.windowMappingFnFromProto(entry.getValue().getWindowMappingFn())));
            }
            // Resolve the time domain and coder of every timer family declared in the payload.
            ImmutableMap.Builder<String, KV<TimeDomain, Coder<?>>> builder = ImmutableMap.builder();
            for (Map.Entry<String, RunnerApi.TimerFamilySpec> entry : this.parDoPayload.getTimerFamilySpecsMap().entrySet()) {
                String timerIdOrTimerFamilyId = entry.getKey();
                TimeDomain timeDomain = this.translateTimeDomain(entry.getValue().getTimeDomain());
                Coder<?> timerCoder = this.rehydratedComponents.getCoder(entry.getValue().getTimerFamilyCoderId());
                builder.put(timerIdOrTimerFamilyId, KV.of(timeDomain, timerCoder));
            }
            this.timerFamilyInfos = builder.build();
        }
        catch (IOException exn) {
            throw new IllegalArgumentException("Malformed ParDoPayload", exn);
        }
        // Look up the downstream consumer of each output PCollection, keyed by local output name.
        ImmutableListMultimap.Builder localNameToConsumerBuilder = ImmutableListMultimap.builder();
        for (Map.Entry entry : pTransform.getOutputsMap().entrySet()) {
            localNameToConsumerBuilder.putAll((Object)((String)entry.getKey()), new FnDataReceiver[]{getPCollectionConsumer.apply((String)entry.getValue())});
        }
        this.localNameToConsumer = localNameToConsumerBuilder.build();
        this.tagToSideInputSpecMap = tagToSideInputSpecMapBuilder.build();
        this.splitListener = splitListener;
        this.bundleFinalizer = bundleFinalizer;
        this.onTimerContext = new OnTimerContext();
        this.onWindowExpirationContext = new OnWindowExpirationContext();
        try {
            this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.mainOutputConsumers = this.localNameToConsumer.get((Object)this.mainOutputTag.getId());
        this.doFnSchemaInformation = ParDoTranslation.getSchemaInformation(this.parDoPayload);
        this.sideInputMapping = ParDoTranslation.getSideInputMapping(this.parDoPayload);
        this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(this.doFn, pipelineOptions);
        this.startBundleArgumentProvider = new StartBundleArgumentProvider();
        // Start-bundle registration. This is a decompiled switch-on-String: the first switch maps
        // the URN to an index and the second acts on it.
        // NOTE(review): the index is written to fresh locals (bl2, bl3, n2, ...) while the second
        // switch reads var24_36, so the link between the two was lost in decompilation.
        // Presumably indices 0 and 1 (pardo, process-sized) register startBundle - confirm.
        String e = pTransform.getSpec().getUrn();
        int n = -1;
        switch (e.hashCode()) {
            case -1912952382: {
                if (!e.equals("beam:transform:pardo:v1")) break;
                boolean bl2 = false;
                break;
            }
            case 1881026324: {
                if (!e.equals("beam:transform:sdf_process_sized_element_and_restrictions:v1")) break;
                boolean bl3 = true;
                break;
            }
            case -1759350560: {
                if (!e.equals("beam:transform:sdf_pair_with_restriction:v1")) break;
                int n2 = 2;
                break;
            }
            case 400474464: {
                if (!e.equals("beam:transform:sdf_split_and_size_restrictions:v1")) break;
                int n3 = 3;
                break;
            }
            case -1994500900: {
                if (!e.equals("beam:transform:sdf_truncate_sized_restrictions:v1")) break;
                int n4 = 4;
            }
        }
        switch (var24_36) {
            case 0: 
            case 1: {
                addStartFunction.accept(this::startBundle);
                break;
            }
        }
        try {
            mainInput = ParDoTranslation.getMainInputName(pTransform);
        }
        catch (IOException iOException) {
            throw new RuntimeException(iOException);
        }
        // Main input receiver selection, again as a decompiled switch-on-String. Index order here:
        // 0 pardo, 1 pair-with-restriction, 2 split-and-size, 3 truncate-sized, 4 process-sized.
        // Each case picks the window-observing variant when a relevant DoFn method observes the
        // window or when side inputs are present, and the non-window-observing variant otherwise.
        // NOTE(review): the receiver chosen in each branch is stored in a branch-local variable;
        // how it reaches var24_52 (used when registering the consumer below) is not visible.
        Object object = pTransform.getSpec().getUrn();
        int n5 = -1;
        switch (((String)object).hashCode()) {
            case -1912952382: {
                if (!((String)object).equals("beam:transform:pardo:v1")) break;
                boolean bl4 = false;
                break;
            }
            case -1759350560: {
                if (!((String)object).equals("beam:transform:sdf_pair_with_restriction:v1")) break;
                boolean bl5 = true;
                break;
            }
            case 400474464: {
                if (!((String)object).equals("beam:transform:sdf_split_and_size_restrictions:v1")) break;
                int n6 = 2;
                break;
            }
            case -1994500900: {
                if (!((String)object).equals("beam:transform:sdf_truncate_sized_restrictions:v1")) break;
                int n7 = 3;
                break;
            }
            case 1881026324: {
                if (!((String)object).equals("beam:transform:sdf_process_sized_element_and_restrictions:v1")) break;
                int n8 = 4;
            }
        }
        switch (var26_63) {
            case 0: {
                if (this.doFnSignature.processElement().observesWindow() || !this.sideInputMapping.isEmpty()) {
                    FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForWindowObservingParDo;
                    this.processContext = new WindowObservingProcessBundleContext();
                    break;
                }
                FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForParDo;
                this.processContext = new NonWindowObservingProcessBundleContext();
                break;
            }
            case 1: {
                if (this.doFnSignature.getInitialRestriction().observesWindow() || this.doFnSignature.getInitialWatermarkEstimatorState() != null && this.doFnSignature.getInitialWatermarkEstimatorState().observesWindow() || !this.sideInputMapping.isEmpty()) {
                    FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForWindowObservingPairWithRestriction;
                    this.processContext = new WindowObservingProcessBundleContext();
                    break;
                }
                FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForPairWithRestriction;
                this.processContext = new NonWindowObservingProcessBundleContext();
                break;
            }
            case 2: {
                if (this.doFnSignature.splitRestriction() != null && this.doFnSignature.splitRestriction().observesWindow() || this.doFnSignature.newTracker() != null && this.doFnSignature.newTracker().observesWindow() || this.doFnSignature.getSize() != null && this.doFnSignature.getSize().observesWindow() || !this.sideInputMapping.isEmpty()) {
                    FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForWindowObservingSplitRestriction;
                    this.processContext = new SizedRestrictionWindowObservingProcessBundleContext("beam:transform:sdf_split_and_size_restrictions:v1");
                    break;
                }
                FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForSplitRestriction;
                this.processContext = new SizedRestrictionNonWindowObservingProcessBundleContext("beam:transform:sdf_split_and_size_restrictions:v1");
                break;
            }
            case 3: {
                if (this.doFnSignature.truncateRestriction() != null && this.doFnSignature.truncateRestriction().observesWindow() || this.doFnSignature.newTracker() != null && this.doFnSignature.newTracker().observesWindow() || this.doFnSignature.getSize() != null && this.doFnSignature.getSize().observesWindow() || !this.sideInputMapping.isEmpty()) {
                    // When the single downstream consumer can itself be split, wrap it so that
                    // split and progress requests are combined with this runner's window handling.
                    if (this.mainOutputConsumers.size() == 1 && Iterables.getOnlyElement(this.mainOutputConsumers) instanceof HandlesSplits) {
                        SplittableFnDataReceiver splittableFnDataReceiver = new SplittableFnDataReceiver(){
                            private final HandlesSplits splitDelegate;
                            {
                                this.splitDelegate = (HandlesSplits)Iterables.getOnlyElement(FnApiDoFnRunner.this.mainOutputConsumers);
                            }

                            @Override
                            public void accept(WindowedValue input) throws Exception {
                                FnApiDoFnRunner.this.processElementForWindowObservingTruncateRestriction(input);
                            }

                            @Override
                            public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
                                return FnApiDoFnRunner.this.trySplitForWindowObservingTruncateRestriction(fractionOfRemainder, this.splitDelegate);
                            }

                            @Override
                            public double getProgress() {
                                double totalWork;
                                RestrictionTracker.Progress progress = FnApiDoFnRunner.this.getProgressFromWindowObservingTruncate(this.splitDelegate.getProgress());
                                // Report the completed fraction; 0.0 when progress is unknown or there is no work.
                                if (progress != null && (totalWork = progress.getWorkCompleted() + progress.getWorkRemaining()) > 0.0) {
                                    return progress.getWorkCompleted() / totalWork;
                                }
                                return 0.0;
                            }
                        };
                    } else {
                        FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForWindowObservingTruncateRestriction;
                    }
                    this.processContext = new SizedRestrictionWindowObservingProcessBundleContext("beam:transform:sdf_truncate_sized_restrictions:v1");
                    break;
                }
                // Non-window-observing truncate: split and progress requests go straight to the delegate.
                if (this.mainOutputConsumers.size() == 1 && Iterables.getOnlyElement(this.mainOutputConsumers) instanceof HandlesSplits) {
                    SplittableFnDataReceiver splittableFnDataReceiver = new SplittableFnDataReceiver(){
                        private final HandlesSplits splitDelegate;
                        {
                            this.splitDelegate = (HandlesSplits)Iterables.getOnlyElement(FnApiDoFnRunner.this.mainOutputConsumers);
                        }

                        @Override
                        public void accept(WindowedValue input) throws Exception {
                            FnApiDoFnRunner.this.processElementForTruncateRestriction(input);
                        }

                        @Override
                        public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
                            return this.splitDelegate.trySplit(fractionOfRemainder);
                        }

                        @Override
                        public double getProgress() {
                            return this.splitDelegate.getProgress();
                        }
                    };
                } else {
                    FnDataReceiver<WindowedValue> fnDataReceiver = this::processElementForTruncateRestriction;
                }
                this.processContext = new SizedRestrictionNonWindowObservingProcessBundleContext("beam:transform:sdf_truncate_sized_restrictions:v1");
                break;
            }
            case 4: {
                // NOTE(review): both branches below install the same window-observing receiver and
                // context, so the condition currently has no effect - confirm this is intentional.
                if (this.doFnSignature.processElement().observesWindow() || this.doFnSignature.newTracker() != null && this.doFnSignature.newTracker().observesWindow() || this.doFnSignature.getSize() != null && this.doFnSignature.getSize().observesWindow() || this.doFnSignature.newWatermarkEstimator() != null && this.doFnSignature.newWatermarkEstimator().observesWindow() || !this.sideInputMapping.isEmpty()) {
                    SplittableFnDataReceiver splittableFnDataReceiver = new SplittableFnDataReceiver(){

                        @Override
                        public void accept(WindowedValue input) throws Exception {
                            FnApiDoFnRunner.this.processElementForWindowObservingSizedElementAndRestriction(input);
                        }
                    };
                    this.processContext = new WindowObservingProcessBundleContext();
                    break;
                }
                SplittableFnDataReceiver splittableFnDataReceiver = new SplittableFnDataReceiver(){

                    @Override
                    public void accept(WindowedValue input) throws Exception {
                        FnApiDoFnRunner.this.processElementForWindowObservingSizedElementAndRestriction(input);
                    }
                };
                this.processContext = new WindowObservingProcessBundleContext();
                break;
            }
            default: {
                throw new IllegalStateException("Unknown urn: " + pTransform.getSpec().getUrn());
            }
        }
        // Register the selected receiver as the consumer of the main input PCollection.
        addPCollectionConsumer.accept(pTransform.getInputsOrThrow((String)mainInput), (FnDataReceiver<WindowedValue<?>>)var24_52, this.inputCoder);
        this.finishBundleArgumentProvider = new FinishBundleArgumentProvider();
        // Finish-bundle registration; same decompiled pattern as the start-bundle registration.
        // Presumably indices 0 and 1 (pardo, process-sized) register finishBundle - confirm.
        object = pTransform.getSpec().getUrn();
        int n9 = -1;
        switch (((String)object).hashCode()) {
            case -1912952382: {
                if (!((String)object).equals("beam:transform:pardo:v1")) break;
                boolean bl6 = false;
                break;
            }
            case 1881026324: {
                if (!((String)object).equals("beam:transform:sdf_process_sized_element_and_restrictions:v1")) break;
                boolean bl7 = true;
                break;
            }
            case -1759350560: {
                if (!((String)object).equals("beam:transform:sdf_pair_with_restriction:v1")) break;
                int n10 = 2;
                break;
            }
            case 400474464: {
                if (!((String)object).equals("beam:transform:sdf_split_and_size_restrictions:v1")) break;
                int n11 = 3;
                break;
            }
            case -1994500900: {
                if (!((String)object).equals("beam:transform:sdf_truncate_sized_restrictions:v1")) break;
                int n12 = 4;
            }
        }
        switch (var26_70) {
            case 0: 
            case 1: {
                addFinishFunction.accept(this::finishBundle);
                break;
            }
        }
        // Tear-down is registered unconditionally.
        addTearDownFunction.accept(this::tearDown);
        // Progress reporting: the only URN matched here is the process-sized SDF transform.
        // NOTE(review): `bl` is a boolean that is only assigned in that one case, yet it is used
        // as an int switch selector below - another decompiler artifact.
        object = pTransform.getSpec().getUrn();
        int n13 = -1;
        switch (((String)object).hashCode()) {
            case 1881026324: {
                if (!((String)object).equals("beam:transform:sdf_process_sized_element_and_restrictions:v1")) break;
                bl = false;
            }
        }
        switch (bl) {
            case 0: {
                addProgressRequestCallback.accept(new PTransformRunnerFactory.ProgressRequestCallback(){

                    // Emits two monitoring infos (work completed, work remaining) labelled with
                    // this transform's id, or nothing when no progress is available.
                    @Override
                    public List<MetricsApi.MonitoringInfo> getMonitoringInfos() throws Exception {
                        RestrictionTracker.Progress progress = FnApiDoFnRunner.this.getProgress();
                        if (progress == null) {
                            return Collections.emptyList();
                        }
                        MetricsApi.MonitoringInfo.Builder completedBuilder = MetricsApi.MonitoringInfo.newBuilder();
                        completedBuilder.setUrn(MonitoringInfoConstants.Urns.WORK_COMPLETED);
                        completedBuilder.setType("beam:metrics:progress:v1");
                        completedBuilder.putLabels("PTRANSFORM", pTransformId);
                        completedBuilder.setPayload(this.encodeProgress(progress.getWorkCompleted()));
                        MetricsApi.MonitoringInfo.Builder remainingBuilder = MetricsApi.MonitoringInfo.newBuilder();
                        remainingBuilder.setUrn(MonitoringInfoConstants.Urns.WORK_REMAINING);
                        remainingBuilder.setType("beam:metrics:progress:v1");
                        remainingBuilder.putLabels("PTRANSFORM", pTransformId);
                        remainingBuilder.setPayload(this.encodeProgress(progress.getWorkRemaining()));
                        return ImmutableList.of(completedBuilder.build(), remainingBuilder.build());
                    }

                    // Encodes the value as a single-element iterable of doubles.
                    private ByteString encodeProgress(double value) throws IOException {
                        ByteString.Output output = ByteString.newOutput();
                        IterableCoder.of(DoubleCoder.of()).encode(Arrays.asList(value), (OutputStream)output);
                        return output.toByteString();
                    }
                });
                break;
            }
        }
        // The state accessor resolves the current key and window lazily through the two suppliers.
        this.stateAccessor = new FnApiStateAccessor<Object>(pipelineOptions, pTransformId, processBundleInstructionId, cacheTokens, bundleCache, processWideCache, this.tagToSideInputSpecMap, beamFnStateClient, this.keyCoder, this.windowCoder, this::getCurrentKey, () -> this.currentWindow);
    }

    /**
     * Resolves the key that state and timer accesses should currently be scoped to.
     *
     * <p>Lookup order: the explicitly tracked {@code currentKey}, then the key of the element
     * being processed (which must be a {@link KV}), then the user key of the current timer.
     *
     * @return the resolved key, or {@code null} when there is no key, element or timer
     * @throws IllegalStateException if an element is being processed and its value is not a KV
     */
    private Object getCurrentKey() {
        // An explicitly tracked key always takes precedence.
        if (currentKey != null) {
            return currentKey;
        }
        // No element in flight: fall back to the timer's user key, if a timer is being handled.
        if (currentElement == null) {
            return currentTimer == null ? null : currentTimer.getUserKey();
        }
        Object value = currentElement.getValue();
        Preconditions.checkState(value instanceof KV, "Accessing state in unkeyed context. Current element is not a KV: %s.", value);
        return ((KV)value).getKey();
    }

    /**
     * Prepares per-bundle state and then invokes the DoFn's start-bundle method.
     *
     * <p>A fresh timer receiver map and timer bundle tracker are created, one outbound timer
     * receiver is registered for every known timer family, and only then is the DoFn invoked.
     */
    private void startBundle() {
        outboundTimerReceivers = new HashMap<String, CloseableFnDataReceiver<Timer<Object>>>();
        timerBundleTracker = new FnApiTimerBundleTracker<Object>(keyCoder, windowCoder, this::getCurrentKey, () -> currentWindow);
        for (Map.Entry<String, KV<TimeDomain, Coder<Timer<Object>>>> family : timerFamilyInfos.entrySet()) {
            String familyId = family.getKey();
            Coder<Timer<Object>> familyCoder = family.getValue().getValue();
            // The endpoint is scoped to the current bundle instruction, this transform and the family.
            LogicalEndpoint endpoint = LogicalEndpoint.timer(processBundleInstructionId.get(), pTransformId, familyId);
            CloseableFnDataReceiver<Timer<Object>> receiver = beamFnTimerClient.register(endpoint, familyCoder);
            outboundTimerReceivers.put(familyId, receiver);
        }
        doFnInvoker.invokeStartBundle(startBundleArgumentProvider);
    }

    /**
     * Processes one element with a DoFn that does not observe windows.
     *
     * <p>The element is published through {@code currentElement} for the duration of the call so
     * that the process context can read it, and is always cleared afterwards.
     *
     * @param elem the windowed element to hand to the DoFn
     */
    private void processElementForParDo(WindowedValue<InputT> elem) {
        currentElement = elem;
        try {
            doFnInvoker.invokeProcessElement(processContext);
        } finally {
            // Clear the reference even when the DoFn throws.
            currentElement = null;
        }
    }

    private void processElementForWindowObservingParDo(WindowedValue<InputT> elem) {
        this.currentElement = elem;
        try {
            for (BoundedWindow this.currentWindow : elem.getWindows()) {
                this.doFnInvoker.invokeProcessElement(this.processContext);
            }
        }
        finally {
            this.currentElement = null;
            this.currentWindow = null;
        }
    }

    /**
     * Pairs {@code elem} with its initial restriction and initial watermark estimator state and
     * sends the result to the main output consumers.
     *
     * <p>The emitted value is {@code KV(element, KV(restriction, watermarkEstimatorState))}.
     * The per-element fields are cleared in all cases; {@code finalizeState()} is only reached when
     * no exception was thrown.
     *
     * @param elem the input element
     */
    private void processElementForPairWithRestriction(WindowedValue<InputT> elem) {
        currentElement = elem;
        try {
            currentRestriction = doFnInvoker.invokeGetInitialRestriction(processContext);
            outputTo(
                mainOutputConsumers,
                elem.withValue(
                    KV.of(
                        elem.getValue(),
                        KV.of(currentRestriction, doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
        } finally {
            currentRestriction = null;
            currentElement = null;
        }
        stateAccessor.finalizeState();
    }

    private void processElementForWindowObservingPairWithRestriction(WindowedValue<InputT> elem) {
        this.currentElement = elem;
        try {
            for (BoundedWindow this.currentWindow : elem.getWindows()) {
                this.currentRestriction = this.doFnInvoker.invokeGetInitialRestriction(this.processContext);
                this.outputTo(this.mainOutputConsumers, WindowedValue.of(KV.of(elem.getValue(), KV.of(this.currentRestriction, this.doFnInvoker.invokeGetInitialWatermarkEstimatorState(this.processContext))), this.currentElement.getTimestamp(), this.currentWindow, this.currentElement.getPane()));
            }
        }
        finally {
            this.currentElement = null;
            this.currentWindow = null;
            this.currentRestriction = null;
        }
        this.stateAccessor.finalizeState();
    }

    /**
     * Invokes the DoFn's split-restriction method for an element/restriction pair.
     *
     * <p>The input value is unpacked as {@code KV(element, KV(restriction, watermarkEstimatorState))}.
     * A tracker is created and wrapped with a claim observer that ignores all callbacks. The
     * per-element fields are cleared in all cases; {@code finalizeState()} is only reached when no
     * exception was thrown.
     *
     * @param elem the element paired with its restriction and watermark estimator state
     */
    private void processElementForSplitRestriction(WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
        KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> payload = elem.getValue();
        currentElement = elem.withValue(payload.getKey());
        currentRestriction = payload.getValue().getKey();
        currentWatermarkEstimatorState = payload.getValue().getValue();
        currentTracker = RestrictionTrackers.observe(doFnInvoker.invokeNewTracker(processContext), new RestrictionTrackers.ClaimObserver<PositionT>(){

            @Override
            public void onClaimed(PositionT claimedPosition) {
                // Intentionally empty: claims are not tracked while splitting.
            }

            @Override
            public void onClaimFailed(PositionT failedPosition) {
                // Intentionally empty.
            }
        });
        try {
            doFnInvoker.invokeSplitRestriction(processContext);
        } finally {
            currentTracker = null;
            currentWatermarkEstimatorState = null;
            currentRestriction = null;
            currentElement = null;
        }
        stateAccessor.finalizeState();
    }

    private void processElementForWindowObservingSplitRestriction(WindowedValue<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>> elem) {
        this.currentElement = elem.withValue(elem.getValue().getKey());
        this.currentRestriction = elem.getValue().getValue().getKey();
        this.currentWatermarkEstimatorState = elem.getValue().getValue().getValue();
        try {
            for (BoundedWindow this.currentWindow : elem.getWindows()) {
                this.currentTracker = RestrictionTrackers.observe(this.doFnInvoker.invokeNewTracker(this.processContext), new RestrictionTrackers.ClaimObserver<PositionT>(){

                    @Override
                    public void onClaimed(PositionT position) {
                    }

                    @Override
                    public void onClaimFailed(PositionT position) {
                    }
                });
                this.doFnInvoker.invokeSplitRestriction(this.processContext);
            }
        }
        finally {
            this.currentElement = null;
            this.currentRestriction = null;
            this.currentWatermarkEstimatorState = null;
            this.currentWindow = null;
            this.currentTracker = null;
        }
        this.stateAccessor.finalizeState();
    }

    /**
     * Invokes the DoFn's truncate-restriction method for a sized element/restriction pair.
     *
     * <p>The input value is unpacked as
     * {@code KV(KV(element, KV(restriction, watermarkEstimatorState)), size)}; the size component is
     * not read here. When the invoker returns a non-null truncate result, its truncated restriction
     * is emitted through the process context. The per-element fields are cleared in all cases;
     * {@code finalizeState()} is only reached when no exception was thrown.
     *
     * @param elem the sized element and restriction to truncate
     */
    private void processElementForTruncateRestriction(WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
        KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction = elem.getValue().getKey();
        currentElement = elem.withValue(elementAndRestriction.getKey());
        currentRestriction = elementAndRestriction.getValue().getKey();
        currentWatermarkEstimatorState = elementAndRestriction.getValue().getValue();
        currentTracker = RestrictionTrackers.observe(doFnInvoker.invokeNewTracker(processContext), new RestrictionTrackers.ClaimObserver<PositionT>(){

            @Override
            public void onClaimed(PositionT claimedPosition) {
                // Intentionally empty.
            }

            @Override
            public void onClaimFailed(PositionT failedPosition) {
                // Intentionally empty.
            }
        });
        try {
            RestrictionTracker.TruncateResult truncated = doFnInvoker.invokeTruncateRestriction(processContext);
            if (truncated != null) {
                processContext.output(truncated.getTruncatedRestriction());
            }
        } finally {
            currentTracker = null;
            currentElement = null;
            currentRestriction = null;
            currentWatermarkEstimatorState = null;
        }
        stateAccessor.finalizeState();
    }

    /**
     * Window-observing variant of truncate-restriction processing.
     *
     * <p>The element's windows are snapshotted into {@code currentWindows} and visited by index.
     * Advancing {@code windowCurrentIndex}, comparing it with {@code windowStopIndex}, and
     * installing the per-window restriction, tracker, and watermark estimator all happen while
     * holding {@code splitLock}; the truncate invocation itself runs outside the lock. A non-null
     * truncate result has its truncated restriction emitted through the process context.
     *
     * <p>The per-element fields are cleared in all cases; {@code finalizeState()} is only reached
     * when no exception was thrown.
     *
     * @param elem the sized element and restriction to truncate
     */
    private void processElementForWindowObservingTruncateRestriction(WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
        KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction = elem.getValue().getKey();
        currentElement = elem.withValue(elementAndRestriction.getKey());
        try {
            windowCurrentIndex = -1;
            windowStopIndex = currentElement.getWindows().size();
            currentWindows = ImmutableList.copyOf(currentElement.getWindows());
            while (true) {
                synchronized (splitLock) {
                    ++windowCurrentIndex;
                    if (windowCurrentIndex >= windowStopIndex) {
                        // Every window up to the (possibly lowered) stop index has been handled.
                        break;
                    }
                    currentRestriction = elementAndRestriction.getValue().getKey();
                    currentWatermarkEstimatorState = elementAndRestriction.getValue().getValue();
                    currentWindow = currentWindows.get(windowCurrentIndex);
                    currentTracker = RestrictionTrackers.observe(doFnInvoker.invokeNewTracker(processContext), new RestrictionTrackers.ClaimObserver<PositionT>(){

                        @Override
                        public void onClaimed(PositionT claimedPosition) {
                            // Intentionally empty.
                        }

                        @Override
                        public void onClaimFailed(PositionT failedPosition) {
                            // Intentionally empty.
                        }
                    });
                    currentWatermarkEstimator = WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext));
                    initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
                }
                RestrictionTracker.TruncateResult truncated = doFnInvoker.invokeTruncateRestriction(processContext);
                if (truncated != null) {
                    processContext.output(truncated.getTruncatedRestriction());
                }
            }
        } finally {
            currentTracker = null;
            currentElement = null;
            currentRestriction = null;
            currentWatermarkEstimatorState = null;
            currentWatermarkEstimator = null;
            currentWindow = null;
            currentWindows = null;
            initialWatermark = null;
        }
        stateAccessor.finalizeState();
    }

    /**
     * Processes a sized element/restriction pair once per window, supporting self-checkpointing.
     *
     * <p>The element's windows are snapshotted into {@code currentWindows} and visited by index.
     * Advancing {@code windowCurrentIndex}, comparing it with {@code windowStopIndex}, and
     * installing the per-window restriction, tracker, and watermark estimator all happen while
     * holding {@code splitLock}; the process-element invocation runs outside the lock.
     *
     * <p>After each invocation: if the continuation does not ask to resume, or asks to resume but
     * no split could be produced, the tracker's {@code checkDone()} is called. Otherwise the
     * primary and residual roots of the split are reported to the split listener.
     *
     * <p>The per-element fields are cleared under {@code splitLock} when the method exits.
     *
     * @param elem the sized element and restriction to process
     */
    private void processElementForWindowObservingSizedElementAndRestriction(WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
        KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>> elementAndRestriction = elem.getValue().getKey();
        currentElement = elem.withValue(elementAndRestriction.getKey());
        try {
            windowCurrentIndex = -1;
            windowStopIndex = currentElement.getWindows().size();
            currentWindows = ImmutableList.copyOf(currentElement.getWindows());
            while (true) {
                synchronized (splitLock) {
                    ++windowCurrentIndex;
                    if (windowCurrentIndex >= windowStopIndex) {
                        // Every window up to the (possibly lowered) stop index has been handled.
                        return;
                    }
                    currentRestriction = elementAndRestriction.getValue().getKey();
                    currentWatermarkEstimatorState = elementAndRestriction.getValue().getValue();
                    currentWindow = currentWindows.get(windowCurrentIndex);
                    currentTracker = RestrictionTrackers.observe(doFnInvoker.invokeNewTracker(processContext), new RestrictionTrackers.ClaimObserver<PositionT>(){

                        @Override
                        public void onClaimed(PositionT claimedPosition) {
                            // Intentionally empty.
                        }

                        @Override
                        public void onClaimFailed(PositionT failedPosition) {
                            // Intentionally empty.
                        }
                    });
                    currentWatermarkEstimator = WatermarkEstimators.threadSafe(doFnInvoker.invokeNewWatermarkEstimator(processContext));
                    initialWatermark = currentWatermarkEstimator.getWatermarkAndState().getKey();
                }
                DoFn.ProcessContinuation continuation = doFnInvoker.invokeProcessElement(processContext);
                // A split is only attempted when the DoFn asked to be resumed later.
                HandlesSplits.SplitResult checkpoint = continuation.shouldResume() ? trySplitForElementAndRestriction(0.0, continuation.resumeDelay()) : null;
                if (checkpoint == null) {
                    currentTracker.checkDone();
                } else {
                    splitListener.split(checkpoint.getPrimaryRoots(), checkpoint.getResidualRoots());
                }
            }
        } finally {
            synchronized (splitLock) {
                currentElement = null;
                currentRestriction = null;
                currentWatermarkEstimatorState = null;
                currentWindow = null;
                currentTracker = null;
                currentWatermarkEstimator = null;
                currentWindows = null;
                initialWatermark = null;
            }
        }
    }

    /**
     * Reports progress for the element being processed, scaled across its windows.
     *
     * <p>Reads the current tracker while holding {@code splitLock}.
     *
     * @return the tracker's progress scaled by {@code windowCurrentIndex} and
     *     {@code windowStopIndex}, or {@code null} when the current tracker does not implement
     *     {@code RestrictionTracker.HasProgress} (including when there is no current tracker)
     */
    private RestrictionTracker.Progress getProgress() {
        synchronized (splitLock) {
            if (!(currentTracker instanceof RestrictionTracker.HasProgress)) {
                return null;
            }
            RestrictionTracker.HasProgress progressSource = (RestrictionTracker.HasProgress)((Object)currentTracker);
            return scaleProgress(progressSource.getProgress(), windowCurrentIndex, windowStopIndex);
        }
    }

    /**
     * Converts a completed fraction for the current window into progress scaled across all windows.
     *
     * <p>Reads the current window and window indices while holding {@code splitLock}.
     *
     * @param elementCompleted the completed amount for the current window; the remaining amount is
     *     taken to be {@code 1.0 - elementCompleted}
     * @return the scaled progress, or {@code null} when no window is currently being processed
     */
    private RestrictionTracker.Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
        synchronized (splitLock) {
            if (currentWindow == null) {
                return null;
            }
            RestrictionTracker.Progress elementProgress = RestrictionTracker.Progress.from(elementCompleted, 1.0 - elementCompleted);
            return scaleProgress(elementProgress, windowCurrentIndex, windowStopIndex);
        }
    }

    /**
     * Scales the progress of a single window to cover all windows up to the stop index.
     *
     * <p>Each window is assumed to carry the same total work as the current one
     * ({@code completed + remaining}). Windows before {@code currentWindowIndex} count as fully
     * completed and windows after it, up to but excluding {@code stopWindowIndex}, count as fully
     * remaining.
     *
     * @param progress progress within the current window
     * @param currentWindowIndex index of the window currently being processed
     * @param stopWindowIndex exclusive index at which window processing stops
     * @return the progress across all windows in {@code [0, stopWindowIndex)}
     */
    @VisibleForTesting
    static RestrictionTracker.Progress scaleProgress(RestrictionTracker.Progress progress, int currentWindowIndex, int stopWindowIndex) {
        double doneInWindow = progress.getWorkCompleted();
        double leftInWindow = progress.getWorkRemaining();
        double workPerWindow = doneInWindow + leftInWindow;
        int windowsAfterCurrent = stopWindowIndex - currentWindowIndex - 1;
        double completed = workPerWindow * (double)currentWindowIndex + doneInWindow;
        double remaining = workPerWindow * (double)windowsAfterCurrent + leftInWindow;
        return RestrictionTracker.Progress.from(completed, remaining);
    }

    /**
     * Attaches a size to every root of a windowed split result.
     *
     * <p>Up to three sizes are computed through the invoker's {@code invokeGetSize}:
     * <ul>
     *   <li>the full size, for the current restriction, used by both the fully-processed primary
     *       root and the unprocessed residual root (skipped when both are absent);
     *   <li>the primary size, for the restriction carried by the primary split root;
     *   <li>the residual size, for the restriction carried by the residual split root.
     * </ul>
     * Each present root is rewrapped as {@code KV(originalValue, size)} with its timestamp, windows,
     * and pane preserved; absent roots stay {@code null}.
     *
     * @param splitResult the unsized split result
     * @param errorContext context string handed to the delegating argument provider
     * @return a new split result whose roots carry sizes
     */
    private WindowedSplitResult calculateRestrictionSize(final WindowedSplitResult splitResult, String errorContext) {
        WindowedValue<?> primaryProcessedRoot = splitResult.getPrimaryInFullyProcessedWindowsRoot();
        WindowedValue<?> primarySplitRoot = splitResult.getPrimarySplitRoot();
        WindowedValue<?> residualSplitRoot = splitResult.getResidualSplitRoot();
        WindowedValue<?> residualUnprocessedRoot = splitResult.getResidualInUnprocessedWindowsRoot();

        double fullSize = 0.0;
        if (residualUnprocessedRoot != null || primaryProcessedRoot != null) {
            fullSize = doFnInvoker.invokeGetSize(new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext){

                @Override
                public Object restriction() {
                    return FnApiDoFnRunner.this.currentRestriction;
                }

                @Override
                public RestrictionTracker<?, ?> restrictionTracker() {
                    return FnApiDoFnRunner.this.doFnInvoker.invokeNewTracker(this);
                }
            });
        }

        double primarySize = 0.0;
        if (primarySplitRoot != null) {
            primarySize = doFnInvoker.invokeGetSize(new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext){

                @Override
                public Object restriction() {
                    return ((KV)((KV)splitResult.getPrimarySplitRoot().getValue()).getValue()).getKey();
                }

                @Override
                public RestrictionTracker<?, ?> restrictionTracker() {
                    return FnApiDoFnRunner.this.doFnInvoker.invokeNewTracker(this);
                }
            });
        }

        double residualSize = 0.0;
        if (residualSplitRoot != null) {
            residualSize = doFnInvoker.invokeGetSize(new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext){

                @Override
                public Object restriction() {
                    return ((KV)((KV)splitResult.getResidualSplitRoot().getValue()).getValue()).getKey();
                }

                @Override
                public RestrictionTracker<?, ?> restrictionTracker() {
                    return FnApiDoFnRunner.this.doFnInvoker.invokeNewTracker(this);
                }
            });
        }

        return WindowedSplitResult.forRoots(
            primaryProcessedRoot == null
                ? null
                : WindowedValue.of(KV.of(primaryProcessedRoot.getValue(), fullSize), primaryProcessedRoot.getTimestamp(), primaryProcessedRoot.getWindows(), primaryProcessedRoot.getPane()),
            primarySplitRoot == null
                ? null
                : WindowedValue.of(KV.of(primarySplitRoot.getValue(), primarySize), primarySplitRoot.getTimestamp(), primarySplitRoot.getWindows(), primarySplitRoot.getPane()),
            residualSplitRoot == null
                ? null
                : WindowedValue.of(KV.of(residualSplitRoot.getValue(), residualSize), residualSplitRoot.getTimestamp(), residualSplitRoot.getWindows(), residualSplitRoot.getPane()),
            residualUnprocessedRoot == null
                ? null
                : WindowedValue.of(KV.of(residualUnprocessedRoot.getValue(), fullSize), residualUnprocessedRoot.getTimestamp(), residualUnprocessedRoot.getWindows(), residualUnprocessedRoot.getPane()));
    }

    /**
     * Attempts to split the work of a window-observing truncate, delegating element-level splits
     * to {@code splitDelegate}.
     *
     * <p>While holding {@code splitLock}, the split is computed, {@code windowStopIndex} is lowered
     * to the new stop index, and sizes are attached to the resulting roots. The final split result
     * is assembled after the lock is released.
     *
     * @param fractionOfRemainder fraction of the remaining work the primary should keep
     * @param splitDelegate downstream handler used for progress and element-level splitting
     * @return the split result, or {@code null} when no window is being processed or no split
     *     could be computed
     */
    private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(double fractionOfRemainder, HandlesSplits splitDelegate) {
        WindowedSplitResult sizedWindowSplit;
        HandlesSplits.SplitResult delegateSplit;
        synchronized (splitLock) {
            if (currentWindow == null) {
                return null;
            }
            SplitResultsWithStopIndex computed = computeSplitForProcessOrTruncate(currentElement, currentRestriction, currentWindow, currentWindows, currentWatermarkEstimatorState, fractionOfRemainder, null, splitDelegate, null, windowCurrentIndex, windowStopIndex);
            if (computed == null) {
                return null;
            }
            windowStopIndex = computed.getNewWindowStopIndex();
            sizedWindowSplit = calculateRestrictionSize(computed.getWindowSplit(), "beam:transform:sdf_truncate_sized_restrictions:v1/GetSize");
            delegateSplit = computed.getDownstreamSplit();
        }
        WindowedValue.FullWindowedValueCoder<?> sizedInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
        return constructSplitResult(sizedWindowSplit, delegateSplit, sizedInputCoder, initialWatermark, null, pTransformId, mainInputId, pTransform.getOutputsMap().keySet(), null);
    }

    /**
     * Builds the four roots of a windowed split from window index boundaries and an optional
     * element-level split.
     *
     * <ul>
     *   <li>Primary in fully processed windows: windows {@code [0, toIndex)} with the current
     *       restriction; {@code null} when that range is empty.
     *   <li>Primary split root: the current window with the split's primary restriction;
     *       {@code null} when {@code splitResult} is {@code null}.
     *   <li>Residual split root: the current window with the split's residual restriction and the
     *       state from {@code watermarkAndState}; {@code null} when {@code splitResult} is
     *       {@code null}.
     *   <li>Residual in unprocessed windows: windows {@code [fromIndex, stopWindowIndex)} with the
     *       current restriction; {@code null} when that range is empty.
     * </ul>
     *
     * @param watermarkAndState only dereferenced when {@code splitResult} is non-null
     */
    private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(WindowedValue currentElement, Object currentRestriction, BoundedWindow currentWindow, List<BoundedWindow> windows, WatermarkEstimatorStateT currentWatermarkEstimatorState, int toIndex, int fromIndex, int stopWindowIndex, SplitResult<?> splitResult, KV<Instant, WatermarkEstimatorStateT> watermarkAndState) {
        List<BoundedWindow> processedWindows = windows.subList(0, toIndex);
        List<BoundedWindow> unprocessedWindows = windows.subList(fromIndex, stopWindowIndex);
        Object elementValue = currentElement.getValue();
        Instant elementTimestamp = currentElement.getTimestamp();
        return WindowedSplitResult.forRoots(
            processedWindows.isEmpty()
                ? null
                : WindowedValue.of(KV.of(elementValue, KV.of(currentRestriction, currentWatermarkEstimatorState)), elementTimestamp, processedWindows, currentElement.getPane()),
            splitResult == null
                ? null
                : WindowedValue.of(KV.of(elementValue, KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)), elementTimestamp, currentWindow, currentElement.getPane()),
            splitResult == null
                ? null
                : WindowedValue.of(KV.of(elementValue, KV.of(splitResult.getResidual(), watermarkAndState.getValue())), elementTimestamp, currentWindow, currentElement.getPane()),
            unprocessedWindows.isEmpty()
                ? null
                : WindowedValue.of(KV.of(elementValue, KV.of(currentRestriction, currentWatermarkEstimatorState)), elementTimestamp, unprocessedWindows, currentElement.getPane()));
    }

    /**
     * Computes a split of the remaining work for an element that is processed once per window.
     *
     * <p>Exactly one of {@code currentTracker} and {@code splitDelegate} must be non-null: the
     * tracker performs element-level splits directly, while the delegate forwards them downstream.
     * Depending on how much of the scaled remaining work the primary should keep, the split falls
     * either on a window boundary (no element-level split) or inside the current window.
     *
     * @param currentTracker tracker to split, or {@code null} when {@code splitDelegate} is used
     * @param splitDelegate downstream split handler, or {@code null} when {@code currentTracker} is used
     * @param watermarkAndState must be non-null whenever {@code currentTracker} is non-null
     * @param currentWindowIndex index of the window being processed
     * @param stopWindowIndex exclusive index at which window processing currently stops
     * @return the window split, any downstream split, and the new stop index; or {@code null} when
     *     the current window is the last one and no element-level split could be made
     */
    @VisibleForTesting
    static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(WindowedValue currentElement, Object currentRestriction, BoundedWindow currentWindow, List<BoundedWindow> windows, WatermarkEstimatorStateT currentWatermarkEstimatorState, double fractionOfRemainder, RestrictionTracker currentTracker, HandlesSplits splitDelegate, KV<Instant, WatermarkEstimatorStateT> watermarkAndState, int currentWindowIndex, int stopWindowIndex) {
        // XOR: exactly one of the two split mechanisms may be supplied.
        Preconditions.checkArgument(currentTracker != null ^ splitDelegate != null);
        if (currentTracker != null) {
            Preconditions.checkNotNull(watermarkAndState);
        }
        WindowedSplitResult windowedSplitResult = null;
        HandlesSplits.SplitResult downstreamSplitResult = null;
        int newWindowStopIndex = stopWindowIndex;
        if (currentWindowIndex != stopWindowIndex - 1) {
            // Not the last window: the split may land on a window boundary or inside this window.
            RestrictionTracker.Progress elementProgress;
            if (currentTracker != null) {
                // Trackers that cannot report progress are treated as 0 completed / 1 remaining.
                elementProgress = currentTracker instanceof RestrictionTracker.HasProgress ? ((RestrictionTracker.HasProgress)((Object)currentTracker)).getProgress() : RestrictionTracker.Progress.from(0.0, 1.0);
            } else {
                double elementCompleted = splitDelegate.getProgress();
                elementProgress = RestrictionTracker.Progress.from(elementCompleted, 1.0 - elementCompleted);
            }
            RestrictionTracker.Progress scaledProgress = FnApiDoFnRunner.scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
            // Amount of work, measured across all remaining windows, that the primary keeps.
            double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
            if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
                // The primary keeps at least the rest of this window: split on a window boundary.
                // The new stop index is at least one past the current window and at most stopWindowIndex - 1.
                newWindowStopIndex = (int)Math.min((long)(stopWindowIndex - 1), (long)currentWindowIndex + Math.max(1L, Math.round((elementProgress.getWorkCompleted() + scaledFractionOfRemainder) / (elementProgress.getWorkCompleted() + elementProgress.getWorkRemaining()))));
                windowedSplitResult = FnApiDoFnRunner.computeWindowSplitResult(currentElement, currentRestriction, currentWindow, windows, currentWatermarkEstimatorState, newWindowStopIndex, newWindowStopIndex, stopWindowIndex, null, watermarkAndState);
            } else {
                // The split point lies inside the current window: try an element-level split.
                SplitResult elementSplit = null;
                if (currentTracker != null) {
                    elementSplit = currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining());
                } else {
                    downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
                }
                newWindowStopIndex = currentWindowIndex + 1;
                // If no element-level split happened, the current window stays entirely in the primary.
                int toIndex = elementSplit == null && downstreamSplitResult == null ? newWindowStopIndex : currentWindowIndex;
                windowedSplitResult = FnApiDoFnRunner.computeWindowSplitResult(currentElement, currentRestriction, currentWindow, windows, currentWatermarkEstimatorState, toIndex, newWindowStopIndex, stopWindowIndex, elementSplit, watermarkAndState);
            }
        } else {
            // Last window: only an element-level split is possible.
            SplitResult elementSplitResult = null;
            newWindowStopIndex = stopWindowIndex;
            if (currentTracker != null) {
                elementSplitResult = currentTracker.trySplit(fractionOfRemainder);
            } else {
                downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
            }
            if (elementSplitResult == null && downstreamSplitResult == null) {
                return null;
            }
            windowedSplitResult = FnApiDoFnRunner.computeWindowSplitResult(currentElement, currentRestriction, currentWindow, windows, currentWatermarkEstimatorState, currentWindowIndex, stopWindowIndex, stopWindowIndex, elementSplitResult, watermarkAndState);
        }
        return SplitResultsWithStopIndex.of(windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
    }

    /**
     * Encodes a windowed split (and/or a downstream element split) into primary and residual
     * bundle applications.
     *
     * <p>Roots are added in this order: the primary in fully processed windows, the residual in
     * unprocessed windows (carrying {@code initialWatermark} as output watermark), and then either
     * the element-level primary/residual pair (the residual carrying the watermark from
     * {@code watermarkAndState} and the requested resume delay) or, failing that, the single
     * primary and residual root of {@code downstreamElementSplit}.
     *
     * <p>Output watermarks equal to {@code GlobalWindow.TIMESTAMP_MIN_VALUE} are omitted.
     *
     * @param windowedSplitResult split roots to encode; may be {@code null}
     * @param downstreamElementSplit downstream split; must be {@code null} when
     *     {@code windowedSplitResult} has a residual split root
     * @param fullInputCoder coder used to encode every root
     * @param initialWatermark watermark attached to the residual in unprocessed windows
     * @param watermarkAndState watermark attached to the residual split root; dereferenced only
     *     when a residual split root is present
     * @param resumeDelay requested delay for the residual split root; must be non-null when a
     *     residual split root is present
     * @return the assembled primary and residual roots
     * @throws RuntimeException wrapping any {@link IOException} raised while encoding
     */
    @VisibleForTesting
    static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(WindowedSplitResult windowedSplitResult, HandlesSplits.SplitResult downstreamElementSplit, Coder fullInputCoder, Instant initialWatermark, KV<Instant, WatermarkEstimatorStateT> watermarkAndState, String pTransformId, String mainInputId, Collection<String> outputIds, Duration resumeDelay) {
        Preconditions.checkArgument(windowedSplitResult == null || windowedSplitResult.getResidualSplitRoot() == null || downstreamElementSplit == null);
        ArrayList<BeamFnApi.BundleApplication> primaryRoots = new ArrayList<BeamFnApi.BundleApplication>();
        ArrayList<BeamFnApi.DelayedBundleApplication> residualRoots = new ArrayList<BeamFnApi.DelayedBundleApplication>();
        if (windowedSplitResult != null && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
            ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
            try {
                fullInputCoder.encode(windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(), primaryInOtherWindowsBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            BeamFnApi.BundleApplication.Builder primaryApplicationInOtherWindows = BeamFnApi.BundleApplication.newBuilder().setTransformId(pTransformId).setInputId(mainInputId).setElement(primaryInOtherWindowsBytes.toByteString());
            primaryRoots.add(primaryApplicationInOtherWindows.build());
        }
        if (windowedSplitResult != null && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
            ByteString.Output bytesOut = ByteString.newOutput();
            try {
                fullInputCoder.encode(windowedSplitResult.getResidualInUnprocessedWindowsRoot(), bytesOut);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            BeamFnApi.BundleApplication.Builder residualInUnprocessedWindowsRoot = BeamFnApi.BundleApplication.newBuilder().setTransformId(pTransformId).setInputId(mainInputId).setElement(bytesOut.toByteString());
            // Unprocessed windows restart from the watermark observed before processing began.
            residualInUnprocessedWindowsRoot.putAllOutputWatermarks(FnApiDoFnRunner.outputWatermarksFor(initialWatermark, outputIds));
            residualRoots.add(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(residualInUnprocessedWindowsRoot).build());
        }
        ByteString.Output primaryBytes = ByteString.newOutput();
        ByteString.Output residualBytes = ByteString.newOutput();
        if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
            Preconditions.checkNotNull(resumeDelay);
            try {
                fullInputCoder.encode(windowedSplitResult.getPrimarySplitRoot(), primaryBytes);
                fullInputCoder.encode(windowedSplitResult.getResidualSplitRoot(), residualBytes);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            primaryRoots.add(BeamFnApi.BundleApplication.newBuilder().setTransformId(pTransformId).setInputId(mainInputId).setElement(primaryBytes.toByteString()).build());
            BeamFnApi.BundleApplication.Builder residualApplication = BeamFnApi.BundleApplication.newBuilder().setTransformId(pTransformId).setInputId(mainInputId).setElement(residualBytes.toByteString());
            residualApplication.putAllOutputWatermarks(FnApiDoFnRunner.outputWatermarksFor(watermarkAndState.getKey(), outputIds));
            residualRoots.add(BeamFnApi.DelayedBundleApplication.newBuilder().setApplication(residualApplication).setRequestedTimeDelay(Durations.fromMillis(resumeDelay.getMillis())).build());
        } else if (downstreamElementSplit != null) {
            primaryRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getPrimaryRoots()));
            residualRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getResidualRoots()));
        }
        return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
    }

    /**
     * Builds the per-output watermark map for a residual application.
     *
     * <p>Returns an empty map when {@code watermark} equals
     * {@code GlobalWindow.TIMESTAMP_MIN_VALUE}; otherwise every output id maps to the same
     * protobuf timestamp. Floor division and floor modulus are used so that watermarks before the
     * epoch produce a {@code nanos} field in the range 0..999,999,999 (plain {@code /} and
     * {@code %} would yield negative nanos for negative millisecond values).
     */
    private static Map<String, Timestamp> outputWatermarksFor(Instant watermark, Collection<String> outputIds) {
        HashMap<String, Timestamp> outputWatermarks = new HashMap<String, Timestamp>();
        if (!watermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
            long millis = watermark.getMillis();
            Timestamp outputWatermark = Timestamp.newBuilder().setSeconds(Math.floorDiv(millis, 1000L)).setNanos((int)Math.floorMod(millis, 1000L) * 1000000).build();
            for (String outputId : outputIds) {
                outputWatermarks.put(outputId, outputWatermark);
            }
        }
        return outputWatermarks;
    }

    /**
     * Attempts to split the element and restriction currently being processed.
     *
     * <p>While holding {@code splitLock}, the current watermark and estimator state are captured,
     * the split is computed with the current tracker, {@code windowStopIndex} is lowered to the new
     * stop index, and sizes are attached to the resulting roots. The final split result is
     * assembled after the lock is released.
     *
     * @param fractionOfRemainder fraction of the remaining work the primary should keep
     * @param resumeDelay requested delay before the residual is resumed
     * @return the split result, or {@code null} when there is no current tracker or no split could
     *     be computed
     */
    private HandlesSplits.SplitResult trySplitForElementAndRestriction(double fractionOfRemainder, Duration resumeDelay) {
        KV<Instant, WatermarkEstimatorStateT> watermarkSnapshot;
        WindowedSplitResult sizedWindowSplit;
        synchronized (splitLock) {
            if (currentTracker == null) {
                return null;
            }
            watermarkSnapshot = currentWatermarkEstimator.getWatermarkAndState();
            SplitResultsWithStopIndex computed = computeSplitForProcessOrTruncate(currentElement, currentRestriction, currentWindow, currentWindows, currentWatermarkEstimatorState, fractionOfRemainder, currentTracker, null, watermarkSnapshot, windowCurrentIndex, windowStopIndex);
            if (computed == null) {
                return null;
            }
            windowStopIndex = computed.getNewWindowStopIndex();
            sizedWindowSplit = calculateRestrictionSize(computed.getWindowSplit(), "beam:transform:sdf_process_sized_element_and_restrictions:v1/GetSize");
        }
        WindowedValue.FullWindowedValueCoder<?> sizedInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
        return constructSplitResult(sizedWindowSplit, null, sizedInputCoder, initialWatermark, watermarkSnapshot, pTransformId, mainInputId, pTransform.getOutputsMap().keySet(), resumeDelay);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private <K> void processTimer(String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
        try {
            this.currentKey = timer.getUserKey();
            for (BoundedWindow this.currentWindow : timer.getWindows()) {
                FnApiTimerBundleTracker.Modifications bundleModifications = this.timerBundleTracker.getBundleModifications();
                Table modifiedTimerIds = bundleModifications.getModifiedTimerIds();
                NavigableSet<FnApiTimerBundleTracker.TimerInfo<K>> earlierTimers = bundleModifications.getModifiedTimersOrdered(timeDomain).headSet(FnApiTimerBundleTracker.TimerInfo.of(timer, "", timeDomain), true);
                while (!earlierTimers.isEmpty()) {
                    FnApiTimerBundleTracker.TimerInfo<K> insertedTimer = earlierTimers.pollFirst();
                    if (this.timerModified(modifiedTimerIds, insertedTimer.getTimerFamilyOrId(), insertedTimer.getTimer())) continue;
                    String timerId = insertedTimer.getTimer().getDynamicTimerTag().isEmpty() ? insertedTimer.getTimerFamilyOrId() : insertedTimer.getTimer().getDynamicTimerTag();
                    String timerFamily = insertedTimer.getTimer().getDynamicTimerTag().isEmpty() ? "" : insertedTimer.getTimerFamilyOrId();
                    modifiedTimerIds.put(insertedTimer.getTimerFamilyOrId(), insertedTimer.getTimer().getDynamicTimerTag(), Timer.cleared(insertedTimer.getTimer().getUserKey(), insertedTimer.getTimer().getDynamicTimerTag(), insertedTimer.getTimer().getWindows()));
                    this.processTimerDirect(timerFamily, timerId, insertedTimer.getTimeDomain(), insertedTimer.getTimer());
                }
                if (this.timerModified(modifiedTimerIds, timerIdOrTimerFamilyId, timer)) continue;
                String timerId = timerIdOrTimerFamilyId.startsWith("tfs-") ? "" : timerIdOrTimerFamilyId;
                String timerFamilyId = timerIdOrTimerFamilyId.startsWith("tfs-") ? timerIdOrTimerFamilyId : "";
                this.processTimerDirect(timerFamilyId, timerId, timeDomain, timer);
            }
        }
        finally {
            this.currentKey = null;
            this.currentTimer = null;
            this.currentTimeDomain = null;
            this.currentWindow = null;
        }
    }

    /**
     * Tells whether the bundle has recorded a different timer for the same slot.
     *
     * @param modifiedTimerIds table of recorded timers, looked up by family/id and dynamic tag
     * @param timerFamilyOrId row key for the lookup
     * @param timer timer whose dynamic tag is the column key and which is compared to the entry
     * @return {@code true} only when an entry exists and is not equal to {@code timer}
     */
    private <K> boolean timerModified(Table<String, String, Timer<K>> modifiedTimerIds, String timerFamilyOrId, Timer<K> timer) {
        @Nullable Timer<K> recorded = modifiedTimerIds.get(timerFamilyOrId, timer.getDynamicTimerTag());
        if (recorded == null) {
            // Nothing recorded for this slot.
            return false;
        }
        return !recorded.equals(timer);
    }

    /**
     * Publishes the timer and its time domain as the current firing context, then invokes the
     * DoFn's timer callback with the runner's on-timer argument provider.
     *
     * <p>Note the invoker takes the timer id first and the family id second, the reverse of this
     * method's parameter order.
     */
    private <K> void processTimerDirect(String timerFamilyId, String timerId, TimeDomain timeDomain, Timer<K> timer) {
        this.currentTimeDomain = timeDomain;
        this.currentTimer = timer;
        this.doFnInvoker.invokeOnTimer(timerId, timerFamilyId, this.onTimerContext);
    }

    private <K> void processOnWindowExpiration(Timer<K> timer) {
        try {
            this.currentKey = timer.getUserKey();
            this.currentTimer = timer;
            for (BoundedWindow this.currentWindow : timer.getWindows()) {
                this.doFnInvoker.invokeOnWindowExpiration(this.onWindowExpirationContext);
            }
        }
        finally {
            this.currentKey = null;
            this.currentTimer = null;
            this.currentWindow = null;
        }
    }

    /**
     * Completes the bundle: emits the tracked timers to their outbound receivers, closes every
     * receiver, runs the DoFn's finish-bundle callback and finally finalizes state.
     *
     * @throws Exception if closing a receiver or any later step fails
     */
    private void finishBundle() throws Exception {
        // Receivers are resolved by timer family or id at the moment the tracker asks for them.
        this.timerBundleTracker.outputTimers(familyOrId -> this.outboundTimerReceivers.get(familyOrId));
        for (CloseableFnDataReceiver<Timer<Object>> receiver : this.outboundTimerReceivers.values()) {
            receiver.close();
        }
        this.doFnInvoker.invokeFinishBundle(this.finishBundleArgumentProvider);
        this.stateAccessor.finalizeState();
    }

    /** Invokes the DoFn's teardown callback through the invoker. */
    private void tearDown() {
        this.doFnInvoker.invokeTeardown();
    }

    /**
     * Delivers {@code output} to every consumer in {@code consumers}.
     *
     * <p>When the current watermark estimator observes timestamps it is shown the output's
     * timestamp first.
     *
     * @param consumers receivers to hand the value to, in iteration order
     * @param output the windowed value to emit
     * @throws UserCodeException wrapping anything thrown by a consumer
     */
    private <T> void outputTo(Collection<FnDataReceiver<WindowedValue<T>>> consumers, WindowedValue<T> output) {
        if (this.currentWatermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
            ((TimestampObservingWatermarkEstimator)((Object)this.currentWatermarkEstimator)).observeTimestamp(output.getTimestamp());
        }
        try {
            // The element type must match the collection: each receiver accepts WindowedValue<T>.
            for (FnDataReceiver<WindowedValue<T>> consumer : consumers) {
                consumer.accept(output);
            }
        }
        catch (Throwable t) {
            throw UserCodeException.wrap(t);
        }
    }

    /**
     * Validates an output timestamp against the current input element.
     *
     * @param timestamp the timestamp the DoFn wants to output with
     * @throws IllegalArgumentException if {@code timestamp} is earlier than the current element's
     *     timestamp minus the DoFn's allowed skew, or later than {@code TIMESTAMP_MAX_VALUE}
     */
    private void checkTimestamp(Instant timestamp) {
        Instant earliestAllowed;
        try {
            earliestAllowed = this.currentElement.getTimestamp().minus(this.doFn.getAllowedTimestampSkew());
        }
        catch (ArithmeticException overflow) {
            // The subtraction could not be represented; use the minimum timestamp as the bound.
            earliestAllowed = BoundedWindow.TIMESTAMP_MIN_VALUE;
        }
        boolean withinBounds = !timestamp.isBefore(earliestAllowed) && !timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE);
        if (withinBounds) {
            return;
        }
        throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", timestamp, this.currentElement.getTimestamp(), PeriodFormat.getDefault().print(this.doFn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE));
    }

    /**
     * Maps the portable time-domain enum onto the SDK's {@code TimeDomain}.
     *
     * @param domain portable time domain
     * @return the matching SDK time domain for event time or processing time
     * @throws IllegalArgumentException for any other enum constant
     */
    private TimeDomain translateTimeDomain(RunnerApi.TimeDomain.Enum domain) {
        switch (domain) {
            case EVENT_TIME:
                return TimeDomain.EVENT_TIME;
            case PROCESSING_TIME:
                return TimeDomain.PROCESSING_TIME;
            default:
                throw new IllegalArgumentException("Unknown time domain");
        }
    }

    /**
     * Argument provider handed to the DoFn invoker while a timer callback runs.
     *
     * <p>All values are read from the enclosing runner's {@code currentTimer},
     * {@code currentWindow} and {@code currentTimeDomain} fields at call time. Outputs are stamped
     * with the timer's hold timestamp unless an explicit timestamp is given.
     *
     * @param <K> type of the user key carried by the timer
     */
    private class OnTimerContext<K>
    extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
        private final Context context = new Context();

        private OnTimerContext() {
        }

        @Override
        public BoundedWindow window() {
            return FnApiDoFnRunner.this.currentWindow;
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return FnApiDoFnRunner.this.currentTimer.getHoldTimestamp();
        }

        @Override
        public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
            return FnApiDoFnRunner.this.currentTimeDomain;
        }

        @Override
        @SuppressWarnings("unchecked")
        public K key() {
            // currentTimer belongs to the enclosing runner and cannot be declared in terms of this
            // class's K, so the key has to be cast explicitly.
            return (K)FnApiDoFnRunner.this.currentTimer.getUserKey();
        }

        @Override
        public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedReceiver(this.context, null);
        }

        @Override
        public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.rowReceiver(this.context, null, FnApiDoFnRunner.this.mainOutputSchemaCoder);
        }

        @Override
        public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedMultiReceiver(this.context);
        }

        @Override
        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            return this.context;
        }

        /**
         * Binds the state cell declared under {@code stateId} to the runner's state accessor.
         *
         * @param stateId id of a state declaration on the DoFn
         * @param alwaysFetched when true, {@code readLater()} is called on the bound state before
         *     it is returned
         * @throws NullPointerException if the DoFn declares no state with that id
         * @throws RuntimeException wrapping {@link IllegalAccessException} if the declaring field
         *     cannot be read
         */
        @Override
        public State state(String stateId, boolean alwaysFetched) {
            DoFnSignature.StateDeclaration stateDeclaration = FnApiDoFnRunner.this.doFnSignature.stateDeclarations().get(stateId);
            Preconditions.checkNotNull(stateDeclaration, "No state declaration found for %s", (Object)stateId);
            StateSpec<?> spec;
            try {
                spec = (StateSpec<?>)stateDeclaration.field().get(FnApiDoFnRunner.this.doFn);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
            // Keep the bound cell typed as State so it can be returned directly.
            State state = spec.bind(stateId, FnApiDoFnRunner.this.stateAccessor);
            if (alwaysFetched) {
                return (State)((ReadableState<?>)state).readLater();
            }
            return state;
        }

        @Override
        public org.apache.beam.sdk.state.Timer timer(String timerId) {
            TimeDomain timeDomain = FnApiDoFnRunner.this.translateTimeDomain(FnApiDoFnRunner.this.parDoPayload.getTimerFamilySpecsMap().get(timerId).getTimeDomain());
            return new FnApiTimer(timerId, FnApiDoFnRunner.this.currentTimer.getUserKey(), "", FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentTimer.getFireTimestamp(), FnApiDoFnRunner.this.currentTimer.getPane(), timeDomain);
        }

        @Override
        public TimerMap timerFamily(String timerFamilyId) {
            return new FnApiTimerMap(timerFamilyId, FnApiDoFnRunner.this.currentTimer.getUserKey(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentTimer.getFireTimestamp(), FnApiDoFnRunner.this.currentTimer.getPane());
        }

        @Override
        public String timerId(DoFn<InputT, OutputT> doFn) {
            return FnApiDoFnRunner.this.currentTimer.getDynamicTimerTag();
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        @Override
        public String getErrorContext() {
            return "FnApiDoFnRunner/OnTimer";
        }

        /**
         * The {@code OnTimerContext} exposed to user code. It is an inner class of the DoFn, so it
         * extends the parameterized type and is constructed against the runner's DoFn instance.
         */
        private class Context
        extends DoFn<InputT, OutputT>.OnTimerContext {
            private Context() {
                // Qualified superclass constructor call: the DoFn is the enclosing instance.
                FnApiDoFnRunner.this.doFn.super();
            }

            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiDoFnRunner.this.pipelineOptions;
            }

            @Override
            public BoundedWindow window() {
                return FnApiDoFnRunner.this.currentWindow;
            }

            @Override
            public void output(OutputT output) {
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public void outputWithTimestamp(OutputT output, Instant timestamp) {
                this.checkTimerTimestamp(timestamp);
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public <T> void output(TupleTag<T> tag, T output) {
                Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
                this.checkTimerTimestamp(timestamp);
                Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public TimeDomain timeDomain() {
                return FnApiDoFnRunner.this.currentTimeDomain;
            }

            @Override
            public Instant fireTimestamp() {
                return FnApiDoFnRunner.this.currentTimer.getFireTimestamp();
            }

            @Override
            public Instant timestamp() {
                return FnApiDoFnRunner.this.currentTimer.getHoldTimestamp();
            }

            /**
             * Rejects timestamps earlier than the timer's hold timestamp minus the DoFn's allowed
             * skew, or later than {@code TIMESTAMP_MAX_VALUE}.
             */
            private void checkTimerTimestamp(Instant timestamp) {
                Instant lowerBound;
                try {
                    lowerBound = FnApiDoFnRunner.this.currentTimer.getHoldTimestamp().minus(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew());
                }
                catch (ArithmeticException e) {
                    // The subtraction could not be represented; use the minimum timestamp as the bound.
                    lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
                }
                if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the timer (%s) minus the allowed skew (%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", timestamp, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), PeriodFormat.getDefault().print(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE));
                }
            }
        }
    }

    /**
     * Argument provider handed to the DoFn invoker while a window-expiration callback runs.
     *
     * <p>All values are read from the enclosing runner's {@code currentTimer} and
     * {@code currentWindow} fields at call time. Outputs are stamped with the timer's hold
     * timestamp unless an explicit timestamp is given.
     *
     * @param <K> type of the user key carried by the timer
     */
    private class OnWindowExpirationContext<K>
    extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
        private final Context context = new Context();

        private OnWindowExpirationContext() {
        }

        @Override
        public BoundedWindow window() {
            return FnApiDoFnRunner.this.currentWindow;
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return FnApiDoFnRunner.this.currentTimer.getHoldTimestamp();
        }

        @Override
        public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
            // NOTE(review): processOnWindowExpiration never assigns currentTimeDomain, so this
            // presumably returns null during window expiration - confirm whether that is intended.
            return FnApiDoFnRunner.this.currentTimeDomain;
        }

        @Override
        @SuppressWarnings("unchecked")
        public K key() {
            // currentTimer belongs to the enclosing runner and cannot be declared in terms of this
            // class's K, so the key has to be cast explicitly.
            return (K)FnApiDoFnRunner.this.currentTimer.getUserKey();
        }

        @Override
        public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedReceiver(this.context, null);
        }

        @Override
        public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.rowReceiver(this.context, null, FnApiDoFnRunner.this.mainOutputSchemaCoder);
        }

        @Override
        public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedMultiReceiver(this.context);
        }

        /**
         * Binds the state cell declared under {@code stateId} to the runner's state accessor.
         *
         * @param stateId id of a state declaration on the DoFn
         * @param alwaysFetched when true, {@code readLater()} is called on the bound state before
         *     it is returned
         * @throws NullPointerException if the DoFn declares no state with that id
         * @throws RuntimeException wrapping {@link IllegalAccessException} if the declaring field
         *     cannot be read
         */
        @Override
        public State state(String stateId, boolean alwaysFetched) {
            DoFnSignature.StateDeclaration stateDeclaration = FnApiDoFnRunner.this.doFnSignature.stateDeclarations().get(stateId);
            Preconditions.checkNotNull(stateDeclaration, "No state declaration found for %s", (Object)stateId);
            StateSpec<?> spec;
            try {
                spec = (StateSpec<?>)stateDeclaration.field().get(FnApiDoFnRunner.this.doFn);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
            // Keep the bound cell typed as State so it can be returned directly.
            State state = spec.bind(stateId, FnApiDoFnRunner.this.stateAccessor);
            if (alwaysFetched) {
                return (State)((ReadableState<?>)state).readLater();
            }
            return state;
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        @Override
        public String getErrorContext() {
            return "FnApiDoFnRunner/OnWindowExpiration";
        }

        /**
         * The {@code OnWindowExpirationContext} exposed to user code. It is an inner class of the
         * DoFn, so it extends the parameterized type and is constructed against the runner's DoFn
         * instance.
         */
        private class Context
        extends DoFn<InputT, OutputT>.OnWindowExpirationContext {
            private Context() {
                // Qualified superclass constructor call: the DoFn is the enclosing instance.
                FnApiDoFnRunner.this.doFn.super();
            }

            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiDoFnRunner.this.pipelineOptions;
            }

            @Override
            public BoundedWindow window() {
                return FnApiDoFnRunner.this.currentWindow;
            }

            @Override
            public void output(OutputT output) {
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public void outputWithTimestamp(OutputT output, Instant timestamp) {
                this.checkOnWindowExpirationTimestamp(timestamp);
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public <T> void output(TupleTag<T> tag, T output) {
                Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            @Override
            public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
                this.checkOnWindowExpirationTimestamp(timestamp);
                Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentTimer.getPane()));
            }

            /**
             * Rejects timestamps earlier than the timer's hold timestamp minus the DoFn's allowed
             * skew, or later than {@code TIMESTAMP_MAX_VALUE}.
             */
            private void checkOnWindowExpirationTimestamp(Instant timestamp) {
                Instant lowerBound;
                try {
                    lowerBound = FnApiDoFnRunner.this.currentTimer.getHoldTimestamp().minus(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew());
                }
                catch (ArithmeticException e) {
                    // The subtraction could not be represented; use the minimum timestamp as the bound.
                    lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
                }
                if (timestamp.isBefore(lowerBound) || timestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    throw new IllegalArgumentException(String.format("Cannot output with timestamp %s. Output timestamps must be no earlier than the timestamp of the timer (%s) minus the allowed skew (%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", timestamp, FnApiDoFnRunner.this.currentTimer.getHoldTimestamp(), PeriodFormat.getDefault().print(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE));
                }
            }
        }
    }

    /**
     * Shared base for the per-element contexts: it is both the {@code ProcessContext} given to
     * user code and the argument provider given to the DoFn invoker.
     *
     * <p>Element, timestamp and pane come from the enclosing runner's {@code currentElement}.
     * Arguments that only exist in other lifecycle methods throw
     * {@link UnsupportedOperationException}.
     */
    private abstract class ProcessBundleContextBase
    extends DoFn<InputT, OutputT>.ProcessContext
    implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
        private ProcessBundleContextBase() {
            // ProcessContext is an inner class of the DoFn, so the DoFn is the enclosing instance.
            FnApiDoFnRunner.this.doFn.super();
        }

        @Override
        public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
            return this.pane();
        }

        @Override
        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access StartBundleContext outside of @StartBundle method.");
        }

        @Override
        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access FinishBundleContext outside of @FinishBundle method.");
        }

        @Override
        public DoFn.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
            return this;
        }

        @Override
        public InputT element(DoFn<InputT, OutputT> doFn) {
            return this.element();
        }

        @Override
        public Object key() {
            throw new UnsupportedOperationException("Cannot access key as parameter outside of @OnTimer method.");
        }

        /**
         * Applies the element converter registered at {@code index} to the current element.
         *
         * @param index position in the DoFn schema information's converter list
         * @return whatever the converter produces
         */
        @Override
        public Object schemaElement(int index) {
            // A wildcard-typed function cannot accept the element, so narrow its input type to
            // InputT; the converters are applied to this runner's elements.
            @SuppressWarnings("unchecked")
            SerializableFunction<InputT, ?> converter = (SerializableFunction<InputT, ?>)FnApiDoFnRunner.this.doFnSchemaInformation.getElementConverters().get(index);
            return converter.apply(this.element());
        }

        @Override
        public Instant timestamp(DoFn<InputT, OutputT> doFn) {
            return this.timestamp();
        }

        @Override
        public String timerId(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access timerId as parameter outside of @OnTimer method.");
        }

        @Override
        public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
            // The message names @OnTimer, the annotation the sibling messages refer to.
            throw new UnsupportedOperationException("Cannot access time domain outside of @OnTimer method.");
        }

        @Override
        public DoFn.OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedReceiver(this, null);
        }

        @Override
        public DoFn.OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.rowReceiver(this, null, FnApiDoFnRunner.this.mainOutputSchemaCoder);
        }

        @Override
        public DoFn.MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
            return DoFnOutputReceivers.windowedMultiReceiver(this, FnApiDoFnRunner.this.outputCoders);
        }

        @Override
        public DoFn.BundleFinalizer bundleFinalizer() {
            return FnApiDoFnRunner.this.bundleFinalizer;
        }

        @Override
        public Object restriction() {
            return FnApiDoFnRunner.this.currentRestriction;
        }

        @Override
        public DoFn.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
            throw new UnsupportedOperationException("Cannot access OnTimerContext outside of @OnTimer methods.");
        }

        @Override
        public RestrictionTracker<?, ?> restrictionTracker() {
            return FnApiDoFnRunner.this.currentTracker;
        }

        @Override
        public PipelineOptions getPipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        /** Outputs to the main output with the current element's timestamp. */
        @Override
        public void output(OutputT output) {
            this.outputWithTimestamp(output, FnApiDoFnRunner.this.currentElement.getTimestamp());
        }

        /** Outputs to the tagged output with the current element's timestamp. */
        @Override
        public <T> void output(TupleTag<T> tag, T output) {
            this.outputWithTimestamp(tag, output, FnApiDoFnRunner.this.currentElement.getTimestamp());
        }

        @Override
        public InputT element() {
            return FnApiDoFnRunner.this.currentElement.getValue();
        }

        @Override
        public Instant timestamp() {
            return FnApiDoFnRunner.this.currentElement.getTimestamp();
        }

        @Override
        public PaneInfo pane() {
            return FnApiDoFnRunner.this.currentElement.getPane();
        }

        @Override
        public Object watermarkEstimatorState() {
            return FnApiDoFnRunner.this.currentWatermarkEstimatorState;
        }

        @Override
        public WatermarkEstimator<?> watermarkEstimator() {
            return FnApiDoFnRunner.this.currentWatermarkEstimator;
        }
    }

    /**
     * Per-element context for DoFns that do not observe the window: outputs carry all of the
     * current element's windows at once, and every window-dependent accessor (window, side
     * inputs, state, timers) throws {@link UnsupportedOperationException}.
     */
    private class NonWindowObservingProcessBundleContext
    extends ProcessBundleContextBase {
        private NonWindowObservingProcessBundleContext() {
        }

        /** Validates the timestamp, then emits to the main output in all of the element's windows. */
        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            FnApiDoFnRunner.this.checkTimestamp(timestamp);
            WindowedValue<OutputT> windowed = WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentElement.getWindows(), FnApiDoFnRunner.this.currentElement.getPane());
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, windowed);
        }

        /**
         * Validates the timestamp, then emits to the consumers registered for {@code tag}.
         *
         * @throws IllegalArgumentException if no consumers are registered under the tag's id
         */
        @Override
        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            FnApiDoFnRunner.this.checkTimestamp(timestamp);
            Collection taggedConsumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
            if (taggedConsumers != null) {
                WindowedValue<T> windowed = WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentElement.getWindows(), FnApiDoFnRunner.this.currentElement.getPane());
                FnApiDoFnRunner.this.outputTo(taggedConsumers, windowed);
                return;
            }
            throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
        }

        @Override
        public BoundedWindow window() {
            throw new UnsupportedOperationException("Cannot access window in non-window observing context.");
        }

        @Override
        public Object sideInput(String tagId) {
            throw new UnsupportedOperationException("Cannot access sideInput in non-window observing context.");
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
            throw new UnsupportedOperationException("Cannot access sideInput in non-window observing context.");
        }

        @Override
        public State state(String stateId, boolean alwaysFetched) {
            throw new UnsupportedOperationException("Cannot access state in non-window observing context.");
        }

        @Override
        public org.apache.beam.sdk.state.Timer timer(String timerId) {
            throw new UnsupportedOperationException("Cannot access timer in non-window observing context.");
        }

        @Override
        public TimerMap timerFamily(String timerFamilyId) {
            throw new UnsupportedOperationException("Cannot access timerFamily in non-window observing context.");
        }
    }

    /**
     * Non-window-observing context whose main output is a restriction: each output is sized via
     * the DoFn's get-size method and emitted, in all of the element's windows, as
     * {@code KV(KV(element, KV(restriction, watermarkEstimatorState)), size)}.
     */
    private class SizedRestrictionNonWindowObservingProcessBundleContext
    extends NonWindowObservingProcessBundleContext {
        private final String errorContextPrefix;

        SizedRestrictionNonWindowObservingProcessBundleContext(String errorContextPrefix) {
            this.errorContextPrefix = errorContextPrefix;
        }

        @Override
        public void outputWithTimestamp(final OutputT output, final Instant timestamp) {
            FnApiDoFnRunner.this.checkTimestamp(timestamp);
            // Delegates to this context but presents the value being output as the restriction,
            // with the requested timestamp and a tracker newly created from this provider.
            DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT> sizingArguments = new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>(this, this.errorContextPrefix + "/GetSize"){

                @Override
                public Object restriction() {
                    return output;
                }

                @Override
                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                    return timestamp;
                }

                @Override
                public RestrictionTracker<?, ?> restrictionTracker() {
                    return FnApiDoFnRunner.this.doFnInvoker.invokeNewTracker(this);
                }
            };
            double size = FnApiDoFnRunner.this.doFnInvoker.invokeGetSize(sizingArguments);
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(KV.of(KV.of(FnApiDoFnRunner.this.currentElement.getValue(), KV.of(output, FnApiDoFnRunner.this.currentWatermarkEstimatorState)), size), timestamp, FnApiDoFnRunner.this.currentElement.getWindows(), FnApiDoFnRunner.this.currentElement.getPane()));
        }
    }

    /**
     * Window-observing context whose main output is a restriction: each output is sized via the
     * DoFn's get-size method and emitted, in the current window only, as
     * {@code KV(KV(element, KV(restriction, watermarkEstimatorState)), size)}.
     */
    private class SizedRestrictionWindowObservingProcessBundleContext
    extends WindowObservingProcessBundleContext {
        private final String errorContextPrefix;

        SizedRestrictionWindowObservingProcessBundleContext(String errorContextPrefix) {
            this.errorContextPrefix = errorContextPrefix;
        }

        @Override
        public void outputWithTimestamp(final OutputT output, final Instant timestamp) {
            FnApiDoFnRunner.this.checkTimestamp(timestamp);
            // Delegates to this context but presents the value being output as the restriction,
            // with the requested timestamp and a tracker newly created from this provider.
            DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT> sizingArguments = new DoFnInvoker.DelegatingArgumentProvider<InputT, OutputT>(this, this.errorContextPrefix + "/GetSize"){

                @Override
                public Object restriction() {
                    return output;
                }

                @Override
                public Instant timestamp(DoFn<InputT, OutputT> doFn) {
                    return timestamp;
                }

                @Override
                public RestrictionTracker<?, ?> restrictionTracker() {
                    return FnApiDoFnRunner.this.doFnInvoker.invokeNewTracker(this);
                }
            };
            double size = FnApiDoFnRunner.this.doFnInvoker.invokeGetSize(sizingArguments);
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(KV.of(KV.of(FnApiDoFnRunner.this.currentElement.getValue(), KV.of(output, FnApiDoFnRunner.this.currentWatermarkEstimatorState)), size), timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }
    }

    /**
     * Per-element context for DoFns that observe the window: window, side inputs, state and
     * timers are resolved against the enclosing runner's {@code currentWindow}, and outputs are
     * emitted into that single window.
     */
    private class WindowObservingProcessBundleContext
    extends ProcessBundleContextBase {
        private WindowObservingProcessBundleContext() {
        }

        @Override
        public BoundedWindow window() {
            return FnApiDoFnRunner.this.currentWindow;
        }

        @Override
        public Object sideInput(String tagId) {
            return this.sideInput((PCollectionView)FnApiDoFnRunner.this.sideInputMapping.get(tagId));
        }

        @Override
        public <T> T sideInput(PCollectionView<T> view) {
            return FnApiDoFnRunner.this.stateAccessor.get(view, FnApiDoFnRunner.this.currentWindow);
        }

        /**
         * Binds the state cell declared under {@code stateId} to the runner's state accessor.
         *
         * @param stateId id of a state declaration on the DoFn
         * @param alwaysFetched when true, {@code readLater()} is called on the bound state before
         *     it is returned
         * @throws NullPointerException if the DoFn declares no state with that id
         * @throws RuntimeException wrapping {@link IllegalAccessException} if the declaring field
         *     cannot be read
         */
        @Override
        public State state(String stateId, boolean alwaysFetched) {
            DoFnSignature.StateDeclaration stateDeclaration = FnApiDoFnRunner.this.doFnSignature.stateDeclarations().get(stateId);
            Preconditions.checkNotNull(stateDeclaration, "No state declaration found for %s", (Object)stateId);
            StateSpec<?> spec;
            try {
                spec = (StateSpec<?>)stateDeclaration.field().get(FnApiDoFnRunner.this.doFn);
            }
            catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
            // Keep the bound cell typed as State so it can be returned directly.
            State state = spec.bind(stateId, FnApiDoFnRunner.this.stateAccessor);
            if (alwaysFetched) {
                return (State)((ReadableState<?>)state).readLater();
            }
            return state;
        }

        /**
         * Creates a timer handle keyed by the current element's key.
         *
         * @throws IllegalStateException if the current element is not a {@code KV}
         */
        @Override
        public org.apache.beam.sdk.state.Timer timer(String timerId) {
            Preconditions.checkState(FnApiDoFnRunner.this.currentElement.getValue() instanceof KV, "Accessing timer in unkeyed context. Current element is not a KV: %s.", FnApiDoFnRunner.this.currentElement.getValue());
            TimeDomain timeDomain = FnApiDoFnRunner.this.translateTimeDomain(FnApiDoFnRunner.this.parDoPayload.getTimerFamilySpecsMap().get(timerId).getTimeDomain());
            return new FnApiTimer(timerId, ((KV)FnApiDoFnRunner.this.currentElement.getValue()).getKey(), "", FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentElement.getPane(), timeDomain);
        }

        /**
         * Creates a timer-family handle keyed by the current element's key.
         *
         * @throws IllegalStateException if the current element is not a {@code KV}
         */
        @Override
        public TimerMap timerFamily(String timerFamilyId) {
            // Same keyed-context guard as timer(): fail with a descriptive message rather than a
            // bare ClassCastException from the KV cast below.
            Preconditions.checkState(FnApiDoFnRunner.this.currentElement.getValue() instanceof KV, "Accessing timer family in unkeyed context. Current element is not a KV: %s.", FnApiDoFnRunner.this.currentElement.getValue());
            return new FnApiTimerMap(timerFamilyId, ((KV)FnApiDoFnRunner.this.currentElement.getValue()).getKey(), FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentElement.getTimestamp(), FnApiDoFnRunner.this.currentElement.getPane());
        }

        /** Emits to the main output in the current window; the timestamp is not validated here. */
        @Override
        public void outputWithTimestamp(OutputT output, Instant timestamp) {
            FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }

        /**
         * Emits to the consumers registered for {@code tag} in the current window; the timestamp
         * is not validated here.
         *
         * @throws IllegalArgumentException if no consumers are registered under the tag's id
         */
        @Override
        public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
            Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
            if (consumers == null) {
                throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
            }
            FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, FnApiDoFnRunner.this.currentWindow, FnApiDoFnRunner.this.currentElement.getPane()));
        }
    }

    /**
     * Argument provider handed to the DoFn invoker while the finish-bundle callback runs. It
     * supplies the finish-bundle context, the pipeline options and the bundle finalizer.
     */
    private class FinishBundleArgumentProvider
    extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
        // Refer to the nested class by its simple name, as the sibling providers do.
        private final Context context = new Context();

        private FinishBundleArgumentProvider() {
        }

        @Override
        public DoFn.FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
            return this.context;
        }

        @Override
        public PipelineOptions pipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        @Override
        public DoFn.BundleFinalizer bundleFinalizer() {
            return FnApiDoFnRunner.this.bundleFinalizer;
        }

        @Override
        public String getErrorContext() {
            return "FnApiDoFnRunner/FinishBundle";
        }

        /**
         * The {@code FinishBundleContext} exposed to user code. It is an inner class of the DoFn,
         * so it extends the parameterized type and is constructed against the runner's DoFn
         * instance. Outputs are emitted with {@code PaneInfo.NO_FIRING}.
         */
        private class Context
        extends DoFn<InputT, OutputT>.FinishBundleContext {
            Context() {
                // Qualified superclass constructor call: the DoFn is the enclosing instance.
                FnApiDoFnRunner.this.doFn.super();
            }

            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiDoFnRunner.this.pipelineOptions;
            }

            @Override
            public void output(OutputT output, Instant timestamp, BoundedWindow window) {
                FnApiDoFnRunner.this.outputTo(FnApiDoFnRunner.this.mainOutputConsumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
            }

            /**
             * Emits to the consumers registered for {@code tag}.
             *
             * @throws IllegalArgumentException if no consumers are registered under the tag's id
             */
            @Override
            public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                Collection consumers = FnApiDoFnRunner.this.localNameToConsumer.get(tag.getId());
                if (consumers == null) {
                    throw new IllegalArgumentException(String.format("Unknown output tag %s", tag));
                }
                FnApiDoFnRunner.this.outputTo(consumers, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
            }
        }
    }

    /**
     * Argument provider whose error context is {@code "FnApiDoFnRunner/StartBundle"}.
     *
     * <p>It hands out one shared {@link Context} as the start-bundle context and exposes the
     * enclosing runner's pipeline options and bundle finalizer.
     */
    private class StartBundleArgumentProvider
    extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
        // A single context instance is created up front and returned by every
        // startBundleContext(...) call.
        private final Context context = new Context();

        private StartBundleArgumentProvider() {
        }

        /**
         * @param doFn ignored; the same context is returned for any argument
         * @return the shared start-bundle context of this provider
         */
        @Override
        public DoFn.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
            return this.context;
        }

        /** @return the enclosing runner's pipeline options */
        @Override
        public PipelineOptions pipelineOptions() {
            return FnApiDoFnRunner.this.pipelineOptions;
        }

        /** @return the enclosing runner's bundle finalizer */
        @Override
        public DoFn.BundleFinalizer bundleFinalizer() {
            return FnApiDoFnRunner.this.bundleFinalizer;
        }

        /** @return the fixed label identifying this provider in error messages */
        @Override
        public String getErrorContext() {
            return "FnApiDoFnRunner/StartBundle";
        }

        /** Start-bundle context bound to the enclosing runner's {@code doFn}. */
        private class Context
        extends DoFn.StartBundleContext {
            Context() {
                super(FnApiDoFnRunner.this.doFn);
            }

            /** @return the enclosing runner's pipeline options */
            @Override
            public PipelineOptions getPipelineOptions() {
                return FnApiDoFnRunner.this.pipelineOptions;
            }
        }
    }

    /**
     * {@link TimerMap} over a single timer family for one user key and window.
     *
     * <p>Each lookup creates a fresh {@code FnApiTimer} that shares this map's key, window,
     * timestamps, pane and time domain and differs only in its dynamic timer tag.
     *
     * @param <K> type of the user key
     */
    private class FnApiTimerMap<K>
    implements TimerMap {
        private final String timerFamilyId;
        private final K userKey;
        private final BoundedWindow boundedWindow;
        private final PaneInfo paneInfo;
        private final Instant elementTimestampOrTimerHoldTimestamp;
        private final Instant elementTimestampOrTimerFireTimestamp;
        // Resolved once at construction from the timer family spec registered under timerFamilyId.
        private final TimeDomain timeDomain;

        /**
         * @param timerFamilyId id of the timer family; also the key used to look up its spec in
         *     the runner's ParDo payload
         * @param userKey key the timers belong to
         * @param boundedWindow window the timers belong to
         * @param elementTimestampOrTimerHoldTimestamp hold timestamp passed through to each timer
         * @param elementTimestampOrTimerFireTimestamp fire timestamp passed through to each timer
         * @param paneInfo pane passed through to each timer
         */
        FnApiTimerMap(String timerFamilyId, K userKey, BoundedWindow boundedWindow, Instant elementTimestampOrTimerHoldTimestamp, Instant elementTimestampOrTimerFireTimestamp, PaneInfo paneInfo) {
            this.timerFamilyId = timerFamilyId;
            this.userKey = userKey;
            this.boundedWindow = boundedWindow;
            this.paneInfo = paneInfo;
            this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
            this.elementTimestampOrTimerFireTimestamp = elementTimestampOrTimerFireTimestamp;
            this.timeDomain = FnApiDoFnRunner.this.translateTimeDomain(FnApiDoFnRunner.this.parDoPayload.getTimerFamilySpecsMap().get(timerFamilyId).getTimeDomain());
        }

        /**
         * Sets the timer identified by {@code dynamicTimerTag} to fire at {@code absoluteTime}.
         *
         * @param dynamicTimerTag tag of the timer within this family
         * @param absoluteTime time at which the timer should fire
         */
        @Override
        public void set(String dynamicTimerTag, Instant absoluteTime) {
            org.apache.beam.sdk.state.Timer timer = get(dynamicTimerTag);
            timer.set(absoluteTime);
        }

        /**
         * @param dynamicTimerTag tag of the timer within this family
         * @return a new timer handle for the given tag
         */
        @Override
        public org.apache.beam.sdk.state.Timer get(String dynamicTimerTag) {
            return new FnApiTimer<K>(timerFamilyId, userKey, dynamicTimerTag, boundedWindow, elementTimestampOrTimerHoldTimestamp, elementTimestampOrTimerFireTimestamp, paneInfo, timeDomain);
        }
    }

    /**
     * {@link org.apache.beam.sdk.state.Timer} handle that reports every modification to the
     * enclosing runner's {@code timerBundleTracker}.
     *
     * <p>The handle is mutable: {@link #offset}, {@link #align} and {@link #withOutputTimestamp}
     * store their argument and return {@code this}; building a timer may also fill in a default
     * output timestamp.
     *
     * @param <K> type of the user key
     */
    private class FnApiTimer<K>
    implements org.apache.beam.sdk.state.Timer {
        private final String timerIdOrFamily;
        private final K userKey;
        private final String dynamicTimerTag;
        private final TimeDomain timeDomain;
        private final Duration allowedLateness;
        // Base time used by setRelative() and returned by getCurrentRelativeTime().
        private final Instant fireTimestamp;
        private final Instant elementTimestampOrTimerHoldTimestamp;
        private final BoundedWindow boundedWindow;
        private final PaneInfo paneInfo;
        // Null until set by withOutputTimestamp(...) or defaulted in getTimerForTime(...).
        private Instant outputTimestamp;
        private Duration period = Duration.ZERO;
        private Duration offset = Duration.ZERO;

        /**
         * @param timerIdOrFamily timer id or timer family id reported to the bundle tracker
         * @param userKey key the timer belongs to
         * @param dynamicTimerTag tag stored on the produced timers
         * @param boundedWindow window stored on the produced timers
         * @param elementTimestampOrTimerHoldTimestamp lower-bound reference for output timestamps
         *     and the default output timestamp outside the event-time domain
         * @param elementTimestampOrTimerFireTimestamp base fire time for event-time timers
         * @param paneInfo pane stored on the produced timers
         * @param timeDomain time domain of the timer
         * @throws IllegalArgumentException if {@code timeDomain} is neither {@code EVENT_TIME} nor
         *     {@code PROCESSING_TIME}, or if the allowed lateness of the main input cannot be read
         */
        FnApiTimer(String timerIdOrFamily, K userKey, String dynamicTimerTag, BoundedWindow boundedWindow, Instant elementTimestampOrTimerHoldTimestamp, Instant elementTimestampOrTimerFireTimestamp, PaneInfo paneInfo, TimeDomain timeDomain) {
            this.timerIdOrFamily = timerIdOrFamily;
            this.userKey = userKey;
            this.dynamicTimerTag = dynamicTimerTag;
            this.elementTimestampOrTimerHoldTimestamp = elementTimestampOrTimerHoldTimestamp;
            this.boundedWindow = boundedWindow;
            this.paneInfo = paneInfo;
            this.timeDomain = timeDomain;
            switch (timeDomain) {
                case EVENT_TIME: {
                    this.fireTimestamp = elementTimestampOrTimerFireTimestamp;
                    break;
                }
                case PROCESSING_TIME: {
                    // Processing-time timers are relative to "now", not to the element.
                    this.fireTimestamp = new Instant(DateTimeUtils.currentTimeMillis());
                    break;
                }
                default: {
                    throw new IllegalArgumentException(String.format("Unknown or unsupported time domain %s", timeDomain));
                }
            }
            try {
                this.allowedLateness = FnApiDoFnRunner.this.rehydratedComponents.getPCollection(FnApiDoFnRunner.this.pTransform.getInputsOrThrow(ParDoTranslation.getMainInputName(FnApiDoFnRunner.this.pTransform))).getWindowingStrategy().getAllowedLateness();
            }
            catch (IOException e) {
                // Keep the IOException as the cause so the underlying failure stays diagnosable.
                throw new IllegalArgumentException(String.format("Unable to get allowed lateness for timer %s", timerIdOrFamily), e);
            }
        }

        /**
         * Schedules the timer for {@code absoluteTime}.
         *
         * @param absoluteTime time at which the timer should fire
         * @throws IllegalArgumentException if this is an event-time timer and {@code absoluteTime}
         *     is after the garbage-collection time of the runner's current window, or if the
         *     output timestamp fails the checks in {@link #getTimerForTime}
         */
        @Override
        public void set(Instant absoluteTime) {
            if (TimeDomain.EVENT_TIME.equals(this.timeDomain)) {
                Instant windowExpiry = LateDataUtils.garbageCollectionTime(FnApiDoFnRunner.this.currentWindow, this.allowedLateness);
                Preconditions.checkArgument(!absoluteTime.isAfter(windowExpiry), "Attempted to set event time timer for %s but that is after the expiration of window %s", absoluteTime, windowExpiry);
            }
            FnApiDoFnRunner.this.timerBundleTracker.timerModified(this.timerIdOrFamily, this.timeDomain, this.getTimerForTime(absoluteTime));
        }

        /**
         * Schedules the timer relative to the base fire time, using the stored offset and period.
         *
         * <p>With a zero period the target is the base time plus the offset. Otherwise the
         * remainder of (base time + offset) modulo the period is computed; a zero remainder keeps
         * the base time as the target, any other remainder yields base time + period - remainder.
         * For event-time timers the target is then capped at the current window's
         * garbage-collection time.
         */
        @Override
        public void setRelative() {
            Instant target;
            if (this.period.equals(Duration.ZERO)) {
                target = this.fireTimestamp.plus(this.offset);
            } else {
                long millisSinceStart = this.fireTimestamp.plus(this.offset).getMillis() % this.period.getMillis();
                target = millisSinceStart == 0L ? this.fireTimestamp : this.fireTimestamp.plus(this.period).minus(Duration.millis(millisSinceStart));
            }
            target = this.minTargetAndGcTime(target);
            FnApiDoFnRunner.this.timerBundleTracker.timerModified(this.timerIdOrFamily, this.timeDomain, this.getTimerForTime(target));
        }

        /** Reports a cleared timer for this key, tag and window to the bundle tracker. */
        @Override
        public void clear() {
            FnApiDoFnRunner.this.timerBundleTracker.timerModified(this.timerIdOrFamily, this.timeDomain, this.getClearedTimer());
        }

        /**
         * @param offset offset used by {@link #setRelative()}
         * @return this timer
         */
        @Override
        public org.apache.beam.sdk.state.Timer offset(Duration offset) {
            this.offset = offset;
            return this;
        }

        /**
         * @param period alignment period used by {@link #setRelative()}
         * @return this timer
         */
        @Override
        public org.apache.beam.sdk.state.Timer align(Duration period) {
            this.period = period;
            return this;
        }

        /**
         * @param outputTime output timestamp to attach to timers built afterwards
         * @return this timer
         */
        @Override
        public org.apache.beam.sdk.state.Timer withOutputTimestamp(Instant outputTime) {
            this.outputTimestamp = outputTime;
            return this;
        }

        /** @return the base fire time fixed at construction */
        @Override
        public Instant getCurrentRelativeTime() {
            return this.fireTimestamp;
        }

        /**
         * Caps an event-time target at the current window's garbage-collection time.
         *
         * @param target requested firing time
         * @return the window expiry if this is an event-time timer and {@code target} is after it,
         *     otherwise {@code target} unchanged
         */
        private Instant minTargetAndGcTime(Instant target) {
            if (TimeDomain.EVENT_TIME.equals(this.timeDomain)) {
                Instant windowExpiry = LateDataUtils.garbageCollectionTime(FnApiDoFnRunner.this.currentWindow, this.allowedLateness);
                if (target.isAfter(windowExpiry)) {
                    return windowExpiry;
                }
            }
            return target;
        }

        /** @return a cleared timer for this key, dynamic tag and window */
        private Timer<K> getClearedTimer() {
            return Timer.cleared(this.userKey, this.dynamicTimerTag, Collections.singletonList(this.boundedWindow));
        }

        /**
         * Validates the output timestamp and builds the timer to report for {@code scheduledTime}.
         *
         * <p>Side effect: when no output timestamp has been set, it is defaulted and stored on this
         * handle — to {@code scheduledTime} for event-time timers, otherwise to the element/hold
         * timestamp.
         *
         * @param scheduledTime time at which the timer should fire
         * @return the timer for this key, dynamic tag, window and pane
         * @throws IllegalArgumentException if an explicitly set output timestamp is before the
         *     element/hold timestamp minus the DoFn's allowed skew or after
         *     {@code BoundedWindow.TIMESTAMP_MAX_VALUE}; if, for event-time timers, the output
         *     timestamp is after {@code scheduledTime} or {@code scheduledTime} is after the window
         *     expiry; or if, for other timers, the output timestamp is after the window expiry
         */
        private Timer<K> getTimerForTime(Instant scheduledTime) {
            if (this.outputTimestamp != null) {
                Instant lowerBound;
                try {
                    lowerBound = this.elementTimestampOrTimerHoldTimestamp.minus(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew());
                }
                catch (ArithmeticException e) {
                    // The subtraction overflowed; fall back to the minimum representable timestamp.
                    lowerBound = BoundedWindow.TIMESTAMP_MIN_VALUE;
                }
                if (this.outputTimestamp.isBefore(lowerBound) || this.outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                    throw new IllegalArgumentException(String.format("Cannot output timer with output timestamp %s. Output timestamps must be no earlier than the timestamp of the current input (%s) minus the allowed skew (%s) and no later than %s. See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", this.outputTimestamp, this.elementTimestampOrTimerHoldTimestamp, PeriodFormat.getDefault().print(FnApiDoFnRunner.this.doFn.getAllowedTimestampSkew().toPeriod()), BoundedWindow.TIMESTAMP_MAX_VALUE));
                }
            }
            if (this.outputTimestamp == null && TimeDomain.EVENT_TIME.equals(this.timeDomain)) {
                this.outputTimestamp = scheduledTime;
            }
            if (this.outputTimestamp == null) {
                this.outputTimestamp = this.elementTimestampOrTimerHoldTimestamp;
            }
            Instant windowExpiry = LateDataUtils.garbageCollectionTime(FnApiDoFnRunner.this.currentWindow, this.allowedLateness);
            if (TimeDomain.EVENT_TIME.equals(this.timeDomain)) {
                Preconditions.checkArgument(!this.outputTimestamp.isAfter(scheduledTime), "Attempted to set an event-time timer with an output timestamp of %s that is after the timer firing timestamp %s", this.outputTimestamp, scheduledTime);
                Preconditions.checkArgument(!scheduledTime.isAfter(windowExpiry), "Attempted to set an event-time timer with a firing timestamp of %s that is after the expiration of window %s", scheduledTime, windowExpiry);
            } else {
                Preconditions.checkArgument(!this.outputTimestamp.isAfter(windowExpiry), "Attempted to set a processing-time timer with an output timestamp of %s that is after the expiration of window %s", this.outputTimestamp, windowExpiry);
            }
            return Timer.of(this.userKey, this.dynamicTimerTag, Collections.singletonList(this.boundedWindow), scheduledTime, this.outputTimestamp, this.paneInfo);
        }
    }

    /**
     * Base class for data receivers that also answer split and progress requests by delegating to
     * the enclosing runner.
     */
    private abstract class SplittableFnDataReceiver
    implements HandlesSplits,
    FnDataReceiver<WindowedValue> {
        private SplittableFnDataReceiver() {
        }

        /**
         * Delegates to the runner's {@code trySplitForElementAndRestriction}, passing
         * {@code Duration.ZERO} as the second argument.
         *
         * @param fractionOfRemainder fraction forwarded unchanged to the runner
         * @return whatever the runner's split attempt returns
         */
        @Override
        public HandlesSplits.SplitResult trySplit(double fractionOfRemainder) {
            return FnApiDoFnRunner.this.trySplitForElementAndRestriction(fractionOfRemainder, Duration.ZERO);
        }

        /**
         * Reports completed work as a fraction of completed plus remaining work.
         *
         * @return {@code workCompleted / (workCompleted + workRemaining)} when the runner reports
         *     progress and that sum is greater than zero; {@code 0.0} otherwise
         */
        @Override
        public double getProgress() {
            RestrictionTracker.Progress progress = FnApiDoFnRunner.this.getProgress();
            if (progress == null) {
                return 0.0;
            }
            double completed = progress.getWorkCompleted();
            double totalWork = completed + progress.getWorkRemaining();
            if (totalWork > 0.0) {
                return completed / totalWork;
            }
            return 0.0;
        }
    }

    /**
     * AutoValue holder pairing a nullable window split and a nullable downstream split with a new
     * window stop index.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    abstract static class SplitResultsWithStopIndex {
        SplitResultsWithStopIndex() {
        }

        /** @return the window split, or {@code null} if there is none */
        public abstract @Nullable WindowedSplitResult getWindowSplit();

        /** @return the downstream split, or {@code null} if there is none */
        public abstract @Nullable HandlesSplits.SplitResult getDownstreamSplit();

        /** @return the new window stop index */
        public abstract int getNewWindowStopIndex();

        /**
         * Creates an instance from its three components.
         *
         * @param windowSplit window split; may be {@code null}
         * @param downstreamSplit downstream split; may be {@code null}
         * @param newWindowStopIndex new window stop index
         * @return the generated AutoValue implementation holding the given values
         */
        public static SplitResultsWithStopIndex of(WindowedSplitResult windowSplit, HandlesSplits.SplitResult downstreamSplit, int newWindowStopIndex) {
            return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(windowSplit, downstreamSplit, newWindowStopIndex);
        }
    }

    /**
     * AutoValue holder for the four nullable {@code WindowedValue} roots of a windowed split: the
     * primary in fully processed windows, the primary split, the residual split and the residual
     * in unprocessed windows.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    abstract static class WindowedSplitResult {
        WindowedSplitResult() {
        }

        /** @return the primary root for fully processed windows, or {@code null} */
        public abstract @Nullable WindowedValue getPrimaryInFullyProcessedWindowsRoot();

        /** @return the primary split root, or {@code null} */
        public abstract @Nullable WindowedValue getPrimarySplitRoot();

        /** @return the residual split root, or {@code null} */
        public abstract @Nullable WindowedValue getResidualSplitRoot();

        /** @return the residual root for unprocessed windows, or {@code null} */
        public abstract @Nullable WindowedValue getResidualInUnprocessedWindowsRoot();

        /**
         * Creates an instance from its four roots; each may be {@code null}.
         *
         * @param primaryInFullyProcessedWindowsRoot primary root for fully processed windows
         * @param primarySplitRoot primary split root
         * @param residualSplitRoot residual split root
         * @param residualInUnprocessedWindowsRoot residual root for unprocessed windows
         * @return the generated AutoValue implementation holding the given values
         */
        public static WindowedSplitResult forRoots(WindowedValue primaryInFullyProcessedWindowsRoot, WindowedValue primarySplitRoot, WindowedValue residualSplitRoot, WindowedValue residualInUnprocessedWindowsRoot) {
            return new AutoValue_FnApiDoFnRunner_WindowedSplitResult(primaryInFullyProcessedWindowsRoot, primarySplitRoot, residualSplitRoot, residualInUnprocessedWindowsRoot);
        }
    }

    private static interface TriFunction<FirstT, SecondT, ThirdT> {
        public void accept(FirstT var1, SecondT var2, ThirdT var3);
    }

    /**
     * {@link PTransformRunnerFactory} that builds an {@code FnApiDoFnRunner} from a factory context
     * and registers an incoming timer endpoint for each of the runner's timer families.
     */
    static class Factory<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>
    implements PTransformRunnerFactory<FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT>> {
        Factory() {
        }

        /**
         * Creates the runner and wires its timer endpoints into {@code context}.
         *
         * <p>For every entry of the runner's {@code timerFamilyInfos}, an incoming timer endpoint is
         * added under the entry's local name with the entry's coder. An entry whose non-empty local
         * name equals the ParDo payload's on-window-expiration timer family spec is routed to
         * {@code processOnWindowExpiration}; every other entry is routed to {@code processTimer}.
         *
         * @param context source of the runner's constructor arguments and registration hooks
         * @return the newly created runner
         */
        @Override
        public final FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, OutputT> createRunnerForPTransform(PTransformRunnerFactory.Context context) {
            FnApiDoFnRunner runner = new FnApiDoFnRunner(context.getPipelineOptions(), context.getBeamFnStateClient(), context.getBeamFnTimerClient(), context.getPTransformId(), context.getPTransform(), context.getProcessBundleInstructionIdSupplier(), context.getCacheTokensSupplier(), context.getBundleCacheSupplier(), context.getProcessWideCache(), context.getPCollections(), context.getCoders(), context.getWindowingStrategies(), context::addStartBundleFunction, context::addFinishBundleFunction, context::addTearDownFunction, context::getPCollectionConsumer, (pCollectionId, consumer, valueCoder) -> context.addPCollectionConsumer((String)pCollectionId, consumer, valueCoder), context::addProgressRequestCallback, context.getSplitListener(), context.getBundleFinalizer());
            // Members of the raw-typed runner are erased, so iterating its entry set directly would
            // yield Object elements; go through a wildcard view to get typed Map.Entry values.
            Map<?, ?> timerFamilyInfos = runner.timerFamilyInfos;
            for (Map.Entry<?, ?> entry : timerFamilyInfos.entrySet()) {
                String localName = (String)entry.getKey();
                KV timeDomainAndCoder = (KV)entry.getValue();
                TimeDomain timeDomain = (TimeDomain)timeDomainAndCoder.getKey();
                Coder coder = (Coder)timeDomainAndCoder.getValue();
                if (!localName.isEmpty() && localName.equals(runner.parDoPayload.getOnWindowExpirationTimerFamilySpec())) {
                    context.addIncomingTimerEndpoint(localName, coder, timer -> runner.processOnWindowExpiration(timer));
                    continue;
                }
                context.addIncomingTimerEndpoint(localName, coder, timer -> runner.processTimer(localName, timeDomain, timer));
            }
            return runner;
        }
    }

    /**
     * Registrar that maps the ParDo URN and the four splittable-DoFn URNs listed below to a single
     * shared {@link Factory} instance.
     */
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /**
         * @return an immutable map from each supported transform URN to the same factory instance
         */
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            Factory factory = new Factory();
            // The explicit type witness is required: without a target type on builder(), the chain
            // would be inferred as ImmutableMap<Object, Object> and not match the return type.
            return ImmutableMap.<String, PTransformRunnerFactory>builder()
                .put("beam:transform:pardo:v1", factory)
                .put("beam:transform:sdf_pair_with_restriction:v1", factory)
                .put("beam:transform:sdf_split_and_size_restrictions:v1", factory)
                .put("beam:transform:sdf_truncate_sized_restrictions:v1", factory)
                .put("beam:transform:sdf_process_sized_element_and_restrictions:v1", factory)
                .build();
        }
    }
}

