/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.fullcache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.functions.table.fullcache.TestCacheLoader;
import org.apache.flink.table.runtime.functions.table.fullcache.TestManualCacheReloadTrigger;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.CacheLoader;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class LookupFullCacheTest {
    private final TestManualCacheReloadTrigger reloadTrigger = new TestManualCacheReloadTrigger();

    @Test
    void testAddDataAfterLoad() throws Exception {
        RowData key = StreamRecordUtils.row(2);
        List<RowData> newResult = Collections.singletonList(StreamRecordUtils.row(2, "Alex"));
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {
            Collection cfr_ignored_0 = cache.put(key, newResult);
        });
        try (LookupFullCache fullCache = this.createAndLoadCache(cacheLoader);){
            Collection result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isNotNull();
            Assertions.assertThat((int)result.size()).isEqualTo(0);
            this.reloadTrigger.trigger();
            Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(2);
            result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isEqualTo(newResult);
        }
        Assertions.assertThat((boolean)this.reloadTrigger.isClosed()).isTrue();
        Assertions.assertThat((boolean)cacheLoader.isStopped()).isTrue();
    }

    @Test
    void testUpdateDataAfterLoad() throws Exception {
        RowData key = StreamRecordUtils.row(4);
        List<RowData> newResult = Collections.singletonList(StreamRecordUtils.row(4, "Frank"));
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {
            Collection cfr_ignored_0 = cache.put(key, newResult);
        });
        try (LookupFullCache fullCache = this.createAndLoadCache(cacheLoader);){
            Collection result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isEqualTo(TestCacheLoader.DATA.get(key));
            this.reloadTrigger.trigger();
            Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(2);
            result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isEqualTo(newResult);
        }
        Assertions.assertThat((boolean)this.reloadTrigger.isClosed()).isTrue();
        Assertions.assertThat((boolean)cacheLoader.isStopped()).isTrue();
    }

    @Test
    void testRemoveDataAfterLoad() throws Exception {
        RowData key = StreamRecordUtils.row(1);
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {
            Collection cfr_ignored_0 = (Collection)cache.remove(key);
        });
        try (LookupFullCache fullCache = this.createAndLoadCache(cacheLoader);){
            Collection result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isEqualTo(TestCacheLoader.DATA.get(key));
            this.reloadTrigger.trigger();
            Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(2);
            result = fullCache.getIfPresent(key);
            Assertions.assertThat((Collection)result).isNotNull();
            Assertions.assertThat((int)result.size()).isEqualTo(0);
        }
        Assertions.assertThat((boolean)this.reloadTrigger.isClosed()).isTrue();
        Assertions.assertThat((boolean)cacheLoader.isStopped()).isTrue();
    }

    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException exception = new RuntimeException("Reload failed.");
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {
            throw exception;
        });
        try (LookupFullCache fullCache = this.createAndLoadCache(cacheLoader);){
            this.reloadTrigger.trigger();
            Assertions.assertThat((boolean)cacheLoader.isStopped()).isTrue();
            Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(2);
            Assertions.assertThatThrownBy(() -> fullCache.getIfPresent(StreamRecordUtils.row(1))).hasRootCause((Throwable)exception);
            this.reloadTrigger.trigger();
            Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(2);
        }
        Assertions.assertThat((boolean)this.reloadTrigger.isClosed()).isTrue();
        Assertions.assertThat((boolean)cacheLoader.isStopped()).isTrue();
    }

    @Test
    void testUnsupportedOperations() {
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {});
        LookupFullCache fullCache = new LookupFullCache((CacheLoader)cacheLoader, (CacheReloadTrigger)this.reloadTrigger);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> fullCache.invalidate(StreamRecordUtils.row(1))).isInstanceOf(UnsupportedOperationException.class)).hasMessageContaining("invalidate");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> fullCache.put(StreamRecordUtils.row(1), Collections.singletonList(StreamRecordUtils.row(1, "Julian")))).isInstanceOf(UnsupportedOperationException.class)).hasMessageContaining("put");
    }

    @Test
    void testCacheMetrics() throws Exception {
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {});
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        LookupFullCache fullCache = this.createAndLoadCache(cacheLoader, (CacheMetricGroup)metricGroup);
        Assertions.assertThat((Object)metricGroup.hitCounter).isNotNull();
        Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat((Object)metricGroup.missCounter).isNotNull();
        Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat((Collection)fullCache.getIfPresent(StreamRecordUtils.row(1))).isNotEmpty();
        Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo(1L);
        Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat((Collection)fullCache.getIfPresent(StreamRecordUtils.row(2))).isEmpty();
        Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo(2L);
        Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo(0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentAccess() throws Exception {
        int concurrency = 4;
        ExecutorService executor = Executors.newFixedThreadPool(concurrency);
        TestCacheLoader cacheLoader = new TestCacheLoader(cache -> {});
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        try (LookupFullCache cache2 = new LookupFullCache((CacheLoader)cacheLoader, (CacheReloadTrigger)this.reloadTrigger);){
            int i;
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
            for (i = 0; i < concurrency; ++i) {
                CompletableFuture<Void> future = this.runAsync(ThrowingRunnable.unchecked(() -> {
                    cache2.open((CacheMetricGroup)metricGroup);
                    cache2.open(new Configuration());
                }), executor);
                futures.add(future);
            }
            FutureUtils.waitForAll(futures).get();
            futures.clear();
            for (i = 0; i < concurrency; ++i) {
                CompletableFuture<Void> hitFuture = this.runAsync(() -> {
                    RowData key = StreamRecordUtils.row(1);
                    cache2.open((CacheMetricGroup)metricGroup);
                    Assertions.assertThat((Collection)cache2.getIfPresent(key)).isEqualTo(TestCacheLoader.DATA.get(key));
                }, executor);
                futures.add(hitFuture);
            }
            FutureUtils.waitForAll(futures).get();
            Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo((long)concurrency);
            Assertions.assertThat((long)metricGroup.missCounter.getCount()).isZero();
        }
        finally {
            executor.shutdownNow();
        }
    }

    private LookupFullCache createAndLoadCache(TestCacheLoader cacheLoader) throws Exception {
        return this.createAndLoadCache(cacheLoader, UnregisteredMetricsGroup.createCacheMetricGroup());
    }

    private LookupFullCache createAndLoadCache(TestCacheLoader cacheLoader, CacheMetricGroup metricGroup) throws Exception {
        LookupFullCache fullCache = new LookupFullCache((CacheLoader)cacheLoader, (CacheReloadTrigger)this.reloadTrigger);
        fullCache.open(metricGroup);
        Assertions.assertThat((boolean)cacheLoader.isAwaitTriggered()).isFalse();
        Assertions.assertThat((int)cacheLoader.getNumLoads()).isZero();
        fullCache.open(new Configuration());
        Assertions.assertThat((boolean)cacheLoader.isAwaitTriggered()).isTrue();
        Assertions.assertThat((int)cacheLoader.getNumLoads()).isEqualTo(1);
        Assertions.assertThat((Map)cacheLoader.getCache()).isEqualTo(TestCacheLoader.DATA);
        return fullCache;
    }

    private CompletableFuture<Void> runAsync(Runnable runnable, ExecutorService executor) {
        return CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> {
            Thread.sleep(ThreadLocalRandom.current().nextLong(0L, 10L));
            runnable.run();
        }), executor);
    }
}

