/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import java.net.URI;
import java.util.List;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.SocketProxy;
import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class DurableSubscriberWithNetworkDisconnectTest
extends JmsMultipleBrokersTestSupport {
    private static final Log LOG = LogFactory.getLog(DurableSubscriberWithNetworkDisconnectTest.class);
    private static final int NETWORK_DOWN_TIME = 10000;
    private static final String HUB = "HubBroker";
    private static final String SPOKE = "SpokeBroker";
    private SocketProxy socketProxy;
    private long networkDownTimeStart;
    private long inactiveDuration = 1000L;
    private long receivedMsgs = 0L;
    private boolean useSocketProxy = true;
    protected static final int MESSAGE_COUNT = 200;
    public boolean useDuplexNetworkBridge = true;
    public boolean simulateStalledNetwork;
    public boolean dynamicOnly = true;
    public long networkTTL = 3L;
    public boolean exponentialBackOff;
    public boolean failover = false;
    public boolean inactivity = true;

    public void initCombosForTestSendOnAReceiveOnBWithTransportDisconnect() {
        this.addCombinationValues("failover", new Object[]{Boolean.FALSE, Boolean.TRUE});
    }

    public void testSendOnAReceiveOnBWithTransportDisconnect() throws Exception {
        this.bridgeBrokers(SPOKE, HUB);
        this.startAllBrokers();
        URI hubURI = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)HUB)).broker.getVmConnectorURI();
        URI spokeURI = ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)SPOKE)).broker.getVmConnectorURI();
        ActiveMQConnectionFactory facHub = new ActiveMQConnectionFactory(hubURI);
        ActiveMQConnectionFactory facSpoke = new ActiveMQConnectionFactory(spokeURI);
        Connection conHub = facHub.createConnection();
        Connection conSpoke = facSpoke.createConnection();
        conHub.setClientID("clientHUB");
        conSpoke.setClientID("clientSPOKE");
        conHub.start();
        conSpoke.start();
        Session sesHub = conHub.createSession(false, 1);
        Session sesSpoke = conSpoke.createSession(false, 1);
        ActiveMQTopic topic = new ActiveMQTopic("TEST.FOO");
        String consumerName = "consumerName";
        TopicSubscriber remoteConsumer = sesSpoke.createDurableSubscriber((Topic)topic, consumerName);
        remoteConsumer.setMessageListener(new MessageListener(){

            public void onMessage(Message msg) {
                try {
                    TextMessage textMsg = (TextMessage)msg;
                    ++DurableSubscriberWithNetworkDisconnectTest.this.receivedMsgs;
                    LOG.info((Object)("Received messages (" + DurableSubscriberWithNetworkDisconnectTest.this.receivedMsgs + "): " + textMsg.getText()));
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        this.sleep(1000);
        MessageProducer localProducer = sesHub.createProducer((Destination)topic);
        localProducer.setDeliveryMode(2);
        for (int i = 0; i < 200; ++i) {
            this.sleep(50);
            if (i == 50 || i == 150) {
                if (this.simulateStalledNetwork) {
                    this.socketProxy.pause();
                } else {
                    this.socketProxy.close();
                }
                this.networkDownTimeStart = System.currentTimeMillis();
            } else if (this.networkDownTimeStart > 0L) {
                this.sleep(10000);
                this.networkDownTimeStart = 0L;
                if (this.simulateStalledNetwork) {
                    this.socketProxy.goOn();
                } else {
                    this.socketProxy.reopen();
                }
            } else {
                this.sleep(500);
            }
            TextMessage test = sesHub.createTextMessage("test-" + i);
            localProducer.send((Message)test);
        }
        LOG.info((Object)"waiting for messages to flow");
        Wait.waitFor((Wait.Condition)new Wait.Condition(){

            public boolean isSatisified() throws Exception {
                return DurableSubscriberWithNetworkDisconnectTest.this.receivedMsgs >= 200L;
            }
        });
        DurableSubscriberWithNetworkDisconnectTest.assertTrue((String)("At least message 200 must be received, count=" + this.receivedMsgs), (200L <= this.receivedMsgs ? 1 : 0) != 0);
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)HUB)).broker.deleteAllMessages();
        ((JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get((Object)SPOKE)).broker.deleteAllMessages();
        conHub.close();
        conSpoke.close();
    }

    @Override
    protected void startAllBrokers() throws Exception {
        JmsMultipleBrokersTestSupport.BrokerItem brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(HUB);
        brokerItem.broker.start();
        brokerItem = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(SPOKE);
        brokerItem.broker.start();
        this.sleep(600);
    }

    @Override
    public void setUp() throws Exception {
        this.networkDownTimeStart = 0L;
        this.inactiveDuration = 1000L;
        this.useSocketProxy = true;
        this.receivedMsgs = 0L;
        super.setAutoFail(true);
        super.setUp();
        String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
        this.createBroker(new URI("broker:(tcp://localhost:61617)/HubBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
        this.createBroker(new URI("broker:(tcp://localhost:61616)/SpokeBroker?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"));
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        if (this.socketProxy != null) {
            this.socketProxy.close();
        }
    }

    public static Test suite() {
        return DurableSubscriberWithNetworkDisconnectTest.suite(DurableSubscriberWithNetworkDisconnectTest.class);
    }

    private void sleep(int milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    @Override
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean l_dynamicOnly, int networkTTL, boolean l_conduit, boolean l_failover) throws Exception {
        List transportConnectors = remoteBroker.getTransportConnectors();
        if (!transportConnectors.isEmpty()) {
            URI remoteURI = ((TransportConnector)transportConnectors.get(0)).getConnectUri();
            if (this.useSocketProxy) {
                this.socketProxy = new SocketProxy(remoteURI);
                remoteURI = this.socketProxy.getUrl();
            }
            Object options = "";
            options = this.failover ? "static:(failover:(" + remoteURI : "static:(" + remoteURI;
            options = this.inactivity ? (String)options + "?wireFormat.maxInactivityDuration=" + this.inactiveDuration + "&wireFormat.maxInactivityDurationInitalDelay=" + this.inactiveDuration + ")" : (String)options + ")";
            if (this.failover) {
                options = (String)options + "?maxReconnectAttempts=0)";
            }
            options = (String)options + "?useExponentialBackOff=" + this.exponentialBackOff;
            DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI((String)options));
            connector.setDynamicOnly(this.dynamicOnly);
            connector.setNetworkTTL(networkTTL);
            localBroker.addNetworkConnector((NetworkConnector)connector);
            maxSetupTime = 2000;
            if (this.useDuplexNetworkBridge) {
                connector.setDuplex(true);
            }
            return connector;
        }
        throw new Exception("Remote broker has no registered connectors.");
    }
}

