/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.reactive;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.util.VertxUtils;
import io.vertx.core.Context;
import io.vertx.core.Future;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BasePublisher<T>
implements Publisher<T> {
    private static final Logger log = LoggerFactory.getLogger(BasePublisher.class);
    protected final Context ctx;
    private volatile Subscriber<? super T> subscriber;
    private long demand;
    private boolean cancelled;
    private boolean sentComplete;
    private volatile Throwable failure;

    @SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="ctx should be mutable")
    public BasePublisher(Context ctx) {
        this.ctx = Objects.requireNonNull(ctx);
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        if (this.isFailed()) {
            throw new IllegalStateException("Cannot subscribe to failed publisher. Failure cause: " + this.failure);
        }
        Objects.requireNonNull(subscriber);
        if (VertxUtils.isEventLoopAndSameContext(this.ctx)) {
            this.doSubscribe(subscriber);
        } else {
            this.ctx.runOnContext(v -> this.doSubscribe(subscriber));
        }
    }

    public Future<Void> close() {
        this.ctx.runOnContext(v -> this.doClose());
        return Future.succeededFuture();
    }

    @SuppressFBWarnings(value={"EI_EXPOSE_REP"}, justification="ctx should be mutable")
    public Context getContext() {
        return this.ctx;
    }

    protected void checkContext() {
        VertxUtils.checkContext(this.ctx);
    }

    protected final void sendError(Throwable e) {
        this.checkContext();
        try {
            if (this.subscriber != null) {
                this.subscriber.onError(e);
            } else {
                log.error("Failure in publisher", e);
            }
            this.failure = e;
        }
        catch (Exception ex) {
            this.logError("Exception encountered in onError", ex);
        }
    }

    protected void sendComplete() {
        try {
            this.sentComplete = true;
            if (this.subscriber != null) {
                this.subscriber.onComplete();
            }
        }
        catch (Exception ex) {
            this.logError("Exception encountered in onComplete", ex);
        }
    }

    protected void doOnNext(T val) {
        if (!this.beforeOnNext()) {
            return;
        }
        try {
            this.subscriber.onNext(val);
        }
        catch (Exception ex) {
            this.logError("Exception encountered in onNext", ex);
        }
        if (this.demand != Long.MAX_VALUE) {
            --this.demand;
        }
    }

    protected long getDemand() {
        return this.demand;
    }

    public Subscriber<? super T> getSubscriber() {
        return this.subscriber;
    }

    protected boolean hasSentComplete() {
        return this.sentComplete;
    }

    protected boolean isCancelled() {
        return this.cancelled;
    }

    protected boolean isFailed() {
        return this.failure != null;
    }

    protected abstract void maybeSend();

    protected boolean beforeOnNext() {
        return true;
    }

    protected void afterSubscribe() {
    }

    private void doSubscribe(Subscriber<? super T> subscriber) {
        this.subscriber = subscriber;
        try {
            subscriber.onSubscribe((Subscription)new Sub());
        }
        catch (Throwable t) {
            this.sendError(new IllegalStateException("Exception encountered in onSubscribe", t));
        }
        if (this.isFailed()) {
            this.sendError(new IllegalStateException("Cannot subscribe to failed publisher. Failure cause: " + this.failure));
        } else if (this.hasSentComplete()) {
            this.sendComplete();
        }
        this.afterSubscribe();
    }

    private void doClose() {
        if (this.subscriber != null) {
            this.sendComplete();
        }
    }

    private void doRequest(long n) {
        if (n <= 0L) {
            this.sendError(new IllegalArgumentException("Amount requested must be > 0"));
        } else if (this.demand + n < 1L) {
            this.demand = Long.MAX_VALUE;
            this.maybeSend();
        } else {
            this.demand += n;
            this.maybeSend();
        }
    }

    private void doCancel() {
        this.cancelled = true;
        this.subscriber = null;
    }

    private void logError(String message, Exception e) {
        log.error(message, (Throwable)e);
        this.failure = e;
    }

    private class Sub
    implements Subscription {
        private Sub() {
        }

        public void request(long n) {
            BasePublisher.this.ctx.runOnContext(v -> BasePublisher.this.doRequest(n));
        }

        public void cancel() {
            BasePublisher.this.ctx.runOnContext(v -> BasePublisher.this.doCancel());
        }
    }
}

