/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyExtractorFunction;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StateSnapshotKeyGroupReader;
import org.apache.flink.runtime.state.StateSnapshotRestore;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.CopyOnWriteStateTable;
import org.apache.flink.runtime.state.heap.HeapAggregatingState;
import org.apache.flink.runtime.state.heap.HeapFoldingState;
import org.apache.flink.runtime.state.heap.HeapListState;
import org.apache.flink.runtime.state.heap.HeapMapState;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapReducingState;
import org.apache.flink.runtime.state.heap.HeapValueState;
import org.apache.flink.runtime.state.heap.NestedMapsStateTable;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeapKeyedStateBackend<K>
extends AbstractKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(HeapKeyedStateBackend.class);
    private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES = Stream.of(Tuple2.of(ValueStateDescriptor.class, HeapValueState::create), Tuple2.of(ListStateDescriptor.class, HeapListState::create), Tuple2.of(MapStateDescriptor.class, HeapMapState::create), Tuple2.of(AggregatingStateDescriptor.class, HeapAggregatingState::create), Tuple2.of(ReducingStateDescriptor.class, HeapReducingState::create), Tuple2.of(FoldingStateDescriptor.class, HeapFoldingState::create)).collect(Collectors.toMap(t -> (Class)t.f0, t -> (StateFactory)t.f1));
    private final Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap();
    private final Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<String, HeapPriorityQueueSnapshotRestoreWrapper>();
    private final Map<StateUID, StateMetaInfoSnapshot> restoredStateMetaInfo;
    private final LocalRecoveryConfig localRecoveryConfig;
    private final HeapSnapshotStrategy snapshotStrategy;
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;

    public HeapKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, boolean asynchronousSnapshots, ExecutionConfig executionConfig, LocalRecoveryConfig localRecoveryConfig, HeapPriorityQueueSetFactory priorityQueueSetFactory, TtlTimeProvider ttlTimeProvider) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
        this.localRecoveryConfig = (LocalRecoveryConfig)Preconditions.checkNotNull((Object)localRecoveryConfig);
        SnapshotStrategySynchronicityBehavior synchronicityTrait = asynchronousSnapshots ? new AsyncSnapshotStrategySynchronicityBehavior() : new SyncSnapshotStrategySynchronicityBehavior();
        this.snapshotStrategy = new HeapSnapshotStrategy(synchronicityTrait);
        LOG.info("Initializing heap keyed state backend with stream factory.");
        this.restoredStateMetaInfo = new HashMap<StateUID, StateMetaInfoSnapshot>();
        this.priorityQueueSetFactory = priorityQueueSetFactory;
    }

    @Override
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String stateName, @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        HeapPriorityQueueSnapshotRestoreWrapper existingState = this.registeredPQStates.get(stateName);
        if (existingState != null) {
            StateMetaInfoSnapshot restoredMetaInfoSnapshot = this.restoredStateMetaInfo.get(StateUID.of(stateName, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE));
            Preconditions.checkState((restoredMetaInfoSnapshot != null ? 1 : 0) != 0, (Object)"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            StateMetaInfoSnapshot.CommonSerializerKeys serializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER;
            TypeSerializerSnapshot serializerSnapshot = (TypeSerializerSnapshot)Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(serializerKey));
            TypeSerializerSchemaCompatibility compatibilityResult = serializerSnapshot.resolveSchemaCompatibility(byteOrderedElementSerializer);
            if (compatibilityResult.isIncompatible()) {
                throw new FlinkRuntimeException((Throwable)new StateMigrationException("For heap backends, the new priority queue serializer must not be incompatible."));
            }
            this.registeredPQStates.put(stateName, existingState.forUpdatedSerializer(byteOrderedElementSerializer));
            return existingState.getPriorityQueue();
        }
        RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo = new RegisteredPriorityQueueStateBackendMetaInfo<T>(stateName, byteOrderedElementSerializer);
        return this.createInternal(metaInfo);
    }

    @Nonnull
    private <T extends HeapPriorityQueueElement & PriorityComparable> KeyGroupedInternalPriorityQueue<T> createInternal(RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo) {
        String stateName = metaInfo.getName();
        KeyGroupedInternalPriorityQueue priorityQueue = this.priorityQueueSetFactory.create(stateName, (TypeSerializer)metaInfo.getElementSerializer());
        HeapPriorityQueueSnapshotRestoreWrapper<T> wrapper = new HeapPriorityQueueSnapshotRestoreWrapper<T>(priorityQueue, metaInfo, KeyExtractorFunction.forKeyedObjects(), this.keyGroupRange, this.numberOfKeyGroups);
        this.registeredPQStates.put(stateName, wrapper);
        return priorityQueue;
    }

    private <N, V> StateTable<K, N, V> tryRegisterStateTable(TypeSerializer<N> namespaceSerializer, StateDescriptor<?, V> stateDesc, StateSnapshotTransformer<V> snapshotTransformer) throws StateMigrationException {
        StateTable<K, Object, Object> stateTable = this.registeredKVStates.get(stateDesc.getName());
        TypeSerializer newStateSerializer = stateDesc.getSerializer();
        RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<N, V>(stateDesc.getType(), stateDesc.getName(), namespaceSerializer, newStateSerializer, snapshotTransformer);
        if (stateTable != null) {
            StateMetaInfoSnapshot restoredMetaInfoSnapshot = this.restoredStateMetaInfo.get(StateUID.of(stateDesc.getName(), StateMetaInfoSnapshot.BackendStateType.KEY_VALUE));
            Preconditions.checkState((restoredMetaInfoSnapshot != null ? 1 : 0) != 0, (Object)"Requested to check compatibility of a restored RegisteredKeyedBackendStateMetaInfo, but its corresponding restored snapshot cannot be found.");
            TypeSerializerSnapshot namespaceSerializerSnapshot = (TypeSerializerSnapshot)Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.NAMESPACE_SERIALIZER.toString()));
            TypeSerializerSchemaCompatibility namespaceCompatibility = namespaceSerializerSnapshot.resolveSchemaCompatibility(namespaceSerializer);
            if (namespaceCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new namespace serializer must not be incompatible.");
            }
            TypeSerializerSnapshot stateSerializerSnapshot = (TypeSerializerSnapshot)Preconditions.checkNotNull(restoredMetaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString()));
            RegisteredKeyValueStateBackendMetaInfo.checkStateMetaInfo(restoredMetaInfoSnapshot, stateDesc);
            TypeSerializerSchemaCompatibility stateCompatibility = stateSerializerSnapshot.resolveSchemaCompatibility(newStateSerializer);
            if (stateCompatibility.isIncompatible()) {
                throw new StateMigrationException("For heap backends, the new state serializer must not be incompatible.");
            }
            stateTable.setMetaInfo(newMetaInfo);
        } else {
            stateTable = this.snapshotStrategy.newStateTable(newMetaInfo);
            this.registeredKVStates.put(stateDesc.getName(), stateTable);
        }
        return stateTable;
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        if (!this.registeredKVStates.containsKey(state)) {
            return Stream.empty();
        }
        StateSnapshotRestore stateSnapshotRestore = this.registeredKVStates.get(state);
        StateTable table = (StateTable)stateSnapshotRestore;
        return table.getKeys(namespace);
    }

    private boolean hasRegisteredState() {
        return !this.registeredKVStates.isEmpty() || !this.registeredPQStates.isEmpty();
    }

    @Override
    @Nonnull
    public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(@Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<S, SV> stateDesc, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
        if (stateFactory == null) {
            String message = String.format("State %s is not supported by %s", stateDesc.getClass(), this.getClass());
            throw new FlinkRuntimeException(message);
        }
        StateTable<K, N, SV> stateTable = this.tryRegisterStateTable(namespaceSerializer, stateDesc, this.getStateSnapshotTransformer(stateDesc, snapshotTransformFactory));
        return stateFactory.createState(stateDesc, stateTable, this.keySerializer);
    }

    private <SV, SEV> StateSnapshotTransformer<SV> getStateSnapshotTransformer(StateDescriptor<?, SV> stateDesc, StateSnapshotTransformer.StateSnapshotTransformFactory<SEV> snapshotTransformFactory) {
        Optional<StateSnapshotTransformer<SEV>> original = snapshotTransformFactory.createForDeserializedState();
        if (original.isPresent()) {
            if (stateDesc instanceof ListStateDescriptor) {
                return new StateSnapshotTransformer.ListStateSnapshotTransformer<SEV>(original.get());
            }
            if (stateDesc instanceof MapStateDescriptor) {
                return new StateSnapshotTransformer.MapStateSnapshotTransformer(original.get());
            }
            return original.get();
        }
        return null;
    }

    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
        long startTime = System.currentTimeMillis();
        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner = this.snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
        this.snapshotStrategy.logSyncCompleted(streamFactory, startTime);
        return snapshotRunner;
    }

    @Override
    public void restore(Collection<KeyedStateHandle> restoredState) throws Exception {
        if (restoredState == null || restoredState.isEmpty()) {
            return;
        }
        LOG.info("Initializing heap keyed state backend from snapshot.");
        if (LOG.isDebugEnabled()) {
            LOG.debug("Restoring snapshot from state handles: {}.", restoredState);
        }
        this.restorePartitionedState(restoredState);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restorePartitionedState(Collection<KeyedStateHandle> state) throws Exception {
        HashMap<Integer, StateMetaInfoSnapshot> kvStatesById = new HashMap<Integer, StateMetaInfoSnapshot>();
        this.registeredKVStates.clear();
        this.registeredPQStates.clear();
        boolean keySerializerRestored = false;
        for (KeyedStateHandle keyedStateHandle : state) {
            if (keyedStateHandle == null) continue;
            if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
                throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass());
            }
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)keyedStateHandle;
            FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
            this.cancelStreamRegistry.registerCloseable((Closeable)fsDataInputStream);
            try {
                DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper((InputStream)fsDataInputStream);
                KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(this.userCodeClassLoader);
                serializationProxy.read((DataInputView)inView);
                if (!keySerializerRestored) {
                    if (!serializationProxy.getKeySerializerConfigSnapshot().resolveSchemaCompatibility(this.keySerializer).isCompatibleAsIs()) {
                        throw new StateMigrationException("The new key serializer must be compatible.");
                    }
                    keySerializerRestored = true;
                }
                List<StateMetaInfoSnapshot> restoredMetaInfos = serializationProxy.getStateMetaInfoSnapshots();
                this.createOrCheckStateForMetaInfo(restoredMetaInfos, kvStatesById);
                this.readStateHandleStateData(fsDataInputStream, inView, keyGroupsStateHandle.getGroupRangeOffsets(), kvStatesById, restoredMetaInfos.size(), serializationProxy.getReadVersion(), serializationProxy.isUsingKeyGroupCompression());
            }
            finally {
                if (!this.cancelStreamRegistry.unregisterCloseable((Closeable)fsDataInputStream)) continue;
                IOUtils.closeQuietly((InputStream)fsDataInputStream);
            }
        }
    }

    private void readStateHandleStateData(FSDataInputStream fsDataInputStream, DataInputViewStreamWrapper inView, KeyGroupRangeOffsets keyGroupOffsets, Map<Integer, StateMetaInfoSnapshot> kvStatesById, int numStates, int readVersion, boolean isCompressed) throws IOException {
        StreamCompressionDecorator streamCompressionDecorator = isCompressed ? SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
        for (Tuple2<Integer, Long> groupOffset : keyGroupOffsets) {
            int keyGroupIndex = (Integer)groupOffset.f0;
            long offset = (Long)groupOffset.f1;
            Preconditions.checkState((boolean)this.keyGroupRange.contains(keyGroupIndex), (Object)"The key group must belong to the backend.");
            fsDataInputStream.seek(offset);
            int writtenKeyGroupIndex = inView.readInt();
            Preconditions.checkState((writtenKeyGroupIndex == keyGroupIndex ? 1 : 0) != 0, (Object)"Unexpected key-group in restore.");
            InputStream kgCompressionInStream = streamCompressionDecorator.decorateWithCompression((InputStream)fsDataInputStream);
            Throwable throwable = null;
            try {
                this.readKeyGroupStateData(kgCompressionInStream, kvStatesById, keyGroupIndex, numStates, readVersion);
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (kgCompressionInStream == null) continue;
                if (throwable != null) {
                    try {
                        kgCompressionInStream.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                kgCompressionInStream.close();
            }
        }
    }

    private void readKeyGroupStateData(InputStream inputStream, Map<Integer, StateMetaInfoSnapshot> kvStatesById, int keyGroupIndex, int numStates, int readVersion) throws IOException {
        DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(inputStream);
        for (int i = 0; i < numStates; ++i) {
            StateSnapshotRestore registeredState;
            short kvStateId = inView.readShort();
            StateMetaInfoSnapshot stateMetaInfoSnapshot = kvStatesById.get(kvStateId);
            switch (stateMetaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE: {
                    registeredState = this.registeredKVStates.get(stateMetaInfoSnapshot.getName());
                    break;
                }
                case PRIORITY_QUEUE: {
                    registeredState = this.registeredPQStates.get(stateMetaInfoSnapshot.getName());
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected state type: " + (Object)((Object)stateMetaInfoSnapshot.getBackendStateType()) + ".");
                }
            }
            StateSnapshotKeyGroupReader keyGroupReader = registeredState.keyGroupReader(readVersion);
            keyGroupReader.readMappingsInKeyGroup((DataInputView)inView, keyGroupIndex);
        }
    }

    private void createOrCheckStateForMetaInfo(List<StateMetaInfoSnapshot> restoredMetaInfo, Map<Integer, StateMetaInfoSnapshot> kvStatesById) {
        for (StateMetaInfoSnapshot metaInfoSnapshot : restoredMetaInfo) {
            StateSnapshotRestore registeredState;
            this.restoredStateMetaInfo.put(StateUID.of(metaInfoSnapshot.getName(), metaInfoSnapshot.getBackendStateType()), metaInfoSnapshot);
            switch (metaInfoSnapshot.getBackendStateType()) {
                case KEY_VALUE: {
                    registeredState = this.registeredKVStates.get(metaInfoSnapshot.getName());
                    if (registeredState != null) break;
                    RegisteredKeyValueStateBackendMetaInfo registeredKeyedBackendStateMetaInfo = new RegisteredKeyValueStateBackendMetaInfo(metaInfoSnapshot);
                    this.registeredKVStates.put(metaInfoSnapshot.getName(), this.snapshotStrategy.newStateTable(registeredKeyedBackendStateMetaInfo));
                    break;
                }
                case PRIORITY_QUEUE: {
                    registeredState = this.registeredPQStates.get(metaInfoSnapshot.getName());
                    if (registeredState != null) break;
                    this.createInternal(new RegisteredPriorityQueueStateBackendMetaInfo(metaInfoSnapshot));
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected state type: " + (Object)((Object)metaInfoSnapshot.getBackendStateType()) + ".");
                }
            }
            if (registeredState != null) continue;
            kvStatesById.put(kvStatesById.size(), metaInfoSnapshot);
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        try (Stream<K> keyStream = this.getKeys(stateDescriptor.getName(), namespace);){
            List keys = keyStream.collect(Collectors.toList());
            S state = this.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
            for (Object key : keys) {
                this.setCurrentKey(key);
                function.process(key, state);
            }
        }
    }

    public String toString() {
        return "HeapKeyedStateBackend";
    }

    @Override
    @VisibleForTesting
    public int numKeyValueStateEntries() {
        int sum = 0;
        for (StateSnapshotRestore stateSnapshotRestore : this.registeredKVStates.values()) {
            sum += ((StateTable)stateSnapshotRestore).size();
        }
        return sum;
    }

    @VisibleForTesting
    public int numKeyValueStateEntries(Object namespace) {
        int sum = 0;
        for (StateTable<K, ?, ?> state : this.registeredKVStates.values()) {
            sum += state.sizeOfNamespace(namespace);
        }
        return sum;
    }

    @Override
    public boolean supportsAsynchronousSnapshots() {
        return this.snapshotStrategy.isAsynchronous();
    }

    @VisibleForTesting
    public LocalRecoveryConfig getLocalRecoveryConfig() {
        return this.localRecoveryConfig;
    }

    private static final class StateUID {
        @Nonnull
        private final String stateName;
        @Nonnull
        private final StateMetaInfoSnapshot.BackendStateType stateType;

        StateUID(@Nonnull String stateName, @Nonnull StateMetaInfoSnapshot.BackendStateType stateType) {
            this.stateName = stateName;
            this.stateType = stateType;
        }

        @Nonnull
        public String getStateName() {
            return this.stateName;
        }

        @Nonnull
        public StateMetaInfoSnapshot.BackendStateType getStateType() {
            return this.stateType;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            StateUID uid = (StateUID)o;
            return Objects.equals(this.getStateName(), uid.getStateName()) && this.getStateType() == uid.getStateType();
        }

        public int hashCode() {
            return Objects.hash(new Object[]{this.getStateName(), this.getStateType()});
        }

        public static StateUID of(@Nonnull String stateName, @Nonnull StateMetaInfoSnapshot.BackendStateType stateType) {
            return new StateUID(stateName, stateType);
        }
    }

    private static interface StateFactory {
        public <K, N, SV, S extends State, IS extends S> IS createState(StateDescriptor<S, SV> var1, StateTable<K, N, SV> var2, TypeSerializer<K> var3) throws Exception;
    }

    private class HeapSnapshotStrategy
    extends AbstractSnapshotStrategy<KeyedStateHandle>
    implements SnapshotStrategySynchronicityBehavior<K> {
        private final SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait;

        HeapSnapshotStrategy(SnapshotStrategySynchronicityBehavior<K> snapshotStrategySynchronicityTrait) {
            super("Heap backend snapshot");
            this.snapshotStrategySynchronicityTrait = snapshotStrategySynchronicityTrait;
        }

        @Override
        @Nonnull
        public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long checkpointId, long timestamp, final @Nonnull CheckpointStreamFactory primaryStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
            if (!HeapKeyedStateBackend.this.hasRegisteredState()) {
                return DoneFuture.of(SnapshotResult.empty());
            }
            int numStates = HeapKeyedStateBackend.this.registeredKVStates.size() + HeapKeyedStateBackend.this.registeredPQStates.size();
            Preconditions.checkState((numStates <= Short.MAX_VALUE ? 1 : 0) != 0, (Object)("Too many states: " + numStates + ". Currently at most " + Short.MAX_VALUE + " states are supported"));
            ArrayList<StateMetaInfoSnapshot> metaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(numStates);
            final HashMap<StateUID, Integer> stateNamesToId = new HashMap<StateUID, Integer>(numStates);
            final HashMap<StateUID, StateSnapshot> cowStateStableSnapshots = new HashMap<StateUID, StateSnapshot>(numStates);
            this.processSnapshotMetaInfoForAllStates(metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId, HeapKeyedStateBackend.this.registeredKVStates, StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);
            this.processSnapshotMetaInfoForAllStates(metaInfoSnapshots, cowStateStableSnapshots, stateNamesToId, HeapKeyedStateBackend.this.registeredPQStates, StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);
            final KeyedBackendSerializationProxy serializationProxy = new KeyedBackendSerializationProxy(HeapKeyedStateBackend.this.keySerializer, metaInfoSnapshots, !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, HeapKeyedStateBackend.this.keyGroupCompressionDecorator));
            final SupplierWithException checkpointStreamSupplier = HeapKeyedStateBackend.this.localRecoveryConfig.isLocalRecoveryEnabled() ? () -> CheckpointStreamWithResultProvider.createDuplicatingStream(checkpointId, CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory, HeapKeyedStateBackend.this.localRecoveryConfig.getLocalStateDirectoryProvider()) : () -> CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE, primaryStreamFactory);
            AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> asyncSnapshotCallable = new AsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>>(){

                @Override
                protected SnapshotResult<KeyedStateHandle> callInternal() throws Exception {
                    CheckpointStreamWithResultProvider streamWithResultProvider = (CheckpointStreamWithResultProvider)checkpointStreamSupplier.get();
                    this.registerCloseableForCancellation(streamWithResultProvider);
                    CheckpointStreamFactory.CheckpointStateOutputStream localStream = streamWithResultProvider.getCheckpointOutputStream();
                    DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper((OutputStream)((Object)localStream));
                    serializationProxy.write((DataOutputView)outView);
                    long[] keyGroupRangeOffsets = new long[HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups()];
                    for (int keyGroupPos = 0; keyGroupPos < HeapKeyedStateBackend.this.keyGroupRange.getNumberOfKeyGroups(); ++keyGroupPos) {
                        int keyGroupId = HeapKeyedStateBackend.this.keyGroupRange.getKeyGroupId(keyGroupPos);
                        keyGroupRangeOffsets[keyGroupPos] = localStream.getPos();
                        outView.writeInt(keyGroupId);
                        for (Map.Entry stateSnapshot : cowStateStableSnapshots.entrySet()) {
                            StateSnapshot.StateKeyGroupWriter partitionedSnapshot = ((StateSnapshot)stateSnapshot.getValue()).getKeyGroupWriter();
                            OutputStream kgCompressionOut = HeapKeyedStateBackend.this.keyGroupCompressionDecorator.decorateWithCompression((OutputStream)((Object)localStream));
                            Throwable throwable = null;
                            try {
                                DataOutputViewStreamWrapper kgCompressionView = new DataOutputViewStreamWrapper(kgCompressionOut);
                                kgCompressionView.writeShort(((Integer)stateNamesToId.get(stateSnapshot.getKey())).intValue());
                                partitionedSnapshot.writeStateInKeyGroup((DataOutputView)kgCompressionView, keyGroupId);
                            }
                            catch (Throwable throwable2) {
                                throwable = throwable2;
                                throw throwable2;
                            }
                            finally {
                                if (kgCompressionOut == null) continue;
                                if (throwable != null) {
                                    try {
                                        kgCompressionOut.close();
                                    }
                                    catch (Throwable throwable3) {
                                        throwable.addSuppressed(throwable3);
                                    }
                                    continue;
                                }
                                kgCompressionOut.close();
                            }
                        }
                    }
                    if (this.unregisterCloseableFromCancellation(streamWithResultProvider)) {
                        KeyGroupRangeOffsets kgOffs = new KeyGroupRangeOffsets(HeapKeyedStateBackend.this.keyGroupRange, keyGroupRangeOffsets);
                        SnapshotResult<StreamStateHandle> result = streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();
                        return CheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);
                    }
                    throw new IOException("Stream already unregistered.");
                }

                @Override
                protected void cleanupProvidedResources() {
                    for (StateSnapshot tableSnapshot : cowStateStableSnapshots.values()) {
                        tableSnapshot.release();
                    }
                }

                @Override
                protected void logAsyncSnapshotComplete(long startTime) {
                    if (HeapSnapshotStrategy.this.snapshotStrategySynchronicityTrait.isAsynchronous()) {
                        HeapSnapshotStrategy.this.logAsyncCompleted(primaryStreamFactory, startTime);
                    }
                }
            };
            AsyncSnapshotCallable.AsyncSnapshotTask task = asyncSnapshotCallable.toAsyncSnapshotFutureTask(HeapKeyedStateBackend.this.cancelStreamRegistry);
            this.finalizeSnapshotBeforeReturnHook(task);
            return task;
        }

        @Override
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            this.snapshotStrategySynchronicityTrait.finalizeSnapshotBeforeReturnHook(runnable);
        }

        @Override
        public boolean isAsynchronous() {
            return this.snapshotStrategySynchronicityTrait.isAsynchronous();
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
            return this.snapshotStrategySynchronicityTrait.newStateTable(newMetaInfo);
        }

        private void processSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> metaInfoSnapshots, Map<StateUID, StateSnapshot> cowStateStableSnapshots, Map<StateUID, Integer> stateNamesToId, Map<String, ? extends StateSnapshotRestore> registeredStates, StateMetaInfoSnapshot.BackendStateType stateType) {
            for (Map.Entry<String, ? extends StateSnapshotRestore> kvState : registeredStates.entrySet()) {
                StateUID stateUid = StateUID.of(kvState.getKey(), stateType);
                stateNamesToId.put(stateUid, stateNamesToId.size());
                StateSnapshotRestore state = kvState.getValue();
                if (null == state) continue;
                StateSnapshot stateSnapshot = state.stateSnapshot();
                metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
                cowStateStableSnapshots.put(stateUid, stateSnapshot);
            }
        }
    }

    private class SyncSnapshotStrategySynchronicityBehavior
    implements SnapshotStrategySynchronicityBehavior<K> {
        private SyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override
        public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
            runnable.run();
        }

        @Override
        public boolean isAsynchronous() {
            return false;
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
            return new NestedMapsStateTable(HeapKeyedStateBackend.this, newMetaInfo);
        }
    }

    private class AsyncSnapshotStrategySynchronicityBehavior
    implements SnapshotStrategySynchronicityBehavior<K> {
        private AsyncSnapshotStrategySynchronicityBehavior() {
        }

        @Override
        public boolean isAsynchronous() {
            return true;
        }

        @Override
        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo) {
            return new CopyOnWriteStateTable(HeapKeyedStateBackend.this, newMetaInfo);
        }
    }

    private static interface SnapshotStrategySynchronicityBehavior<K> {
        default public void finalizeSnapshotBeforeReturnHook(Runnable runnable) {
        }

        public boolean isAsynchronous();

        public <N, V> StateTable<K, N, V> newStateTable(RegisteredKeyValueStateBackendMetaInfo<N, V> var1);
    }
}

