/*
 * Decompiled with CFR 0.152.
 */
package io.dapr.client;

import io.dapr.client.SubscriptionListener;
import io.dapr.client.domain.CloudEvent;
import io.dapr.exceptions.DaprException;
import io.dapr.v1.DaprAppCallbackProtos;
import io.dapr.v1.DaprGrpc;
import io.dapr.v1.DaprProtos;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.annotation.Nonnull;
import reactor.core.publisher.Mono;

/**
 * Streaming subscription to a topic, backed by two worker threads.
 *
 * <p>The <em>receiver</em> thread opens the bidirectional stream through
 * {@code subscribeTopicEventsAlpha1}, sends the initial subscribe request and then
 * waits until the stream completes or this subscription is closed. Incoming events
 * are converted to {@link CloudEvent}s and handed to the {@link SubscriptionListener}.
 *
 * <p>The <em>acker</em> thread drains a bounded queue of acknowledgements produced by
 * the listener and writes them back on the current request stream.
 *
 * @param <T> payload type of the received cloud events.
 */
public class Subscription<T>
implements Closeable {
    /** Maximum number of acknowledgements buffered before producers block. */
    private static final int ACK_QUEUE_CAPACITY = 50;
    /** Pause, in milliseconds, before the acker continues after a missing stream or a send failure. */
    private static final long ACK_RETRY_DELAY_MS = 1000L;

    private final BlockingQueue<DaprProtos.SubscribeTopicEventsRequestAlpha1> ackQueue = new LinkedBlockingQueue<>(ACK_QUEUE_CAPACITY);
    private final AtomicBoolean running = new AtomicBoolean(true);
    /** Released when the response stream completes or when {@link #close()} is called. */
    private final Semaphore receiverStateChange = new Semaphore(0);
    private final Thread acker;
    private final Thread receiver;

    /**
     * Creates the subscription and its worker threads; nothing runs until {@link #start()}.
     *
     * @param asyncStub           stub used to open the streaming call.
     * @param request             initial request sent on every newly opened stream.
     * @param listener            receives events and errors.
     * @param cloudEventConverter converts a raw stream response into a cloud event; a
     *                            {@code null} result causes the response to be skipped.
     */
    Subscription(DaprGrpc.DaprStub asyncStub, DaprProtos.SubscribeTopicEventsRequestAlpha1 request, final SubscriptionListener<T> listener, final Function<DaprProtos.SubscribeTopicEventsResponseAlpha1, CloudEvent<T>> cloudEventConverter) {
        // Shared between both threads: the receiver publishes the current request stream,
        // the acker reads it to send acknowledgements.
        final AtomicReference<StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1>> streamRef = new AtomicReference<>();
        this.acker = new Thread(() -> {
            while (this.running.get()) {
                try {
                    // take() blocks until an element is available and never yields null.
                    DaprProtos.SubscribeTopicEventsRequestAlpha1 ackResponse = this.ackQueue.take();
                    StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> stream = streamRef.get();
                    if (stream == null) {
                        // Stream not published yet; the ack taken above is not re-queued.
                        Thread.sleep(ACK_RETRY_DELAY_MS);
                        continue;
                    }
                    stream.onNext(ackResponse);
                }
                catch (InterruptedException e) {
                    // close() interrupts this thread to unblock take()/sleep().
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Exception e) {
                    // Best effort: back off and move on to the next ack; the failed one is not resent.
                    try {
                        Thread.sleep(ACK_RETRY_DELAY_MS);
                    }
                    catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        this.receiver = new Thread(() -> {
            while (this.running.get()) {
                StreamObserver<DaprProtos.SubscribeTopicEventsRequestAlpha1> stream = asyncStub.subscribeTopicEventsAlpha1(new StreamObserver<DaprProtos.SubscribeTopicEventsResponseAlpha1>(){

                    @Override
                    public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 topicEventRequest) {
                        try {
                            if (streamRef.get() == null) {
                                throw new RuntimeException("Cannot receive event: streaming subscription is not initialized.");
                            }
                            CloudEvent<T> cloudEvent = cloudEventConverter.apply(topicEventRequest);
                            if (cloudEvent == null) {
                                return;
                            }
                            String id = cloudEvent.getId();
                            if (id == null || id.isEmpty()) {
                                // Without an id there is nothing to acknowledge against.
                                return;
                            }
                            Subscription.onEvent(listener, cloudEvent).subscribe(status -> {
                                DaprProtos.SubscribeTopicEventsRequestAlpha1 ack = Subscription.buildAckRequest(id, status);
                                try {
                                    // Blocks when the bounded queue is full.
                                    Subscription.this.ackQueue.put(ack);
                                }
                                catch (InterruptedException e) {
                                    // Preserve the interrupt for the calling thread before surfacing the failure.
                                    Thread.currentThread().interrupt();
                                    throw new RuntimeException(e);
                                }
                            });
                        }
                        catch (Exception e) {
                            this.onError(DaprException.propagate(e));
                        }
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // NOTE(review): this only notifies the listener and does not release
                        // receiverStateChange, so after a stream error the receiver stays parked
                        // until close() — confirm that no automatic re-subscribe is intended.
                        listener.onError(DaprException.propagate(throwable));
                    }

                    @Override
                    public void onCompleted() {
                        // Wake the receiver loop so it can re-check running and re-subscribe.
                        Subscription.this.receiverStateChange.release();
                    }
                });
                streamRef.set(stream);
                stream.onNext(request);
                try {
                    // Park until the stream completes or close() is called.
                    this.receiverStateChange.acquire();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.running.set(false);
                }
            }
        });
    }

    /**
     * Invokes the listener for one event.
     *
     * <p>If the listener's {@code Mono} fails, the error is reported through
     * {@link SubscriptionListener#onError} and the resulting status is
     * {@link SubscriptionListener.Status#RETRY}.
     *
     * @param listener   listener to notify.
     * @param cloudEvent event to deliver.
     * @return the status produced by the listener, or {@code RETRY} on failure.
     */
    private static <T> Mono<SubscriptionListener.Status> onEvent(SubscriptionListener<T> listener, CloudEvent<T> cloudEvent) {
        return listener.onEvent(cloudEvent).onErrorMap(t -> {
            RuntimeException exception = DaprException.propagate(t);
            listener.onError(exception);
            return exception;
        }).onErrorReturn(SubscriptionListener.Status.RETRY);
    }

    /**
     * Builds the acknowledgement message for a processed event.
     *
     * @param id     id of the event being acknowledged.
     * @param status listener status; mapped to the proto status with the same name.
     * @return the request carrying the "event processed" payload.
     */
    @Nonnull
    private static DaprProtos.SubscribeTopicEventsRequestAlpha1 buildAckRequest(String id, SubscriptionListener.Status status) {
        DaprAppCallbackProtos.TopicEventResponse response = DaprAppCallbackProtos.TopicEventResponse.newBuilder()
                .setStatus(DaprAppCallbackProtos.TopicEventResponse.TopicEventResponseStatus.valueOf(status.name()))
                .build();
        DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1 eventProcessed = DaprProtos.SubscribeTopicEventsRequestProcessedAlpha1.newBuilder()
                .setId(id)
                .setStatus(response)
                .build();
        return DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder().setEventProcessed(eventProcessed).build();
    }

    /** Starts the receiver thread and then the acker thread. */
    void start() {
        this.receiver.start();
        this.acker.start();
    }

    /**
     * Signals both worker threads to stop: clears the running flag, wakes the receiver
     * and interrupts the acker. Does not wait for them; see {@link #awaitTermination()}.
     */
    @Override
    public void close() {
        this.running.set(false);
        this.receiverStateChange.release();
        this.acker.interrupt();
    }

    /**
     * Blocks until both worker threads have finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     */
    public void awaitTermination() throws InterruptedException {
        this.receiver.join();
        this.acker.join();
    }
}

