/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Checks the job status transitions of an {@link ExecutionGraph} while its execution vertices
 * are marked as finished one after another.
 */
public class ExecutionGraphFinishTest
extends TestLogger {
    /** Executor shared by all tests in this class; handed to the scheduler under test. */
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();

    /**
     * Builds a streaming job graph with two job vertices of parallelism 2, switches all
     * execution vertices to running and then marks them finished one by one.
     *
     * <p>The graph is expected to stay {@link JobStatus#RUNNING} while only the vertices of the
     * first job vertex have finished, and to report {@link JobStatus#FINISHED} once all four
     * execution vertices have finished.
     *
     * @throws Exception if building the scheduler or driving the execution graph fails
     */
    @Test
    public void testJobFinishes() throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(
                ExecutionGraphTestUtils.createJobVertex("Task1", 2, NoOpInvokable.class),
                ExecutionGraphTestUtils.createJobVertex("Task2", 2, NoOpInvokable.class));
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(
                jobGraph,
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR_RESOURCE.getExecutor()).build();
        ExecutionGraph eg = scheduler.getExecutionGraph();

        scheduler.startScheduling();
        ExecutionGraphTestUtils.switchAllVerticesToRunning(eg);

        // The first two job vertices in topological order are treated as sender and receiver.
        Iterator<ExecutionJobVertex> jobVertices = eg.getVerticesTopologically().iterator();
        ExecutionJobVertex sender = jobVertices.next();
        ExecutionJobVertex receiver = jobVertices.next();
        List<ExecutionVertex> senderVertices = Arrays.asList(sender.getTaskVertices());
        List<ExecutionVertex> receiverVertices = Arrays.asList(receiver.getTaskVertices());

        // Finishing the first sender subtask must not finish the job.
        senderVertices.get(0).getCurrentExecutionAttempt().markFinished();
        Assert.assertEquals(1L, sender.getNumExecutionVertexFinished());
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Finishing the whole sender job vertex must not finish the job either.
        senderVertices.get(1).getCurrentExecutionAttempt().markFinished();
        Assert.assertEquals(2L, sender.getNumExecutionVertexFinished());
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Only after the receiver subtasks have finished as well is the job FINISHED.
        receiverVertices.get(0).getCurrentExecutionAttempt().markFinished();
        receiverVertices.get(1).getCurrentExecutionAttempt().markFinished();
        Assert.assertEquals(4L, eg.getNumFinishedVertices());
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
    }
}

