/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.test.TestSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableConsumerCloseAndReconnectTest
extends TestSupport {
    protected static final long RECEIVE_TIMEOUT = 5000L;
    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTest.class);
    BrokerService brokerService;
    protected Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer producer;
    private Destination destination;
    private int messageCount;
    private String vmConnectorURI;

    protected void setUp() throws Exception {
        this.createBroker();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        this.stopBroker();
        super.tearDown();
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(this.vmConnectorURI);
    }

    protected void createBroker() throws Exception {
        this.brokerService = new BrokerService();
        this.brokerService.setUseJmx(false);
        this.brokerService.setPersistent(false);
        KahaDBPersistenceAdapter store = new KahaDBPersistenceAdapter();
        this.brokerService.setPersistenceAdapter((PersistenceAdapter)store);
        this.brokerService.start();
        this.brokerService.waitUntilStarted();
        this.vmConnectorURI = this.brokerService.getVmConnectorURI().toString();
    }

    protected void stopBroker() throws Exception {
        this.brokerService.stop();
        this.brokerService.waitUntilStopped();
    }

    public void testDurableSubscriberReconnectMultipleTimes() throws Exception {
        Connection dummyConnection = this.createConnection();
        dummyConnection.start();
        this.makeConsumer(1);
        this.closeConsumer();
        this.publish(30);
        int counter = 1;
        for (int i = 0; i < 15; ++i) {
            this.makeConsumer(1);
            Message message = this.consumer.receive(5000L);
            DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have received a message!", (message != null ? 1 : 0) != 0);
            LOG.info("Received message " + counter++);
            message = this.consumer.receive(5000L);
            DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have received a message!", (message != null ? 1 : 0) != 0);
            LOG.info("Received message " + counter++);
            this.closeConsumer();
        }
        dummyConnection.close();
    }

    public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
        Connection dummyConnection = this.createConnection();
        dummyConnection.start();
        this.consumeMessagesDeliveredWhileConsumerClosed();
        dummyConnection.close();
        this.consumeMessagesDeliveredWhileConsumerClosed();
    }

    protected void consumeMessagesDeliveredWhileConsumerClosed() throws Exception {
        this.makeConsumer();
        this.closeConsumer();
        this.publish(1);
        Thread.sleep(1000L);
        this.makeConsumer();
        Message message = this.consumer.receive(5000L);
        DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have received a message!", (message != null ? 1 : 0) != 0);
        this.closeConsumer();
        LOG.info("Now lets create the consumer again and because we didn't ack, we should get it again");
        this.makeConsumer();
        message = this.consumer.receive(5000L);
        DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have received a message!", (message != null ? 1 : 0) != 0);
        message.acknowledge();
        this.closeConsumer();
        LOG.info("Now lets create the consumer again and because we did ack, we should not get it again");
        this.makeConsumer();
        message = this.consumer.receive(2000L);
        DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have no more messages left!", (message == null ? 1 : 0) != 0);
        this.closeConsumer();
        LOG.info("Lets publish one more message now");
        this.publish(1);
        this.makeConsumer();
        message = this.consumer.receive(5000L);
        DurableConsumerCloseAndReconnectTest.assertTrue((String)"Should have received a message!", (message != null ? 1 : 0) != 0);
        message.acknowledge();
        this.closeConsumer();
    }

    protected void publish(int numMessages) throws Exception {
        this.connection = this.createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 2);
        this.destination = this.createDestination();
        this.producer = this.session.createProducer(this.destination);
        this.producer.setDeliveryMode(2);
        for (int i = 0; i < numMessages; ++i) {
            TextMessage msg = this.session.createTextMessage("This is a test: " + this.messageCount++);
            this.producer.send((Message)msg);
        }
        this.producer.close();
        this.producer = null;
        this.closeSession();
    }

    protected Destination createDestination() throws JMSException {
        if (this.isTopic()) {
            return this.session.createTopic(this.getSubject());
        }
        return this.session.createQueue(this.getSubject());
    }

    protected boolean isTopic() {
        return true;
    }

    protected void closeConsumer() throws JMSException {
        LOG.info("Closing the consumer");
        this.consumer.close();
        this.consumer = null;
        this.closeSession();
    }

    protected void closeSession() throws JMSException {
        this.session.close();
        this.session = null;
        this.connection.close();
        this.connection = null;
    }

    protected void makeConsumer() throws Exception {
        this.makeConsumer(2);
    }

    protected void makeConsumer(int ackMode) throws Exception {
        String durableName = this.getName();
        String clientID = this.getSubject();
        LOG.info("Creating a durable subscriber for clientID: " + clientID + " and durable name: " + durableName);
        this.createSession(clientID, ackMode);
        this.consumer = this.createConsumer(durableName);
    }

    private MessageConsumer createConsumer(String durableName) throws JMSException {
        if (this.destination instanceof Topic) {
            return this.session.createDurableSubscriber((Topic)this.destination, durableName);
        }
        return this.session.createConsumer(this.destination);
    }

    protected void createSession(String clientID, int ackMode) throws Exception {
        this.connection = this.createConnection();
        this.connection.setClientID(clientID);
        this.connection.start();
        this.session = this.connection.createSession(false, ackMode);
        this.destination = this.createDestination();
    }
}

