/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.data;

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer2;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BeamFnDataClient} that exchanges data over gRPC.
 *
 * <p>One {@link BeamFnDataGrpcMultiplexer2} is kept per {@link Endpoints.ApiServiceDescriptor}.
 * Multiplexers are created on first use from a channel supplied by the configured channel
 * factory and are then reused for every later request that targets the same descriptor.
 */
public class BeamFnDataGrpcClient implements BeamFnDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcClient.class);

    /** Pipeline options handed to every outbound observer built by {@link #send}. */
    private final PipelineOptions options;

    /** Produces the gRPC channel used to reach a given service descriptor. */
    private final Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory;

    /** Passed through to each multiplexer this client constructs. */
    private final OutboundObserverFactory outboundObserverFactory;

    /** Multiplexers created so far, keyed by the descriptor they were created for. */
    private final ConcurrentMap<Endpoints.ApiServiceDescriptor, BeamFnDataGrpcMultiplexer2> cache;

    /**
     * Creates a client with an empty multiplexer cache.
     *
     * @param options pipeline options forwarded to outbound observers
     * @param channelFactory maps a service descriptor to the channel used to reach it
     * @param outboundObserverFactory factory forwarded to every multiplexer that gets created
     */
    public BeamFnDataGrpcClient(PipelineOptions options, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> channelFactory, OutboundObserverFactory outboundObserverFactory) {
        this.options = options;
        this.channelFactory = channelFactory;
        this.outboundObserverFactory = outboundObserverFactory;
        this.cache = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code receiver} as the consumer for {@code instructionId} on the multiplexer of
     * every listed descriptor, creating multiplexers that do not exist yet.
     *
     * @param instructionId instruction the receiver is registered under
     * @param apiServiceDescriptors descriptors whose multiplexers should know about the receiver
     * @param receiver consumer handed to each multiplexer
     */
    @Override
    public void registerReceiver(String instructionId, List<Endpoints.ApiServiceDescriptor> apiServiceDescriptors, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
        LOG.debug("Registering consumer for {}", instructionId);
        // Indexed access with the size read once up front; no iterator is created.
        for (int index = 0, count = apiServiceDescriptors.size(); index < count; index++) {
            getClientFor(apiServiceDescriptors.get(index)).registerConsumer(instructionId, receiver);
        }
    }

    /**
     * Unregisters the consumer for {@code instructionId} from the multiplexer of every listed
     * descriptor.
     *
     * <p>The multiplexer is looked up through the same path used for registration, so a
     * descriptor that has no cached multiplexer yet gets one created before the unregister call.
     *
     * @param instructionId instruction whose consumer is removed
     * @param apiServiceDescriptors descriptors whose multiplexers should drop the consumer
     */
    @Override
    public void unregisterReceiver(String instructionId, List<Endpoints.ApiServiceDescriptor> apiServiceDescriptors) {
        LOG.debug("Unregistering consumer for {}", instructionId);
        // Indexed access with the size read once up front; no iterator is created.
        for (int index = 0, count = apiServiceDescriptors.size(); index < count; index++) {
            getClientFor(apiServiceDescriptors.get(index)).unregisterConsumer(instructionId);
        }
    }

    /**
     * Builds a buffering outbound receiver for {@code outputLocation} on top of the outbound
     * observer of the multiplexer associated with {@code apiServiceDescriptor}.
     *
     * @param apiServiceDescriptor descriptor of the service the data is sent to
     * @param outputLocation logical endpoint the receiver is created for
     * @param coder coder passed to the buffering outbound observer
     * @param <T> element type accepted by the returned receiver
     * @return the receiver produced by {@link BeamFnDataBufferingOutboundObserver#forLocation}
     */
    @Override
    public <T> CloseableFnDataReceiver<T> send(Endpoints.ApiServiceDescriptor apiServiceDescriptor, LogicalEndpoint outputLocation, Coder<T> coder) {
        // Resolve (and possibly create) the multiplexer before logging, as callers observe today.
        BeamFnDataGrpcMultiplexer2 multiplexer = getClientFor(apiServiceDescriptor);
        LOG.debug("Creating output consumer for {}", outputLocation);
        return BeamFnDataBufferingOutboundObserver.forLocation(options, outputLocation, coder, multiplexer.getOutboundObserver());
    }

    /**
     * Returns the cached multiplexer for {@code apiServiceDescriptor}, creating and caching it
     * when the descriptor has not been seen before.
     */
    private BeamFnDataGrpcMultiplexer2 getClientFor(Endpoints.ApiServiceDescriptor apiServiceDescriptor) {
        return cache.computeIfAbsent(apiServiceDescriptor, this::createClient);
    }

    /**
     * Opens a channel for {@code descriptor} via the channel factory and wraps a new gRPC stub's
     * {@code data} call in a multiplexer.
     */
    private BeamFnDataGrpcMultiplexer2 createClient(Endpoints.ApiServiceDescriptor descriptor) {
        ManagedChannel channel = channelFactory.apply(descriptor);
        return new BeamFnDataGrpcMultiplexer2(descriptor, outboundObserverFactory, BeamFnDataGrpc.newStub(channel)::data);
    }
}

