/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.TestingPluginManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ChangelogRecoveryCachingITCase
extends TestLogger {
    private static final int ACCUMULATE_TIME_MILLIS = 500;
    private static final int PARALLELISM = 10;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private OpenOnceFileSystem fileSystem;
    private MiniClusterWithClientResource cluster;

    @Before
    public void before() throws Exception {
        File tmpFolder = this.temporaryFolder.newFolder();
        this.fileSystem = new OpenOnceFileSystem();
        ChangelogRecoveryCachingITCase.registerFileSystem((FileSystem)this.fileSystem, tmpFolder.toURI().getScheme());
        Configuration configuration = new Configuration();
        configuration.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, (Object)Duration.ofDays(365L));
        FsStateChangelogStorageFactory.configure((Configuration)configuration, (File)tmpFolder, (Duration)Duration.ofMinutes(1L), (int)10);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(10).build());
        this.cluster.before();
    }

    @After
    public void after() throws Exception {
        if (this.cluster != null) {
            this.cluster.after();
            this.cluster = null;
        }
        FileSystem.initialize((Configuration)new Configuration(), null);
    }

    @Test
    public void test() throws Exception {
        JobID jobID1 = this.submit(this.configureJob(this.temporaryFolder.newFolder()), graph -> {});
        Thread.sleep(500L);
        String cpLocation = this.checkpointAndCancel(jobID1);
        JobID jobID2 = this.submit(this.configureJob(this.temporaryFolder.newFolder()), graph -> graph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)cpLocation)));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)this.cluster.getMiniCluster(), (JobID)jobID2, (boolean)true);
        this.cluster.getClusterClient().cancel(jobID2).get();
        Preconditions.checkState((boolean)this.fileSystem.hasOpenedPaths());
    }

    private JobID submit(Configuration conf, Consumer<JobGraph> updateGraph) throws InterruptedException, ExecutionException {
        JobGraph jobGraph = this.createJobGraph(conf);
        updateGraph.accept(jobGraph);
        return (JobID)this.cluster.getClusterClient().submitJob(jobGraph).get();
    }

    private JobGraph createJobGraph(Configuration conf) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).keyBy((KeySelector & Serializable)num -> num % 1000L).map((MapFunction)new RichMapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                this.getRuntimeContext().getState(new ValueStateDescriptor("state", Long.class)).update((Object)value);
                return value;
            }
        }).addSink((SinkFunction)new DiscardingSink());
        return env.getStreamGraph().getJobGraph();
    }

    private Configuration configureJob(File cpDir) {
        Configuration conf = new Configuration();
        conf.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, (Object)CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        conf.set(CoreOptions.DEFAULT_PARALLELISM, (Object)10);
        conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, (Object)true);
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, (Object)CheckpointingMode.EXACTLY_ONCE);
        conf.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, (Object)Duration.ofMillis(10L));
        conf.set(CheckpointingOptions.CHECKPOINT_STORAGE, (Object)"filesystem");
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)cpDir.toURI().toString());
        conf.set(StateBackendOptions.STATE_BACKEND, (Object)"hashmap");
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, (Object)false);
        conf.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, (Object)MemorySize.ofMebiBytes((long)10L));
        conf.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, (Object)Duration.ofDays(-1L));
        conf.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, (Object)true);
        conf.set(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, (Object)Duration.ZERO);
        conf.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, (Object)false);
        conf.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"none");
        return conf;
    }

    private String checkpointAndCancel(JobID jobID) throws Exception {
        CommonTestUtils.waitForCheckpoint((JobID)jobID, (MiniCluster)this.cluster.getMiniCluster(), (int)1);
        this.cluster.getClusterClient().cancel(jobID).get();
        this.checkStatus(jobID);
        return (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)jobID, (MiniCluster)this.cluster.getMiniCluster()).orElseThrow(() -> {
            throw new NoSuchElementException("No checkpoint was created yet");
        });
    }

    private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
        if (((JobStatus)this.cluster.getClusterClient().getJobStatus(jobID).get()).isGloballyTerminalState()) {
            ((JobResult)this.cluster.getClusterClient().requestJobResult(jobID).get()).getSerializedThrowable().ifPresent(serializedThrowable -> {
                throw new RuntimeException((Throwable)serializedThrowable);
            });
        }
    }

    private static void registerFileSystem(final FileSystem fs, final String scheme) {
        FileSystem.initialize((Configuration)new Configuration(), (PluginManager)new TestingPluginManager(Collections.singletonMap(FileSystemFactory.class, Collections.singleton(new FileSystemFactory(){

            public FileSystem create(URI fsUri) {
                return fs;
            }

            public String getScheme() {
                return scheme;
            }
        }).iterator())));
    }

    private static class OpenOnceFileSystem
    extends LocalFileSystem {
        private final Set<Path> openedPaths = new HashSet<Path>();

        private OpenOnceFileSystem() {
        }

        public FSDataInputStream open(Path f) throws IOException {
            Assert.assertTrue((String)(f + " was already opened"), (boolean)this.openedPaths.add(f));
            return super.open(f);
        }

        public boolean isDistributedFS() {
            return true;
        }

        private boolean hasOpenedPaths() {
            return !this.openedPaths.isEmpty();
        }
    }
}

