/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.broker.policy;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.management.InstanceNotFoundException;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.AbortSlowConsumerStrategyViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.policy.AbortSlowConsumerBase;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AbortSlowConsumer0Test
extends AbortSlowConsumerBase {
    private static final Logger LOG = LoggerFactory.getLogger(AbortSlowConsumer0Test.class);

    public AbortSlowConsumer0Test() {
        this.topic = true;
    }

    @Test
    public void testRegularConsumerIsNotAborted() throws Exception {
        this.startConsumers(this.destination);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(10);
        this.allMessagesList.assertAtLeastMessagesReceived(10);
    }

    @Test
    public void testSlowConsumerIsAbortedViaJmx() throws Exception {
        this.underTest.setMaxSlowDuration(60000L);
        this.startConsumers(this.withPrefetch(2, this.destination));
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(8000L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertMessagesReceived(1);
        ActiveMQDestination amqDest = (ActiveMQDestination)this.destination;
        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + (amqDest.isTopic() ? "Topic" : "Queue") + ",destinationName=" + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
        DestinationViewMBean queue = (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
        ObjectName slowConsumerPolicyMBeanName = queue.getSlowConsumerStrategy();
        Assert.assertNotNull((Object)slowConsumerPolicyMBeanName);
        AbortSlowConsumerStrategyViewMBean abortPolicy = (AbortSlowConsumerStrategyViewMBean)this.broker.getManagementContext().newProxyInstance(slowConsumerPolicyMBeanName, AbortSlowConsumerStrategyViewMBean.class, true);
        TimeUnit.SECONDS.sleep(3L);
        TabularData slowOnes = abortPolicy.getSlowConsumers();
        Assert.assertEquals((String)"one slow consumers", (long)1L, (long)slowOnes.size());
        LOG.info("slow ones:" + slowOnes);
        CompositeData slowOne = (CompositeData)slowOnes.values().iterator().next();
        LOG.info("Slow one: " + slowOne);
        Assert.assertTrue((String)"we have an object name", (boolean)(slowOne.get("subscription") instanceof ObjectName));
        abortPolicy.abortConsumer((ObjectName)slowOne.get("subscription"));
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertAtMostMessagesReceived(1);
        slowOnes = abortPolicy.getSlowConsumers();
        Assert.assertEquals((String)"no slow consumers left", (long)0L, (long)slowOnes.size());
        if (this.topic) {
            this.broker.getAdminView().removeTopic(amqDest.getPhysicalName());
        } else {
            this.broker.getAdminView().removeQueue(amqDest.getPhysicalName());
        }
        try {
            abortPolicy.getSlowConsumers();
            Assert.fail((String)"expect not found post destination removal");
        }
        catch (UndeclaredThrowableException expected) {
            Assert.assertTrue((String)("correct exception: " + expected.getCause()), (boolean)(expected.getCause() instanceof InstanceNotFoundException));
        }
    }

    private Destination withPrefetch(int i, Destination destination) {
        String destWithPrefetch = ((ActiveMQDestination)destination).getPhysicalName() + "?consumer.prefetchSize=" + i;
        return this.topic ? new ActiveMQTopic(destWithPrefetch) : new ActiveMQQueue(destWithPrefetch);
    }

    @Test
    public void testOnlyOneSlowConsumerIsAborted() throws Exception {
        this.consumerCount = 10;
        this.startConsumers(this.destination);
        Map.Entry consumertoAbort = this.consumers.entrySet().iterator().next();
        ((MessageIdList)((Object)consumertoAbort.getValue())).setProcessingDelay(8000L);
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(99);
        this.allMessagesList.assertAtLeastMessagesReceived(99);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertMessagesReceived(1);
        TimeUnit.SECONDS.sleep(5L);
        ((MessageIdList)((Object)consumertoAbort.getValue())).assertAtMostMessagesReceived(1);
    }

    @Test
    public void testAbortAlreadyClosingConsumers() throws Exception {
        this.consumerCount = 1;
        this.startConsumers(this.withPrefetch(2, this.destination));
        for (MessageIdList list : this.consumers.values()) {
            list.setProcessingDelay(6000L);
        }
        for (Connection c : this.connections) {
            c.setExceptionListener((ExceptionListener)this);
        }
        this.startProducers(this.destination, 100);
        this.allMessagesList.waitForMessagesToArrive(this.consumerCount);
        for (MessageConsumer consumer : this.consumers.keySet()) {
            LOG.info("closing consumer: " + consumer);
            consumer.close();
        }
    }

    @Test
    public void testAbortConsumerOnDeadConnection() throws Exception {
        TransportConnector transportConnector = this.broker.addConnector("tcp://0.0.0.0:0");
        transportConnector.setBrokerService(this.broker);
        transportConnector.setTaskRunnerFactory(this.broker.getTaskRunnerFactory());
        transportConnector.start();
        SocketProxy socketProxy = new SocketProxy(transportConnector.getPublishableConnectURI());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(socketProxy.getUrl());
        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
        prefetchPolicy.setAll(4);
        connectionFactory.setPrefetchPolicy(prefetchPolicy);
        Connection c = connectionFactory.createConnection();
        this.connections.add(c);
        c.start();
        Session session = c.createSession(false, 2);
        final ActiveMQMessageConsumer messageconsumer = (ActiveMQMessageConsumer)session.createConsumer(this.destination);
        this.startProducers(this.destination, 10);
        messageconsumer.receive(4000L).acknowledge();
        Assert.assertNotNull((Object)messageconsumer.receive(4000L));
        Assert.assertNotNull((Object)messageconsumer.receive(4000L));
        Assert.assertNotNull((Object)messageconsumer.receive(4000L));
        socketProxy.pause();
        ActiveMQDestination amqDest = (ActiveMQDestination)this.destination;
        ObjectName destinationViewMBean = new ObjectName("org.apache.activemq:destinationType=" + (amqDest.isTopic() ? "Topic" : "Queue") + ",destinationName=" + amqDest.getPhysicalName() + ",type=Broker,brokerName=localhost");
        final DestinationViewMBean destView = (DestinationViewMBean)this.broker.getManagementContext().newProxyInstance(destinationViewMBean, DestinationViewMBean.class, true);
        Assert.assertTrue((String)"Consumer gone from broker view", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("DestView {} comsumerCount {}", (Object)destView, (Object)destView.getConsumerCount());
                return 0L == destView.getConsumerCount();
            }
        }));
        socketProxy.goOn();
        Assert.assertTrue((String)"consumer was closed", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                boolean closed = false;
                try {
                    messageconsumer.receive(400L);
                }
                catch (IllegalStateException expected) {
                    closed = expected.toString().contains("closed");
                }
                return closed;
            }
        }));
    }

    @Override
    public void onException(JMSException exception) {
        this.exceptions.add(exception);
        exception.printStackTrace();
    }
}

