/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness;

import java.io.IOException;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateBackedIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

/**
 * Sends the elements of a single input PCollection to a remote gRPC data port.
 *
 * <p>As wired up by {@link Factory}, {@link #registerForOutput()} runs as a start-bundle
 * function and obtains the outbound receiver, {@link #consume(WindowedValue)} is registered as
 * the consumer of the transform's only input, and {@link #close()} runs as a finish-bundle
 * function.
 *
 * @param <InputT> element type of the values being written
 */
public class BeamFnDataWriteRunner<InputT> {
    /** URN under which {@link Registrar} publishes the {@link Factory}. */
    private static final String SINK_URN = "beam:runner:sink:v1";

    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final String pTransformId;
    private final Coder<WindowedValue<InputT>> coder;
    private final BeamFnDataClient beamFnDataClientFactory;
    private final Supplier<String> processBundleInstructionIdSupplier;

    /** Receiver for the current bundle; assigned by {@link #registerForOutput()}. */
    private CloseableFnDataReceiver<WindowedValue<InputT>> consumer;

    /**
     * Builds a runner for the given remote-write transform.
     *
     * @param cache supplier of the cache exposed to the coder translation context
     * @param pTransformId id of the transform, used when addressing the logical endpoint
     * @param remoteWriteNode transform proto from which the remote gRPC port is read
     * @param processBundleInstructionIdSupplier supplies the instruction id of the bundle
     *     being processed
     * @param coders coder protos keyed by coder id; the port's coder id is looked up here
     * @param beamFnDataClientFactory data client used to open the outbound receiver
     * @param beamFnStateClient state client exposed to the coder translation context
     * @throws IOException if the port or the coder cannot be read from the protos
     */
    BeamFnDataWriteRunner(Supplier<Cache<?, ?>> cache, String pTransformId, RunnerApi.PTransform remoteWriteNode, Supplier<String> processBundleInstructionIdSupplier, Map<String, RunnerApi.Coder> coders, BeamFnDataClient beamFnDataClientFactory, BeamFnStateClient beamFnStateClient) throws IOException {
        this.pTransformId = pTransformId;
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortWrite.fromPTransform(remoteWriteNode).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.beamFnDataClientFactory = beamFnDataClientFactory;
        this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;

        RehydratedComponents components = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(coders).build());
        StateBackedIterable.StateBackedIterableTranslationContext translationContext = new StateBackedIterable.StateBackedIterableTranslationContext() {
            @Override
            public Supplier<Cache<?, ?>> getCache() {
                return cache;
            }

            @Override
            public BeamFnStateClient getStateClient() {
                return beamFnStateClient;
            }

            @Override
            public Supplier<String> getCurrentInstructionId() {
                return processBundleInstructionIdSupplier;
            }
        };

        // The element type of a coder rebuilt from its proto is not known statically
        // (presumably fromProto returns Coder<?> - confirm against CoderTranslation), so the
        // cast to the declared field type has to be explicit and is unchecked.
        @SuppressWarnings("unchecked")
        Coder<WindowedValue<InputT>> translatedCoder = (Coder<WindowedValue<InputT>>) CoderTranslation.fromProto(coders.get(port.getCoderId()), components, translationContext);
        this.coder = translatedCoder;
    }

    /**
     * Obtains the outbound receiver for the logical endpoint identified by the current
     * instruction id and this transform's id, and stores it for {@link #consume} and
     * {@link #close}.
     */
    public void registerForOutput() {
        this.consumer = this.beamFnDataClientFactory.send(this.apiServiceDescriptor, LogicalEndpoint.data(this.processBundleInstructionIdSupplier.get(), this.pTransformId), this.coder);
    }

    /**
     * Closes the receiver obtained by {@link #registerForOutput()}.
     *
     * @throws Exception if closing the receiver fails
     */
    public void close() throws Exception {
        this.consumer.close();
    }

    /**
     * Hands one value to the receiver obtained by {@link #registerForOutput()}.
     *
     * @param value the windowed value to send
     * @throws Exception if the receiver rejects the value
     */
    public void consume(WindowedValue<InputT> value) throws Exception {
        this.consumer.accept(value);
    }

    /**
     * Creates a {@link BeamFnDataWriteRunner} for a transform and registers its start-bundle,
     * consumer and finish-bundle hooks on the supplied context.
     *
     * @param <InputT> element type of the values the created runner writes
     */
    static class Factory<InputT>
    implements PTransformRunnerFactory<BeamFnDataWriteRunner<InputT>> {
        Factory() {
        }

        /**
         * Builds the runner from the context and wires it into the bundle lifecycle.
         *
         * @param context source of the transform, coders, clients and registration hooks
         * @return the newly created runner
         * @throws IOException if the runner cannot be constructed from the transform protos
         */
        @Override
        public BeamFnDataWriteRunner<InputT> createRunnerForPTransform(PTransformRunnerFactory.Context context) throws IOException {
            BeamFnDataWriteRunner<InputT> runner = new BeamFnDataWriteRunner<>(context.getBundleCacheSupplier(), context.getPTransformId(), context.getPTransform(), context.getProcessBundleInstructionIdSupplier(), context.getCoders(), context.getBeamFnDataClient(), context.getBeamFnStateClient());
            context.addStartBundleFunction(runner::registerForOutput);
            // The transform is expected to have exactly one input; getOnlyElement fails otherwise.
            context.addPCollectionConsumer(Iterables.getOnlyElement(context.getPTransform().getInputsMap().values()), runner::consume, ((WindowedValue.WindowedValueCoder<InputT>) runner.coder).getValueCoder());
            context.addFinishBundleFunction(runner::close);
            return runner;
        }
    }

    /** Publishes the {@link Factory} under the sink URN. */
    public static class Registrar
    implements PTransformRunnerFactory.Registrar {
        /**
         * Returns a single-entry map from the sink URN to a new {@link Factory}.
         *
         * <p>The raw {@code PTransformRunnerFactory} value type is kept so the signature stays
         * exactly as originally declared.
         */
        @Override
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(SINK_URN, new Factory<>());
        }
    }
}

