/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingState;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateBinder;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.Snapshotable;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalFoldingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

public abstract class AbstractKeyedStateBackend<K>
implements KeyedStateBackend<K>,
Snapshotable<SnapshotResult<KeyedStateHandle>, Collection<KeyedStateHandle>>,
Closeable,
CheckpointListener {
    protected final TypeSerializer<K> keySerializer;
    protected K currentKey;
    private int currentKeyGroup;
    protected final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
    private String lastName;
    private InternalKvState lastState;
    protected final int numberOfKeyGroups;
    protected final KeyGroupRange keyGroupRange;
    protected final TaskKvStateRegistry kvStateRegistry;
    protected CloseableRegistry cancelStreamRegistry;
    protected final ClassLoader userCodeClassLoader;
    private final ExecutionConfig executionConfig;
    protected final StreamCompressionDecorator keyGroupCompressionDecorator;

    public AbstractKeyedStateBackend(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig) {
        this.kvStateRegistry = kvStateRegistry;
        this.keySerializer = (TypeSerializer)Preconditions.checkNotNull(keySerializer);
        this.numberOfKeyGroups = (Integer)Preconditions.checkNotNull((Object)numberOfKeyGroups);
        this.userCodeClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeClassLoader);
        this.keyGroupRange = (KeyGroupRange)Preconditions.checkNotNull((Object)keyGroupRange);
        this.cancelStreamRegistry = new CloseableRegistry();
        this.keyValueStatesByName = new HashMap();
        this.executionConfig = executionConfig;
        this.keyGroupCompressionDecorator = this.determineStreamCompression(executionConfig);
    }

    private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
        if (executionConfig != null && executionConfig.isUseSnapshotCompression()) {
            return SnappyStreamCompressionDecorator.INSTANCE;
        }
        return UncompressedStreamCompressionDecorator.INSTANCE;
    }

    @Override
    public void dispose() {
        IOUtils.closeQuietly((AutoCloseable)this.cancelStreamRegistry);
        if (this.kvStateRegistry != null) {
            this.kvStateRegistry.unregisterAll();
        }
        this.lastName = null;
        this.lastState = null;
        this.keyValueStatesByName.clear();
    }

    protected abstract <N, T> InternalValueState<K, N, T> createValueState(TypeSerializer<N> var1, ValueStateDescriptor<T> var2) throws Exception;

    protected abstract <N, T> InternalListState<K, N, T> createListState(TypeSerializer<N> var1, ListStateDescriptor<T> var2) throws Exception;

    protected abstract <N, T> InternalReducingState<K, N, T> createReducingState(TypeSerializer<N> var1, ReducingStateDescriptor<T> var2) throws Exception;

    protected abstract <N, T, ACC, R> InternalAggregatingState<K, N, T, ACC, R> createAggregatingState(TypeSerializer<N> var1, AggregatingStateDescriptor<T, ACC, R> var2) throws Exception;

    @Deprecated
    protected abstract <N, T, ACC> InternalFoldingState<K, N, T, ACC> createFoldingState(TypeSerializer<N> var1, FoldingStateDescriptor<T, ACC> var2) throws Exception;

    protected abstract <N, UK, UV> InternalMapState<K, N, UK, UV> createMapState(TypeSerializer<N> var1, MapStateDescriptor<UK, UV> var2) throws Exception;

    @Override
    public void setCurrentKey(K newKey) {
        this.currentKey = newKey;
        this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, this.numberOfKeyGroups);
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    @Override
    public K getCurrentKey() {
        return this.currentKey;
    }

    @Override
    public int getCurrentKeyGroupIndex() {
        return this.currentKeyGroup;
    }

    @Override
    public int getNumberOfKeyGroups() {
        return this.numberOfKeyGroups;
    }

    @Override
    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> function) throws Exception {
        try (Stream keyStream = this.getKeys(stateDescriptor.getName(), namespace);){
            Object state = this.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
            keyStream.forEach(key -> {
                this.setCurrentKey(key);
                try {
                    function.process(key, state);
                }
                catch (Throwable e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    @Override
    public <N, S extends State, V> S getOrCreateKeyedState(final TypeSerializer<N> namespaceSerializer, StateDescriptor<S, V> stateDescriptor) throws Exception {
        InternalKvState<K, ?, ?> existing;
        Preconditions.checkNotNull(namespaceSerializer, (String)"Namespace serializer");
        if (this.keySerializer == null) {
            throw new UnsupportedOperationException("State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        }
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
        }
        if ((existing = this.keyValueStatesByName.get(stateDescriptor.getName())) != null) {
            InternalKvState<K, ?, ?> typedState = existing;
            return (S)typedState;
        }
        State state = stateDescriptor.bind(new StateBinder(){

            public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
            }

            public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
            }

            public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
            }

            public <T, ACC, R> AggregatingState<T, R> createAggregatingState(AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createAggregatingState(namespaceSerializer, stateDesc);
            }

            public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
            }

            public <UK, UV> MapState<UK, UV> createMapState(MapStateDescriptor<UK, UV> stateDesc) throws Exception {
                return AbstractKeyedStateBackend.this.createMapState(namespaceSerializer, stateDesc);
            }
        });
        InternalKvState kvState = (InternalKvState)state;
        this.keyValueStatesByName.put(stateDescriptor.getName(), kvState);
        if (stateDescriptor.isQueryable()) {
            if (this.kvStateRegistry == null) {
                throw new IllegalStateException("State backend has not been initialized for job.");
            }
            String name = stateDescriptor.getQueryableStateName();
            this.kvStateRegistry.registerKvState(this.keyGroupRange, name, kvState);
        }
        return (S)state;
    }

    @Override
    public <N, S extends State> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(namespace, (String)"Namespace");
        if (this.lastName != null && this.lastName.equals(stateDescriptor.getName())) {
            this.lastState.setCurrentNamespace(namespace);
            return (S)this.lastState;
        }
        InternalKvState<K, ?, ?> previous = this.keyValueStatesByName.get(stateDescriptor.getName());
        if (previous != null) {
            this.lastState = previous;
            this.lastState.setCurrentNamespace(namespace);
            this.lastName = stateDescriptor.getName();
            return (S)previous;
        }
        S state = this.getOrCreateKeyedState(namespaceSerializer, (StateDescriptor<S, V>)((StateDescriptor)stateDescriptor));
        InternalKvState kvState = (InternalKvState)state;
        this.lastName = stateDescriptor.getName();
        this.lastState = kvState;
        kvState.setCurrentNamespace(namespace);
        return state;
    }

    @Override
    public void close() throws IOException {
        this.cancelStreamRegistry.close();
    }

    @VisibleForTesting
    public boolean supportsAsynchronousSnapshots() {
        return false;
    }

    @VisibleForTesting
    public StreamCompressionDecorator getKeyGroupCompressionDecorator() {
        return this.keyGroupCompressionDecorator;
    }

    @VisibleForTesting
    public abstract int numStateEntries();
}

