/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.ChangelogRecoveryITCaseBase;
import org.apache.flink.test.util.InfiniteIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ChangelogLocalRecoveryITCase
extends TestLogger {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_TASK_SLOTS = 1;
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    @Parameterized.Parameter
    public AbstractStateBackend delegatedStateBackend;
    private MiniClusterWithClientResource cluster;
    private static String workingDir;

    @Parameterized.Parameters(name="delegated state backend type = {0}")
    public static Collection<AbstractStateBackend> parameter() {
        return Arrays.asList(new HashMapStateBackend(), new EmbeddedRocksDBStateBackend(false), new EmbeddedRocksDBStateBackend(true));
    }

    @BeforeClass
    public static void setWorkingDir() throws IOException {
        workingDir = TEMPORARY_FOLDER.newFolder("work").getAbsolutePath();
    }

    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
        configuration.setString(ClusterOptions.PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.setString(ClusterOptions.JOB_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.setString(ClusterOptions.TASK_MANAGER_PROCESS_WORKING_DIR_BASE, workingDir);
        configuration.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, true);
        FsStateChangelogStorageFactory.configure((Configuration)configuration, (File)TEMPORARY_FOLDER.newFolder(), (Duration)Duration.ofMillis(1000L), (int)1);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(1).build());
        this.cluster.before();
        this.cluster.getMiniCluster().overrideRestoreModeForChangelogStateBackend();
    }

    @After
    public void teardown() {
        this.cluster.after();
    }

    private JobGraph buildJobGraph(StreamExecutionEnvironment env) {
        env.addSource((SourceFunction)new InfiniteIntegerSource()).setParallelism(1).keyBy((KeySelector & Serializable)element -> element).process((KeyedProcessFunction)new ChangelogRecoveryITCaseBase.CountFunction()).addSink((SinkFunction)new ChangelogRecoveryITCaseBase.CollectionSink()).setParallelism(1);
        return env.getStreamGraph().getJobGraph();
    }

    @Test
    public void testRestartTM() throws Exception {
        File checkpointFolder = TEMPORARY_FOLDER.newFolder();
        MiniCluster miniCluster = this.cluster.getMiniCluster();
        StreamExecutionEnvironment env1 = this.getEnv((StateBackend)this.delegatedStateBackend, checkpointFolder, true, 200L, 800L);
        JobGraph firstJobGraph = this.buildJobGraph(env1);
        miniCluster.submitJob(firstJobGraph).get();
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)firstJobGraph.getJobID(), (boolean)false);
        CommonTestUtils.waitUntilCondition(() -> !ChangelogRecoveryITCaseBase.getAllStateHandleId(firstJobGraph.getJobID(), miniCluster).isEmpty());
        CompletableFuture terminationFuture = miniCluster.terminateTaskManager(1);
        terminationFuture.get();
        miniCluster.startTaskManager();
        CommonTestUtils.waitForAllTaskRunning(() -> (AccessExecutionGraph)miniCluster.getExecutionGraph(firstJobGraph.getJobID()).get(500L, TimeUnit.SECONDS), (boolean)false);
        miniCluster.triggerCheckpoint(firstJobGraph.getJobID());
    }

    private StreamExecutionEnvironment getEnv(StateBackend stateBackend, File checkpointFile, boolean changelogEnabled, long checkpointInterval, long materializationInterval) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(checkpointInterval);
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
        env.setStateBackend(stateBackend).setRestartStrategy(RestartStrategies.fixedDelayRestart((int)3, (long)10L));
        env.configure((ReadableConfig)new Configuration().set(CheckpointingOptions.LOCAL_RECOVERY, (Object)true));
        env.getCheckpointConfig().setCheckpointStorage(checkpointFile.toURI());
        env.enableChangelogStateBackend(changelogEnabled);
        env.configure((ReadableConfig)new Configuration().set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, (Object)Duration.ofMillis(materializationInterval)).set(StateChangelogOptions.MATERIALIZATION_MAX_FAILURES_ALLOWED, (Object)1));
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        Configuration configuration = new Configuration();
        configuration.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
        env.configure((ReadableConfig)configuration);
        return env;
    }
}

