/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.AbstractDoubleAssert;
import org.assertj.core.api.Assertions;

public class SourceStreamTaskTestBase {
    public void testMetrics(FunctionWithException<Environment, ? extends StreamTask<Integer, ?>, Exception> taskFactory, StreamOperatorFactory<?> operatorFactory, Consumer<AbstractDoubleAssert<?>> busyTimeMatcher) throws Exception {
        long sleepTime = 42L;
        StreamTaskMailboxTestHarnessBuilder builder = new StreamTaskMailboxTestHarnessBuilder(taskFactory, BasicTypeInfo.INT_TYPE_INFO);
        ConcurrentHashMap<String, Metric> metrics = new ConcurrentHashMap<String, Metric>();
        TaskMetricGroup taskMetricGroup = StreamTaskTestHarness.createTaskMetricGroup(metrics);
        try (StreamTaskMailboxTestHarness harness = builder.setupOutputForSingletonOperatorChain(operatorFactory).setTaskMetricGroup(taskMetricGroup).build();){
            CompletableFuture triggerFuture = harness.streamTask.triggerCheckpointAsync(new CheckpointMetaData(1L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation());
            OneShotLatch checkpointAcknowledgeLatch = new OneShotLatch();
            harness.getCheckpointResponder().setAcknowledgeLatch(checkpointAcknowledgeLatch);
            Assertions.assertThat((Future)triggerFuture).isNotDone();
            Thread.sleep(sleepTime);
            while (!triggerFuture.isDone()) {
                harness.streamTask.runMailboxStep();
            }
            Gauge checkpointStartDelayGauge = (Gauge)metrics.get("checkpointStartDelayNanos");
            Assertions.assertThat((Long)((Long)checkpointStartDelayGauge.getValue())).isGreaterThanOrEqualTo(sleepTime * 1000000L);
            Gauge busyTimeGauge = (Gauge)metrics.get("busyTimeMsPerSecond");
            busyTimeMatcher.accept(Assertions.assertThat((Double)((Double)busyTimeGauge.getValue())));
            checkpointAcknowledgeLatch.await();
            TestCheckpointResponder.AcknowledgeReport acknowledgeReport = (TestCheckpointResponder.AcknowledgeReport)Iterables.getOnlyElement((Iterable)harness.getCheckpointResponder().getAcknowledgeReports());
            Assertions.assertThat((long)acknowledgeReport.getCheckpointMetrics().getCheckpointStartDelayNanos()).isGreaterThanOrEqualTo(sleepTime * 1000000L);
        }
    }
}

