/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.optimizer.iterations;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.junit.Assert;
import org.junit.Test;

public class PageRankCompilerTest
extends CompilerTestBase {
    @Test
    public void testPageRank() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource pagesInput = env.fromElements((Object[])new Long[]{1L});
            DataSource linksInput = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1L, (Object)2L)});
            MapOperator pagesWithRanks = pagesInput.map((MapFunction)new PageRank.RankAssigner(0.1));
            GroupReduceOperator adjacencyListInput = linksInput.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new PageRank.BuildOutgoingEdgeList());
            IterativeDataSet iteration = pagesWithRanks.iterate(10);
            Configuration cfg = new Configuration();
            cfg.setString("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");
            MapOperator newRanks = ((JoinOperator)iteration.join((DataSet)adjacencyListInput).where(new int[]{0}).equalTo(new int[]{0}).withParameters(cfg)).flatMap((FlatMapFunction)new PageRank.JoinVertexWithEdgesMatch()).groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1).map((MapFunction)new PageRank.Dampener(0.85, 10.0));
            DataSet finalPageRanks = iteration.closeWith((DataSet)newRanks, (DataSet)newRanks.join((DataSet)iteration).where(new int[]{0}).equalTo(new int[]{0}).filter((FilterFunction)new PageRank.EpsilonFilter()));
            finalPageRanks.output((OutputFormat)new DiscardingOutputFormat());
            Plan p = env.createProgramPlan();
            OptimizedPlan op = this.compileNoStats(p);
            SinkPlanNode sinkPlanNode = (SinkPlanNode)op.getDataSinks().iterator().next();
            BulkIterationPlanNode iterPlanNode = (BulkIterationPlanNode)sinkPlanNode.getInput().getSource();
            Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iterPlanNode.getInput().getShipStrategy());
            Assert.assertEquals((Object)LocalStrategy.NONE, (Object)iterPlanNode.getInput().getLocalStrategy());
            BulkPartialSolutionPlanNode partSolPlanNode = iterPlanNode.getPartialSolutionPlanNode();
            Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)((Channel)partSolPlanNode.getOutgoingChannels().get(0)).getShipStrategy());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }
}

