/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table.fullcache.inputformat;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.functions.table.fullcache.TestCacheLoader;
import org.apache.flink.table.runtime.functions.table.fullcache.inputformat.FullCacheTestInputFormat;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.inputformat.InputFormatCacheLoader;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.generated.Projection;
import org.apache.flink.table.runtime.keyselector.GenericRowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/**
 * Tests for {@link InputFormatCacheLoader}: loading through a {@link FullCacheTestInputFormat}
 * with different split counts, metric registration, failure reporting, and reaction to
 * interruption and {@code close()} while a load is in progress.
 *
 * <p>Every test is bracketed by {@link #resetCounter()} and {@link #checkCounter()}, which require
 * {@link FullCacheTestInputFormat#OPEN_CLOSED_COUNTER} to be back at zero when the test ends
 * (presumably meaning every opened input format was closed again; confirm against
 * {@code FullCacheTestInputFormat}).
 */
class InputFormatCacheLoaderTest {

    /** Starts every test with the shared open/close counter at zero. */
    @BeforeEach
    void resetCounter() {
        FullCacheTestInputFormat.OPEN_CLOSED_COUNTER.set(0);
    }

    /** Fails the test if the shared open/close counter did not return to zero. */
    @AfterEach
    void checkCounter() {
        Assertions.assertThat(FullCacheTestInputFormat.OPEN_CLOSED_COUNTER).hasValue(0);
    }

    /**
     * Loads the cache twice and closes the loader, for each value supplied by
     * {@link #deltaNumSplits()}.
     *
     * @param deltaNumSplits value forwarded to the {@link FullCacheTestInputFormat} constructor
     */
    @ParameterizedTest
    @MethodSource("deltaNumSplits")
    void testReadWithDifferentSplits(int deltaNumSplits) throws Exception {
        InputFormatCacheLoader cacheLoader = createCacheLoader(deltaNumSplits);
        cacheLoader.open(UnregisteredMetricsGroup.createCacheMetricGroup());

        cacheLoader.run();
        Map<RowData, Collection<RowData>> firstCache = cacheLoader.getCache();
        assertCacheContent(firstCache);

        // A second load must publish a new map instance instead of reusing the first one.
        cacheLoader.run();
        Map<RowData, Collection<RowData>> secondCache = cacheLoader.getCache();
        Assertions.assertThat(secondCache).isNotSameAs(firstCache);

        // After close() the currently exposed cache must be empty.
        cacheLoader.close();
        Assertions.assertThat(cacheLoader.getCache().size()).isZero();
    }

    /**
     * Checks which metrics the loader registers on {@code open} and how they change after one
     * successful load.
     */
    @Test
    void testCacheMetrics() throws Exception {
        InputFormatCacheLoader cacheLoader = createCacheLoader(0);
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        // Registered on open, with their initial values.
        Assertions.assertThat(metricGroup.loadCounter).isNotNull();
        Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter).isNotNull();
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.numCachedRecordsGauge).isNotNull();
        Assertions.assertThat(metricGroup.numCachedRecordsGauge.getValue()).isEqualTo(0L);
        Assertions.assertThat(metricGroup.latestLoadTimeGauge).isNotNull();
        Assertions.assertThat(metricGroup.latestLoadTimeGauge.getValue()).isEqualTo(-1L);

        // Expected to stay unregistered.
        Assertions.assertThat(metricGroup.hitCounter).isNull();
        Assertions.assertThat(metricGroup.missCounter).isNull();
        Assertions.assertThat(metricGroup.numCachedBytesGauge).isNull();

        cacheLoader.run();

        Assertions.assertThat(metricGroup.loadCounter.getCount()).isEqualTo(1L);
        Assertions.assertThat(metricGroup.latestLoadTimeGauge.getValue()).isNotEqualTo(-1L);
        Assertions.assertThat(metricGroup.numCachedRecordsGauge.getValue())
                .isEqualTo((long) TestCacheLoader.DATA.size());
    }

    /**
     * Checks that an exception thrown from the key projection surfaces from {@code run()} with the
     * original exception as root cause and is counted as exactly one load failure.
     */
    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException exception = new RuntimeException("Load failed.");
        Runnable reloadAction =
                () -> {
                    throw exception;
                };
        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        Assertions.assertThatThrownBy(cacheLoader::run).hasRootCause(exception);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(1L);
    }

    /**
     * Checks that a running load ends without an exception and without a recorded failure when
     * its thread is interrupted, and again when the loader is closed concurrently.
     *
     * <p>The reload action sleeps for one second per invocation, so a load that ignored the stop
     * request would reach {@code totalSleepCount} invocations.
     */
    @Test
    void testCloseAndInterruptDuringReload() throws Exception {
        AtomicInteger sleepCounter = new AtomicInteger(0);
        // Invocation count the test treats as "load ran to completion" (presumably the total
        // number of rows in TestCacheLoader.DATA; confirm against TestCacheLoader).
        int totalSleepCount = TestCacheLoader.DATA.size() + 1;
        Runnable reloadAction =
                ThrowingRunnable.unchecked(
                        () -> {
                            sleepCounter.incrementAndGet();
                            Thread.sleep(1000L);
                        });
        InputFormatCacheLoader cacheLoader = createCacheLoader(0, reloadAction);
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        cacheLoader.open(metricGroup);

        // Phase 1: interruption. shutdownNow() interrupts the worker thread running the load.
        ExecutorService interruptingExecutor = Executors.newSingleThreadExecutor();
        Future<?> interruptedLoad;
        try {
            interruptedLoad = interruptingExecutor.submit(cacheLoader);
        } finally {
            interruptingExecutor.shutdownNow();
        }
        assertLoadStoppedEarly(interruptedLoad, sleepCounter, totalSleepCount, metricGroup);

        sleepCounter.set(0);

        // Phase 2: closing the loader while the load is running.
        ExecutorService closingExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> closedLoad = closingExecutor.submit(cacheLoader);
            cacheLoader.close();
            assertLoadStoppedEarly(closedLoad, sleepCounter, totalSleepCount, metricGroup);
        } finally {
            // Always release the worker thread; otherwise it outlives the test.
            closingExecutor.shutdownNow();
        }
    }

    /** Supplies the {@code deltaNumSplits} values for {@link #testReadWithDifferentSplits(int)}. */
    static Stream<Arguments> deltaNumSplits() {
        return Stream.of(Arguments.of(-1), Arguments.of(0), Arguments.of(1));
    }

    /**
     * Waits for {@code loadFuture} and asserts that it completed without an exception, that the
     * reload action ran fewer than {@code totalSleepCount} times, and that no load failure was
     * recorded.
     */
    private static void assertLoadStoppedEarly(
            Future<?> loadFuture,
            AtomicInteger sleepCounter,
            int totalSleepCount,
            InterceptingCacheMetricGroup metricGroup) {
        Assertions.assertThatNoException().isThrownBy(loadFuture::get);
        Assertions.assertThat(sleepCounter).hasValueLessThan(totalSleepCount);
        Assertions.assertThat(metricGroup.numLoadFailuresCounter.getCount()).isEqualTo(0L);
    }

    /**
     * Asserts that {@code actual} has exactly the keys of {@link TestCacheLoader#DATA} and, per
     * key, the same rows in any order.
     *
     * @param actual cache content produced by the loader under test
     */
    private void assertCacheContent(Map<RowData, Collection<RowData>> actual) {
        Assertions.assertThat(actual).containsOnlyKeys(TestCacheLoader.DATA.keySet());
        // The loaded rows are the subject and the fixture rows the expectation, so that a failure
        // message describes what the loader produced.
        TestCacheLoader.DATA.forEach(
                (key, expectedRows) ->
                        Assertions.assertThat(actual.get(key))
                                .containsExactlyInAnyOrderElementsOf(expectedRows));
    }

    /** Creates a loader whose key projection has no side effect. */
    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits) throws Exception {
        return createCacheLoader(deltaNumSplits, () -> {});
    }

    /**
     * Creates an {@link InputFormatCacheLoader} over a {@link FullCacheTestInputFormat} that is fed
     * with all rows of {@link TestCacheLoader#DATA}, and calls {@code open(new Configuration())}
     * on it.
     *
     * <p>Rows have the shape {@code ROW<f0 INT, f1 STRING>}; the cache key is a single INT taken
     * from field 0.
     *
     * @param deltaNumSplits value forwarded to the {@link FullCacheTestInputFormat} constructor
     * @param reloadAction hook executed by the key projection before each key is built; tests use
     *     it to inject failures or delays into a load
     * @return the loader, with {@code open(Configuration)} already called
     */
    private InputFormatCacheLoader createCacheLoader(int deltaNumSplits, final Runnable reloadAction) throws Exception {
        DataType rightRowDataType =
                DataTypes.ROW(
                        DataTypes.FIELD("f0", DataTypes.INT()),
                        DataTypes.FIELD("f1", DataTypes.STRING().bridgedTo(String.class)));
        // The RowType overload returns a RowDataSerializer directly, so no downcast is needed.
        RowDataSerializer rightRowSerializer =
                InternalSerializers.create((RowType) rightRowDataType.getLogicalType());

        DataType[] fieldTypes = rightRowDataType.getChildren().toArray(new DataType[0]);
        DataFormatConverters.RowConverter converter =
                new DataFormatConverters.RowConverter(fieldTypes);
        // Flatten all fixture rows and convert them to the converter's external representation.
        // Left raw on purpose: the external row type is not imported in this file.
        Collection dataRows =
                TestCacheLoader.DATA.values().stream()
                        .flatMap(Collection::stream)
                        .map(converter::toExternal)
                        .collect(Collectors.toList());
        FullCacheTestInputFormat inputFormat =
                new FullCacheTestInputFormat(dataRows, Optional.empty(), converter, deltaNumSplits);

        RowType keyType = (RowType) DataTypes.ROW(DataTypes.INT()).getLogicalType();
        GeneratedProjection generatedProjection =
                new GeneratedProjection("", "", new Object[0]) {

                    @Override
                    public Projection newInstance(ClassLoader classLoader) {
                        return row -> {
                            // Give the test a hook into every key extraction.
                            reloadAction.run();
                            return StreamRecordUtils.row(row.getInt(0));
                        };
                    }
                };
        GenericRowDataKeySelector keySelector =
                new GenericRowDataKeySelector(
                        InternalTypeInfo.of(keyType),
                        InternalSerializers.create(keyType),
                        generatedProjection);

        InputFormatCacheLoader cacheLoader =
                new InputFormatCacheLoader(inputFormat, keySelector, rightRowSerializer);
        cacheLoader.open(new Configuration());
        return cacheLoader;
    }
}

