/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import junit.framework.Test;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MessageGroupDelayedTest
extends JmsTestSupport {
    public static final Logger log = LoggerFactory.getLogger(MessageGroupDelayedTest.class);
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected Destination destination;
    public int consumersBeforeDispatchStarts;
    public int timeBeforeDispatchStarts;
    BrokerService broker;
    protected TransportConnector connector;
    protected HashMap<String, Integer> messageCount = new HashMap();
    protected HashMap<String, Set<String>> messageGroups = new HashMap();

    public static Test suite() {
        return MessageGroupDelayedTest.suite(MessageGroupDelayedTest.class);
    }

    public static void main(String[] args) {
        TestRunner.run((Test)MessageGroupDelayedTest.suite());
    }

    @Override
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(String.valueOf(this.connector.getConnectUri()) + "?jms.prefetchPolicy.all=1");
        this.connection = connFactory.createConnection();
        this.session = this.connection.createSession(false, 2);
        this.destination = new ActiveMQQueue("test-queue2");
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setUseJmx(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        log.info("testing with consumersBeforeDispatchStarts=" + this.consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + this.timeBeforeDispatchStarts);
        policy.setConsumersBeforeDispatchStarts(this.consumersBeforeDispatchStarts);
        policy.setTimeBeforeDispatchStarts(this.timeBeforeDispatchStarts);
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);
        this.connector = service.addConnector("tcp://localhost:0");
        return service;
    }

    @Override
    public void tearDown() throws Exception {
        this.producer.close();
        this.session.close();
        this.connection.close();
        this.broker.stop();
    }

    public void initCombosForTestDelayedDirectConnectionListener() {
        this.addCombinationValues("consumersBeforeDispatchStarts", new Object[]{0, 3, 5});
        this.addCombinationValues("timeBeforeDispatchStarts", new Object[]{0, 100});
    }

    public void testDelayedDirectConnectionListener() throws Exception {
        for (int i = 0; i < 10; ++i) {
            TextMessage msga = this.session.createTextMessage("hello a");
            msga.setStringProperty("JMSXGroupID", "A");
            this.producer.send((Message)msga);
            TextMessage msgb = this.session.createTextMessage("hello b");
            msgb.setStringProperty("JMSXGroupID", "B");
            this.producer.send((Message)msgb);
            TextMessage msgc = this.session.createTextMessage("hello c");
            msgc.setStringProperty("JMSXGroupID", "C");
            this.producer.send((Message)msgc);
        }
        log.info("30 messages sent to group A/B/C");
        int[] counters = new int[]{10, 10, 10};
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(1);
        this.messageCount.put("worker1", 0);
        this.messageGroups.put("worker1", new HashSet());
        Worker worker1 = new Worker(this.connection, this.destination, "worker1", startSignal, doneSignal, counters, this.messageCount, this.messageGroups);
        this.messageCount.put("worker2", 0);
        this.messageGroups.put("worker2", new HashSet());
        Worker worker2 = new Worker(this.connection, this.destination, "worker2", startSignal, doneSignal, counters, this.messageCount, this.messageGroups);
        this.messageCount.put("worker3", 0);
        this.messageGroups.put("worker3", new HashSet());
        Worker worker3 = new Worker(this.connection, this.destination, "worker3", startSignal, doneSignal, counters, this.messageCount, this.messageGroups);
        new Thread(worker1).start();
        new Thread(worker2).start();
        new Thread(worker3).start();
        startSignal.countDown();
        doneSignal.await();
        if (this.consumersBeforeDispatchStarts == 0 && this.timeBeforeDispatchStarts == 0) {
            log.info("Ignoring results because both parameters are 0");
            return;
        }
        for (String worker : this.messageCount.keySet()) {
            log.info("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker)));
            MessageGroupDelayedTest.assertEquals((String)("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker))), (int)10, (int)this.messageCount.get(worker));
            MessageGroupDelayedTest.assertEquals((String)("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker))), (int)1, (int)this.messageGroups.get(worker).size());
        }
    }

    private static final class Worker
    implements Runnable {
        private Connection connection = null;
        private Destination queueName = null;
        private String workerName = null;
        private CountDownLatch startSignal = null;
        private CountDownLatch doneSignal = null;
        private int[] counters = null;
        private final HashMap<String, Integer> messageCount;
        private final HashMap<String, Set<String>> messageGroups;

        private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups) {
            this.connection = connection;
            this.queueName = queueName;
            this.workerName = workerName;
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
            this.counters = counters;
            this.messageCount = messageCount;
            this.messageGroups = messageGroups;
        }

        private void update(String group) {
            int msgCount = this.messageCount.get(this.workerName);
            this.messageCount.put(this.workerName, msgCount + 1);
            Set<String> groups = this.messageGroups.get(this.workerName);
            groups.add(group);
            this.messageGroups.put(this.workerName, groups);
        }

        @Override
        public void run() {
            try {
                log.info(this.workerName);
                this.startSignal.await();
                Session sess = this.connection.createSession(false, 2);
                MessageConsumer consumer = sess.createConsumer(this.queueName);
                while (true) {
                    if (this.counters[0] == 0 && this.counters[1] == 0 && this.counters[2] == 0) break;
                    Message msg = consumer.receive(500L);
                    if (msg == null) continue;
                    String group = msg.getStringProperty("JMSXGroupID");
                    msg.getBooleanProperty("JMSXGroupFirstForConsumer");
                    if ("A".equals(group)) {
                        this.counters[0] = this.counters[0] - 1;
                        this.update(group);
                        Thread.sleep(500L);
                    } else if ("B".equals(group)) {
                        this.counters[1] = this.counters[1] - 1;
                        this.update(group);
                        Thread.sleep(100L);
                    } else if ("C".equals(group)) {
                        this.counters[2] = this.counters[2] - 1;
                        this.update(group);
                        Thread.sleep(10L);
                    } else {
                        log.warn("unknown group");
                    }
                    if (this.counters[0] == 0 && this.counters[1] == 0 && this.counters[2] == 0) continue;
                    msg.acknowledge();
                }
                this.doneSignal.countDown();
                log.info(this.workerName + " done...");
                consumer.close();
                sess.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

