/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.net.SocketFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.usecases.DurableConsumerCloseAndReconnectTest;
import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DurableConsumerCloseAndReconnectTcpTest
extends DurableConsumerCloseAndReconnectTest
implements ExceptionListener,
TransportListener {
    private static final Logger LOG = LoggerFactory.getLogger(DurableConsumerCloseAndReconnectTcpTest.class);
    private BrokerService broker;
    private TransportConnector connector;
    private CountDownLatch gotException = new CountDownLatch(1);
    private Exception reconnectException;
    private boolean reconnectInExceptionListener;
    private boolean reconnectInTransportListener;

    @Override
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.connector = this.broker.addConnector("tcp://localhost:0?transport.useInactivityMonitor=false");
        this.broker.setPersistent(false);
        this.broker.start();
        this.broker.waitUntilStarted();
        class SlowCloseSocketTcpTransportFactory
        extends TcpTransportFactory {
            SlowCloseSocketTcpTransportFactory() {
            }

            protected SocketFactory createSocketFactory() throws IOException {
                return new 1SlowCloseSocketTcpTransportFactory.SlowCloseSocketFactory();
            }

            class 1SlowCloseSocketTcpTransportFactory.SlowCloseSocketFactory
            extends SocketFactory {
                1SlowCloseSocketTcpTransportFactory.SlowCloseSocketFactory() {
                }

                @Override
                public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
                    return new SlowCloseSocket(host, port);
                }

                @Override
                public Socket createSocket(InetAddress host, int port) throws IOException {
                    return new SlowCloseSocket(host, port);
                }

                @Override
                public Socket createSocket(String host, int port, InetAddress localHost, int localPort) throws IOException, UnknownHostException {
                    return new SlowCloseSocket(host, port, localHost, localPort);
                }

                @Override
                public Socket createSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
                    return new SlowCloseSocket(address, port, localAddress, localPort);
                }

                class SlowCloseSocket
                extends Socket {
                    public SlowCloseSocket(String host, int port) throws IOException {
                        super(host, port);
                    }

                    public SlowCloseSocket(InetAddress host, int port) throws IOException {
                        super(host, port);
                    }

                    public SlowCloseSocket(String host, int port, InetAddress localHost, int localPort) throws IOException {
                        super(host, port, localHost, localPort);
                    }

                    public SlowCloseSocket(InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException {
                        super(address, port, localAddress, localPort);
                    }

                    @Override
                    public synchronized void close() throws IOException {
                        LOG.info("delaying close");
                        try {
                            TimeUnit.MILLISECONDS.sleep(500L);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        super.close();
                    }
                }
            }
        }
        TransportFactory.registerTransportFactory((String)"tcp", (TransportFactory)new SlowCloseSocketTcpTransportFactory());
    }

    @Override
    public void tearDown() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(URISupport.removeQuery((URI)this.connector.getConnectUri()) + "?useKeepAlive=false&wireFormat.maxInactivityDuration=2000");
    }

    @Override
    public void testCreateDurableConsumerCloseThenReconnect() throws Exception {
        this.reconnectInExceptionListener = true;
        this.makeConsumer();
        this.connection.setExceptionListener((ExceptionListener)this);
        ((ActiveMQConnection)this.connection).addTransportListener((TransportListener)this);
        DurableConsumerCloseAndReconnectTcpTest.assertTrue((String)"inactive connection timedout", (boolean)this.gotException.await(30L, TimeUnit.SECONDS));
        DurableConsumerCloseAndReconnectTcpTest.assertNotNull((String)("Got expected exception on close reconnect overlap: " + this.reconnectException), (Object)this.reconnectException);
    }

    public void testCreateDurableConsumerSlowCloseThenReconnectTransportListener() throws Exception {
        this.reconnectInTransportListener = true;
        this.makeConsumer();
        this.connection.setExceptionListener((ExceptionListener)this);
        ((ActiveMQConnection)this.connection).addTransportListener((TransportListener)this);
        DurableConsumerCloseAndReconnectTcpTest.assertTrue((String)"inactive connection timedout", (boolean)this.gotException.await(30L, TimeUnit.SECONDS));
        DurableConsumerCloseAndReconnectTcpTest.assertNull((String)("No exception: " + this.reconnectException), (Object)this.reconnectException);
    }

    public void onException(JMSException exception) {
        LOG.info("Exception listener exception:" + (Object)((Object)exception));
        if (this.reconnectInExceptionListener) {
            try {
                this.makeConsumer();
            }
            catch (Exception e) {
                this.reconnectException = e;
            }
            this.gotException.countDown();
        }
    }

    public void onCommand(Object command) {
    }

    public void onException(IOException error) {
        LOG.info("Transport listener exception:" + error);
        if (this.reconnectInTransportListener) {
            try {
                TimeUnit.MILLISECONDS.sleep(500L);
                this.makeConsumer();
            }
            catch (Exception e) {
                this.reconnectException = e;
            }
            this.gotException.countDown();
        }
    }

    public void transportInterupted() {
    }

    public void transportResumed() {
    }
}

