/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueWorkerPrefetchTest
extends TestCase
implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(QueueWorkerPrefetchTest.class);
    private static final int BATCH_SIZE = 10;
    private static final long WAIT_TIMEOUT = 10000L;
    private static final String BROKER_BIND_ADDRESS = "tcp://localhost:0";
    private static final int QUEUE_PREFETCH_SIZE = 1;
    private static final int NUM_WORKERS = 2;
    private BrokerService broker;
    private MessageProducer workItemProducer;
    private MessageConsumer masterItemConsumer;
    private final AtomicLong acksReceived = new AtomicLong(0L);
    private final AtomicReference<CountDownLatch> latch = new AtomicReference();
    private String connectionUri;

    public void onMessage(Message message) {
        long acks = this.acksReceived.incrementAndGet();
        this.latch.get().countDown();
        if (acks % 1L == 0L) {
            LOG.info("Master now has ack count of: " + String.valueOf(this.acksReceived));
        }
    }

    protected void setUp() throws Exception {
        super.setUp();
        this.broker = new BrokerService();
        this.broker.setPersistent(false);
        this.broker.setUseJmx(true);
        this.broker.addConnector(BROKER_BIND_ADDRESS);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectionUri = ((TransportConnector)this.broker.getTransportConnectors().get(0)).getPublishableConnectString();
    }

    protected void tearDown() throws Exception {
        this.broker.deleteAllMessages();
        this.broker.stop();
        super.tearDown();
    }

    public void testActiveMQ() throws Exception {
        int i;
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.connectionUri);
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setQueuePrefetch(1);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session masterSession = connection.createSession(false, 1);
        this.workItemProducer = masterSession.createProducer((Destination)masterSession.createQueue("work-item"));
        this.masterItemConsumer = masterSession.createConsumer((Destination)masterSession.createQueue("master-item"));
        this.masterItemConsumer.setMessageListener((MessageListener)this);
        Worker[] workers = new Worker[2];
        for (i = 0; i < 2; ++i) {
            workers[i] = new Worker(connection.createSession(false, 1));
        }
        this.acksReceived.set(0L);
        this.latch.set(new CountDownLatch(10));
        this.workItemProducer.send((Message)masterSession.createObjectMessage((Serializable)new WorkMessage(1)));
        if (!this.latch.get().await(10000L, TimeUnit.MILLISECONDS)) {
            QueueWorkerPrefetchTest.fail((String)("First batch only received " + String.valueOf(this.acksReceived) + " messages"));
        }
        LOG.info("First batch received");
        this.acksReceived.set(0L);
        this.latch.set(new CountDownLatch(10));
        this.workItemProducer.send((Message)masterSession.createObjectMessage((Serializable)new WorkMessage(1)));
        if (!this.latch.get().await(10000L, TimeUnit.MILLISECONDS)) {
            QueueWorkerPrefetchTest.fail((String)("Second batch only received " + String.valueOf(this.acksReceived) + " messages"));
        }
        LOG.info("Second batch received");
        for (i = 0; i < 2; ++i) {
            workers[i].close();
        }
        masterSession.close();
        connection.close();
    }

    private static class Worker
    implements MessageListener {
        private static AtomicInteger counter = new AtomicInteger(0);
        private Session session;
        private MessageProducer masterItemProducer;
        private MessageProducer workItemProducer;

        public Worker(Session session) throws JMSException {
            this.session = session;
            this.masterItemProducer = session.createProducer((Destination)session.createQueue("master-item"));
            Queue workItemQueue = session.createQueue("work-item");
            this.workItemProducer = session.createProducer((Destination)workItemQueue);
            MessageConsumer workItemConsumer = session.createConsumer((Destination)workItemQueue);
            workItemConsumer.setMessageListener((MessageListener)this);
        }

        public void onMessage(Message message) {
            try {
                WorkMessage work = (WorkMessage)((ObjectMessage)message).getObject();
                long c = counter.incrementAndGet();
                if (c % 10L != 0L) {
                    this.workItemProducer.send((Message)this.session.createObjectMessage((Serializable)new WorkMessage(work.id + 1)));
                }
                this.masterItemProducer.send((Message)this.session.createObjectMessage((Serializable)work));
            }
            catch (JMSException e) {
                throw new IllegalStateException("Something has gone wrong", e);
            }
        }

        public void close() throws JMSException {
            this.masterItemProducer.close();
            this.workItemProducer.close();
            this.session.close();
        }
    }

    private static class WorkMessage
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final int id;

        public WorkMessage(int id) {
            this.id = id;
        }

        public String toString() {
            return "Work: " + this.id;
        }
    }
}

