/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.transport.failover;

import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import jakarta.jms.Topic;
import jakarta.jms.TopicSubscriber;
import jakarta.jms.TransactionRolledBackException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class FailoverDurableSubTransactionTest {
    private static final Logger LOG = LoggerFactory.getLogger(FailoverDurableSubTransactionTest.class);
    private static final String TOPIC_NAME = "Failover.WithTx";
    private static final String TRANSPORT_URI = "tcp://localhost:0";
    private String url;
    BrokerService broker;
    @Parameterized.Parameter(value=0)
    public FailType failType;

    @Parameterized.Parameters(name="failType=#{0}")
    public static Iterable<Object[]> parameters() {
        return Arrays.asList({FailType.ON_DISPATCH}, {FailType.ON_DISPACH_WITH_REPLAY_DELAY}, {FailType.ON_ACK}, {FailType.ON_COMMIT});
    }

    /**
     * Runs after every test and stops the broker, if one was created.
     *
     * @throws Exception if stopping the broker fails
     */
    @After
    public void tearDown() throws Exception {
        stopBroker();
    }

    /**
     * Stops the current broker; does nothing when no broker has been assigned.
     *
     * @throws Exception if the broker fails to stop
     */
    public void stopBroker() throws Exception {
        if (broker == null) {
            return;
        }
        broker.stop();
    }

    /**
     * Creates a broker via {@link #createBroker(boolean)}, stores it in
     * {@link #broker} and starts it.
     *
     * @param deleteAllMessagesOnStartup forwarded to the broker's
     *        delete-all-messages-on-startup setting
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        BrokerService created = createBroker(deleteAllMessagesOnStartup);
        broker = created;
        created.start();
    }

    /**
     * Creates a broker bound to the given address, stores it in
     * {@link #broker} and starts it.
     *
     * @param deleteAllMessagesOnStartup forwarded to the broker's
     *        delete-all-messages-on-startup setting
     * @param bindAddress connector address passed to the broker
     * @throws Exception if the broker cannot be created or started
     */
    public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        BrokerService created = createBroker(deleteAllMessagesOnStartup, bindAddress);
        broker = created;
        created.start();
    }

    /**
     * Creates (but does not start) a broker on the default
     * {@link #TRANSPORT_URI} connector address.
     *
     * @param deleteAllMessagesOnStartup forwarded to the broker's
     *        delete-all-messages-on-startup setting
     * @return the configured, not yet started broker
     * @throws Exception if the broker cannot be configured
     */
    public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception {
        final String defaultBindAddress = TRANSPORT_URI;
        return createBroker(deleteAllMessagesOnStartup, defaultBindAddress);
    }

    /**
     * Builds a broker with JMX and advisory support switched off, a single
     * connector on {@code bindAddress}, a default destination policy and
     * durable subscriptions kept active. The broker is stored in
     * {@link #broker} and the connect URI of its first connector is recorded
     * in {@link #url}. The broker is not started here.
     *
     * @param deleteAllMessagesOnStartup forwarded to the broker's
     *        delete-all-messages-on-startup setting
     * @param bindAddress connector address to add to the broker
     * @return the configured, not yet started broker
     * @throws Exception if the connector cannot be added or its URI resolved
     */
    public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception {
        // Publish to the field first so tearDown() can stop the broker even if
        // a later configuration step throws.
        final BrokerService service = new BrokerService();
        broker = service;

        service.setUseJmx(false);
        service.setAdvisorySupport(false);
        service.addConnector(bindAddress);
        service.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup);

        final PolicyMap policies = new PolicyMap();
        policies.setDefaultEntry(new PolicyEntry());
        service.setDestinationPolicy(policies);
        service.setKeepDurableSubsActive(true);

        final TransportConnector firstConnector = service.getTransportConnectors().get(0);
        url = firstConnector.getConnectUri().toString();
        return service;
    }

    /**
     * Applies the client settings shared by the tests: topic advisories are
     * not watched and redeliveries are unlimited ({@code -1}). Unless the
     * current {@link #failType} is
     * {@link FailType#ON_DISPACH_WITH_REPLAY_DELAY}, both the initial and the
     * regular redelivery delay are set to zero.
     *
     * @param factory the connection factory to configure
     */
    public void configureConnectionFactory(ActiveMQConnectionFactory factory) {
        factory.setWatchTopicAdvisories(false);
        factory.getRedeliveryPolicy().setMaximumRedeliveries(-1);

        final boolean keepReplayDelay = failType == FailType.ON_DISPACH_WITH_REPLAY_DELAY;
        if (keepReplayDelay) {
            // This mode leaves the redelivery delays untouched.
            return;
        }
        factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0L);
        factory.getRedeliveryPolicy().setRedeliveryDelay(0L);
    }

    /**
     * Receives a batch of messages on a transacted durable subscription with
     * synchronous {@code receive()} while a broker plugin kills every
     * connection on the first transport connector exactly once.
     *
     * <p>Which broker callback triggers the kill depends on {@link #failType}
     * (dispatch, delivered-ack or commit); it fires when the shared counter
     * reaches {@code errorAt}. The client keeps receiving and rolling back
     * until one complete batch commits. Within a batch every {@code ID}
     * property must be seen only once, and at the end the {@code ActiveMQ.DLQ}
     * queue must be empty.
     *
     * @throws Exception on any unexpected broker or JMS failure
     */
    @Test
    public void testFailoverCommit() throws Exception {
        final AtomicInteger dispatchedCount = new AtomicInteger(0);
        final int errorAt = failType == FailType.ON_COMMIT ? 1 : 9;
        final int messageCount = 10;
        broker = createBroker(true);
        broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            @Override
            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                if (FailType.ON_COMMIT.equals(FailoverDurableSubTransactionTest.this.failType) && dispatchedCount.incrementAndGet() == errorAt) {
                    // The commit is deliberately NOT forwarded when the connection is killed.
                    whackConnections("commit");
                } else {
                    super.commitTransaction(context, xid, onePhase);
                }
            }

            @Override
            public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
                // Only delivered-acks are counted; the ack is still forwarded afterwards.
                if (FailType.ON_ACK.equals(FailoverDurableSubTransactionTest.this.failType) && ack.getAckType() == MessageAck.DELIVERED_ACK_TYPE && dispatchedCount.incrementAndGet() == errorAt) {
                    whackConnections("ack");
                }
                super.acknowledge(consumerExchange, ack);
            }

            @Override
            public void postProcessDispatch(MessageDispatch messageDispatch) {
                super.postProcessDispatch(messageDispatch);
                final FailType mode = FailoverDurableSubTransactionTest.this.failType;
                if ((FailType.ON_DISPATCH.equals(mode) || FailType.ON_DISPACH_WITH_REPLAY_DELAY.equals(mode)) && dispatchedCount.incrementAndGet() == errorAt) {
                    whackConnections("dispatch");
                }
            }

            /** Raises an IOException on every connection of the first transport connector. */
            private void whackConnections(String trigger) {
                for (TransportConnection transportConnection : FailoverDurableSubTransactionTest.this.broker.getTransportConnectors().get(0).getConnections()) {
                    LOG.error("Whacking connection on {}: {}", trigger, transportConnection);
                    transportConnection.serviceException(new IOException("ERROR NOW"));
                }
            }
        }});
        broker.start();

        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
        cf.setAlwaysSyncSend(true);
        cf.setAlwaysSessionAsync(false);
        cf.getPrefetchPolicy().setDurableTopicPrefetch(failType == FailType.ON_ACK ? 2 : 100);
        configureConnectionFactory(cf);

        Connection connection = cf.createConnection();
        try {
            connection.setClientID("CID");
            connection.start();
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Topic destination = session.createTopic(TOPIC_NAME);

            // Register the durable subscription, then close it so the messages
            // produced next are retained for it.
            TopicSubscriber consumer = session.createDurableSubscriber(destination, "DS");
            consumer.close();
            produceMessage(destination, messageCount);
            LOG.info("Production done! {}", broker.getDestination(ActiveMQDestination.transform(destination)));

            consumer = session.createDurableSubscriber(destination, "DS");
            boolean committed = false;
            HashSet<Integer> dupCheck = new HashSet<>();
            while (!committed) {
                dupCheck.clear();
                int received = 0;
                for (; received < messageCount; ++received) {
                    Message msg = consumer.receive(5000L);
                    if (msg == null) {
                        LOG.info("Failed to receive on: {}", received);
                        break;
                    }
                    LOG.info("Received: @{}:{}, ID:{}", received, msg.getJMSMessageID(), msg.getIntProperty("ID"));
                    Assert.assertTrue("single instance of: " + received, dupCheck.add(msg.getIntProperty("ID")));
                }
                try {
                    if (received == messageCount) {
                        session.commit();
                        committed = true;
                    } else {
                        // Incomplete batch: roll back and try again.
                        session.rollback();
                    }
                }
                catch (TransactionRolledBackException expected) {
                    LOG.info("Got expected", expected);
                    session.rollback();
                }
            }
            consumer.close();
        }
        finally {
            // Always release the failover connection, also when an assertion fails.
            connection.close();
        }

        Destination dlq = broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue("ActiveMQ.DLQ")));
        LOG.info("DLQ: {}", dlq);
        Assert.assertEquals("DLQ empty ", 0L, dlq.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Receives messages on a transacted durable subscription through a
     * {@link MessageListener} while a broker plugin kills every connection on
     * the first transport connector exactly once.
     *
     * <p>The kill happens on the first counted commit ({@link FailType#ON_COMMIT})
     * or the first counted ack ({@link FailType#ON_ACK}); the triggering
     * request is not forwarded to the broker. For the other failure modes the
     * plugin never fires. Twice {@code messageCount} messages are produced and
     * the listener commits once it has counted {@code messageCount} of them.
     * A rolled-back commit resets the counter and the duplicate check. At the
     * end the {@code ActiveMQ.DLQ} queue must be empty.
     *
     * @throws Exception on any unexpected broker or JMS failure
     */
    @Test
    public void testFailoverCommitListener() throws Exception {
        final AtomicInteger dispatchedCount = new AtomicInteger(0);
        final int errorAt = 1;
        final int messageCount = 10;
        broker = createBroker(true);
        broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport(){

            @Override
            public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
                LOG.info("commit request: {}", xid);
                if (FailType.ON_COMMIT.equals(FailoverDurableSubTransactionTest.this.failType) && dispatchedCount.incrementAndGet() == errorAt) {
                    whackConnections("commit");
                } else {
                    super.commitTransaction(context, xid, onePhase);
                }
            }

            @Override
            public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
                LOG.info("ack request: {}", ack);
                if (FailType.ON_ACK.equals(FailoverDurableSubTransactionTest.this.failType) && dispatchedCount.incrementAndGet() == errorAt) {
                    whackConnections("ack");
                } else {
                    super.acknowledge(consumerExchange, ack);
                }
            }

            /** Raises an IOException on every connection of the first transport connector. */
            private void whackConnections(String trigger) {
                for (TransportConnection transportConnection : FailoverDurableSubTransactionTest.this.broker.getTransportConnectors().get(0).getConnections()) {
                    LOG.error("Whacking connection on {}: {}", trigger, transportConnection);
                    transportConnection.serviceException(new IOException("ERROR NOW"));
                }
            }
        }});
        broker.start();

        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")");
        cf.setAlwaysSyncSend(true);
        cf.setAlwaysSessionAsync(true);
        cf.setWatchTopicAdvisories(false);

        // First connection only registers the durable subscription.
        Connection connection = cf.createConnection();
        connection.setClientID("CID");
        connection.start();
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        Topic destination = session.createTopic(TOPIC_NAME);
        TopicSubscriber consumer = session.createDurableSubscriber(destination, "DS");
        consumer.close();
        connection.close();

        produceMessage(destination, messageCount * 2);
        LOG.info("Production done! {}", broker.getDestination(ActiveMQDestination.transform(destination)));

        connection = cf.createConnection();
        connection.setClientID("CID");
        connection.start();
        final Session receiveSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        consumer = receiveSession.createDurableSubscriber(destination, "DS");
        final AtomicBoolean success = new AtomicBoolean(false);
        final HashSet<Integer> dupCheck = new HashSet<>();
        final AtomicInteger receivedCount = new AtomicInteger();
        consumer.setMessageListener(new MessageListener(){

            @Override
            public void onMessage(Message msg) {
                try {
                    int i = receivedCount.getAndIncrement();
                    LOG.info("Received: @{}:{}, ID:{}", i, msg.getJMSMessageID(), msg.getIntProperty("ID"));
                    Assert.assertTrue("single instance of: " + i, dupCheck.add(msg.getIntProperty("ID")));
                    if (receivedCount.get() == messageCount) {
                        receiveSession.commit();
                        success.set(true);
                    }
                }
                catch (TransactionRolledBackException expected) {
                    LOG.info("Got expected", expected);
                    try {
                        receiveSession.rollback();
                    }
                    catch (JMSException e) {
                        LOG.error("Rollback after rolled-back commit failed", e);
                    }
                    // Start a fresh batch: the rolled-back messages are redelivered.
                    dupCheck.clear();
                    receivedCount.set(0);
                }
                catch (JMSException e) {
                    LOG.error("Unexpected JMS failure in message listener", e);
                }
            }
        });
        connection.start();
        try {
            Assert.assertTrue("committed ok", Wait.waitFor(new Wait.Condition(){

                @Override
                public boolean isSatisified() throws Exception {
                    return success.get();
                }
            }));
        }
        finally {
            consumer.close();
            connection.close();
        }

        Destination dlq = broker.getDestination(ActiveMQDestination.transform(new ActiveMQQueue("ActiveMQ.DLQ")));
        LOG.info("DLQ: {}", dlq);
        Assert.assertEquals("DLQ empty ", 0L, dlq.getDestinationStatistics().getMessages().getCount());
    }

    /**
     * Sends {@code count} text messages to {@code destination} over a direct
     * (non-failover) connection to {@link #url}. One message object is reused;
     * its int property {@code ID} is set to the loop index before each send.
     *
     * @param destination topic to publish to
     * @param count number of messages to send
     * @throws JMSException if connecting, sending or closing fails
     */
    private void produceMessage(Topic destination, int count) throws JMSException {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(url);
        configureConnectionFactory(cf);
        // try-with-resources closes the connection even when a send throws.
        try (Connection connection = cf.createConnection()) {
            connection.start();
            Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = producerSession.createProducer(destination);
            TextMessage message = producerSession.createTextMessage("Test message");
            for (int i = 0; i < count; ++i) {
                message.setIntProperty("ID", i);
                producer.send(message);
            }
        }
    }

    /**
     * Broker-side event on which the test plugins kill the client connections.
     * Nested enums are implicitly static, so no {@code static} modifier is
     * needed. Declaration order is kept as-is.
     */
    public enum FailType {
        /** Fail from the plugin's {@code postProcessDispatch} callback. */
        ON_DISPATCH,
        /** Fail from the plugin's {@code acknowledge} callback. */
        ON_ACK,
        /** Fail from the plugin's {@code commitTransaction} callback. */
        ON_COMMIT,
        /**
         * Same trigger as {@link #ON_DISPATCH}, but the connection factory's
         * redelivery delays are not zeroed. The misspelt name is kept because
         * it is part of the public constant set.
         */
        ON_DISPACH_WITH_REPLAY_DELAY
    }
}

