/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.window;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.dataview.StateDataViewStore;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorBuilder;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigners;
import org.apache.flink.table.runtime.operators.window.slicing.SlicingWindowOperator;
import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.GenericRowRecordSortComparator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SlicingWindowAggOperatorTest {
    // Zone used by the processing-time tests to compute the expected window bounds.
    private static final ZoneId UTC_ZONE_ID = ZoneId.of("UTC");
    // NOTE(review): not referenced by the tests visible here; presumably one of the
    // Parameterized values supplied further down in the file - confirm.
    private static final ZoneId SHANGHAI_ZONE_ID = ZoneId.of("Asia/Shanghai");
    // Zone under test for the current parameterized run; injected through the constructor and
    // handed to every slice assigner and operator builder.
    private final ZoneId shiftTimeZone;
    // Input row layout: (f0 VARCHAR, f1 INT, f2 TIMESTAMP).
    private static final RowType INPUT_ROW_TYPE = new RowType(Arrays.asList(new RowType.RowField("f0", (LogicalType)new VarCharType(Integer.MAX_VALUE)), new RowType.RowField("f1", (LogicalType)new IntType()), new RowType.RowField("f2", (LogicalType)new TimestampType())));
    // Serializer for input rows, passed to the builder's inputSerializer(...).
    private static final RowDataSerializer INPUT_ROW_SER = new RowDataSerializer(INPUT_ROW_TYPE);
    // Serializer for the accumulator row (two BIGINT fields), passed to the builder's aggregate(...).
    private static final RowDataSerializer ACC_SER = new RowDataSerializer(new LogicalType[]{new BigIntType(), new BigIntType()});
    // Output row layout: one VARCHAR followed by four BIGINT columns. The expected rows in the
    // tests fill these with a key, two aggregate values and two window-bound timestamps.
    private static final LogicalType[] OUTPUT_TYPES = new LogicalType[]{new VarCharType(Integer.MAX_VALUE), new BigIntType(), new BigIntType(), new BigIntType(), new BigIntType()};
    // Key selector built on field index 0 of the input row (the VARCHAR column f0).
    private static final RowDataKeySelector KEY_SELECTOR = HandwrittenSelectorUtil.getRowDataSelector(new int[]{0}, INPUT_ROW_TYPE.getChildren().toArray(new LogicalType[0]));
    // Key serializer derived from the selector's produced type; the raw cast is unchecked.
    private static final PagedTypeSerializer<RowData> KEY_SER = (PagedTypeSerializer)KEY_SELECTOR.getProducedType().toSerializer();
    // Output serializer handed to the test harness via setup(...).
    private static final TypeSerializer<RowData> OUT_SERIALIZER = new RowDataSerializer(OUTPUT_TYPES);
    // Shared assertion helper; its sort comparator is built on field 0 treated as a STRING.
    private static final RowDataHarnessAssertor ASSERTER = new RowDataHarnessAssertor(OUTPUT_TYPES, new GenericRowRecordSortComparator(0, (LogicalType)VarCharType.STRING_TYPE));

    /**
     * Creates a test instance bound to a single time zone.
     *
     * <p>The class is run with the JUnit {@code Parameterized} runner, so the argument comes from
     * the runner's parameter source (not visible in this part of the file).
     *
     * @param shiftTimeZone zone that every test passes to its slice assigner and operator builder
     */
    public SlicingWindowAggOperatorTest(ZoneId shiftTimeZone) {
        this.shiftTimeZone = shiftTimeZone;
    }

    /**
     * Event-time hopping windows built with {@code hopping(2, zone, 3s, 1s)}, including a
     * snapshot/restore cycle in the middle of the stream.
     *
     * <p>Records for two keys are fed out of order, the watermark is then advanced in one-second
     * steps, and after every step the harness output is compared with the rows expected so far.
     * The expected window bounds show 3-second windows sliding by 1 second. {@code localMills}
     * is defined elsewhere in this class; presumably it shifts the bound into the zone under
     * test.
     */
    @Test
    public void testEventTimeHoppingWindows() throws Exception {
        final String mismatchMessage = "Output was not correct.";

        // Argument 2 is the index of the TIMESTAMP column f2 in the input row.
        final SliceAssigner assigner =
                SliceAssigners.hopping(
                        2, shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        final SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        final SlicingWindowOperator<RowData, ?> operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer(INPUT_ROW_SER)
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                        .countStarIndex(1)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.open();

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // Out-of-order input for both keys.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));

        // Window ending at 1000: only key1 has data there.
        harness.processWatermark(new Watermark(999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1", 3L, 3L, localMills(-2000L), localMills(1000L)));
        expected.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Window ending at 2000: both keys.
        harness.processWatermark(new Watermark(1999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1", 3L, 3L, localMills(-1000L), localMills(2000L)));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 3L, 3L, localMills(-1000L), localMills(2000L)));
        expected.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Window ending at 3000: both keys.
        harness.processWatermark(new Watermark(2999L));
        expected.add(
                StreamRecordUtils.insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L)));
        expected.add(
                StreamRecordUtils.insertRecord("key2", 3L, 3L, localMills(0L), localMills(3000L)));
        expected.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Snapshot, shut the harness down, and verify the aggregate function was closed.
        harness.prepareSnapshotPreBarrier(0L);
        final OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();
        Assertions.assertThat(aggsFunction.closeCalled.get())
                .as("Close was not called.")
                .isGreaterThan(0);

        // Restore into a fresh harness; expectations restart from an empty queue.
        expected.clear();
        harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.initializeState(snapshot);
        harness.open();

        // Window ending at 4000: the five key2 records with timestamps in [1000, 4000).
        harness.processWatermark(new Watermark(3999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 5L, 5L, localMills(1000L), localMills(4000L)));
        expected.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Arrives after the window ending at 4000 fired; it is still expected to count in the
        // windows ending at 5000 and 6000.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3500L)));

        harness.processWatermark(new Watermark(4999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 3L, 3L, localMills(2000L), localMills(5000L)));
        expected.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // No key1 row is expected after this record; presumably it is the one late record
        // counted at the end.
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2999L)));

        harness.processWatermark(new Watermark(5999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 3L, 3L, localMills(3000L), localMills(6000L)));
        expected.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Further watermarks are expected to be forwarded without producing rows.
        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        Assertions.assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);

        harness.close();
    }

    /**
     * Hopping windows built with {@code hopping(-1, zone, 3h, 1h)} and driven by the harness
     * processing-time clock.
     *
     * <p>Every input record carries {@code Long.MAX_VALUE} in its timestamp column; the expected
     * rows follow only the processing time set on the harness. Processing times are computed in
     * the zone under test, while the expected window bounds are computed with
     * {@code UTC_ZONE_ID}.
     */
    @Test
    public void testProcessingTimeHoppingWindows() throws Exception {
        final String mismatchMessage = "Output was not correct.";

        // NOTE(review): -1 in place of a column index presumably selects processing time.
        final SliceAssigner assigner =
                SliceAssigners.hopping(
                        -1, shiftTimeZone, Duration.ofHours(3L), Duration.ofHours(1L));
        final SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        final SlicingWindowOperator<RowData, ?> operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer(INPUT_ROW_SER)
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                        .countStarIndex(1)
                        .build();

        final OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.open();

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // One key2 record in the first hour.
        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T00:00:00.003"));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T01:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        epochMills(UTC_ZONE_ID, "1969-12-31T22:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Two more key2 records in the second hour.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T02:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        3L,
                        3L,
                        epochMills(UTC_ZONE_ID, "1969-12-31T23:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Two key1 records in the third hour.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T03:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        3L,
                        3L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Three key1 records in the fourth hour, then jump the clock ahead to 07:00 so that all
        // remaining windows fire.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T07:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        5L,
                        5L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T01:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T04:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        5L,
                        5L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T02:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        3L,
                        3L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T03:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T06:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        harness.close();
        Assertions.assertThat(aggsFunction.closeCalled.get())
                .as("Close was not called.")
                .isGreaterThan(0);
    }

    /**
     * Event-time cumulative windows built with {@code cumulative(2, zone, 3s, 1s)}, including a
     * snapshot/restore cycle in the middle of the stream.
     *
     * <p>The expected rows show windows that share a start and grow by one second until they
     * span three seconds ({@code [0,1000)}, {@code [0,2000)}, {@code [0,3000)}), after which a
     * new cycle starts at 3000. {@code localMills} is defined elsewhere in this class;
     * presumably it shifts the bound into the zone under test.
     *
     * <p>The restored harness is set up with {@code OUT_SERIALIZER}, exactly like the first
     * harness, so both halves of the test capture operator output through the same serializer.
     */
    @Test
    public void testEventTimeCumulativeWindows() throws Exception {
        final String mismatchMessage = "Output was not correct.";

        // Argument 2 is the index of the TIMESTAMP column f2 in the input row.
        final SliceAssigner assigner =
                SliceAssigners.cumulative(
                        2, shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        final SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        final SlicingWindowOperator<RowData, ?> operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer(INPUT_ROW_SER)
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.open();

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // Out-of-order input for both keys.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(2999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));

        // Window [0, 1000): only key1 has data.
        harness.processWatermark(new Watermark(999L));
        expected.add(
                StreamRecordUtils.insertRecord("key1", 3L, 3L, localMills(0L), localMills(1000L)));
        expected.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Window [0, 2000): both keys.
        harness.processWatermark(new Watermark(1999L));
        expected.add(
                StreamRecordUtils.insertRecord("key1", 3L, 3L, localMills(0L), localMills(2000L)));
        expected.add(
                StreamRecordUtils.insertRecord("key2", 3L, 3L, localMills(0L), localMills(2000L)));
        expected.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Snapshot, shut the harness down, and verify the aggregate function was closed.
        harness.prepareSnapshotPreBarrier(0L);
        final OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();
        Assertions.assertThat(aggsFunction.closeCalled.get())
                .as("Close was not called.")
                .isGreaterThan(0);

        // Restore into a fresh harness, configured the same way as the first one.
        expected.clear();
        harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.initializeState(snapshot);
        harness.open();

        // A record for the already-fired window [0, 2000) followed by the same watermark again:
        // only the watermark is expected, i.e. the window must not fire a second time.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));
        harness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Window [0, 3000): the key2 total of 5 includes the record added after the restore.
        harness.processWatermark(new Watermark(2999L));
        expected.add(
                StreamRecordUtils.insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L)));
        expected.add(
                StreamRecordUtils.insertRecord("key2", 5L, 5L, localMills(0L), localMills(3000L)));
        expected.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Next cycle, window [3000, 4000): the single key2 record with timestamp 3000.
        harness.processWatermark(new Watermark(3999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 1L, 1L, localMills(3000L), localMills(4000L)));
        expected.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Arrives after [3000, 4000) fired but is expected in [3000, 5000) and [3000, 6000).
        // Its value is 2, so the two aggregate columns differ (2 and 1) for key1.
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 2, TimestampData.fromEpochMillis(3500L)));

        harness.processWatermark(new Watermark(4999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 1L, 1L, localMills(3000L), localMills(5000L)));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1", 2L, 1L, localMills(3000L), localMills(5000L)));
        expected.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // The key1 aggregates stay unchanged after this record; presumably it is the one late
        // record counted at the end.
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2999L)));

        harness.processWatermark(new Watermark(5999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 1L, 1L, localMills(3000L), localMills(6000L)));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1", 2L, 1L, localMills(3000L), localMills(6000L)));
        expected.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Further watermarks are expected to be forwarded without producing rows.
        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        Assertions.assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(1L);

        harness.close();
    }

    /**
     * Cumulative windows built with {@code cumulative(-1, zone, 1 day, 8h)} and driven by the
     * harness processing-time clock.
     *
     * <p>Every input record carries {@code Long.MAX_VALUE} in its timestamp column; the expected
     * rows follow only the processing time set on the harness. Processing times are computed in
     * the zone under test, while the expected window bounds are computed with
     * {@code UTC_ZONE_ID}. The expected windows share a start at midnight and end 8, 16 and 24
     * hours later.
     */
    @Test
    public void testProcessingTimeCumulativeWindows() throws Exception {
        final String mismatchMessage = "Output was not correct.";

        // NOTE(review): -1 in place of a column index presumably selects processing time.
        final SliceAssigner assigner =
                SliceAssigners.cumulative(
                        -1, shiftTimeZone, Duration.ofDays(1L), Duration.ofHours(8L));
        final SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        final SlicingWindowOperator<RowData, ?> operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer(INPUT_ROW_SER)
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                        .build();

        final OneInputStreamOperatorTestHarness<RowData, RowData> harness =
                createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.open();

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // One key2 record in the first 8-hour step of day one.
        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T00:00:00.003"));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T08:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T08:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Two more key2 records in the second step.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-01T16:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        3L,
                        3L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-01T16:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Two key1 records in the third step; the full-day window closes at midnight.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-02T00:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        3L,
                        3L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Day two: two key1 records and one key2 record, then jump the clock past the end of
        // the day so all three cumulative windows of day two fire.
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        harness.processElement(
                StreamRecordUtils.insertRecord(
                        "key1", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));

        harness.setProcessingTime(epochMills(shiftTimeZone, "1970-01-03T08:00:00"));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T08:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-02T16:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key1",
                        2L,
                        2L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00")));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2",
                        1L,
                        1L,
                        epochMills(UTC_ZONE_ID, "1970-01-02T00:00:00"),
                        epochMills(UTC_ZONE_ID, "1970-01-03T00:00:00")));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        harness.close();
        Assertions.assertThat(aggsFunction.closeCalled.get())
                .as("Close was not called.")
                .isGreaterThan(0);
    }

    /**
     * Event-time tumbling windows built with {@code tumbling(2, zone, 3s)}, including a
     * snapshot/restore cycle before the first window fires.
     *
     * <p>The expected rows cover the windows {@code [0,3000)} and {@code [3000,6000)}.
     * {@code localMills} is defined elsewhere in this class; presumably it shifts the bound into
     * the zone under test.
     *
     * <p>The restored harness is set up with {@code OUT_SERIALIZER}, exactly like the first
     * harness, so both halves of the test capture operator output through the same serializer.
     */
    @Test
    public void testEventTimeTumblingWindows() throws Exception {
        final String mismatchMessage = "Output was not correct.";

        // Argument 2 is the index of the TIMESTAMP column f2 in the input row.
        final SliceAssigner assigner =
                SliceAssigners.tumbling(2, shiftTimeZone, Duration.ofSeconds(3L));
        final SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        final SlicingWindowOperator<RowData, ?> operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer(INPUT_ROW_SER)
                        .shiftTimeZone(shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), ACC_SER)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.open();

        final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();

        // Out-of-order input for both keys.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(3000L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(20L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(0L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1998L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1999L)));
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(1000L)));

        // Watermarks below the end of [0, 3000) are expected to be forwarded without rows.
        harness.processWatermark(new Watermark(999L));
        expected.add(new Watermark(999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        harness.processWatermark(new Watermark(1999L));
        expected.add(new Watermark(1999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Snapshot, shut the harness down, and verify the aggregate function was closed.
        harness.prepareSnapshotPreBarrier(0L);
        final OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
        harness.close();
        Assertions.assertThat(aggsFunction.closeCalled.get())
                .as("Close was not called.")
                .isGreaterThan(0);

        // Restore into a fresh harness, configured the same way as the first one.
        expected.clear();
        harness = createTestHarness(operator);
        harness.setup(OUT_SERIALIZER);
        harness.initializeState(snapshot);
        harness.open();

        // Window [0, 3000) fires from the restored state for both keys.
        harness.processWatermark(new Watermark(2999L));
        expected.add(
                StreamRecordUtils.insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L)));
        expected.add(
                StreamRecordUtils.insertRecord("key2", 3L, 3L, localMills(0L), localMills(3000L)));
        expected.add(new Watermark(2999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        harness.processWatermark(new Watermark(3999L));
        expected.add(new Watermark(3999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Timestamp falls in the already-fired window [0, 3000); no row is expected for it.
        harness.processElement(
                StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(2500L)));
        harness.processWatermark(new Watermark(4999L));
        expected.add(new Watermark(4999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Also belongs to [0, 3000); the key2 row for [3000, 6000) below still counts only the
        // two records with timestamps 3999 and 3000.
        harness.processElement(
                StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(2999L)));
        harness.processWatermark(new Watermark(5999L));
        expected.add(
                StreamRecordUtils.insertRecord(
                        "key2", 2L, 2L, localMills(3000L), localMills(6000L)));
        expected.add(new Watermark(5999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Further watermarks are expected to be forwarded without producing rows.
        harness.processWatermark(new Watermark(6999L));
        harness.processWatermark(new Watermark(7999L));
        expected.add(new Watermark(6999L));
        expected.add(new Watermark(7999L));
        ASSERTER.assertOutputEqualsSorted(mismatchMessage, expected, harness.getOutput());

        // Presumably the two records sent after their window had fired.
        Assertions.assertThat(operator.getNumLateRecordsDropped().getCount()).isEqualTo(2L);

        harness.close();
    }

    /**
     * Exercises a five-hour tumbling window that is fired by advancing the harness's
     * processing-time clock.
     *
     * <p>The timestamps embedded in the input rows (including {@code Long.MAX_VALUE}) do not
     * influence the expected output: every row fed before the clock reaches a window boundary is
     * expected in the window that ends at that boundary.
     */
    @Test
    public void testProcessingTimeTumblingWindows() throws Exception {
        SliceAssigners.TumblingSliceAssigner assigner =
                SliceAssigners.tumbling(-1, this.shiftTimeZone, Duration.ofHours(5L));
        SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);
        SlicingWindowOperator operator =
                SlicingWindowAggOperatorBuilder.builder()
                        .inputSerializer((AbstractRowDataSerializer) INPUT_ROW_SER)
                        .shiftTimeZone(this.shiftTimeZone)
                        .keySerializer(KEY_SER)
                        .assigner(assigner)
                        .aggregate(wrapGenerated(aggsFunction), (AbstractRowDataSerializer) ACC_SER)
                        .build();

        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = createTestHarness(operator);
        testHarness.setup(OUT_SERIALIZER);
        testHarness.open();
        ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();

        // Start the clock 3 ms after the (zone-local) epoch, i.e. inside the first window.
        testHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T00:00:00.003"));

        // Three rows for key2 (one carrying an extreme timestamp) and two rows for key1.
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(Long.MAX_VALUE)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key2", 1, TimestampData.fromEpochMillis(7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));

        // Moving the clock to the five-hour mark is expected to emit the first window.
        testHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T05:00:00"));
        long firstWindowStart = epochMills(UTC_ZONE_ID, "1970-01-01T00:00:00");
        long firstWindowEnd = epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00");
        expectedOutput.add(StreamRecordUtils.insertRecord("key2", 3L, 3L, firstWindowStart, firstWindowEnd));
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 2L, 2L, firstWindowStart, firstWindowEnd));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());

        // Three more rows for key1, all expected in the second window.
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));
        testHarness.processElement(StreamRecordUtils.insertRecord("key1", 1, TimestampData.fromEpochMillis(7000L)));

        // One second past the ten-hour mark: the second window is expected to be emitted.
        testHarness.setProcessingTime(epochMills(this.shiftTimeZone, "1970-01-01T10:00:01"));
        long secondWindowStart = epochMills(UTC_ZONE_ID, "1970-01-01T05:00:00");
        long secondWindowEnd = epochMills(UTC_ZONE_ID, "1970-01-01T10:00:00");
        expectedOutput.add(StreamRecordUtils.insertRecord("key1", 3L, 3L, secondWindowStart, secondWindowEnd));

        // The watermark-latency gauge is expected to read zero in this processing-time scenario.
        Assertions.assertThat((Long) operator.getWatermarkLatency().getValue()).isEqualTo(Long.valueOf(0L));
        ASSERTER.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput());
        testHarness.close();
    }

    /**
     * Verifies that building an operator for a hopping-window assigner with this test's aggregate
     * setup is rejected with a message that demands a COUNT(*) aggregate.
     */
    @Test
    public void testInvalidWindows() {
        SliceAssigners.HoppingSliceAssigner assigner =
                SliceAssigners.hopping(
                        2, this.shiftTimeZone, Duration.ofSeconds(3L), Duration.ofSeconds(1L));
        SumAndCountAggsFunction aggsFunction = new SumAndCountAggsFunction(assigner);

        // The builder invocation lives in the helper method; it is expected to throw.
        Assertions.assertThatThrownBy(() -> this.lambda$testInvalidWindows$0(assigner, aggsFunction))
                .hasMessageContaining("Hopping window requires a COUNT(*) in the aggregate functions.");
    }

    /**
     * Converts {@code epochMills} with {@code TimeWindowUtil.toUtcTimestampMills}, using the shift
     * time zone this test instance runs with.
     *
     * @param epochMills the millisecond value to convert
     * @return the value returned by {@code TimeWindowUtil.toUtcTimestampMills}
     */
    private long localMills(long epochMills) {
        final ZoneId zone = this.shiftTimeZone;
        return TimeWindowUtil.toUtcTimestampMills(epochMills, zone);
    }

    /**
     * Wraps {@code operator} in a keyed one-input test harness that keys records with
     * {@code KEY_SELECTOR}.
     *
     * @param operator the slicing window operator under test
     * @return a harness driving {@code operator}; the caller is responsible for setting it up,
     *     opening it and closing it
     * @throws Exception if the harness cannot be constructed
     */
    private static OneInputStreamOperatorTestHarness<RowData, RowData> createTestHarness(SlicingWindowOperator<RowData, ?> operator) throws Exception {
        // Use the diamond operator instead of a raw type so the key selector and key type
        // information are type-checked against the operator's input type.
        return new KeyedOneInputStreamOperatorTestHarness<>(
                operator, KEY_SELECTOR, KEY_SELECTOR.getProducedType());
    }

    /**
     * Adapts an already-instantiated aggregate function to the "generated class" wrapper type, so a
     * hand-written function can be passed where a generated one is required.
     *
     * @param aggsFunction the instance that every {@code newInstance} call hands back
     * @return a wrapper with placeholder class name ({@code "N/A"}), empty code and no references
     */
    private static GeneratedNamespaceAggsHandleFunction<Long> wrapGenerated(final NamespaceAggsHandleFunction<Long> aggsFunction) {
        final Object[] noReferences = new Object[0];
        GeneratedNamespaceAggsHandleFunction<Long> wrapper =
                new GeneratedNamespaceAggsHandleFunction<Long>("N/A", "", noReferences) {
                    private static final long serialVersionUID = 1L;

                    /** Returns the captured instance; the class loader is ignored. */
                    @Override
                    public NamespaceAggsHandleFunction<Long> newInstance(ClassLoader classLoader) {
                        return aggsFunction;
                    }
                };
        return wrapper;
    }

    /**
     * Interprets an ISO-8601 local date-time string as wall-clock time in the given zone and
     * returns the corresponding epoch milliseconds.
     *
     * @param shiftTimeZone zone whose rules supply the offset for the parsed local date-time
     * @param timestampStr local date-time text accepted by {@link LocalDateTime#parse(CharSequence)}
     * @return milliseconds since the epoch for that local date-time at the zone's offset
     */
    private static long epochMills(ZoneId shiftTimeZone, String timestampStr) {
        final LocalDateTime wallClock = LocalDateTime.parse(timestampStr);
        // Ask the zone rules which offset applies to this particular wall-clock time.
        final ZoneOffset offsetAtWallClock = shiftTimeZone.getRules().getOffset(wallClock);
        return wallClock.toInstant(offsetAtWallClock).toEpochMilli();
    }

    @Parameterized.Parameters(name="TimeZone = {0}")
    public static Collection<Object[]> runMode() {
        return Arrays.asList({UTC_ZONE_ID}, {SHANGHAI_ZONE_ID});
    }

    /**
     * Body of the lambda used by {@code testInvalidWindows}, extracted into a method by the
     * decompiler: builds a window aggregate operator from the given assigner and aggregate
     * function and discards the result.
     *
     * @param assigner the slice assigner handed to the builder
     * @param aggsFunction the aggregate function, wrapped via {@code wrapGenerated}
     * @throws Throwable whatever the builder throws
     */
    private /* synthetic */ void lambda$testInvalidWindows$0(SliceAssigner assigner, SumAndCountAggsFunction aggsFunction) throws Throwable {
        SlicingWindowAggOperatorBuilder.builder()
                .inputSerializer((AbstractRowDataSerializer) INPUT_ROW_SER)
                .shiftTimeZone(this.shiftTimeZone)
                .keySerializer(KEY_SER)
                .assigner(assigner)
                .aggregate(wrapGenerated(aggsFunction), (AbstractRowDataSerializer) ACC_SER)
                .build();
    }

    private static class SumAndCountAggsFunction
    implements NamespaceAggsHandleFunction<Long> {
        private static final long serialVersionUID = 1L;
        private final SliceAssigner assigner;
        boolean openCalled;
        final AtomicInteger closeCalled = new AtomicInteger(0);
        long sum;
        boolean sumIsNull;
        long count;
        boolean countIsNull;
        protected transient JoinedRowData result;

        private SumAndCountAggsFunction(SliceAssigner assigner) {
            this.assigner = assigner;
        }

        public void open(StateDataViewStore store) throws Exception {
            this.openCalled = true;
            this.result = new JoinedRowData();
        }

        public void setAccumulators(Long window, RowData acc) throws Exception {
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            this.sumIsNull = acc.isNullAt(0);
            this.sum = !this.sumIsNull ? acc.getLong(0) : 0L;
            this.countIsNull = acc.isNullAt(1);
            this.count = !this.countIsNull ? acc.getLong(1) : 0L;
        }

        public void accumulate(RowData inputRow) throws Exception {
            boolean inputIsNull;
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            if (!(inputIsNull = inputRow.isNullAt(1))) {
                this.sum += (long)inputRow.getInt(1);
                ++this.count;
                this.sumIsNull = false;
                this.countIsNull = false;
            }
        }

        public void retract(RowData inputRow) throws Exception {
            boolean inputIsNull;
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            if (!(inputIsNull = inputRow.isNullAt(1))) {
                this.sum -= (long)inputRow.getInt(1);
                --this.count;
            }
        }

        public void merge(Long window, RowData otherAcc) throws Exception {
            boolean countIsNull2;
            boolean sumIsNull2;
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            if (!(sumIsNull2 = otherAcc.isNullAt(0))) {
                this.sum += otherAcc.getLong(0);
                this.sumIsNull = false;
            }
            if (!(countIsNull2 = otherAcc.isNullAt(1))) {
                this.count += otherAcc.getLong(1);
                this.countIsNull = false;
            }
        }

        public RowData createAccumulators() {
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            GenericRowData rowData = new GenericRowData(2);
            rowData.setField(1, (Object)0L);
            return rowData;
        }

        public RowData getAccumulators() throws Exception {
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            GenericRowData row = new GenericRowData(2);
            if (!this.sumIsNull) {
                row.setField(0, (Object)this.sum);
            }
            if (!this.countIsNull) {
                row.setField(1, (Object)this.count);
            }
            return row;
        }

        public void cleanup(Long window) {
        }

        public void close() {
            this.closeCalled.incrementAndGet();
        }

        public RowData getValue(Long window) throws Exception {
            if (!this.openCalled) {
                Assertions.fail((String)"Open was not called");
            }
            GenericRowData row = new GenericRowData(4);
            if (!this.sumIsNull) {
                row.setField(0, (Object)this.sum);
            }
            if (!this.countIsNull) {
                row.setField(1, (Object)this.count);
            }
            row.setField(1, (Object)this.count);
            row.setField(2, (Object)this.assigner.getWindowStart(window.longValue()));
            row.setField(3, (Object)window);
            return row;
        }
    }
}

