/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Broker-level test (named after issue AMQ-7185) checking that a message consumed by a durable
 * topic subscriber inside an XA transaction that is rolled back is redelivered exactly once.
 *
 * <p>Each test runs against a fresh, non-persistent embedded broker listening on an ephemeral
 * TCP port. The XA client connects through the {@code failover://} transport; the publisher uses
 * a plain connection.
 */
public class AMQ7185Test {
    /** Name of the topic shared by the publisher and the durable XA subscriber. */
    private static final String XA_DESTINATION_NAME = "DestinationXA";
    /** Client id reused by every XA connection so they all map to the same durable subscription. */
    private static final String CLIENT_ID = "cid0";
    /** Durable subscription name. */
    private static final String SUBSCRIPTION_NAME = "sub";

    private BrokerService broker;
    private String connectionUri;
    /** Seed for Xid generation; incremented for every Xid handed out by {@link #createXid()}. */
    private long txGenerator = System.currentTimeMillis();
    private XAConnectionFactory xaConnectionFactory;
    private ConnectionFactory connectionFactory;
    final Topic dest = new ActiveMQTopic(XA_DESTINATION_NAME);

    /**
     * Starts the embedded broker and builds the plain and XA connection factories against the
     * connector's published URI.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.broker = new BrokerService();
        this.broker.setDeleteAllMessagesOnStartup(true);
        this.broker.setPersistent(false);
        this.broker.setAdvisorySupport(false);
        // Port 0 lets the OS pick a free port; the real URI is read back after start.
        this.broker.addConnector("tcp://0.0.0.0:0?trace=true");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.connectionUri = ((TransportConnector) this.broker.getTransportConnectors().get(0)).getPublishableConnectString();

        ActiveMQConnectionFactory plainFactory = new ActiveMQConnectionFactory(this.connectionUri);
        plainFactory.setWatchTopicAdvisories(false);
        this.connectionFactory = plainFactory;

        ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory("failover://" + this.connectionUri);
        xaFactory.setWatchTopicAdvisories(false);
        this.xaConnectionFactory = xaFactory;
    }

    /**
     * Stops the embedded broker.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        // Guard against a failed startBroker(): without it the original failure would be
        // hidden behind a NullPointerException raised here.
        if (this.broker != null) {
            this.broker.stop();
            this.broker.waitUntilStopped();
        }
    }

    /**
     * Consumes one message in an XA transaction, rolls it back, reconnects, and verifies the
     * message is delivered again exactly once and that the broker recorded a single enqueue.
     *
     * @throws Exception on any JMS, XA or broker failure
     */
    @Test
    public void testRollbackRedeliveryNoDup() throws Exception {
        // Phase 1: register the durable subscription, then disconnect.
        XAConnection xaConnection = this.openXaConnection();
        try {
            XASession session = xaConnection.createXASession();
            TopicSubscriber consumer = session.createDurableSubscriber(this.dest, SUBSCRIPTION_NAME);
            consumer.close();
            session.close();
        } finally {
            xaConnection.close();
        }

        // Published while the subscriber is disconnected.
        this.publish(this.dest);

        // Phase 2: receive inside an XA transaction and roll it back.
        xaConnection = this.openXaConnection();
        try {
            XASession session = xaConnection.createXASession();
            TopicSubscriber consumer = session.createDurableSubscriber(this.dest, SUBSCRIPTION_NAME);
            Xid tid = this.createXid();
            XAResource resource = session.getXAResource();
            resource.start(tid, XAResource.TMNOFLAGS);
            TextMessage receivedMessage = (TextMessage) consumer.receive(4000L);
            Assert.assertNotNull(receivedMessage);
            resource.end(tid, XAResource.TMSUCCESS);
            resource.rollback(tid);
            consumer.close();
            session.close();
        } finally {
            xaConnection.close();
        }

        // Phase 3: the rolled-back message must come back once, and only once.
        xaConnection = this.openXaConnection();
        try {
            XASession session = xaConnection.createXASession();
            TopicSubscriber consumer = session.createDurableSubscriber(this.dest, SUBSCRIPTION_NAME);
            Xid tid = this.createXid();
            XAResource resource = session.getXAResource();
            resource.start(tid, XAResource.TMNOFLAGS);
            TextMessage receivedMessage = (TextMessage) consumer.receive(1000L);
            Assert.assertNotNull(receivedMessage);
            // A second message here would be a duplicate delivery.
            receivedMessage = (TextMessage) consumer.receiveNoWait();
            Assert.assertNull(receivedMessage);
            resource.end(tid, XAResource.TMSUCCESS);
            // onePhase = true
            resource.commit(tid, true);
            consumer.close();
            session.close();
        } finally {
            xaConnection.close();
        }

        Assert.assertEquals("Only one enqueue", 1L, this.broker.getAdminView().getTotalEnqueueCount());
    }

    /**
     * Opens and starts an XA connection carrying the shared client id.
     *
     * @return a started XA connection; the caller is responsible for closing it
     * @throws JMSException if the connection cannot be created, identified or started
     */
    private XAConnection openXaConnection() throws JMSException {
        XAConnection xaConnection = this.xaConnectionFactory.createXAConnection();
        boolean ready = false;
        try {
            xaConnection.setClientID(CLIENT_ID);
            xaConnection.start();
            ready = true;
            return xaConnection;
        } finally {
            // Do not leak a half-initialised connection when setup fails.
            if (!ready) {
                xaConnection.close();
            }
        }
    }

    /**
     * Sends a single empty text message to the given topic over a non-transacted,
     * auto-acknowledge session.
     *
     * @param dest topic to publish to
     * @throws JMSException if connecting or sending fails
     */
    private void publish(Topic dest) throws JMSException {
        Connection connection = this.connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            session.createProducer(dest).send(new ActiveMQTextMessage());
        } finally {
            connection.close();
        }
    }

    /**
     * Builds a new Xid whose global transaction id and branch qualifier are both the 8-byte
     * big-endian encoding of the next {@code txGenerator} value.
     *
     * @return a Xid with format id 86
     * @throws IOException if encoding the counter fails
     */
    public Xid createXid() throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buffer);
        out.writeLong(++this.txGenerator);
        out.close();
        final byte[] bs = buffer.toByteArray();
        return new Xid() {

            @Override
            public int getFormatId() {
                return 86;
            }

            @Override
            public byte[] getGlobalTransactionId() {
                return bs;
            }

            @Override
            public byte[] getBranchQualifier() {
                return bs;
            }
        };
    }
}

