/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.broker.rsocket;

import io.netty.util.ReferenceCountUtil;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.broker.rsocket.FluxDeferredResolution;
import io.rsocket.broker.rsocket.MonoDeferredResolution;
import io.rsocket.broker.rsocket.ResolvingOperator;
import io.rsocket.frame.FrameType;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/**
 * An {@link RSocket} whose delegate is obtained by subscribing to a
 * {@code Publisher<RSocket>}.
 *
 * <p>Each interaction method returns a deferred publisher ({@link MonoInner} or
 * {@link FluxInner}) bound to this instance. Those publishers receive the
 * outcome through their {@code accept(RSocket, Throwable)} callback and only
 * then forward the request to the delegate.
 *
 * <p>This class is also the subscriber of the source publisher: the emitted
 * {@link RSocket} is stored in {@code value} and handed to the resolving
 * machinery of {@link ResolvingOperator} on completion.
 */
public final class ResolvingRSocket
        extends ResolvingOperator<RSocket>
        implements CoreSubscriber<RSocket>, RSocket {

    /** Source that is subscribed to (in {@link #doSubscribe()}) to obtain the delegate. */
    final Publisher<RSocket> rSocketMono;

    /** Upstream subscription; only ever updated through {@link #S}. */
    volatile Subscription s;

    static final AtomicReferenceFieldUpdater<ResolvingRSocket, Subscription> S =
            AtomicReferenceFieldUpdater.newUpdater(ResolvingRSocket.class, Subscription.class, "s");

    /**
     * Creates a resolving socket on top of the given source.
     *
     * @param rSocketMono publisher expected to emit the {@link RSocket} to delegate to
     */
    public ResolvingRSocket(Publisher<RSocket> rSocketMono) {
        this.rSocketMono = rSocketMono;
    }

    /**
     * Stores the upstream subscription (at most once) and requests an
     * unbounded amount from it.
     */
    @Override
    public void onSubscribe(Subscription s) {
        if (Operators.setOnce(S, this, s)) {
            s.request(Long.MAX_VALUE);
        }
    }

    /**
     * Handles completion of the source.
     *
     * <p>If the subscription was already cancelled, or it changed concurrently
     * so that it cannot be cleared, only {@code doFinally()} is invoked.
     * Otherwise the stored value is published via {@code complete(value)}, or,
     * when nothing was emitted, the operator is terminated with an
     * {@link IllegalStateException}.
     */
    @Override
    public void onComplete() {
        final Subscription current = this.s;
        if (current == Operators.cancelledSubscription() || !S.compareAndSet(this, current, null)) {
            this.doFinally();
            return;
        }

        if (this.value == null) {
            this.terminate(new IllegalStateException("Source completed empty"));
        } else {
            this.complete(this.value);
        }
    }

    /**
     * Handles a source error.
     *
     * <p>When the subscription is already cancelled the error is reported as
     * dropped; otherwise the subscription is marked as cancelled and the
     * operator is terminated with {@code t}. {@code doFinally()} is invoked in
     * both cases.
     */
    @Override
    public void onError(Throwable t) {
        final Subscription current = this.s;
        if (current == Operators.cancelledSubscription()
                || S.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) {
            this.doFinally();
            Operators.onErrorDropped(t, Context.empty());
            return;
        }

        this.doFinally();
        this.terminate(t);
    }

    /**
     * Receives the resolved {@link RSocket}.
     *
     * <p>If the subscription was cancelled in the meantime the value is
     * expired right away (see {@link #doOnValueExpired(RSocket)}); otherwise
     * it is stored and {@code doFinally()} is invoked.
     */
    @Override
    public void onNext(RSocket value) {
        if (this.s == Operators.cancelledSubscription()) {
            this.doOnValueExpired(value);
            return;
        }

        this.value = value;
        this.doFinally();
    }

    /** Subscribes this instance to the source publisher. */
    @Override
    protected void doSubscribe() {
        this.rSocketMono.subscribe(this);
    }

    /**
     * Watches the resolved socket's {@code onClose()} signal and calls
     * {@code invalidate()} when it terminates, whether with an error or normally.
     */
    @Override
    protected void doOnValueResolved(RSocket value) {
        value.onClose().subscribe(null, t -> this.invalidate(), this::invalidate);
    }

    /** Disposes both the expired socket and this instance. */
    @Override
    protected void doOnValueExpired(RSocket value) {
        value.dispose();
        this.dispose();
    }

    /** Terminates the upstream subscription held in {@link #s}. */
    @Override
    protected void doOnDispose() {
        Operators.terminate(S, this);
    }

    /** Returns a deferred fire-and-forget request against the resolved socket. */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        return new MonoInner<>(this, payload, FrameType.REQUEST_FNF);
    }

    /** Returns a deferred request-response request against the resolved socket. */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        return new MonoInner<>(this, payload, FrameType.REQUEST_RESPONSE);
    }

    /** Returns a deferred request-stream request against the resolved socket. */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return new FluxInner<>(this, payload, FrameType.REQUEST_STREAM);
    }

    /**
     * Returns a deferred request-channel request against the resolved socket.
     *
     * @param payloads any {@link Publisher} of payloads; it does not have to be a {@link Flux}
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return new FluxInner<>(this, payloads, FrameType.REQUEST_CHANNEL);
    }

    /** Returns a deferred metadata-push request against the resolved socket. */
    @Override
    public Mono<Void> metadataPush(Payload payload) {
        return new MonoInner<>(this, payload, FrameType.METADATA_PUSH);
    }

    /**
     * Returns the availability of the resolved socket.
     *
     * @return the delegate's availability, or {@code 0.0} while no socket is resolved
     */
    @Override
    public double availability() {
        final RSocket resolved = this.valueIfResolved();
        return resolved != null ? resolved.availability() : 0.0;
    }

    /**
     * Deferred {@link Flux}-shaped interaction (request-stream or request-channel).
     *
     * @param <INPUT> type of the request input: a {@link Payload} for
     *     request-stream, a {@code Publisher<Payload>} for request-channel
     */
    static final class FluxInner<INPUT>
            extends FluxDeferredResolution<INPUT, RSocket> {

        FluxInner(ResolvingRSocket parent, INPUT fluxOrPayload, FrameType requestType) {
            super(parent, fluxOrPayload, requestType);
        }

        /**
         * Resolution callback.
         *
         * <ul>
         *   <li>Already terminated: nothing happens.
         *   <li>{@code t != null}: a request-stream payload is released, then the
         *       error is signalled via {@code onError}.
         *   <li>{@code rSocket != null}: the request is forwarded to it and this
         *       instance subscribes to the result.
         *   <li>Both {@code null}: this instance is handed back to the parent via
         *       {@code parent.add(this)} (presumably to wait for the next
         *       resolution; confirm against {@code ResolvingOperator}).
         * </ul>
         */
        // The input type is only known through requestType, hence the unchecked
        // casts; the raw subscriber cast mirrors MonoInner.accept.
        @SuppressWarnings({"unchecked", "rawtypes"})
        @Override
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }

            if (t != null) {
                // Only request-stream carries a reference-counted Payload as input.
                if (this.requestType == FrameType.REQUEST_STREAM) {
                    ReferenceCountUtil.safeRelease(this.fluxOrPayload);
                }
                this.onError(t);
                return;
            }

            if (rSocket == null) {
                this.parent.add(this);
                return;
            }

            final Flux<Payload> source;
            switch (this.requestType) {
                case REQUEST_STREAM:
                    source = rSocket.requestStream((Payload) this.fluxOrPayload);
                    break;
                case REQUEST_CHANNEL:
                    // Cast to Publisher rather than Flux: requestChannel accepts any
                    // Publisher<Payload>, and a Flux cast would throw
                    // ClassCastException for e.g. a Mono input.
                    source = rSocket.requestChannel((Publisher<Payload>) this.fluxOrPayload);
                    break;
                default:
                    Operators.error(this.actual, new IllegalStateException("Should never happen"));
                    return;
            }
            source.subscribe((CoreSubscriber) this);
        }
    }

    /**
     * Deferred {@link Mono}-shaped interaction (fire-and-forget,
     * request-response or metadata-push).
     *
     * @param <RESULT> element type of the resulting {@link Mono}
     */
    static final class MonoInner<RESULT>
            extends MonoDeferredResolution<RESULT, RSocket> {

        MonoInner(ResolvingRSocket parent, Payload payload, FrameType requestType) {
            super(parent, payload, requestType);
        }

        /**
         * Resolution callback.
         *
         * <ul>
         *   <li>Already terminated: nothing happens.
         *   <li>{@code t != null}: the payload is released, then the error is
         *       signalled via {@code onError}.
         *   <li>{@code rSocket != null}: the request is forwarded to it and this
         *       instance subscribes to the result.
         *   <li>Both {@code null}: this instance is handed back to the parent via
         *       {@code parent.add(this)} (presumably to wait for the next
         *       resolution; confirm against {@code ResolvingOperator}).
         * </ul>
         */
        // The delegate's result type depends on requestType, so the subscriber
        // has to be passed as a raw type.
        @SuppressWarnings({"unchecked", "rawtypes"})
        @Override
        public void accept(RSocket rSocket, Throwable t) {
            if (this.isTerminated()) {
                return;
            }

            if (t != null) {
                ReferenceCountUtil.safeRelease(this.payload);
                this.onError(t);
                return;
            }

            if (rSocket == null) {
                this.parent.add(this);
                return;
            }

            final Mono<?> source;
            switch (this.requestType) {
                case REQUEST_FNF:
                    source = rSocket.fireAndForget(this.payload);
                    break;
                case REQUEST_RESPONSE:
                    source = rSocket.requestResponse(this.payload);
                    break;
                case METADATA_PUSH:
                    source = rSocket.metadataPush(this.payload);
                    break;
                default:
                    Operators.error(this.actual, new IllegalStateException("Should never happen"));
                    return;
            }
            source.subscribe((CoreSubscriber) this);
        }
    }
}

