/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=BlockJUnit4ClassRunner.class)
public class MessageGroupLateArrivalsTest {
    public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class);
    protected Connection connection;
    protected Session session;
    protected MessageProducer producer;
    protected Destination destination;
    BrokerService broker;
    protected TransportConnector connector;
    protected HashMap<String, Integer> messageCount = new HashMap();
    protected HashMap<String, Set<String>> messageGroups = new HashMap();

    @Before
    public void setUp() throws Exception {
        this.broker = this.createBroker();
        this.broker.start();
        ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(String.valueOf(this.connector.getConnectUri()) + "?jms.prefetchPolicy.all=1000");
        this.connection = connFactory.createConnection();
        this.session = this.connection.createSession(false, 2);
        this.destination = new ActiveMQQueue("test-queue2");
        this.producer = this.session.createProducer(this.destination);
        this.connection.start();
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService service = new BrokerService();
        service.setPersistent(false);
        service.setUseJmx(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setUseConsumerPriority(true);
        policyMap.setDefaultEntry(policy);
        service.setDestinationPolicy(policyMap);
        this.connector = service.addConnector("tcp://localhost:0");
        return service;
    }

    @After
    public void tearDown() throws Exception {
        this.producer.close();
        this.session.close();
        this.connection.close();
        this.broker.stop();
    }

    @Test(timeout=30000L)
    public void testConsumersLateToThePartyGetSomeNewGroups() throws Exception {
        int i;
        int perBatch = 3;
        int[] counters = new int[]{3, 3, 3};
        CountDownLatch startSignal = new CountDownLatch(0);
        CountDownLatch doneSignal = new CountDownLatch(3);
        CountDownLatch worker1Started = new CountDownLatch(1);
        CountDownLatch worker2Started = new CountDownLatch(1);
        CountDownLatch worker3Started = new CountDownLatch(1);
        this.messageCount.put("worker1", 0);
        this.messageGroups.put("worker1", new HashSet());
        Worker worker1 = new Worker(this.connection, this.destination, "worker1", startSignal, doneSignal, counters, this.messageCount, this.messageGroups, worker1Started);
        this.messageCount.put("worker2", 0);
        this.messageGroups.put("worker2", new HashSet());
        Worker worker2 = new Worker(this.connection, this.destination, "worker2", startSignal, doneSignal, counters, this.messageCount, this.messageGroups, worker2Started);
        this.messageCount.put("worker3", 0);
        this.messageGroups.put("worker3", new HashSet());
        Worker worker3 = new Worker(this.connection, this.destination, "worker3", startSignal, doneSignal, counters, this.messageCount, this.messageGroups, worker3Started);
        new Thread(worker1).start();
        new Thread(worker2).start();
        worker1Started.await();
        worker2Started.await();
        for (i = 0; i < 3; ++i) {
            TextMessage msga = this.session.createTextMessage("hello a");
            msga.setStringProperty("JMSXGroupID", "A");
            this.producer.send((Message)msga);
            TextMessage msgb = this.session.createTextMessage("hello b");
            msgb.setStringProperty("JMSXGroupID", "B");
            this.producer.send((Message)msgb);
        }
        new Thread(worker3).start();
        worker3Started.await();
        for (i = 0; i < 3; ++i) {
            TextMessage msgc = this.session.createTextMessage("hello c");
            msgc.setStringProperty("JMSXGroupID", "C");
            this.producer.send((Message)msgc);
        }
        doneSignal.await();
        ArrayList<String> workers = new ArrayList<String>(this.messageCount.keySet());
        Collections.sort(workers);
        for (String worker : workers) {
            log.info("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker)));
        }
        for (String worker : workers) {
            Assert.assertEquals((String)("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker))), (long)3L, (long)this.messageCount.get(worker).intValue());
            Assert.assertEquals((String)("worker " + worker + " received " + String.valueOf(this.messageCount.get(worker)) + " messages from groups " + String.valueOf(this.messageGroups.get(worker))), (long)1L, (long)this.messageGroups.get(worker).size());
        }
    }

    @Test(timeout=30000L)
    public void testConsumerLateToBigPartyGetsNewGroup() throws Exception {
        int i;
        int perBatch = 2;
        int[] counters = new int[]{2, 2, 2};
        CountDownLatch startSignal = new CountDownLatch(0);
        CountDownLatch doneSignal = new CountDownLatch(2);
        CountDownLatch worker1Started = new CountDownLatch(1);
        CountDownLatch worker2Started = new CountDownLatch(1);
        this.messageCount.put("worker1", 0);
        this.messageGroups.put("worker1", new HashSet());
        Worker worker1 = new Worker(this.connection, this.destination, "worker1", startSignal, doneSignal, counters, this.messageCount, this.messageGroups, worker1Started);
        this.messageCount.put("worker2", 0);
        this.messageGroups.put("worker2", new HashSet());
        Worker worker2 = new Worker(this.connection, this.destination, "worker2", startSignal, doneSignal, counters, this.messageCount, this.messageGroups, worker2Started);
        new Thread(worker1).start();
        for (i = 0; i < 2; ++i) {
            TextMessage msga = this.session.createTextMessage("hello c");
            msga.setStringProperty("JMSXGroupID", "A");
            this.producer.send((Message)msga);
            TextMessage msgb = this.session.createTextMessage("hello b");
            msgb.setStringProperty("JMSXGroupID", "B");
            this.producer.send((Message)msgb);
        }
        new Thread(worker2).start();
        worker2Started.await();
        for (i = 0; i < 2; ++i) {
            TextMessage msgc = this.session.createTextMessage("hello a");
            msgc.setStringProperty("JMSXGroupID", "C");
            this.producer.send((Message)msgc);
        }
        doneSignal.await();
        log.info("worker1  received " + String.valueOf(this.messageCount.get("worker1")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker1")));
        Assert.assertEquals((String)("worker1 received " + String.valueOf(this.messageCount.get("worker1")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker1"))), (long)4L, (long)this.messageCount.get("worker1").intValue());
        Assert.assertEquals((String)("worker1 received " + String.valueOf(this.messageCount.get("worker1")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker1"))), (long)2L, (long)this.messageGroups.get("worker1").size());
        log.info("worker2  received " + String.valueOf(this.messageCount.get("worker2")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker2")));
        Assert.assertEquals((String)("worker2 received " + String.valueOf(this.messageCount.get("worker2")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker2"))), (long)4L, (long)this.messageCount.get("worker1").intValue());
        Assert.assertEquals((String)("worker2 received " + String.valueOf(this.messageCount.get("worker2")) + " messages from groups " + String.valueOf(this.messageGroups.get("worker2"))), (long)1L, (long)this.messageGroups.get("worker2").size());
    }

    private static final class Worker
    implements Runnable {
        private Connection connection = null;
        private Destination queueName = null;
        private String workerName = null;
        private CountDownLatch startSignal = null;
        private CountDownLatch doneSignal = null;
        private CountDownLatch workerStarted = null;
        private int[] counters = null;
        private final HashMap<String, Integer> messageCount;
        private final HashMap<String, Set<String>> messageGroups;

        private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups, CountDownLatch workerStarted) {
            this.connection = connection;
            this.queueName = queueName;
            this.workerName = workerName;
            this.startSignal = startSignal;
            this.doneSignal = doneSignal;
            this.counters = counters;
            this.messageCount = messageCount;
            this.messageGroups = messageGroups;
            this.workerStarted = workerStarted;
        }

        private void update(String group) {
            int msgCount = this.messageCount.get(this.workerName);
            this.messageCount.put(this.workerName, msgCount + 1);
            Set<String> groups = this.messageGroups.get(this.workerName);
            groups.add(group);
            this.messageGroups.put(this.workerName, groups);
        }

        @Override
        public void run() {
            try {
                this.startSignal.await();
                log.info(this.workerName);
                Session sess = this.connection.createSession(false, 2);
                MessageConsumer consumer = sess.createConsumer(this.queueName);
                this.workerStarted.countDown();
                while (true) {
                    if (this.counters[0] == 0 && this.counters[1] == 0 && this.counters[2] == 0) break;
                    Message msg = consumer.receive(500L);
                    if (msg == null) continue;
                    msg.acknowledge();
                    String group = msg.getStringProperty("JMSXGroupID");
                    msg.getBooleanProperty("JMSXGroupFirstForConsumer");
                    if ("A".equals(group)) {
                        this.counters[0] = this.counters[0] - 1;
                        this.update(group);
                    } else if ("B".equals(group)) {
                        this.counters[1] = this.counters[1] - 1;
                        this.update(group);
                    } else if ("C".equals(group)) {
                        this.counters[2] = this.counters[2] - 1;
                        this.update(group);
                    } else {
                        log.warn(this.workerName + ", unknown group");
                    }
                    if (this.counters[0] == 0 && this.counters[1] == 0 && this.counters[2] == 0) continue;
                    msg.acknowledge();
                }
                this.doneSignal.countDown();
                log.info(this.workerName + " done...");
                consumer.close();
                sess.close();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

