/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class OperatorStateBackendTest {
    private final ClassLoader classLoader = this.getClass().getClassLoader();
    private final Collection<OperatorStateHandle> emptyStateHandles = Collections.emptyList();

    @Test
    public void testCreateOnAbstractStateBackend() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "test-operator", this.emptyStateHandles, cancelStreamRegistry);
        Assert.assertNotNull((Object)operatorStateBackend);
        Assert.assertTrue((boolean)operatorStateBackend.getRegisteredStateNames().isEmpty());
        Assert.assertTrue((boolean)operatorStateBackend.getRegisteredBroadcastStateNames().isEmpty());
    }

    @Test
    public void testRegisterStatesWithoutTypeSerializer() throws Exception {
        Class<FutureTask> registeredType = FutureTask.class;
        Assert.assertFalse((boolean)(new KryoSerializer(File.class, new ExecutionConfig()).getKryo().getDefaultSerializer(registeredType) instanceof com.esotericsoftware.kryo.serializers.JavaSerializer));
        ExecutionConfig cfg = new ExecutionConfig();
        cfg.registerTypeWithKryoSerializer(registeredType, com.esotericsoftware.kryo.serializers.JavaSerializer.class);
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(this.classLoader, cfg, false, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor = new ListStateDescriptor("test", File.class);
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", String.class);
        ListState listState = operatorStateBackend.getListState(stateDescriptor);
        Assert.assertNotNull((Object)listState);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        Assert.assertNotNull((Object)listState2);
        Assert.assertEquals((long)2L, (long)operatorStateBackend.getRegisteredStateNames().size());
        TypeSerializer serializer = ((PartitionableListState)listState).getStateMetaInfo().getPartitionStateSerializer();
        Assert.assertTrue((boolean)(serializer instanceof KryoSerializer));
        Assert.assertTrue((boolean)(((KryoSerializer)serializer).getKryo().getSerializer(registeredType) instanceof com.esotericsoftware.kryo.serializers.JavaSerializer));
        Iterator it = ((Iterable)listState2.get()).iterator();
        Assert.assertFalse((boolean)it.hasNext());
        listState2.add((Object)"kevin");
        listState2.add((Object)"sunny");
        it = ((Iterable)listState2.get()).iterator();
        Assert.assertEquals((Object)"kevin", it.next());
        Assert.assertEquals((Object)"sunny", it.next());
        Assert.assertFalse((boolean)it.hasNext());
    }

    @Test
    public void testRegisterStates() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(this.classLoader, new ExecutionConfig(), false, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        Assert.assertNotNull((Object)listState1);
        Assert.assertEquals((long)1L, (long)operatorStateBackend.getRegisteredStateNames().size());
        Iterator it = ((Iterable)listState1.get()).iterator();
        Assert.assertFalse((boolean)it.hasNext());
        listState1.add((Object)42);
        listState1.add((Object)4711);
        it = ((Iterable)listState1.get()).iterator();
        Assert.assertEquals((Object)42, it.next());
        Assert.assertEquals((Object)4711, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        Assert.assertNotNull((Object)listState2);
        Assert.assertEquals((long)2L, (long)operatorStateBackend.getRegisteredStateNames().size());
        Assert.assertFalse((boolean)it.hasNext());
        listState2.add((Object)7);
        listState2.add((Object)13);
        listState2.add((Object)23);
        it = ((Iterable)listState2.get()).iterator();
        Assert.assertEquals((Object)7, it.next());
        Assert.assertEquals((Object)13, it.next());
        Assert.assertEquals((Object)23, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        Assert.assertNotNull((Object)listState3);
        Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredStateNames().size());
        Assert.assertFalse((boolean)it.hasNext());
        listState3.add((Object)17);
        listState3.add((Object)3);
        listState3.add((Object)123);
        it = ((Iterable)listState3.get()).iterator();
        Assert.assertEquals((Object)17, it.next());
        Assert.assertEquals((Object)3, it.next());
        Assert.assertEquals((Object)123, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        ListState listState1b = operatorStateBackend.getListState(stateDescriptor1);
        Assert.assertNotNull((Object)listState1b);
        listState1b.add((Object)123);
        it = ((Iterable)listState1b.get()).iterator();
        Assert.assertEquals((Object)42, it.next());
        Assert.assertEquals((Object)4711, it.next());
        Assert.assertEquals((Object)123, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        it = ((Iterable)listState1.get()).iterator();
        Assert.assertEquals((Object)42, it.next());
        Assert.assertEquals((Object)4711, it.next());
        Assert.assertEquals((Object)123, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        it = ((Iterable)listState1b.get()).iterator();
        Assert.assertEquals((Object)42, it.next());
        Assert.assertEquals((Object)4711, it.next());
        Assert.assertEquals((Object)123, it.next());
        Assert.assertFalse((boolean)it.hasNext());
        try {
            operatorStateBackend.getUnionListState(stateDescriptor2);
            Assert.fail((String)"Did not detect changed mode");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
        try {
            operatorStateBackend.getListState(stateDescriptor3);
            Assert.fail((String)"Did not detect changed mode");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
        Environment env = OperatorStateBackendTest.createMockEnvironment();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(env, "test-op-name", this.emptyStateHandles, cancelStreamRegistry);
        AtomicInteger copyCounter = new AtomicInteger(0);
        VerifyingIntSerializer serializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), copyCounter);
        ListStateDescriptor stateDescriptor = new ListStateDescriptor("test", (TypeSerializer)serializer);
        ListState listState = operatorStateBackend.getListState(stateDescriptor);
        listState.add((Object)42);
        AtomicInteger keyCopyCounter = new AtomicInteger(0);
        AtomicInteger valueCopyCounter = new AtomicInteger(0);
        VerifyingIntSerializer keySerializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), keyCopyCounter);
        VerifyingIntSerializer valueSerializer = new VerifyingIntSerializer(env.getUserCodeClassLoader().asClassLoader(), valueCopyCounter);
        MapStateDescriptor broadcastStateDesc = new MapStateDescriptor("test-broadcast", (TypeSerializer)keySerializer, (TypeSerializer)valueSerializer);
        BroadcastState broadcastState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
        broadcastState.put((Object)1, (Object)2);
        broadcastState.put((Object)3, (Object)4);
        broadcastState.put((Object)5, (Object)6);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        FutureUtils.runIfNotDoneAndGet((RunnableFuture)runnableFuture);
        Assert.assertTrue((copyCounter.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((keyCopyCounter.get() > 0 ? 1 : 0) != 0);
        Assert.assertTrue((valueCopyCounter.get() > 0 ? 1 : 0) != 0);
    }

    @Test
    public void testSnapshotEmpty() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator", this.emptyStateHandles, cancelStreamRegistry);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        RunnableFuture snapshot = operatorStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        Assert.assertNull((Object)stateHandle);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator", this.emptyStateHandles, new CloseableRegistry());
        MapStateDescriptor broadcastStateDesc = new MapStateDescriptor("test-broadcast", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO);
        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>(3);
        expected.put(1, 2);
        expected.put(3, 4);
        expected.put(5, 6);
        BroadcastState broadcastState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
        broadcastState.putAll(expected);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
        OperatorStateHandle stateHandle = null;
        try {
            RunnableFuture snapshot = operatorStateBackend.snapshot(0L, 0L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            Assert.assertNotNull((Object)stateHandle);
            HashMap retrieved = new HashMap();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            BroadcastState retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.entries()) {
                retrieved.put(e.getKey(), e.getValue());
            }
            Assert.assertEquals(expected, retrieved);
            retrievedState.remove((Object)1);
            expected.remove(1);
            snapshot = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            stateHandle.discardState();
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            retrieved.clear();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.immutableEntries()) {
                retrieved.put(e.getKey(), e.getValue());
            }
            Assert.assertEquals(expected, retrieved);
            retrievedState.clear();
            expected.clear();
            snapshot = operatorStateBackend.snapshot(2L, 2L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
            snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
            if (stateHandle != null) {
                stateHandle.discardState();
            }
            stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
            retrieved.clear();
            operatorStateBackend = OperatorStateBackendTest.recreateOperatorStateBackend(operatorStateBackend, (AbstractStateBackend)abstractStateBackend, (Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)stateHandle));
            retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc);
            for (Map.Entry e : retrievedState.immutableEntries()) {
                retrieved.put(e.getKey(), e.getValue());
            }
            Assert.assertTrue((boolean)expected.isEmpty());
            Assert.assertEquals(expected, retrieved);
            if (stateHandle != null) {
                stateHandle.discardState();
                stateHandle = null;
            }
        }
        finally {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            if (stateHandle != null) {
                stateHandle.discardState();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSnapshotRestoreSync() throws Exception {
        MemoryStateBackend abstractStateBackend = new MemoryStateBackend(8192);
        OperatorStateBackend operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "test-op-name", this.emptyStateHandles, new CloseableRegistry());
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor2 = new MapStateDescriptor("test5", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor3 = new MapStateDescriptor("test6", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        BroadcastState broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
        BroadcastState broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
        listState1.add((Object)42);
        listState1.add((Object)4711);
        listState2.add((Object)7);
        listState2.add((Object)13);
        listState2.add((Object)23);
        listState3.add((Object)17);
        listState3.add((Object)18);
        listState3.add((Object)19);
        listState3.add((Object)20);
        broadcastState1.put((Object)1, (Object)2);
        broadcastState1.put((Object)2, (Object)5);
        broadcastState2.put((Object)2, (Object)5);
        MemCheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(8192);
        RunnableFuture snapshot = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        SnapshotResult snapshotResult = (SnapshotResult)FutureUtils.runIfNotDoneAndGet((RunnableFuture)snapshot);
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        try {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator", (Collection)StateObjectCollection.singleton((StateObject)stateHandle), new CloseableRegistry());
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredStateNames().size());
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredBroadcastStateNames().size());
            listState1 = operatorStateBackend.getListState(stateDescriptor1);
            listState2 = operatorStateBackend.getListState(stateDescriptor2);
            listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
            broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
            broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
            broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredStateNames().size());
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredBroadcastStateNames().size());
            Iterator it = ((Iterable)listState1.get()).iterator();
            Assert.assertEquals((Object)42, it.next());
            Assert.assertEquals((Object)4711, it.next());
            Assert.assertFalse((boolean)it.hasNext());
            it = ((Iterable)listState2.get()).iterator();
            Assert.assertEquals((Object)7, it.next());
            Assert.assertEquals((Object)13, it.next());
            Assert.assertEquals((Object)23, it.next());
            Assert.assertFalse((boolean)it.hasNext());
            it = ((Iterable)listState3.get()).iterator();
            Assert.assertEquals((Object)17, it.next());
            Assert.assertEquals((Object)18, it.next());
            Assert.assertEquals((Object)19, it.next());
            Assert.assertEquals((Object)20, it.next());
            Assert.assertFalse((boolean)it.hasNext());
            Iterator bIt = broadcastState1.iterator();
            Assert.assertTrue((boolean)bIt.hasNext());
            Map.Entry entry = (Map.Entry)bIt.next();
            Assert.assertEquals((Object)1, entry.getKey());
            Assert.assertEquals((Object)2, entry.getValue());
            Assert.assertTrue((boolean)bIt.hasNext());
            entry = (Map.Entry)bIt.next();
            Assert.assertEquals((Object)2, entry.getKey());
            Assert.assertEquals((Object)5, entry.getValue());
            Assert.assertFalse((boolean)bIt.hasNext());
            bIt = broadcastState2.iterator();
            Assert.assertTrue((boolean)bIt.hasNext());
            entry = (Map.Entry)bIt.next();
            Assert.assertEquals((Object)2, entry.getKey());
            Assert.assertEquals((Object)5, entry.getValue());
            Assert.assertFalse((boolean)bIt.hasNext());
            bIt = broadcastState3.iterator();
            Assert.assertFalse((boolean)bIt.hasNext());
            operatorStateBackend.close();
            operatorStateBackend.dispose();
        }
        finally {
            stateHandle.discardState();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSnapshotRestoreAsync() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor2 = new ListStateDescriptor("test2", (TypeSerializer)new JavaSerializer());
        ListStateDescriptor stateDescriptor3 = new ListStateDescriptor("test3", (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor2 = new MapStateDescriptor("test5", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        MapStateDescriptor broadcastStateDescriptor3 = new MapStateDescriptor("test6", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        ListState listState2 = operatorStateBackend.getListState(stateDescriptor2);
        ListState listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        BroadcastState broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
        BroadcastState broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        listState2.add((Object)MutableType.of(7));
        listState2.add((Object)MutableType.of(13));
        listState2.add((Object)MutableType.of(23));
        listState3.add((Object)MutableType.of(17));
        listState3.add((Object)MutableType.of(18));
        listState3.add((Object)MutableType.of(19));
        listState3.add((Object)MutableType.of(20));
        broadcastState1.put((Object)MutableType.of(1), (Object)MutableType.of(2));
        broadcastState1.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        broadcastState2.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(runnableFuture);
        waiterLatch.await();
        listState1.add((Object)MutableType.of(77));
        broadcastState1.put((Object)MutableType.of(32), (Object)MutableType.of(97));
        int n = 0;
        for (MutableType mutableType : (Iterable)listState2.get()) {
            if (++n == 2) {
                blockerLatch.trigger();
            }
            mutableType.setValue(mutableType.getValue() + 10);
        }
        listState3.clear();
        broadcastState2.clear();
        operatorStateBackend.getListState(new ListStateDescriptor("test4", (TypeSerializer)new JavaSerializer()));
        SnapshotResult snapshotResult = (SnapshotResult)runnableFuture.get();
        OperatorStateHandle stateHandle = (OperatorStateHandle)snapshotResult.getJobManagerOwnedSnapshot();
        try {
            operatorStateBackend.close();
            operatorStateBackend.dispose();
            MemoryStateBackend abstractStateBackend = new MemoryStateBackend(4096);
            CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
            operatorStateBackend = abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator", (Collection)StateObjectCollection.singleton((StateObject)stateHandle), cancelStreamRegistry);
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredStateNames().size());
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredBroadcastStateNames().size());
            listState1 = operatorStateBackend.getListState(stateDescriptor1);
            listState2 = operatorStateBackend.getListState(stateDescriptor2);
            listState3 = operatorStateBackend.getUnionListState(stateDescriptor3);
            broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
            broadcastState2 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor2);
            broadcastState3 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor3);
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredStateNames().size());
            Assert.assertEquals((long)3L, (long)operatorStateBackend.getRegisteredBroadcastStateNames().size());
            Iterator it = ((Iterable)listState1.get()).iterator();
            Assert.assertEquals((long)42L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)4711L, (long)((MutableType)it.next()).value);
            Assert.assertFalse((boolean)it.hasNext());
            it = ((Iterable)listState2.get()).iterator();
            Assert.assertEquals((long)7L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)13L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)23L, (long)((MutableType)it.next()).value);
            Assert.assertFalse((boolean)it.hasNext());
            it = ((Iterable)listState3.get()).iterator();
            Assert.assertEquals((long)17L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)18L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)19L, (long)((MutableType)it.next()).value);
            Assert.assertEquals((long)20L, (long)((MutableType)it.next()).value);
            Assert.assertFalse((boolean)it.hasNext());
            Iterator bIt = broadcastState1.iterator();
            Assert.assertTrue((boolean)bIt.hasNext());
            Map.Entry entry = (Map.Entry)bIt.next();
            Assert.assertEquals((long)1L, (long)((MutableType)entry.getKey()).value);
            Assert.assertEquals((long)2L, (long)((MutableType)entry.getValue()).value);
            Assert.assertTrue((boolean)bIt.hasNext());
            entry = (Map.Entry)bIt.next();
            Assert.assertEquals((long)2L, (long)((MutableType)entry.getKey()).value);
            Assert.assertEquals((long)5L, (long)((MutableType)entry.getValue()).value);
            Assert.assertFalse((boolean)bIt.hasNext());
            bIt = broadcastState2.iterator();
            Assert.assertTrue((boolean)bIt.hasNext());
            entry = (Map.Entry)bIt.next();
            Assert.assertEquals((long)2L, (long)((MutableType)entry.getKey()).value);
            Assert.assertEquals((long)5L, (long)((MutableType)entry.getValue()).value);
            Assert.assertFalse((boolean)bIt.hasNext());
            bIt = broadcastState3.iterator();
            Assert.assertFalse((boolean)bIt.hasNext());
            operatorStateBackend.close();
            operatorStateBackend.dispose();
        }
        finally {
            stateHandle.discardState();
        }
        executorService.shutdown();
    }

    @Test
    public void testSnapshotAsyncClose() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        MapStateDescriptor broadcastStateDescriptor1 = new MapStateDescriptor("test4", (TypeSerializer)new JavaSerializer(), (TypeSerializer)new JavaSerializer());
        BroadcastState broadcastState1 = operatorStateBackend.getBroadcastState(broadcastStateDescriptor1);
        broadcastState1.put((Object)MutableType.of(1), (Object)MutableType.of(2));
        broadcastState1.put((Object)MutableType.of(2), (Object)MutableType.of(5));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(runnableFuture);
        waiterLatch.await();
        operatorStateBackend.close();
        blockerLatch.trigger();
        try {
            runnableFuture.get(60L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (CancellationException cancellationException) {
            // empty catch block
        }
    }

    @Test
    public void testSnapshotAsyncCancel() throws Exception {
        DefaultOperatorStateBackend operatorStateBackend = new DefaultOperatorStateBackendBuilder(OperatorStateBackendTest.class.getClassLoader(), new ExecutionConfig(), true, this.emptyStateHandles, new CloseableRegistry()).build();
        ListStateDescriptor stateDescriptor1 = new ListStateDescriptor("test1", (TypeSerializer)new JavaSerializer());
        ListState listState1 = operatorStateBackend.getListState(stateDescriptor1);
        listState1.add((Object)MutableType.of(42));
        listState1.add((Object)MutableType.of(4711));
        BlockerCheckpointStreamFactory streamFactory = new BlockerCheckpointStreamFactory(0x100000);
        OneShotLatch waiterLatch = new OneShotLatch();
        OneShotLatch blockerLatch = new OneShotLatch();
        streamFactory.setWaiterLatch(waiterLatch);
        streamFactory.setBlockerLatch(blockerLatch);
        RunnableFuture runnableFuture = operatorStateBackend.snapshot(1L, 1L, (CheckpointStreamFactory)streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(runnableFuture);
        waiterLatch.await();
        runnableFuture.cancel(true);
        for (BlockingCheckpointOutputStream stream : streamFactory.getAllCreatedStreams()) {
            Assert.assertTrue((boolean)stream.isClosed());
        }
        blockerLatch.trigger();
        try {
            runnableFuture.get(60L, TimeUnit.SECONDS);
            Assert.fail();
        }
        catch (CancellationException cancellationException) {
            // empty catch block
        }
    }

    private static Environment createMockEnvironment() {
        Environment env = (Environment)Mockito.mock(Environment.class);
        Mockito.when((Object)env.getExecutionConfig()).thenReturn((Object)new ExecutionConfig());
        Mockito.when((Object)env.getUserCodeClassLoader()).thenReturn((Object)TestingUserCodeClassLoader.newBuilder().build());
        return env;
    }

    private static OperatorStateBackend recreateOperatorStateBackend(OperatorStateBackend oldOperatorStateBackend, AbstractStateBackend abstractStateBackend, Collection<OperatorStateHandle> toRestore) throws Exception {
        oldOperatorStateBackend.close();
        oldOperatorStateBackend.dispose();
        return abstractStateBackend.createOperatorStateBackend(OperatorStateBackendTest.createMockEnvironment(), "testOperator", toRestore, new CloseableRegistry());
    }

    static final class MutableType
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private int value;

        public MutableType() {
            this(0);
        }

        public MutableType(int value) {
            this.value = value;
        }

        public int getValue() {
            return this.value;
        }

        public void setValue(int value) {
            this.value = value;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            MutableType that = (MutableType)o;
            return this.value == that.value;
        }

        public int hashCode() {
            return this.value;
        }

        static MutableType of(int value) {
            return new MutableType(value);
        }
    }

    public static class VerifyingIntSerializerSnapshot
    extends SimpleTypeSerializerSnapshot<Integer> {
        public VerifyingIntSerializerSnapshot() {
            super(() -> new VerifyingIntSerializer(Thread.currentThread().getContextClassLoader(), new AtomicInteger()));
        }
    }

    private static final class VerifyingIntSerializer
    extends TypeSerializer<Integer> {
        private static final long serialVersionUID = -5344563614550163898L;
        private transient ClassLoader classLoader;
        private transient AtomicInteger atomicInteger;

        private VerifyingIntSerializer(ClassLoader classLoader, AtomicInteger atomicInteger) {
            this.classLoader = (ClassLoader)Preconditions.checkNotNull((Object)classLoader);
            this.atomicInteger = (AtomicInteger)Preconditions.checkNotNull((Object)atomicInteger);
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<Integer> duplicate() {
            return this;
        }

        public Integer createInstance() {
            return 0;
        }

        public Integer copy(Integer from) {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from);
        }

        public Integer copy(Integer from, Integer reuse) {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            return IntSerializer.INSTANCE.copy(from, reuse);
        }

        public int getLength() {
            return IntSerializer.INSTANCE.getLength();
        }

        public void serialize(Integer record, DataOutputView target) throws IOException {
            IntSerializer.INSTANCE.serialize(record, target);
        }

        public Integer deserialize(DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(source);
        }

        public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
            return IntSerializer.INSTANCE.deserialize(reuse, source);
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            Assert.assertEquals((Object)this.classLoader, (Object)Thread.currentThread().getContextClassLoader());
            this.atomicInteger.incrementAndGet();
            IntSerializer.INSTANCE.copy(source, target);
        }

        public boolean equals(Object obj) {
            return obj instanceof VerifyingIntSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerSnapshot<Integer> snapshotConfiguration() {
            return new VerifyingIntSerializerSnapshot();
        }
    }
}

