/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import jakarta.jms.BytesMessage;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Exercises fast, parallel enqueue into a KahaDB-backed broker when no
 * consumer is attached.
 *
 * <p>Each publish test starts a broker with a clean store, lets
 * {@link #parallelProducer} producer connections drain a shared counter of
 * {@link #toSend} persistent messages, logs journal/index size statistics,
 * then restarts the broker without deleting the store and verifies that
 * exactly {@code toSend} messages can be consumed.
 */
public class KahaDBFastEnqueueTest {
    private static final Logger LOG = LoggerFactory.getLogger(KahaDBFastEnqueueTest.class);

    /** System properties set before each test and cleared by {@link #stopBroker()}. */
    private static final String CALLER_BUFFER_APPENDER_PROP = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
    private static final String WRITE_STAT_WINDOW_PROP = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";

    private BrokerService broker;
    private ActiveMQConnectionFactory connectionFactory;
    KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
    private final Destination destination = new ActiveMQQueue("Test");
    /** 6144 NUL characters; its bytes are the body of every message sent. */
    private final String payloadString = new String(new byte[6144]);
    // NOTE(review): not consulted anywhere in this class - only the BytesMessage path exists here.
    private final boolean useBytesMessage = true;
    /** Number of concurrent producer tasks started by each publish test. */
    private final int parallelProducer = 20;
    /** Exceptions thrown by producer tasks; asserted empty once they finish. */
    private final Vector<Exception> exceptions = new Vector<>();
    /** Total number of messages to publish across all producers. */
    long toSend = 10000L;
    /** A progress line is logged each time the remaining count is a multiple of this value. */
    final double sampleRate = 100000.0;

    /**
     * Publishes {@link #toSend} messages with a checkpoint/cleanup interval of
     * 10, then restarts the broker and consumes every message.
     *
     * @throws Exception if the broker, a producer or the consumer fails
     */
    @Test
    public void testPublishNoConsumer() throws Exception {
        this.startBroker(true, 10);
        long start = System.currentTimeMillis();
        this.runProducers();
        double duration = System.currentTimeMillis() - start;
        this.stopBroker();
        this.logStoreStats(duration);
        this.restartBroker(0, 1200000);
        this.consumeMessages(this.toSend);
    }

    /**
     * Same scenario as {@link #testPublishNoConsumer()} but with only 100
     * messages and a checkpoint/cleanup interval of 0, plus an explicit
     * {@code gc()} on the broker admin view before shutdown.
     *
     * @throws Exception if the broker, a producer or the consumer fails
     */
    @Test
    public void testPublishNoConsumerNoCheckpoint() throws Exception {
        this.toSend = 100L;
        this.startBroker(true, 0);
        long start = System.currentTimeMillis();
        this.runProducers();
        this.broker.getAdminView().gc();
        double duration = System.currentTimeMillis() - start;
        this.stopBroker();
        this.logStoreStats(duration);
        this.restartBroker(0, 0);
        this.consumeMessages(this.toSend);
    }

    /**
     * Runs {@link #parallelProducer} publisher tasks that share one countdown
     * of {@link #toSend} messages, waits up to 30 minutes for them to finish
     * and asserts that all of them completed without recording an exception.
     *
     * @throws Exception if waiting for the producers is interrupted
     */
    private void runProducers() throws Exception {
        final AtomicLong sharedCount = new AtomicLong(this.toSend);
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            for (int i = 0; i < this.parallelProducer; ++i) {
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            KahaDBFastEnqueueTest.this.publishMessages(sharedCount, 0);
                        } catch (Exception e) {
                            // Collected here and asserted on by the test thread.
                            KahaDBFastEnqueueTest.this.exceptions.add(e);
                        }
                    }
                });
            }
            executorService.shutdown();
            executorService.awaitTermination(30L, TimeUnit.MINUTES);
            Assert.assertTrue("Producers done in time", executorService.isTerminated());
        } finally {
            // No-op when the pool already terminated; otherwise interrupts
            // producers still running after a timeout or failure.
            executorService.shutdownNow();
        }
        Assert.assertTrue("No exceptions: " + this.exceptions, this.exceptions.isEmpty());
    }

    /**
     * Logs throughput plus journal, index and store sizes for the run that
     * just finished.
     *
     * @param duration elapsed publish time in milliseconds
     */
    private void logStoreStats(double duration) {
        long totalSent = this.toSend * (long) this.payloadString.length();
        long journalLength = this.kahaDBPersistenceAdapter.getStore().getJournal().length();
        LOG.info("Duration:                {}ms", duration);
        LOG.info("Rate:                       {}m/s", (double) (this.toSend * 1000L) / duration);
        LOG.info("Total send:             {}", totalSent);
        LOG.info("Total journal write: {}", journalLength);
        LOG.info("Total index size {}", this.kahaDBPersistenceAdapter.getStore().getPageFile().getDiskSize());
        LOG.info("Total store size: {}", this.kahaDBPersistenceAdapter.size());
        LOG.info("Journal writes %:    {}%", (double) journalLength / (double) totalSent * 100.0);
    }

    /**
     * Receives exactly {@code count} messages from {@link #destination} and
     * asserts that no further message arrives within two seconds.
     *
     * @param count number of messages expected on the queue
     * @throws Exception if the connection or consumer fails
     */
    private void consumeMessages(long count) throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) this.connectionFactory.createConnection();
        try {
            connection.setWatchTopicAdvisories(false);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(this.destination);
            for (long i = 0L; i < count; ++i) {
                Assert.assertNotNull("got message " + i, consumer.receive(10000L));
            }
            Assert.assertNull("none left over", consumer.receive(2000L));
        } finally {
            // Release the connection even when an assertion above fails.
            connection.close();
        }
    }

    /**
     * Stops the broker, waits {@code restartDelay} milliseconds and starts a
     * new broker that keeps the existing store contents.
     *
     * @param restartDelay pause between stop and start, in milliseconds
     * @param checkpoint   value forwarded to {@link #startBroker(boolean, int)}
     * @throws Exception if stopping or starting the broker fails
     */
    private void restartBroker(int restartDelay, int checkpoint) throws Exception {
        this.stopBroker();
        TimeUnit.MILLISECONDS.sleep(restartDelay);
        this.startBroker(false, checkpoint);
    }

    /** Sets the journal-related system properties before each test. */
    @Before
    public void setProps() {
        System.setProperty(CALLER_BUFFER_APPENDER_PROP, Boolean.toString(true));
        System.setProperty(WRITE_STAT_WINDOW_PROP, "10000");
    }

    /**
     * Stops the broker, if one was created, and clears the system properties
     * set by {@link #setProps()}.
     *
     * <p>NOTE(review): this also runs in the middle of the publish tests, so
     * the properties are already cleared when the broker is restarted for the
     * consume phase - confirm that is intended.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            if (this.broker != null) {
                this.broker.stop();
                this.broker.waitUntilStopped();
            }
        } finally {
            // Always clear so a failed stop cannot leak settings into later tests.
            System.clearProperty(CALLER_BUFFER_APPENDER_PROP);
            System.clearProperty(WRITE_STAT_WINDOW_PROP);
        }
    }

    /**
     * Sends messages on a dedicated connection until the shared counter is
     * exhausted.
     *
     * @param count  remaining-message counter shared by all producers
     * @param expiry time-to-live passed to each send
     * @throws Exception if the connection, session or a send fails
     */
    private void publishMessages(AtomicLong count, int expiry) throws Exception {
        ActiveMQConnection connection = (ActiveMQConnection) this.connectionFactory.createConnection();
        try {
            connection.setWatchTopicAdvisories(false);
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(this.destination);
            // The payload never changes, so encode it once per producer.
            byte[] payload = this.payloadString.getBytes();
            long start = System.currentTimeMillis();
            long i;
            while ((i = count.getAndDecrement()) > 0L) {
                BytesMessage message = session.createBytesMessage();
                message.writeBytes(payload);
                // Delivery mode 2 (persistent), priority 5.
                producer.send(message, 2, 5, (long) expiry);
                if (i != this.toSend && (double) i % this.sampleRate == 0.0) {
                    long now = System.currentTimeMillis();
                    LOG.info("Remainder: {}, rate: {}m/s", i, this.sampleRate * 1000.0 / (double) (now - start));
                    start = now;
                }
            }
            // NOTE(review): presumably a synchronous round trip so the broker has
            // handled the async sends before the connection closes - confirm.
            connection.syncSendPacket(new ConnectionControl());
        } finally {
            connection.close();
        }
    }

    /**
     * Creates and starts a broker whose KahaDB adapter has journal and index
     * disk syncs and the index recovery file disabled, listening on an
     * ephemeral TCP port, and builds a matching connection factory.
     *
     * @param deleteAllMessages whether the broker deletes all messages on startup
     * @param checkPointPeriod  value applied to both the cleanup interval and
     *                          the checkpoint interval of the adapter
     * @throws Exception if the broker fails to start
     */
    public void startBroker(boolean deleteAllMessages, int checkPointPeriod) throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(deleteAllMessages);
        this.kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) this.broker.getPersistenceAdapter();
        this.kahaDBPersistenceAdapter.setEnableJournalDiskSyncs(false);
        this.kahaDBPersistenceAdapter.setCleanupInterval((long) checkPointPeriod);
        this.kahaDBPersistenceAdapter.setCheckpointInterval((long) checkPointPeriod);
        this.kahaDBPersistenceAdapter.setJournalMaxWriteBatchSize(24 * 1024 * 1024);
        this.kahaDBPersistenceAdapter.setJournalMaxFileLength(128 * 1024 * 1024);
        this.kahaDBPersistenceAdapter.setIndexCacheSize(500000);
        this.kahaDBPersistenceAdapter.setIndexWriteBatchSize(500000);
        this.kahaDBPersistenceAdapter.setEnableIndexRecoveryFile(false);
        this.kahaDBPersistenceAdapter.setEnableIndexDiskSyncs(false);
        this.broker.addConnector("tcp://0.0.0.0:0");
        this.broker.start();
        String options = "?jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192";
        this.connectionFactory = new ActiveMQConnectionFactory(((TransportConnector) this.broker.getTransportConnectors().get(0)).getConnectUri() + options);
    }

    /**
     * Checks that XOR-ing a byte with 1 alternates it between 0 and 1 on
     * every step across 32767 iterations.
     *
     * @throws Exception never thrown by the current body; kept for signature compatibility
     */
    @Test
    public void testRollover() throws Exception {
        byte flip = 1;
        for (long i = 0L; i < 32767L; ++i) {
            flip = (byte) (flip ^ 1);
            Assert.assertEquals("0 @:" + i, 0L, (long) flip);
            flip = (byte) (flip ^ 1);
            Assert.assertEquals("1 @:" + i, 1L, (long) flip);
        }
    }
}

