/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.net.URI;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Use-case test that bridges two in-memory brokers (A to B) and pushes messages
 * from producers connected to broker A to selector-based topic consumers
 * connected to broker B, once with conduit subscriptions enabled on the network
 * connector and once with them disabled.
 *
 * <p>The test does not assert on the outcome; it logs the topic statistics
 * (enqueues, forwards) and the number of messages received so the two modes can
 * be compared.
 */
public class TopicBridgeSelectorConduitOnOff {
    private static final Logger LOG = LoggerFactory.getLogger(TopicBridgeSelectorConduitOnOff.class);
    BrokerService brokerA;
    BrokerService brokerB;
    final int numProducers = 20;
    final int numConsumers = 20;
    final int numberOfMessagesToSendPerProducer = 5000;
    final ActiveMQTopic destination = new ActiveMQTopic("TOPIC");

    /**
     * Stops both brokers after each test.
     *
     * <p>Each broker is only stopped if it was actually created, and broker B is
     * stopped even when stopping broker A throws.
     *
     * @throws Exception if stopping a broker fails
     */
    @After
    public void stopBrokers() throws Exception {
        try {
            if (this.brokerA != null) {
                this.brokerA.stop();
            }
        } finally {
            if (this.brokerB != null) {
                this.brokerB.stop();
            }
        }
    }

    /** Runs the scenario with conduit subscriptions enabled on the bridge. */
    @Test
    public void testForwardsWithConduitSubsTrue() throws Exception {
        this.doTestWithConduit(true);
    }

    /** Runs the scenario with conduit subscriptions disabled on the bridge. */
    @Test
    public void testForwardsWithConduitSubsFalse() throws Exception {
        this.doTestWithConduit(false);
    }

    /**
     * Starts both brokers, bridges A to B, registers the consumers on B, waits
     * until broker A sees the expected number of consumers on the topic, runs
     * the producers against A and finally logs the resulting statistics.
     *
     * @param conduitPlease value passed to the network connector's
     *                      conduit-subscriptions setting
     * @throws Exception if broker setup fails or the thread is interrupted
     */
    private void doTestWithConduit(boolean conduitPlease) throws Exception {
        this.brokerA = this.newBroker("A");
        this.brokerB = this.newBroker("B");
        this.brokerB.start();
        NetworkConnector networkConnector = this.bridgeBrokers(this.brokerA, this.brokerB, conduitPlease);
        this.brokerA.start();
        // NOTE(review): this wait is unbounded; a bridge that never comes up hangs the test.
        while (networkConnector.activeBridges().size() == 0) {
            LOG.info("num bridges: {}", networkConnector.activeBridges());
            TimeUnit.SECONDS.sleep(1L);
        }

        CountDownLatch allReceived = new CountDownLatch(this.expectedDeliveries());
        ActiveMQConnectionFactory localConnectionFactoryForProducers = new ActiveMQConnectionFactory(
                this.brokerA.getTransportConnectorByScheme("tcp").getPublishableConnectString()
                        + "?jms.watchTopicAdvisories=false");
        ActiveMQConnectionFactory remoteConnectionFactoryForConsumers = new ActiveMQConnectionFactory(
                this.brokerB.getTransportConnectorByScheme("tcp").getPublishableConnectString()
                        + "?jms.watchTopicAdvisories=false");

        AtomicInteger receivedCount = new AtomicInteger(0);
        List<Connection> connections = Collections.synchronizedList(new LinkedList<>());
        CountDownLatch consumersRegistered = new CountDownLatch(this.numConsumers);
        ExecutorService consumersExecutor = Executors.newFixedThreadPool(this.numConsumers);
        try {
            for (int id = 0; id < this.numConsumers; ++id) {
                consumersExecutor.execute(this.consumerTask(id, remoteConnectionFactoryForConsumers, connections,
                        consumersRegistered, allReceived, receivedCount));
            }
            if (!consumersRegistered.await(20L, TimeUnit.SECONDS)) {
                LOG.warn("Timed out waiting for consumers to register, still missing: {}",
                        consumersRegistered.getCount());
            }

            Topic topic = (Topic) this.brokerA.getDestination(this.destination);
            int expectedBrokerConsumers = conduitPlease ? 1 : this.numConsumers;
            LOG.info("Num consumers: {}", topic.getConsumers().size());
            // NOTE(review): unbounded wait, same caveat as the bridge wait above.
            while (topic.getConsumers().size() != expectedBrokerConsumers) {
                LOG.info("Num consumers: {}", topic.getConsumers().size());
                TimeUnit.SECONDS.sleep(1L);
            }

            this.runProducers(localConnectionFactoryForProducers);

            long start = System.currentTimeMillis();
            if (!allReceived.await(5L, TimeUnit.MINUTES)) {
                LOG.warn("Timed out waiting for deliveries, still outstanding: {}", allReceived.getCount());
            }
            LOG.info("Duration to Receive after producers complete: {}ms", System.currentTimeMillis() - start);
            this.logStatistics(topic, receivedCount);
            // Log a second snapshot after a pause so late forwards/deliveries become visible.
            TimeUnit.SECONDS.sleep(10L);
            this.logStatistics(topic, receivedCount);
        } finally {
            // Always release consumer resources, even when the run is aborted part-way.
            this.closeQuietly(connections);
            consumersExecutor.shutdown();
            if (!consumersExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                LOG.warn("Consumer executor did not terminate in time");
            }
        }
    }

    /**
     * Computes how many listener invocations are expected in total: consumers
     * with an even id select RED, odd ids select BLUE, and each producer
     * alternates RED/BLUE starting with RED.
     */
    private int expectedDeliveries() {
        int redConsumers = (this.numConsumers + 1) / 2;
        int blueConsumers = this.numConsumers / 2;
        int redMessages = (this.numberOfMessagesToSendPerProducer + 1) / 2;
        int blueMessages = this.numberOfMessagesToSendPerProducer / 2;
        return this.numProducers * (redMessages * redConsumers + blueMessages * blueConsumers);
    }

    /**
     * Builds the task that opens one consumer connection, subscribes with a
     * colour selector derived from {@code id} and counts every received message.
     */
    private Runnable consumerTask(final int id, final ActiveMQConnectionFactory connectionFactory,
            final List<Connection> connections, final CountDownLatch consumersRegistered,
            final CountDownLatch allReceived, final AtomicInteger receivedCount) {
        return () -> {
            try {
                Connection connection = connectionFactory.createConnection();
                // Track the connection right away so it is closed even if the setup below fails.
                connections.add(connection);
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumerWithSelector = session.createConsumer(this.destination,
                        id % 2 == 0 ? "COLOUR = 'RED'" : "COLOUR = 'BLUE'");
                consumerWithSelector.setMessageListener(message -> {
                    int messageCount = receivedCount.incrementAndGet();
                    allReceived.countDown();
                    if (messageCount % 20000 == 0) {
                        try {
                            LOG.info("Consumer id: {}, message COLOUR:{}", id, message.getStringProperty("COLOUR"));
                        } catch (JMSException e) {
                            LOG.error("Consumer {} could not read the COLOUR property", id, e);
                        }
                    }
                });
                consumersRegistered.countDown();
            } catch (Exception e) {
                LOG.error("Consumer {} failed to register", id, e);
            }
        };
    }

    /**
     * Runs all producers on their own pool and waits (up to five minutes) for
     * them to finish sending.
     */
    private void runProducers(ActiveMQConnectionFactory connectionFactory) throws InterruptedException {
        ExecutorService producersExecutor = Executors.newFixedThreadPool(this.numProducers);
        for (int i = 0; i < this.numProducers; ++i) {
            producersExecutor.execute(this.producerTask(connectionFactory));
        }
        producersExecutor.shutdown();
        if (!producersExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
            LOG.warn("Producers did not finish sending in time");
        }
    }

    /**
     * Builds the task that sends {@code numberOfMessagesToSendPerProducer}
     * non-persistent messages, alternating the COLOUR property between RED and
     * BLUE. The connection is closed whether or not sending succeeds.
     */
    private Runnable producerTask(final ActiveMQConnectionFactory connectionFactory) {
        return () -> {
            try (Connection connection = connectionFactory.createConnection()) {
                connection.start();
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(this.destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                for (int id = 0; id < this.numberOfMessagesToSendPerProducer; ++id) {
                    BytesMessage message = session.createBytesMessage();
                    message.setStringProperty("COLOUR", id % 2 == 0 ? "RED" : "BLUE");
                    producer.send(message);
                }
            } catch (Exception e) {
                LOG.error("Producer failed", e);
            }
        };
    }

    /** Logs the topic's enqueue/forward counters next to the number of received messages. */
    private void logStatistics(Topic topic, AtomicInteger receivedCount) {
        LOG.info("Topic enqueues: {}, Total received: {}, forwards: {}",
                topic.getDestinationStatistics().getEnqueues().getCount(),
                receivedCount.get(),
                topic.getDestinationStatistics().getForwards().getCount());
    }

    /**
     * Closes every tracked connection on a best-effort basis. Iterates over a
     * snapshot because a synchronized list must not be iterated while other
     * threads may still be adding to it.
     */
    private void closeQuietly(List<Connection> connections) {
        for (Connection connection : new LinkedList<>(connections)) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // Best-effort cleanup: a failed close must not mask the test outcome.
                LOG.debug("Ignoring failure while closing consumer connection", ignored);
            }
        }
    }

    /**
     * Creates (but does not start) a non-persistent broker without JMX that
     * listens on an ephemeral TCP port and uses a default destination policy
     * with message expiry checks disabled and auditing enabled.
     *
     * @param name broker name
     * @return the configured, not yet started broker
     * @throws Exception if the transport connector cannot be added
     */
    private BrokerService newBroker(String name) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.setUseJmx(false);
        brokerService.setBrokerName(name);
        brokerService.addConnector("tcp://0.0.0.0:0");

        PolicyEntry defaultEntry = new PolicyEntry();
        defaultEntry.setExpireMessagesPeriod(0L);
        defaultEntry.setEnableAudit(true);

        PolicyMap map = new PolicyMap();
        map.setDefaultEntry(defaultEntry);
        brokerService.setDestinationPolicy(map);
        return brokerService;
    }

    /**
     * Adds a static network connector from {@code localBroker} to
     * {@code remoteBroker}.
     *
     * @param localBroker   broker that owns the network connector
     * @param remoteBroker  broker whose TCP connector is used as the bridge target
     * @param conduitPlease whether the connector uses conduit subscriptions
     * @return the connector that was added to {@code localBroker}
     * @throws Exception if the URI is invalid or the connector cannot be added
     */
    protected NetworkConnector bridgeBrokers(BrokerService localBroker, BrokerService remoteBroker, boolean conduitPlease) throws Exception {
        String uri = "static:(" + remoteBroker.getTransportConnectorByScheme("tcp").getPublishableConnectString() + ")";
        DiscoveryNetworkConnector connector = new DiscoveryNetworkConnector(new URI(uri));
        connector.setName(localBroker.getBrokerName() + "-to-" + remoteBroker.getBrokerName());
        connector.setConduitSubscriptions(conduitPlease);
        localBroker.addNetworkConnector(connector);
        LOG.info("Bridging with conduit subs:{}", conduitPlease);
        return connector;
    }
}

