/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Broker test that mixes concurrent transacted consumers which roll back with
 * concurrent producers of persistent messages, then restarts the broker and
 * verifies that exactly {@link #toSend} messages can still be consumed.
 *
 * <p>NOTE(review): the class name suggests this is the regression test for
 * issue AMQ-4157; the issue itself is not described in this file.
 */
public class AMQ4157Test {
    static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class);

    /** Size in bytes of every message body. */
    private static final int PAYLOAD_SIZE = 8192;
    /** Number of concurrently running publisher tasks. */
    private static final int PARALLEL_PRODUCER = 20;
    /** Number of concurrently running consume-and-rollback tasks. */
    private static final int PARALLEL_CONSUMER = 100;
    /** Numeric value of the JMS {@code DeliveryMode.PERSISTENT} constant. */
    private static final int PERSISTENT_DELIVERY = 2;
    /** Priority every message is published with. */
    private static final int MESSAGE_PRIORITY = 5;

    private BrokerService broker;
    private ActiveMQConnectionFactory connectionFactory;
    private final Destination destination = new ActiveMQQueue("Test");
    /** Zero-filled body shared (read-only) by all producers. */
    private final byte[] payload = new byte[PAYLOAD_SIZE];
    /** Failures raised inside worker tasks; Vector because it is appended to from many threads. */
    private final Vector<Exception> exceptions = new Vector<>();
    /** Total number of messages published across all producers. */
    long toSend = 1000L;

    /**
     * Runs the rollback consumers and the producers in parallel, restarts the
     * broker without deleting stored messages, and asserts that every
     * published message is still available afterwards.
     *
     * @throws Exception if the broker cannot be started/stopped or the wait is interrupted
     */
    @Test
    public void testPublishCountsWithRollbackConsumer() throws Exception {
        startBroker(true);
        final AtomicLong sharedCount = new AtomicLong(toSend);
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int consumer = 0; consumer < PARALLEL_CONSUMER; consumer++) {
            executorService.execute(() -> {
                try {
                    consumeOneAndRollback();
                } catch (Exception e) {
                    exceptions.add(e);
                }
            });
        }
        for (int producer = 0; producer < PARALLEL_PRODUCER; producer++) {
            executorService.execute(() -> {
                try {
                    publishMessages(sharedCount, 0);
                } catch (Exception e) {
                    exceptions.add(e);
                }
            });
        }

        executorService.shutdown();
        boolean finishedInTime = executorService.awaitTermination(30L, TimeUnit.MINUTES);
        if (!finishedInTime) {
            // Do not leave worker threads running behind a failed test.
            executorService.shutdownNow();
        }
        Assert.assertTrue("Producers done in time", finishedInTime);
        Assert.assertTrue("No exceptions: " + exceptions, exceptions.isEmpty());

        restartBroker(500);
        LOG.info("Attempting consume of {} messages", toSend);
        consumeMessages(toSend);
    }

    /**
     * Opens a transacted session, blocks until one message has been received,
     * rolls the transaction back and closes the connection.
     *
     * @throws Exception on any JMS failure
     */
    private void consumeOneAndRollback() throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageConsumer consumer = session.createConsumer(destination);
            Message message = null;
            // Poll in one-second slices until a message shows up.
            while (message == null) {
                message = consumer.receive(1000L);
            }
            session.rollback();
        } finally {
            // Close even on failure so the connection is not leaked.
            connection.close();
        }
    }

    /**
     * Consumes {@code count} messages with an auto-acknowledge session and
     * asserts that each one arrives and that nothing is left over afterwards.
     *
     * @param count number of messages expected on the destination
     * @throws Exception on any JMS failure
     */
    private void consumeMessages(long count) throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(destination);
            for (int i = 0; i < count; i++) {
                Assert.assertNotNull("got message " + i, consumer.receive(20000L));
            }
            Assert.assertNull("none left over", consumer.receive(2000L));
        } finally {
            // The original never closed this connection.
            connection.close();
        }
    }

    /**
     * Stops the broker, waits, and starts a new one that keeps stored messages.
     *
     * @param restartDelay pause between stop and start, in milliseconds
     * @throws Exception if stopping or starting the broker fails
     */
    private void restartBroker(int restartDelay) throws Exception {
        stopBroker();
        TimeUnit.MILLISECONDS.sleep(restartDelay);
        startBroker(false);
    }

    /**
     * Stops the current broker, if any, and waits until it has stopped.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (broker != null) {
            try {
                broker.stop();
                broker.waitUntilStopped();
            } finally {
                // Forget the instance so a later call does not stop it twice.
                broker = null;
            }
        }
    }

    /**
     * Publishes persistent bytes messages until the shared counter is used up.
     *
     * @param count  remaining number of messages to send, shared by all producers
     * @param expiry time-to-live passed to {@code send}, in milliseconds
     * @throws Exception on any JMS failure
     */
    private void publishMessages(AtomicLong count, int expiry) throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
        try {
            connection.setWatchTopicAdvisories(false);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            while (count.getAndDecrement() > 0L) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(payload);
                producer.send(message, PERSISTENT_DELIVERY, MESSAGE_PRIORITY, expiry);
            }
            // Synchronous round trip before closing; the factory is configured
            // with jms.useAsyncSend=true. NOTE(review): presumably this ensures
            // earlier async sends were processed by the broker - confirm.
            connection.syncSendPacket(new ConnectionControl());
        } finally {
            connection.close();
        }
    }

    /**
     * Creates and starts a broker on an ephemeral TCP port and builds a
     * connection factory that points at it.
     *
     * @param deleteAllMessages whether the broker deletes all stored messages on startup
     * @throws Exception if the broker cannot be started
     */
    public void startBroker(boolean deleteAllMessages) throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        broker.addConnector("tcp://0.0.0.0:0");
        broker.start();
        String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
        // Port 0 lets the OS pick the port, so read the actual URI back from the connector.
        connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options);
    }
}

