/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.store.kahadb;

import java.util.Objects;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerId;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KahaDBStoreTest {
    KahaDBStore.KahaDBMessageStore underTest;
    KahaDBStore store;
    ActiveMQMessage message;
    ProducerId producerId = new ProducerId("1.1.1");
    private static final int MESSAGE_COUNT = 2000;
    private Vector<Throwable> exceptions = new Vector();

    @Before
    public void initStore() throws Exception {
        ActiveMQQueue destination = new ActiveMQQueue("Test");
        this.store = new KahaDBStore();
        this.store.setMaxAsyncJobs(100);
        this.store.setDeleteAllMessages(true);
        this.store.start();
        KahaDBStore kahaDBStore = this.store;
        Objects.requireNonNull(kahaDBStore);
        this.underTest = new KahaDBStore.KahaDBMessageStore(kahaDBStore, (ActiveMQDestination)destination);
        this.underTest.start();
        this.message = new ActiveMQMessage();
        this.message.setDestination((ActiveMQDestination)destination);
    }

    @After
    public void destroyStore() throws Exception {
        if (this.store != null) {
            this.store.stop();
        }
    }

    @Test
    public void testConcurrentStoreAndDispatchQueue() throws Exception {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 2000; ++i) {
            final int id = ++i;
            executor.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        Message msg = KahaDBStoreTest.this.message.copy();
                        msg.setMessageId(new MessageId(KahaDBStoreTest.this.producerId, (long)id));
                        KahaDBStoreTest.this.underTest.asyncAddQueueMessage(null, msg);
                    }
                    catch (Exception e) {
                        KahaDBStoreTest.this.exceptions.add(e);
                    }
                }
            });
        }
        ExecutorService executor2 = Executors.newCachedThreadPool();
        for (int i = 0; i < 2000; ++i) {
            final int id = ++i;
            executor2.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        MessageAck ack = new MessageAck();
                        ack.setLastMessageId(new MessageId(KahaDBStoreTest.this.producerId, (long)id));
                        KahaDBStoreTest.this.underTest.removeAsyncMessage(null, ack);
                    }
                    catch (Exception e) {
                        KahaDBStoreTest.this.exceptions.add(e);
                    }
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(60L, TimeUnit.SECONDS);
        executor2.shutdown();
        executor2.awaitTermination(60L, TimeUnit.SECONDS);
        Assert.assertTrue((String)("no exceptions " + String.valueOf(this.exceptions)), (boolean)this.exceptions.isEmpty());
    }
}

