/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.PriorityDispatchPolicy;
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.network.DemandForwardingBridgeSupport;
import org.apache.activemq.network.NetworkConnector;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.filter.AbstractFilter;
import org.apache.logging.log4j.core.layout.MessageLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestReplyTempDestRemovalAdvisoryRaceTest
extends JmsMultipleBrokersTestSupport {
    private static final Logger LOG = LoggerFactory.getLogger(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
    private static final String BROKER_A = "BrokerA";
    private static final String BROKER_B = "BrokerB";
    private static final String BROKER_C = "BrokerC";
    private static final int NUM_RESPONDENTS = 1;
    private static final int NUM_SENDS = 1;
    private static final int RANDOM_SLEEP_FOR_RESPONDENT_MS = 0;
    private static final int RANDOM_SLEEP_FOR_SENDER_MS = 1;
    private static final String QUEUE_NAME = "foo.queue";
    private static String[] TEST_ITERATIONS = new String[]{"foo.queue0", "foo.queue1", "foo.queue2", "foo.queue3"};
    final AtomicLong messageCount = new AtomicLong(0L);
    final AtomicLong respondentSendError = new AtomicLong(0L);
    final AtomicLong responseReceived = new AtomicLong(0L);
    final AtomicLong sendsWithNoConsumers = new AtomicLong(0L);
    final AtomicLong forwardFailures = new AtomicLong(0L);
    protected final AtomicBoolean shutdown = new AtomicBoolean(false);
    HashSet<NetworkConnector> networkConnectors = new HashSet();
    HashSet<Connection> advisoryConsumerConnections = new HashSet();
    AbstractAppender slowDownAppender;
    CountDownLatch consumerDemandExists;
    protected boolean useDuplex = false;

    public static Test suite() {
        return RequestReplyTempDestRemovalAdvisoryRaceTest.suite(RequestReplyTempDestRemovalAdvisoryRaceTest.class);
    }

    public void initCombos() {
        this.addCombinationValues("QUEUE_NAME", TEST_ITERATIONS);
    }

    public void testTempDestRaceDuplex() throws Exception {
        this.useDuplex = true;
        this.bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        this.bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        this.startAllBrokers();
        this.waitForBridgeFormation(1);
        HashSet bridgesStart = new HashSet();
        for (NetworkConnector networkConnector : this.networkConnectors) {
            bridgesStart.addAll(networkConnector.activeBridges());
        }
        LOG.info("Bridges start:" + String.valueOf(bridgesStart));
        this.slowDownAdvisoryDispatch();
        this.noConsumerAdvisory();
        this.forwardFailureAdvisory();
        ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
        JmsMultipleBrokersTestSupport.BrokerItem brokerA = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_A);
        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
        brokerAFactory.setAlwaysSyncSend(true);
        for (int i = 0; i < 1; ++i) {
            respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
        }
        ExecutorService senderThreadPool = Executors.newCachedThreadPool();
        JmsMultipleBrokersTestSupport.BrokerItem brokerC = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_C);
        ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
        for (int i = 0; i < 1; ++i) {
            senderThreadPool.execute(new MessageSender(brokerCFactory));
        }
        senderThreadPool.shutdown();
        senderThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(15L);
        LOG.info("shutting down");
        this.shutdown.compareAndSet(false, true);
        HashSet bridgesEnd = new HashSet();
        for (NetworkConnector networkConnector : this.networkConnectors) {
            bridgesEnd.addAll(networkConnector.activeBridges());
        }
        LOG.info("Bridges end:" + String.valueOf(bridgesEnd));
        RequestReplyTempDestRemovalAdvisoryRaceTest.assertEquals((String)"no new bridges created", bridgesStart, bridgesEnd);
        LOG.info("received: " + this.responseReceived.get() + ", respondent error: " + this.respondentSendError.get() + ", noConsumerCount: " + this.sendsWithNoConsumers.get() + ", forwardFailures: " + this.forwardFailures.get());
        RequestReplyTempDestRemovalAdvisoryRaceTest.assertEquals((String)"success or error", (long)1L, (long)(this.respondentSendError.get() + this.forwardFailures.get() + this.responseReceived.get() + this.sendsWithNoConsumers.get()));
    }

    private void forwardFailureAdvisory() throws JMSException {
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            Connection connection = brokerAFactory.createConnection();
            connection.start();
            connection.createSession(false, 1).createConsumer((Destination)AdvisorySupport.getNetworkBridgeForwardFailureAdvisoryTopic()).setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.this.forwardFailures.incrementAndGet();
                }
            });
        }
    }

    private void noConsumerAdvisory() throws JMSException {
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(item.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
            Connection connection = brokerAFactory.createConnection();
            connection.start();
            connection.createSession(false, 1).createConsumer((Destination)AdvisorySupport.getNoTopicConsumersAdvisoryTopic((ActiveMQDestination)new ActiveMQTempTopic(">"))).setMessageListener(new MessageListener(){

                public void onMessage(Message message) {
                    RequestReplyTempDestRemovalAdvisoryRaceTest.this.sendsWithNoConsumers.incrementAndGet();
                }
            });
        }
    }

    public void testTempDestRace() throws Exception {
        this.bridgeBrokers(BROKER_A, BROKER_B, false, 3);
        this.bridgeBrokers(BROKER_B, BROKER_A, false, 3);
        this.bridgeBrokers(BROKER_B, BROKER_C, false, 3);
        this.bridgeBrokers(BROKER_C, BROKER_B, false, 3);
        this.startAllBrokers();
        this.waitForBridgeFormation(1);
        HashSet bridgesStart = new HashSet();
        for (NetworkConnector networkConnector : this.networkConnectors) {
            bridgesStart.addAll(networkConnector.activeBridges());
        }
        this.slowDownAdvisoryDispatch();
        this.noConsumerAdvisory();
        this.forwardFailureAdvisory();
        ExecutorService respondentThreadPool = Executors.newFixedThreadPool(50);
        JmsMultipleBrokersTestSupport.BrokerItem brokerA = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_A);
        ActiveMQConnectionFactory brokerAFactory = new ActiveMQConnectionFactory(brokerA.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
        brokerAFactory.setAlwaysSyncSend(true);
        for (int i = 0; i < 1; ++i) {
            respondentThreadPool.execute(new EchoRespondent(brokerAFactory));
        }
        ExecutorService senderThreadPool = Executors.newCachedThreadPool();
        JmsMultipleBrokersTestSupport.BrokerItem brokerC = (JmsMultipleBrokersTestSupport.BrokerItem)this.brokers.get(BROKER_C);
        ActiveMQConnectionFactory brokerCFactory = new ActiveMQConnectionFactory(brokerC.broker.getTransportConnectorByScheme("tcp").getName() + "?jms.watchTopicAdvisories=false");
        for (int i = 0; i < 1; ++i) {
            senderThreadPool.execute(new MessageSender(brokerCFactory));
        }
        senderThreadPool.shutdown();
        senderThreadPool.awaitTermination(30L, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(10L);
        LOG.info("shutting down");
        this.shutdown.compareAndSet(false, true);
        HashSet bridgesEnd = new HashSet();
        for (NetworkConnector networkConnector : this.networkConnectors) {
            bridgesEnd.addAll(networkConnector.activeBridges());
        }
        RequestReplyTempDestRemovalAdvisoryRaceTest.assertEquals((String)"no new bridges created", bridgesStart, bridgesEnd);
        LOG.info("received: " + this.responseReceived.get() + ", respondent error: " + this.respondentSendError.get() + ", noConsumerCount: " + this.sendsWithNoConsumers.get() + ", forwardFailures: " + this.forwardFailures.get());
        RequestReplyTempDestRemovalAdvisoryRaceTest.assertEquals((String)"success or error", (long)1L, (long)(this.respondentSendError.get() + this.forwardFailures.get() + this.responseReceived.get() + this.sendsWithNoConsumers.get()));
    }

    private void slowDownAdvisoryDispatch() throws Exception {
        ((org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getLogger(DemandForwardingBridgeSupport.class))).setLevel(Level.DEBUG);
        org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger());
        this.slowDownAppender = new AbstractAppender("testAppender", (Filter)new AbstractFilter(){}, (Layout)new MessageLayout(), false, new Property[0]){

            public void append(LogEvent event) {
                String message;
                if (Level.DEBUG.equals((Object)event.getLevel()) && (message = event.getMessage().getFormattedMessage()).startsWith(RequestReplyTempDestRemovalAdvisoryRaceTest.BROKER_B) && message.contains("remove local subscription")) {
                    try {
                        RequestReplyTempDestRemovalAdvisoryRaceTest.this.consumerDemandExists.countDown();
                        System.err.println("Sleeping on receipt of remove info debug message: " + message);
                        TimeUnit.SECONDS.sleep(2L);
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                }
            }
        };
        this.slowDownAppender.start();
        logger.get().addAppender((Appender)this.slowDownAppender, Level.DEBUG, (Filter)new AbstractFilter(){});
        logger.addAppender((Appender)this.slowDownAppender);
    }

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        this.responseReceived.set(0L);
        this.respondentSendError.set(0L);
        this.forwardFailures.set(0L);
        this.sendsWithNoConsumers.set(0L);
        this.networkConnectors.clear();
        this.advisoryConsumerConnections.clear();
        this.consumerDemandExists = new CountDownLatch(1);
        this.createBroker(new URI("broker:(tcp://localhost:0)/BrokerA?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
        this.createBroker(new URI("broker:(tcp://localhost:0)/BrokerB?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
        this.createBroker(new URI("broker:(tcp://localhost:0)/BrokerC?persistent=false&useJmx=false")).setDedicatedTaskRunner(false);
        PolicyMap map = new PolicyMap();
        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setSendAdvisoryIfNoConsumers(true);
        SharedDeadLetterStrategy deadletterStrategy = new SharedDeadLetterStrategy();
        deadletterStrategy.setProcessNonPersistent(true);
        defaultEntry.setDeadLetterStrategy((DeadLetterStrategy)deadletterStrategy);
        defaultEntry.setDispatchPolicy((DispatchPolicy)new PriorityDispatchPolicy());
        map.put((ActiveMQDestination)new ActiveMQTempTopic(">"), (Object)defaultEntry);
        for (JmsMultipleBrokersTestSupport.BrokerItem item : this.brokers.values()) {
            item.broker.setDestinationPolicy(map);
        }
    }

    @Override
    protected void tearDown() throws Exception {
        if (this.slowDownAppender != null) {
            ((org.apache.logging.log4j.core.Logger)org.apache.logging.log4j.core.Logger.class.cast(LogManager.getRootLogger())).removeAppender((Appender)this.slowDownAppender);
        }
        for (Connection connection : this.advisoryConsumerConnections) {
            connection.close();
        }
        super.tearDown();
    }

    protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName, boolean dynamicOnly, int networkTTL) throws Exception {
        NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName, dynamicOnly, networkTTL, true);
        connector.setBridgeTempDestinations(true);
        connector.setAdvisoryForFailedForward(true);
        connector.setDuplex(this.useDuplex);
        connector.setAlwaysSyncSend(true);
        this.networkConnectors.add(connector);
        return connector;
    }

    class EchoRespondent
    extends MessageClient
    implements Runnable {
        public EchoRespondent(ActiveMQConnectionFactory factory) throws Exception {
            super(factory, 0);
        }

        @Override
        public void run() {
            try {
                LOG.info("RESPONDENT LISTENING");
                while (!RequestReplyTempDestRemovalAdvisoryRaceTest.this.shutdown.get()) {
                    Message incomingMessage = this.consumer.receive(1000L);
                    if (!(incomingMessage instanceof TextMessage)) continue;
                    ActiveMQTextMessage textMessage = (ActiveMQTextMessage)incomingMessage;
                    try {
                        LOG.info("RESPONDENT: Received a message: [" + textMessage.getText() + "]" + String.valueOf(Arrays.asList(textMessage.getBrokerPath())));
                        TextMessage message = this.session.createTextMessage("reply: " + textMessage.getText());
                        Destination replyTo = incomingMessage.getJMSReplyTo();
                        TimeUnit.MILLISECONDS.sleep(this.timeToSleep);
                        RequestReplyTempDestRemovalAdvisoryRaceTest.this.consumerDemandExists.await(5L, TimeUnit.SECONDS);
                        try {
                            this.producer.send(replyTo, (Message)message);
                            LOG.info("RESPONDENT: sent reply:" + message.getJMSMessageID() + " back to: " + String.valueOf(replyTo));
                        }
                        catch (JMSException e) {
                            LOG.error("RESPONDENT: could not send reply message: " + e.getLocalizedMessage(), (Throwable)e);
                            RequestReplyTempDestRemovalAdvisoryRaceTest.this.respondentSendError.incrementAndGet();
                        }
                    }
                    catch (JMSException e) {
                        LOG.error("RESPONDENT: could not create the reply message: " + e.getLocalizedMessage(), (Throwable)e);
                    }
                    catch (InterruptedException e) {
                        LOG.info("RESPONDENT could not generate a random number");
                    }
                }
            }
            catch (JMSException e) {
                LOG.info("RESPONDENT: Could not set the message listener on the respondent");
            }
        }

        @Override
        protected void initProducer() throws JMSException {
            this.producer = this.session.createProducer(null);
            this.producer.setDeliveryMode(1);
        }

        @Override
        protected void initConsumer() throws JMSException {
            this.consumer = this.session.createConsumer((Destination)new ActiveMQQueue(RequestReplyTempDestRemovalAdvisoryRaceTest.QUEUE_NAME));
        }
    }

    class MessageSender
    extends MessageClient
    implements Runnable {
        protected Destination tempDest;

        public MessageSender(ActiveMQConnectionFactory factory) throws Exception {
            super(factory, 1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                TextMessage message = this.session.createTextMessage("request: message #" + RequestReplyTempDestRemovalAdvisoryRaceTest.this.messageCount.getAndIncrement());
                message.setJMSReplyTo(this.tempDest);
                this.producer.send((Message)message);
                LOG.info("SENDER: Message [" + message.getText() + "] has been sent.");
                Message incomingMessage = this.consumer.receive((long)this.timeToSleep);
                if (incomingMessage instanceof TextMessage) {
                    try {
                        LOG.info("SENDER: Got a response from echo service!" + ((TextMessage)incomingMessage).getText());
                        RequestReplyTempDestRemovalAdvisoryRaceTest.this.responseReceived.incrementAndGet();
                    }
                    catch (JMSException e) {
                        LOG.error("SENDER: might want to see why i'm getting non-text messages..." + String.valueOf(incomingMessage), (Throwable)e);
                    }
                } else {
                    LOG.info("SENDER: Did not get a response this time");
                }
            }
            catch (JMSException e) {
                LOG.error("SENDER: Could not complete message sending properly: " + e.getMessage());
            }
            finally {
                try {
                    this.producer.close();
                    this.consumer.close();
                    this.session.close();
                    this.connection.close();
                }
                catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        @Override
        protected void preInit() throws JMSException {
            this.tempDest = this.session.createTemporaryTopic();
        }

        @Override
        protected void initProducer() throws JMSException {
            this.producer = this.session.createProducer((Destination)new ActiveMQQueue(RequestReplyTempDestRemovalAdvisoryRaceTest.QUEUE_NAME));
        }

        @Override
        protected void initConsumer() throws JMSException {
            this.consumer = this.session.createConsumer(this.tempDest);
            LOG.info("consumer for: " + String.valueOf(this.tempDest) + ", " + String.valueOf(this.consumer));
        }
    }

    abstract class MessageClient {
        protected Connection connection;
        protected Session session;
        protected MessageConsumer consumer;
        protected MessageProducer producer;
        protected Random random;
        protected int timeToSleep;

        public MessageClient(ActiveMQConnectionFactory factory, int timeToSleep) throws Exception {
            this.connection = factory.createConnection();
            this.session = this.connection.createSession(false, 1);
            this.timeToSleep = timeToSleep;
            this.random = new Random(System.currentTimeMillis());
            this.preInit();
            this.initProducer();
            this.initConsumer();
            this.connection.start();
        }

        protected void preInit() throws JMSException {
        }

        protected abstract void initProducer() throws JMSException;

        protected abstract void initConsumer() throws JMSException;
    }
}

