/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.Preconditions;

/**
 * A {@link StreamTask} whose head operator is a {@link OneInputStreamOperator}.
 *
 * <p>During {@link #init()} the task merges all input gates of its environment into a single
 * {@link CheckpointedInputGate}, wires it to a {@link StreamOneInputProcessor}, and registers
 * the input-related gauges.
 *
 * @param <IN> type of the records consumed by the head operator
 * @param <OUT> type of the records produced by the task
 */
@Internal
public class OneInputStreamTask<IN, OUT>
extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
    /** Metric name under which the input watermark gauge is registered (operator and task scope). */
    private static final String CURRENT_INPUT_WATERMARK_METRIC = "currentInputWatermark";

    /** Holds the timestamp of the most recent watermark passed to {@code emitWatermark}. */
    private final WatermarkGauge inputWatermarkGauge = new WatermarkGauge();

    /**
     * Creates the task for the given environment.
     *
     * @param env the task environment, forwarded to {@link StreamTask}
     */
    public OneInputStreamTask(Environment env) {
        super(env);
    }

    /**
     * Creates the task with an explicitly supplied timer service.
     *
     * @param env the task environment, forwarded to {@link StreamTask}
     * @param timeProvider timer service forwarded to {@link StreamTask}; may be {@code null}
     */
    @VisibleForTesting
    public OneInputStreamTask(Environment env, @Nullable TimerService timeProvider) {
        super(env, timeProvider);
    }

    /**
     * Sets up the input side of the task.
     *
     * <p>If the configuration reports at least one input, a checkpointed input gate is created,
     * its alignment duration is exposed as the {@code "checkpointAlignmentTime"} gauge on the
     * task I/O metric group, and {@code inputProcessor} is assigned a new
     * {@link StreamOneInputProcessor}. Regardless of the number of inputs, the input watermark
     * gauge is registered on both the head operator's and the task's metric group.
     *
     * @throws Exception if creating the input gate or any collaborator fails
     */
    @Override
    public void init() throws Exception {
        StreamConfig streamConfig = this.getConfiguration();
        if (streamConfig.getNumberOfInputs() > 0) {
            CheckpointedInputGate inputGate = this.createCheckpointedInputGate();
            TaskIOMetricGroup ioMetrics = this.getEnvironment().getMetricGroup().getIOMetricGroup();
            ioMetrics.gauge("checkpointAlignmentTime", inputGate::getAlignmentDurationNanos);

            PushingAsyncDataInput.DataOutput<IN> output = this.createDataOutput();
            StreamTaskInput<IN> input = this.createTaskInput(inputGate, output);
            this.inputProcessor = new StreamOneInputProcessor<>(input, output, this.getCheckpointLock(), this.operatorChain);
        }

        this.headOperator.getMetricGroup().gauge(CURRENT_INPUT_WATERMARK_METRIC, this.inputWatermarkGauge);
        // NOTE(review): the task-level registration passes a method reference rather than the
        // gauge object itself, presumably so that a distinct Gauge instance is registered in
        // each scope - confirm against the metric registry's uniqueness requirements.
        this.getEnvironment().getMetricGroup().gauge(CURRENT_INPUT_WATERMARK_METRIC, this.inputWatermarkGauge::getValue);
    }

    /**
     * Combines all input gates of the environment into one gate and wraps it into a
     * {@link CheckpointedInputGate} using the configured checkpoint mode.
     *
     * @return the checkpointed gate covering every input gate of this task
     * @throws IOException if the underlying factory fails
     */
    private CheckpointedInputGate createCheckpointedInputGate() throws IOException {
        InputGate unionGate = InputGateUtil.createInputGate(this.getEnvironment().getAllInputGates());
        return InputProcessorUtil.createCheckpointedInputGate(
                this,
                this.configuration.getCheckpointMode(),
                this.getEnvironment().getIOManager(),
                unionGate,
                this.getEnvironment().getTaskManagerInfo().getConfiguration(),
                this.getTaskNameWithSubtaskAndId());
    }

    /**
     * Builds the output that forwards incoming elements to the head operator.
     *
     * @return a new {@link StreamTaskNetworkOutput} bound to the head operator, the checkpoint
     *     lock, the input watermark gauge and the head operator's records-in counter
     */
    private PushingAsyncDataInput.DataOutput<IN> createDataOutput() {
        return new StreamTaskNetworkOutput<>(
                this.headOperator,
                this.getStreamStatusMaintainer(),
                this.getCheckpointLock(),
                this.inputWatermarkGauge,
                this.setupNumRecordsInCounter(this.headOperator));
    }

    /**
     * Builds the network input that reads from the given gate.
     *
     * @param inputGate the checkpointed gate to read from
     * @param output the output handed to the {@link StatusWatermarkValve}
     * @return a new {@link StreamTaskNetworkInput} with input index {@code 0}
     */
    private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate, PushingAsyncDataInput.DataOutput<IN> output) {
        // One valve slot per input channel of the gate.
        StatusWatermarkValve valve = new StatusWatermarkValve(inputGate.getNumberOfInputChannels(), output);
        TypeSerializer<IN> inSerializer = this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());
        return new StreamTaskNetworkInput<>(inputGate, inSerializer, this.getEnvironment().getIOManager(), valve, 0);
    }

    /**
     * {@link AbstractDataOutput} that hands records, watermarks and latency markers to a
     * {@link OneInputStreamOperator}. Every call into the operator is made while holding the
     * lock object passed to the constructor.
     *
     * @param <IN> type of the records accepted by the operator
     */
    private static class StreamTaskNetworkOutput<IN>
    extends AbstractDataOutput<IN> {
        private final OneInputStreamOperator<IN, ?> operator;
        private final WatermarkGauge watermarkGauge;
        private final Counter numRecordsIn;

        /**
         * @param operator operator receiving the elements; must not be {@code null}
         * @param streamStatusMaintainer forwarded to {@link AbstractDataOutput}
         * @param lock monitor held around every operator call; forwarded to {@link AbstractDataOutput}
         * @param watermarkGauge gauge updated on each watermark; must not be {@code null}
         * @param numRecordsIn counter incremented on each record; must not be {@code null}
         */
        private StreamTaskNetworkOutput(OneInputStreamOperator<IN, ?> operator, StreamStatusMaintainer streamStatusMaintainer, Object lock, WatermarkGauge watermarkGauge, Counter numRecordsIn) {
            super(streamStatusMaintainer, lock);
            this.operator = Preconditions.checkNotNull(operator);
            this.watermarkGauge = Preconditions.checkNotNull(watermarkGauge);
            this.numRecordsIn = Preconditions.checkNotNull(numRecordsIn);
        }

        /**
         * Counts the record, sets it as the operator's key context element and processes it,
         * all under the lock.
         *
         * @param record the record to process
         * @throws Exception propagated from the operator
         */
        @Override
        public void emitRecord(StreamRecord<IN> record) throws Exception {
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.operator.setKeyContextElement1(record);
                this.operator.processElement(record);
            }
        }

        /**
         * Stores the watermark's timestamp in the gauge and then passes the watermark to the
         * operator, all under the lock.
         *
         * @param watermark the watermark to process
         * @throws Exception propagated from the operator
         */
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            synchronized (this.lock) {
                this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                this.operator.processWatermark(watermark);
            }
        }

        /**
         * Passes the latency marker to the operator under the lock.
         *
         * @param latencyMarker the marker to process
         * @throws Exception propagated from the operator
         */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            synchronized (this.lock) {
                this.operator.processLatencyMarker(latencyMarker);
            }
        }
    }
}

