/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.ConnectionConsumer;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.ServerSession;
import jakarta.jms.ServerSessionPool;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicConnection;
import jakarta.jms.TopicPublisher;
import jakarta.jms.TopicSession;
import jakarta.jms.TopicSubscriber;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Exercises a durable topic subscription that is consumed through a
 * {@link ConnectionConsumer} backed by a {@link ServerSessionPool}.
 *
 * <p>The test creates a durable subscription, publishes {@link #MESSAGE_COUNT}
 * messages while no subscriber is attached, then re-attaches with a connection
 * consumer whose pool hands out a fresh transacted session per request. Every
 * session that reports a message is committed and closed by the test thread, and
 * the test asserts that exactly {@link #MESSAGE_COUNT} sessions were committed.
 */
public class AMQ3961Test {
    /** Number of messages published to the topic and expected to be committed. */
    public static final int MESSAGE_COUNT = 16;

    /** Port 0 lets the broker pick a free port; the real URI is read back in {@link #setUp()}. */
    private static final String BROKER_ADDRESS = "tcp://localhost:0";
    private static final String TOPIC_NAME = "TestTopic";
    private static final String CLIENT_ID = "TestClient";
    private static final String SUBSCRIPTION_NAME = "TestSubscriber";
    /** Delay between two polls of {@link #processedSessions}. */
    private static final long POLL_INTERVAL_MS = 200L;

    /** Sessions whose listener has seen a message; guarded by its own monitor. */
    private final List<TestServerSession> processedSessions = new LinkedList<>();
    /** Sessions already committed; only touched by the test thread. */
    private final List<TestServerSession> committedSessions = new LinkedList<>();

    private BrokerService brokerService;
    private ActiveMQConnectionFactory connectionFactory;
    private String connectionUri;

    /**
     * Starts a non-persistent broker on an ephemeral port and builds a connection
     * factory pointing at it.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);
        brokerService.setDeleteAllMessagesOnStartup(true);
        connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
        brokerService.start();
        brokerService.waitUntilStarted();
        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
    }

    /**
     * Stops the broker started by {@link #setUp()}.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        // JUnit 4 runs @After even when @Before failed, so the broker may be absent.
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
            brokerService = null;
        }
    }

    /**
     * Publishes {@link #MESSAGE_COUNT} messages to an offline durable subscription and
     * verifies that a connection consumer delivers each of them to its own committed
     * server session.
     *
     * @throws Exception on any JMS failure or if the test thread is interrupted
     */
    @Test
    public void testPrefetchInDurableSubscription() throws Exception {
        Topic topic = new ActiveMQTopic(TOPIC_NAME);

        registerDurableSubscription(topic);
        publishMessages(topic);

        try (TopicConnection connection = connectionFactory.createTopicConnection()) {
            connection.setClientID(CLIENT_ID);
            connection.start();

            ServerSessionPool pool = new TestServerSessionPool(connection);
            // maxMessages = 1: at most one message is handed to a server session at a time.
            ConnectionConsumer connectionConsumer =
                    connection.createDurableConnectionConsumer(topic, SUBSCRIPTION_NAME, null, pool, 1);
            try {
                commitProcessedSessionsUntilIdle();
            } finally {
                // Must be closed before unsubscribing below.
                connectionConsumer.close();
            }

            TopicSession finalSession = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            finalSession.unsubscribe(SUBSCRIPTION_NAME);
            finalSession.close();
        }

        Assert.assertEquals((long) MESSAGE_COUNT, (long) committedSessions.size());
    }

    /** Creates the durable subscription and immediately detaches from it. */
    private void registerDurableSubscription(Topic topic) throws JMSException {
        try (TopicConnection connection = connectionFactory.createTopicConnection()) {
            connection.setClientID(CLIENT_ID);
            connection.start();
            TopicSession session = connection.createTopicSession(false, Session.CLIENT_ACKNOWLEDGE);
            TopicSubscriber subscriber = session.createDurableSubscriber(topic, SUBSCRIPTION_NAME);
            subscriber.close();
            session.close();
        }
    }

    /** Publishes {@link #MESSAGE_COUNT} text messages, numbered from 1, to {@code topic}. */
    private void publishMessages(Topic topic) throws JMSException {
        try (TopicConnection connection = connectionFactory.createTopicConnection()) {
            connection.start();
            TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
            TopicPublisher publisher = session.createPublisher(topic);
            for (int i = 1; i <= MESSAGE_COUNT; ++i) {
                TextMessage msg = session.createTextMessage("Message #" + i);
                publisher.publish(msg);
            }
            publisher.close();
            session.close();
        }
    }

    /**
     * Repeatedly collects the sessions that have processed a message, commits and
     * closes each of them, and returns once a collection round comes back empty.
     */
    private void commitProcessedSessionsUntilIdle() throws JMSException, InterruptedException {
        List<TestServerSession> batch = awaitProcessedBatch();
        while (!batch.isEmpty()) {
            for (TestServerSession serverSession : batch) {
                // Recorded before commit, so a session counts even if its commit throws.
                committedSessions.add(serverSession);
                serverSession.getSession().commit();
                serverSession.getSession().close();
            }
            batch = awaitProcessedBatch();
        }
    }

    /**
     * Polls {@link #processedSessions} every {@link #POLL_INTERVAL_MS} milliseconds
     * until its size stops growing between two consecutive polls, then removes and
     * returns its contents.
     *
     * @return the drained sessions; empty if the stable size was zero
     */
    private List<TestServerSession> awaitProcessedBatch() throws InterruptedException {
        int previousCount;
        int currentCount = 0;
        do {
            previousCount = currentCount;
            Thread.sleep(POLL_INTERVAL_MS);
            synchronized (processedSessions) {
                currentCount = processedSessions.size();
            }
        } while (previousCount < currentCount);

        if (previousCount == 0) {
            // Nothing arrived during a full poll interval: report "idle" without draining.
            return new LinkedList<>();
        }
        synchronized (processedSessions) {
            List<TestServerSession> batch = new LinkedList<>(processedSessions);
            processedSessions.clear();
            return batch;
        }
    }

    /**
     * Server session pool that never reuses sessions: every request creates a new
     * transacted {@link TopicSession} on the supplied connection.
     */
    public class TestServerSessionPool
    implements ServerSessionPool {
        private final TopicConnection connection;

        /**
         * @param connection connection on which server sessions are created
         */
        public TestServerSessionPool(TopicConnection connection) {
            this.connection = connection;
        }

        /**
         * Creates a new transacted session and wraps it in a {@link TestServerSession}.
         *
         * @return a server session backed by a fresh transacted topic session
         * @throws JMSException if the session cannot be created
         */
        @Override
        public ServerSession getServerSession() throws JMSException {
            // The session is transacted; the acknowledge-mode argument keeps its original value.
            TopicSession topicSession = this.connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
            return new TestServerSession(topicSession);
        }
    }

    /**
     * Server session that is also the message listener of its own JMS session and
     * reports itself to the enclosing test whenever a message is delivered.
     */
    public class TestServerSession
    implements ServerSession,
    MessageListener {
        private final TopicSession session;

        /**
         * @param session session to wrap; this object is installed as its message listener
         * @throws JMSException if the listener cannot be registered
         */
        public TestServerSession(TopicSession session) throws JMSException {
            this.session = session;
            session.setMessageListener(this);
        }

        /** Returns the wrapped JMS session. */
        @Override
        public Session getSession() throws JMSException {
            return this.session;
        }

        /** Runs the wrapped session on the calling thread. */
        @Override
        public void start() throws JMSException {
            this.session.run();
        }

        /**
         * Records this server session as having processed a message. The session is
         * added once per delivered message and is committed later by the test thread.
         *
         * @param message the delivered message (its content is not inspected)
         */
        @Override
        public void onMessage(Message message) {
            // Same monitor the test thread uses while polling and draining the list.
            synchronized (AMQ3961Test.this.processedSessions) {
                AMQ3961Test.this.processedSessions.add(this);
            }
        }
    }
}

