/*
 * Decompiled with CFR 0.152.
 */
package io.nats.client.impl;

import io.nats.client.Consumer;
import io.nats.client.NatsSystemClock;
import io.nats.client.impl.ConsumerMessageQueue;
import io.nats.client.impl.NatsConnection;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Shared base for consumers: tracks pending limits, delivered/dropped counters,
 * the "slow" flag and the drain state, and implements {@link #drain(Duration)}.
 *
 * <p>Subclasses supply the message queue, the activity check and the
 * unsubscribe/cleanup hooks used while draining.
 */
abstract class NatsConsumer implements Consumer {
    NatsConnection connection;
    private final AtomicLong maxMessages;
    private final AtomicLong maxBytes;
    private final AtomicLong droppedMessages;
    private final AtomicLong messagesDelivered;
    private final AtomicBoolean slow;
    private final AtomicReference<CompletableFuture<Boolean>> drainingFuture;

    /**
     * Creates a consumer bound to {@code conn} with the default pending limits
     * (524288 messages, 64 MiB), zeroed counters and no drain in progress.
     *
     * @param conn the connection this consumer uses for flushing, error reporting
     *             and scheduling the drain wait
     */
    NatsConsumer(NatsConnection conn) {
        this.connection = conn;
        this.maxMessages = new AtomicLong(512L * 1024L);       // 524288 messages
        this.maxBytes = new AtomicLong(64L * 1024L * 1024L);   // 0x4000000 bytes
        this.droppedMessages = new AtomicLong();
        this.messagesDelivered = new AtomicLong(0L);
        this.slow = new AtomicBoolean(false);
        this.drainingFuture = new AtomicReference<>();
    }

    /**
     * Sets the pending limits. Any value that is zero or negative is stored as 0,
     * which {@link #hasReachedPendingLimits()} treats as "no limit".
     *
     * @param maxMessages maximum number of pending messages, or &lt;= 0 for none
     * @param maxBytes    maximum number of pending bytes, or &lt;= 0 for none
     */
    @Override
    public void setPendingLimits(long maxMessages, long maxBytes) {
        this.maxMessages.set(Math.max(0L, maxMessages));
        this.maxBytes.set(Math.max(0L, maxBytes));
    }

    /** @return the current pending-message limit (0 means no limit) */
    @Override
    public long getPendingMessageLimit() {
        return this.maxMessages.get();
    }

    /** @return the current pending-byte limit (0 means no limit) */
    @Override
    public long getPendingByteLimit() {
        return this.maxBytes.get();
    }

    /**
     * @return the length reported by the message queue, or 0 when there is no queue
     */
    @Override
    public long getPendingMessageCount() {
        // Read the queue once: the accessor is abstract, so a second call is not
        // guaranteed to return the same (non-null) instance that was just checked.
        ConsumerMessageQueue queue = this.getMessageQueue();
        return queue != null ? queue.length() : 0L;
    }

    /**
     * @return the size in bytes reported by the message queue, or 0 when there is no queue
     */
    @Override
    public long getPendingByteCount() {
        // Single read for the same reason as getPendingMessageCount().
        ConsumerMessageQueue queue = this.getMessageQueue();
        return queue != null ? queue.sizeInBytes() : 0L;
    }

    /** @return the number of messages counted via {@link #incrementDeliveredCount()} */
    @Override
    public long getDeliveredCount() {
        return this.messagesDelivered.get();
    }

    /** Adds one to the delivered-message counter. */
    void incrementDeliveredCount() {
        this.messagesDelivered.incrementAndGet();
    }

    /** Adds one to the dropped-message counter. */
    void incrementDroppedCount() {
        this.droppedMessages.incrementAndGet();
    }

    /** @return the number of drops counted since creation or the last clear */
    @Override
    public long getDroppedCount() {
        return this.droppedMessages.get();
    }

    /** Resets the dropped-message counter to zero. */
    @Override
    public void clearDroppedCount() {
        this.droppedMessages.set(0L);
    }

    /** Sets the slow flag. */
    void markSlow() {
        this.slow.set(true);
    }

    /** Clears the slow flag. */
    void markNotSlow() {
        this.slow.set(false);
    }

    /** @return {@code true} while the slow flag is set */
    boolean isMarkedSlow() {
        return this.slow.get();
    }

    /**
     * Checks the pending counts against the configured limits. A limit of 0 is
     * ignored; the message limit is checked before the byte limit.
     *
     * @return {@code true} if either enabled limit has been reached or exceeded
     */
    boolean hasReachedPendingLimits() {
        long messageLimit = this.maxMessages.get();
        if (messageLimit > 0L && this.getPendingMessageCount() >= messageLimit) {
            return true;
        }
        long byteLimit = this.maxBytes.get();
        return byteLimit > 0L && this.getPendingByteCount() >= byteLimit;
    }

    /**
     * Records {@code future} as the drain tracker; from then on
     * {@link #isDraining()} reports {@code true} (for a non-null future).
     *
     * @param future the future that will be returned by {@link #getDrainingFuture()}
     */
    void markDraining(CompletableFuture<Boolean> future) {
        this.drainingFuture.set(future);
    }

    /** Calls {@code drain()} on the message queue, if there is one. */
    void markUnsubedForDrain() {
        // Single read so the null check and the call act on the same instance.
        ConsumerMessageQueue queue = this.getMessageQueue();
        if (queue != null) {
            queue.drain();
        }
    }

    /** @return the drain tracker, or {@code null} if no drain has been started */
    CompletableFuture<Boolean> getDrainingFuture() {
        return this.drainingFuture.get();
    }

    /** @return {@code true} once a drain tracker has been recorded */
    boolean isDraining() {
        return this.drainingFuture.get() != null;
    }

    /** @return {@code true} if a drain was started and no messages are pending */
    boolean isDrained() {
        return this.isDraining() && this.getPendingMessageCount() == 0L;
    }

    /**
     * Starts draining this consumer: sends the unsubscribe, flushes the connection,
     * drains the queue and then waits on the connection's executor until the
     * pending count reaches zero or the timeout elapses.
     *
     * <p>If a drain is already in progress the existing future is returned.
     *
     * @param timeout how long to flush and to wait for pending messages; for the
     *                wait phase {@code null}, zero or negative means no time limit
     * @return a future completed with {@code true} if the consumer was drained
     *         when the wait ended, {@code false} otherwise
     * @throws IllegalStateException if the consumer is not active or has no connection
     * @throws InterruptedException  declared for the flush; propagated as thrown by it
     */
    @Override
    public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedException {
        if (!this.isActive() || this.connection == null) {
            throw new IllegalStateException("Consumer is closed");
        }
        if (this.isDraining()) {
            return this.getDrainingFuture();
        }
        CompletableFuture<Boolean> tracker = new CompletableFuture<>();
        this.markDraining(tracker);
        this.sendUnsubForDrain();
        try {
            this.connection.flush(timeout);
        } catch (TimeoutException e) {
            // A flush timeout is reported but does not abort the drain.
            this.connection.processException(e);
        }
        this.markUnsubedForDrain();
        this.connection.getExecutor().submit(() -> {
            try {
                long timeoutNanos = timeout == null || timeout.toNanos() <= 0L
                        ? Long.MAX_VALUE
                        : timeout.toNanos();
                // Start and elapsed readings must come from the same clock;
                // nanoTime values from different sources are not comparable.
                long startTime = NatsSystemClock.nanoTime();
                while (NatsSystemClock.nanoTime() - startTime < timeoutNanos
                        && !Thread.interrupted()
                        && !this.isDrained()) {
                    Thread.sleep(1L);
                }
                this.cleanUpAfterDrain();
            } catch (InterruptedException e) {
                this.connection.processException(e);
                Thread.currentThread().interrupt();
            } finally {
                // Always complete the tracker so callers never wait forever.
                tracker.complete(this.isDrained());
            }
        });
        return this.getDrainingFuture();
    }

    /** @return whether this consumer is still active; {@link #drain(Duration)} refuses to run otherwise */
    @Override
    public abstract boolean isActive();

    /** @return the queue holding pending messages, or {@code null} if there is none */
    abstract ConsumerMessageQueue getMessageQueue();

    /** Hook invoked by {@link #drain(Duration)} before the connection is flushed. */
    abstract void sendUnsubForDrain();

    /** Hook invoked by {@link #drain(Duration)} once the wait for pending messages ends. */
    abstract void cleanUpAfterDrain();
}

