/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.control;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.runners.core.construction.SyntheticComponents;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.core.construction.graph.UserStateReference;
import org.apache.beam.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_BagUserStateSpec;
import org.apache.beam.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor;
import org.apache.beam.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_OutputEncoding;
import org.apache.beam.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_SideInputSpec;
import org.apache.beam.runners.fnexecution.control.AutoValue_ProcessBundleDescriptors_TimerSpec;
import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
import org.apache.beam.runners.fnexecution.wire.ByteStringCoder;
import org.apache.beam.runners.fnexecution.wire.LengthPrefixUnknownCoders;
import org.apache.beam.runners.fnexecution.wire.WireCoders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableTable;
import org.checkerframework.checker.nullness.qual.Nullable;

public class ProcessBundleDescriptors {
    /**
     * Creates an {@link ExecutableProcessBundleDescriptor} for the given stage, wiring both a data
     * endpoint and a state endpoint into the resulting descriptor.
     *
     * @param id identifier assigned to the produced process bundle descriptor; must not be null
     * @param stage the executable stage to translate; must not be null
     * @param dataEndpoint endpoint used for the stage's remote gRPC ports; must not be null
     * @param stateEndpoint endpoint set as the state API service descriptor; must not be null
     * @return the descriptor together with its input destinations, output coders and specs
     * @throws IllegalStateException if any argument is null
     * @throws IOException if building the descriptor fails
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(
            String id,
            ExecutableStage stage,
            Endpoints.ApiServiceDescriptor dataEndpoint,
            Endpoints.ApiServiceDescriptor stateEndpoint) throws IOException {
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        Preconditions.checkState(stateEndpoint != null, "stateEndpoint must be specified.");
        return fromExecutableStageInternal(id, stage, dataEndpoint, stateEndpoint);
    }

    /**
     * Creates an {@link ExecutableProcessBundleDescriptor} for the given stage without a state
     * endpoint; the internal builder is invoked with a {@code null} state endpoint, so no state API
     * service descriptor is set on the result.
     *
     * @param id identifier assigned to the produced process bundle descriptor; must not be null
     * @param stage the executable stage to translate; must not be null
     * @param dataEndpoint endpoint used for the stage's remote gRPC ports; must not be null
     * @return the descriptor together with its input destinations, output coders and specs
     * @throws IllegalStateException if any argument is null
     * @throws IOException if building the descriptor fails
     */
    public static ExecutableProcessBundleDescriptor fromExecutableStage(
            String id,
            ExecutableStage stage,
            Endpoints.ApiServiceDescriptor dataEndpoint) throws IOException {
        Preconditions.checkState(id != null, "id must be specified.");
        Preconditions.checkState(stage != null, "stage must be specified.");
        // Message previously read "dateEndpoint", which does not match the parameter name.
        Preconditions.checkState(dataEndpoint != null, "dataEndpoint must be specified.");
        return fromExecutableStageInternal(id, stage, dataEndpoint, null);
    }

    /**
     * Shared implementation behind both {@code fromExecutableStage} overloads.
     *
     * <p>The steps below all mutate the same {@code components} builder, so their order matters:
     * the stage input and outputs are registered first, then side inputs, bag user states and
     * timers are processed, and finally the input PCollection's coder is length-prefixed.
     *
     * @param id identifier set on the produced process bundle descriptor
     * @param stage the executable stage to translate
     * @param dataEndpoint endpoint used for remote gRPC ports, and for the timer API service
     *     descriptor when the stage has at least one timer spec
     * @param stateEndpoint endpoint set as the state API service descriptor, or {@code null} to
     *     leave it unset
     * @return the assembled descriptor and its associated destinations, coders and specs
     * @throws IOException if any of the coder or payload handling steps fails
     */
    private static ExecutableProcessBundleDescriptor fromExecutableStageInternal(
            String id,
            ExecutableStage stage,
            Endpoints.ApiServiceDescriptor dataEndpoint,
            Endpoints.@Nullable ApiServiceDescriptor stateEndpoint) throws IOException {
        // Start from the stage's components, but keep only the transforms that belong to the stage.
        Map<String, RunnerApi.PTransform> stageTransforms = stage.getTransforms().stream()
                .collect(Collectors.toMap(PipelineNode.PTransformNode::getId, PipelineNode.PTransformNode::getTransform));
        RunnerApi.Components.Builder components =
                stage.getComponents().toBuilder().clearTransforms().putAllTransforms(stageTransforms);

        ImmutableList.Builder<RemoteInputDestination> inputDestinationsBuilder = ImmutableList.builder();
        ImmutableMap.Builder<String, Coder> remoteOutputCodersBuilder = ImmutableMap.builder();

        // Use the wire coder setting that targets the input PCollection, or the default instance.
        PipelineNode.PCollectionNode inputPCollection = stage.getInputPCollection();
        RunnerApi.ExecutableStagePayload.WireCoderSetting wireCoderSetting = stage.getWireCoderSettings().stream()
                .filter(ws -> ws.getInputOrOutputId().equals(inputPCollection.getId()))
                .findAny()
                .orElse(RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());

        inputDestinationsBuilder.add(addStageInput(dataEndpoint, inputPCollection, components, wireCoderSetting));
        remoteOutputCodersBuilder.putAll(
                addStageOutputs(dataEndpoint, stage.getOutputPCollections(), components, stage.getWireCoderSettings()));

        Map<String, Map<String, SideInputSpec>> sideInputSpecs = addSideInputs(stage, components);
        // Bag user state specs are computed against a snapshot of the components at this point.
        Map<String, Map<String, BagUserStateSpec>> bagUserStateSpecs = forBagUserStates(stage, components.build());
        Map<String, Map<String, TimerSpec>> timerSpecs = forTimerSpecs(stage, components);

        lengthPrefixAnyInputCoder(inputPCollection.getId(), components);

        BeamFnApi.ProcessBundleDescriptor.Builder bundleDescriptorBuilder =
                BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(id);
        if (stateEndpoint != null) {
            bundleDescriptorBuilder.setStateApiServiceDescriptor(stateEndpoint);
        }
        if (!timerSpecs.isEmpty()) {
            // Timers are only advertised when the stage actually declares some.
            bundleDescriptorBuilder.setTimerApiServiceDescriptor(dataEndpoint);
        }
        bundleDescriptorBuilder
                .putAllCoders(components.getCodersMap())
                .putAllEnvironments(components.getEnvironmentsMap())
                .putAllPcollections(components.getPcollectionsMap())
                .putAllWindowingStrategies(components.getWindowingStrategiesMap())
                .putAllTransforms(components.getTransformsMap());

        return ExecutableProcessBundleDescriptor.of(
                bundleDescriptorBuilder.build(),
                inputDestinationsBuilder.build(),
                remoteOutputCodersBuilder.build(),
                sideInputSpecs,
                bagUserStateSpecs,
                timerSpecs);
    }

    /**
     * Replaces the coder of the given input PCollection with the coder id returned by
     * {@link LengthPrefixUnknownCoders#addLengthPrefixedCoder} and stores the updated PCollection
     * back into {@code componentsBuilder}.
     *
     * @param inputPCollectionId id of the PCollection to update; must exist in the builder
     * @param componentsBuilder components that are read and modified in place
     */
    private static void lengthPrefixAnyInputCoder(String inputPCollectionId, RunnerApi.Components.Builder componentsBuilder) {
        RunnerApi.PCollection original = componentsBuilder.getPcollectionsOrThrow(inputPCollectionId);
        String prefixedCoderId =
                LengthPrefixUnknownCoders.addLengthPrefixedCoder(original.getCoderId(), componentsBuilder, false);
        RunnerApi.PCollection updated = original.toBuilder().setCoderId(prefixedCoderId).build();
        componentsBuilder.putPcollections(inputPCollectionId, updated);
    }

    /**
     * Registers a write transform for every output PCollection of the stage and collects the
     * resulting coders.
     *
     * @param dataEndpoint endpoint used for each output's remote gRPC port
     * @param outputPCollections the stage outputs to register
     * @param components components builder that receives the new transforms and coders
     * @param wireCoderSettings candidate settings; the one whose input/output id equals an output's
     *     id is used for that output, otherwise the default instance
     * @return insertion-ordered map from the generated write transform id to its coder
     * @throws IOException if registering an output fails
     */
    private static Map<String, Coder<WindowedValue<?>>> addStageOutputs(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            Collection<PipelineNode.PCollectionNode> outputPCollections,
            RunnerApi.Components.Builder components,
            Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> wireCoderSettings) throws IOException {
        Map<String, Coder<WindowedValue<?>>> codersByTransformId = new LinkedHashMap<>();
        for (PipelineNode.PCollectionNode output : outputPCollections) {
            RunnerApi.ExecutableStagePayload.WireCoderSetting setting = wireCoderSettings.stream()
                    .filter(candidate -> candidate.getInputOrOutputId().equals(output.getId()))
                    .findAny()
                    .orElse(RunnerApi.ExecutableStagePayload.WireCoderSetting.getDefaultInstance());
            OutputEncoding encoding = addStageOutput(dataEndpoint, components, output, setting);
            codersByTransformId.put(encoding.getPTransformId(), encoding.getCoder());
        }
        return codersByTransformId;
    }

    /**
     * Adds a gRPC read transform for the stage's input PCollection to {@code components}.
     *
     * <p>The transform id is derived from {@code "fn/read/<pcollection id>"} and made unique
     * against the transforms already present in the builder.
     *
     * @param dataEndpoint endpoint placed into the remote gRPC port
     * @param inputPCollection the stage input
     * @param components components builder that receives the wire coder and the read transform
     * @param wireCoderSetting setting forwarded to the wire coder helpers
     * @return destination pairing the runner-side wire coder with the new transform id
     * @throws IOException if the wire coder cannot be created
     */
    private static RemoteInputDestination<WindowedValue<?>> addStageInput(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            PipelineNode.PCollectionNode inputPCollection,
            RunnerApi.Components.Builder components,
            RunnerApi.ExecutableStagePayload.WireCoderSetting wireCoderSetting) throws IOException {
        String sdkWireCoderId = WireCoders.addSdkWireCoder(inputPCollection, components, wireCoderSetting);
        Coder runnerWireCoder =
                WireCoders.instantiateRunnerWireCoder(inputPCollection, components.build(), wireCoderSetting);

        BeamFnApi.RemoteGrpcPort port = BeamFnApi.RemoteGrpcPort.newBuilder()
                .setApiServiceDescriptor(dataEndpoint)
                .setCoderId(sdkWireCoderId)
                .build();

        String pcollectionId = inputPCollection.getId();
        String readTransformId =
                SyntheticComponents.uniqueId(String.format("fn/read/%s", pcollectionId), components::containsTransforms);
        components.putTransforms(readTransformId, RemoteGrpcPortRead.readFromPort(port, pcollectionId).toPTransform());
        return RemoteInputDestination.of(runnerWireCoder, readTransformId);
    }

    /**
     * Adds a gRPC write transform for one output PCollection to {@code components}.
     *
     * <p>The transform id is derived from {@code "fn/write/<pcollection id>"} and made unique
     * against the transforms already present in the builder.
     *
     * @param dataEndpoint endpoint placed into the remote gRPC port
     * @param components components builder that receives the wire coder and the write transform
     * @param outputPCollection the stage output being registered
     * @param wireCoderSetting setting forwarded to the wire coder helpers
     * @return the new transform id paired with the runner-side wire coder
     * @throws IOException if the wire coder cannot be created
     */
    private static OutputEncoding addStageOutput(
            Endpoints.ApiServiceDescriptor dataEndpoint,
            RunnerApi.Components.Builder components,
            PipelineNode.PCollectionNode outputPCollection,
            RunnerApi.ExecutableStagePayload.WireCoderSetting wireCoderSetting) throws IOException {
        String sdkWireCoderId = WireCoders.addSdkWireCoder(outputPCollection, components, wireCoderSetting);
        Coder runnerWireCoder =
                WireCoders.instantiateRunnerWireCoder(outputPCollection, components.build(), wireCoderSetting);

        BeamFnApi.RemoteGrpcPort port = BeamFnApi.RemoteGrpcPort.newBuilder()
                .setApiServiceDescriptor(dataEndpoint)
                .setCoderId(sdkWireCoderId)
                .build();

        String pcollectionId = outputPCollection.getId();
        RemoteGrpcPortWrite write = RemoteGrpcPortWrite.writeToPort(pcollectionId, port);
        String writeTransformId =
                SyntheticComponents.uniqueId(String.format("fn/write/%s", pcollectionId), components::containsTransforms);
        components.putTransforms(writeTransformId, write.toPTransform());
        return new AutoValue_ProcessBundleDescriptors_OutputEncoding(writeTransformId, runnerWireCoder);
    }

    /**
     * Computes the side input specs of {@code stage}, keyed by transform id and then by side input
     * local name.
     *
     * <p>The computation works on a fresh builder copied from the stage's components, so the
     * component changes made along the way are not returned to the caller.
     *
     * @param stage the stage whose side inputs are inspected
     * @return nested map of transform id to local name to {@link SideInputSpec}
     * @throws IOException if a side input coder cannot be created
     */
    public static Map<String, Map<String, SideInputSpec>> getSideInputs(ExecutableStage stage) throws IOException {
        RunnerApi.Components.Builder scratchComponents = stage.getComponents().toBuilder();
        return addSideInputs(stage, scratchComponents);
    }

    /**
     * Builds a {@link SideInputSpec} for every side input of the stage and rewrites each side
     * input PCollection in {@code components} to use the coder id returned by
     * {@link LengthPrefixUnknownCoders#addLengthPrefixedCoder}.
     *
     * @param stage the stage whose side inputs are processed
     * @param components components builder that is modified in place
     * @return nested map of transform id to side input local name to spec
     * @throws IOException if a wire coder cannot be created
     */
    private static Map<String, Map<String, SideInputSpec>> addSideInputs(ExecutableStage stage, RunnerApi.Components.Builder components) throws IOException {
        ImmutableTable.Builder<String, String, SideInputSpec> specs = ImmutableTable.builder();
        for (SideInputReference reference : stage.getSideInputs()) {
            PipelineNode.PCollectionNode collectionNode = reference.collection();
            RunnerApi.PCollection collection = collectionNode.getPCollection();

            // Point the PCollection at the coder id produced by the length-prefixing helper.
            String prefixedCoderId =
                    LengthPrefixUnknownCoders.addLengthPrefixedCoder(collection.getCoderId(), components, false);
            RunnerApi.PCollection rewritten = collection.toBuilder().setCoderId(prefixedCoderId).build();
            components.putPcollections(collectionNode.getId(), rewritten);

            WindowedValue.FullWindowedValueCoder wireCoder = (WindowedValue.FullWindowedValueCoder)
                    WireCoders.instantiateRunnerWireCoder(collectionNode, components.build());

            String transformId = reference.transform().getId();
            String localName = reference.localName();
            SideInputSpec spec = SideInputSpec.of(
                    transformId,
                    localName,
                    getAccessPattern(reference),
                    wireCoder.getValueCoder(),
                    wireCoder.getWindowCoder());
            specs.put(transformId, localName, spec);
        }
        return specs.build().rowMap();
    }

    /**
     * Reads the access pattern of a side input from the {@code ParDoPayload} of the transform that
     * consumes it.
     *
     * @param sideInputReference reference naming the consuming transform and the side input's
     *     local name
     * @return the access pattern function spec declared for that side input
     * @throws RuntimeException if the transform's payload cannot be parsed as a
     *     {@code ParDoPayload}; the parse failure is kept as the cause
     * @throws IllegalArgumentException if the payload declares no side input under the reference's
     *     local name
     */
    private static RunnerApi.FunctionSpec getAccessPattern(SideInputReference sideInputReference) {
        RunnerApi.ParDoPayload payload;
        try {
            payload = RunnerApi.ParDoPayload.parseFrom(
                    sideInputReference.transform().getTransform().getSpec().getPayload());
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(
                    String.format(
                            "Unable to parse ParDoPayload of transform %s",
                            sideInputReference.transform().getId()),
                    e);
        }
        // OrThrow fails with a descriptive error for an unknown side input instead of a bare
        // NullPointerException from dereferencing a missing map entry.
        return payload.getSideInputsOrThrow(sideInputReference.localName()).getAccessPattern();
    }

    /**
     * Builds a {@link BagUserStateSpec} for every user state of the stage.
     *
     * <p>Both the key coder and the value coder of each spec are {@code ByteStringCoder.of()};
     * only the window coder is taken from the wire coder of the referenced collection.
     *
     * @param stage the stage whose user states are processed
     * @param components components used to instantiate the wire coders
     * @return nested map of transform id to user state local name to spec
     * @throws IOException if a wire coder cannot be created
     */
    private static Map<String, Map<String, BagUserStateSpec>> forBagUserStates(ExecutableStage stage, RunnerApi.Components components) throws IOException {
        ImmutableTable.Builder<String, String, BagUserStateSpec> specs = ImmutableTable.builder();
        for (UserStateReference reference : stage.getUserStates()) {
            WindowedValue.FullWindowedValueCoder wireCoder = (WindowedValue.FullWindowedValueCoder)
                    WireCoders.instantiateRunnerWireCoder(reference.collection(), components);

            String transformId = reference.transform().getId();
            String localName = reference.localName();
            BagUserStateSpec spec = BagUserStateSpec.of(
                    transformId,
                    localName,
                    ByteStringCoder.of(),
                    ByteStringCoder.of(),
                    wireCoder.getWindowCoder());
            specs.put(transformId, localName, spec);
        }
        return specs.build().rowMap();
    }

    /**
     * Builds a {@link TimerSpec} for every timer of the stage and rewrites the timer family coder
     * id inside the owning transform's {@code ParDoPayload}.
     *
     * <p>For each timer, two coder ids are obtained from
     * {@link LengthPrefixUnknownCoders#addLengthPrefixedCoder}: the one requested with {@code false}
     * is written back into the transform's payload, and the one requested with {@code true} is
     * rehydrated into the coder stored on the returned spec.
     *
     * @param stage the stage whose timers are processed
     * @param components components builder that is modified in place
     * @return nested map of transform id to timer local name to spec
     * @throws IOException if a payload cannot be parsed or a coder cannot be rehydrated
     * @throws IllegalArgumentException if a timer's time domain is neither event time nor
     *     processing time, or the rehydrated coder is not a {@code Timer.Coder}
     * @throws UnsupportedOperationException if a wire coder setting targets one of the timers
     */
    private static Map<String, Map<String, TimerSpec>> forTimerSpecs(ExecutableStage stage, RunnerApi.Components.Builder components) throws IOException {
        ImmutableTable.Builder<String, String, TimerSpec> specs = ImmutableTable.builder();
        for (TimerReference timerReference : stage.getTimers()) {
            String transformId = timerReference.transform().getId();
            String timerName = timerReference.localName();

            RunnerApi.ParDoPayload payload = RunnerApi.ParDoPayload.parseFrom(
                    timerReference.transform().getTransform().getSpec().getPayload());
            RunnerApi.TimerFamilySpec timerFamilySpec = payload.getTimerFamilySpecsOrThrow(timerName);

            org.apache.beam.sdk.state.TimerSpec spec;
            switch (timerFamilySpec.getTimeDomain()) {
                case EVENT_TIME:
                    spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
                    break;
                case PROCESSING_TIME:
                    spec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
                    break;
                default:
                    throw new IllegalArgumentException(String.format("Unknown or unsupported time domain %s", timerFamilySpec.getTimeDomain()));
            }

            // Wire coder settings that address this exact timer are rejected outright.
            for (RunnerApi.ExecutableStagePayload.WireCoderSetting setting : stage.getWireCoderSettings()) {
                if (setting.hasTimer()
                        && setting.getTimer().getTransformId().equals(transformId)
                        && setting.getTimer().getLocalName().equals(timerName)) {
                    throw new UnsupportedOperationException("WireCoderSetting for timer is yet to be supported.");
                }
            }

            String originalTimerCoderId = timerFamilySpec.getTimerFamilyCoderId();
            String sdkCoderId = LengthPrefixUnknownCoders.addLengthPrefixedCoder(originalTimerCoderId, components, false);
            String runnerCoderId = LengthPrefixUnknownCoders.addLengthPrefixedCoder(originalTimerCoderId, components, true);
            Coder timerCoder = RehydratedComponents.forComponents(components.build()).getCoder(runnerCoderId);
            Preconditions.checkArgument(timerCoder instanceof Timer.Coder, "Expected a timer coder but received %s.", timerCoder);

            // Swap the timer family's coder id in the transform's payload for the SDK-side id.
            RunnerApi.PTransform currentTransform = components.getTransformsOrThrow(transformId);
            RunnerApi.FunctionSpec.Builder updatedSpec = currentTransform.toBuilder().getSpecBuilder();
            RunnerApi.ParDoPayload.Builder updatedPayload =
                    RunnerApi.ParDoPayload.parseFrom(updatedSpec.getPayload()).toBuilder();
            RunnerApi.TimerFamilySpec rewrittenFamilySpec = updatedPayload.getTimerFamilySpecsOrThrow(timerName)
                    .toBuilder()
                    .setTimerFamilyCoderId(sdkCoderId)
                    .build();
            updatedPayload.putTimerFamilySpecs(timerName, rewrittenFamilySpec);
            updatedSpec.setPayload(updatedPayload.build().toByteString());
            components.putTransforms(transformId, currentTransform.toBuilder().setSpec(updatedSpec).build());

            specs.put(transformId, timerName, TimerSpec.of(transformId, timerName, spec, timerCoder));
        }
        return specs.build().rowMap();
    }

    /**
     * A process bundle descriptor bundled with the runner-side information collected while
     * building it: input destinations, output coders, and the side input, bag user state and timer
     * specs keyed by transform id and local name.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public abstract static class ExecutableProcessBundleDescriptor {
        /**
         * Creates a descriptor from its parts.
         *
         * <p>The three nested spec maps are copied into immutable tables. The output coder map is
         * wrapped in an unmodifiable view, and the input destination list is stored as given.
         *
         * @param descriptor the process bundle descriptor proto
         * @param inputDestinations destinations for the stage's remote inputs
         * @param outputTransformCoders coders keyed by output transform id
         * @param sideInputSpecs side input specs by transform id, then side input id
         * @param bagUserStateSpecs bag user state specs by transform id, then user state id
         * @param timerSpecs timer specs by transform id, then timer id
         * @return the new descriptor
         */
        public static ExecutableProcessBundleDescriptor of(BeamFnApi.ProcessBundleDescriptor descriptor, List<RemoteInputDestination> inputDestinations, Map<String, Coder> outputTransformCoders, Map<String, Map<String, SideInputSpec>> sideInputSpecs, Map<String, Map<String, BagUserStateSpec>> bagUserStateSpecs, Map<String, Map<String, TimerSpec>> timerSpecs) {
            Map<String, Map<String, SideInputSpec>> sideInputsCopy = immutableCopyOf(sideInputSpecs);
            Map<String, Map<String, BagUserStateSpec>> bagUserStatesCopy = immutableCopyOf(bagUserStateSpecs);
            Map<String, Map<String, TimerSpec>> timersCopy = immutableCopyOf(timerSpecs);
            return new AutoValue_ProcessBundleDescriptors_ExecutableProcessBundleDescriptor(
                    descriptor,
                    inputDestinations,
                    Collections.unmodifiableMap(outputTransformCoders),
                    sideInputsCopy,
                    bagUserStatesCopy,
                    timersCopy);
        }

        /** Copies a two-level map into an immutable table and returns that table's row view. */
        private static <V> Map<String, Map<String, V>> immutableCopyOf(Map<String, Map<String, V>> nested) {
            ImmutableTable.Builder<String, String, V> table = ImmutableTable.builder();
            for (Map.Entry<String, Map<String, V>> row : nested.entrySet()) {
                for (Map.Entry<String, V> cell : row.getValue().entrySet()) {
                    table.put(row.getKey(), cell.getKey(), cell.getValue());
                }
            }
            return table.build().rowMap();
        }

        /** Returns the process bundle descriptor proto. */
        public abstract BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor();

        /** Returns the destinations for the stage's remote inputs. */
        public abstract List<RemoteInputDestination> getRemoteInputDestinations();

        /** Returns an unmodifiable view of the output coders keyed by output transform id. */
        public abstract Map<String, Coder> getRemoteOutputCoders();

        /** Returns the side input specs keyed by transform id, then side input id. */
        public abstract Map<String, Map<String, SideInputSpec>> getSideInputSpecs();

        /** Returns the bag user state specs keyed by transform id, then user state id. */
        public abstract Map<String, Map<String, BagUserStateSpec>> getBagUserStateSpecs();

        /** Returns the timer specs keyed by transform id, then timer id. */
        public abstract Map<String, Map<String, TimerSpec>> getTimerSpecs();
    }

    /**
     * Describes one timer of a transform: the owning transform id, the timer id, the SDK-level
     * timer spec and the coder associated with the timer.
     *
     * <p>NOTE(review): {@link #of} accepts a {@code Coder<Timer<K>>} while {@link #coder()} is
     * declared as {@code Coder<K>}; the raw constructor call in {@code of} hides the mismatch from
     * the compiler. Confirm which element type is intended before relying on the generic
     * signature of {@code coder()}.
     *
     * @param <K> key type used in the coder signatures
     * @param <V> not referenced by any member of this class
     * @param <W> window type; not referenced by any member of this class
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public static abstract class TimerSpec<K, V, W extends BoundedWindow> {
        /**
         * Creates a timer spec.
         *
         * @param transformId id of the transform that declares the timer
         * @param timerId id of the timer within that transform
         * @param timerSpec the SDK-level timer spec
         * @param coder coder stored on the spec
         * @return the new spec
         */
        static <K, V, W extends BoundedWindow> TimerSpec<K, V, W> of(String transformId, String timerId, org.apache.beam.sdk.state.TimerSpec timerSpec, Coder<Timer<K>> coder) {
            // Raw constructor call: generic arguments are not checked here (see class note).
            return new AutoValue_ProcessBundleDescriptors_TimerSpec(transformId, timerId, timerSpec, coder);
        }

        /** Returns the id of the transform that declares the timer. */
        public abstract String transformId();

        /** Returns the id of the timer within its transform. */
        public abstract String timerId();

        /** Returns the SDK-level timer spec. */
        public abstract org.apache.beam.sdk.state.TimerSpec getTimerSpec();

        /** Returns the coder passed to {@link #of}. */
        public abstract Coder<K> coder();
    }

    /**
     * Describes one bag user state of a transform: the owning transform id, the user state id and
     * the coders for its key, value and window.
     *
     * @param <K> key type
     * @param <V> value type
     * @param <W> window type
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public abstract static class BagUserStateSpec<K, V, W extends BoundedWindow> {
        /**
         * Creates a bag user state spec.
         *
         * @param transformId id of the transform that declares the state
         * @param userStateId id of the user state within that transform
         * @param keyCoder coder for keys
         * @param valueCoder coder for values
         * @param windowCoder coder for windows
         * @return the new spec
         */
        static <K, V, W extends BoundedWindow> BagUserStateSpec<K, V, W> of(String transformId, String userStateId, Coder<K> keyCoder, Coder<V> valueCoder, Coder<W> windowCoder) {
            BagUserStateSpec<K, V, W> spec = new AutoValue_ProcessBundleDescriptors_BagUserStateSpec<>(
                    transformId, userStateId, keyCoder, valueCoder, windowCoder);
            return spec;
        }

        /** Returns the id of the transform that declares the state. */
        public abstract String transformId();

        /** Returns the id of the user state within its transform. */
        public abstract String userStateId();

        /** Returns the coder for keys. */
        public abstract Coder<K> keyCoder();

        /** Returns the coder for values. */
        public abstract Coder<V> valueCoder();

        /** Returns the coder for windows. */
        public abstract Coder<W> windowCoder();
    }

    /**
     * Describes one side input of a transform: the owning transform id, the side input id, its
     * access pattern and the coders for its elements and windows.
     *
     * @param <T> element type
     * @param <W> window type
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    public abstract static class SideInputSpec<T, W extends BoundedWindow> {
        /**
         * Creates a side input spec. The declared return type is the raw {@code SideInputSpec}.
         *
         * @param transformId id of the transform that consumes the side input
         * @param sideInputId id of the side input within that transform
         * @param accessPattern function spec describing how the side input is accessed
         * @param elementCoder coder for elements
         * @param windowCoder coder for windows
         * @return the new spec
         */
        public static <T, W extends BoundedWindow> SideInputSpec of(String transformId, String sideInputId, RunnerApi.FunctionSpec accessPattern, Coder<T> elementCoder, Coder<W> windowCoder) {
            SideInputSpec<T, W> spec = new AutoValue_ProcessBundleDescriptors_SideInputSpec<>(
                    transformId, sideInputId, accessPattern, elementCoder, windowCoder);
            return spec;
        }

        /** Returns the id of the transform that consumes the side input. */
        public abstract String transformId();

        /** Returns the id of the side input within its transform. */
        public abstract String sideInputId();

        /** Returns the function spec describing how the side input is accessed. */
        public abstract RunnerApi.FunctionSpec accessPattern();

        /** Returns the coder for elements. */
        public abstract Coder<T> elementCoder();

        /** Returns the coder for windows. */
        public abstract Coder<W> windowCoder();
    }

    /**
     * Package-private pair of a generated output transform id and the coder associated with that
     * output; produced by {@code addStageOutput} and consumed by {@code addStageOutputs}.
     */
    @AutoValue
    @AutoValue.CopyAnnotations
    static abstract class OutputEncoding {
        /** Package-private constructor; instances are created via the generated subclass. */
        OutputEncoding() {
        }

        /** Returns the id of the write transform registered for the output. */
        abstract String getPTransformId();

        /** Returns the coder associated with the output. */
        abstract Coder<WindowedValue<?>> getCoder();
    }
}

