/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.tasks.StreamConfigChainer;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.FunctionWithException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class StreamTaskITCase {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    @Test
    void testRecordWriterClosedOnTransitDeployingStateError() throws Exception {
        this.testRecordWriterClosedOnTransitStateError(ExecutionState.DEPLOYING);
    }

    @Test
    void testRecordWriterClosedOnTransitInitializingStateError() throws Exception {
        this.testRecordWriterClosedOnTransitStateError(ExecutionState.INITIALIZING);
    }

    @Test
    void testRecordWriterClosedOnTransitRunningStateError() throws Exception {
        this.testRecordWriterClosedOnTransitStateError(ExecutionState.RUNNING);
    }

    private void testRecordWriterClosedOnTransitStateError(final ExecutionState executionState) throws Exception {
        NoOpTaskManagerActions taskManagerActions = new NoOpTaskManagerActions(){

            public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                if (taskExecutionState.getExecutionState() == executionState) {
                    throw new ExpectedTestException();
                }
            }
        };
        this.testRecordWriterClosedOnError((FunctionWithException<NettyShuffleEnvironment, Task, Exception>)((FunctionWithException)env -> this.taskBuilderWithConfiguredRecordWriter((NettyShuffleEnvironment)env).setTaskManagerActions((TaskManagerActions)taskManagerActions).build((Executor)EXECUTOR_RESOURCE.getExecutor())));
    }

    @Test
    void testFailInEndOfConstructor() throws Exception {
        Configuration conf = new Configuration();
        conf.setString(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD.key(), "a");
        this.testRecordWriterClosedOnError((FunctionWithException<NettyShuffleEnvironment, Task, Exception>)((FunctionWithException)env -> this.taskBuilderWithConfiguredRecordWriter((NettyShuffleEnvironment)env).setTaskManagerConfig(conf).build((Executor)EXECUTOR_RESOURCE.getExecutor())));
    }

    private void testRecordWriterClosedOnError(FunctionWithException<NettyShuffleEnvironment, Task, Exception> taskProvider) throws Exception {
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();){
            Task task = (Task)taskProvider.apply((Object)shuffleEnvironment);
            task.startTaskThread();
            task.getExecutingThread().join();
            Assertions.assertThat((Comparable)task.getExecutionState()).isEqualTo((Object)ExecutionState.FAILED);
            for (Thread thread : Thread.getAllStackTraces().keySet()) {
                Assertions.assertThat((String)thread.getName()).doesNotContain(new CharSequence[]{"OutputFlusher"});
            }
        }
    }

    private TestTaskBuilder taskBuilderWithConfiguredRecordWriter(NettyShuffleEnvironment shuffleEnvironment) {
        Configuration taskConfiguration = new Configuration();
        this.outputEdgeConfiguration(taskConfiguration);
        ResultPartitionDeploymentDescriptor descriptor = new ResultPartitionDeploymentDescriptor(PartitionDescriptorBuilder.newBuilder().build(), (ShuffleDescriptor)NettyShuffleDescriptorBuilder.newBuilder().buildLocal(), 1);
        return new TestTaskBuilder((ShuffleEnvironment)shuffleEnvironment).setInvokable(NoOpStreamTask.class).setTaskConfig(taskConfiguration).setResultPartitions(Collections.singletonList(descriptor));
    }

    private void outputEdgeConfiguration(Configuration taskConfiguration) {
        StreamConfig streamConfig = new StreamConfig(taskConfiguration);
        streamConfig.setStreamOperatorFactory((StreamOperatorFactory)new UnusedOperatorFactory());
        StreamConfigChainer<StreamTaskITCase> cfg = new StreamConfigChainer<StreamTaskITCase>(new OperatorID(42L, 42L), streamConfig, this, 1);
        cfg.setBufferTimeout(1);
        cfg.chain(new OperatorID(44L, 44L), new UnusedOperatorFactory(), StringSerializer.INSTANCE, StringSerializer.INSTANCE, false);
        cfg.finish();
    }

    private static class UnusedOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private UnusedOperatorFactory() {
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            throw new UnsupportedOperationException("This shouldn't be called");
        }

        public void setChainingStrategy(ChainingStrategy strategy) {
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            throw new UnsupportedOperationException();
        }
    }

    public static class NoOpStreamTask<T, OP extends StreamOperator<T>>
    extends StreamTask<T, OP> {
        public NoOpStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() throws Exception {
            this.inputProcessor = new StreamTaskTest.EmptyInputProcessor();
        }

        protected void cleanUpInternal() throws Exception {
        }
    }
}

