/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueProcessWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.AtomicIntegerAssert;
import org.junit.jupiter.api.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

class WindowOperatorTest {
    private static final TypeInformation<Tuple2<String, Integer>> STRING_INT_TUPLE = TypeInformation.of((TypeHint)new TypeHint<Tuple2<String, Integer>>(){});
    private static AtomicInteger closeCalled = new AtomicInteger(0);
    private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output"){};

    WindowOperatorTest() {
    }

    private void testSlidingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 1999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.clear();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 3999L));
        expectedOutput.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 4999L));
        expectedOutput.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 5999L));
        expectedOutput.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSlidingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        int windowSize = 3;
        boolean windowSlide = true;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS), (Time)Time.of((long)1L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        this.testSlidingEventTimeWindows((OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>)operator);
    }

    @Test
    void testSlidingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        int windowSize = 3;
        boolean windowSlide = true;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS), (Time)Time.of((long)1L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)EventTimeTrigger.create(), 0L, null);
        this.testSlidingEventTimeWindows((OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>)operator);
        ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)closeCalled).as("Close was not called.", new Object[0])).hasValue(2);
    }

    private void testTumblingEventTimeWindows(OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator) throws Exception {
        OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(999L));
        expectedOutput.add(new Watermark(999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(1999L));
        expectedOutput.add(new Watermark(1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        expectedOutput.clear();
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processWatermark(new Watermark(2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new Watermark(2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(3999L));
        expectedOutput.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(4999L));
        expectedOutput.add(new Watermark(4999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(5999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 5999L));
        expectedOutput.add(new Watermark(5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(6999L));
        testHarness.processWatermark(new Watermark(7999L));
        expectedOutput.add(new Watermark(6999L));
        expectedOutput.add(new Watermark(7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testTumblingEventTimeWindowsReduce() throws Exception {
        closeCalled.set(0);
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        this.testTumblingEventTimeWindows((OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>)operator);
    }

    @Test
    void testTumblingEventTimeWindowsApply() throws Exception {
        closeCalled.set(0);
        int windowSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction(new RichSumReducer()), (Trigger)EventTimeTrigger.create(), 0L, null);
        this.testTumblingEventTimeWindows((OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>>)operator);
        ((AtomicIntegerAssert)Assertions.assertThat((AtomicInteger)closeCalled).as("Close was not called.", new Object[0])).hasValue(2);
    }

    @Test
    void testSessionWindows() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)6), 6050L));
        testHarness.processWatermark(new Watermark(12000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-6", (Object)10L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-6", (Object)0L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-20", (Object)5501L, (Object)9050L), 9049L));
        expectedOutput.add(new Watermark(12000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 15000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)20), 15000L));
        testHarness.processWatermark(new Watermark(17999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-30", (Object)15000L, (Object)18000L), 17999L));
        expectedOutput.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSessionWindowsWithProcessFunction() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableProcessWindowFunction((ProcessWindowFunction)new SessionProcessWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)6), 6050L));
        testHarness.processWatermark(new Watermark(12000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-6", (Object)10L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-6", (Object)0L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-20", (Object)5501L, (Object)9050L), 9049L));
        expectedOutput.add(new Watermark(12000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 15000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)20), 15000L));
        testHarness.processWatermark(new Watermark(17999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-30", (Object)15000L, (Object)18000L), 17999L));
        expectedOutput.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testReduceSessionWindows() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)6), 6050L));
        testHarness.processWatermark(new Watermark(12000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-6", (Object)10L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-6", (Object)0L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-20", (Object)5501L, (Object)9050L), 9049L));
        expectedOutput.add(new Watermark(12000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 15000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)20), 15000L));
        testHarness.processWatermark(new Watermark(17999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-30", (Object)15000L, (Object)18000L), 17999L));
        expectedOutput.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testReduceSessionWindowsWithProcessFunction() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueProcessWindowFunction((ProcessWindowFunction)new ReducedProcessSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)6), 6050L));
        testHarness.processWatermark(new Watermark(12000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-6", (Object)10L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-6", (Object)0L, (Object)5500L), 5499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-20", (Object)5501L, (Object)9050L), 9049L));
        expectedOutput.add(new Watermark(12000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 15000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)20), 15000L));
        testHarness.processWatermark(new Watermark(17999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-30", (Object)15000L, (Object)18000L), 17999L));
        expectedOutput.add(new Watermark(17999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSessionWindowsWithCountTrigger() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 3500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-10", (Object)0L, (Object)6500L), 6499L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        expectedOutput.clear();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 6500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)10), 4500L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-22", (Object)10L, (Object)10000L), 9999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSessionWindowsWithContinuousEventTimeTrigger() throws Exception {
        closeCalled.set(0);
        int sessionSize = 3;
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)ContinuousEventTimeTrigger.of((Time)Time.seconds((long)2L)), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 1500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1000L));
        testHarness.processWatermark(new Watermark(2500L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-1", (Object)1500L, (Object)4500L), 4499L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-6", (Object)0L, (Object)5500L), 5499L));
        expectedOutput.add(new Watermark(2500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 4000L));
        testHarness.processWatermark(new Watermark(3000L));
        expectedOutput.add(new Watermark(3000L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
        expectedOutput.clear();
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 4000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 3500L));
        testHarness.processWatermark(new Watermark(4000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-3", (Object)1500L, (Object)7000L), 6999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-15", (Object)0L, (Object)7000L), 6999L));
        expectedOutput.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testPointSessions() throws Exception {
        OperatorSubtaskState snapshot;
        closeCalled.set(0);
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)new PointSessionWindows(3000L), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        try (OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);){
            testHarness.open();
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 0L));
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)33), 1000L));
            snapshot = testHarness.snapshot(0L, 0L);
        }
        testHarness = WindowOperatorTest.createTestHarness(operator);
        var6_5 = null;
        try {
            testHarness.setup();
            testHarness.initializeState(snapshot);
            testHarness.open();
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)33), 2500L));
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10L));
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 1000L));
            testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)33), 2500L));
            testHarness.processWatermark(new Watermark(12000L));
            expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-36", (Object)10L, (Object)4000L), 3999L));
            expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-67", (Object)0L, (Object)3000L), 2999L));
            expectedOutput.add(new Watermark(12000L));
            TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        }
        catch (Throwable throwable) {
            var6_5 = throwable;
            throw throwable;
        }
        finally {
            if (testHarness != null) {
                if (var6_5 != null) {
                    try {
                        testHarness.close();
                    }
                    catch (Throwable throwable) {
                        var6_5.addSuppressed(throwable);
                    }
                } else {
                    testHarness.close();
                }
            }
        }
    }

    private static <OUT> OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, OUT> createTestHarness(OneInputStreamOperator<Tuple2<String, Integer>, OUT> operator) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness<String, Tuple2<String, Integer>, OUT>(operator, (KeySelector<Tuple2<String, Integer>, String>)new TupleKeySelector(), (TypeInformation<String>)BasicTypeInfo.STRING_TYPE_INFO);
    }

    @Test
    void testContinuousWatermarkTrigger() throws Exception {
        closeCalled.set(0);
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)GlobalWindows.create(), (TypeSerializer)new GlobalWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ContinuousEventTimeTrigger.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1000L));
        expectedOutput.add(new Watermark(1000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(2000L));
        expectedOutput.add(new Watermark(2000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(3000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), Long.MAX_VALUE));
        expectedOutput.add(new Watermark(3000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(4000L));
        expectedOutput.add(new Watermark(4000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(5000L));
        expectedOutput.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(6000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), Long.MAX_VALUE));
        expectedOutput.add(new Watermark(6000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(new Watermark(7000L));
        testHarness.processWatermark(new Watermark(8000L));
        expectedOutput.add(new Watermark(7000L));
        expectedOutput.add(new Watermark(8000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCountTrigger() throws Exception {
        closeCalled.set(0);
        int windowSize = 4;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)GlobalWindows.create(), (TypeSerializer)new GlobalWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testHarness.close();
        ConcurrentLinkedQueue<Object> outputBeforeClose = testHarness.getOutput();
        stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        operator = new WindowOperator((WindowAssigner)GlobalWindows.create(), (TypeSerializer)new GlobalWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)CountTrigger.of((long)4L)), 0L, null);
        testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.setup();
        testHarness.initializeState(snapshot);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 10999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)4), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), Long.MAX_VALUE));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, Iterables.concat(outputBeforeClose, testHarness.getOutput()), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testEndOfStreamTrigger() throws Exception {
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer(new ExecutionConfig()));
        WindowOperator operator = new WindowOperator((WindowAssigner)GlobalWindows.createWithEndOfStreamTrigger(), (TypeSerializer)new GlobalWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), GlobalWindows.createWithEndOfStreamTrigger().getDefaultTrigger(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 20L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 0L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", Collections.EMPTY_LIST, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processWatermark(Watermark.MAX_WATERMARK);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), Long.MAX_VALUE));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), Long.MAX_VALUE));
        expectedOutput.add(Watermark.MAX_WATERMARK);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testProcessingTimeTumblingWindows() throws Throwable {
        int windowSize = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.setProcessingTime(3L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), Long.MAX_VALUE));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 7000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 7000L));
        testHarness.setProcessingTime(5000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 7000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 7000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 7000L));
        testHarness.setProcessingTime(7000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testProcessingTimeSlidingWindows() throws Throwable {
        int windowSize = 3;
        boolean windowSlide = true;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)SlidingProcessingTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS), (Time)Time.of((long)1L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.setProcessingTime(3L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), Long.MAX_VALUE));
        testHarness.setProcessingTime(1000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), Long.MAX_VALUE));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), Long.MAX_VALUE));
        testHarness.setProcessingTime(2000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 1999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), Long.MAX_VALUE));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), Long.MAX_VALUE));
        testHarness.setProcessingTime(3000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)3), 2999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 2999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), Long.MAX_VALUE));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), Long.MAX_VALUE));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), Long.MAX_VALUE));
        testHarness.setProcessingTime(7000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 3999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)5), 3999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)5), 4999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 5999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testProcessingTimeSessionWindows() throws Throwable {
        int windowGap = 3;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)ProcessingTimeSessionWindows.withGap((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.setProcessingTime(3L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1L));
        testHarness.setProcessingTime(1000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1002L));
        testHarness.setProcessingTime(5000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 5000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 5000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 5000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 5000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 5000L));
        testHarness.setProcessingTime(10000L);
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 7999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 7999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple2ResultSortComparator());
        Assertions.assertThat(testHarness.getOutput()).hasSameSizeAs(expectedOutput);
        for (Object elem : testHarness.getOutput()) {
            if (!(elem instanceof StreamRecord)) continue;
            StreamRecord el = (StreamRecord)elem;
            Assertions.assertThat(expectedOutput).contains(new Object[]{el});
        }
        testHarness.close();
    }

    @Test
    void testDynamicEventTimeSessionWindows() throws Exception {
        closeCalled.set(0);
        SessionWindowTimeGapExtractor extractor = (SessionWindowTimeGapExtractor)Mockito.mock(SessionWindowTimeGapExtractor.class);
        Mockito.when((Object)extractor.extract(Matchers.any(Tuple2.class))).thenAnswer(invocation -> {
            Tuple2 element = (Tuple2)invocation.getArguments()[0];
            switch ((String)element.f0) {
                case "key1": {
                    return 3000L;
                }
                case "key2": {
                    switch ((Integer)element.f1) {
                        case 10: {
                            return 1000L;
                        }
                    }
                    return 2000L;
                }
            }
            return 0L;
        });
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)DynamicEventTimeSessionWindows.withDynamicGap((SessionWindowTimeGapExtractor)extractor), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 10L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.processWatermark(new Watermark(8999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-3", (Object)10L, (Object)3010L), 3009L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-9", (Object)5000L, (Object)8000L), 7999L));
        expectedOutput.add(new Watermark(8999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 9000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 10000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 10500L));
        testHarness.processWatermark(new Watermark(12999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-13", (Object)9000L, (Object)12000L), 11999L));
        expectedOutput.add(new Watermark(12999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 13000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 13500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14000L));
        testHarness.processWatermark(new Watermark(16999L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-21", (Object)13000L, (Object)16000L), 15999L));
        expectedOutput.add(new Watermark(16999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testDynamicProcessingTimeSessionWindows() throws Exception {
        closeCalled.set(0);
        SessionWindowTimeGapExtractor extractor = (SessionWindowTimeGapExtractor)Mockito.mock(SessionWindowTimeGapExtractor.class);
        Mockito.when((Object)extractor.extract(Matchers.any(Tuple2.class))).thenAnswer(invocation -> {
            Tuple2 element = (Tuple2)invocation.getArguments()[0];
            switch ((String)element.f0) {
                case "key1": {
                    return 3000L;
                }
                case "key2": {
                    switch ((Integer)element.f1) {
                        case 10: {
                            return 1000L;
                        }
                    }
                    return 2000L;
                }
            }
            return 0L;
        });
        ListStateDescriptor stateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)DynamicProcessingTimeSessionWindows.withDynamicGap((SessionWindowTimeGapExtractor)extractor), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new SessionWindowFunction()), (Trigger)ProcessingTimeTrigger.create(), 0L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
        testHarness.open();
        testHarness.setProcessingTime(10L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)3), 10L));
        testHarness.setProcessingTime(5000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 5000L));
        testHarness.setProcessingTime(6000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 6000L));
        testHarness.setProcessingTime(8999L);
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key1-3", (Object)10L, (Object)3010L), 3009L));
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-9", (Object)5000L, (Object)8000L), 7999L));
        testHarness.setProcessingTime(9000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 9000L));
        testHarness.setProcessingTime(10000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 10000L));
        testHarness.setProcessingTime(10500L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 10500L));
        testHarness.setProcessingTime(10500L);
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-13", (Object)9000L, (Object)12000L), 11999L));
        testHarness.setProcessingTime(13000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 13000L));
        testHarness.setProcessingTime(13500L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)10), 13500L));
        testHarness.setProcessingTime(14000L);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14000L));
        testHarness.setProcessingTime(16999L);
        expectedOutput.add(new StreamRecord((Object)new Tuple3((Object)"key2-21", (Object)13000L, (Object)16000L), 15999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testLateness() throws Exception {
        int windowSize = 2;
        long lateness = 500L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)EventTimeTrigger.create()), 500L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        ConcurrentLinkedQueue<Object> lateExpected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 500L));
        testHarness.processWatermark(new Watermark(1500L));
        expected.add(new Watermark(1500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1300L));
        testHarness.processWatermark(new Watermark(2300L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1999L));
        expected.add(new Watermark(2300L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1997L));
        testHarness.processWatermark(new Watermark(6000L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        expected.add(new Watermark(6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processWatermark(new Watermark(7000L));
        lateExpected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        expected.add(new Watermark(7000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", lateExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimeOverflow() throws Exception {
        int windowSize = 1000;
        long lateness = 2000L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        TumblingEventTimeWindows windowAssigner = TumblingEventTimeWindows.of((Time)Time.milliseconds((long)1000L));
        final WindowOperator operator = new WindowOperator((WindowAssigner)windowAssigner, (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 2000L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        long timestamp = 9223372036854774057L;
        Collection windows = windowAssigner.assignWindows((Object)new Tuple2((Object)"key2", (Object)1), timestamp, new WindowAssigner.WindowAssignerContext(){

            public long getCurrentProcessingTime() {
                return operator.windowAssignerContext.getCurrentProcessingTime();
            }
        });
        TimeWindow window = (TimeWindow)Iterables.getOnlyElement((Iterable)windows);
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), timestamp));
        Assertions.assertThat((long)(window.maxTimestamp() + 2000L)).isLessThan(window.maxTimestamp());
        Assertions.assertThat((long)(window.maxTimestamp() + 2000L)).isLessThan(9223372036854774307L);
        testHarness.processWatermark(new Watermark(9223372036854774307L));
        Assertions.assertThat((long)window.maxTimestamp()).isStrictlyBetween(Long.valueOf(9223372036854774307L), Long.valueOf(Long.MAX_VALUE));
        testHarness.processWatermark(new Watermark(window.maxTimestamp()));
        expected.add(new Watermark(9223372036854774307L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), window.maxTimestamp()));
        expected.add(new Watermark(window.maxTimestamp()));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSideOutputDueToLatenessTumbling() throws Exception {
        int windowSize = 2;
        long lateness = 0L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1985L));
        expected.add(new Watermark(1985L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1980L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        sideExpected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2001L));
        testHarness.processWatermark(new Watermark(2999L));
        expected.add(new Watermark(2999L));
        testHarness.processWatermark(new Watermark(3999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        expected.add(new Watermark(3999L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSideOutputDueToLatenessSliding() throws Exception {
        int windowSize = 3;
        boolean windowSlide = true;
        long lateness = 0L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.of((long)3L, (TimeUnit)TimeUnit.SECONDS), (Time)Time.of((long)1L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(3000L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)2), 2999L));
        expected.add(new Watermark(3000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 3001L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 3001L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3900L));
        testHarness.processWatermark(new Watermark(6000L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)5), 3999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 3999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)4), 4999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 4999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 5999L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)2), 5999L));
        expected.add(new Watermark(6000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 3001L));
        sideExpected.add(new StreamRecord((Object)new Tuple2((Object)"key1", (Object)1), 3001L));
        testHarness.processWatermark(new Watermark(25000L));
        expected.add(new Watermark(25000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
        int gapSize = 3;
        long lateness = 0L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)EventTimeTrigger.create()), 0L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        sideExpected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10100L));
        sideExpected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10100L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)14500L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testSideOutputDueToLatenessSessionZeroLateness() throws Exception {
        int gapSize = 3;
        long lateness = 0L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 0L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        sideExpected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)14500L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
        TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
        int gapSize = 3;
        long lateness = 10L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)EventTimeTrigger.create()), 10L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)10000L, (Object)14600L), 14599L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)10000L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
        int gapSize = 3;
        long lateness = 10L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 10L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-2", (Object)10000L, (Object)14600L), 14599L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-3", (Object)10000L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        actual = testHarness.getOutput();
        sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.close();
    }

    @Test
    void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
        int gapSize = 3;
        long lateness = 10000L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)PurgingTrigger.of((Trigger)EventTimeTrigger.create()), 10000L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)1000L, (Object)14600L), 14599L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)1000L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        actual = testHarness.getOutput();
        sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.close();
    }

    @Test
    void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception {
        int gapSize = 3;
        long lateness = 10000L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 10000L, lateOutputTag);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 2000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new Watermark(4998L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 4500L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 8500L));
        testHarness.processWatermark(new Watermark(7400L));
        expected.add(new Watermark(7400L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 7000L));
        testHarness.processWatermark(new Watermark(11501L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-5", (Object)1000L, (Object)11500L), 11499L));
        expected.add(new Watermark(11501L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 11600L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)11600L, (Object)14600L), 14599L));
        expected.add(new Watermark(14600L));
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 10000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-7", (Object)1000L, (Object)14600L), 14599L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        ConcurrentLinkedQueue<StreamRecord<Tuple2<String, Integer>>> sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 14500L));
        testHarness.processWatermark(new Watermark(20000L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-8", (Object)1000L, (Object)17500L), 17499L));
        expected.add(new Watermark(20000L));
        testHarness.processWatermark(new Watermark(100000L));
        expected.add(new Watermark(100000L));
        actual = testHarness.getOutput();
        sideActual = testHarness.getSideOutput(lateOutputTag);
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
        Assertions.assertThat(sideActual).isNull();
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyListStateForTumblingWindows2() throws Exception {
        int windowSize = 2;
        long lateness = 100L;
        ListStateDescriptor windowStateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)windowStateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new PassThroughFunction2()), (Trigger)new EventTimeTriggerAccumGC(100L), 100L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2100L));
        testHarness.processWatermark(new Watermark(5000L));
        expected.add(new Watermark(1599L));
        expected.add(new StreamRecord((Object)"GOT: (key2,1)", 1999L));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2100L));
        expected.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyListStateForTumblingWindows() throws Exception {
        int windowSize = 2;
        long lateness = 1L;
        ListStateDescriptor windowStateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)windowStateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new PassThroughFunction()), (Trigger)EventTimeTrigger.create(), 1L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2000L));
        testHarness.processWatermark(new Watermark(5000L));
        expected.add(new Watermark(1599L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2000L));
        expected.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyReduceStateForTumblingWindows() throws Exception {
        int windowSize = 2;
        long lateness = 1L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new PassThroughWindowFunction()), (Trigger)EventTimeTrigger.create(), 1L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(1599L));
        testHarness.processWatermark(new Watermark(1999L));
        testHarness.processWatermark(new Watermark(2000L));
        testHarness.processWatermark(new Watermark(5000L));
        expected.add(new Watermark(1599L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1999L));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2000L));
        expected.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyListStateForSessionWindows() throws Exception {
        int gapSize = 3;
        long lateness = 10L;
        ListStateDescriptor windowStateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)windowStateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new PassThroughFunction()), (Trigger)EventTimeTrigger.create(), 10L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 3999L));
        expected.add(new Watermark(4998L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new Watermark(14600L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyReduceStateForSessionWindows() throws Exception {
        int gapSize = 3;
        long lateness = 10L;
        ReducingStateDescriptor stateDesc = new ReducingStateDescriptor("window-contents", (ReduceFunction)new SumReducer(), STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)EventTimeSessionWindows.withGap((Time)Time.seconds((long)3L)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)stateDesc, (InternalWindowFunction)new InternalSingleValueWindowFunction((WindowFunction)new ReducedSessionWindowFunction()), (Trigger)EventTimeTrigger.create(), 10L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"key2", (Object)1), 1000L));
        testHarness.processWatermark(new Watermark(4998L));
        expected.add(new StreamRecord((Object)new Tuple3((Object)"key2-1", (Object)1000L, (Object)4000L), 3999L));
        expected.add(new Watermark(4998L));
        testHarness.processWatermark(new Watermark(14600L));
        expected.add(new Watermark(14600L));
        ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
        testHarness.close();
    }

    @Test
    void testCleanupTimerWithEmptyStateNoResultForTumblingWindows() throws Exception {
        int windowSize = 2;
        long lateness = 1L;
        ListStateDescriptor windowStateDesc = new ListStateDescriptor("window-contents", STRING_INT_TUPLE.createSerializer((SerializerConfig)new SerializerConfigImpl()));
        WindowOperator operator = new WindowOperator((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.of((long)2L, (TimeUnit)TimeUnit.SECONDS)), (TypeSerializer)new TimeWindow.Serializer(), (KeySelector)new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer((SerializerConfig)new SerializerConfigImpl()), (StateDescriptor)windowStateDesc, (InternalWindowFunction)new InternalIterableWindowFunction((WindowFunction)new EmptyReturnFunction()), (Trigger)new FireEverytimeOnElementAndEventTimeTrigger(), 1L, null);
        OneInputStreamOperatorTestHarness testHarness = WindowOperatorTest.createTestHarness(operator);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        testHarness.processElement((StreamRecord<Tuple2<String, Integer>>)new StreamRecord((Object)new Tuple2((Object)"test_key", (Object)1), 1000L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[(test_key,1)]");
        testHarness.processWatermark(new Watermark(1599L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[(test_key,1)]");
        testHarness.processWatermark(new Watermark(1699L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[(test_key,1)]");
        testHarness.processWatermark(new Watermark(1799L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[(test_key,1)]");
        testHarness.processWatermark(new Watermark(1999L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[(test_key,1)]");
        testHarness.processWatermark(new Watermark(2000L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[]");
        testHarness.processWatermark(new Watermark(5000L));
        Assertions.assertThat((String)((Iterable)operator.processContext.windowState().getListState(windowStateDesc).get()).toString()).isEqualTo("[]");
        expected.add(new Watermark(1599L));
        expected.add(new Watermark(1699L));
        expected.add(new Watermark(1799L));
        expected.add(new Watermark(1999L));
        expected.add(new Watermark(2000L));
        expected.add(new Watermark(5000L));
        TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
        testHarness.close();
    }

    private static class FireEverytimeOnElementAndEventTimeTrigger
    extends Trigger<Tuple2<String, Integer>, TimeWindow> {
        private FireEverytimeOnElementAndEventTimeTrigger() {
        }

        public TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
        }
    }

    private static class EventTimeTriggerAccumGC
    extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
        private long cleanupTime;

        public EventTimeTriggerAccumGC(long cleanupTime) {
            this.cleanupTime = cleanupTime;
        }

        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                return TriggerResult.FIRE;
            }
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }

        public TriggerResult onEventTime(long time, TimeWindow window, Trigger.TriggerContext ctx) {
            return time == window.maxTimestamp() || time == window.maxTimestamp() + this.cleanupTime ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
        }

        public TriggerResult onProcessingTime(long time, TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        public void clear(TimeWindow window, Trigger.TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }

        public boolean canMerge() {
            return true;
        }

        public void onMerge(TimeWindow window, Trigger.OnMergeContext ctx) {
            ctx.registerEventTimeTimer(window.maxTimestamp());
        }

        public String toString() {
            return "EventTimeTrigger()";
        }
    }

    private static class PointSessionWindows
    extends EventTimeSessionWindows {
        private static final long serialVersionUID = 1L;

        private PointSessionWindows(long sessionTimeout) {
            super(sessionTimeout);
        }

        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssigner.WindowAssignerContext ctx) {
            if (element instanceof Tuple2) {
                Tuple2 t2 = (Tuple2)element;
                if ((Integer)t2.f1 == 33) {
                    return Collections.singletonList(new TimeWindow(timestamp, timestamp));
                }
            }
            return Collections.singletonList(new TimeWindow(timestamp, timestamp + this.sessionTimeout));
        }
    }

    private static class ReducedProcessSessionWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private ReducedProcessSessionWindowFunction() {
        }

        public void process(String key, ProcessWindowFunction.Context context, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            TimeWindow window = (TimeWindow)context.window();
            for (Tuple2<String, Integer> val : values) {
                out.collect((Object)new Tuple3((Object)(key + "-" + val.f1), (Object)window.getStart(), (Object)window.getEnd()));
            }
        }
    }

    private static class SessionProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private SessionProcessWindowFunction() {
        }

        public void process(String key, ProcessWindowFunction.Context context, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> i : values) {
                sum += ((Integer)i.f1).intValue();
            }
            String resultString = key + "-" + sum;
            TimeWindow window = (TimeWindow)context.window();
            out.collect((Object)new Tuple3((Object)resultString, (Object)window.getStart(), (Object)window.getEnd()));
        }
    }

    private static class ReducedSessionWindowFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private ReducedSessionWindowFunction() {
        }

        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            for (Tuple2<String, Integer> val : values) {
                out.collect((Object)new Tuple3((Object)(key + "-" + val.f1), (Object)window.getStart(), (Object)window.getEnd()));
            }
        }
    }

    private static class SessionWindowFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, Long, Long>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private SessionWindowFunction() {
        }

        public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> values, Collector<Tuple3<String, Long, Long>> out) throws Exception {
            int sum = 0;
            for (Tuple2<String, Integer> i : values) {
                sum += ((Integer)i.f1).intValue();
            }
            String resultString = key + "-" + sum;
            out.collect((Object)new Tuple3((Object)resultString, (Object)window.getStart(), (Object)window.getEnd()));
        }
    }

    private static class TupleKeySelector
    implements KeySelector<Tuple2<String, Integer>, String> {
        private static final long serialVersionUID = 1L;

        private TupleKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return (String)value.f0;
        }
    }

    private static class Tuple3ResultSortComparator
    implements Comparator<Object>,
    Serializable {
        private Tuple3ResultSortComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((String)((Tuple3)sr0.getValue()).f0).compareTo((String)((Tuple3)sr1.getValue()).f0);
            if (comparison != 0) {
                return comparison;
            }
            comparison = (int)((Long)((Tuple3)sr0.getValue()).f1 - (Long)((Tuple3)sr1.getValue()).f1);
            if (comparison != 0) {
                return comparison;
            }
            return (int)((Long)((Tuple3)sr0.getValue()).f2 - (Long)((Tuple3)sr1.getValue()).f2);
        }
    }

    private static class Tuple2ResultSortComparator
    implements Comparator<Object>,
    Serializable {
        private Tuple2ResultSortComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            if (o1 instanceof Watermark || o2 instanceof Watermark) {
                return 0;
            }
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((String)((Tuple2)sr0.getValue()).f0).compareTo((String)((Tuple2)sr1.getValue()).f0);
            if (comparison != 0) {
                return comparison;
            }
            return (Integer)((Tuple2)sr0.getValue()).f1 - (Integer)((Tuple2)sr1.getValue()).f1;
        }
    }

    private static class RichSumReducer<W extends Window>
    extends RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, W> {
        private static final long serialVersionUID = 1L;
        private boolean openCalled = false;

        private RichSumReducer() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.openCalled = true;
        }

        public void close() throws Exception {
            super.close();
            closeCalled.incrementAndGet();
        }

        public void apply(String key, W window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.openCalled).as("Open was not called", new Object[0])).isTrue();
            int sum = 0;
            for (Tuple2<String, Integer> t : input) {
                sum += ((Integer)t.f1).intValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }

    private static class SumReducer
    implements ReduceFunction<Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private SumReducer() {
        }

        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
            return new Tuple2(value2.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1));
        }
    }

    private static class EmptyReturnFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private EmptyReturnFunction() {
        }

        public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
        }
    }

    private static class PassThroughFunction
    implements WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private PassThroughFunction() {
        }

        public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (Tuple2<String, Integer> in : input) {
                out.collect(in);
            }
        }
    }

    private static class PassThroughFunction2
    implements WindowFunction<Tuple2<String, Integer>, String, String, TimeWindow> {
        private static final long serialVersionUID = 1L;

        private PassThroughFunction2() {
        }

        public void apply(String k, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<String> out) throws Exception {
            out.collect((Object)("GOT: " + Joiner.on((String)",").join(input)));
        }
    }
}

