/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests that an execution graph can be built repeatedly from the same {@link JobGraph} after the
 * parallelism of its {@link JobVertex job vertices} has been changed, and that building is
 * rejected when the parallelism is set above the max parallelism of the vertices.
 */
public class DefaultExecutionGraphRescalingTest extends TestLogger {

    /**
     * Builds an execution graph with the initial parallelism, then rebuilds it from the same job
     * graph after scaling every vertex down and after scaling every vertex up to the max
     * parallelism. After each build the vertex parallelism and the generated graph are checked.
     */
    @Test
    public void testExecutionGraphArbitraryDopConstructionTest() throws Exception {
        final int initialParallelism = 5;
        final int maxParallelism = 10;
        final JobVertex[] jobVertices =
                createVerticesForSimpleBipartiteJobGraph(initialParallelism, maxParallelism);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);

        // The vertices were created with the initial parallelism; no explicit rescale needed.
        buildExecutionGraphAndVerify(jobGraph, jobVertices, initialParallelism);

        // --- verify that scaling down works ---
        final int scaleDownParallelism = 1;
        rescaleVertices(jobVertices, scaleDownParallelism);
        buildExecutionGraphAndVerify(jobGraph, jobVertices, scaleDownParallelism);

        // --- verify that scaling up works (a parallelism equal to the max is still allowed) ---
        final int scaleUpParallelism = maxParallelism;
        rescaleVertices(jobVertices, scaleUpParallelism);
        buildExecutionGraphAndVerify(jobGraph, jobVertices, scaleUpParallelism);
    }

    /**
     * Sets the parallelism of every vertex to one more than its max parallelism and expects the
     * execution graph build to fail with a {@link JobException}.
     */
    @Test
    public void testExecutionGraphConstructionFailsRescaleDopExceedMaxParallelism()
            throws Exception {
        final int initialParallelism = 1;
        final int maxParallelism = 10;
        final JobVertex[] jobVertices =
                createVerticesForSimpleBipartiteJobGraph(initialParallelism, maxParallelism);
        final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(jobVertices);

        rescaleVertices(jobVertices, maxParallelism + 1);

        try {
            // This must fail because the parallelism is now maxParallelism + 1.
            TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();
            Assert.fail(
                    "Building the ExecutionGraph with a parallelism higher than the max parallelism should fail.");
        } catch (JobException expected) {
            // Expected: the build rejected the over-provisioned parallelism.
        }
    }

    /**
     * Builds a fresh execution graph from {@code jobGraph}, asserts that every vertex reports
     * {@code expectedParallelism}, and verifies the generated graph against the vertices.
     *
     * @param jobGraph job graph to build the execution graph from
     * @param jobVertices the vertices the job graph was created from, in creation order
     * @param expectedParallelism parallelism every vertex is expected to report
     * @throws Exception if building the execution graph fails
     */
    private static void buildExecutionGraphAndVerify(
            JobGraph jobGraph, JobVertex[] jobVertices, int expectedParallelism) throws Exception {
        final ExecutionGraph executionGraph =
                TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build();

        for (JobVertex vertex : jobVertices) {
            Assert.assertThat(vertex.getParallelism(), CoreMatchers.is(expectedParallelism));
        }

        verifyGeneratedExecutionGraphOfSimpleBipartiteJobGraph(executionGraph, jobVertices);
    }

    /**
     * Sets the given parallelism on every vertex.
     *
     * @param jobVertices vertices to update
     * @param parallelism new parallelism for each vertex
     */
    private static void rescaleVertices(JobVertex[] jobVertices, int parallelism) {
        for (JobVertex vertex : jobVertices) {
            vertex.setParallelism(parallelism);
        }
    }

    /**
     * Creates five vertices with the given parallelism settings and wires them all-to-all with
     * pipelined results as: v1 -> v2, v2 -> v4, v3 -> v4, v4 -> v5, v3 -> v5.
     *
     * @param parallelism parallelism set on every vertex
     * @param maxParallelism max parallelism set on every vertex
     * @return the vertices in the order {@code v1..v5}
     */
    private static JobVertex[] createVerticesForSimpleBipartiteJobGraph(
            int parallelism, int maxParallelism) {
        final JobVertex v1 = new JobVertex("vertex1");
        final JobVertex v2 = new JobVertex("vertex2");
        final JobVertex v3 = new JobVertex("vertex3");
        final JobVertex v4 = new JobVertex("vertex4");
        final JobVertex v5 = new JobVertex("vertex5");

        final JobVertex[] jobVertices = new JobVertex[] {v1, v2, v3, v4, v5};

        for (JobVertex jobVertex : jobVertices) {
            jobVertex.setInvokableClass(AbstractInvokable.class);
            jobVertex.setParallelism(parallelism);
            jobVertex.setMaxParallelism(maxParallelism);
        }

        v2.connectNewDataSetAsInput(
                v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(
                v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(
                v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(
                v4, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v5.connectNewDataSetAsInput(
                v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

        return jobVertices;
    }

    /**
     * Verifies each of the five vertices in the generated execution graph, passing along the
     * neighbouring vertices that match the wiring done in {@link
     * #createVerticesForSimpleBipartiteJobGraph(int, int)}. {@code null} is passed where a vertex
     * has no neighbours on that side.
     *
     * @param generatedExecutionGraph execution graph built from the vertices
     * @param jobVertices the vertices in the order {@code v1..v5}
     */
    private static void verifyGeneratedExecutionGraphOfSimpleBipartiteJobGraph(
            ExecutionGraph generatedExecutionGraph, JobVertex[] jobVertices) {
        // v1: no inputs, feeds v2
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                generatedExecutionGraph,
                jobVertices[0],
                null,
                Collections.singletonList(jobVertices[1]));
        // v2: fed by v1, feeds v4
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                generatedExecutionGraph,
                jobVertices[1],
                Collections.singletonList(jobVertices[0]),
                Collections.singletonList(jobVertices[3]));
        // v3: no inputs, feeds v4 and v5
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                generatedExecutionGraph,
                jobVertices[2],
                null,
                Arrays.asList(jobVertices[3], jobVertices[4]));
        // v4: fed by v2 and v3, feeds v5
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                generatedExecutionGraph,
                jobVertices[3],
                Arrays.asList(jobVertices[1], jobVertices[2]),
                Collections.singletonList(jobVertices[4]));
        // v5: fed by v4 and v3, no outputs
        ExecutionGraphTestUtils.verifyGeneratedExecutionJobVertex(
                generatedExecutionGraph,
                jobVertices[4],
                Arrays.asList(jobVertices[3], jobVertices[2]),
                null);
    }
}

