/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.junit.Assert;
import org.junit.Test;

public class ChannelStatePersisterTest {
    @Test
    public void testNewBarrierNotOverwrittenByStopPersisting() throws Exception {
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister((ChannelStateWriter)channelStateWriter, channelInfo);
        long checkpointId = 1L;
        channelStateWriter.start(checkpointId, CheckpointOptions.unaligned((CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        persister.checkForBarrier(ChannelStatePersisterTest.barrier(checkpointId));
        persister.startPersisting(checkpointId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        Assert.assertEquals((long)1L, (long)channelStateWriter.getAddedInput().get((Object)channelInfo).size());
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        Assert.assertEquals((long)1L, (long)channelStateWriter.getAddedInput().get((Object)channelInfo).size());
        persister.checkForBarrier(ChannelStatePersisterTest.barrier(checkpointId + 1L));
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        persister.stopPersisting(checkpointId);
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        Assert.assertEquals((long)1L, (long)channelStateWriter.getAddedInput().get((Object)channelInfo).size());
        Assert.assertTrue((boolean)persister.hasBarrierReceived());
    }

    @Test
    public void testNewBarrierNotOverwrittenByCheckForBarrier() throws Exception {
        ChannelStatePersister persister = new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
        persister.startPersisting(1L, Collections.emptyList());
        persister.startPersisting(2L, Collections.emptyList());
        Assert.assertFalse((boolean)persister.checkForBarrier(ChannelStatePersisterTest.barrier(1L)).isPresent());
        Assert.assertFalse((boolean)persister.hasBarrierReceived());
    }

    @Test
    public void testLateBarrierOnStartedAndCancelledCheckpoint() throws Exception {
        this.testLateBarrier(true, true);
    }

    @Test
    public void testLateBarrierOnCancelledCheckpoint() throws Exception {
        this.testLateBarrier(false, true);
    }

    @Test
    public void testLateBarrierOnNotYetCancelledCheckpoint() throws Exception {
        this.testLateBarrier(false, false);
    }

    private void testLateBarrier(boolean startCheckpointOnLateBarrier, boolean cancelCheckpointBeforeLateBarrier) throws Exception {
        RecordingChannelStateWriter channelStateWriter = new RecordingChannelStateWriter();
        InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
        ChannelStatePersister persister = new ChannelStatePersister((ChannelStateWriter)channelStateWriter, channelInfo);
        long lateCheckpointId = 1L;
        long checkpointId = 2L;
        if (startCheckpointOnLateBarrier) {
            persister.startPersisting(lateCheckpointId, Collections.emptyList());
        }
        if (cancelCheckpointBeforeLateBarrier) {
            persister.stopPersisting(lateCheckpointId);
        }
        persister.checkForBarrier(ChannelStatePersisterTest.barrier(lateCheckpointId));
        channelStateWriter.start(checkpointId, CheckpointOptions.unaligned((CheckpointStorageLocationReference)CheckpointStorageLocationReference.getDefault()));
        persister.startPersisting(checkpointId, Arrays.asList(BufferBuilderTestUtils.buildSomeBuffer()));
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        persister.checkForBarrier(ChannelStatePersisterTest.barrier(checkpointId));
        persister.maybePersist(BufferBuilderTestUtils.buildSomeBuffer());
        Assert.assertTrue((boolean)persister.hasBarrierReceived());
        Assert.assertEquals((long)2L, (long)channelStateWriter.getAddedInput().get((Object)channelInfo).size());
    }

    @Test(expected=CheckpointException.class)
    public void testLateBarrierTriggeringCheckpoint() throws Exception {
        ChannelStatePersister persister = new ChannelStatePersister(ChannelStateWriter.NO_OP, new InputChannelInfo(0, 0));
        long lateCheckpointId = 1L;
        long checkpointId = 2L;
        persister.checkForBarrier(ChannelStatePersisterTest.barrier(checkpointId));
        persister.startPersisting(lateCheckpointId, Collections.emptyList());
    }

    private static Buffer barrier(long id) throws IOException {
        return EventSerializer.toBuffer((AbstractEvent)new CheckpointBarrier(id, 1L, CheckpointOptions.forCheckpointWithDefaultLocation()), (boolean)true);
    }
}

