/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link KeyedStateCheckpointOutputStream}: close semantics with respect to the
 * delegate stream, key-group bookkeeping, and reading back the data written per key group
 * through the offsets exposed by the resulting {@link KeyGroupsStateHandle}.
 */
public class KeyedStateCheckpointOutputStreamTest {
    /** Size passed to the in-memory delegate stream that backs every stream under test. */
    private static final int STREAM_CAPACITY = 128;

    /**
     * Creates a keyed stream for the given range on top of a fresh in-memory delegate.
     *
     * @param keyGroupRange the key-group range the stream is created for
     * @return a new stream wrapping a {@link TestMemoryCheckpointOutputStream}
     */
    private static KeyedStateCheckpointOutputStream createStream(KeyGroupRange keyGroupRange) {
        TestMemoryCheckpointOutputStream checkStream =
                new TestMemoryCheckpointOutputStream(STREAM_CAPACITY);
        return new KeyedStateCheckpointOutputStream(checkStream, keyGroupRange);
    }

    /**
     * Starts every key group of {@code keyRange} in iteration order, writes the key-group id
     * as a single int into it, and then closes the stream.
     *
     * @param stream the stream to write to
     * @param keyRange the key groups to write
     * @return the handle returned by {@code closeAndGetHandle()}
     * @throws Exception if starting a key group, writing, or closing fails
     */
    private static KeyGroupsStateHandle writeAllTestKeyGroups(
            KeyedStateCheckpointOutputStream stream, KeyGroupRange keyRange) throws Exception {
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        for (int kg : keyRange) {
            stream.startNewKeyGroup(kg);
            dov.writeInt(kg);
        }
        return stream.closeAndGetHandle();
    }

    /** A plain {@code close()} on the keyed stream must leave the delegate stream open. */
    @Test
    public void testCloseNotPropagated() throws Exception {
        KeyedStateCheckpointOutputStream stream = createStream(new KeyGroupRange(0, 0));
        TestMemoryCheckpointOutputStream innerStream =
                (TestMemoryCheckpointOutputStream) stream.getDelegate();
        stream.close();
        Assert.assertFalse(innerStream.isClosed());
    }

    /**
     * Calling {@code closeAndGetHandle()} without having started any key group must close the
     * delegate and return {@code null}.
     */
    @Test
    public void testEmptyKeyedStream() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        TestMemoryCheckpointOutputStream innerStream =
                (TestMemoryCheckpointOutputStream) stream.getDelegate();
        KeyGroupsStateHandle emptyHandle = stream.closeAndGetHandle();
        Assert.assertTrue(innerStream.isClosed());
        Assert.assertNull(emptyHandle);
    }

    /** Writes one int per key group and reads each back via the offsets in the handle. */
    @Test
    public void testWriteReadRoundtrip() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        KeyGroupsStateHandle fullHandle = writeAllTestKeyGroups(stream, keyRange);
        Assert.assertNotNull(fullHandle);
        verifyRead(fullHandle, keyRange);
    }

    /**
     * Checks the started/finished bookkeeping while key groups are written one after another,
     * that a key group outside the range is rejected, and that no key group can be started
     * once the stream has been closed.
     */
    @Test
    public void testWriteKeyGroupTracking() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);

        try {
            // 4711 is not part of the range [0, 2].
            stream.startNewKeyGroup(4711);
            Assert.fail("Expected IllegalArgumentException for a key group outside the range");
        } catch (IllegalArgumentException expected) {
            // expected: the key group is outside the stream's range
        }

        // The rejected call must not have changed the current key group.
        Assert.assertEquals(-1, stream.getCurrentKeyGroup());

        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        int previous = -1;
        for (int kg : keyRange) {
            Assert.assertFalse(stream.isKeyGroupAlreadyStarted(kg));
            Assert.assertFalse(stream.isKeyGroupAlreadyFinished(kg));
            stream.startNewKeyGroup(kg);
            if (previous != -1) {
                // Starting the next key group finishes the one before it.
                Assert.assertTrue(stream.isKeyGroupAlreadyStarted(previous));
                Assert.assertTrue(stream.isKeyGroupAlreadyFinished(previous));
            }
            Assert.assertTrue(stream.isKeyGroupAlreadyStarted(kg));
            Assert.assertFalse(stream.isKeyGroupAlreadyFinished(kg));
            dov.writeInt(kg);
            previous = kg;
        }

        KeyGroupsStateHandle fullHandle = stream.closeAndGetHandle();
        verifyRead(fullHandle, keyRange);

        for (int kg : keyRange) {
            try {
                stream.startNewKeyGroup(kg);
                Assert.fail("Expected IOException when starting a key group after close");
            } catch (IOException expected) {
                // expected: the stream was already closed by closeAndGetHandle()
            }
        }
    }

    /**
     * Writes only key group 1 out of the range [0, 2] and verifies that exactly one key group
     * with a non-negative offset can be read back from the handle.
     */
    @Test
    public void testReadWriteMissingKeyGroups() throws Exception {
        KeyGroupRange keyRange = new KeyGroupRange(0, 2);
        KeyedStateCheckpointOutputStream stream = createStream(keyRange);
        DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(stream);
        stream.startNewKeyGroup(1);
        dov.writeInt(1);
        KeyGroupsStateHandle fullHandle = stream.closeAndGetHandle();

        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream()) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
            for (int kg : fullHandle.getKeyGroupRange()) {
                long off = fullHandle.getOffsetForKeyGroup(kg);
                // Key groups with a negative offset are skipped; only written ones are read.
                if (off >= 0) {
                    in.seek(off);
                    Assert.assertEquals(1, div.readInt());
                    ++count;
                }
            }
        }
        Assert.assertEquals(1, count);
    }

    /**
     * Seeks to the offset of every key group in the handle's range, expects to read the
     * key-group id there, and checks that the number of key groups read equals the size of
     * {@code keyRange}.
     *
     * @param fullHandle the handle to read from
     * @param keyRange the range whose number of key groups is expected to be readable
     * @throws IOException if opening, seeking, or reading the stream fails
     */
    private static void verifyRead(KeyGroupsStateHandle fullHandle, KeyGroupRange keyRange)
            throws IOException {
        int count = 0;
        try (FSDataInputStream in = fullHandle.openInputStream()) {
            DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in);
            for (int kg : fullHandle.getKeyGroupRange()) {
                long off = fullHandle.getOffsetForKeyGroup(kg);
                in.seek(off);
                Assert.assertEquals(kg, div.readInt());
                ++count;
            }
        }
        Assert.assertEquals(keyRange.getNumberOfKeyGroups(), count);
    }
}

