/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.ExceptionListener;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.Socket;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailoverTimeoutTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverTimeoutTest.class);
    private static final String QUEUE_NAME = "test.failovertimeout";
    BrokerService bs;
    URI tcpUri;

    @Before
    public void setUp() throws Exception {
        this.bs = new BrokerService();
        this.bs.setUseJmx(false);
        this.bs.addConnector(this.getTransportUri());
        this.bs.start();
        this.tcpUri = ((TransportConnector)this.bs.getTransportConnectors().get(0)).getConnectUri();
    }

    @After
    public void tearDown() throws Exception {
        if (this.bs != null) {
            this.bs.stop();
        }
    }

    protected String getTransportUri() {
        return "tcp://localhost:0";
    }

    @Test
    public void testTimoutDoesNotFailConnectionAttempts() throws Exception {
        this.bs.stop();
        long timeout = 1000L;
        long startTime = System.currentTimeMillis();
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false&maxReconnectAttempts=5&initialReconnectDelay=1000");
        Connection connection = cf.createConnection();
        try {
            connection.start();
            Assert.fail((String)"Should have failed to connect");
        }
        catch (JMSException ex) {
            LOG.info("Caught exception on call to start: {}", (Object)ex.getMessage());
        }
        long endTime = System.currentTimeMillis();
        long duration = endTime - startTime;
        LOG.info("Time spent waiting to connect: {} ms", (Object)duration);
        Assert.assertTrue((duration > 3000L ? 1 : 0) != 0);
        this.safeClose(connection);
    }

    private void safeClose(Connection connection) {
        try {
            connection.close();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testTimeout() throws Exception {
        long timeout = 1000L;
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false");
        Connection connection = cf.createConnection();
        Session session = connection.createSession(false, 1);
        MessageProducer producer = session.createProducer((Destination)session.createQueue(QUEUE_NAME));
        TextMessage message = session.createTextMessage("Test message");
        producer.send((Message)message);
        this.bs.stop();
        try {
            producer.send((Message)message);
        }
        catch (JMSException jmse) {
            Assert.assertEquals((Object)("Failover timeout of " + timeout + " ms reached."), (Object)jmse.getMessage());
        }
        this.bs = new BrokerService();
        this.bs.setUseJmx(false);
        this.bs.addConnector(this.tcpUri);
        this.bs.start();
        this.bs.waitUntilStarted();
        producer.send((Message)message);
        this.bs.stop();
        connection.close();
    }

    @Test
    public void testInterleaveAckAndException() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?maxReconnectAttempts=0");
        ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
        this.doTestInterleaveAndException(connection, (Command)new MessageAck());
        this.safeClose((Connection)connection);
    }

    @Test
    public void testInterleaveTxAndException() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?maxReconnectAttempts=0");
        ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
        TransactionInfo tx = new TransactionInfo();
        tx.setConnectionId(connection.getConnectionInfo().getConnectionId());
        tx.setTransactionId((TransactionId)new LocalTransactionId(tx.getConnectionId(), 1L));
        this.doTestInterleaveAndException(connection, (Command)tx);
        this.safeClose((Connection)connection);
    }

    public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command command) throws Exception {
        connection.start();
        connection.setExceptionListener(new ExceptionListener(){

            public void onException(JMSException exception) {
                try {
                    LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
                    connection.asyncSendPacket(command);
                }
                catch (Exception exception2) {
                    // empty catch block
                }
            }
        });
        ExecutorService executorService = Executors.newCachedThreadPool();
        int NUM_TASKS = 200;
        final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(200);
        final AtomicLong sleepMillis = new AtomicLong(1000L);
        for (int i = 0; i < 200; ++i) {
            executorService.submit(new Runnable(){

                @Override
                public void run() {
                    try {
                        TimeUnit.MILLISECONDS.sleep(Math.max(0L, sleepMillis.addAndGet(-50L)));
                        connection.asyncSendPacket(command);
                    }
                    catch (Exception exception) {
                    }
                    finally {
                        enqueueOnExecutorDone.countDown();
                    }
                }
            });
        }
        while (enqueueOnExecutorDone.getCount() > 190L) {
            enqueueOnExecutorDone.await(20L, TimeUnit.MILLISECONDS);
        }
        Socket socket = (Socket)connection.getTransport().narrow(Socket.class);
        socket.close();
        executorService.shutdown();
        Assert.assertTrue((String)"all ops finish", (boolean)enqueueOnExecutorDone.await(15L, TimeUnit.SECONDS));
    }

    @Test
    public void testUpdateUris() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + this.tcpUri + ")?useExponentialBackOff=false");
        ActiveMQConnection connection = (ActiveMQConnection)cf.createConnection();
        connection.start();
        FailoverTransport failoverTransport = (FailoverTransport)connection.getTransport().narrow(FailoverTransport.class);
        URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + this.tcpUri.getPort()), new URI("tcp://unknownHost2:" + this.tcpUri.getPort()), new URI("tcp://localhost:2222")};
        failoverTransport.add(false, bunchOfUnknownAndOneKnown);
    }
}

