/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSourceContexts;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.CollectorOutput;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StreamSourceContextIdleDetectionTests {
    private TestMethod testMethod;

    public StreamSourceContextIdleDetectionTests(TestMethod testMethod) {
        this.testMethod = testMethod;
    }

    @Test
    public void testManualWatermarkContext() throws Exception {
        long idleTimeout = 100L;
        long initialTime = 0L;
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(initialTime);
        ArrayList<StreamElement> output = new ArrayList<StreamElement>();
        ArrayList<StreamElement> expectedOutput = new ArrayList<StreamElement>();
        SourceFunction.SourceContext context = StreamSourceContexts.getSourceContext((TimeCharacteristic)TimeCharacteristic.EventTime, (ProcessingTimeService)processingTimeService, (Object)new Object(), new CollectorOutput(output), (long)0L, (long)idleTimeout, (boolean)true);
        processingTimeService.setCurrentTime(initialTime + idleTimeout);
        expectedOutput.add((StreamElement)WatermarkStatus.IDLE);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
        processingTimeService.setCurrentTime(initialTime + 2L * idleTimeout);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
        expectedOutput.add((StreamElement)WatermarkStatus.ACTIVE);
        this.emitStreamElement(initialTime + 3L * idleTimeout + idleTimeout / 10L, expectedOutput, processingTimeService, (SourceFunction.SourceContext<String>)context);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
        this.emitStreamElement(initialTime + 3L * idleTimeout + 2L * idleTimeout / 10L, expectedOutput, processingTimeService, (SourceFunction.SourceContext<String>)context);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
        processingTimeService.setCurrentTime(initialTime + 4L * idleTimeout + idleTimeout / 10L);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
        processingTimeService.setCurrentTime(initialTime + 5L * idleTimeout + idleTimeout / 10L);
        expectedOutput.add((StreamElement)WatermarkStatus.IDLE);
        Assert.assertThat(output, (Matcher)Matchers.equalTo(expectedOutput));
    }

    private void emitStreamElement(long currentTime, List<StreamElement> expectedOutput, TestProcessingTimeService processingTimeService, SourceFunction.SourceContext<String> context) throws Exception {
        processingTimeService.setCurrentTime(currentTime);
        switch (this.testMethod) {
            case COLLECT: {
                expectedOutput.add((StreamElement)new StreamRecord((Object)"msg"));
                context.collect((Object)"msg");
                break;
            }
            case COLLECT_WITH_TIMESTAMP: {
                long recordTime = processingTimeService.getCurrentProcessingTime();
                expectedOutput.add((StreamElement)new StreamRecord((Object)"msg", recordTime));
                context.collectWithTimestamp((Object)"msg", recordTime);
                break;
            }
            case EMIT_WATERMARK: {
                long watermarkTime = processingTimeService.getCurrentProcessingTime();
                expectedOutput.add((StreamElement)new Watermark(watermarkTime));
                context.emitWatermark(new Watermark(watermarkTime));
            }
        }
    }

    @Test
    public void testAutomaticWatermarkContext() throws Exception {
        long watermarkInterval = 40L;
        long idleTimeout = 100L;
        long initialTime = 20L;
        TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
        processingTimeService.setCurrentTime(initialTime);
        ArrayList<StreamElement> output = new ArrayList<StreamElement>();
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        SourceFunction.SourceContext context = StreamSourceContexts.getSourceContext((TimeCharacteristic)TimeCharacteristic.IngestionTime, (ProcessingTimeService)processingTimeService, (Object)new Object(), new CollectorOutput(output), (long)watermarkInterval, (long)idleTimeout, (boolean)true);
        processingTimeService.setCurrentTime(initialTime + watermarkInterval);
        expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
        processingTimeService.setCurrentTime(initialTime + 2L * watermarkInterval);
        expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
        processingTimeService.setCurrentTime(initialTime + idleTimeout);
        expectedOutput.add(WatermarkStatus.IDLE);
        Assert.assertEquals(expectedOutput, output);
        processingTimeService.setCurrentTime(initialTime + 3L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 4L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 2L * idleTimeout);
        processingTimeService.setCurrentTime(initialTime + 6L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 7L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout);
        Assert.assertEquals(expectedOutput, output);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + idleTimeout / 10L);
        switch (this.testMethod) {
            case COLLECT: {
                expectedOutput.add(WatermarkStatus.ACTIVE);
                context.collect((Object)"msg");
                expectedOutput.add(new StreamRecord((Object)"msg", processingTimeService.getCurrentProcessingTime()));
                expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case COLLECT_WITH_TIMESTAMP: {
                expectedOutput.add(WatermarkStatus.ACTIVE);
                context.collectWithTimestamp((Object)"msg", processingTimeService.getCurrentProcessingTime());
                expectedOutput.add(new StreamRecord((Object)"msg", processingTimeService.getCurrentProcessingTime()));
                expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case EMIT_WATERMARK: {
                context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
                Assert.assertEquals(expectedOutput, output);
            }
        }
        processingTimeService.setCurrentTime(initialTime + 8L * watermarkInterval);
        processingTimeService.setCurrentTime(initialTime + 3L * idleTimeout + 3L * idleTimeout / 10L);
        switch (this.testMethod) {
            case COLLECT: {
                context.collect((Object)"msg");
                expectedOutput.add(new StreamRecord((Object)"msg", processingTimeService.getCurrentProcessingTime()));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case COLLECT_WITH_TIMESTAMP: {
                context.collectWithTimestamp((Object)"msg", processingTimeService.getCurrentProcessingTime());
                expectedOutput.add(new StreamRecord((Object)"msg", processingTimeService.getCurrentProcessingTime()));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case EMIT_WATERMARK: {
                context.emitWatermark(new Watermark(processingTimeService.getCurrentProcessingTime()));
                Assert.assertEquals(expectedOutput, output);
            }
        }
        processingTimeService.setCurrentTime(initialTime + 9L * watermarkInterval);
        switch (this.testMethod) {
            case COLLECT: 
            case COLLECT_WITH_TIMESTAMP: {
                expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case EMIT_WATERMARK: {
                Assert.assertEquals(expectedOutput, output);
            }
        }
        processingTimeService.setCurrentTime(initialTime + 10L * watermarkInterval);
        switch (this.testMethod) {
            case COLLECT: 
            case COLLECT_WITH_TIMESTAMP: {
                expectedOutput.add(new Watermark(processingTimeService.getCurrentProcessingTime() - processingTimeService.getCurrentProcessingTime() % watermarkInterval));
                Assert.assertEquals(expectedOutput, output);
                break;
            }
            case EMIT_WATERMARK: {
                Assert.assertEquals(expectedOutput, output);
            }
        }
        processingTimeService.setCurrentTime(initialTime + 4L * idleTimeout + idleTimeout / 10L);
        Assert.assertEquals(expectedOutput, output);
        processingTimeService.setCurrentTime(initialTime + 11L * watermarkInterval);
        if (this.testMethod != TestMethod.EMIT_WATERMARK) {
            expectedOutput.add(WatermarkStatus.IDLE);
        }
        Assert.assertEquals(expectedOutput, output);
    }

    @Parameterized.Parameters(name="TestMethod = {0}")
    public static Collection<TestMethod[]> timeCharacteristic() {
        return Arrays.asList(new TestMethod[][]{{TestMethod.COLLECT}, {TestMethod.COLLECT_WITH_TIMESTAMP}, {TestMethod.EMIT_WATERMARK}});
    }

    private static enum TestMethod {
        COLLECT,
        COLLECT_WITH_TIMESTAMP,
        EMIT_WATERMARK;

    }
}

