/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.test.JmsTopicSendReceiveTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises a durable topic subscription whose consumer is deliberately slow.
 *
 * <p>A background thread publishes {@link #NMSG} messages of {@link #MSIZE} bytes in
 * {@value #LOOPS} batches, using a fresh connection per batch. The test thread consumes them
 * through a durable subscriber that pauses before acknowledging each message and reconnects
 * after every batch. The test passes when every message arrives, in order, with the expected
 * type and properties.
 *
 * <p>The test runs against an embedded broker that is started in {@link #setUp()} and stopped in
 * {@link #tearDown()}.
 */
public class JmsDurableTopicSlowReceiveTest extends JmsTopicSendReceiveTest {
    /** Total number of messages published by the sender thread. */
    static final int NMSG = 200;
    /** Payload size, in bytes, of every published message. */
    static final int MSIZE = 256000;

    private static final Logger LOG = LoggerFactory.getLogger(JmsDurableTopicSlowReceiveTest.class);
    private static final String COUNT_PROPERTY_NAME = "count";
    /** Client id shared by every connection that owns the durable subscription. */
    private static final String CLIENT_ID = "test";
    /** Number of connect / transfer / disconnect rounds on both the sending and receiving side. */
    private static final int LOOPS = 4;
    private static final int MESSAGES_PER_LOOP = NMSG / LOOPS;
    /** Pause before the sender starts publishing a batch. */
    private static final long SENDER_START_DELAY_MS = 1000L;
    /** Pause between two published messages. */
    private static final long SEND_INTERVAL_MS = 50L;
    /** Pause before each acknowledgement; this is what makes the receiver "slow". */
    private static final long RECEIVE_DELAY_MS = 500L;
    private static final long RECEIVE_TIMEOUT_MS = 10000L;
    /** Upper bound on how long the test waits for the sender thread to terminate. */
    private static final long SENDER_JOIN_TIMEOUT_MS = 30000L;

    protected Connection connection2;
    protected Session session2;
    protected Session consumeSession2;
    protected MessageConsumer consumer2;
    protected MessageProducer producer2;
    protected Destination consumerDestination2;
    BrokerService broker;
    private Connection connection3;
    private Session consumeSession3;
    private TopicSubscriber consumer3;

    /**
     * Marks the test as durable, starts the embedded broker and then runs the inherited set-up.
     *
     * @throws Exception if the broker cannot be started or the inherited set-up fails
     */
    @Override
    protected void setUp() throws Exception {
        this.durable = true;
        this.broker = createBroker();
        boolean initialised = false;
        try {
            super.setUp();
            initialised = true;
        } finally {
            // JUnit 3 does not invoke tearDown() when setUp() fails, so the broker that was
            // already started has to be stopped here or it would leak into later tests.
            if (!initialised) {
                stopBroker();
            }
        }
    }

    /**
     * Runs the inherited tear-down and then stops the embedded broker, even if the inherited
     * tear-down throws.
     *
     * @throws Exception if the inherited tear-down or the broker shutdown fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            stopBroker();
        }
    }

    /**
     * Creates a connection factory for the in-VM broker with the durable topic prefetch and the
     * optimized durable topic prefetch both set to 5.
     *
     * @return the configured connection factory
     * @throws Exception declared for overriding subclasses
     */
    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?async=false");
        Properties props = new Properties();
        props.put("prefetchPolicy.durableTopicPrefetch", "5");
        props.put("prefetchPolicy.optimizeDurableTopicPrefetch", "5");
        factory.setProperties(props);
        return factory;
    }

    /**
     * Creates, configures (via {@link #configureBroker(BrokerService)}) and starts a broker.
     *
     * @return the started broker
     * @throws Exception if configuration or start-up fails
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService answer = new BrokerService();
        configureBroker(answer);
        answer.start();
        return answer;
    }

    /**
     * Configures the broker before it is started; this implementation makes it delete all
     * messages on start-up.
     *
     * @param answer the broker to configure
     * @throws Exception declared for overriding subclasses
     */
    protected void configureBroker(BrokerService answer) throws Exception {
        answer.setDeleteAllMessagesOnStartup(true);
    }

    /**
     * Publishes {@link #NMSG} messages from a background thread while consuming them slowly
     * through a durable subscriber that reconnects after every batch.
     *
     * @throws Exception if any JMS operation fails, a message is missing or out of order, or the
     *     sender thread fails or does not terminate
     */
    public void testSlowReceiver() throws Exception {
        registerDurableSubscription();

        final AtomicReference<Throwable> senderFailure = new AtomicReference<>();
        Thread sender = new Thread(() -> {
            try {
                sendAllMessages();
            } catch (Throwable e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                // Recorded so the test thread can fail with the real cause instead of a bare
                // receive timeout; logged as well because the receiver may fail first.
                senderFailure.set(e);
                LOG.error("Sender thread failed", e);
            }
        }, "SENDER Thread");
        sender.start();

        boolean receivedAll = false;
        try {
            receiveAllMessages();
            receivedAll = true;
        } finally {
            // If the receiver gave up, stop the sender so it does not keep publishing while the
            // broker is being torn down.
            if (!receivedAll) {
                sender.interrupt();
            }
            sender.join(SENDER_JOIN_TIMEOUT_MS);
        }

        assertFalse("Sender thread did not finish", sender.isAlive());
        Throwable failure = senderFailure.get();
        if (failure != null) {
            throw new AssertionError("Sender thread failed", failure);
        }
    }

    /**
     * Creates the durable subscription on the test topic and disconnects again, so the
     * subscription exists before the sender starts publishing.
     */
    private void registerDurableSubscription() throws Exception {
        this.connection2 = createConnection();
        try {
            this.connection2.setClientID(CLIENT_ID);
            this.connection2.start();
            this.consumeSession2 = this.connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.session2 = this.connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.consumerDestination2 = this.session2.createTopic(getConsumerSubject() + "2");
            this.consumer2 = this.consumeSession2.createDurableSubscriber((Topic) this.consumerDestination2, getName());
            this.consumer2.close();
        } finally {
            // Always release the connection: it owns the client id the receiver needs next.
            this.connection2.close();
        }
    }

    /**
     * Publishes all messages in {@value #LOOPS} batches, each over its own connection. Every
     * message carries a monotonically increasing {@code count} property for the order check.
     */
    private void sendAllMessages() throws Exception {
        int count = 0;
        for (int loop = 0; loop < LOOPS; ++loop) {
            Connection senderConnection = createConnection();
            this.connection2 = senderConnection;
            try {
                senderConnection.start();
                this.session2 = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                this.producer2 = this.session2.createProducer(null);
                this.producer2.setDeliveryMode(this.deliveryMode);
                Thread.sleep(SENDER_START_DELAY_MS);
                for (int i = 0; i < MESSAGES_PER_LOOP; ++i) {
                    BytesMessage message = this.session2.createBytesMessage();
                    message.writeBytes(new byte[MSIZE]);
                    message.setStringProperty("test", "test");
                    message.setIntProperty(COUNT_PROPERTY_NAME, count);
                    message.setJMSType("test");
                    this.producer2.send(this.consumerDestination2, message);
                    Thread.sleep(SEND_INTERVAL_MS);
                    if (this.verbose) {
                        LOG.debug("Sent({}): {}", loop, i);
                    }
                    ++count;
                }
                this.producer2.close();
                senderConnection.stop();
            } finally {
                senderConnection.close();
            }
        }
    }

    /**
     * Consumes all messages in {@value #LOOPS} batches, reconnecting the durable subscriber for
     * each batch and pausing before every acknowledgement.
     */
    private void receiveAllMessages() throws Exception {
        // Activate the subscription once and drop it again without consuming anything, exactly
        // as the original test flow did before entering the receive loop.
        this.connection3 = createConnection();
        try {
            subscribe(this.connection3);
        } finally {
            this.connection3.close();
        }

        int expectedCount = 0;
        for (int loop = 0; loop < LOOPS; ++loop) {
            this.connection3 = createConnection();
            try {
                subscribe(this.connection3);
                int received = 0;
                while (received < MESSAGES_PER_LOOP) {
                    Message msg = this.consumer3.receive(RECEIVE_TIMEOUT_MS);
                    if (msg == null) {
                        // Timed out: leave the loop and let the batch-size assertion report it.
                        break;
                    }
                    if (this.verbose) {
                        LOG.debug("Received({}): {} count = {}", loop, received, msg.getIntProperty(COUNT_PROPERTY_NAME));
                    }
                    assertEquals("test", msg.getJMSType());
                    assertEquals("test", msg.getStringProperty("test"));
                    assertEquals("Messages received out of order", expectedCount, msg.getIntProperty(COUNT_PROPERTY_NAME));
                    Thread.sleep(RECEIVE_DELAY_MS);
                    msg.acknowledge();
                    ++received;
                    ++expectedCount;
                }
                this.consumer3.close();
                assertEquals("Receiver " + loop, MESSAGES_PER_LOOP, received);
            } finally {
                // Close even when an assertion fails so the client id is released.
                this.connection3.close();
            }
        }
    }

    /**
     * Starts the given connection under the shared client id and attaches the durable subscriber
     * on a client-acknowledge session, storing session and subscriber in the instance fields.
     */
    private void subscribe(Connection subscriberConnection) throws Exception {
        subscriberConnection.setClientID(CLIENT_ID);
        subscriberConnection.start();
        this.consumeSession3 = subscriberConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        this.consumer3 = this.consumeSession3.createDurableSubscriber((Topic) this.consumerDestination2, getName());
    }

    /** Stops the embedded broker if one was created. */
    private void stopBroker() throws Exception {
        if (this.broker != null) {
            this.broker.stop();
        }
    }
}

