/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.bugs;

import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.Message;
import jakarta.jms.MessageProducer;
import jakarta.jms.ObjectMessage;
import jakarta.jms.Session;
import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AMQ4361Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ4361Test.class);
    private BrokerService service;
    private String brokerUrlString;

    @Before
    public void setUp() throws Exception {
        this.service = new BrokerService();
        this.service.setDeleteAllMessagesOnStartup(true);
        this.service.setUseJmx(false);
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry policy = new PolicyEntry();
        policy.setMemoryLimit(1L);
        policy.setPendingSubscriberPolicy((PendingSubscriberMessageStoragePolicy)new VMPendingSubscriberMessageStoragePolicy());
        policy.setPendingQueuePolicy((PendingQueueMessageStoragePolicy)new VMPendingQueueMessageStoragePolicy());
        policy.setProducerFlowControl(true);
        policyMap.setDefaultEntry(policy);
        this.service.setDestinationPolicy(policyMap);
        this.service.setAdvisorySupport(false);
        this.brokerUrlString = this.service.addConnector("tcp://localhost:0").getPublishableConnectString();
        this.service.start();
        this.service.waitUntilStarted();
    }

    @After
    public void tearDown() throws Exception {
        if (this.service != null) {
            this.service.stop();
            this.service.waitUntilStopped();
        }
    }

    @Test
    public void testCloseWhenHunk() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerUrlString);
        connectionFactory.setProducerWindowSize(1024);
        final ActiveMQDestination destination = ActiveMQDestination.createDestination((String)"queue://TINY_QUEUE", (byte)-1);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(false, 1);
        final MessageProducer producer = session.createProducer((Destination)destination);
        producer.setTimeToLive(0L);
        producer.setDeliveryMode(1);
        final AtomicReference<Object> publishException = new AtomicReference<Object>(null);
        final AtomicReference<Object> closeException = new AtomicReference<Object>(null);
        final AtomicLong lastLoop = new AtomicLong(System.currentTimeMillis() + 100L);
        Thread pubThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    byte[] data = new byte[1000];
                    new Random(-559038737L).nextBytes(data);
                    for (int i = 0; i < 10000; ++i) {
                        lastLoop.set(System.currentTimeMillis());
                        ObjectMessage objMsg = session.createObjectMessage();
                        objMsg.setObject((Serializable)data);
                        producer.send((Destination)destination, (Message)objMsg);
                    }
                }
                catch (Exception e) {
                    publishException.set(e);
                }
            }
        }, "PublishingThread");
        pubThread.start();
        while (System.currentTimeMillis() - lastLoop.get() < 2000L) {
            Thread.sleep(100L);
        }
        LOG.info("Publisher deadlock detected.");
        Thread closeThread = new Thread(new Runnable(){

            @Override
            public void run() {
                try {
                    LOG.info("Attempting close..");
                    producer.close();
                }
                catch (Exception e) {
                    closeException.set(e);
                }
            }
        }, "ClosingThread");
        closeThread.start();
        try {
            closeThread.join(30000L);
        }
        catch (InterruptedException ie) {
            Assert.assertFalse((String)"Closing thread didn't complete in 10 seconds", (boolean)true);
        }
        try {
            pubThread.join(30000L);
        }
        catch (InterruptedException ie) {
            Assert.assertFalse((String)"Publishing thread didn't complete in 10 seconds", (boolean)true);
        }
        Assert.assertNull(closeException.get());
        Assert.assertNotNull(publishException.get());
    }
}

