/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.time.Duration;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=20L, unit=TimeUnit.SECONDS)
class StreamTaskTimerITCase {
    private StreamTaskTestHarness<?> testHarness;
    private ProcessingTimeService timeService;

    StreamTaskTimerITCase() {
    }

    @BeforeEach
    void setup() throws Exception {
        this.testHarness = this.startTestHarness();
        StreamTask<?, ?> task = this.testHarness.getTask();
        this.timeService = task.getProcessingTimeServiceFactory().createProcessingTimeService(task.getMailboxExecutorFactory().createExecutor(this.testHarness.getStreamConfig().getChainIndex()));
    }

    @AfterEach
    void teardown() throws Exception {
        this.stopTestHarness(this.testHarness, 4000L);
    }

    @Test
    void testOpenCloseAndTimestamps() throws InterruptedException {
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() != 1) {
            Thread.sleep(1L);
        }
    }

    @Test
    void testErrorReporting() throws Exception {
        AtomicReference errorRef = new AtomicReference();
        OneShotLatch latch = new OneShotLatch();
        this.testHarness.getEnvironment().setExternalExceptionHandler(ex -> {
            errorRef.set(ex);
            latch.trigger();
        });
        ProcessingTimeService.ProcessingTimeCallback callback = timestamp -> {
            throw new Exception("Exception in Timer");
        };
        this.timeService.registerTimer(System.currentTimeMillis(), callback);
        latch.await();
        Assertions.assertThat((Throwable)((Throwable)errorRef.get())).isInstanceOf(Exception.class);
    }

    @Test
    void checkScheduledTimestamps() throws Exception {
        ValidatingProcessingTimeCallback.numInSequence = 0;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList<ValidatingProcessingTimeCallback> timeCallbacks = new ArrayList<ValidatingProcessingTimeCallback>();
        timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis - 1L, 0));
        timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis - 200L, 1));
        timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis + 100L, 2));
        timeCallbacks.add(new ValidatingProcessingTimeCallback(currentTimeMillis + 200L, 3));
        for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
            this.timeService.registerTimer(timeCallback.expectedTimestamp, (ProcessingTimeService.ProcessingTimeCallback)timeCallback);
        }
        for (ValidatingProcessingTimeCallback timeCallback : timeCallbacks) {
            timeCallback.assertExpectedValues();
        }
        Assertions.assertThat((int)ValidatingProcessingTimeCallback.numInSequence).isEqualTo(4);
    }

    private StreamTaskTestHarness<?> startTestHarness() throws Exception {
        OneInputStreamTaskTestHarness testHarness = new OneInputStreamTaskTestHarness(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        testHarness.setupOutputForSingletonOperatorChain();
        Configuration taskConfig = testHarness.getTaskManagerRuntimeInfo().getConfiguration();
        taskConfig.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)true);
        taskConfig.set(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, (Object)Duration.ofMinutes(10L));
        StreamConfig streamConfig = testHarness.getStreamConfig();
        streamConfig.setChainIndex(0);
        streamConfig.setStreamOperator((StreamOperator)new StreamMap(new DummyMapFunction()));
        testHarness.invoke();
        testHarness.waitForTaskRunning();
        return testHarness;
    }

    private void stopTestHarness(StreamTaskTestHarness<?> testHarness, long timeout) throws Exception {
        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        long deadline = System.currentTimeMillis() + timeout;
        while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        ((AbstractIntegerAssert)Assertions.assertThat((int)StreamTask.TRIGGER_THREAD_GROUP.activeCount()).as("Trigger timer thread did not properly shut down", new Object[0])).isZero();
    }

    public static class DummyMapFunction<T>
    implements MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    private static class ValidatingProcessingTimeCallback
    implements ProcessingTimeService.ProcessingTimeCallback {
        static int numInSequence;
        private final CompletableFuture<Void> finished = new CompletableFuture();
        private final long expectedTimestamp;
        private final int expectedInSequence;

        private ValidatingProcessingTimeCallback(long expectedTimestamp, int expectedInSequence) {
            this.expectedTimestamp = expectedTimestamp;
            this.expectedInSequence = expectedInSequence;
        }

        public void onProcessingTime(long timestamp) {
            try {
                Assertions.assertThat((long)timestamp).isEqualTo(this.expectedTimestamp);
                Assertions.assertThat((int)numInSequence).isEqualTo(this.expectedInSequence);
                ++numInSequence;
                this.finished.complete(null);
            }
            catch (Throwable t) {
                this.finished.completeExceptionally(t);
            }
        }

        private void assertExpectedValues() throws ExecutionException, InterruptedException, TimeoutException {
            this.finished.get(20L, TimeUnit.SECONDS);
        }
    }
}

