/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.cell;

import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import kafka.common.CellLoadDescriptionInternal;
import kafka.controller.ClusterBalanceManager;
import kafka.controller.NoOpDataBalanceManager;
import kafka.server.KafkaConfig;
import kafka.server.cell.CellLoadRefresher;
import kafka.utils.MockTime;
import org.apache.kafka.common.CellLoad;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.controller.metrics.CellControllerMetrics;
import org.apache.kafka.controller.metrics.CellMetrics;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CellLoadRefresherTest {
    private static final List<CellLoad> CELL_LOADS_0 = Arrays.asList(new CellLoad(0, 0.1), new CellLoad(1, 0.2), new CellLoad(2, 0.3), new CellLoad(3, 0.4), new CellLoad(4, 0.5));
    private CellLoadRefresher refresher;
    private Set<CellLoad> cellLoads;
    private Scheduler scheduler;
    private final KafkaConfig config = CellLoadRefresherTest.kafkaConfig(true);
    private int refreshTimes;
    private long timestamp;
    private MetricsRegistry metricsRegistry;
    private CellControllerMetrics metrics;
    private BiConsumer<Set<CellLoad>, Long> cellLoadConsumer = (cellLoads, timestamp) -> {
        this.cellLoads = cellLoads;
        ++this.refreshTimes;
        this.timestamp = timestamp;
    };

    CellLoadRefresherTest() {
    }

    @BeforeEach
    public void setUp() {
        this.scheduler = CellLoadRefresher.createDefaultScheduler();
        this.scheduler.startup();
        this.cellLoads = new HashSet<CellLoad>();
        this.refreshTimes = 0;
        this.timestamp = 0L;
        this.metricsRegistry = new MetricsRegistry();
        this.metrics = new CellControllerMetrics(this.metricsRegistry, (Time)new MockTime(0L, 0L), new LogContext());
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.scheduler.shutdown();
        this.metricsRegistry.shutdown();
    }

    @Test
    void testRefresh() throws Exception {
        NoOpDataBalanceManager clusterBalanceManager = new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null){

            public void cellLoad(List<Integer> cellIds, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> cb) {
                cb.respond(ApiError.NONE, Optional.of(new CellLoadDescriptionInternal(CELL_LOADS_0)));
            }
        };
        this.refresher = new CellLoadRefresher(this.cellLoadConsumer, (ClusterBalanceManager)clusterBalanceManager, (Time)new MockTime(0L, 0L), this.scheduler, this.config, (CellMetrics)this.metrics, 100L);
        this.refresher.start();
        TestUtils.waitForCondition(() -> this.refreshTimes >= 5, (String)"Expected cell loads to be refreshed");
        TestUtils.waitForCondition(() -> this.cellLoads.equals(new HashSet<CellLoad>(CELL_LOADS_0)), (String)"Expected cell loads to be refreshed");
    }

    @Test
    void testRefreshEvenAfterException() throws Exception {
        final AtomicInteger describeCellLoadCalls = new AtomicInteger();
        NoOpDataBalanceManager clusterBalanceManager = new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null){

            public void cellLoad(List<Integer> cellIds, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> cb) {
                describeCellLoadCalls.incrementAndGet();
                throw new ApiException();
            }
        };
        this.refresher = new CellLoadRefresher(this.cellLoadConsumer, (ClusterBalanceManager)clusterBalanceManager, (Time)new MockTime(), this.scheduler, this.config, (CellMetrics)this.metrics, 100L);
        this.refresher.start();
        TestUtils.waitForCondition(() -> describeCellLoadCalls.get() >= 5, (String)"Expected scheduler to keep retrying");
        Assertions.assertEquals((int)0, (int)this.refreshTimes);
        Assertions.assertEquals(new HashSet(), this.cellLoads);
    }

    @Test
    void testRefreshCellLoads() {
        NoOpDataBalanceManager clusterBalanceManager = new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null){

            public void cellLoad(List<Integer> cellIds, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> cb) {
                cb.respond(ApiError.NONE, Optional.of(new CellLoadDescriptionInternal(CELL_LOADS_0)));
            }
        };
        this.refresher = new CellLoadRefresher(this.cellLoadConsumer, (ClusterBalanceManager)clusterBalanceManager, (Time)new MockTime(0L, 0L), this.scheduler, this.config, (CellMetrics)this.metrics, 100L);
        this.refresher.refreshCellLoads();
        Assertions.assertEquals((int)1, (int)this.refreshTimes);
        Assertions.assertEquals(new HashSet<CellLoad>(CELL_LOADS_0), this.cellLoads);
        Assertions.assertEquals((long)0L, (long)this.timestamp);
    }

    @Test
    void testRefreshCellLoadsTurnedOff() {
        NoOpDataBalanceManager clusterBalanceManager = new NoOpDataBalanceManager(NoOpDataBalanceManager.class.getSimpleName(), null){

            public void cellLoad(List<Integer> cellIds, ClusterBalanceManager.BalanceManagerStatusQueryClientCallback<CellLoadDescriptionInternal> cb) {
                cb.respond(ApiError.NONE, Optional.of(new CellLoadDescriptionInternal(CELL_LOADS_0)));
            }
        };
        this.refresher = new CellLoadRefresher(this.cellLoadConsumer, (ClusterBalanceManager)clusterBalanceManager, (Time)new MockTime(0L, 0L), this.scheduler, CellLoadRefresherTest.kafkaConfig(false), (CellMetrics)this.metrics, 100L);
        this.refresher.refreshCellLoads();
        Assertions.assertEquals((int)0, (int)this.refreshTimes);
        Assertions.assertEquals(Collections.emptySet(), this.cellLoads);
        Assertions.assertEquals((long)0L, (long)this.timestamp);
    }

    private static KafkaConfig kafkaConfig(boolean enabled) {
        HashMap<String, String> cfg = new HashMap<String, String>();
        cfg.put(KafkaConfig.ZkConnectProp(), "127.0.0.1:0000");
        cfg.put(KafkaConfig.TierFetcherOffsetCacheSizeProp(), "0");
        cfg.put("confluent.cells.enable", String.valueOf(enabled));
        cfg.put("confluent.cells.load.refresher.enable", String.valueOf(enabled));
        KafkaConfig kafkaConfig = new KafkaConfig(cfg);
        return kafkaConfig;
    }
}

