/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.io.File;
import java.io.IOException;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableSubProcessMultiRestartTest {
    private static final Logger LOG = LoggerFactory.getLogger(DurableSubProcessMultiRestartTest.class);
    public static final long RUNTIME = 60000L;
    private BrokerService broker;
    private ActiveMQTopic topic;
    private final ReentrantReadWriteLock processLock = new ReentrantReadWriteLock(true);
    private int restartCount = 0;
    private final int SUBSCRIPTION_ID = 1;
    static final Vector<Throwable> exceptions = new Vector();

    @Test
    public void testProcess() throws Exception {
        DurableSubscriber durableSubscriber = new DurableSubscriber(1);
        MsgProducer msgProducer = new MsgProducer();
        try {
            durableSubscriber.start();
            msgProducer.start();
            long endTime = System.currentTimeMillis() + 60000L;
            while (endTime > System.currentTimeMillis()) {
                Thread.sleep(10000L);
                this.restartBroker();
            }
        }
        catch (Throwable e) {
            DurableSubProcessMultiRestartTest.exit("ProcessTest.testProcess failed.", e);
        }
        try {
            msgProducer.join();
            durableSubscriber.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        this.restartBroker();
        Assert.assertTrue((String)("no exceptions: " + String.valueOf(exceptions)), (boolean)exceptions.isEmpty());
        final KahaDBPersistenceAdapter pa = (KahaDBPersistenceAdapter)this.broker.getPersistenceAdapter();
        Assert.assertTrue((String)("only less than two journal files should be left: " + pa.getStore().getJournal().getFileMap().size()), (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return pa.getStore().getJournal().getFileMap().size() <= 2;
            }
        }, (long)TimeUnit.MINUTES.toMillis(3L)));
        LOG.info("DONE.");
    }

    private void restartBroker() throws Exception {
        LOG.info("Broker restart: waiting for components.");
        this.processLock.writeLock().lock();
        try {
            this.destroyBroker();
            this.startBroker(false);
            ++this.restartCount;
            LOG.info("Broker restarted. count: " + this.restartCount);
        }
        finally {
            this.processLock.writeLock().unlock();
        }
    }

    public static void exit(String message) {
        DurableSubProcessMultiRestartTest.exit(message, null);
    }

    public static void exit(String message, Throwable e) {
        RuntimeException cause = new RuntimeException(message, e);
        LOG.error(message, (Throwable)cause);
        exceptions.add(cause);
        Assert.fail((String)cause.toString());
    }

    @Before
    public void setUp() throws Exception {
        this.topic = new ActiveMQTopic("TopicT");
        this.startBroker();
    }

    @After
    public void tearDown() throws Exception {
        this.destroyBroker();
    }

    private void startBroker() throws Exception {
        this.startBroker(true);
    }

    private void startBroker(boolean deleteAllMessages) throws Exception {
        if (this.broker != null) {
            return;
        }
        this.broker = BrokerFactory.createBroker((String)("broker:(vm://" + DurableSubProcessMultiRestartTest.getName() + ")"));
        this.broker.setBrokerName(DurableSubProcessMultiRestartTest.getName());
        this.broker.setAdvisorySupport(false);
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.broker.setKeepDurableSubsActive(true);
        File kahadbData = new File("activemq-data/" + DurableSubProcessMultiRestartTest.getName() + "-kahadb");
        if (deleteAllMessages) {
            DurableSubProcessMultiRestartTest.delete(kahadbData);
        }
        this.broker.setPersistent(true);
        KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
        kahadb.setDirectory(kahadbData);
        kahadb.setJournalMaxFileLength(20480);
        this.broker.setPersistenceAdapter((PersistenceAdapter)kahadb);
        this.broker.addConnector("tcp://localhost:61656");
        this.broker.getSystemUsage().getMemoryUsage().setLimit(0x10000000L);
        this.broker.getSystemUsage().getTempUsage().setLimit(0x10000000L);
        this.broker.getSystemUsage().getStoreUsage().setLimit(0x10000000L);
        this.broker.start();
    }

    protected static String getName() {
        return "DurableSubProcessMultiRestartTest";
    }

    private static boolean delete(File path) {
        if (path == null) {
            return true;
        }
        if (path.isDirectory()) {
            for (File file : path.listFiles()) {
                DurableSubProcessMultiRestartTest.delete(file);
            }
        }
        return path.delete();
    }

    private void destroyBroker() throws Exception {
        if (this.broker == null) {
            return;
        }
        this.broker.stop();
        this.broker = null;
    }

    private final class DurableSubscriber
    extends Thread {
        String url;
        final ConnectionFactory cf;
        public static final String SUBSCRIPTION_NAME = "subscription";
        private final int id;
        private final String conClientId;
        private long msgCount;

        public DurableSubscriber(int id) throws JMSException {
            super("DurableSubscriber" + id);
            this.url = "tcp://localhost:61656";
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.setDaemon(true);
            this.id = id;
            this.conClientId = "cli" + id;
            this.subscribe();
        }

        @Override
        public void run() {
            long end = System.currentTimeMillis() + 60000L;
            try {
                while (end > System.currentTimeMillis()) {
                    DurableSubProcessMultiRestartTest.this.processLock.readLock().lock();
                    try {
                        this.process(5000L);
                    }
                    finally {
                        DurableSubProcessMultiRestartTest.this.processLock.readLock().unlock();
                    }
                }
                this.unsubscribe();
            }
            catch (JMSException maybe) {
                if (!(maybe.getCause() instanceof IOException)) {
                    DurableSubProcessMultiRestartTest.exit(this.toString() + " failed with JMSException", maybe);
                }
            }
            catch (Throwable e) {
                DurableSubProcessMultiRestartTest.exit(this.toString() + " failed.", e);
            }
            LOG.info(this.toString() + " DONE. MsgCout=" + this.msgCount);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void process(long duration) throws JMSException {
            LOG.info(this.toString() + " ONLINE.");
            Connection con = this.openConnection();
            Session sess = con.createSession(false, 1);
            TopicSubscriber consumer = sess.createDurableSubscriber((Topic)DurableSubProcessMultiRestartTest.this.topic, SUBSCRIPTION_NAME);
            long end = System.currentTimeMillis() + duration;
            try {
                while (end > System.currentTimeMillis()) {
                    Message message = consumer.receive(100L);
                    if (message == null) continue;
                    LOG.info(this.toString() + "received message...");
                    ++this.msgCount;
                }
            }
            finally {
                sess.close();
                con.close();
                LOG.info(this.toString() + " OFFLINE.");
            }
        }

        private Connection openConnection() throws JMSException {
            Connection con = this.cf.createConnection();
            con.setClientID(this.conClientId);
            con.start();
            return con;
        }

        private void subscribe() throws JMSException {
            Connection con = this.openConnection();
            Session session = con.createSession(false, 1);
            session.createDurableSubscriber((Topic)DurableSubProcessMultiRestartTest.this.topic, SUBSCRIPTION_NAME);
            LOG.info(this.toString() + " SUBSCRIBED");
            session.close();
            con.close();
        }

        private void unsubscribe() throws JMSException {
            Connection con = this.openConnection();
            Session session = con.createSession(false, 1);
            session.unsubscribe(SUBSCRIPTION_NAME);
            LOG.info(this.toString() + " UNSUBSCRIBED");
            session.close();
            con.close();
        }

        @Override
        public String toString() {
            return "DurableSubscriber[id=" + this.id + "]";
        }
    }

    final class MsgProducer
    extends Thread {
        String url;
        final ConnectionFactory cf;
        private long msgCount;
        int messageRover;

        public MsgProducer() {
            super("MsgProducer");
            this.url = "vm://" + DurableSubProcessMultiRestartTest.getName();
            this.cf = new ActiveMQConnectionFactory(this.url);
            this.messageRover = 0;
            this.setDaemon(true);
        }

        @Override
        public void run() {
            long endTime = 60000L + System.currentTimeMillis();
            try {
                while (endTime > System.currentTimeMillis()) {
                    Thread.sleep(500L);
                    DurableSubProcessMultiRestartTest.this.processLock.readLock().lock();
                    try {
                        this.send();
                    }
                    finally {
                        DurableSubProcessMultiRestartTest.this.processLock.readLock().unlock();
                    }
                    LOG.info("MsgProducer msgCount=" + this.msgCount);
                }
            }
            catch (Throwable e) {
                DurableSubProcessMultiRestartTest.exit("Server.run failed", e);
            }
        }

        public void send() throws JMSException {
            LOG.info("Sending ... ");
            Connection con = this.cf.createConnection();
            Session sess = con.createSession(false, 1);
            MessageProducer prod = sess.createProducer(null);
            Message message = sess.createMessage();
            message.setIntProperty("ID", ++this.messageRover);
            message.setBooleanProperty("COMMIT", true);
            prod.send((Destination)DurableSubProcessMultiRestartTest.this.topic, message);
            ++this.msgCount;
            LOG.info("Message Sent.");
            sess.close();
            con.close();
        }
    }
}

