/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.messages.TaskThreadInfoResponse;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Tests for the {@link ThreadInfoRequestCoordinator}.
 *
 * <p>All tests share one single-threaded scheduled executor that is created once per class. A
 * fresh coordinator is created before every test and shut down afterwards. The task executor
 * gateways are lightweight stubs whose response future is completed according to a
 * {@link CompletionType}.
 */
public class ThreadInfoRequestCoordinatorTest extends TestLogger {

    /** Timeout handed to the coordinator; also the delay after which a TIMEOUT stub fails. */
    private static final Duration REQUEST_TIMEOUT = Duration.ofMillis(100L);

    /** Message carried by the {@link TimeoutException} that a TIMEOUT stub produces. */
    private static final String REQUEST_TIMEOUT_MESSAGE = "Request timeout.";

    private static final int DEFAULT_NUMBER_OF_SAMPLES = 1;
    private static final int DEFAULT_MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DEFAULT_DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);

    /** Executor shared by all tests; used by the coordinator and to schedule delayed failures. */
    private static ScheduledExecutorService executorService;

    private ThreadInfoRequestCoordinator coordinator;

    /** Fails any single test case that runs longer than ten seconds. */
    @Rule
    public final Timeout caseTimeout = new Timeout(10L, TimeUnit.SECONDS);

    /** Creates the shared single-threaded scheduled executor. */
    @BeforeClass
    public static void setUp() throws Exception {
        executorService = new ScheduledThreadPoolExecutor(1);
    }

    /** Shuts the shared executor down, if it was created. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (executorService != null) {
            executorService.shutdown();
        }
    }

    /** Creates a fresh coordinator backed by the shared executor. */
    @Before
    public void initCoordinator() throws Exception {
        coordinator = new ThreadInfoRequestCoordinator(executorService, REQUEST_TIMEOUT);
    }

    /** Verifies that no request is left pending and shuts the coordinator down. */
    @After
    public void shutdownCoordinator() throws Exception {
        if (coordinator != null) {
            // Every test is expected to have drained its requests before it finishes.
            Assert.assertEquals(0L, coordinator.getNumberOfPendingRequests());
            coordinator.shutDown();
        }
    }

    /**
     * Checks that a request whose gateways all answer successfully completes with request id 0
     * and a non-empty stack trace in the first sample of every subtask.
     */
    @Test
    public void testSuccessfulThreadInfoRequest() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.SUCCESSFULLY);

        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);
        JobVertexThreadInfoStats threadInfoStats = requestFuture.get();

        Assert.assertEquals(0L, threadInfoStats.getRequestId());

        Map<ExecutionAttemptID, List<ThreadInfoSample>> samplesBySubtask =
                threadInfoStats.getSamplesBySubtask();
        for (List<ThreadInfoSample> samples : samplesBySubtask.values()) {
            MatcherAssert.assertThat(
                    samples.get(0).getStackTrace(), Matchers.not(Matchers.emptyArray()));
        }
    }

    /**
     * Checks that the request future fails with a {@link RuntimeException} cause when one of the
     * gateways completes its response exceptionally.
     */
    @Test
    public void testThreadInfoRequestWithException() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.EXCEPTIONALLY);

        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        try {
            requestFuture.get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        }
    }

    /**
     * Checks that the request future fails with a throwable carrying
     * {@link #REQUEST_TIMEOUT_MESSAGE} when one of the gateways reports a timeout.
     *
     * <p>The coordinator is shut down in the {@code finally} block; the {@code @After} hook
     * calls {@code shutDown()} once more afterwards.
     */
    @Test
    public void testThreadInfoRequestTimeout() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

        CompletableFuture<JobVertexThreadInfoStats> requestFuture =
                triggerDefaultRequest(executionWithGateways);

        try {
            requestFuture.get();
            Assert.fail("Exception expected.");
        } catch (ExecutionException e) {
            Assert.assertTrue(
                    ExceptionUtils.findThrowableWithMessage(e, REQUEST_TIMEOUT_MESSAGE)
                            .isPresent());
        } finally {
            coordinator.shutDown();
        }
    }

    /**
     * Checks that shutting the coordinator down fails all requests that are still in flight and
     * that a request triggered after the shutdown is already completed exceptionally when it is
     * returned.
     */
    @Test
    public void testShutDown() throws Exception {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                executionWithGateways =
                        createMockSubtaskWithGateways(
                                CompletionType.SUCCESSFULLY, CompletionType.TIMEOUT);

        List<CompletableFuture<JobVertexThreadInfoStats>> requestFutures = new ArrayList<>();
        requestFutures.add(triggerDefaultRequest(executionWithGateways));
        requestFutures.add(triggerDefaultRequest(executionWithGateways));

        // The TIMEOUT stub only fails after REQUEST_TIMEOUT, so both requests are still open.
        for (CompletableFuture<JobVertexThreadInfoStats> pending : requestFutures) {
            Assert.assertFalse(pending.isDone());
        }

        coordinator.shutDown();

        for (CompletableFuture<JobVertexThreadInfoStats> cancelled : requestFutures) {
            Assert.assertTrue(cancelled.isCompletedExceptionally());
        }

        CompletableFuture<JobVertexThreadInfoStats> afterShutdown =
                triggerDefaultRequest(executionWithGateways);
        Assert.assertTrue(afterShutdown.isCompletedExceptionally());
    }

    /**
     * Triggers a thread info request on the coordinator under test using the default number of
     * samples, delay between samples and maximum stack trace depth.
     *
     * @param executionWithGateways the executions to sample, mapped to their gateway futures
     * @return the future returned by the coordinator for this request
     */
    private CompletableFuture<JobVertexThreadInfoStats> triggerDefaultRequest(
            Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
                    executionWithGateways) {
        return coordinator.triggerThreadInfoRequest(
                executionWithGateways,
                DEFAULT_NUMBER_OF_SAMPLES,
                DEFAULT_DELAY_BETWEEN_SAMPLES,
                DEFAULT_MAX_STACK_TRACE_DEPTH);
    }

    /**
     * Creates an already-completed future holding a stub gateway. The stub ignores its arguments
     * and always returns the same response future, which is completed as dictated by
     * {@code completionType}.
     *
     * @param completionType how the stub's response future should complete
     * @return a completed future containing the stub gateway
     * @throws IllegalStateException if no sample could be created for the current thread
     */
    private static CompletableFuture<TaskExecutorThreadInfoGateway> createMockTaskManagerGateway(
            CompletionType completionType) {
        CompletableFuture<TaskThreadInfoResponse> responseFuture = new CompletableFuture<>();

        switch (completionType) {
            case SUCCESSFULLY: {
                // Answer right away with a single sample taken from the calling thread.
                ThreadInfoSample sample =
                        JvmUtils.createThreadInfoSample(
                                        Thread.currentThread().getId(),
                                        DEFAULT_MAX_STACK_TRACE_DEPTH)
                                .orElseThrow(
                                        () ->
                                                new IllegalStateException(
                                                        "Could not create a thread info sample for the current thread."));
                responseFuture.complete(
                        new TaskThreadInfoResponse(Collections.singletonList(sample)));
                break;
            }
            case EXCEPTIONALLY: {
                responseFuture.completeExceptionally(new RuntimeException("Request failed."));
                break;
            }
            case TIMEOUT: {
                // Fail the response with a TimeoutException once REQUEST_TIMEOUT has elapsed.
                executorService.schedule(
                        () ->
                                responseFuture.completeExceptionally(
                                        new TimeoutException(REQUEST_TIMEOUT_MESSAGE)),
                        REQUEST_TIMEOUT.toMillis(),
                        TimeUnit.MILLISECONDS);
                break;
            }
            case NEVER_COMPLETE: {
                // Deliberately leave the response future incomplete.
                break;
            }
            default: {
                throw new RuntimeException("Unknown completion type.");
            }
        }

        TaskExecutorThreadInfoGateway executorGateway =
                (taskExecutionAttemptId, requestParams, timeout) -> responseFuture;
        return CompletableFuture.completedFuture(executorGateway);
    }

    /**
     * Builds a map with one freshly created {@link ExecutionAttemptID} per completion type, each
     * mapped to a stub gateway that completes in the requested way.
     *
     * @param completionTypes the completion behaviour of each stub gateway
     * @return the executions mapped to their gateway futures
     */
    private static Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>>
            createMockSubtaskWithGateways(CompletionType... completionTypes) {
        Map<ExecutionAttemptID, CompletableFuture<TaskExecutorThreadInfoGateway>> result =
                new HashMap<>();
        for (CompletionType completionType : completionTypes) {
            result.put(new ExecutionAttemptID(), createMockTaskManagerGateway(completionType));
        }
        return result;
    }

    /** How the response future of a stub gateway completes. */
    private enum CompletionType {
        /** Completes immediately with one thread info sample. */
        SUCCESSFULLY,
        /** Completes immediately with a {@link RuntimeException}. */
        EXCEPTIONALLY,
        /** Completes with a {@link TimeoutException} after {@link #REQUEST_TIMEOUT}. */
        TIMEOUT,
        /** Is never completed. */
        NEVER_COMPLETE
    }
}

