/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkBridgeFilterFactory;

/**
 * Two-broker network test for a virtual-topic consumer queue whose consumer
 * moves from one broker to the other.
 *
 * <p>{@link #setUp()} starts {@code brokerA} and {@code brokerB}, bridges them
 * in both directions, and installs a destination policy on
 * {@code Consumer.*.VirtualTopic.>} queues whose conditional network bridge
 * filter has {@code replayWhenNoConsumers} enabled.
 *
 * <p>NOTE(review): the intent appears to be verifying that messages forwarded
 * to the broker holding the first consumer are replayed back once that
 * consumer goes away -- confirm against the network-of-brokers documentation.
 */
public class VirtualTopicNetworkClusterReactivationTest extends JmsMultipleBrokersTestSupport {
    private static final String BROKER_A = "brokerA";
    private static final String BROKER_B = "brokerB";
    private static final String BROKER_A_TRANSPORT_URL = "tcp://localhost:61616";
    private static final String BROKER_B_TRANSPORT_URL = "tcp://localhost:61617";
    /** Query string appended to both embedded broker URIs. */
    private static final String BROKER_OPTIONS = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true";
    /** Pause used between test phases. */
    private static final long DEFAULT_SLEEP_MS = 1000L;
    /** Maximum time to wait for each expected message. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;
    /** Number of messages published and then expected by the test. */
    private static final int MESSAGE_COUNT = 5;

    private final ActiveMQTopic topic = new ActiveMQTopic("VirtualTopic.FOO.TEST");
    private final ActiveMQQueue queue = new ActiveMQQueue("Consumer.FOO.VirtualTopic.FOO.TEST");

    /**
     * Attaches a zero-prefetch consumer to the queue on broker B, publishes
     * messages to the virtual topic on broker A, closes the broker B consumer,
     * and then asserts that a new consumer on broker A receives every message
     * in publication order.
     *
     * @throws JMSException if any JMS operation fails
     */
    public void testDurableSubReconnectFromAtoB() throws JMSException {
        Connection bConn = null;
        Connection aProducerConn = null;
        Connection aConsumerConn = null;
        try {
            // Prefetch 0 so the broker B consumer never buffers messages client-side.
            ActiveMQConnectionFactory bConnFactory =
                    new ActiveMQConnectionFactory(BROKER_B_TRANSPORT_URL + "?jms.prefetchPolicy.queuePrefetch=0");
            bConn = bConnFactory.createConnection();
            bConn.start();
            Session bSession = bConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer bSessionConsumer = bSession.createConsumer(this.queue);

            ActiveMQConnectionFactory aConnFactory = new ActiveMQConnectionFactory(BROKER_A_TRANSPORT_URL);
            aProducerConn = aConnFactory.createConnection();
            aProducerConn.start();
            Session aProducerSession = aProducerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = aProducerSession.createProducer(this.topic);
            this.produce(producer, aProducerSession, MESSAGE_COUNT);
            this.sleep();

            // Drop the only consumer on broker B without having received anything.
            bSessionConsumer.close();
            bSession.close();
            bConn.close();
            bConn = null; // already closed; nothing left for the finally block to do
            this.sleep();

            aConsumerConn = aConnFactory.createConnection();
            aConsumerConn.start();
            Session aConsumerSession = aConsumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer aSessionConsumer = aConsumerSession.createConsumer(this.queue);
            this.sleep();
            this.consume(aSessionConsumer, MESSAGE_COUNT);
        } finally {
            closeQuietly(aConsumerConn);
            closeQuietly(aProducerConn);
            closeQuietly(bConn);
        }
    }

    /**
     * Receives {@code numMessagesExpected} messages and asserts each one is a
     * text message whose body is {@code "message: " + index}.
     *
     * @param durable consumer to receive from
     * @param numMessagesExpected number of messages that must arrive
     * @throws JMSException if receiving or reading a message fails
     */
    private void consume(MessageConsumer durable, int numMessagesExpected) throws JMSException {
        for (int i = 0; i < numMessagesExpected; ++i) {
            Message message = durable.receive(RECEIVE_TIMEOUT_MS);
            assertNotNull(message);
            TextMessage textMessage = (TextMessage) message;
            System.out.println("received: " + textMessage.getText());
            assertEquals("message: " + i, textMessage.getText());
        }
    }

    /**
     * Sends {@code numMessages} text messages with bodies
     * {@code "message: 0"}, {@code "message: 1"}, and so on.
     *
     * @param producer producer used to send
     * @param sess session used to create the messages
     * @param numMessages number of messages to send
     * @throws JMSException if creating or sending a message fails
     */
    private void produce(MessageProducer producer, Session sess, int numMessages) throws JMSException {
        for (int i = 0; i < numMessages; ++i) {
            producer.send(sess.createTextMessage("message: " + i));
        }
    }

    /**
     * Creates both brokers with the shared destination policy and the consumer
     * queue pre-declared, bridges them in both directions, and starts them.
     *
     * @throws Exception if broker creation, bridging, or startup fails
     */
    @Override
    protected void setUp() throws Exception {
        maxSetupTime = 1000;
        super.setAutoFail(true);
        super.setUp();
        BrokerService brokerServiceA = this.createBroker(brokerUri(BROKER_A_TRANSPORT_URL, BROKER_A));
        brokerServiceA.setDestinationPolicy(this.buildPolicyMap());
        brokerServiceA.setDestinations(new ActiveMQDestination[]{this.queue});
        BrokerService brokerServiceB = this.createBroker(brokerUri(BROKER_B_TRANSPORT_URL, BROKER_B));
        brokerServiceB.setDestinationPolicy(this.buildPolicyMap());
        brokerServiceB.setDestinations(new ActiveMQDestination[]{this.queue});
        this.bridgeBrokers(BROKER_A, BROKER_B);
        this.bridgeBrokers(BROKER_B, BROKER_A);
        this.startAllBrokers();
    }

    /**
     * Builds the {@code broker:(transport)/name?options} URI for one embedded broker.
     *
     * @param transportUrl transport connector URL of the broker
     * @param brokerName name of the broker
     * @return the broker URI including {@link #BROKER_OPTIONS}
     */
    private static URI brokerUri(String transportUrl, String brokerName) {
        return URI.create(String.format("broker:(%s)/%s%s", transportUrl, brokerName, BROKER_OPTIONS));
    }

    /**
     * Builds a policy map with a single entry for {@code Consumer.*.VirtualTopic.>}
     * queues: optimized dispatch on, audit off, and a conditional network
     * bridge filter factory with {@code replayWhenNoConsumers} enabled.
     *
     * @return a new policy map; each broker gets its own instance
     */
    private PolicyMap buildPolicyMap() {
        ConditionalNetworkBridgeFilterFactory networkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
        networkBridgeFilterFactory.setReplayWhenNoConsumers(true);

        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setOptimizedDispatch(true);
        policyEntry.setNetworkBridgeFilterFactory(networkBridgeFilterFactory);
        policyEntry.setEnableAudit(false);

        PolicyMap policyMap = new PolicyMap();
        policyMap.put(new ActiveMQQueue("Consumer.*.VirtualTopic.>"), policyEntry);
        return policyMap;
    }

    /**
     * Closes a connection during cleanup, ignoring failures so that they
     * cannot mask an assertion failure from the test body.
     *
     * @param connection connection to close; may be {@code null}
     */
    private static void closeQuietly(Connection connection) {
        if (connection == null) {
            return;
        }
        try {
            connection.close();
        } catch (JMSException ignored) {
            // Best-effort cleanup: the test outcome has already been decided.
        }
    }

    /** Pauses for {@link #DEFAULT_SLEEP_MS} milliseconds. */
    private void sleep() {
        this.sleep(DEFAULT_SLEEP_MS);
    }

    /**
     * Pauses the current thread. If interrupted, returns early with the
     * thread's interrupt flag restored.
     *
     * @param milliSecondTime time to sleep in milliseconds
     */
    private void sleep(long milliSecondTime) {
        try {
            Thread.sleep(milliSecondTime);
        } catch (InterruptedException interruptedException) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }
}

