/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorTestContext;
import org.apache.flink.streaming.api.operators.source.CollectingDataOutput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SourceOperatorAlignmentTest {
    @Nullable
    private SourceOperatorTestContext context;
    @Nullable
    private SourceOperator<Integer, MockSourceSplit> operator;

    @BeforeEach
    public void setup() throws Exception {
        this.context = new SourceOperatorTestContext(false, (WatermarkStrategy<Integer>)WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedGenerator()).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, t) -> r.intValue()).withWatermarkAlignment("group1", Duration.ofMillis(100L), Duration.ofMillis(1L)));
        this.operator = this.context.getOperator();
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.context.close();
        this.context = null;
        this.operator = null;
    }

    @Test
    public void testWatermarkAlignment() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit newSplit = new MockSourceSplit(2);
        int record1 = 1000;
        int record2 = 2000;
        int record3 = 3000;
        newSplit.addRecord(record1);
        newSplit.addRecord(record2);
        newSplit.addRecord(record3);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<Integer>();
        ArrayList<Integer> expectedOutput = new ArrayList<Integer>();
        MatcherAssert.assertThat((Object)this.operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.MORE_AVAILABLE));
        expectedOutput.add(record1);
        this.context.getTimeService().advance(1L);
        this.assertLatestReportedWatermarkEvent(record1);
        this.assertOutput(actualOutput, expectedOutput);
        Assertions.assertTrue((boolean)this.operator.isAvailable());
        this.operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)(record1 - 1)));
        Assertions.assertFalse((boolean)this.operator.isAvailable());
        MatcherAssert.assertThat((Object)this.operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.NOTHING_AVAILABLE));
        this.assertLatestReportedWatermarkEvent(record1);
        this.assertOutput(actualOutput, expectedOutput);
        Assertions.assertFalse((boolean)this.operator.isAvailable());
        this.operator.handleOperatorEvent((OperatorEvent)new WatermarkAlignmentEvent((long)(record1 + 1)));
        Assertions.assertTrue((boolean)this.operator.isAvailable());
        this.operator.emitNext(actualOutput);
        MatcherAssert.assertThat((Object)this.operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.NOTHING_AVAILABLE));
        expectedOutput.add(record2);
        this.context.getTimeService().advance(1L);
        this.assertLatestReportedWatermarkEvent(record2);
        this.assertOutput(actualOutput, expectedOutput);
        Assertions.assertFalse((boolean)this.operator.isAvailable());
    }

    @Test
    public void testWatermarkAlignmentWithIdleness() throws Exception {
        try (SourceOperatorTestContext context = new SourceOperatorTestContext(true, (WatermarkStrategy<Integer>)WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedGenerator(PunctuatedGenerator.GenerationMode.ODD)).withWatermarkAlignment("group1", Duration.ofMillis(100L), Duration.ofMillis(1L)).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, t) -> r.intValue()));){
            SourceOperator<Integer, MockSourceSplit> operator = context.getOperator();
            operator.initializeState(context.createStateContext());
            operator.open();
            MockSourceSplit newSplit = new MockSourceSplit(2);
            int record1 = 1;
            newSplit.addRecord(record1);
            operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
            CollectingDataOutput<Integer> actualOutput = new CollectingDataOutput<Integer>();
            ArrayList<Integer> expectedOutput = new ArrayList<Integer>();
            MatcherAssert.assertThat((Object)operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.MORE_AVAILABLE));
            expectedOutput.add(record1);
            context.getTimeService().advance(1L);
            this.assertLatestReportedWatermarkEvent(context, record1);
            this.assertOutput(actualOutput, expectedOutput);
            Assertions.assertTrue((boolean)operator.isAvailable());
            MatcherAssert.assertThat((Object)operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.NOTHING_AVAILABLE));
            context.getTimeService().advance(1L);
            this.assertLatestReportedWatermarkEvent(context, Long.MAX_VALUE);
            newSplit = new MockSourceSplit(3);
            int record2 = 2;
            newSplit.addRecord(record2);
            operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(newSplit), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
            MatcherAssert.assertThat((Object)operator.emitNext(actualOutput), (Matcher)CoreMatchers.is((Object)DataInputStatus.MORE_AVAILABLE));
            expectedOutput.add(record2);
            context.getTimeService().advance(1L);
            this.assertLatestReportedWatermarkEvent(context, record1);
            this.assertOutput(actualOutput, expectedOutput);
            Assertions.assertTrue((boolean)operator.isAvailable());
        }
    }

    @Test
    public void testStopWhileWaitingForWatermarkAlignment() throws Exception {
        this.testWatermarkAlignment();
        CompletableFuture availableFuture = this.operator.getAvailableFuture();
        Assertions.assertFalse((boolean)availableFuture.isDone());
        this.operator.stop(StopMode.NO_DRAIN);
        Assertions.assertTrue((boolean)availableFuture.isDone());
        Assertions.assertTrue((boolean)this.operator.isAvailable());
    }

    @Test
    public void testReportedWatermarkDoNotDecrease() throws Exception {
        this.operator.initializeState(this.context.createStateContext());
        this.operator.open();
        MockSourceSplit split1 = new MockSourceSplit(2);
        MockSourceSplit split2 = new MockSourceSplit(3);
        int record1 = 2000;
        int record2 = 1000;
        split1.addRecord(record1);
        split2.addRecord(record2);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(split1), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        CollectingDataOutput actualOutput = new CollectingDataOutput();
        this.operator.emitNext(actualOutput);
        this.context.getTimeService().advance(1L);
        this.assertLatestReportedWatermarkEvent(record1);
        this.operator.handleOperatorEvent((OperatorEvent)new AddSplitEvent(Collections.singletonList(split2), (SimpleVersionedSerializer)new MockSourceSplitSerializer()));
        this.operator.emitNext(actualOutput);
        this.context.getTimeService().advance(1L);
        this.assertLatestReportedWatermarkEvent(record1);
    }

    private void assertOutput(CollectingDataOutput<Integer> actualOutput, List<Integer> expectedOutput) {
        MatcherAssert.assertThat(actualOutput.getEvents().stream().filter(o -> o instanceof StreamRecord).mapToInt(object -> (Integer)((StreamRecord)object).getValue()).boxed().collect(Collectors.toList()), (Matcher)Matchers.contains((Object[])expectedOutput.toArray()));
    }

    private void assertLatestReportedWatermarkEvent(long expectedWatermark) {
        this.assertLatestReportedWatermarkEvent(this.context, expectedWatermark);
    }

    private void assertLatestReportedWatermarkEvent(SourceOperatorTestContext context, long expectedWatermark) {
        List events = context.getGateway().getEventsSent().stream().filter(event -> event instanceof ReportedWatermarkEvent).collect(Collectors.toList());
        Assertions.assertFalse((boolean)events.isEmpty());
        Assertions.assertEquals((Object)new ReportedWatermarkEvent(expectedWatermark), events.get(events.size() - 1));
    }

    private static class PunctuatedGenerator
    implements WatermarkGenerator<Integer> {
        private GenerationMode mode;

        public PunctuatedGenerator() {
            this(GenerationMode.ALL);
        }

        public PunctuatedGenerator(GenerationMode mode) {
            this.mode = mode;
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            boolean shouldGenerate;
            switch (this.mode) {
                case ALL: {
                    shouldGenerate = true;
                    break;
                }
                case ODD: {
                    shouldGenerate = eventTimestamp % 2L == 1L;
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Unknown mode: " + (Object)((Object)this.mode));
                }
            }
            if (shouldGenerate) {
                output.emitWatermark(new Watermark(eventTimestamp));
            }
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }

        private static enum GenerationMode {
            ALL,
            ODD;

        }
    }
}

