/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.resume;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.CharsetUtil;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketErrorException;
import io.rsocket.exceptions.ConnectionErrorException;
import io.rsocket.frame.ErrorFrameCodec;
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.internal.UnboundedProcessor;
import io.rsocket.resume.ResumableFramesStore;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;
import reactor.util.annotation.Nullable;

public class ResumableDuplexConnection
extends Flux<ByteBuf>
implements DuplexConnection,
Subscription {
    static final Logger logger = LoggerFactory.getLogger(ResumableDuplexConnection.class);
    final String side;
    final String session;
    final ResumableFramesStore resumableFramesStore;
    final UnboundedProcessor savableFramesSender;
    final Sinks.Empty<Void> onQueueClose;
    final Sinks.Empty<Void> onLastConnectionClose;
    final SocketAddress remoteAddress;
    final Sinks.Many<Integer> onConnectionClosedSink;
    CoreSubscriber<? super ByteBuf> receiveSubscriber;
    FrameReceivingSubscriber activeReceivingSubscriber;
    volatile int state;
    static final AtomicIntegerFieldUpdater<ResumableDuplexConnection> STATE = AtomicIntegerFieldUpdater.newUpdater(ResumableDuplexConnection.class, "state");
    volatile DuplexConnection activeConnection;
    static final AtomicReferenceFieldUpdater<ResumableDuplexConnection, DuplexConnection> ACTIVE_CONNECTION = AtomicReferenceFieldUpdater.newUpdater(ResumableDuplexConnection.class, DuplexConnection.class, "activeConnection");
    int connectionIndex = 0;

    public ResumableDuplexConnection(String side, ByteBuf session, DuplexConnection initialConnection, ResumableFramesStore resumableFramesStore) {
        this.side = side;
        this.session = session.toString(CharsetUtil.UTF_8);
        this.onConnectionClosedSink = Sinks.unsafe().many().unicast().onBackpressureBuffer();
        this.resumableFramesStore = resumableFramesStore;
        this.onQueueClose = Sinks.unsafe().empty();
        this.onLastConnectionClose = Sinks.unsafe().empty();
        this.savableFramesSender = new UnboundedProcessor(() -> this.onQueueClose.tryEmitEmpty());
        this.remoteAddress = initialConnection.remoteAddress();
        resumableFramesStore.saveFrames(this.savableFramesSender).subscribe();
        ACTIVE_CONNECTION.lazySet(this, initialConnection);
    }

    public boolean connect(DuplexConnection nextConnection) {
        DuplexConnection activeConnection = this.activeConnection;
        if (activeConnection != DisposedConnection.INSTANCE && ACTIVE_CONNECTION.compareAndSet(this, activeConnection, nextConnection)) {
            if (!activeConnection.isDisposed()) {
                activeConnection.sendErrorAndClose(new ConnectionErrorException("Connection unexpectedly replaced"));
            }
            this.initConnection(nextConnection);
            return true;
        }
        return false;
    }

    void initConnection(DuplexConnection nextConnection) {
        int nextConnectionIndex = this.connectionIndex + 1;
        FrameReceivingSubscriber frameReceivingSubscriber = new FrameReceivingSubscriber(this.side, this.resumableFramesStore, this.receiveSubscriber);
        this.connectionIndex = nextConnectionIndex;
        this.activeReceivingSubscriber = frameReceivingSubscriber;
        if (logger.isDebugEnabled()) {
            logger.debug("Side[{}]|Session[{}]|DuplexConnection[{}]. Connecting", new Object[]{this.side, this.session, this.connectionIndex});
        }
        Disposable resumeStreamSubscription = this.resumableFramesStore.resumeStream().subscribe(f -> nextConnection.sendFrame(FrameHeaderCodec.streamId(f), (ByteBuf)f), t -> {
            this.dispose(nextConnection, (Throwable)t);
            nextConnection.sendErrorAndClose(new ConnectionErrorException(t.getMessage(), (Throwable)t));
        }, () -> {
            ConnectionErrorException e = new ConnectionErrorException("Connection Closed Unexpectedly");
            this.dispose(nextConnection, e);
            nextConnection.sendErrorAndClose(e);
        });
        nextConnection.receive().subscribe((CoreSubscriber)frameReceivingSubscriber);
        nextConnection.onClose().doFinally(__ -> {
            Sinks.EmitResult result;
            frameReceivingSubscriber.dispose();
            resumeStreamSubscription.dispose();
            if (logger.isDebugEnabled()) {
                logger.debug("Side[{}]|Session[{}]|DuplexConnection[{}]. Disconnected", new Object[]{this.side, this.session, this.connectionIndex});
            }
            if (!(result = this.onConnectionClosedSink.tryEmitNext((Object)nextConnectionIndex)).equals((Object)Sinks.EmitResult.OK)) {
                logger.error("Side[{}]|Session[{}]|DuplexConnection[{}]. Failed to notify session of closed connection: {}", new Object[]{this.side, this.session, this.connectionIndex, result});
            }
        }).subscribe();
    }

    public void disconnect() {
        DuplexConnection activeConnection = this.activeConnection;
        if (activeConnection != DisposedConnection.INSTANCE && !activeConnection.isDisposed()) {
            activeConnection.dispose();
        }
    }

    @Override
    public void sendFrame(int streamId, ByteBuf frame) {
        if (streamId == 0) {
            this.savableFramesSender.tryEmitPrioritized(frame);
        } else {
            this.savableFramesSender.tryEmitNormal(frame);
        }
    }

    Flux<Integer> onActiveConnectionClosed() {
        return this.onConnectionClosedSink.asFlux();
    }

    @Override
    public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
        DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
        if (activeConnection == DisposedConnection.INSTANCE) {
            return;
        }
        this.savableFramesSender.tryEmitFinal(ErrorFrameCodec.encode(activeConnection.alloc(), 0, rSocketErrorException));
        activeConnection.onClose().subscribe(null, t -> {
            this.onConnectionClosedSink.tryEmitComplete();
            this.onLastConnectionClose.tryEmitEmpty();
        }, () -> {
            this.onConnectionClosedSink.tryEmitComplete();
            Throwable cause = rSocketErrorException.getCause();
            if (cause == null) {
                this.onLastConnectionClose.tryEmitEmpty();
            } else {
                this.onLastConnectionClose.tryEmitError(cause);
            }
        });
    }

    @Override
    public Flux<ByteBuf> receive() {
        return this;
    }

    @Override
    public ByteBufAllocator alloc() {
        return this.activeConnection.alloc();
    }

    @Override
    public Mono<Void> onClose() {
        return Mono.whenDelayError((Publisher[])new Publisher[]{this.onQueueClose.asMono(), this.resumableFramesStore.onClose(), this.onLastConnectionClose.asMono()});
    }

    public void dispose() {
        DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
        if (activeConnection == DisposedConnection.INSTANCE) {
            return;
        }
        this.savableFramesSender.onComplete();
        activeConnection.onClose().subscribe(null, t -> {
            this.onConnectionClosedSink.tryEmitComplete();
            this.onLastConnectionClose.tryEmitEmpty();
        }, () -> {
            this.onConnectionClosedSink.tryEmitComplete();
            this.onLastConnectionClose.tryEmitEmpty();
        });
    }

    void dispose(DuplexConnection nextConnection, @Nullable Throwable e) {
        DuplexConnection activeConnection = ACTIVE_CONNECTION.getAndSet(this, DisposedConnection.INSTANCE);
        if (activeConnection == DisposedConnection.INSTANCE) {
            return;
        }
        this.savableFramesSender.onComplete();
        nextConnection.onClose().subscribe(null, t -> {
            if (e != null) {
                this.onLastConnectionClose.tryEmitError(e);
            } else {
                this.onLastConnectionClose.tryEmitEmpty();
            }
            this.onConnectionClosedSink.tryEmitComplete();
        }, () -> {
            if (e != null) {
                this.onLastConnectionClose.tryEmitError(e);
            } else {
                this.onLastConnectionClose.tryEmitEmpty();
            }
            this.onConnectionClosedSink.tryEmitComplete();
        });
    }

    public boolean isDisposed() {
        return (Boolean)this.onQueueClose.scan(Scannable.Attr.TERMINATED) != false || (Boolean)this.onQueueClose.scan(Scannable.Attr.CANCELLED) != false;
    }

    @Override
    public SocketAddress remoteAddress() {
        return this.remoteAddress;
    }

    public void request(long n) {
        if (this.state == 1 && STATE.compareAndSet(this, 1, 2)) {
            this.initConnection(this.activeConnection);
        }
    }

    public void cancel() {
        this.dispose();
    }

    public void subscribe(CoreSubscriber<? super ByteBuf> receiverSubscriber) {
        if (this.state == 0 && STATE.compareAndSet(this, 0, 1)) {
            this.receiveSubscriber = receiverSubscriber;
            receiverSubscriber.onSubscribe((Subscription)this);
        }
    }

    static boolean isResumableFrame(ByteBuf frame) {
        return FrameHeaderCodec.streamId(frame) != 0;
    }

    public String toString() {
        return "ResumableDuplexConnection{side='" + this.side + '\'' + ", session='" + this.session + '\'' + ", remoteAddress=" + this.remoteAddress + ", state=" + this.state + ", activeConnection=" + this.activeConnection + ", connectionIndex=" + this.connectionIndex + '}';
    }

    private static final class FrameReceivingSubscriber
    implements CoreSubscriber<ByteBuf>,
    Disposable {
        final ResumableFramesStore resumableFramesStore;
        final CoreSubscriber<? super ByteBuf> actual;
        final String tag;
        volatile Subscription s;
        static final AtomicReferenceFieldUpdater<FrameReceivingSubscriber, Subscription> S = AtomicReferenceFieldUpdater.newUpdater(FrameReceivingSubscriber.class, Subscription.class, "s");
        boolean cancelled;

        private FrameReceivingSubscriber(String tag, ResumableFramesStore store, CoreSubscriber<? super ByteBuf> actual) {
            this.tag = tag;
            this.resumableFramesStore = store;
            this.actual = actual;
        }

        public void onSubscribe(Subscription s) {
            if (Operators.setOnce(S, (Object)this, (Subscription)s)) {
                s.request(Long.MAX_VALUE);
            }
        }

        public void onNext(ByteBuf frame) {
            if (this.cancelled || this.s == Operators.cancelledSubscription()) {
                return;
            }
            if (ResumableDuplexConnection.isResumableFrame(frame)) {
                if (this.resumableFramesStore.resumableFrameReceived(frame)) {
                    this.actual.onNext((Object)frame);
                }
                return;
            }
            this.actual.onNext((Object)frame);
        }

        public void onError(Throwable t) {
            Operators.set(S, (Object)this, (Subscription)Operators.cancelledSubscription());
        }

        public void onComplete() {
            Operators.set(S, (Object)this, (Subscription)Operators.cancelledSubscription());
        }

        public void dispose() {
            this.cancelled = true;
            Operators.terminate(S, (Object)this);
        }

        public boolean isDisposed() {
            return this.cancelled || this.s == Operators.cancelledSubscription();
        }
    }

    private static final class DisposedConnection
    implements DuplexConnection {
        static final DisposedConnection INSTANCE = new DisposedConnection();

        private DisposedConnection() {
        }

        public void dispose() {
        }

        @Override
        public Mono<Void> onClose() {
            return Mono.never();
        }

        @Override
        public void sendFrame(int streamId, ByteBuf frame) {
        }

        @Override
        public Flux<ByteBuf> receive() {
            return Flux.never();
        }

        @Override
        public void sendErrorAndClose(RSocketErrorException e) {
        }

        @Override
        public ByteBufAllocator alloc() {
            return ByteBufAllocator.DEFAULT;
        }

        @Override
        public SocketAddress remoteAddress() {
            return null;
        }
    }
}

