/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.MemoryTracker;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class MemoryTrackerTest {
    @Test
    public void testLeaseReclaim() {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        MemoryTracker.MemoryLease lease = pool.newLease(ctx, 1024L);
        Assertions.assertEquals((long)pool.leased(), (long)lease.leased());
        lease.release();
        Assertions.assertEquals((long)pool.leased(), (long)0L);
        pool.close();
    }

    @Test
    public void testTryLease() {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        MemoryTracker.MemoryLease lease = pool.newLease(ctx, 1024L);
        Assertions.assertFalse((boolean)pool.tryLease(1024L).isPresent());
        lease.release();
        Assertions.assertTrue((boolean)pool.tryLease(1024L).isPresent());
        pool.close();
    }

    @Test
    public void testTryLeaseBurst() {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        pool.newLease(ctx, 5120L);
        Assertions.assertFalse((boolean)pool.tryLease(1L).isPresent());
        pool.close();
    }

    @Test
    public void testLeaseExtend() {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        MemoryTracker.MemoryLease lease = pool.newLease(ctx, 512L);
        Assertions.assertTrue((boolean)lease.tryExtendLease(512L));
        Assertions.assertFalse((boolean)lease.tryExtendLease(512L));
        lease.release();
        Assertions.assertEquals((long)pool.leased(), (long)0L);
        pool.close();
    }

    @Test
    public void testCancelledNewLeaseClaimsNothing() throws InterruptedException {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        pool.newLease(CancellationContext.newContext(), 1024L);
        Thread blocked = new Thread(() -> pool.newLease(ctx, 1024L));
        ctx.cancel();
        pool.wakeup();
        blocked.join();
        Assertions.assertEquals((long)pool.leased(), (long)1024L, (String)"expected no additional memory to be taken from the pool");
    }

    @Test
    public void testReclaimedLeaseUnblocksWaiter() throws InterruptedException {
        MockTime time = new MockTime();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        MemoryTracker.MemoryLease oldLease = pool.newLease(CancellationContext.newContext(), 1024L);
        Thread blocked = new Thread(() -> pool.newLease(ctx, 1024L));
        oldLease.release();
        blocked.join(1000L);
    }

    private boolean futureDone(Future<?> future, long timeout, TimeUnit unit) {
        try {
            future.get(timeout, unit);
            return true;
        }
        catch (Exception e) {
            return false;
        }
    }

    @Test
    public void testPoolSizeZeroIsUnrestricted() {
        MockTime time = new MockTime();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 0L);
        Assertions.assertTrue((boolean)pool.isDisabled());
        Optional lease1 = pool.tryLease(1024L);
        Assertions.assertTrue((boolean)lease1.isPresent());
        Optional lease2 = pool.tryLease(1024L);
        Assertions.assertTrue((boolean)lease2.isPresent());
        Future<?> fut = executor.submit(() -> {
            MemoryTracker.MemoryLease memoryLease2 = pool.newLease(ctx, 1024L);
            memoryLease2.release();
        });
        Assertions.assertTrue((boolean)this.futureDone(fut, 5L, TimeUnit.SECONDS), (String)"expected MemoryTracker::newLease not to block");
        Assertions.assertEquals((long)2048L, (long)pool.leased());
        lease1.ifPresent(MemoryTracker.MemoryLease::release);
        Assertions.assertEquals((long)1024L, (long)pool.leased());
        lease2.ifPresent(MemoryTracker.MemoryLease::release);
        Assertions.assertEquals((long)0L, (long)pool.leased());
    }

    @Test
    public void testChangingPoolSizeDynamicallyWakesBlockedRequests() {
        MockTime time = new MockTime();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        pool.newLease(ctx, 1024L);
        Future<?> fut = executor.submit(() -> {
            MemoryTracker.MemoryLease memoryLease2 = pool.newLease(ctx, 1024L);
            memoryLease2.release();
        });
        Assertions.assertFalse((boolean)this.futureDone(fut, 5L, TimeUnit.SECONDS), (String)"memory acquisition should be blocked");
        pool.setPoolSize(0L);
        Assertions.assertTrue((boolean)this.futureDone(fut, 5L, TimeUnit.SECONDS), (String)"expected setting the pool size to 0 would unblock memory acquisition");
    }

    @Test
    public void testChangingPoolSizeDynamically() {
        MockTime time = new MockTime();
        MemoryTracker pool = new MemoryTracker((Time)time, 1024L);
        Optional lease1 = pool.tryLease(1024L);
        Assertions.assertTrue((boolean)lease1.isPresent());
        Optional lease2 = pool.tryLease(1024L);
        Assertions.assertFalse((boolean)lease2.isPresent());
        pool.setPoolSize(2048L);
        Optional lease3 = pool.tryLease(1024L);
        Assertions.assertTrue((boolean)lease3.isPresent());
        Optional lease4 = pool.tryLease(1024L);
        Assertions.assertFalse((boolean)lease4.isPresent());
    }

    @Test
    public void testOomTimeSensor() throws InterruptedException, ExecutionException, TimeoutException {
        MockTime time = new MockTime();
        Metrics metrics = new Metrics((Time)time);
        MemoryTracker pool = new MemoryTracker((Time)time, metrics, 1024L);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        KafkaMetric depletedTime = metrics.metric(pool.memoryTrackerDepletedTimeMetricName);
        KafkaMetric depletedPercent = metrics.metric(pool.memoryTrackerDepletedPercentMetricName);
        CancellationContext ctx = CancellationContext.newContext();
        MemoryTracker.MemoryLease lease1 = pool.newLease(ctx, 1024L);
        Future<?> fut = executor.submit(() -> {
            MemoryTracker.MemoryLease lease2 = pool.newLease(ctx, 1024L);
            lease2.release();
        });
        Thread.sleep(500L);
        time.sleep(10000L);
        lease1.release();
        fut.get();
        Assertions.assertEquals((double)((Double)depletedTime.metricValue()), (double)10000.0, (double)0.0, (String)"expected 10 seconds of blocked time");
        Assertions.assertTrue(((Double)depletedPercent.metricValue() > 0.0 ? 1 : 0) != 0, (String)"expected a nonzero amount of blocked time");
    }
}

