/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Dispatcher;
import io.nats.client.MessageHandler;
import io.nats.client.Subscription;
import io.nats.client.impl.MessageQueue;
import io.nats.client.impl.NatsConnection;
import io.nats.client.impl.NatsConsumer;
import io.nats.client.impl.NatsMessage;
import io.nats.client.impl.NatsSubscription;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link Dispatcher} implementation that drains its own {@link MessageQueue} on a task
 * submitted to the connection's executor and hands every message to a
 * {@link MessageHandler}.
 *
 * <p>Two kinds of subscriptions are tracked:
 * <ul>
 *   <li>subscriptions created without a handler, keyed by <em>subject</em>, which are
 *       served by the default handler passed to the constructor;</li>
 *   <li>subscriptions created with their own handler, keyed by <em>SID</em>.</li>
 * </ul>
 */
class NatsDispatcher
extends NatsConsumer
implements Dispatcher,
Runnable {
    /** Queue the dispatch loop pops messages from. */
    private final MessageQueue incoming;
    /** Handler used for subscriptions that were created without their own handler. */
    private final MessageHandler defaultHandler;
    /** Handle of the dispatch task; set by {@link #start(String)}, cleared when {@link #run()} exits. */
    private Future<Boolean> thread;
    /** True from {@link #start(String)} until the dispatcher is stopped or the loop exits. */
    private final AtomicBoolean running;
    private String id;
    /** Default-handler subscriptions, keyed by subject. */
    private final Map<String, NatsSubscription> subscriptionsUsingDefaultHandler;
    /** Subscriptions that carry their own handler, keyed by SID. */
    private final Map<String, NatsSubscription> subscriptionsWithHandlers;
    /** Per-subscription handlers, keyed by SID. */
    private final Map<String, MessageHandler> subscriptionHandlers;
    /** Maximum time a single {@code pop} on the incoming queue waits for a message. */
    private final Duration waitForMessage;

    /**
     * Creates a dispatcher that is not yet running; call {@link #start(String)} to begin dispatching.
     *
     * @param conn    connection this dispatcher belongs to
     * @param handler default handler for subscriptions created without their own handler
     */
    NatsDispatcher(NatsConnection conn, MessageHandler handler) {
        super(conn);
        this.defaultHandler = handler;
        this.incoming = new MessageQueue(true);
        this.subscriptionsUsingDefaultHandler = new ConcurrentHashMap<String, NatsSubscription>();
        this.subscriptionsWithHandlers = new ConcurrentHashMap<String, NatsSubscription>();
        this.subscriptionHandlers = new ConcurrentHashMap<String, MessageHandler>();
        this.running = new AtomicBoolean(false);
        this.waitForMessage = Duration.ofMinutes(5L);
    }

    /**
     * Records the dispatcher id, marks the dispatcher as running and submits the
     * dispatch loop to the connection's executor.
     *
     * @param id identifier later returned by {@link #getId()}
     */
    void start(String id) {
        this.id = id;
        this.running.set(true);
        this.thread = this.connection.getExecutor().submit(this, Boolean.TRUE);
    }

    /**
     * @return {@code true} when the dispatch loop should exit, i.e. when the incoming
     *         queue reports itself as drained
     */
    boolean breakRunLoop() {
        return this.incoming.isDrained();
    }

    /**
     * Dispatch loop: pops messages from the incoming queue and delivers them until the
     * dispatcher is stopped or {@link #breakRunLoop()} reports that the loop should end.
     * On exit the running flag is cleared and the task handle is dropped.
     */
    @Override
    public void run() {
        try {
            while (this.running.get()) {
                NatsMessage msg = this.incoming.pop(this.waitForMessage);
                if (msg != null) {
                    this.dispatchMessage(msg);
                }
                // Checked after every pop, whether or not a message arrived.
                if (this.breakRunLoop()) {
                    return;
                }
            }
        }
        catch (InterruptedException exp) {
            // An interrupt while still running is unexpected and gets reported;
            // otherwise it is the cancel issued by stop().
            if (this.running.get()) {
                this.connection.processException(exp);
            }
            // Restore the interrupt flag so the interruption is not silently lost.
            Thread.currentThread().interrupt();
        }
        finally {
            this.running.set(false);
            this.thread = null;
        }
    }

    /**
     * Delivers one message to the handler of its subscription, if that subscription is
     * still active. Exceptions thrown by the handler are passed to the connection and do
     * not end the dispatch loop.
     */
    private void dispatchMessage(NatsMessage msg) {
        NatsSubscription sub = msg.getNatsSubscription();
        if (sub == null || !sub.isActive()) {
            return;
        }
        sub.incrementDeliveredCount();
        this.incrementDeliveredCount();

        // A handler registered for this SID takes precedence over the default handler.
        MessageHandler customHandler = this.subscriptionHandlers.get(sub.getSID());
        MessageHandler currentHandler = customHandler != null ? customHandler : this.defaultHandler;
        try {
            currentHandler.onMessage(msg);
        }
        catch (Exception exp) {
            this.connection.processException(exp);
        }
        if (sub.reachedUnsubLimit()) {
            this.connection.invalidate(sub);
        }
    }

    /**
     * Stops dispatching: clears the running flag, pauses the incoming queue and cancels
     * the dispatch task (interrupting it if it is running).
     *
     * @param unsubscribeAll when {@code true}, every tracked subscription is unsubscribed
     *                       through the connection; when {@code false}, the tracking maps
     *                       are simply cleared
     */
    void stop(boolean unsubscribeAll) {
        this.running.set(false);
        this.incoming.pause();

        // Read the field once: run() sets it to null when the loop exits, so re-reading
        // it between the null check and the call could dereference null.
        Future<Boolean> task = this.thread;
        if (task != null) {
            try {
                if (!task.isCancelled()) {
                    task.cancel(true);
                }
            }
            catch (Exception ignored) {
                // Cancellation is best effort; a failure here must not prevent cleanup below.
            }
        }

        if (unsubscribeAll) {
            this.subscriptionsUsingDefaultHandler.forEach((subject, sub) -> this.connection.unsubscribe(sub, -1));
            this.subscriptionsWithHandlers.forEach((sid, sub) -> this.connection.unsubscribe(sub, -1));
        } else {
            this.subscriptionsUsingDefaultHandler.clear();
            this.subscriptionsWithHandlers.clear();
            this.subscriptionHandlers.clear();
        }
    }

    /** @return {@code true} while the dispatcher is running */
    @Override
    public boolean isActive() {
        return this.running.get();
    }

    /** @return the id passed to {@link #start(String)}, or {@code null} if not started */
    String getId() {
        return this.id;
    }

    /** @return the queue the dispatch loop reads from */
    @Override
    MessageQueue getMessageQueue() {
        return this.incoming;
    }

    /** Re-sends the subscription message for every tracked subscription through the connection. */
    void resendSubscriptions() {
        this.subscriptionsUsingDefaultHandler.forEach((subject, sub) -> this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true));
        this.subscriptionsWithHandlers.forEach((sid, sub) -> this.connection.sendSubscriptionMessage(sub.getSID(), sub.getSubject(), sub.getQueueName(), true));
    }

    /**
     * Stops tracking the given subscription.
     *
     * <p>A default-handler entry is only removed when the subscription registered for
     * that subject has the same SID, so a newer subscription on the same subject is
     * left untouched. Unknown subscriptions are ignored.
     *
     * @param sub subscription to forget
     */
    void remove(NatsSubscription sub) {
        String sid = sub.getSID();
        if (this.subscriptionsWithHandlers.remove(sid) != null) {
            this.subscriptionHandlers.remove(sid);
            return;
        }
        NatsSubscription registered = this.subscriptionsUsingDefaultHandler.get(sub.getSubject());
        // SIDs are strings: compare by value, and tolerate a subject that is no longer registered.
        if (registered != null && registered.getSID().equals(sid)) {
            this.subscriptionsUsingDefaultHandler.remove(sub.getSubject());
        }
    }

    /**
     * Subscribes to {@code subject} using the default handler.
     *
     * @throws IllegalArgumentException if {@code subject} is null or empty
     * @throws IllegalStateException    if the dispatcher is closed or draining
     */
    @Override
    public Dispatcher subscribe(String subject) {
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in subscribe");
        this.subscribeImpl(subject, null, null, false);
        return this;
    }

    /**
     * Same as {@link #subscribe(String)} but returns the underlying subscription
     * instead of this dispatcher.
     *
     * @throws IllegalArgumentException if {@code subject} is null or empty
     * @throws IllegalStateException    if the dispatcher is closed or draining
     */
    NatsSubscription subscribeReturningSubscription(String subject) {
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in subscribe");
        return this.subscribeImpl(subject, null, null, false);
    }

    /**
     * Subscribes to {@code subject} with a handler dedicated to the new subscription.
     *
     * @throws IllegalArgumentException if {@code subject} is null or empty, or {@code handler} is null
     * @throws IllegalStateException    if the dispatcher is closed or draining
     */
    @Override
    public Subscription subscribe(String subject, MessageHandler handler) {
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in subscribe");
        NatsDispatcher.checkHandler(handler);
        return this.subscribeImpl(subject, null, handler, false);
    }

    /**
     * Subscribes to {@code subject} as a member of {@code queueName}, using the default handler.
     *
     * @throws IllegalArgumentException if {@code subject} or {@code queueName} is null or empty
     * @throws IllegalStateException    if the dispatcher is closed or draining
     */
    @Override
    public Dispatcher subscribe(String subject, String queueName) {
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in subscribe");
        NatsDispatcher.checkNotEmpty(queueName, "QueueName is required in subscribe");
        this.subscribeImpl(subject, queueName, null, false);
        return this;
    }

    /**
     * Subscribes to {@code subject} as a member of {@code queueName} with a handler
     * dedicated to the new subscription.
     *
     * @throws IllegalArgumentException if {@code subject} or {@code queueName} is null or empty,
     *                                  or {@code handler} is null
     * @throws IllegalStateException    if the dispatcher is closed or draining
     */
    @Override
    public Subscription subscribe(String subject, String queueName, MessageHandler handler) {
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in subscribe");
        NatsDispatcher.checkNotEmpty(queueName, "QueueName is required in subscribe");
        NatsDispatcher.checkHandler(handler);
        return this.subscribeImpl(subject, queueName, handler, false);
    }

    /**
     * Shared implementation behind all {@code subscribe} variants.
     *
     * <p>Without a handler, at most one subscription per subject is kept: an existing
     * one is returned as is (the {@code queueName} argument is then not consulted).
     * With a handler, a new subscription is always created and registered by SID.
     *
     * @param subject     subject to subscribe to
     * @param queueName   queue group, or {@code null}
     * @param handler     dedicated handler, or {@code null} to use the default handler
     * @param isJetStream passed through to the connection when creating the subscription
     * @return the subscription that is registered with this dispatcher
     * @throws IllegalStateException if the dispatcher is closed or draining
     */
    NatsSubscription subscribeImpl(String subject, String queueName, MessageHandler handler, boolean isJetStream) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            throw new IllegalStateException("Dispatcher is draining");
        }

        if (handler == null) {
            NatsSubscription existing = this.subscriptionsUsingDefaultHandler.get(subject);
            if (existing != null) {
                return existing;
            }
            NatsSubscription created = this.connection.createSubscription(subject, queueName, this, isJetStream);
            NatsSubscription winner = this.subscriptionsUsingDefaultHandler.putIfAbsent(subject, created);
            if (winner == null) {
                return created;
            }
            // Another thread registered this subject first: discard ours and hand back
            // the subscription that is actually registered, not the one just unsubscribed.
            this.connection.unsubscribe(created, -1);
            return winner;
        }

        NatsSubscription sub = this.connection.createSubscription(subject, queueName, this, isJetStream);
        this.subscriptionsWithHandlers.put(sub.getSID(), sub);
        this.subscriptionHandlers.put(sub.getSID(), handler);
        return sub;
    }

    /** Unsubscribes the default-handler subscription for {@code subject} immediately. */
    @Override
    public Dispatcher unsubscribe(String subject) {
        return this.unsubscribe(subject, -1);
    }

    /** Unsubscribes the given handler-specific subscription immediately. */
    @Override
    public Dispatcher unsubscribe(Subscription subscription) {
        return this.unsubscribe(subscription, -1);
    }

    /**
     * Unsubscribes the default-handler subscription for {@code subject}. Does nothing
     * when the dispatcher is draining or no such subscription is registered.
     *
     * @param subject subject used when subscribing
     * @param after   passed through to the connection's unsubscribe; {@code -1} is what
     *                the immediate variants use
     * @return this dispatcher
     * @throws IllegalStateException    if the dispatcher is closed
     * @throws IllegalArgumentException if {@code subject} is null or empty
     */
    @Override
    public Dispatcher unsubscribe(String subject, int after) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            return this;
        }
        NatsDispatcher.checkNotEmpty(subject, "Subject is required in unsubscribe");

        NatsSubscription sub = this.subscriptionsUsingDefaultHandler.get(subject);
        if (sub != null) {
            this.connection.unsubscribe(sub, after);
        }
        return this;
    }

    /**
     * Unsubscribes a subscription that was created with its own handler. Does nothing
     * when the dispatcher is draining or the subscription is not registered by SID.
     *
     * @param subscription subscription previously returned by this dispatcher
     * @param after        passed through to the connection's unsubscribe; {@code -1} is
     *                     what the immediate variants use
     * @return this dispatcher
     * @throws IllegalStateException    if the dispatcher is closed, or the subscription
     *                                  belongs to a different dispatcher
     * @throws IllegalArgumentException if the subscription is not a {@code NatsSubscription}
     */
    @Override
    public Dispatcher unsubscribe(Subscription subscription, int after) {
        if (!this.running.get()) {
            throw new IllegalStateException("Dispatcher is closed");
        }
        if (this.isDraining()) {
            return this;
        }
        if (subscription.getDispatcher() != this) {
            throw new IllegalStateException("Subscription is not managed by this Dispatcher");
        }
        if (!(subscription instanceof NatsSubscription)) {
            throw new IllegalArgumentException("This Subscription implementation is not known by Dispatcher");
        }

        NatsSubscription ns = (NatsSubscription)subscription;
        NatsSubscription sub = this.subscriptionsWithHandlers.get(ns.getSID());
        if (sub != null) {
            this.connection.unsubscribe(sub, after);
        }
        return this;
    }

    /** Sends an unsubscribe for every tracked subscription as part of draining. */
    @Override
    void sendUnsubForDrain() {
        this.subscriptionsUsingDefaultHandler.forEach((subject, sub) -> this.connection.sendUnsub(sub, -1));
        this.subscriptionsWithHandlers.forEach((sid, sub) -> this.connection.sendUnsub(sub, -1));
    }

    /** Asks the connection to clean up this dispatcher once draining has finished. */
    @Override
    void cleanUpAfterDrain() {
        this.connection.cleanupDispatcher(this);
    }

    /** @return {@code true} once the dispatcher is no longer active and the superclass reports drained */
    @Override
    public boolean isDrained() {
        return !this.isActive() && super.isDrained();
    }

    /** Throws {@link IllegalArgumentException} with {@code message} when {@code value} is null or empty. */
    private static void checkNotEmpty(String value, String message) {
        if (value == null || value.length() == 0) {
            throw new IllegalArgumentException(message);
        }
    }

    /** Throws {@link IllegalArgumentException} when a required handler is missing. */
    private static void checkHandler(MessageHandler handler) {
        if (handler == null) {
            throw new IllegalArgumentException("MessageHandler is required in subscribe");
        }
    }
}

