/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataGrpcMultiplexer2
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataGrpcMultiplexer2.class);
    private final  @Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    private final ConcurrentMap<String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>> receivers;
    private final ConcurrentMap<String, Boolean> erroredInstructionIds;

    public BeamFnDataGrpcMultiplexer2( @Nullable Endpoints.ApiServiceDescriptor apiServiceDescriptor, OutboundObserverFactory outboundObserverFactory, OutboundObserverFactory.BasicFactory<BeamFnApi.Elements, BeamFnApi.Elements> baseOutboundObserverFactory) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.receivers = new ConcurrentHashMap<String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>>();
        this.erroredInstructionIds = new ConcurrentHashMap<String, Boolean>();
        this.inboundObserver = new InboundObserver();
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, this.inboundObserver);
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).omitNullValues().add("apiServiceDescriptor", this.apiServiceDescriptor).add("consumers", this.receivers).toString();
    }

    public StreamObserver<BeamFnApi.Elements> getInboundObserver() {
        return this.inboundObserver;
    }

    public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
        return this.outboundObserver;
    }

    private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(String instructionId) {
        return this.receivers.computeIfAbsent(instructionId, unused -> new CompletableFuture());
    }

    public void registerConsumer(String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
        this.receiverFuture(instructionId).complete(receiver);
    }

    public void unregisterConsumer(String instructionId) {
        this.receivers.remove(instructionId);
    }

    @VisibleForTesting
    boolean hasConsumer(String instructionId) {
        return this.receivers.containsKey(instructionId);
    }

    @Override
    public void close() throws Exception {
        Exception exception = null;
        for (CompletableFuture receiver : ImmutableList.copyOf(this.receivers.values())) {
            receiver.cancel(true);
            if (receiver.isCompletedExceptionally()) continue;
            try {
                ((CloseableFnDataReceiver)receiver.get()).close();
            }
            catch (Exception e) {
                if (exception == null) {
                    exception = e;
                    continue;
                }
                exception.addSuppressed(e);
            }
        }
        this.outboundObserver.onError(Status.CANCELLED.withDescription("Multiplexer hanging up").asException());
        this.inboundObserver.onCompleted();
        if (exception != null) {
            throw exception;
        }
    }

    private final class InboundObserver
    implements StreamObserver<BeamFnApi.Elements> {
        private InboundObserver() {
        }

        @Override
        public void onNext(BeamFnApi.Elements value) {
            block10: {
                String instructionId = null;
                for (BeamFnApi.Elements.Data data : value.getDataList()) {
                    if (instructionId == null) {
                        instructionId = data.getInstructionId();
                        continue;
                    }
                    if (instructionId.equals(data.getInstructionId())) continue;
                    break block10;
                }
                for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
                    if (instructionId == null) {
                        instructionId = timers.getInstructionId();
                        continue;
                    }
                    if (instructionId.equals(timers.getInstructionId())) continue;
                    break block10;
                }
                if (instructionId == null) {
                    return;
                }
                this.forwardToConsumerForInstructionId(instructionId, value);
                return;
            }
            HashSet<String> instructionIds = new HashSet<String>();
            for (BeamFnApi.Elements.Data data : value.getDataList()) {
                instructionIds.add(data.getInstructionId());
            }
            for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
                instructionIds.add(timers.getInstructionId());
            }
            for (String instructionId : instructionIds) {
                BeamFnApi.Elements.Builder builder = BeamFnApi.Elements.newBuilder();
                for (BeamFnApi.Elements.Data data : value.getDataList()) {
                    if (!instructionId.equals(data.getInstructionId())) continue;
                    builder.addData(data);
                }
                for (BeamFnApi.Elements.Timers timers : value.getTimersList()) {
                    if (!instructionId.equals(timers.getInstructionId())) continue;
                    builder.addTimers(timers);
                }
                this.forwardToConsumerForInstructionId(instructionId, builder.build());
            }
        }

        private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
            CloseableFnDataReceiver consumer;
            if (BeamFnDataGrpcMultiplexer2.this.erroredInstructionIds.containsKey(instructionId)) {
                LOG.debug("Ignoring inbound data for failed instruction {}", (Object)instructionId);
                return;
            }
            CompletableFuture consumerFuture = BeamFnDataGrpcMultiplexer2.this.receiverFuture(instructionId);
            if (!consumerFuture.isDone()) {
                LOG.debug("Received data for instruction {} without consumer ready. Waiting for consumer to be registered.", (Object)instructionId);
            }
            try {
                consumer = (CloseableFnDataReceiver)consumerFuture.get();
            }
            catch (InterruptedException | ExecutionException e) {
                LOG.error("Client interrupted during handling of data for instruction {}", (Object)instructionId, (Object)e);
                BeamFnDataGrpcMultiplexer2.this.outboundObserver.onError(e);
                return;
            }
            catch (RuntimeException e) {
                LOG.error("Client failed to handle data for instruction {}", (Object)instructionId, (Object)e);
                BeamFnDataGrpcMultiplexer2.this.outboundObserver.onError(e);
                return;
            }
            try {
                consumer.accept(value);
            }
            catch (Exception e) {
                BeamFnDataGrpcMultiplexer2.this.erroredInstructionIds.put(instructionId, true);
            }
        }

        @Override
        public void onError(Throwable t) {
            LOG.error("Failed to handle for {}", BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor == null ? "unknown endpoint" : BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor, (Object)t);
            BeamFnDataGrpcMultiplexer2.this.outboundObserver.onCompleted();
        }

        @Override
        public void onCompleted() {
            LOG.warn("Hanged up for {}.", BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor == null ? "unknown endpoint" : BeamFnDataGrpcMultiplexer2.this.apiServiceDescriptor);
            BeamFnDataGrpcMultiplexer2.this.outboundObserver.onCompleted();
        }
    }
}

