/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor.threadinfo;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.messages.ThreadInfoSample;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.util.JvmUtils;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoStats;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTracker;
import org.apache.flink.runtime.webmonitor.threadinfo.JobVertexThreadInfoTrackerBuilder;
import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoRequestCoordinator;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalListener;
import org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

public class JobVertexThreadInfoTrackerTest
extends TestLogger {
    private static final int REQUEST_ID = 0;
    private static final ExecutionJobVertex EXECUTION_JOB_VERTEX = JobVertexThreadInfoTrackerTest.createExecutionJobVertex();
    private static final ExecutionVertex[] TASK_VERTICES = EXECUTION_JOB_VERTEX.getTaskVertices();
    private static final JobID JOB_ID = new JobID();
    private static ThreadInfoSample threadInfoSample;
    private static JobVertexThreadInfoStats threadInfoStatsDefaultSample;
    private static final Duration CLEAN_UP_INTERVAL;
    private static final Duration STATS_REFRESH_INTERVAL;
    private static final Duration TIME_GAP;
    private static final Duration SMALL_TIME_GAP;
    private static final Duration REQUEST_TIMEOUT;
    private static final int NUMBER_OF_SAMPLES = 1;
    private static final int MAX_STACK_TRACE_DEPTH = 100;
    private static final Duration DELAY_BETWEEN_SAMPLES;
    private static ScheduledExecutorService executor;

    @BeforeAll
    public static void setUp() {
        threadInfoSample = (ThreadInfoSample)JvmUtils.createThreadInfoSample((long)Thread.currentThread().getId(), (int)100).get();
        threadInfoStatsDefaultSample = JobVertexThreadInfoTrackerTest.createThreadInfoStats(0, SMALL_TIME_GAP, Collections.singletonList(threadInfoSample));
        executor = Executors.newScheduledThreadPool(1);
    }

    @AfterAll
    public static void tearDown() {
        if (executor != null) {
            executor.shutdownNow();
        }
    }

    @Test
    public void testGetThreadInfoStats() throws Exception {
        this.doInitialRequestAndVerifyResult(this.createThreadInfoTracker());
    }

    @Test
    public void testCachedStatsNotUpdatedWithinRefreshInterval() throws Exception {
        JobVertexThreadInfoStats unusedThreadInfoStats = JobVertexThreadInfoTrackerTest.createThreadInfoStats(1, TIME_GAP, null);
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = this.createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample, unusedThreadInfoStats);
        this.doInitialRequestAndVerifyResult(tracker);
        Optional result = tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX);
        AssertionsForClassTypes.assertThat((Object)threadInfoStatsDefaultSample).isEqualTo(result.get());
    }

    @Test
    public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
        Duration shortRefreshInterval = Duration.ofMillis(1L);
        JobVertexThreadInfoStats initialThreadInfoStats = JobVertexThreadInfoTrackerTest.createThreadInfoStats(Instant.now().minus(10L, ChronoUnit.SECONDS), 0, Duration.ofMillis(5L), Collections.singletonList(threadInfoSample));
        JobVertexThreadInfoStats threadInfoStatsAfterRefresh = JobVertexThreadInfoTrackerTest.createThreadInfoStats(1, TIME_GAP, Collections.singletonList(threadInfoSample));
        CountDownLatch cacheRefreshed = new CountDownLatch(1);
        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache = this.createCache(CLEAN_UP_INTERVAL, new LatchRemovalListener<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>(cacheRefreshed));
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = this.createThreadInfoTracker(CLEAN_UP_INTERVAL, shortRefreshInterval, vertexStatsCache, initialThreadInfoStats, threadInfoStatsAfterRefresh);
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
        tracker.getResultAvailableFuture().get();
        JobVertexThreadInfoTrackerTest.assertExpectedEqualsReceived(initialThreadInfoStats, tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX));
        cacheRefreshed.await();
        Optional result = tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX);
        JobVertexThreadInfoTrackerTest.assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
    }

    @Test
    public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
        Duration shortCleanUpInterval = Duration.ofMillis(1L);
        CountDownLatch cacheExpired = new CountDownLatch(1);
        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache = this.createCache(shortCleanUpInterval, new LatchRemovalListener<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats>(cacheExpired));
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = this.createThreadInfoTracker(shortCleanUpInterval, STATS_REFRESH_INTERVAL, vertexStatsCache, threadInfoStatsDefaultSample);
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
        cacheExpired.await();
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
    }

    @Test
    public void testCachedStatsNotCleanedWithinCleanupInterval() throws Exception {
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = this.createThreadInfoTracker();
        this.doInitialRequestAndVerifyResult(tracker);
        tracker.cleanUpVertexStatsCache();
        JobVertexThreadInfoTrackerTest.assertExpectedEqualsReceived(threadInfoStatsDefaultSample, tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX));
    }

    @Test
    public void testShutDown() throws Exception {
        JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker = this.createThreadInfoTracker();
        this.doInitialRequestAndVerifyResult(tracker);
        tracker.shutDown();
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
    }

    private Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> createCache(Duration cleanUpInterval, RemovalListener<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> removalListener) {
        return CacheBuilder.newBuilder().concurrencyLevel(1).expireAfterAccess(cleanUpInterval.toMillis(), TimeUnit.MILLISECONDS).removalListener(removalListener).build();
    }

    private void doInitialRequestAndVerifyResult(JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker) throws InterruptedException, ExecutionException {
        AssertionsForClassTypes.assertThat((Optional)tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX)).isNotPresent();
        tracker.getResultAvailableFuture().get();
        JobVertexThreadInfoTrackerTest.assertExpectedEqualsReceived(threadInfoStatsDefaultSample, tracker.getVertexStats(JOB_ID, (AccessExecutionJobVertex)EXECUTION_JOB_VERTEX));
    }

    private static void assertExpectedEqualsReceived(JobVertexThreadInfoStats expected, Optional<JobVertexThreadInfoStats> receivedOptional) {
        AssertionsForClassTypes.assertThat(receivedOptional).isPresent();
        JobVertexThreadInfoStats received = receivedOptional.get();
        AssertionsForClassTypes.assertThat((int)expected.getRequestId()).isEqualTo(received.getRequestId());
        AssertionsForClassTypes.assertThat((long)expected.getEndTime()).isEqualTo(received.getEndTime());
        AssertionsForClassTypes.assertThat((int)TASK_VERTICES.length).isEqualTo(received.getNumberOfSubtasks());
        for (Collection samples : received.getSamplesBySubtask().values()) {
            AssertionsForClassTypes.assertThat((boolean)samples.isEmpty()).isFalse();
        }
    }

    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker() {
        return this.createThreadInfoTracker(STATS_REFRESH_INTERVAL, threadInfoStatsDefaultSample);
    }

    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(Duration statsRefreshInterval, JobVertexThreadInfoStats ... stats) {
        return this.createThreadInfoTracker(CLEAN_UP_INTERVAL, statsRefreshInterval, null, stats);
    }

    private JobVertexThreadInfoTracker<JobVertexThreadInfoStats> createThreadInfoTracker(Duration cleanUpInterval, Duration statsRefreshInterval, Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> vertexStatsCache, JobVertexThreadInfoStats ... stats) {
        TestingThreadInfoRequestCoordinator coordinator = new TestingThreadInfoRequestCoordinator(Runnable::run, REQUEST_TIMEOUT, stats);
        return JobVertexThreadInfoTrackerBuilder.newBuilder(JobVertexThreadInfoTrackerTest::createMockResourceManagerGateway, Function.identity(), (ScheduledExecutorService)executor, (Time)TestingUtils.TIMEOUT).setCoordinator((ThreadInfoRequestCoordinator)coordinator).setCleanUpInterval(cleanUpInterval).setNumSamples(1).setStatsRefreshInterval(statsRefreshInterval).setDelayBetweenSamples(DELAY_BETWEEN_SAMPLES).setMaxThreadInfoDepth(100).setVertexStatsCache(vertexStatsCache).build();
    }

    private static JobVertexThreadInfoStats createThreadInfoStats(int requestId, Duration timeGap, List<ThreadInfoSample> threadInfoSamples) {
        return JobVertexThreadInfoTrackerTest.createThreadInfoStats(Instant.now(), requestId, timeGap, threadInfoSamples);
    }

    private static JobVertexThreadInfoStats createThreadInfoStats(Instant startTime, int requestId, Duration timeGap, List<ThreadInfoSample> threadInfoSamples) {
        Instant endTime = startTime.plus(timeGap);
        HashMap<ImmutableSet, List<ThreadInfoSample>> threadInfoRatiosByTask = new HashMap<ImmutableSet, List<ThreadInfoSample>>();
        for (ExecutionVertex vertex : TASK_VERTICES) {
            HashSet<ExecutionAttemptID> attemptIds = new HashSet<ExecutionAttemptID>();
            attemptIds.add(vertex.getCurrentExecutionAttempt().getAttemptId());
            threadInfoRatiosByTask.put(ImmutableSet.copyOf(attemptIds), threadInfoSamples);
        }
        return new JobVertexThreadInfoStats(requestId, startTime.toEpochMilli(), endTime.toEpochMilli(), threadInfoRatiosByTask);
    }

    private static ExecutionJobVertex createExecutionJobVertex() {
        try {
            JobVertex jobVertex = new JobVertex("testVertex");
            jobVertex.setInvokableClass(AbstractInvokable.class);
            return ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex);
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to create ExecutionJobVertex.");
        }
    }

    private static CompletableFuture<ResourceManagerGateway> createMockResourceManagerGateway() {
        Function<ResourceID, CompletableFuture<TaskExecutorThreadInfoGateway>> function = resourceID -> CompletableFuture.completedFuture(null);
        TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setRequestTaskExecutorGatewayFunction(function);
        return CompletableFuture.completedFuture(testingResourceManagerGateway);
    }

    static {
        CLEAN_UP_INTERVAL = Duration.ofSeconds(60L);
        STATS_REFRESH_INTERVAL = Duration.ofSeconds(60L);
        TIME_GAP = Duration.ofSeconds(60L);
        SMALL_TIME_GAP = Duration.ofMillis(1L);
        REQUEST_TIMEOUT = Duration.ofSeconds(10L);
        DELAY_BETWEEN_SAMPLES = Duration.ofMillis(50L);
    }

    private static class LatchRemovalListener<K, V>
    implements RemovalListener<K, V> {
        private final CountDownLatch latch;

        private LatchRemovalListener(CountDownLatch latch) {
            this.latch = latch;
        }

        public void onRemoval(@Nonnull RemovalNotification<K, V> removalNotification) {
            this.latch.countDown();
        }
    }

    private static class TestingThreadInfoRequestCoordinator
    extends ThreadInfoRequestCoordinator {
        private final JobVertexThreadInfoStats[] jobVertexThreadInfoStats;
        private int counter = 0;

        TestingThreadInfoRequestCoordinator(Executor executor, Duration requestTimeout, JobVertexThreadInfoStats ... jobVertexThreadInfoStats) {
            super(executor, requestTimeout);
            this.jobVertexThreadInfoStats = jobVertexThreadInfoStats;
        }

        public CompletableFuture<JobVertexThreadInfoStats> triggerThreadInfoRequest(Map<ImmutableSet<ExecutionAttemptID>, CompletableFuture<TaskExecutorThreadInfoGateway>> executionsWithGateways, int ignored2, Duration ignored3, int ignored4) {
            return CompletableFuture.completedFuture(this.jobVertexThreadInfoStats[this.counter++ % this.jobVertexThreadInfoStats.length]);
        }
    }
}

