/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks.mailbox;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.FutureTaskWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class TaskMailboxProcessorTest {
    /**
     * Mailbox priority constant with value 0. The tests in this class pass the literal 0 to
     * {@code getMailboxExecutor}. NOTE(review): presumably those call sites originally used
     * this constant and the compiler inlined it; confirm against the original sources.
     */
    public static final int DEFAULT_PRIORITY = 0;
    /** JUnit rule for declaring an expected exception; initialised to expect none. */
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /**
     * Checks that submitting a mail after {@code prepareClose()} is refused with a
     * {@link RejectedExecutionException}.
     */
    @Test
    public void testRejectIfNotOpen() {
        final MailboxProcessor processor = new MailboxProcessor(controller -> {});
        processor.prepareClose();

        boolean rejected = false;
        try {
            // Priority 0 is the value of DEFAULT_PRIORITY.
            processor.getMailboxExecutor(0).execute(() -> {}, "dummy");
        } catch (RejectedExecutionException expected) {
            // This is the outcome the test is looking for.
            rejected = true;
        }

        if (!rejected) {
            Assert.fail("Should not be able to accept runnables if not opened.");
        }
    }

    /**
     * Submits a mail that throws from a second thread and expects the test to end with an
     * exception whose message contains "Expected" (presumably rethrown by
     * {@code runMailboxLoop()} - confirm against {@code MailboxProcessor}).
     */
    @Test
    public void testSubmittingRunnableWithException() throws Exception {
        this.expectedException.expectMessage("Expected");

        try (MailboxProcessor processor = new MailboxProcessor(controller -> {})) {
            final Runnable submitFailingMail =
                    () ->
                            processor
                                    .getMainMailboxExecutor()
                                    .execute(
                                            this::throwFlinkException,
                                            "testSubmittingRunnableWithException");
            final Thread submitter = new Thread(submitFailingMail);

            submitter.start();
            processor.runMailboxLoop();
            submitter.join();
        }
    }

    /**
     * Mail body that always fails.
     *
     * @throws FlinkException always, with the message "Expected"
     */
    private void throwFlinkException() throws FlinkException {
        final FlinkException failure = new FlinkException("Expected");
        throw failure;
    }

    /**
     * Checks the shutdown sequence: after {@code prepareClose()} new mails are rejected, a mail
     * that was queued earlier is still not done, and {@code close()} cancels that queued mail.
     */
    @Test
    public void testShutdown() {
        MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {});
        // Typed future instead of the raw type left behind by the decompiler.
        FutureTaskWithException<Void> testRunnableFuture = new FutureTaskWithException<>(() -> {});
        mailboxProcessor.getMailboxExecutor(0).execute(testRunnableFuture, "testRunnableFuture");

        mailboxProcessor.prepareClose();
        try {
            mailboxProcessor.getMailboxExecutor(0).execute(() -> {}, "dummy");
            Assert.fail("Should not be able to accept runnables if not opened.");
        } catch (RejectedExecutionException expected) {
            // Expected: the mailbox no longer accepts mails after prepareClose().
        }

        // The mail queued before prepareClose() has neither run nor been cancelled yet.
        Assert.assertFalse(testRunnableFuture.isDone());

        mailboxProcessor.close();
        Assert.assertTrue(testRunnableFuture.isCancelled());
    }

    /**
     * Runs a default action that sleeps until a "stop" mail flips a flag, then checks that both
     * the default action and at least one mail were processed.
     */
    @Test
    public void testRunDefaultActionAndMails() throws Exception {
        final AtomicBoolean stop = new AtomicBoolean(false);
        final AtomicInteger counter = new AtomicInteger();
        MailboxThread mailboxThread = new MailboxThread() {

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) throws Exception {
                counter.incrementAndGet();
                if (stop.get()) {
                    controller.allActionsCompleted();
                } else {
                    Thread.sleep(10L);
                }
            }
        };

        MailboxProcessor mailboxProcessor = TaskMailboxProcessorTest.start(mailboxThread);
        mailboxProcessor.getMailboxExecutor(0).execute(() -> stop.set(true), "stop");
        mailboxThread.join();

        // Surface any failure captured inside the mailbox thread; without this a crashed loop
        // could still satisfy the "> 0" assertions below.
        mailboxThread.checkException();

        MatcherAssert.assertThat(counter.get(), Matchers.greaterThan(0));
        MatcherAssert.assertThat(
                mailboxProcessor.getMailboxMetricsControl().getMailCounter().getCount(),
                Matchers.greaterThan(0L));
    }

    /**
     * Checks that the default action is invoked repeatedly until it signals completion, and
     * exactly as many times as expected.
     */
    @Test
    public void testRunDefaultAction() throws Exception {
        final int expectedInvocations = 3;
        final AtomicInteger counter = new AtomicInteger(0);
        MailboxThread mailboxThread = new MailboxThread() {

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                if (counter.incrementAndGet() == expectedInvocations) {
                    controller.allActionsCompleted();
                }
            }
        };

        TaskMailboxProcessorTest.start(mailboxThread);
        mailboxThread.join();

        // Report the root cause if the mailbox loop died, rather than only a count mismatch.
        mailboxThread.checkException();
        Assert.assertEquals(expectedInvocations, counter.get());
    }

    /**
     * Suspends the default action after a fixed number of invocations, resumes it through a mail,
     * and checks that it then runs until the total invocation count is reached.
     */
    @Test
    public void testSignalUnAvailable() throws Exception {
        final int blockAfterInvocations = 3;
        final int totalInvocations = 6;
        final AtomicInteger counter = new AtomicInteger(0);
        final AtomicReference<MailboxDefaultAction.Suspension> suspendedActionRef = new AtomicReference<>();
        final OneShotLatch actionSuspendedLatch = new OneShotLatch();
        MailboxThread mailboxThread = new MailboxThread() {

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                if (counter.incrementAndGet() == blockAfterInvocations) {
                    // Publish the suspension before releasing the test thread.
                    suspendedActionRef.set(controller.suspendDefaultAction());
                    actionSuspendedLatch.trigger();
                } else if (counter.get() == totalInvocations) {
                    controller.allActionsCompleted();
                }
            }
        };

        MailboxProcessor mailboxProcessor = TaskMailboxProcessorTest.start(mailboxThread);
        actionSuspendedLatch.await();
        Assert.assertEquals(blockAfterInvocations, counter.get());

        MailboxDefaultAction.Suspension suspension = suspendedActionRef.get();
        mailboxProcessor.getMailboxExecutor(0).execute(suspension::resume, "resume");
        mailboxThread.join();

        // Report the root cause if the mailbox loop died, rather than only a count mismatch.
        mailboxThread.checkException();
        Assert.assertEquals(totalInvocations, counter.get());
    }

    /**
     * Stress test: the default action suspends itself on every invocation while a second thread
     * keeps picking up the published suspension and resuming it through a mail, for a fixed
     * number of switches. Any failure inside the mailbox thread is rethrown at the end.
     */
    @Test
    public void testSignalUnAvailablePingPong() throws Exception {
        // The reference must be typed; with a raw AtomicReference the getAndSet() result below
        // cannot be assigned to a Suspension without a cast.
        final AtomicReference<MailboxDefaultAction.Suspension> suspendedActionRef = new AtomicReference<>();
        final int totalSwitches = 10000;
        MailboxThread mailboxThread = new MailboxThread() {
            int count = 0;

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                // The unblocker must have consumed the previous suspension before we get here.
                Assert.assertTrue(suspendedActionRef.compareAndSet(null, controller.suspendDefaultAction()));
                ++this.count;
                if (this.count == totalSwitches) {
                    controller.allActionsCompleted();
                } else if (this.count % 1000 == 0) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        };
        mailboxThread.start();
        final MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();

        Thread asyncUnblocker = new Thread(() -> {
            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                final MailboxDefaultAction.Suspension resume = suspendedActionRef.getAndSet(null);
                if (resume != null) {
                    mailboxProcessor.getMailboxExecutor(0).execute(resume::resume, "resume");
                } else {
                    try {
                        // Nothing to resume: send a no-op mail instead.
                        mailboxProcessor.getMailboxExecutor(0).execute(() -> {}, "dummy");
                    } catch (RejectedExecutionException ignored) {
                        // A rejected no-op mail is tolerated here.
                    }
                }

                // Pause briefly every 5000 iterations.
                if (++count % 5000 == 0) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        // Restore the flag so the loop condition ends the thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        asyncUnblocker.start();

        mailboxThread.signalStart();
        mailboxThread.join();
        asyncUnblocker.interrupt();
        asyncUnblocker.join();
        mailboxThread.checkException();
    }

    /**
     * Calls {@code allActionsCompleted()} on a processor that has already been closed; the test
     * passes as long as neither call throws.
     */
    @Test
    public void testCancelAfterClose() {
        final MailboxProcessor closedProcessor = new MailboxProcessor(ctx -> {});

        closedProcessor.close();
        closedProcessor.allActionsCompleted();
    }

    /**
     * Starts the given thread, waits until it has created its processor, and then lets it enter
     * the mailbox loop.
     *
     * @param mailboxThread the not-yet-started helper thread
     * @return the processor created by {@code mailboxThread}
     */
    private static MailboxProcessor start(MailboxThread mailboxThread) {
        mailboxThread.start();

        // Fetching the processor first matters: signalStart() only has an effect once the
        // processor has been created.
        final MailboxProcessor createdProcessor = mailboxThread.getMailboxProcessor();
        mailboxThread.signalStart();

        return createdProcessor;
    }

    /**
     * Enqueues a mail that re-enqueues itself each time it runs and checks that the default
     * action is still invoked the expected number of times, with the self-enqueuing mail having
     * run the same number of times.
     */
    @Test
    public void testAvoidStarvation() throws Exception {
        final int expectedInvocations = 3;
        final AtomicInteger counter = new AtomicInteger(0);
        MailboxThread mailboxThread = new MailboxThread() {

            @Override
            public void runDefaultAction(MailboxDefaultAction.Controller controller) {
                if (counter.incrementAndGet() == expectedInvocations) {
                    controller.allActionsCompleted();
                }
            }
        };
        mailboxThread.start();

        MailboxProcessor mailboxProcessor = mailboxThread.getMailboxProcessor();
        final MailboxExecutor mailboxExecutor = mailboxProcessor.getMailboxExecutor(0);
        final AtomicInteger index = new AtomicInteger();
        mailboxExecutor.execute(new RunnableWithException() {

            @Override
            public void run() {
                // Each run immediately schedules the next one and bumps the run counter.
                mailboxExecutor.execute(this, "Blocking mail" + index.incrementAndGet());
            }
        }, "Blocking mail" + index.get());

        mailboxThread.signalStart();
        mailboxThread.join();

        // Report the root cause if the mailbox loop died, rather than only a count mismatch.
        mailboxThread.checkException();
        Assert.assertEquals(expectedInvocations, counter.get());
        Assert.assertEquals(expectedInvocations, index.get());
    }

    /**
     * Suspends a running mailbox loop from another thread, then checks that a mail submitted
     * after the suspension has not run when the first loop returns, and that it runs and ends
     * the loop on the second {@code runMailboxLoop()} call.
     */
    @Test
    public void testSuspendRunningMailboxLoop() throws Exception {
        OneShotLatch doSomeWork = new OneShotLatch();
        AtomicBoolean stop = new AtomicBoolean(false);
        // Holds a failure from the helper thread so it can be reported on the test thread.
        AtomicReference<Throwable> suspendThreadFailure = new AtomicReference<>();
        MailboxProcessor mailboxProcessor = new MailboxProcessor(controller -> {
            doSomeWork.trigger();
            if (stop.get()) {
                controller.allActionsCompleted();
            }
        });

        Thread suspendThread = new Thread(() -> {
            try {
                // Wait until the loop has run the default action at least once.
                doSomeWork.await();
                mailboxProcessor.suspend();
                mailboxProcessor.getMailboxExecutor(0).execute(() -> stop.set(true), "stop");
            } catch (Throwable t) {
                suspendThreadFailure.set(t);
            }
        });
        suspendThread.start();

        mailboxProcessor.runMailboxLoop();
        suspendThread.join();

        // Fail fast with the cause: if the "stop" mail was never submitted, the second
        // runMailboxLoop() below would have nothing to end it.
        if (suspendThreadFailure.get() != null) {
            throw new AssertionError("Suspend thread failed.", suspendThreadFailure.get());
        }
        Assert.assertFalse(stop.get());

        mailboxProcessor.runMailboxLoop();
        Assert.assertFalse(mailboxProcessor.isMailboxLoopRunning());
        Assert.assertTrue(stop.get());
    }

    /**
     * Checks that once {@code allActionsCompleted()} has been called, neither the first nor a
     * repeated {@code runMailboxLoop()} invokes the default action.
     */
    @Test
    public void testResumeMailboxLoopAfterAllActionsCompleted() throws Exception {
        final AtomicBoolean defaultActionRan = new AtomicBoolean(false);
        final MailboxProcessor processor =
                new MailboxProcessor(
                        controller -> {
                            defaultActionRan.set(true);
                        });

        processor.allActionsCompleted();

        // First run: returns without ever calling the default action.
        processor.runMailboxLoop();
        Assert.assertFalse(processor.isMailboxLoopRunning());
        Assert.assertFalse(defaultActionRan.get());

        // Second run: still does not call the default action.
        processor.runMailboxLoop();
        Assert.assertFalse(defaultActionRan.get());
    }

    /**
     * Checks that a loop suspended before it starts returns without running the default action,
     * and that the next {@code runMailboxLoop()} call does run it.
     */
    @Test
    public void testResumeMailboxLoop() throws Exception {
        final AtomicBoolean defaultActionRan = new AtomicBoolean(false);
        final MailboxProcessor processor =
                new MailboxProcessor(
                        controller -> {
                            defaultActionRan.set(true);
                            controller.allActionsCompleted();
                        });

        processor.suspend();

        // Suspended run: the default action must not have been invoked.
        processor.runMailboxLoop();
        Assert.assertFalse(defaultActionRan.get());

        // Resumed run: the default action executes and completes the loop.
        processor.runMailboxLoop();
        Assert.assertTrue(defaultActionRan.get());
    }

    static class MailboxThread
    extends Thread
    implements MailboxDefaultAction {
        MailboxProcessor mailboxProcessor;
        OneShotLatch mailboxCreatedLatch = new OneShotLatch();
        OneShotLatch canRun = new OneShotLatch();
        private Throwable caughtException;

        MailboxThread() {
        }

        @Override
        public final void run() {
            this.mailboxProcessor = new MailboxProcessor((MailboxDefaultAction)this);
            this.mailboxCreatedLatch.trigger();
            try {
                this.canRun.await();
                this.mailboxProcessor.runMailboxLoop();
            }
            catch (Throwable t) {
                this.caughtException = t;
            }
        }

        public void runDefaultAction(MailboxDefaultAction.Controller controller) throws Exception {
            controller.allActionsCompleted();
        }

        final MailboxProcessor getMailboxProcessor() {
            try {
                this.mailboxCreatedLatch.await();
                return this.mailboxProcessor;
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        final void signalStart() {
            if (this.mailboxCreatedLatch.isTriggered()) {
                this.canRun.trigger();
            }
        }

        void checkException() throws Exception {
            if (this.caughtException != null) {
                throw new Exception(this.caughtException);
            }
        }
    }
}

