/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport;

import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.management.JMException;
import javax.management.ObjectName;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Checks how many threads the broker uses when several client connections hit the
 * inactivity timeout at the same time while the inactivity monitor's pool is capped
 * through system properties.
 *
 * <p>The test is run for each combination of {@link #brokerTransportScheme} and
 * {@link #rejectWork} registered in
 * {@link #initCombosForTestThreadsInvolvedInXInactivityTimeouts()}.
 */
public class RestrictedThreadPoolInactivityTimeoutTest extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(RestrictedThreadPoolInactivityTimeoutTest.class);

    // System property keys set in createBroker() and cleared again in tearDown().
    private static final String MONITOR_PROPERTY_PREFIX = "org.apache.activemq.transport.AbstractInactivityMonitor.";
    private static final String WORK_QUEUE_CAPACITY_PROPERTY = MONITOR_PROPERTY_PREFIX + "workQueueCapacity";
    private static final String REJECT_WORK_PROPERTY = MONITOR_PROPERTY_PREFIX + "rejectWork";
    private static final String MAXIMUM_POOL_SIZE_PROPERTY = MONITOR_PROPERTY_PREFIX + "maximumPoolSize";

    // Combination parameters; see initCombosForTestThreadsInvolvedInXInactivityTimeouts().
    // NOTE(review): presumably public so the combination support can assign them - confirm.
    public String brokerTransportScheme = "tcp";
    public Boolean rejectWork = Boolean.FALSE;

    /** Value used for the monitor's maximumPoolSize (and workQueueCapacity when rejecting work). */
    final int poolSize = 2;
    /** Number of client connections opened through the proxy. */
    final int numConnections = 10;
    /** Released once an MBean whose name contains "remoteAddress" has been unregistered. */
    final CountDownLatch doneOneConnectionAddress = new CountDownLatch(1);
    /** Counted down for every unregistered MBean whose name contains "Consumer". */
    final CountDownLatch doneConsumers = new CountDownLatch(numConnections);

    /**
     * Builds a non-persistent, JMX-enabled broker with a single connector whose wire format
     * uses a 1000 ms inactivity duration and initial delay.
     *
     * <p>The monitor pool limits are published as system properties before the broker is
     * created, and a {@link ManagementContext} is installed that delays "remoteAddress"
     * MBean unregistration by two seconds and records unregistrations on the latches.
     *
     * @return the configured broker
     * @throws Exception if the broker or its connector cannot be created
     */
    @Override
    protected BrokerService createBroker() throws Exception {
        if (this.rejectWork.booleanValue()) {
            System.setProperty(WORK_QUEUE_CAPACITY_PROPERTY, Integer.toString(poolSize));
            System.setProperty(REJECT_WORK_PROPERTY, "true");
        }
        System.setProperty(MAXIMUM_POOL_SIZE_PROPERTY, Integer.toString(poolSize));

        BrokerService broker = super.createBroker();
        broker.setPersistent(false);
        broker.setUseJmx(true);
        broker.setManagementContext(new ManagementContext() {

            @Override
            public void unregisterMBean(ObjectName name) throws JMException {
                String keyProperties = name.getKeyPropertyListString();
                if (keyProperties.contains("remoteAddress")) {
                    // Make this unregister slow so the calling thread stays busy for a while.
                    LOG.info("SLEEP : {}: on remoteAddress unregister: {}", Thread.currentThread(), name);
                    try {
                        TimeUnit.SECONDS.sleep(2L);
                    } catch (InterruptedException interrupted) {
                        // Keep going with the unregister, but do not lose the interrupt request.
                        Thread.currentThread().interrupt();
                    }
                    doneOneConnectionAddress.countDown();
                } else if (keyProperties.contains("Consumer")) {
                    LOG.info("{}: on consumer unregister: {}", Thread.currentThread(), name);
                    doneConsumers.countDown();
                }
                super.unregisterMBean(name);
            }
        });
        broker.addConnector(this.brokerTransportScheme
                + "://localhost:0?wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000");
        return broker;
    }

    /** Registers the transport schemes and rejectWork settings the test is repeated for. */
    public void initCombosForTestThreadsInvolvedInXInactivityTimeouts() {
        this.addCombinationValues("brokerTransportScheme", new Object[]{"tcp", "nio"});
        this.addCombinationValues("rejectWork", new Object[]{Boolean.TRUE, Boolean.FALSE});
    }

    /**
     * Opens {@link #numConnections} connections through a {@link SocketProxy}, pauses the
     * proxy, and then verifies that the thread count of the current thread group changes
     * by at most {@code 2 * poolSize} and that all consumer MBeans end up unregistered.
     *
     * @throws Exception on any setup or messaging failure
     */
    public void testThreadsInvolvedInXInactivityTimeouts() throws Exception {
        TransportConnector connector = this.broker.getTransportConnectors().get(0);
        URI tcpBrokerUri = URISupport.removeQuery(connector.getConnectUri());

        SocketProxy proxy = new SocketProxy();
        proxy.setTarget(tcpBrokerUri);
        proxy.open();
        try {
            // Clients do not monitor inactivity themselves; only the broker side does.
            URI clientUri = URISupport.createURIWithQuery(proxy.getUrl(), "useInactivityMonitor=false");
            LOG.info("using server uri: {}, client uri: {}", tcpBrokerUri, clientUri);

            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUri);
            for (int i = 0; i < numConnections; ++i) {
                Connection c = factory.createConnection();
                c.start();
            }

            // Stop traffic through the proxy so the broker sees the connections go quiet.
            proxy.pause();

            int before = Thread.currentThread().getThreadGroup().activeCount();
            LOG.info("threads before: {}", before);
            Thread.yield();

            // Sample the thread count once the first slow unregister has completed.
            if (!this.doneOneConnectionAddress.await(10L, TimeUnit.SECONDS)) {
                LOG.warn("no remoteAddress unregister seen within 10 seconds; thread count sampled anyway");
            }

            int after = Thread.currentThread().getThreadGroup().activeCount();
            int diff = Math.abs(before - after);
            LOG.info("threads after: {}, diff: {}", after, diff);

            assertTrue("Should be at most inactivity monitor pool size * 2. Diff = " + diff, diff <= 2 * poolSize);
            assertTrue("all work complete", this.doneConsumers.await(10L, TimeUnit.SECONDS));
        } finally {
            // Release the proxy's sockets and threads so they do not leak into later runs.
            proxy.close();
        }
    }

    /**
     * Runs the inherited tear-down and always removes the inactivity monitor system
     * properties, even when the inherited tear-down fails.
     *
     * @throws Exception if the inherited tear-down fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            super.tearDown();
        } finally {
            System.clearProperty(WORK_QUEUE_CAPACITY_PROPERTY);
            System.clearProperty(MAXIMUM_POOL_SIZE_PROPERTY);
            System.clearProperty(REJECT_WORK_PROPERTY);
        }
    }

    /**
     * @return the suite built for this class by the inherited {@code suite(Class)} factory
     */
    public static Test suite() {
        return suite(RestrictedThreadPoolInactivityTimeoutTest.class);
    }
}

