/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.usecases;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assert;
import org.junit.Test;

public class RequestReplyToTopicViaThreeNetworkHopsTest {
    protected static final int CONCURRENT_CLIENT_COUNT = 5;
    protected static final int CONCURRENT_SERVER_COUNT = 5;
    protected static final int TOTAL_CLIENT_ITER = 10;
    protected static int Next_broker_num = 0;
    protected EmbeddedTcpBroker edge1 = new EmbeddedTcpBroker("edge", 1);
    protected EmbeddedTcpBroker edge2 = new EmbeddedTcpBroker("edge", 2);
    protected EmbeddedTcpBroker core1 = new EmbeddedTcpBroker("core", 1);
    protected EmbeddedTcpBroker core2 = new EmbeddedTcpBroker("core", 2);
    protected boolean testError = false;
    protected boolean fatalTestError = false;
    protected int echoResponseFill = 0;
    protected static Log LOG = LogFactory.getLog(RequestReplyToTopicViaThreeNetworkHopsTest.class);
    public boolean duplex = true;

    public RequestReplyToTopicViaThreeNetworkHopsTest() throws Exception {
        this.edge1.coreConnectTo(this.core1, this.duplex);
        this.edge2.coreConnectTo(this.core2, this.duplex);
        this.core1.coreConnectTo(this.core2, this.duplex);
    }

    public void logMessage(String msg) {
        System.out.println(msg);
        System.out.flush();
    }

    public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception {
        MessageConsumer resp_cons = sess.createConsumer(resp_dest);
        MessageClient cons_client = new MessageClient(resp_cons, num_msg);
        cons_client.start();
        for (int cur = 0; cur < num_msg && !this.fatalTestError; ++cur) {
            TextMessage msg = sess.createTextMessage("MSG AAAA " + cur);
            msg.setIntProperty("SEQ", 100 + cur);
            msg.setStringProperty("TEST", "TOPO");
            msg.setJMSReplyTo(resp_dest);
            if (cur == num_msg - 1) {
                msg.setBooleanProperty("end-of-response", true);
            }
            this.sendWithRetryOnDeletedDest(req_prod, (Message)msg);
            LOG.debug((Object)("Sent:" + msg));
        }
        cons_client.waitShutdown(5000L);
        if (cons_client.shutdown()) {
            LOG.debug((Object)"Consumer client shutdown complete");
        } else {
            LOG.debug((Object)"Consumer client shutdown incomplete!!!");
        }
        int tot_expected = num_msg * (this.echoResponseFill + 1);
        if (cons_client.getNumMsgReceived() == tot_expected) {
            LOG.debug((Object)("Have " + tot_expected + " messages, as-expected"));
        } else {
            this.testError = true;
            if (cons_client.getNumMsgReceived() == 0) {
                this.fatalTestError = true;
            }
            LOG.error((Object)("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected + " on destination " + resp_dest));
        }
        resp_cons.close();
    }

    protected void sendWithRetryOnDeletedDest(MessageProducer prod, Message msg) throws JMSException {
        try {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("SENDING REQUEST message " + msg));
            }
            prod.send(msg);
        }
        catch (JMSException jms_exc) {
            System.out.println("AAA: " + jms_exc.getMessage());
            throw jms_exc;
        }
    }

    public void testOneDest(Connection conn, Session sess, Destination cons_dest, int num_msg) throws Exception {
        LOG.trace((Object)"Creating echo queue and producer");
        Queue prod_dest = sess.createQueue("echo");
        MessageProducer msg_prod = sess.createProducer((Destination)prod_dest);
        this.testMessages(sess, msg_prod, cons_dest, num_msg);
        msg_prod.close();
    }

    public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
        int num_msg = 5;
        LOG.debug((Object)("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"));
        Connection conn = this.createConnection(cons_broker_url);
        conn.start();
        Session sess = conn.createSession(false, 1);
        LOG.trace((Object)"Creating destination");
        TemporaryTopic cons_dest = sess.createTemporaryTopic();
        this.testOneDest(conn, sess, (Destination)cons_dest, num_msg);
        sess.close();
        conn.close();
    }

    public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
        int num_msg = 5;
        LOG.info((Object)("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"));
        Connection conn = this.createConnection(cons_broker_url);
        conn.start();
        Session sess = conn.createSession(false, 1);
        String topic_name = "topotest2.perm.topic";
        LOG.trace((Object)"Removing existing Topic");
        RequestReplyToTopicViaThreeNetworkHopsTest.removeTopic(conn, topic_name);
        LOG.trace((Object)("Creating Topic, " + topic_name));
        Topic cons_dest = sess.createTopic(topic_name);
        this.testOneDest(conn, sess, (Destination)cons_dest, num_msg);
        RequestReplyToTopicViaThreeNetworkHopsTest.removeTopic(conn, topic_name);
        sess.close();
        conn.close();
    }

    public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
        int num_msg = 5;
        LOG.info((Object)("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"));
        Connection conn = this.createConnection(cons_broker_url);
        conn.start();
        Session sess = conn.createSession(false, 1);
        LOG.trace((Object)"Creating destination");
        TemporaryQueue cons_dest = sess.createTemporaryQueue();
        this.testOneDest(conn, sess, (Destination)cons_dest, num_msg);
        sess.close();
        conn.close();
    }

    public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
        int num_msg = 5;
        LOG.info((Object)("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"));
        Connection conn = this.createConnection(cons_broker_url);
        conn.start();
        Session sess = conn.createSession(false, 1);
        String queue_name = "topotest2.perm.queue";
        LOG.trace((Object)"Removing existing Queue");
        RequestReplyToTopicViaThreeNetworkHopsTest.removeQueue(conn, queue_name);
        LOG.trace((Object)("Creating Queue, " + queue_name));
        Queue cons_dest = sess.createQueue(queue_name);
        this.testOneDest(conn, sess, (Destination)cons_dest, num_msg);
        RequestReplyToTopicViaThreeNetworkHopsTest.removeQueue(conn, queue_name);
        sess.close();
        conn.close();
    }

    @Test
    public void runWithTempTopicReplyTo() throws Exception {
        int iter;
        this.fatalTestError = false;
        this.testError = false;
        ThreadPoolExecutor clientExecPool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000));
        final CountDownLatch clientCompletionLatch = new CountDownLatch(10);
        Thread start1 = new Thread(){

            @Override
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.edge1.start();
                }
                catch (Exception ex) {
                    LOG.error(null, (Throwable)ex);
                }
            }
        };
        Thread start2 = new Thread(){

            @Override
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.edge2.start();
                }
                catch (Exception ex) {
                    LOG.error(null, (Throwable)ex);
                }
            }
        };
        Thread start3 = new Thread(){

            @Override
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.core1.start();
                }
                catch (Exception ex) {
                    LOG.error(null, (Throwable)ex);
                }
            }
        };
        Thread start4 = new Thread(){

            @Override
            public void run() {
                try {
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.core2.start();
                }
                catch (Exception ex) {
                    LOG.error(null, (Throwable)ex);
                }
            }
        };
        start1.start();
        start2.start();
        start3.start();
        start4.start();
        start1.join();
        start2.join();
        start3.join();
        start4.join();
        TopicTrafficGenerator traffic_gen = new TopicTrafficGenerator(this.edge1.getConnectionUrl(), this.edge2.getConnectionUrl());
        traffic_gen.start();
        EchoService echo_svc = new EchoService("echo", this.edge1.getConnectionUrl());
        echo_svc.start();
        LOG.info((Object)"** STARTING TEMP TOPIC TESTS");
        for (iter = 0; iter < 10 && !this.fatalTestError; ++iter) {
            clientExecPool.execute(new Runnable(){

                @Override
                public void run() {
                    try {
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.testTempTopic(RequestReplyToTopicViaThreeNetworkHopsTest.this.edge1.getConnectionUrl(), RequestReplyToTopicViaThreeNetworkHopsTest.this.edge2.getConnectionUrl());
                    }
                    catch (Exception exc) {
                        LOG.error((Object)"test exception", (Throwable)exc);
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.fatalTestError = true;
                        RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                    }
                    clientCompletionLatch.countDown();
                }
            });
        }
        boolean allDoneOnTime = clientCompletionLatch.await(20L, TimeUnit.MINUTES);
        LOG.info((Object)("** FINISHED TEMP TOPIC TESTS AFTER " + iter + " ITERATIONS, testError:" + this.testError + ", fatal: " + this.fatalTestError + ", onTime:" + allDoneOnTime));
        Thread.sleep(100L);
        echo_svc.shutdown();
        traffic_gen.shutdown();
        this.shutdown();
        Assert.assertTrue((String)"test completed in time", (boolean)allDoneOnTime);
        Assert.assertTrue((String)"no errors", (!this.testError ? 1 : 0) != 0);
    }

    public void shutdown() throws Exception {
        this.edge1.stop();
        this.edge2.stop();
        this.core1.stop();
        this.core2.stop();
    }

    protected Connection createConnection(String url) throws Exception {
        return ActiveMQConnection.makeConnection((String)url);
    }

    protected static void removeQueue(Connection conn, String dest_name) throws Exception {
        if (conn instanceof ActiveMQConnection) {
            ActiveMQDestination dest = ActiveMQDestination.createDestination((String)dest_name, (byte)1);
            ((ActiveMQConnection)conn).destroyDestination(dest);
        }
    }

    protected static void removeTopic(Connection conn, String dest_name) throws Exception {
        if (conn instanceof ActiveMQConnection) {
            ActiveMQDestination dest = ActiveMQDestination.createDestination((String)dest_name, (byte)2);
            ((ActiveMQConnection)conn).destroyDestination(dest);
        }
    }

    public static String fmtMsgInfo(Message msg) throws Exception {
        StringBuilder msg_desc = new StringBuilder();
        msg_desc = new StringBuilder();
        if (msg instanceof TextMessage) {
            msg_desc.append(((TextMessage)msg).getText());
        } else {
            msg_desc.append("[");
            msg_desc.append(msg.getClass().getName());
            msg_desc.append("]");
        }
        Enumeration prop_enum = msg.getPropertyNames();
        while (prop_enum.hasMoreElements()) {
            String prop = (String)prop_enum.nextElement();
            msg_desc.append("; ");
            msg_desc.append(prop);
            msg_desc.append("=");
            msg_desc.append(msg.getStringProperty(prop));
        }
        return msg_desc.toString();
    }

    protected class TopicTrafficGenerator
    extends Thread {
        protected Connection conn1;
        protected Connection conn2;
        protected Session sess1;
        protected Session sess2;
        protected Destination dest;
        protected MessageProducer prod;
        protected MessageConsumer cons;
        protected boolean Shutdown_ind;
        protected int send_count;

        public TopicTrafficGenerator(String url1, String url2) throws Exception {
            this.conn1 = RequestReplyToTopicViaThreeNetworkHopsTest.this.createConnection(url1);
            this.conn2 = RequestReplyToTopicViaThreeNetworkHopsTest.this.createConnection(url2);
            this.sess1 = this.conn1.createSession(false, 1);
            this.sess2 = this.conn2.createSession(false, 1);
            this.conn1.start();
            this.conn2.start();
            this.dest = this.sess1.createTopic("traffic");
            this.prod = this.sess1.createProducer(this.dest);
            this.dest = this.sess2.createTopic("traffic");
            this.cons = this.sess2.createConsumer(this.dest);
        }

        public void shutdown() {
            this.Shutdown_ind = true;
        }

        @Override
        public void run() {
            try {
                LOG.info((Object)"Starting Topic Traffic Generator");
                while (!this.Shutdown_ind) {
                    TextMessage msg = this.sess1.createTextMessage("TRAFFIC");
                    this.prod.send((Message)msg);
                    ++this.send_count;
                    msg = this.cons.receive(250L);
                }
            }
            catch (JMSException jms_exc) {
                LOG.warn((Object)"traffic generator failed on jms exception", (Throwable)jms_exc);
            }
            finally {
                LOG.info((Object)("Shutdown of Topic Traffic Generator; send count = " + this.send_count));
                if (this.conn1 != null) {
                    try {
                        this.conn1.stop();
                    }
                    catch (JMSException jms_exc) {
                        LOG.warn((Object)"failed to shutdown connection", (Throwable)jms_exc);
                    }
                }
                if (this.conn2 != null) {
                    try {
                        this.conn2.stop();
                    }
                    catch (JMSException jms_exc) {
                        LOG.warn((Object)"failed to shutdown connection", (Throwable)jms_exc);
                    }
                }
            }
        }
    }

    protected class EchoRequestProcessor
    implements Runnable {
        protected Session session;
        protected Destination resp_dest;
        protected MessageProducer msg_prod;
        protected Message request;

        public EchoRequestProcessor(Session sess, Message req) throws Exception {
            this.session = sess;
            this.request = req;
            this.resp_dest = req.getJMSReplyTo();
            if (this.resp_dest == null) {
                throw new Exception("invalid request: no reply-to destination given");
            }
            this.msg_prod = this.session.createProducer(this.resp_dest);
        }

        @Override
        public void run() {
            try {
                this.processRequest(this.request);
            }
            catch (Exception ex) {
                LOG.error((Object)"Failed to process request", (Throwable)ex);
            }
        }

        protected void processRequest(Message req) throws Exception {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("ECHO request message " + req.toString()));
            }
            this.resp_dest = req.getJMSReplyTo();
            if (this.resp_dest != null) {
                this.msg_prod = this.session.createProducer(this.resp_dest);
                LOG.debug((Object)("SENDING ECHO RESPONSE to:" + this.resp_dest));
                this.msg_prod.send(req);
                LOG.debug((Object)(((ActiveMQSession)this.session).getConnection().getBrokerName() + " SENT ECHO RESPONSE to " + this.resp_dest));
                this.msg_prod.close();
                this.msg_prod = null;
            } else {
                LOG.warn((Object)"invalid request: no reply-to destination given");
            }
        }
    }

    protected class EchoService
    extends Thread {
        protected String destName;
        protected Connection jmsConn;
        protected Session sess;
        protected MessageConsumer msg_cons;
        protected boolean Shutdown_ind;
        protected Destination req_dest;
        protected CountDownLatch waitShutdown;
        protected ThreadPoolExecutor processorPool;

        public EchoService(String dest, Connection broker_conn) throws Exception {
            this.destName = dest;
            this.jmsConn = broker_conn;
            this.Shutdown_ind = false;
            this.sess = this.jmsConn.createSession(false, 1);
            this.req_dest = this.sess.createQueue(this.destName);
            this.msg_cons = this.sess.createConsumer(this.req_dest);
            this.jmsConn.start();
            this.waitShutdown = new CountDownLatch(1);
            this.processorPool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000));
        }

        public EchoService(String dest, String broker_url) throws Exception {
            this(dest, (Connection)ActiveMQConnection.makeConnection((String)broker_url));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                LOG.info((Object)"STARTING ECHO SERVICE");
                while (!this.Shutdown_ind) {
                    Message req = this.msg_cons.receive(100L);
                    if (req == null) continue;
                    this.processorPool.execute(new EchoRequestProcessor(this.sess, req));
                }
            }
            catch (Exception ex) {
                LOG.error((Object)"error processing echo service requests", (Throwable)ex);
            }
            finally {
                LOG.info((Object)"shutting down test echo service");
                try {
                    this.jmsConn.stop();
                }
                catch (JMSException jms_exc) {
                    LOG.warn((Object)"error on shutting down JMS connection", (Throwable)jms_exc);
                }
                EchoService echoService = this;
                synchronized (echoService) {
                    this.waitShutdown.countDown();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void shutdown() {
            CountDownLatch wait_l;
            EchoService echoService = this;
            synchronized (echoService) {
                wait_l = this.waitShutdown;
            }
            this.Shutdown_ind = true;
            try {
                if (wait_l != null) {
                    if (wait_l.await(3000L, TimeUnit.MILLISECONDS)) {
                        LOG.info((Object)"echo service shutdown complete");
                    } else {
                        LOG.warn((Object)"timeout waiting for echo service shutdown");
                    }
                } else {
                    LOG.info((Object)"echo service shutdown: service does not appear to be active");
                }
            }
            catch (InterruptedException int_exc) {
                LOG.warn((Object)"interrupted while waiting for echo service shutdown");
            }
        }
    }

    protected class MessageClient
    extends Thread {
        protected MessageConsumer msgCons;
        protected boolean shutdownInd;
        protected int expectedCount;
        protected int lastSeq = 0;
        protected int msgCount = 0;
        protected boolean haveFirstSeq;
        protected CountDownLatch shutdownLatch;

        public MessageClient(MessageConsumer cons, int num_to_expect) {
            this.msgCons = cons;
            this.expectedCount = num_to_expect * (RequestReplyToTopicViaThreeNetworkHopsTest.this.echoResponseFill + 1);
            this.shutdownLatch = new CountDownLatch(1);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                CountDownLatch latch;
                MessageClient messageClient = this;
                synchronized (messageClient) {
                    latch = this.shutdownLatch;
                }
                this.shutdownInd = false;
                this.processMessages();
                latch.countDown();
            }
            catch (Exception exc) {
                LOG.error((Object)"message client error", (Throwable)exc);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitShutdown(long timeout) {
            try {
                CountDownLatch latch;
                MessageClient messageClient = this;
                synchronized (messageClient) {
                    latch = this.shutdownLatch;
                }
                if (latch != null) {
                    latch.await(timeout, TimeUnit.MILLISECONDS);
                } else {
                    LOG.info((Object)"echo client shutdown: client does not appear to be active");
                }
            }
            catch (InterruptedException int_exc) {
                LOG.warn((Object)"wait for message client shutdown interrupted", (Throwable)int_exc);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public boolean shutdown() {
            boolean down_ind;
            if (!this.shutdownInd) {
                this.shutdownInd = true;
            }
            this.waitShutdown(200L);
            MessageClient messageClient = this;
            synchronized (messageClient) {
                down_ind = this.shutdownLatch == null || this.shutdownLatch.getCount() == 0L;
            }
            return down_ind;
        }

        public int getNumMsgReceived() {
            return this.msgCount;
        }

        protected void processMessages() throws Exception {
            this.haveFirstSeq = false;
            while (!this.shutdownInd && !RequestReplyToTopicViaThreeNetworkHopsTest.this.fatalTestError) {
                Message in_msg = this.msgCons.receive(100L);
                if (in_msg == null) continue;
                ++this.msgCount;
                this.checkMessage(in_msg);
            }
            this.msgCons.close();
        }

        protected void checkMessage(Message in_msg) throws Exception {
            LOG.debug((Object)("received message " + RequestReplyToTopicViaThreeNetworkHopsTest.fmtMsgInfo(in_msg) + " from " + in_msg.getJMSDestination()));
            if (in_msg.propertyExists("SEQ")) {
                int seq = in_msg.getIntProperty("SEQ");
                if (this.haveFirstSeq && seq != this.lastSeq + 1) {
                    LOG.error((Object)("***ERROR*** incorrect sequence number; expected " + Integer.toString(this.lastSeq + 1) + " but have " + Integer.toString(seq)));
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                }
                this.lastSeq = seq;
                if (this.msgCount > this.expectedCount) {
                    LOG.error((Object)("*** have more messages than expected; have " + this.msgCount + "; expect " + this.expectedCount));
                    RequestReplyToTopicViaThreeNetworkHopsTest.this.testError = true;
                }
            }
            if (in_msg.propertyExists("end-of-response")) {
                LOG.trace((Object)"received end-of-response message");
            }
        }
    }

    protected class EmbeddedTcpBroker {
        protected BrokerService brokerSvc = new BrokerService();
        protected int brokerNum;
        protected String brokerName;
        protected String brokerId;
        protected int port;
        protected String tcpUrl;
        protected String fullUrl;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public EmbeddedTcpBroker(String name, int number) throws Exception {
            Class<?> clazz = this.getClass();
            synchronized (clazz) {
                this.brokerNum = Next_broker_num++;
            }
            this.brokerName = name + number;
            this.brokerId = this.brokerName;
            this.brokerSvc.setBrokerName(this.brokerName);
            this.brokerSvc.setBrokerId(this.brokerId);
            this.brokerSvc.setPersistent(false);
            this.brokerSvc.setUseJmx(false);
            this.port = 60000 + this.brokerNum * 10;
            this.tcpUrl = "tcp://127.0.0.1:" + Integer.toString(this.port);
            this.fullUrl = this.tcpUrl + "?jms.watchTopicAdvisories=false";
            this.brokerSvc.addConnector(this.tcpUrl);
        }

        public Connection createConnection() throws URISyntaxException, JMSException {
            ActiveMQConnection result = ActiveMQConnection.makeConnection((String)this.fullUrl);
            return result;
        }

        public String getConnectionUrl() {
            return this.fullUrl;
        }

        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
            this.makeConnectionTo(other, duplex_f, true);
            this.makeConnectionTo(other, duplex_f, false);
            if (!duplex_f) {
                other.makeConnectionTo(this, duplex_f, true);
                other.makeConnectionTo(this, duplex_f, false);
            }
        }

        public void start() throws Exception {
            this.brokerSvc.start();
            this.brokerSvc.waitUntilStarted();
        }

        public void stop() throws Exception {
            this.brokerSvc.stop();
        }

        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
            ActiveMQDestination excl_dest;
            String prefix;
            DiscoveryNetworkConnector nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
            nw_conn.setDuplex(duplex_f);
            if (queue_f) {
                nw_conn.setConduitSubscriptions(false);
            } else {
                nw_conn.setConduitSubscriptions(true);
            }
            nw_conn.setNetworkTTL(3);
            nw_conn.setSuppressDuplicateQueueSubscriptions(true);
            nw_conn.setDecreaseNetworkConsumerPriority(true);
            nw_conn.setBridgeTempDestinations(queue_f);
            if (queue_f) {
                prefix = "queue";
                excl_dest = ActiveMQDestination.createDestination((String)">", (byte)2);
            } else {
                prefix = "topic";
                excl_dest = ActiveMQDestination.createDestination((String)">", (byte)1);
            }
            ArrayList<ActiveMQDestination> excludes = new ArrayList<ActiveMQDestination>();
            excludes.add(excl_dest);
            nw_conn.setExcludedDestinations(excludes);
            if (duplex_f) {
                nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
            } else {
                nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
            }
            this.brokerSvc.addNetworkConnector((NetworkConnector)nw_conn);
        }
    }
}

