/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.stopwithsavepoint;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkCompletableFutureAssert;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TestingCheckpointScheduling;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.TestingSchedulerNG;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandler;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StopWithSavepointTerminationHandlerImplTest {
    private static final Logger log = LoggerFactory.getLogger(StopWithSavepointTerminationHandlerImplTest.class);
    private static final JobID JOB_ID = new JobID();
    private final TestingCheckpointScheduling checkpointScheduling = new TestingCheckpointScheduling(false);

    StopWithSavepointTerminationHandlerImplTest() {
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstanceFailingOnGlobalFailOver() {
        return this.createTestInstance(throwableCausingGlobalFailOver -> {
            throw new AssertionError((Object)"No global failover should be triggered.");
        });
    }

    private StopWithSavepointTerminationHandlerImpl createTestInstance(Consumer<Throwable> handleGlobalFailureConsumer) {
        this.checkpointScheduling.stopCheckpointScheduler();
        TestingSchedulerNG scheduler = TestingSchedulerNG.newBuilder().setHandleGlobalFailureConsumer(handleGlobalFailureConsumer).build();
        return new StopWithSavepointTerminationHandlerImpl(JOB_ID, (SchedulerNG)scheduler, (CheckpointScheduling)this.checkpointScheduling, log);
    }

    @Test
    void testHappyPath() throws ExecutionException, InterruptedException {
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstanceFailingOnGlobalFailOver();
        EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint completedSavepoint = StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(streamStateHandle);
        testInstance.handleSavepointCreation(completedSavepoint, null);
        testInstance.handleExecutionsTermination(Collections.singleton(ExecutionState.FINISHED));
        FlinkAssertions.assertThatFuture((CompletableFuture)testInstance.getSavepointPath()).isCompletedWithValue((Object)completedSavepoint.getExternalPointer());
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)streamStateHandle.isDisposed()).withFailMessage("The savepoint should not have been discarded.", new Object[0])).isFalse();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.checkpointScheduling.isEnabled()).withFailMessage("Checkpoint scheduling should be disabled.", new Object[0])).isFalse();
    }

    @Test
    void testSavepointCreationFailureWithoutExecutionTermination() {
        this.assertSavepointCreationFailure(testInstance -> {});
    }

    @Test
    void testSavepointCreationFailureWithFailingExecutions() {
        this.assertSavepointCreationFailure(testInstance -> testInstance.handleExecutionsTermination(Collections.singletonList(ExecutionState.FAILED)));
    }

    @Test
    void testSavepointCreationFailureWithFinishingExecutions() {
        this.assertSavepointCreationFailure(testInstance -> testInstance.handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED)));
    }

    public void assertSavepointCreationFailure(Consumer<StopWithSavepointTerminationHandler> handleExecutionsTermination) {
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstanceFailingOnGlobalFailOver();
        String expectedErrorMessage = "Expected exception during savepoint creation.";
        testInstance.handleSavepointCreation(null, (Throwable)new Exception("Expected exception during savepoint creation."));
        handleExecutionsTermination.accept((StopWithSavepointTerminationHandler)testInstance);
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            String cfr_ignored_0 = (String)testInstance.getSavepointPath().get();
        }).withFailMessage("An ExecutionException is expected.", new Object[0])).isInstanceOf(Throwable.class)).hasMessageContaining("Expected exception during savepoint creation.");
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.checkpointScheduling.isEnabled()).withFailMessage("Checkpoint scheduling should be enabled.", new Object[0])).isTrue();
    }

    @Test
    void testFailedTerminationHandling() {
        CompletableFuture globalFailOverTriggered = new CompletableFuture();
        StopWithSavepointTerminationHandlerImpl testInstance = this.createTestInstance(globalFailOverTriggered::complete);
        ExecutionState expectedNonFinishedState = ExecutionState.FAILED;
        EmptyStreamStateHandle streamStateHandle = new EmptyStreamStateHandle();
        CompletedCheckpoint completedSavepoint = StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(streamStateHandle);
        testInstance.handleSavepointCreation(completedSavepoint, null);
        testInstance.handleExecutionsTermination(Collections.singletonList(expectedNonFinishedState));
        ((AbstractThrowableAssert)((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            String cfr_ignored_0 = (String)testInstance.getSavepointPath().get();
        }).withFailMessage("An ExecutionException is expected.", new Object[0])).isInstanceOf(Throwable.class)).hasCauseInstanceOf(StopWithSavepointStoppingException.class);
        ((FlinkCompletableFutureAssert)FlinkAssertions.assertThatFuture(globalFailOverTriggered).withFailMessage("Global fail-over was not triggered.", new Object[0])).isDone();
        ((AbstractBooleanAssert)Assertions.assertThat((boolean)streamStateHandle.isDisposed()).withFailMessage("Savepoint should not be discarded.", new Object[0])).isFalse();
    }

    @Test
    void testInvalidExecutionTerminationCall() {
        Assertions.assertThatThrownBy(() -> this.createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(Collections.singletonList(ExecutionState.FINISHED))).isInstanceOf(UnsupportedOperationException.class);
    }

    @Test
    void testSavepointCreationParameterBothNull() {
        Assertions.assertThatThrownBy(() -> this.createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(null, null)).isInstanceOf(NullPointerException.class);
    }

    @Test
    void testSavepointCreationParameterBothSet() {
        Assertions.assertThatThrownBy(() -> this.createTestInstanceFailingOnGlobalFailOver().handleSavepointCreation(StopWithSavepointTerminationHandlerImplTest.createCompletedSavepoint(new EmptyStreamStateHandle()), (Throwable)new Exception("No exception should be passed if a savepoint is available."))).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    void testExecutionTerminationWithNull() {
        Assertions.assertThatThrownBy(() -> this.createTestInstanceFailingOnGlobalFailOver().handleExecutionsTermination(null)).isInstanceOf(NullPointerException.class);
    }

    private static CompletedCheckpoint createCompletedSavepoint(StreamStateHandle streamStateHandle) {
        return new CompletedCheckpoint(JOB_ID, 0L, 0L, 0L, new HashMap(), null, CheckpointProperties.forSavepoint((boolean)true, (SavepointFormatType)SavepointFormatType.CANONICAL), (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(streamStateHandle, "savepoint-path"), null);
    }
}

