/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.scheduler;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.broker.scheduler.JobSchedulerStoreCheckpointTest;
import org.apache.activemq.broker.util.RedeliveryPlugin;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.kahadb.scheduler.JobSchedulerStoreImpl;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that starts an embedded broker backed by a
 * {@link JobSchedulerStoreImpl}, a {@link RedeliveryPlugin} and an
 * {@link IndividualDeadLetterStrategy}, sends a handful of persistent
 * messages that the consumer always rolls back, and then asserts that the
 * scheduler store's journal ends up with exactly one data file.
 *
 * <p>NOTE(review): the intent appears to be that broker-side redelivery
 * schedules each rolled-back message through the job scheduler store and that
 * the store's periodic cleanup reclaims the journal files afterwards; that
 * flow lives in the broker and is not visible in this class.
 */
public class JobSchedulerRedliveryPluginDLQStoreCleanupTest {
    static final Logger LOG = LoggerFactory.getLogger(JobSchedulerRedliveryPluginDLQStoreCleanupTest.class);

    /** Queue used by the producer, the consumer and the per-queue redelivery policy. */
    private static final String QUEUE_NAME = "FOO.BAR";
    /** Number of messages the test sends before checking the store. */
    private static final int MESSAGE_COUNT = 8;
    /** Size in bytes of every message body. */
    private static final int PAYLOAD_SIZE = 8192;

    private JobSchedulerStoreImpl store;
    private BrokerService brokerService;
    private ByteSequence payload;
    private String connectionURI;
    private ActiveMQConnectionFactory cf;

    /**
     * Creates a clean scheduler store directory, configures and starts the
     * embedded broker, prepares the message payload and builds the connection
     * factory used by the test.
     *
     * @throws Exception if the store or the broker cannot be set up
     */
    @Before
    public void setUp() throws Exception {
        // Verbose store logging; note that the level is not restored afterwards.
        org.apache.log4j.Logger.getLogger(JobSchedulerStoreImpl.class).setLevel(Level.TRACE);

        File directory = new File("target/test/ScheduledJobsDB");
        IOHelper.mkdirs(directory);
        IOHelper.deleteChildren(directory);
        createSchedulerStore(directory);

        brokerService = new BrokerService();
        brokerService.setUseJmx(false);
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.setJobSchedulerStore(store);
        brokerService.setSchedulerSupport(true);
        brokerService.setAdvisorySupport(false);
        // Port 0 lets the OS pick a free port; the real URI is read back after start.
        TransportConnector connector = brokerService.addConnector("tcp://0.0.0.0:0");
        brokerService.setPlugins(new BrokerPlugin[]{createRedeliveryPlugin()});

        IndividualDeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
        strategy.setProcessExpired(true);
        strategy.setProcessNonPersistent(false);
        strategy.setUseQueueForQueueMessages(true);
        strategy.setQueuePrefix("DLQ.");

        PolicyEntry policy = new PolicyEntry();
        policy.setDeadLetterStrategy(strategy);
        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        brokerService.setDestinationPolicy(pMap);

        brokerService.start();
        brokerService.waitUntilStarted();
        connectionURI = connector.getPublishableConnectString();

        // Deterministic body: byte i holds i % 256.
        byte[] data = new byte[PAYLOAD_SIZE];
        for (int i = 0; i < data.length; ++i) {
            data[i] = (byte)(i % 256);
        }
        payload = new ByteSequence(data);

        cf = new ActiveMQConnectionFactory(connectionURI);
        // Client-side redeliveries are disabled; presumably this leaves all
        // redelivery handling to the broker plugin -- TODO confirm.
        cf.getRedeliveryPolicy().setMaximumRedeliveries(0);
    }

    /**
     * Creates the scheduler store under test in the given directory with a
     * 5000 checkpoint interval, a 10000 cleanup interval and a maximum journal
     * file length of 10240.
     *
     * <p>NOTE(review): the journal file length is only slightly larger than
     * the {@value #PAYLOAD_SIZE}-byte payload, presumably so that the test
     * produces several journal files -- confirm against the store's units.
     *
     * @param directory directory the store keeps its files in
     * @throws Exception declared for overriding subclasses
     */
    protected void createSchedulerStore(File directory) throws Exception {
        store = new JobSchedulerStoreImpl();
        store.setDirectory(directory);
        store.setCheckpointInterval(5000L);
        store.setCleanupInterval(10000L);
        store.setJournalMaxFileLength(10240);
    }

    /**
     * Builds the broker redelivery plugin: queue {@value #QUEUE_NAME} gets a
     * policy with a maximum of 3 redeliveries, every other destination gets a
     * default policy with an initial delay of 5000 and no redeliveries.
     *
     * @return the configured plugin
     */
    protected RedeliveryPlugin createRedeliveryPlugin() {
        RedeliveryPolicy queueEntry = new RedeliveryPolicy();
        queueEntry.setMaximumRedeliveries(3);
        queueEntry.setDestination(new ActiveMQQueue(QUEUE_NAME));

        RedeliveryPolicy defaultEntry = new RedeliveryPolicy();
        defaultEntry.setInitialRedeliveryDelay(5000L);
        defaultEntry.setMaximumRedeliveries(0);

        RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
        redeliveryPolicyMap.setDefaultEntry(defaultEntry);
        redeliveryPolicyMap.setRedeliveryPolicyEntries(Arrays.asList(queueEntry));

        RedeliveryPlugin plugin = new RedeliveryPlugin();
        plugin.setRedeliveryPolicyMap(redeliveryPolicyMap);
        return plugin;
    }

    /**
     * Stops the embedded broker, if one was created, and waits for it to stop.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        if (brokerService != null) {
            brokerService.stop();
            brokerService.waitUntilStopped();
        }
    }

    /**
     * Sends {@value #MESSAGE_COUNT} persistent messages, one every five
     * seconds, to a queue whose transacted consumer rolls back every delivery,
     * then waits for the scheduler store journal to shrink to a single file.
     *
     * @throws Exception if a JMS operation or a wait fails
     */
    @Test
    public void testProducerAndRollback() throws Exception {
        final Connection connection = cf.createConnection();
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            final Queue queue = producerSession.createQueue(QUEUE_NAME);
            final MessageProducer producer = producerSession.createProducer(queue);
            final MessageConsumer consumer = consumerSession.createConsumer(queue);
            final CountDownLatch sentAll = new CountDownLatch(MESSAGE_COUNT);

            connection.start();
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            // Reject every delivery by rolling back the consumer's transaction.
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        LOG.info("Rolling back incoming message");
                        consumerSession.rollback();
                    } catch (JMSException e) {
                        LOG.warn("Failed to Rollback on incoming message", e);
                    }
                }
            });

            // Send one message immediately and then one more 5 seconds after
            // each previous send completes.
            executor.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    try {
                        BytesMessage message = producerSession.createBytesMessage();
                        message.writeBytes(payload.data, payload.offset, payload.length);
                        producer.send(message);
                        LOG.info("Send next Message to Queue");
                        sentAll.countDown();
                    } catch (JMSException e) {
                        LOG.warn("Send of message did not complete.", e);
                    }
                }
            }, 0L, 5L, TimeUnit.SECONDS);

            Assert.assertTrue("Should have sent all messages", sentAll.await(2L, TimeUnit.MINUTES));
            executor.shutdownNow();
            Assert.assertTrue(executor.awaitTermination(30L, TimeUnit.SECONDS));

            Assert.assertTrue("Should clean out the scheduler store", Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return getNumJournalFiles() == 1;
                }
            }));
        } finally {
            // Runs on failure paths too, so neither the sender thread nor the
            // connection outlives the test method.
            executor.shutdownNow();
            closeQuietly(connection);
        }
    }

    /**
     * Closes the connection, logging rather than throwing so that a close
     * failure cannot mask an assertion failure from the test body.
     *
     * @param connection the connection to close
     */
    private static void closeQuietly(Connection connection) {
        try {
            connection.close();
        } catch (JMSException e) {
            LOG.warn("Failed to close the test connection", e);
        }
    }

    /**
     * Returns the number of entries in the scheduler store journal's file map.
     *
     * @return the current journal file count
     * @throws IOException if the journal cannot be accessed
     */
    private int getNumJournalFiles() throws IOException {
        return store.getJournal().getFileMap().size();
    }
}

