/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import io.nats.client.Subscription;
import io.nats.client.impl.ConsumerMessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsSubscription;
import io.nats.client.impl.NatsSubscriptionFactory;
import io.nats.client.support.Validator;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Dispatcher that pulls messages off a single incoming queue and hands each one to a
 * {@link MessageHandler} on the run loop implemented by {@link #run()}.
 *
 * <p>Subscriptions are tracked in two groups:
 * <ul>
 *   <li>subscriptions created without an explicit handler, which are delivered to the
 *       dispatcher-wide default handler and are kept one-per-subject, and</li>
 *   <li>subscriptions created with their own handler, which are keyed by SID.</li>
 * </ul>
 *
 * <p>The tracking maps are {@link ConcurrentHashMap} instances; the lifecycle flags are
 * {@link AtomicBoolean}s.
 */
class NatsDispatcher
extends NatsConsumer
implements Dispatcher,
Runnable {
    /** Queue the run loop pops messages from. */
    protected final ConsumerMessageQueue incoming;
    /** Handler used when a subscription has no handler of its own; may be {@code null}. */
    protected final MessageHandler defaultHandler;
    /** Handle of the submitted run loop; {@code null} when not started threaded or after the loop exits. */
    protected Future<Boolean> thread;
    /** {@code true} while the dispatcher accepts work; cleared by {@link #stop(boolean)} and when the loop exits. */
    protected final AtomicBoolean running;
    /** Set once by the first successful {@link #internalStart(String, boolean)}; never reset in this class. */
    protected final AtomicBoolean started;
    /** Identifier supplied to {@link #start(String)}. */
    protected String id;
    /** Default-handler subscriptions, keyed by subject (at most one per subject). */
    protected final Map<String, NatsSubscription> subWithDefaultHandlerBySubject;
    /** Subscriptions that carry their own handler, keyed by SID. */
    protected final Map<String, NatsSubscription> subWithNonDefaultHandlerBySid;
    /** Subject to (SID to subscription) index of the subscriptions that carry their own handler. */
    protected final Map<String, Map<String, NatsSubscription>> subsBySidNonDefaultHandlersBySubject;
    /** Per-subscription handlers, keyed by SID. */
    protected final Map<String, MessageHandler> nonDefaultHandlerBySid;
    /** Timeout passed to {@code incoming.pop(...)} on every loop iteration (five minutes). */
    protected final Duration waitForMessage;

    /**
     * Creates a dispatcher that is not yet running.
     *
     * @param conn    the owning connection, handed to the superclass
     * @param handler the default handler; may be {@code null}, in which case
     *                {@link #subscribe(String)} refuses to subscribe
     */
    NatsDispatcher(NatsConnection conn, MessageHandler handler) {
        super(conn);
        this.defaultHandler = handler;
        this.incoming = new ConsumerMessageQueue();
        this.subWithDefaultHandlerBySubject = new ConcurrentHashMap<>();
        this.subWithNonDefaultHandlerBySid = new ConcurrentHashMap<>();
        this.subsBySidNonDefaultHandlersBySubject = new ConcurrentHashMap<>();
        this.nonDefaultHandlerBySid = new ConcurrentHashMap<>();
        this.running = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.waitForMessage = Duration.ofMinutes(5L);
    }

    /**
     * Starts the dispatcher and submits its run loop to the connection's executor.
     *
     * @param id identifier to associate with this dispatcher
     */
    @Override
    public void start(String id) {
        this.internalStart(id, true);
    }

    /**
     * Marks the dispatcher as started and running. Only the first call has any effect.
     *
     * @param id       identifier to associate with this dispatcher
     * @param threaded when {@code true}, the run loop is submitted to the connection's executor
     */
    protected void internalStart(String id, boolean threaded) {
        // compareAndSet makes "first caller wins" atomic, so the run loop cannot be
        // submitted twice when two threads start the dispatcher at the same time.
        if (this.started.compareAndSet(false, true)) {
            this.id = id;
            this.running.set(true);
            if (threaded) {
                this.thread = this.connection.getExecutor().submit(this, Boolean.TRUE);
            }
        }
    }

    /**
     * Tells the run loop whether it should exit after the current iteration.
     *
     * @return the result of {@code incoming.isDrained()}
     */
    boolean breakRunLoop() {
        return this.incoming.isDrained();
    }

    /**
     * Run loop: pops messages from {@link #incoming} and delivers each to the handler
     * registered for its subscription's SID, falling back to the default handler.
     *
     * <p>The loop ends when {@link #running} is cleared, the thread's interrupt flag is
     * set, {@link #breakRunLoop()} returns {@code true}, or the pop is interrupted. On
     * exit {@link #running} is cleared and {@link #thread} is set to {@code null}.
     */
    @Override
    public void run() {
        try {
            // Thread.interrupted() also clears the interrupt flag when it returns true.
            while (this.running.get() && !Thread.interrupted()) {
                NatsMessage msg = this.incoming.pop(this.waitForMessage);
                if (msg != null) {
                    NatsSubscription sub = msg.getNatsSubscription();
                    if (sub != null && sub.isActive()) {
                        MessageHandler handler = this.nonDefaultHandlerBySid.get(sub.getSID());
                        if (handler == null) {
                            handler = this.defaultHandler;
                        }
                        // Both lookups can come back null (no default handler, or the
                        // per-SID handler was already removed); such messages are skipped
                        // without touching the delivered counters.
                        if (handler != null) {
                            sub.incrementDeliveredCount();
                            this.incrementDeliveredCount();
                            try {
                                handler.onMessage(msg);
                            }
                            catch (Exception exp) {
                                // A failing handler must not kill the loop; report and continue.
                                this.connection.processException(exp);
                            }
                            catch (Error err) {
                                this.connection.processException(new Exception(err));
                            }
                            if (sub.reachedUnsubLimit()) {
                                this.connection.invalidate(sub);
                            }
                        }
                    }
                }
                if (this.breakRunLoop()) {
                    return;
                }
            }
        }
        catch (InterruptedException exp) {
            // Only report the interruption when it was not caused by an orderly stop.
            if (this.running.get()) {
                this.connection.processException(exp);
            }
            Thread.currentThread().interrupt();
        }
        finally {
            this.running.set(false);
            this.thread = null;
        }
    }

    /**
     * Stops the dispatcher: clears {@link #running}, pauses the incoming queue, cancels
     * the run loop if one was submitted, and forgets every tracked subscription.
     *
     * @param unsubscribeAll when {@code true}, every tracked subscription is unsubscribed
     *                       through the connection before the tracking maps are cleared
     */
    void stop(boolean unsubscribeAll) {
        this.running.set(false);
        this.incoming.pause();
        // Work on a snapshot: run() sets the field to null when it exits, possibly
        // between the null check and the cancel call.
        Future<Boolean> worker = this.thread;
        if (worker != null) {
            try {
                if (!worker.isCancelled()) {
                    worker.cancel(true);
                }
            }
            catch (Exception ignored) {
                // Best effort: a failure to cancel must not prevent the cleanup below.
            }
        }
        if (unsubscribeAll) {
            this.subWithDefaultHandlerBySubject.forEach((subject, sub) -> this.connection.unsubscribe(sub, -1));
            this.subWithNonDefaultHandlerBySid.forEach((sid, sub) -> this.connection.unsubscribe(sub, -1));
        }
        this.subWithDefaultHandlerBySubject.clear();
        this.subWithNonDefaultHandlerBySid.clear();
        this.subsBySidNonDefaultHandlersBySubject.clear();
        this.nonDefaultHandlerBySid.clear();
    }

    /**
     * @return {@code true} while {@link #running} is set
     */
    @Override
    public boolean isActive() {
        return this.running.get();
    }

    /**
     * @return the identifier passed to the first effective start call, or {@code null} if never started
     */
    String getId() {
        return this.id;
    }

    /**
     * @return the queue this dispatcher's run loop consumes
     */
    @Override
    ConsumerMessageQueue getMessageQueue() {
        return this.incoming;
    }

    /**
     * Looks up the handler registered for a specific subscription.
     *
     * @param sid the subscription id
     * @return the handler registered for {@code sid}, or {@code null} if there is none
     */
    MessageHandler getNonDefaultHandlerBySid(String sid) {
        return this.nonDefaultHandlerBySid.get(sid);
    }

    /**
     * @return {@code true} when neither a default-handler nor a per-handler subscription is tracked
     */
    boolean hasNoSubs() {
        return this.subWithDefaultHandlerBySubject.isEmpty() && this.subWithNonDefaultHandlerBySid.isEmpty();
    }

    /**
     * Asks the connection to send a subscription message again for every tracked subscription.
     */
    void resendSubscriptions() {
        this.subWithDefaultHandlerBySubject.forEach((subject, sub) -> this.connection.sendSubscriptionMessage(sub.getSID(), subject, sub.getQueueName(), true));
        this.subWithNonDefaultHandlerBySid.forEach((sid, sub) -> this.connection.sendSubscriptionMessage(sid, sub.getSubject(), sub.getQueueName(), true));
    }

    /**
     * Drops a subscription from every tracking structure of this dispatcher.
     *
     * @param sub the subscription to forget
     */
    void remove(NatsSubscription sub) {
        String sid = sub.getSID();
        String subject = sub.getSubject();
        // The default-handler map is keyed by subject, so only evict the entry when it
        // really is this subscription and not another one on the same subject.
        NatsSubscription defaultSub = this.subWithDefaultHandlerBySubject.get(subject);
        if (defaultSub != null && defaultSub.getSID().equals(sid)) {
            this.subWithDefaultHandlerBySubject.remove(subject);
        }
        this.subWithNonDefaultHandlerBySid.remove(sid);
        this.nonDefaultHandlerBySid.remove(sid);
        // Remove the SID and prune the per-subject map in one atomic step; returning
        // null from the remapping function deletes the subject entry.
        this.subsBySidNonDefaultHandlersBySubject.computeIfPresent(subject, (key, bySid) -> {
            bySid.remove(sid);
            return bySid.isEmpty() ? null : bySid;
        });
    }

    /**
     * Subscribes to a subject using the default handler.
     *
     * @param subject the subject to subscribe to
     * @return this dispatcher
     * @throws IllegalStateException if the dispatcher has no default handler, is not running, or is draining
     */
    @Override
    public Dispatcher subscribe(String subject) {
        if (this.defaultHandler == null) {
            throw new IllegalStateException("Dispatcher was made without a default handler.");
        }
        this.connection.subjectValidate(subject, true);
        this.subscribeImplCore(subject, null, null);
        return this;
    }

    /**
     * Same as {@link #subscribe(String)} but returns the subscription and does not
     * require a default handler to be present.
     *
     * @param subject the subject to subscribe to
     * @return the subscription tracked for {@code subject}
     * @throws IllegalStateException if the dispatcher is not running or is draining
     */
    NatsSubscription subscribeReturningSubscription(String subject) {
        this.connection.subjectValidate(subject, true);
        return this.subscribeImplCore(subject, null, null);
    }

    /**
     * Subscribes to a subject with a dedicated handler.
     *
     * @param subject the subject to subscribe to
     * @param handler the handler for this subscription; checked with {@code Validator.required}
     * @return the new subscription
     * @throws IllegalStateException if the dispatcher is not running or is draining
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler handler) {
        this.connection.subjectValidate(subject, true);
        Validator.required(handler, "Handler");
        return this.subscribeImplCore(subject, null, handler);
    }

    /**
     * Subscribes to a subject as part of a queue group, using the default handler.
     *
     * <p>Note: unlike {@link #subscribe(String)}, this overload does not verify that a
     * default handler exists.
     *
     * @param subject   the subject to subscribe to
     * @param queueName the queue group name
     * @return this dispatcher
     * @throws IllegalStateException if the dispatcher is not running or is draining
     */
    @Override
    public Dispatcher subscribe(String subject, String queueName) {
        this.connection.subjectValidate(subject, true);
        Validator.validateQueueName(queueName, true);
        this.subscribeImplCore(subject, queueName, null);
        return this;
    }

    /**
     * Subscribes to a subject as part of a queue group with a dedicated handler.
     *
     * @param subject   the subject to subscribe to
     * @param queueName the queue group name
     * @param handler   the handler for this subscription
     * @return the new subscription
     * @throws IllegalArgumentException if {@code handler} is {@code null}
     * @throws IllegalStateException    if the dispatcher is not running or is draining
     */
    @Override
    public Subscription subscribe(String subject, String queueName, MessageHandler handler) {
        this.connection.subjectValidate(subject, true);
        Validator.validateQueueName(queueName, true);
        if (handler == null) {
            throw new IllegalArgumentException("MessageHandler is required in subscribe");
        }
        return this.subscribeImplCore(subject, queueName, handler);
    }

    /**
     * Shared subscribe implementation.
     *
     * <p>With a {@code null} handler the subscription for {@code subject} is reused if
     * one is already tracked, otherwise it is created. With a handler a new subscription
     * is always created and tracked by SID.
     *
     * @param subject   the subject to subscribe to
     * @param queueName the queue group name, or {@code null}
     * @param handler   the per-subscription handler, or {@code null} for the default handler
     * @return the subscription that is tracked by this dispatcher
     * @throws IllegalStateException if the dispatcher is not running or is draining
     */
    NatsSubscription subscribeImplCore(String subject, String queueName, MessageHandler handler) {
        this.checkBeforeSubImpl();
        if (handler != null) {
            return this._subscribeImplHandlerProvided(subject, queueName, handler, null);
        }
        NatsSubscription existing = this.subWithDefaultHandlerBySubject.get(subject);
        if (existing != null) {
            return existing;
        }
        NatsSubscription created = this.connection.createSubscription(subject, queueName, this, null);
        NatsSubscription winner = this.subWithDefaultHandlerBySubject.putIfAbsent(subject, created);
        if (winner == null) {
            return created;
        }
        // Another thread registered a subscription for this subject first. Discard ours
        // and hand back the one that is actually tracked, rather than the one that was
        // just unsubscribed.
        this.connection.unsubscribe(created, -1);
        return winner;
    }

    /**
     * Subscribe entry point that lets the caller supply a subscription factory.
     *
     * @param subject   the subject to subscribe to
     * @param queueName the queue group name, or {@code null}
     * @param handler   the per-subscription handler
     * @param nsf       factory passed through to {@code connection.createSubscription}
     * @return the new subscription
     * @throws IllegalStateException if the dispatcher is not running or is draining
     */
    NatsSubscription subscribeImplJetStream(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
        this.checkBeforeSubImpl();
        return this._subscribeImplHandlerProvided(subject, queueName, handler, nsf);
    }

    /** Creates a subscription through the connection and tracks it together with its handler. */
    private NatsSubscription _subscribeImplHandlerProvided(String subject, String queueName, MessageHandler handler, NatsSubscriptionFactory nsf) {
        NatsSubscription sub = this.connection.createSubscription(subject, queueName, this, nsf);
        this.trackSubWithUserHandler(sub.getSID(), sub, handler);
        return sub;
    }

    /**
     * Re-subscribes an existing subscription through the connection and tracks it under
     * the SID the connection returns.
     *
     * @param sub       the subscription to re-subscribe
     * @param subject   the subject to subscribe to
     * @param queueName the queue group name, or {@code null}
     * @param handler   the handler to associate with the returned SID
     * @return the SID returned by {@code connection.reSubscribe}
     */
    String reSubscribe(NatsSubscription sub, String subject, String queueName, MessageHandler handler) {
        String sid = this.connection.reSubscribe(sub, subject, queueName);
        this.trackSubWithUserHandler(sid, sub, handler);
        return sid;
    }

    /** Records a subscription that carries its own handler in all three per-handler structures. */
    private void trackSubWithUserHandler(String sid, NatsSubscription sub, MessageHandler handler) {
        this.subWithNonDefaultHandlerBySid.put(sid, sub);
        // Create-if-missing and insert happen inside one compute() call so a concurrent
        // remove() cannot prune the per-subject map between the two steps.
        this.subsBySidNonDefaultHandlersBySubject.compute(sub.getSubject(), (key, bySid) -> {
            Map<String, NatsSubscription> target = bySid != null ? bySid : new ConcurrentHashMap<>();
            target.put(sid, sub);
            return target;
        });
        this.nonDefaultHandlerBySid.put(sid, handler);
    }

    /** Rejects new subscriptions when the dispatcher is not running or is draining. */
    private void checkBeforeSubImpl() {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Dispatcher is draining");
        }
    }

    /**
     * Unsubscribes every tracked subscription on {@code subject} immediately.
     *
     * @param subject the subject to unsubscribe from
     * @return this dispatcher
     */
    @Override
    public Dispatcher unsubscribe(String subject) {
        return this.unsubscribe(subject, -1);
    }

    /**
     * Unsubscribes the given subscription immediately.
     *
     * @param subscription the subscription to unsubscribe
     * @return this dispatcher
     */
    @Override
    public Dispatcher unsubscribe(Subscription subscription) {
        return this.unsubscribe(subscription, -1);
    }

    /**
     * Unsubscribes every tracked subscription on {@code subject}: the default-handler
     * one, if any, and every per-handler subscription with an equal subject. Does
     * nothing while the dispatcher is draining.
     *
     * @param subject the subject to unsubscribe from
     * @param after   forwarded unchanged to {@code connection.unsubscribe}
     * @return this dispatcher
     * @throws IllegalStateException    if the dispatcher is not running
     * @throws IllegalArgumentException if {@code subject} is {@code null} or empty
     */
    @Override
    public Dispatcher unsubscribe(String subject, int after) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            return this;
        }
        if (subject == null || subject.isEmpty()) {
            throw new IllegalArgumentException("Subject is required in unsubscribe");
        }
        NatsSubscription defaultHandlerSub = this.subWithDefaultHandlerBySubject.get(subject);
        if (defaultHandlerSub != null) {
            this.connection.unsubscribe(defaultHandlerSub, after);
        }
        this.subWithNonDefaultHandlerBySid.forEach((sid, sub) -> {
            if (subject.equals(sub.getSubject())) {
                this.connection.unsubscribe(sub, after);
            }
        });
        return this;
    }

    /**
     * Unsubscribes a subscription that was created with its own handler. Does nothing
     * while the dispatcher is draining, or when the subscription's SID is not tracked
     * in the per-handler map.
     *
     * @param subscription the subscription to unsubscribe
     * @param after        forwarded unchanged to {@code connection.unsubscribe}
     * @return this dispatcher
     * @throws IllegalStateException    if the dispatcher is not running, or the
     *                                  subscription belongs to a different dispatcher
     * @throws IllegalArgumentException if {@code subscription} is {@code null} or is
     *                                  not a {@code NatsSubscription}
     */
    @Override
    public Dispatcher unsubscribe(Subscription subscription, int after) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            return this;
        }
        if (subscription == null) {
            // Fail with a descriptive argument error instead of a bare NullPointerException.
            throw new IllegalArgumentException("Subscription is required in unsubscribe");
        }
        if (subscription.getDispatcher() != this) {
            throw new IllegalStateException("Subscription is not managed by this Dispatcher");
        }
        if (!(subscription instanceof NatsSubscription)) {
            throw new IllegalArgumentException("This Subscription implementation is not known by Dispatcher");
        }
        NatsSubscription ns = (NatsSubscription)subscription;
        // Go through the tracked instance rather than trusting the caller's reference.
        NatsSubscription sub = this.subWithNonDefaultHandlerBySid.get(ns.getSID());
        if (sub != null) {
            this.connection.unsubscribe(sub, after);
        }
        return this;
    }

    /**
     * Sends an unsub through the connection for every tracked subscription.
     */
    @Override
    void sendUnsubForDrain() {
        this.subWithDefaultHandlerBySubject.forEach((subject, sub) -> this.connection.sendUnsub(sub, -1));
        this.subWithNonDefaultHandlerBySid.forEach((sid, sub) -> this.connection.sendUnsub(sub, -1));
    }

    /**
     * Hands this dispatcher to {@code connection.cleanupDispatcher} once draining is finished.
     */
    @Override
    void cleanUpAfterDrain() {
        this.connection.cleanupDispatcher(this);
    }

    /**
     * @return {@code true} only when the dispatcher is no longer active and the
     *         superclass also reports itself drained
     */
    @Override
    public boolean isDrained() {
        return !this.isActive() && super.isDrained();
    }
}

