/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.checkpointing.utils.FailingSource;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.apache.flink.test.checkpointing.utils.ValidatingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class EventTimeWindowCheckpointingITCase
extends TestLogger {
    private static final int MAX_MEM_STATE_SIZE = 0x1400000;
    private static final int PARALLELISM = 4;
    private static final int NUM_OF_TASK_MANAGERS = 2;
    private TestingServer zkServer;
    public MiniClusterWithClientResource miniClusterResource;
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    @Rule
    public TestName name = new TestName();
    private AbstractStateBackend stateBackend;
    public StateBackendEnum stateBackendEnum;
    private final int buffersPerChannel;

    @Parameterized.Parameters(name="statebackend type ={0}, buffersPerChannel = {1}")
    public static Collection<Object[]> parameter() {
        return Arrays.stream(StateBackendEnum.values()).map(type -> new Object[][]{{type, 0}, {type, 2}}).flatMap(Arrays::stream).collect(Collectors.toList());
    }

    public EventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum, int buffersPerChannel) {
        this.stateBackendEnum = stateBackendEnum;
        this.buffersPerChannel = buffersPerChannel;
    }

    protected StateBackendEnum getStateBackend() {
        return this.stateBackendEnum;
    }

    protected final MiniClusterWithClientResource getMiniClusterResource() {
        return new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(this.getConfigurationSafe()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
    }

    private Configuration getConfigurationSafe() {
        try {
            return this.getConfiguration();
        }
        catch (Exception e) {
            throw new AssertionError("Could not initialize test.", e);
        }
    }

    private Configuration getConfiguration() throws Exception {
        System.out.println("Starting " + ((Object)((Object)this)).getClass().getCanonicalName() + "#" + this.name.getMethodName() + ".");
        StateBackendEnum stateBackendEnum = this.getStateBackend();
        if (StateBackendEnum.ROCKSDB_INCREMENTAL_ZK.equals((Object)stateBackendEnum)) {
            this.zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();
        }
        Configuration config = this.createClusterConfig();
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL, this.buffersPerChannel);
        switch (stateBackendEnum) {
            case MEM: {
                this.stateBackend = new MemoryStateBackend(0x1400000);
                break;
            }
            case FILE: {
                File backups = tempFolder.newFolder().getAbsoluteFile();
                this.stateBackend = new FsStateBackend(Path.fromLocalFile((File)backups));
                break;
            }
            case ROCKSDB_FULL: {
                this.setupRocksDB(config, -1, false);
                break;
            }
            case ROCKSDB_INCREMENTAL: {
                config.set(RocksDBOptions.TIMER_SERVICE_FACTORY, (Object)EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB);
                this.setupRocksDB(config, 16, true);
                break;
            }
            case ROCKSDB_INCREMENTAL_ZK: {
                this.setupRocksDB(config, 16, true);
                break;
            }
            default: {
                throw new IllegalStateException("No backend selected.");
            }
        }
        FsStateChangelogStorageFactory.configure((Configuration)config, (File)tempFolder.newFolder(), (Duration)Duration.ofMinutes(1L), (int)10);
        return config;
    }

    private void setupRocksDB(Configuration config, int fileSizeThreshold, boolean incrementalCheckpoints) throws IOException {
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.ofMebiBytes((long)128L));
        String rocksDb = tempFolder.newFolder().getAbsolutePath();
        File backups = tempFolder.newFolder().getAbsoluteFile();
        RocksDBStateBackend rdb = new RocksDBStateBackend((AbstractStateBackend)new FsStateBackend(Path.fromLocalFile((File)backups).toUri(), fileSizeThreshold), incrementalCheckpoints);
        rdb.setDbStoragePath(rocksDb);
        this.stateBackend = rdb;
    }

    protected Configuration createClusterConfig() throws IOException {
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        File haDir = temporaryFolder.newFolder();
        Configuration config = new Configuration();
        config.setString(AkkaOptions.FRAMESIZE, String.valueOf(0x1400000) + "b");
        if (this.zkServer != null) {
            config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
            config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.zkServer.getConnectString());
            config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
        }
        return config;
    }

    @Before
    public void setupTestCluster() throws Exception {
        this.miniClusterResource = this.getMiniClusterResource();
        this.miniClusterResource.before();
    }

    @After
    public void stopTestCluster() throws IOException {
        if (this.miniClusterResource != null) {
            this.miniClusterResource.after();
            this.miniClusterResource = null;
        }
        if (this.zkServer != null) {
            this.zkServer.close();
            this.zkServer = null;
        }
        System.out.println("Finished " + ((Object)((Object)this)).getClass().getCanonicalName() + "#" + this.name.getMethodName() + ".");
    }

    @Test
    public void testTumblingTimeWindow() {
        int numElementsPerKey = this.numElementsPerKey();
        int windowSize = this.windowSize();
        int numKeys = this.numKeys();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.setStateBackend((StateBackend)this.stateBackend);
            env.getConfig().setUseSnapshotCompression(true);
            env.addSource((SourceFunction)new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize))).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    Tuple4 result = new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum));
                    out.collect((Object)result);
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testTumblingTimeWindowWithKVStateMinMaxParallelism() {
        this.doTestTumblingTimeWindowWithKVState(4);
    }

    @Test
    public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() {
        this.doTestTumblingTimeWindowWithKVState(32768);
    }

    public void doTestTumblingTimeWindowWithKVState(int maxParallelism) {
        int numElementsPerKey = this.numElementsPerKey();
        int windowSize = this.windowSize();
        int numKeys = this.numKeys();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.setMaxParallelism(maxParallelism);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.setStateBackend((StateBackend)this.stateBackend);
            env.getConfig().setUseSnapshotCompression(true);
            env.addSource((SourceFunction)new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize))).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;
                private ValueState<Integer> count;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                    this.count = this.getRuntimeContext().getState(new ValueStateDescriptor("count", Integer.class, (Object)0));
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) throws Exception {
                    if ((Integer)this.count.value() == 0) {
                        this.count.update((Object)((Long)tuple.getField(0)).intValue());
                    }
                    Assert.assertTrue((boolean)this.open);
                    this.count.update((Object)((Integer)this.count.value() + 1));
                    out.collect((Object)new Tuple4(tuple.getField(0), (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType((Integer)this.count.value())));
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new CountingSinkValidatorUpdateFun(), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testSlidingTimeWindow() {
        int numElementsPerKey = this.numElementsPerKey();
        int windowSize = this.windowSize();
        int windowSlide = this.windowSlide();
        int numKeys = this.numKeys();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setMaxParallelism(8);
            env.setParallelism(4);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.setStateBackend((StateBackend)this.stateBackend);
            env.getConfig().setUseSnapshotCompression(true);
            env.addSource((SourceFunction)new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize), (Time)Time.milliseconds((long)windowSlide))).apply((WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> values, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    int sum = 0;
                    long key = -1L;
                    for (Tuple2<Long, IntType> value : values) {
                        sum += ((IntType)value.f1).value;
                        key = (Long)value.f0;
                    }
                    Tuple4 output = new Tuple4((Object)key, (Object)window.getStart(), (Object)window.getEnd(), (Object)new IntType(sum));
                    out.collect((Object)output);
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedTumblingTimeWindow() {
        int numElementsPerKey = this.numElementsPerKey();
        int windowSize = this.windowSize();
        int numKeys = this.numKeys();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.setStateBackend((StateBackend)this.stateBackend);
            env.getConfig().setUseSnapshotCompression(true);
            env.addSource((SourceFunction)new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSize), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize))).reduce((ReduceFunction)new ReduceFunction<Tuple2<Long, IntType>>(){

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    for (Tuple2<Long, IntType> in : input) {
                        Tuple4 output = new Tuple4(in.f0, (Object)window.getStart(), (Object)window.getEnd(), in.f1);
                        out.collect((Object)output);
                    }
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testPreAggregatedSlidingTimeWindow() {
        int numElementsPerKey = this.numElementsPerKey();
        int windowSize = this.windowSize();
        int windowSlide = this.windowSlide();
        int numKeys = this.numKeys();
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(4);
            env.enableCheckpointing(100L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
            env.setStateBackend((StateBackend)this.stateBackend);
            env.getConfig().setUseSnapshotCompression(true);
            env.addSource((SourceFunction)new FailingSource(new KeyedEventTimeGenerator(numKeys, windowSlide), numElementsPerKey)).rebalance().keyBy(new int[]{0}).window((WindowAssigner)SlidingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize), (Time)Time.milliseconds((long)windowSlide))).reduce((ReduceFunction)new ReduceFunction<Tuple2<Long, IntType>>(){

                public Tuple2<Long, IntType> reduce(Tuple2<Long, IntType> a, Tuple2<Long, IntType> b) {
                    return new Tuple2(a.f0, (Object)new IntType(((IntType)a.f1).value + ((IntType)b.f1).value));
                }
            }, (WindowFunction)new RichWindowFunction<Tuple2<Long, IntType>, Tuple4<Long, Long, Long, IntType>, Tuple, TimeWindow>(){
                private boolean open = false;

                public void open(Configuration parameters) {
                    Assert.assertEquals((long)4L, (long)this.getRuntimeContext().getNumberOfParallelSubtasks());
                    this.open = true;
                }

                public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<Long, IntType>> input, Collector<Tuple4<Long, Long, Long, IntType>> out) {
                    Assert.assertTrue((boolean)this.open);
                    for (Tuple2<Long, IntType> in : input) {
                        out.collect((Object)new Tuple4(in.f0, (Object)window.getStart(), (Object)window.getEnd(), in.f1));
                    }
                }
            }).addSink(new ValidatingSink<Tuple4<Long, Long, Long, IntType>>(new SinkValidatorUpdateFun(numElementsPerKey), new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))).setParallelism(1);
            env.execute("Tumbling Window Test");
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private int numElementsPerKey() {
        return 3000;
    }

    private int windowSize() {
        return 1000;
    }

    private int windowSlide() {
        return 100;
    }

    private int numKeys() {
        return 100;
    }

    static class KeyedEventTimeGenerator
    implements FailingSource.EventEmittingGenerator {
        private final int keyUniverseSize;
        private final int watermarkTrailing;

        public KeyedEventTimeGenerator(int keyUniverseSize, int numElementsPerWindow) {
            this.keyUniverseSize = keyUniverseSize;
            this.watermarkTrailing = 4 * numElementsPerWindow / 3;
        }

        @Override
        public void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo) {
            IntType intTypeNext = new IntType(eventSequenceNo);
            for (long i = 0L; i < (long)this.keyUniverseSize; ++i) {
                Tuple2 generatedEvent = new Tuple2((Object)i, (Object)intTypeNext);
                ctx.collectWithTimestamp((Object)generatedEvent, (long)eventSequenceNo);
            }
            ctx.emitWatermark(new Watermark((long)(eventSequenceNo - this.watermarkTrailing)));
        }
    }

    static class SinkValidatorCheckFun
    implements ValidatingSink.ResultChecker {
        private final int numKeys;
        private final int numWindowsExpected;

        SinkValidatorCheckFun(int numKeys, int elementsPerKey, int elementsPerWindow) {
            this.numKeys = numKeys;
            this.numWindowsExpected = elementsPerKey / elementsPerWindow;
        }

        @Override
        public boolean checkResult(Map<Long, Integer> windowCounts) {
            if (windowCounts.size() == this.numKeys) {
                for (Integer windowCount : windowCounts.values()) {
                    if (windowCount >= this.numWindowsExpected) continue;
                    return false;
                }
                return true;
            }
            return false;
        }
    }

    static class SinkValidatorUpdateFun
    implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
        private final int elementsPerKey;

        SinkValidatorUpdateFun(int elementsPerKey) {
            this.elementsPerKey = elementsPerKey;
        }

        @Override
        public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
            int expectedSum = 0;
            long countUntil = Math.min((Long)value.f2, (long)this.elementsPerKey);
            for (long i = ((Long)value.f1).longValue(); i < countUntil; ++i) {
                if (i <= 0L) continue;
                expectedSum = (int)((long)expectedSum + i);
            }
            Assert.assertEquals((String)("Window start: " + value.f1 + " end: " + value.f2), (long)expectedSum, (long)((IntType)value.f3).value);
            windowCounts.merge((Long)value.f0, 1, (val, increment) -> val + increment);
        }
    }

    static class CountingSinkValidatorUpdateFun
    implements ValidatingSink.CountUpdater<Tuple4<Long, Long, Long, IntType>> {
        CountingSinkValidatorUpdateFun() {
        }

        @Override
        public void updateCount(Tuple4<Long, Long, Long, IntType> value, Map<Long, Integer> windowCounts) {
            windowCounts.merge((Long)value.f0, 1, (a, b) -> a + b);
            Assert.assertEquals((String)("Window counts don't match for key " + value.f0 + "."), (long)(((Long)value.f0).intValue() + windowCounts.get(value.f0)), (long)((IntType)value.f3).value);
        }
    }

    static enum StateBackendEnum {
        MEM,
        FILE,
        ROCKSDB_FULL,
        ROCKSDB_INCREMENTAL,
        ROCKSDB_INCREMENTAL_ZK;

    }
}

