/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4092Test
extends TestCase {
    private static final Logger log = LoggerFactory.getLogger(AMQ4092Test.class);
    static final String QUEUE_NAME = "TEST";
    static final int NUM_TO_SEND_PER_PRODUCER = 1000;
    static final int NUM_PRODUCERS = 5;
    static final ActiveMQQueue[] DESTINATIONS = new ActiveMQQueue[]{new ActiveMQQueue("A"), new ActiveMQQueue("B")};
    static final boolean debug = false;
    private BrokerService brokerService;
    private ActiveMQQueue destination;
    private HashMap<Thread, Throwable> exceptions = new HashMap();
    private ExceptionListener exceptionListener = new ExceptionListener(){

        public void onException(JMSException exception) {
            exception.printStackTrace();
            AMQ4092Test.this.exceptions.put(Thread.currentThread(), exception);
        }
    };

    protected void setUp() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setDeleteAllMessagesOnStartup(true);
        ((KahaDBPersistenceAdapter)this.brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(false);
        this.brokerService.addConnector("tcp://localhost:0");
        this.brokerService.start();
        this.destination = new ActiveMQQueue();
        this.destination.setCompositeDestinations((ActiveMQDestination[])DESTINATIONS);
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler(){

            @Override
            public void uncaughtException(Thread t, Throwable e) {
                AMQ4092Test.this.exceptions.put(t, e);
            }
        });
    }

    protected void tearDown() throws Exception {
        this.brokerService.stop();
    }

    public void testConcurrentGroups() throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new TestConsumer());
        for (int i = 0; i < 5; ++i) {
            executorService.submit(new TestProducer());
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.MINUTES);
        AMQ4092Test.assertTrue((String)("no exceptions: " + this.exceptions), (boolean)this.exceptions.isEmpty());
    }

    class TestConsumer
    implements Runnable {
        private CountDownLatch finishLatch = new CountDownLatch(1);

        TestConsumer() {
        }

        public void consume() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)AMQ4092Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            connectionFactory.setExceptionListener(AMQ4092Test.this.exceptionListener);
            final int totalMessageCount = 1000 * DESTINATIONS.length * 5;
            final AtomicInteger counter = new AtomicInteger();
            MessageListener listener = new MessageListener(){

                public void onMessage(Message message) {
                    boolean first = false;
                    try {
                        first = message.getBooleanProperty("JMSXGroupFirstForConsumer");
                    }
                    catch (JMSException e) {
                        e.printStackTrace();
                        AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
                    }
                    TestCase.assertTrue((String)"Always is first message", (boolean)first);
                    if (counter.incrementAndGet() == totalMessageCount) {
                        log.info("Got all:" + counter.get());
                        TestConsumer.this.finishLatch.countDown();
                    }
                }
            };
            int consumerCount = DESTINATIONS.length * 100;
            Connection[] connections = new Connection[consumerCount];
            Session[] sessions = new Session[consumerCount];
            MessageConsumer[] consumers = new MessageConsumer[consumerCount];
            for (int i = 0; i < consumerCount; ++i) {
                connections[i] = connectionFactory.createConnection();
                connections[i].start();
                sessions[i] = connections[i].createSession(false, 1);
                consumers[i] = sessions[i].createConsumer((Destination)DESTINATIONS[i % DESTINATIONS.length], null);
                consumers[i].setMessageListener(listener);
            }
            log.info("received " + counter.get() + " messages");
            TestCase.assertTrue((String)"got all messages in time", (boolean)this.finishLatch.await(4L, TimeUnit.MINUTES));
            log.info("received " + counter.get() + " messages");
            for (MessageConsumer messageConsumer : consumers) {
                messageConsumer.close();
            }
            for (MessageConsumer messageConsumer : sessions) {
                messageConsumer.close();
            }
            for (MessageConsumer messageConsumer : connections) {
                messageConsumer.close();
            }
        }

        @Override
        public void run() {
            try {
                this.consume();
            }
            catch (Exception e) {
                e.printStackTrace();
                AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
            }
        }
    }

    class TestProducer
    implements Runnable {
        TestProducer() {
        }

        public void produceMessages() throws Exception {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(((TransportConnector)AMQ4092Test.this.brokerService.getTransportConnectors().get(0)).getConnectUri().toString());
            connectionFactory.setExceptionListener(AMQ4092Test.this.exceptionListener);
            connectionFactory.setUseAsyncSend(true);
            Connection connection = connectionFactory.createConnection();
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer((Destination)AMQ4092Test.this.destination);
            producer.setDeliveryMode(2);
            String name = new String(new byte[2048]);
            for (int i = 1; i <= 1000; ++i) {
                TextMessage message = session.createTextMessage(name + "_" + i);
                for (int j = 0; j < 100; ++j) {
                    message.setStringProperty("Prop" + j, "" + j);
                }
                message.setStringProperty("JMSXGroupID", Thread.currentThread().getName() + i);
                message.setIntProperty("JMSXGroupSeq", 1);
                producer.send((Message)message);
            }
            producer.close();
            session.close();
            connection.close();
        }

        @Override
        public void run() {
            try {
                this.produceMessages();
            }
            catch (Exception e) {
                e.printStackTrace();
                AMQ4092Test.this.exceptions.put(Thread.currentThread(), e);
            }
        }
    }
}

