/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Locale;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>,
Serializable {
    private static final long serialVersionUID = 1L;
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
    private transient StreamTask<?, ?> container;
    protected transient StreamConfig config;
    protected transient Output<StreamRecord<OUT>> output;
    private transient StreamingRuntimeContext runtimeContext;
    private transient KeySelector<?, ?> stateKeySelector1;
    private transient KeySelector<?, ?> stateKeySelector2;
    private transient AbstractKeyedStateBackend<?> keyedStateBackend;
    private transient DefaultKeyedStateStore keyedStateStore;
    private transient OperatorStateBackend operatorStateBackend;
    protected transient OperatorMetricGroup metrics;
    protected transient LatencyStats latencyStats;
    protected transient InternalTimeServiceManager<?, ?> timeServiceManager;
    private long combinedWatermark = Long.MIN_VALUE;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        Environment environment = containingTask.getEnvironment();
        this.container = containingTask;
        this.config = config;
        try {
            OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName());
            this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
            if (config.isChainStart()) {
                operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
            }
            if (config.isChainEnd()) {
                operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
            }
            this.metrics = operatorMetricGroup;
        }
        catch (Exception e) {
            LOG.warn("An error occurred while instantiating task metrics.", (Throwable)e);
            this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
            this.output = output;
        }
        try {
            LatencyStats.Granularity granularity;
            Configuration taskManagerConfig = environment.getTaskManagerInfo().getConfiguration();
            int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
            if (historySize <= 0) {
                LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", (Object)MetricOptions.LATENCY_HISTORY_SIZE, (Object)historySize);
                historySize = (Integer)MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
            }
            String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            try {
                granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
            }
            catch (IllegalArgumentException iae) {
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn("Configured value {} option for {} is invalid. Defaulting to {}.", new Object[]{configuredGranularity, MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), granularity});
            }
            TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
            this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, this.container.getIndexInSubtaskGroup(), this.getOperatorID(), granularity);
        }
        catch (Exception e) {
            LOG.warn("An error occurred while instantiating latency metrics.", (Throwable)e);
            this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID(), LatencyStats.Granularity.SINGLE);
        }
        this.runtimeContext = new StreamingRuntimeContext(this, environment, this.container.getAccumulatorMap());
        this.stateKeySelector1 = config.getStatePartitioner(0, this.getUserCodeClassloader());
        this.stateKeySelector2 = config.getStatePartitioner(1, this.getUserCodeClassloader());
    }

    @Override
    public MetricGroup getMetricGroup() {
        return this.metrics;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public final void initializeState() throws Exception {
        TypeSerializer keySerializer = this.config.getStateKeySerializer(this.getUserCodeClassloader());
        StreamTask containingTask = (StreamTask)Preconditions.checkNotNull(this.getContainingTask());
        CloseableRegistry streamTaskCloseableRegistry = (CloseableRegistry)Preconditions.checkNotNull((Object)containingTask.getCancelables());
        StreamTaskStateInitializer streamTaskStateManager = (StreamTaskStateInitializer)Preconditions.checkNotNull((Object)containingTask.createStreamTaskStateInitializer());
        StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(this.getOperatorID(), this.getClass().getSimpleName(), this, keySerializer, streamTaskCloseableRegistry);
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        if (this.keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, this.getExecutionConfig());
        }
        this.timeServiceManager = context.internalTimerServiceManager();
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();
        try {
            StateInitializationContextImpl initializationContext = new StateInitializationContextImpl(context.isRestored(), (OperatorStateStore)this.operatorStateBackend, (KeyedStateStore)this.keyedStateStore, keyedStateInputs, operatorStateInputs);
            this.initializeState((StateInitializationContext)initializationContext);
        }
        finally {
            AbstractStreamOperator.closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            AbstractStreamOperator.closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }
    }

    private static void closeFromRegistry(Closeable closeable, CloseableRegistry registry) {
        if (registry.unregisterCloseable(closeable)) {
            IOUtils.closeQuietly((AutoCloseable)closeable);
        }
    }

    @Override
    public void open() throws Exception {
    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public void dispose() throws Exception {
        Exception exception = null;
        StreamTask<?, ?> containingTask = this.getContainingTask();
        CloseableRegistry taskCloseableRegistry = containingTask != null ? containingTask.getCancelables() : null;
        try {
            if (taskCloseableRegistry == null || taskCloseableRegistry.unregisterCloseable((Closeable)this.operatorStateBackend)) {
                this.operatorStateBackend.close();
            }
        }
        catch (Exception e) {
            exception = e;
        }
        try {
            if (taskCloseableRegistry == null || taskCloseableRegistry.unregisterCloseable(this.keyedStateBackend)) {
                this.keyedStateBackend.close();
            }
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            if (this.operatorStateBackend != null) {
                this.operatorStateBackend.dispose();
            }
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        try {
            if (this.keyedStateBackend != null) {
                this.keyedStateBackend.dispose();
            }
        }
        catch (Exception e) {
            exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)exception);
        }
        if (exception != null) {
            throw exception;
        }
    }

    @Override
    public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
        KeyGroupRange keyGroupRange = null != this.keyedStateBackend ? this.keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, factory, keyGroupRange, this.getContainingTask().getCancelables());){
            this.snapshotState((StateSnapshotContext)snapshotContext);
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            if (null != this.operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(this.operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
            if (null != this.keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(this.keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }
        }
        catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            }
            catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + this.getOperatorName() + ".";
            LOG.info(snapshotFailMessage, (Throwable)snapshotException);
            throw new Exception(snapshotFailMessage, snapshotException);
        }
        return snapshotInProgress;
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        if (this.getKeyedStateBackend() != null) {
            KeyedStateCheckpointOutputStream out;
            try {
                out = context.getRawKeyedOperatorStateOutput();
            }
            catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " + this.getOperatorName() + '.', exception);
            }
            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                Iterator iterator = allKeyGroups.iterator();
                while (iterator.hasNext()) {
                    int keyGroupIdx = (Integer)iterator.next();
                    out.startNewKeyGroup(keyGroupIdx);
                    this.timeServiceManager.snapshotStateForKeyGroup((DataOutputView)new DataOutputViewStreamWrapper((OutputStream)out), keyGroupIdx);
                }
            }
            catch (Exception exception) {
                throw new Exception("Could not write timer service of " + this.getOperatorName() + " to checkpoint state stream.", exception);
            }
            finally {
                try {
                    out.close();
                }
                catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", (Object)this.getOperatorName(), (Object)closeException);
                }
            }
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.keyedStateBackend != null) {
            this.keyedStateBackend.notifyCheckpointComplete(checkpointId);
        }
    }

    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    protected String getOperatorName() {
        if (this.runtimeContext != null) {
            return this.runtimeContext.getTaskNameWithSubtasks();
        }
        return this.getClass().getSimpleName();
    }

    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.keyedStateBackend;
    }

    public OperatorStateBackend getOperatorStateBackend() {
        return this.operatorStateBackend;
    }

    protected ProcessingTimeService getProcessingTimeService() {
        return this.container.getProcessingTimeService();
    }

    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return this.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    protected <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        if (this.keyedStateStore != null) {
            return (S)this.keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        }
        throw new IllegalStateException("Cannot create partitioned state. The keyed state backend has not been set.This indicates that the operator is not partitioned/keyed.");
    }

    protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (this.keyedStateStore != null) {
            return (S)this.keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
        }
        throw new RuntimeException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
    }

    @Override
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector1);
    }

    @Override
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector2);
    }

    private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
        if (selector != null) {
            Object key = selector.getKey(record.getValue());
            this.setCurrentKey(key);
        }
    }

    @Override
    public void setCurrentKey(Object key) {
        if (this.keyedStateBackend != null) {
            try {
                AbstractKeyedStateBackend<?> rawBackend = this.keyedStateBackend;
                rawBackend.setCurrentKey(key);
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while setting the current key context.", e);
            }
        }
    }

    @Override
    public Object getCurrentKey() {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getCurrentKey();
        }
        throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
    }

    public KeyedStateStore getKeyedStateStore() {
        return this.keyedStateStore;
    }

    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        this.chainingStrategy = strategy;
    }

    @Override
    public final ChainingStrategy getChainingStrategy() {
        return this.chainingStrategy;
    }

    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        this.latencyStats.reportLatency(marker);
        this.output.emitLatencyMarker(marker);
    }

    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        this.checkTimerServiceInitialization();
        TypeSerializer keySerializer = this.getKeyedStateBackend().getKeySerializer();
        InternalTimeServiceManager<?, ?> keyedTimeServiceHandler = this.timeServiceManager;
        return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable);
    }

    public void processWatermark(Watermark mark) throws Exception {
        if (this.timeServiceManager != null) {
            this.timeServiceManager.advanceWatermark(mark);
        }
        this.output.emitWatermark(mark);
    }

    private void checkTimerServiceInitialization() {
        if (this.getKeyedStateBackend() == null) {
            throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
        }
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
    }

    public void processWatermark1(Watermark mark) throws Exception {
        this.input1Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        this.input2Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

    @Override
    public OperatorID getOperatorID() {
        return this.config.getOperatorID();
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        return this.timeServiceManager == null ? 0 : this.timeServiceManager.numProcessingTimeTimers();
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        return this.timeServiceManager == null ? 0 : this.timeServiceManager.numEventTimeTimers();
    }

    public class CountingOutput
    implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            this.output.emitLatencyMarker(latencyMarker);
        }

        public void collect(StreamRecord<OUT> record) {
            this.numRecordsOut.inc();
            this.output.collect(record);
        }

        @Override
        public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
            this.numRecordsOut.inc();
            this.output.collect(outputTag, record);
        }

        public void close() {
            this.output.close();
        }
    }
}

