/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SoWriteTimeoutClientTest
extends JmsTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);

    @Override
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
        adapter.setConcurrentStoreAndDispatchQueues(false);
        broker.setPersistenceAdapter((PersistenceAdapter)adapter);
        broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
        return broker;
    }

    public void testSendWithClientWriteTimeout() throws Exception {
        final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
        this.messageTextPrefix = this.initMessagePrefix(81920);
        URI tcpBrokerUri = URISupport.removeQuery((URI)((TransportConnector)this.broker.getTransportConnectors().get(0)).getConnectUri());
        LOG.info("consuming using uri: " + String.valueOf(tcpBrokerUri));
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
        Connection c = factory.createConnection();
        c.start();
        Session session = c.createSession(false, 1);
        MessageConsumer consumer = session.createConsumer((Destination)dest);
        SocketProxy proxy = new SocketProxy();
        proxy.setTarget(tcpBrokerUri);
        proxy.open();
        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + String.valueOf(proxy.getUrl()) + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
        final Connection pc = pFactory.createConnection();
        pc.start();
        proxy.pause();
        int messageCount = 20;
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    SoWriteTimeoutClientTest.this.sendMessages(pc, (Destination)dest, 20);
                }
                catch (Exception ignored) {
                    ignored.printStackTrace();
                }
            }
        });
        TimeUnit.SECONDS.sleep(8L);
        proxy.goOn();
        for (int i = 0; i < 20; ++i) {
            SoWriteTimeoutClientTest.assertNotNull((String)("Got message " + i + " after reconnect"), (Object)consumer.receive(5000L));
        }
        SoWriteTimeoutClientTest.assertTrue((String)"no pending messages when done", (boolean)Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                LOG.info("current total message count: " + SoWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount());
                return SoWriteTimeoutClientTest.this.broker.getAdminView().getTotalMessageCount() == 0L;
            }
        }));
    }

    private String initMessagePrefix(int i) {
        byte[] content = new byte[i];
        return new String(content);
    }

    public static Test suite() {
        return SoWriteTimeoutClientTest.suite(SoWriteTimeoutClientTest.class);
    }
}

