/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.runtime.io.network.partition.hybrid.BufferIndexAndChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.HsFullSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingInfoProvider;
import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleTestUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.TestingSpillingInfoProvider;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.MapAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

class HsFullSpillingStrategyTest {
    public static final int NUM_SUBPARTITIONS = 2;
    public static final float NUM_BUFFERS_TRIGGER_SPILLING_RATIO = 0.2f;
    public static final float FULL_SPILL_RELEASE_THRESHOLD = 0.8f;
    public static final float FULL_SPILL_RELEASE_RATIO = 0.6f;
    private final HsSpillingStrategy spillStrategy = new HsFullSpillingStrategy(HybridShuffleConfiguration.builder((int)2, (int)1).setFullStrategyNumBuffersTriggerSpillingRatio(0.2f).setFullStrategyReleaseThreshold(0.8f).setFullStrategyReleaseBufferRatio(0.6f).build());

    HsFullSpillingStrategyTest() {
    }

    @Test
    void testOnBufferFinishedUnSpillBufferBelowThreshold() {
        int poolSize = 10;
        Optional finishedDecision = this.spillStrategy.onBufferFinished(1, 10);
        Assertions.assertThat((Optional)finishedDecision).hasValue((Object)HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Test
    void testOnBufferFinishedUnSpillBufferEqualToOrGreatThenThreshold() {
        int poolSize = 10;
        Optional finishedDecision = this.spillStrategy.onBufferFinished(2, 10);
        Assertions.assertThat((Optional)finishedDecision).isNotPresent();
        finishedDecision = this.spillStrategy.onBufferFinished(3, 10);
        Assertions.assertThat((Optional)finishedDecision).isNotPresent();
    }

    @Test
    void testOnBufferConsumed() {
        BufferIndexAndChannel bufferIndexAndChannel = new BufferIndexAndChannel(0, 0);
        Optional bufferConsumedDecision = this.spillStrategy.onBufferConsumed(bufferIndexAndChannel);
        Assertions.assertThat((Optional)bufferConsumedDecision).hasValue((Object)HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Test
    void testOnUsedMemoryBelowThreshold() {
        Optional memoryUsageChangedDecision = this.spillStrategy.onMemoryUsageChanged(5, 10);
        Assertions.assertThat((Optional)memoryUsageChangedDecision).hasValue((Object)HsSpillingStrategy.Decision.NO_ACTION);
    }

    @Test
    void testOnUsedMemoryExceedThreshold() {
        int poolSize = 10;
        int threshold = 8;
        Optional memoryUsageChangedDecision = this.spillStrategy.onMemoryUsageChanged(9, 10);
        Assertions.assertThat((Optional)memoryUsageChangedDecision).isNotPresent();
    }

    @Test
    void testDecideActionWithGlobalInfo() {
        boolean subpartition1 = false;
        boolean subpartition2 = true;
        int progress1 = 10;
        int progress2 = 20;
        List<BufferIndexAndChannel> subpartitionBuffers1 = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 10, 12, 14, 16, 18);
        List<BufferIndexAndChannel> subpartitionBuffers2 = HybridShuffleTestUtils.createBufferIndexAndChannelsList(1, 21, 23, 25, 27, 29);
        TestingSpillingInfoProvider spillInfoProvider = TestingSpillingInfoProvider.builder().setGetNumSubpartitionsSupplier(() -> 2).addSubpartitionBuffers(0, subpartitionBuffers1).addSubpartitionBuffers(1, subpartitionBuffers2).addSpillBuffers(0, Arrays.asList(0, 1, 2, 3)).addConsumedBuffers(0, Arrays.asList(0, 1)).addSpillBuffers(1, Arrays.asList(1, 2, 3)).addConsumedBuffers(1, Arrays.asList(0, 1)).setGetNumTotalUnSpillBuffersSupplier(() -> 2).setGetNumTotalRequestedBuffersSupplier(() -> 10).setGetPoolSizeSupplier(() -> 10).setGetNextBufferIndexToConsumeSupplier(() -> Arrays.asList(10, 20)).build();
        HsSpillingStrategy.Decision decision = this.spillStrategy.decideActionWithGlobalInfo((HsSpillingInfoProvider)spillInfoProvider);
        HashMap<Integer, List<BufferIndexAndChannel>> expectedSpillBuffers = new HashMap<Integer, List<BufferIndexAndChannel>>();
        expectedSpillBuffers.put(0, subpartitionBuffers1.subList(4, 5));
        expectedSpillBuffers.put(1, new ArrayList<BufferIndexAndChannel>(subpartitionBuffers2.subList(0, 1)));
        ((List)expectedSpillBuffers.get(1)).addAll(subpartitionBuffers2.subList(4, 5));
        Assertions.assertThat((Map)decision.getBufferToSpill()).isEqualTo(expectedSpillBuffers);
        HashMap<Integer, ArrayList<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<Integer, ArrayList<BufferIndexAndChannel>>();
        expectedReleaseBuffers.put(0, new ArrayList<BufferIndexAndChannel>(subpartitionBuffers1.subList(0, 2)));
        ((List)expectedReleaseBuffers.get(0)).addAll(subpartitionBuffers1.subList(3, 4));
        expectedReleaseBuffers.put(1, new ArrayList<BufferIndexAndChannel>(subpartitionBuffers2.subList(1, 2)));
        ((List)expectedReleaseBuffers.get(1)).addAll(subpartitionBuffers2.subList(2, 4));
        Assertions.assertThat((Map)decision.getBufferToRelease()).isEqualTo(expectedReleaseBuffers);
    }

    @Test
    void testDecideActionWithGlobalInfoAllConsumedSpillBufferShouldRelease() {
        boolean subpartitionId = false;
        List<BufferIndexAndChannel> subpartitionBuffers = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2, 3, 4);
        int poolSize = 5;
        TestingSpillingInfoProvider spillInfoProvider = TestingSpillingInfoProvider.builder().setGetNumSubpartitionsSupplier(() -> 1).addSubpartitionBuffers(0, subpartitionBuffers).addSpillBuffers(0, Arrays.asList(0, 1, 2, 3, 4)).addConsumedBuffers(0, Arrays.asList(0, 1, 2, 3)).setGetNumTotalUnSpillBuffersSupplier(() -> 0).setGetNumTotalRequestedBuffersSupplier(() -> 5).setGetPoolSizeSupplier(() -> 5).build();
        int numReleaseBuffer = 3;
        HsSpillingStrategy.Decision decision = this.spillStrategy.decideActionWithGlobalInfo((HsSpillingInfoProvider)spillInfoProvider);
        Assertions.assertThat((Map)decision.getBufferToSpill()).isEmpty();
        ((MapAssert)Assertions.assertThat((Map)decision.getBufferToRelease()).containsOnly(new Map.Entry[]{Assertions.entry((Object)0, subpartitionBuffers.subList(0, 4))})).extractingByKey((Object)0).satisfies(new ThrowingConsumer[]{buffers -> {
            ListAssert cfr_ignored_0 = (ListAssert)Assertions.assertThat((List)buffers).hasSizeGreaterThan(numReleaseBuffer);
        }});
    }

    @Test
    void testOnResultPartitionClosed() {
        boolean subpartition1 = false;
        boolean subpartition2 = true;
        List<BufferIndexAndChannel> subpartitionBuffer1 = HybridShuffleTestUtils.createBufferIndexAndChannelsList(0, 0, 1, 2, 3);
        List<BufferIndexAndChannel> subpartitionBuffer2 = HybridShuffleTestUtils.createBufferIndexAndChannelsList(1, 0, 1, 2);
        TestingSpillingInfoProvider spillInfoProvider = TestingSpillingInfoProvider.builder().setGetNumSubpartitionsSupplier(() -> 2).addSubpartitionBuffers(0, subpartitionBuffer1).addSubpartitionBuffers(1, subpartitionBuffer2).addSpillBuffers(0, Arrays.asList(2, 3)).addConsumedBuffers(0, Collections.singletonList(0)).addSpillBuffers(1, Collections.singletonList(2)).build();
        HsSpillingStrategy.Decision decision = this.spillStrategy.onResultPartitionClosed((HsSpillingInfoProvider)spillInfoProvider);
        HashMap<Integer, List<BufferIndexAndChannel>> expectedToSpillBuffers = new HashMap<Integer, List<BufferIndexAndChannel>>();
        expectedToSpillBuffers.put(0, subpartitionBuffer1.subList(0, 2));
        expectedToSpillBuffers.put(1, subpartitionBuffer2.subList(0, 2));
        Assertions.assertThat((Map)decision.getBufferToSpill()).isEqualTo(expectedToSpillBuffers);
        HashMap<Integer, List<BufferIndexAndChannel>> expectedToReleaseBuffers = new HashMap<Integer, List<BufferIndexAndChannel>>();
        expectedToReleaseBuffers.put(0, subpartitionBuffer1.subList(0, 4));
        expectedToReleaseBuffers.put(1, subpartitionBuffer2.subList(0, 3));
        Assertions.assertThat((Map)decision.getBufferToRelease()).isEqualTo(expectedToReleaseBuffers);
    }
}

