/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.control;

import java.util.EnumMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.beam.fn.harness.logging.BeamFnLoggingMDC;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
import org.apache.beam.sdk.function.ThrowingFunction;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Channel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.Status;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnControlClient {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamFnControlClient.class);
    private final @UnknownKeyFor @NonNull @Initialized StreamObserver< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse> outboundObserver;
    private final @UnknownKeyFor @NonNull @Initialized EnumMap< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest.RequestCase, @UnknownKeyFor @NonNull @Initialized ThrowingFunction< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder>> handlers;
    private final @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized Object> onFinish = new CompletableFuture();
    private static final @UnknownKeyFor @NonNull @Initialized Object COMPLETED = new Object();

    public BeamFnControlClient(// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized Endpoints.ApiServiceDescriptor apiServiceDescriptor, @UnknownKeyFor @NonNull @Initialized ManagedChannelFactory channelFactory, @UnknownKeyFor @NonNull @Initialized OutboundObserverFactory outboundObserverFactory, @UnknownKeyFor @NonNull @Initialized Executor executor, @UnknownKeyFor @NonNull @Initialized EnumMap< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest.RequestCase, @UnknownKeyFor @NonNull @Initialized ThrowingFunction< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder>> handlers) {
        this(BeamFnControlGrpc.newStub((Channel)channelFactory.forDescriptor(apiServiceDescriptor)), outboundObserverFactory, executor, handlers);
    }

    public BeamFnControlClient(@UnknownKeyFor @NonNull @Initialized BeamFnControlGrpc.BeamFnControlStub controlStub, @UnknownKeyFor @NonNull @Initialized OutboundObserverFactory outboundObserverFactory, @UnknownKeyFor @NonNull @Initialized Executor executor, @UnknownKeyFor @NonNull @Initialized EnumMap< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest.RequestCase, @UnknownKeyFor @NonNull @Initialized ThrowingFunction< @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest,  @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder>> handlers) {
        this.handlers = handlers;
        this.outboundObserver = outboundObserverFactory.outboundObserverFor(controlStub::control, new InboundObserver(executor));
    }

    public @UnknownKeyFor @NonNull @Initialized CompletableFuture<@UnknownKeyFor @NonNull @Initialized Object> terminationFuture() {
        return this.onFinish;
    }

    public  @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse delegateOnInstructionRequestType( @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest value) {
        try {
            return ((BeamFnApi.InstructionResponse.Builder)this.handlers.getOrDefault((Object)value.getRequestCase(), (ThrowingFunction<BeamFnApi.InstructionRequest, BeamFnApi.InstructionResponse.Builder>)((ThrowingFunction)this::missingHandler)).apply((Object)value)).setInstructionId(value.getInstructionId()).build();
        }
        catch (Exception e) {
            LOG.error("Exception while trying to handle {} {}", new Object[]{BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e});
            return BeamFnApi.InstructionResponse.newBuilder().setInstructionId(value.getInstructionId()).setError(Throwables.getStackTraceAsString((Throwable)e)).build();
        }
        catch (Error e) {
            LOG.error("Error thrown when handling {} {}", new Object[]{BeamFnApi.InstructionRequest.class.getSimpleName(), value.getInstructionId(), e});
            throw e;
        }
    }

    public void sendInstructionResponse( @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse value) {
        LOG.debug("Sending InstructionResponse {}", (Object)value);
        this.outboundObserver.onNext((Object)value);
    }

    private void sendErrorResponse(@UnknownKeyFor @NonNull @Initialized Error e) {
        this.onFinish.completeExceptionally(e);
        this.outboundObserver.onError((Throwable)Status.INTERNAL.withDescription(String.format("%s: %s", e.getClass().getName(), e.getMessage())).asException());
    }

    private  @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionResponse.Builder missingHandler( @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) {
        return BeamFnApi.InstructionResponse.newBuilder().setError(String.format("Unknown InstructionRequest type %s", new Object[]{request.getRequestCase()}));
    }

    private class InboundObserver
    implements StreamObserver<BeamFnApi.InstructionRequest> {
        private final @UnknownKeyFor @NonNull @Initialized Executor executor;

        InboundObserver(Executor executorService) {
            this.executor = executorService;
        }

        public void onNext( @UnknownKeyFor @NonNull @Initialized BeamFnApi.InstructionRequest request) {
            try {
                BeamFnLoggingMDC.setInstructionId(request.getInstructionId());
                LOG.debug("Received InstructionRequest {}", (Object)request);
                this.executor.execute(() -> {
                    try {
                        BeamFnLoggingMDC.setInstructionId(request.getInstructionId());
                        BeamFnApi.InstructionResponse response = BeamFnControlClient.this.delegateOnInstructionRequestType(request);
                        BeamFnControlClient.this.sendInstructionResponse(response);
                    }
                    catch (Error e) {
                        BeamFnControlClient.this.sendErrorResponse(e);
                        throw e;
                    }
                    finally {
                        BeamFnLoggingMDC.reset();
                    }
                });
            }
            finally {
                BeamFnLoggingMDC.reset();
            }
        }

        public void onError(@UnknownKeyFor @NonNull @Initialized Throwable t2) {
            BeamFnControlClient.this.onFinish.completeExceptionally(t2);
        }

        public void onCompleted() {
            BeamFnControlClient.this.onFinish.complete(COMPLETED);
        }
    }
}

