/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
import org.apache.flink.streaming.api.operators.BackendRestorerProcedure;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamTaskStateInitializer} implementation that assembles the
 * {@link StreamOperatorStateContext} of a single operator.
 *
 * <p>For each operator it creates (and restores) the keyed state backend, the operator state
 * backend, the raw keyed and raw operator state inputs, and the internal timer service manager.
 */
public class StreamTaskStateInitializerImpl implements StreamTaskStateInitializer {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTaskStateInitializerImpl.class);

    /** Task environment: source of task info, job id, class loader and KV-state registry. */
    private final Environment environment;

    /** Processing time service handed to every timer service manager created here. */
    private final ProcessingTimeService processingTimeService;

    /** Source of the prioritized restore state per operator; taken from the environment. */
    private final TaskStateManager taskStateManager;

    /** Factory for the keyed and operator state backends. */
    private final StateBackend stateBackend;

    /**
     * Creates an initializer bound to the given task environment.
     *
     * @param environment task environment; its task state manager must not be null
     * @param stateBackend state backend used to create the backends; must not be null
     * @param processingTimeService processing time service passed to the timer service manager
     */
    public StreamTaskStateInitializerImpl(Environment environment, StateBackend stateBackend, ProcessingTimeService processingTimeService) {
        this.environment = environment;
        this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
        this.stateBackend = Preconditions.checkNotNull(stateBackend);
        this.processingTimeService = processingTimeService;
    }

    /**
     * Builds the state context for one operator.
     *
     * <p>Creation order: keyed state backend, operator state backend, raw keyed state inputs,
     * raw operator state inputs, timer service manager. Both raw input iterables are registered
     * with {@code streamTaskCloseableRegistry}.
     *
     * @param operatorID id of the operator whose prioritized state is looked up
     * @param operatorClassName class name used in the operator description text
     * @param keyContext key context passed to the timer service manager
     * @param keySerializer key serializer; {@code null} means no keyed backend is created
     * @param streamTaskCloseableRegistry registry the created resources are registered with
     * @param metricGroup metric group passed to the keyed state backend factory
     * @return the assembled state context
     * @throws Exception wrapping any failure; everything created so far is cleaned up first
     */
    @Override
    public StreamOperatorStateContext streamOperatorStateContext(@Nonnull OperatorID operatorID, @Nonnull String operatorClassName, @Nonnull KeyContext keyContext, @Nullable TypeSerializer<?> keySerializer, @Nonnull CloseableRegistry streamTaskCloseableRegistry, @Nonnull MetricGroup metricGroup) throws Exception {
        TaskInfo taskInfo = this.environment.getTaskInfo();
        OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText(operatorID, operatorClassName, taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks());
        String operatorIdentifierText = operatorSubtaskDescription.toString();
        PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = this.taskStateManager.prioritizedOperatorState(operatorID);

        AbstractKeyedStateBackend<?> keyedStatedBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        try {
            keyedStatedBackend = this.keyedStatedBackend(keySerializer, operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry, metricGroup);
            operatorStateBackend = this.operatorStateBackend(operatorIdentifierText, prioritizedOperatorSubtaskStates, streamTaskCloseableRegistry);

            rawKeyedStateInputs = this.rawKeyedStateInputs(prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);

            rawOperatorStateInputs = this.rawOperatorStateInputs(prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator());
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            InternalTimeServiceManager<?> timeServiceManager = this.internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);

            return new StreamOperatorStateContextImpl(prioritizedOperatorSubtaskStates.isRestored(), operatorStateBackend, keyedStatedBackend, timeServiceManager, rawOperatorStateInputs, rawKeyedStateInputs);
        }
        catch (Exception ex) {
            // Clean up whatever was created before the failure. A resource is closed here only
            // if it could still be unregistered from the task-level registry.
            if (keyedStatedBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
                    IOUtils.closeQuietly(keyedStatedBackend);
                }
                keyedStatedBackend.dispose();
            }
            if (operatorStateBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
                    IOUtils.closeQuietly(operatorStateBackend);
                }
                operatorStateBackend.dispose();
            }
            // The raw inputs may still be null here; the calls are issued unconditionally,
            // exactly as before.
            if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
                IOUtils.closeQuietly(rawKeyedStateInputs);
            }
            if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
                IOUtils.closeQuietly(rawOperatorStateInputs);
            }
            throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
        }
    }

    /**
     * Creates the timer service manager for a keyed operator and restores its state from the
     * given raw keyed state streams.
     *
     * @param keyedStatedBackend keyed backend, or {@code null} for a non-keyed operator
     * @param keyContext key context passed to the manager
     * @param rawKeyedStates raw keyed state streams, one per key group
     * @param <K> key type of the keyed backend
     * @return the restored manager, or {@code null} if {@code keyedStatedBackend} is {@code null}
     * @throws Exception if a stream's key group lies outside the backend's key-group range
     *     (precondition failure) or if restoring a key group fails
     */
    protected <K> InternalTimeServiceManager<K> internalTimeServiceManager(AbstractKeyedStateBackend<K> keyedStatedBackend, KeyContext keyContext, Iterable<KeyGroupStatePartitionStreamProvider> rawKeyedStates) throws Exception {
        if (keyedStatedBackend == null) {
            return null;
        }
        KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange();
        InternalTimeServiceManager<K> timeServiceManager = new InternalTimeServiceManager<>(keyGroupRange, keyContext, (PriorityQueueSetFactory)keyedStatedBackend, this.processingTimeService, keyedStatedBackend.requiresLegacySynchronousTimerSnapshots());
        for (KeyGroupStatePartitionStreamProvider streamProvider : rawKeyedStates) {
            int keyGroupIdx = streamProvider.getKeyGroupId();
            Preconditions.checkArgument(keyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range.");
            timeServiceManager.restoreStateForKeyGroup(streamProvider.getStream(), keyGroupIdx, this.environment.getUserClassLoader());
        }
        return timeServiceManager;
    }

    /**
     * Creates the operator state backend and restores it from the prioritized managed operator
     * state.
     *
     * <p>A temporary {@link CloseableRegistry} is registered with {@code backendCloseableRegistry}
     * while the restore runs and is passed to the backend factory; it is unregistered and closed
     * again in the {@code finally} block regardless of the outcome.
     *
     * @param operatorIdentifierText operator description used for the factory and for logging
     * @param prioritizedOperatorSubtaskStates source of the restore alternatives
     * @param backendCloseableRegistry registry passed to the restore procedure
     * @return the created operator state backend
     * @throws Exception if creating or restoring the backend fails
     */
    // NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
    // change" for this method; compare against the upstream sources when in doubt.
    protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry) throws Exception {
        String logDescription = "operator state backend for " + operatorIdentifierText;
        CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
        // Explicit type arguments are required: with a raw procedure the lambda parameter would
        // be typed Object and could not be passed on as the state handle collection.
        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> backendRestorer = new BackendRestorerProcedure<>(stateHandles -> this.stateBackend.createOperatorStateBackend(this.environment, operatorIdentifierText, stateHandles, cancelStreamRegistryForRestore), backendCloseableRegistry, logDescription);
        try {
            return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState());
        }
        finally {
            if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                IOUtils.closeQuietly(cancelStreamRegistryForRestore);
            }
        }
    }

    /**
     * Creates the keyed state backend and restores it from the prioritized managed keyed state.
     *
     * <p>The key-group range is computed from the task's max parallelism, parallelism and subtask
     * index. As for the operator state backend, a temporary {@link CloseableRegistry} is
     * registered with {@code backendCloseableRegistry} for the duration of the restore.
     *
     * @param keySerializer key serializer; {@code null} means the operator is not keyed
     * @param operatorIdentifierText operator description used for the factory and for logging
     * @param prioritizedOperatorSubtaskStates source of the restore alternatives
     * @param backendCloseableRegistry registry passed to the restore procedure
     * @param metricGroup metric group passed to the backend factory
     * @param <K> key type
     * @return the created keyed backend, or {@code null} if {@code keySerializer} is {@code null}
     * @throws Exception if creating or restoring the backend fails
     */
    // NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
    // change" for this method; compare against the upstream sources when in doubt.
    protected <K> AbstractKeyedStateBackend<K> keyedStatedBackend(TypeSerializer<K> keySerializer, String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry, MetricGroup metricGroup) throws Exception {
        if (keySerializer == null) {
            return null;
        }
        String logDescription = "keyed state backend for " + operatorIdentifierText;
        TaskInfo taskInfo = this.environment.getTaskInfo();
        KeyGroupRange keyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getIndexOfThisSubtask());
        CloseableRegistry cancelStreamRegistryForRestore = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(cancelStreamRegistryForRestore);
        // Explicit type arguments are required for the lambda parameter to be a handle collection.
        BackendRestorerProcedure<AbstractKeyedStateBackend<K>, KeyedStateHandle> backendRestorer = new BackendRestorerProcedure<>(stateHandles -> this.stateBackend.createKeyedStateBackend(this.environment, this.environment.getJobID(), operatorIdentifierText, keySerializer, taskInfo.getMaxNumberOfParallelSubtasks(), keyGroupRange, this.environment.getTaskKvStateRegistry(), TtlTimeProvider.DEFAULT, metricGroup, stateHandles, cancelStreamRegistryForRestore), backendCloseableRegistry, logDescription);
        try {
            return backendRestorer.createAndRestore(prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState());
        }
        finally {
            if (backendCloseableRegistry.unregisterCloseable(cancelStreamRegistryForRestore)) {
                IOUtils.closeQuietly(cancelStreamRegistryForRestore);
            }
        }
    }

    /**
     * Wraps the raw operator state of the first restore alternative into a closeable iterable of
     * partition streams for the state name {@code "_default_"}.
     *
     * @param restoreStateAlternatives restore alternatives; at most one may be present
     * @return iterable over the partition streams, or an empty iterable if there is no
     *     alternative or the alternative is {@code null}
     * @throws IllegalStateException presumably (via {@code Preconditions.checkState}) if more than
     *     one alternative is present
     */
    protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives) {
        if (restoreStateAlternatives.hasNext()) {
            final Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
            Preconditions.checkState(!restoreStateAlternatives.hasNext(), "Local recovery is currently not implemented for raw operator state, but found state alternative.");
            if (rawOperatorState != null) {
                return new CloseableIterable<StatePartitionStreamProvider>(){
                    // Tracks the streams opened by the iterators handed out below.
                    final CloseableRegistry closeableRegistry = new CloseableRegistry();

                    @Override
                    public void close() throws IOException {
                        this.closeableRegistry.close();
                    }

                    @Nonnull
                    @Override
                    public Iterator<StatePartitionStreamProvider> iterator() {
                        return new OperatorStateStreamIterator("_default_", rawOperatorState.iterator(), this.closeableRegistry);
                    }
                };
            }
        }
        return CloseableIterable.empty();
    }

    /**
     * Wraps the raw keyed state of the first restore alternative into a closeable iterable of
     * per-key-group streams.
     *
     * @param restoreStateAlternatives restore alternatives; at most one may be present
     * @return iterable over the key-group streams, or an empty iterable if there is no
     *     alternative or the alternative is {@code null}
     * @throws IllegalStateException if a handle is not a {@link KeyGroupsStateHandle}, and
     *     presumably (via {@code Preconditions.checkState}) if more than one alternative exists
     */
    protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives) {
        if (restoreStateAlternatives.hasNext()) {
            Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next();
            Preconditions.checkState(!restoreStateAlternatives.hasNext(), "Local recovery is currently not implemented for raw keyed state, but found state alternative.");
            if (rawKeyedState != null) {
                final Collection<KeyGroupsStateHandle> keyGroupsStateHandles = StreamTaskStateInitializerImpl.transform(rawKeyedState);
                // Tracks the streams opened by the iterators handed out below.
                final CloseableRegistry closeableRegistry = new CloseableRegistry();
                return new CloseableIterable<KeyGroupStatePartitionStreamProvider>(){

                    @Override
                    public void close() throws IOException {
                        closeableRegistry.close();
                    }

                    @Override
                    public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
                        return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
                    }
                };
            }
        }
        return CloseableIterable.empty();
    }

    /**
     * Narrows keyed state handles to {@link KeyGroupsStateHandle}s.
     *
     * @param keyedStateHandles handles to narrow; may be {@code null}
     * @return the narrowed handles with {@code null} elements dropped, or {@code null} if the
     *     input is {@code null}
     * @throws IllegalStateException if a non-null handle is not a {@link KeyGroupsStateHandle}
     */
    private static Collection<KeyGroupsStateHandle> transform(Collection<KeyedStateHandle> keyedStateHandles) {
        if (keyedStateHandles == null) {
            return null;
        }
        Collection<KeyGroupsStateHandle> keyGroupsStateHandles = new ArrayList<>(keyedStateHandles.size());
        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
            if (keyedStateHandle instanceof KeyGroupsStateHandle) {
                keyGroupsStateHandles.add((KeyGroupsStateHandle)keyedStateHandle);
            } else if (keyedStateHandle != null) {
                throw new IllegalStateException("Unexpected state handle type, expected: " + KeyGroupsStateHandle.class + ", but found: " + keyedStateHandle.getClass() + ".");
            }
        }
        return keyGroupsStateHandles;
    }

    /** Immutable holder for everything {@code streamOperatorStateContext} produces. */
    private static class StreamOperatorStateContextImpl implements StreamOperatorStateContext {
        private final boolean restored;
        private final OperatorStateBackend operatorStateBackend;
        private final AbstractKeyedStateBackend<?> keyedStateBackend;
        private final InternalTimeServiceManager<?> internalTimeServiceManager;
        private final CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        private final CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        /** Stores the given parts as-is; no validation or copying is performed. */
        StreamOperatorStateContextImpl(boolean restored, OperatorStateBackend operatorStateBackend, AbstractKeyedStateBackend<?> keyedStateBackend, InternalTimeServiceManager<?> internalTimeServiceManager, CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs, CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
            this.restored = restored;
            this.operatorStateBackend = operatorStateBackend;
            this.keyedStateBackend = keyedStateBackend;
            this.internalTimeServiceManager = internalTimeServiceManager;
            this.rawOperatorStateInputs = rawOperatorStateInputs;
            this.rawKeyedStateInputs = rawKeyedStateInputs;
        }

        @Override
        public boolean isRestored() {
            return this.restored;
        }

        @Override
        public AbstractKeyedStateBackend<?> keyedStateBackend() {
            return this.keyedStateBackend;
        }

        @Override
        public OperatorStateBackend operatorStateBackend() {
            return this.operatorStateBackend;
        }

        @Override
        public InternalTimeServiceManager<?> internalTimerServiceManager() {
            return this.internalTimeServiceManager;
        }

        @Override
        public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
            return this.rawOperatorStateInputs;
        }

        @Override
        public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
            return this.rawKeyedStateInputs;
        }
    }

    /**
     * Base for read-only iterators that walk a sequence of state handles and expose the stream of
     * the current handle.
     *
     * @param <T> type of the stream providers produced by the iterator
     * @param <H> type of the state handles being iterated
     */
    private static abstract class AbstractStateStreamIterator<T extends StatePartitionStreamProvider, H extends StreamStateHandle>
    implements Iterator<T> {
        protected final Iterator<H> stateHandleIterator;
        protected final CloseableRegistry closableRegistry;
        /** Handle most recently taken from {@link #stateHandleIterator}; {@code null} initially. */
        protected H currentStateHandle;
        /** Open stream of {@link #currentStateHandle}, or {@code null} if none is open. */
        protected FSDataInputStream currentStream;

        AbstractStateStreamIterator(Iterator<H> stateHandleIterator, CloseableRegistry closableRegistry) {
            this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
            this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
        }

        /** Opens the stream of the current handle and registers it; no stream may be open yet. */
        protected void openCurrentStream() throws IOException {
            Preconditions.checkState(this.currentStream == null);
            FSDataInputStream stream = this.currentStateHandle.openInputStream();
            this.closableRegistry.registerCloseable(stream);
            this.currentStream = stream;
        }

        /** Unregisters and quietly closes the current stream (if any) and forgets it. */
        protected void closeCurrentStream() {
            if (this.closableRegistry.unregisterCloseable(this.currentStream)) {
                IOUtils.closeQuietly(this.currentStream);
            }
            this.currentStream = null;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("Read only Iterator");
        }
    }

    /**
     * Iterates over the partitions of one named state across operator state handles. Each
     * element is the handle's stream positioned at the partition's offset.
     */
    private static class OperatorStateStreamIterator
    extends AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle> {
        private final String stateName;
        /** Partition offsets of the current handle for {@link #stateName}; {@code null} initially. */
        private long[] offsets;
        /** Index of the next offset to hand out from {@link #offsets}. */
        private int offPos;

        OperatorStateStreamIterator(String stateName, Iterator<OperatorStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
            this.stateName = Preconditions.checkNotNull(stateName);
        }

        @Override
        public boolean hasNext() {
            if (null != this.offsets && this.offPos < this.offsets.length) {
                return true;
            }
            // The current handle is exhausted: release its stream, then advance to the next
            // handle that has at least one offset for the requested state name.
            this.closeCurrentStream();
            while (this.stateHandleIterator.hasNext()) {
                this.currentStateHandle = this.stateHandleIterator.next();
                OperatorStateHandle.StateMetaInfo metaInfo = this.currentStateHandle.getStateNameToPartitionOffsets().get(this.stateName);
                if (null == metaInfo) {
                    continue;
                }
                long[] metaOffsets = metaInfo.getOffsets();
                if (null == metaOffsets || metaOffsets.length <= 0) {
                    continue;
                }
                this.offsets = metaOffsets;
                this.offPos = 0;
                // No second close is needed here: closeCurrentStream() above already
                // released and nulled the stream.
                return true;
            }
            return false;
        }

        /**
         * Returns the next partition. An {@link IOException} while opening or seeking is not
         * thrown but wrapped into the returned provider.
         */
        @Override
        public StatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            long offset = this.offsets[this.offPos++];
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                this.currentStream.seek(offset);
                return new StatePartitionStreamProvider(this.currentStream);
            }
            catch (IOException ioex) {
                return new StatePartitionStreamProvider(ioex);
            }
        }
    }

    /**
     * Iterates over the key groups of a sequence of key-groups state handles. Each element is
     * the handle's stream positioned at the key group's offset, tagged with the key-group id.
     */
    private static class KeyGroupStreamIterator
    extends AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, KeyGroupsStateHandle> {
        /** (key-group id, offset) pairs of the current non-empty handle; {@code null} initially. */
        private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;

        KeyGroupStreamIterator(Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
        }

        @Override
        public boolean hasNext() {
            // Guard on the offsets iterator itself. The handle is also assigned for handles with
            // an empty key-group range, in which case the offsets iterator may still be null and
            // a repeated hasNext() call would otherwise fail with a NullPointerException.
            if (null != this.currentOffsetsIterator && this.currentOffsetsIterator.hasNext()) {
                return true;
            }
            this.closeCurrentStream();
            while (this.stateHandleIterator.hasNext()) {
                this.currentStateHandle = this.stateHandleIterator.next();
                if (this.currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() <= 0) {
                    continue;
                }
                this.currentOffsetsIterator = this.currentStateHandle.getGroupRangeOffsets().iterator();
                return true;
            }
            return false;
        }

        /**
         * Returns the next key group. An {@link IOException} while opening or seeking is not
         * thrown but wrapped into the returned provider.
         */
        @Override
        public KeyGroupStatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            Tuple2<Integer, Long> keyGroupOffset = this.currentOffsetsIterator.next();
            int keyGroupId = keyGroupOffset.f0;
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                this.currentStream.seek(keyGroupOffset.f1);
                return new KeyGroupStatePartitionStreamProvider(this.currentStream, keyGroupId);
            }
            catch (IOException ioex) {
                return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupId);
            }
        }
    }
}

