/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.rules.physical.stream;

import java.io.Serializable;
import java.util.ArrayList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery;
import org.apache.flink.table.planner.plan.nodes.FlinkConventions$;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGlobalGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLocalGroupAggregate;
import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalRel;
import org.apache.flink.table.planner.plan.rules.physical.FlinkExpandConversionRule$;
import org.apache.flink.table.planner.plan.rules.physical.stream.TwoStageOptimizedAggregateRule$;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution$;
import org.apache.flink.table.planner.plan.trait.FlinkRelDistributionTraitDef$;
import org.apache.flink.table.planner.plan.trait.ModifyKindSetTrait$;
import org.apache.flink.table.planner.plan.trait.RelModifiedMonotonicity;
import org.apache.flink.table.planner.plan.trait.UpdateKindTrait$;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil$;
import org.apache.flink.table.planner.plan.utils.ChangelogPlanUtils$;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import scala.Function1;
import scala.Predef$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001y4A!\u0001\u0002\u0001+\tqBk^8Ti\u0006<Wm\u00149uS6L'0\u001a3BO\u001e\u0014XmZ1uKJ+H.\u001a\u0006\u0003\u0007\u0011\taa\u001d;sK\u0006l'BA\u0003\u0007\u0003!\u0001\b._:jG\u0006d'BA\u0004\t\u0003\u0015\u0011X\u000f\\3t\u0015\tI!\"\u0001\u0003qY\u0006t'BA\u0006\r\u0003\u001d\u0001H.\u00198oKJT!!\u0004\b\u0002\u000bQ\f'\r\\3\u000b\u0005=\u0001\u0012!\u00024mS:\\'BA\t\u0013\u0003\u0019\t\u0007/Y2iK*\t1#A\u0002pe\u001e\u001c\u0001a\u0005\u0002\u0001-A\u0011qcG\u0007\u00021)\u0011\u0011\"\u0007\u0006\u00035A\tqaY1mG&$X-\u0003\u0002\u001d1\tQ!+\u001a7PaR\u0014V\u000f\\3\t\u000by\u0001A\u0011A\u0010\u0002\rqJg.\u001b;?)\u0005\u0001\u0003CA\u0011\u0001\u001b\u0005\u0011\u0001\"B\u0012\u0001\t\u0003\"\u0013aB7bi\u000eDWm\u001d\u000b\u0003K-\u0002\"AJ\u0015\u000e\u0003\u001dR\u0011\u0001K\u0001\u0006g\u000e\fG.Y\u0005\u0003U\u001d\u0012qAQ8pY\u0016\fg\u000eC\u0003-E\u0001\u0007Q&\u0001\u0003dC2d\u0007CA\f/\u0013\ty\u0003D\u0001\bSK2|\u0005\u000f\u001e*vY\u0016\u001c\u0015\r\u001c7\t\u000bE\u0002A\u0011\u0002\u001a\u0002E%\u001c\u0018J\u001c9viN\u000bG/[:gsJ+\u0017/^5sK\u0012$\u0015n\u001d;sS\n,H/[8o)\r)3g\u000f\u0005\u0006iA\u0002\r!N\u0001\u0006S:\u0004X\u000f\u001e\t\u0003mej\u0011a\u000e\u0006\u0003qe\t1A]3m\u0013\tQtGA\u0004SK2tu\u000eZ3\t\u000bq\u0002\u0004\u0019A\u001f\u0002\t-,\u0017p\u001d\t\u0004My\u0002\u0015BA 
(\u0005\u0015\t%O]1z!\t1\u0013)\u0003\u0002CO\t\u0019\u0011J\u001c;\t\u000b\u0011\u0003A\u0011I#\u0002\u000f=tW*\u0019;dQR\u0011a)\u0013\t\u0003M\u001dK!\u0001S\u0014\u0003\tUs\u0017\u000e\u001e\u0005\u0006Y\r\u0003\r!\f\u0005\u0006\u0017\u0002!I\u0001T\u0001\u0012GJ,\u0017\r^3Uo>\u001cF/Y4f\u0003\u001e<G#B'V-z\u0003\u0007C\u0001(T\u001b\u0005y%BA\u0002Q\u0015\t)\u0011K\u0003\u0002S\u0011\u0005)an\u001c3fg&\u0011Ak\u0014\u0002\u001f'R\u0014X-Y7Fq\u0016\u001cw\t\\8cC2<%o\\;q\u0003\u001e<'/Z4bi\u0016DQ\u0001\u000e&A\u0002UBQa\u0016&A\u0002a\u000b\u0001\u0003\\8dC2\fumZ%oM>d\u0015n\u001d;\u0011\u0005ecV\"\u0001.\u000b\u0005mC\u0011!B;uS2\u001c\u0018BA/[\u0005E\tum\u001a:fO\u0006$X-\u00138g_2K7\u000f\u001e\u0005\u0006?*\u0003\r\u0001W\u0001\u0012O2|'-\u00197BO\u001eLeNZ8MSN$\b\"B1K\u0001\u0004\u0011\u0017aA1hOB\u0011ajY\u0005\u0003I>\u0013\u0001d\u0015;sK\u0006lW\t_3d\u000fJ|W\u000f]!hOJ,w-\u0019;f\u0011\u00151\u0007\u0001\"\u0003h\u0003I\u0019'/Z1uK\u0012K7\u000f\u001e:jEV$\u0018n\u001c8\u0015\u0005!t\u0007CA5m\u001b\u0005Q'BA6\t\u0003\u0015!(/Y5u\u0013\ti'N\u0001\u000bGY&t7NU3m\t&\u001cHO]5ckRLwN\u001c\u0005\u0006y\u0015\u0004\r!P\u0004\u0006a\nA\t!]\u0001\u001f)^|7\u000b^1hK>\u0003H/[7ju\u0016$\u0017iZ4sK\u001e\fG/\u001a*vY\u0016\u0004\"!\t:\u0007\u000b\u0005\u0011\u0001\u0012A:\u0014\u0005I$\bC\u0001\u0014v\u0013\t1xE\u0001\u0004B]f\u0014VM\u001a\u0005\u0006=I$\t\u0001\u001f\u000b\u0002c\"9!P\u001db\u0001\n\u0003Y\u0018\u0001C%O'R\u000bejQ#\u0016\u0003YAa! :!\u0002\u00131\u0012!C%O'R\u000bejQ#!\u0001")
/*
 * Planner rule that matches a StreamExecGroupAggregate whose input is a StreamExecExchange
 * and rewrites it into a local aggregate followed by a global aggregate.
 *
 * The rule only fires when all of the following hold (see matches()):
 *   1. mini-batch execution is enabled in the TableConfig,
 *   2. the configured aggregate phase strategy is not ONE_PHASE,
 *   3. every aggregate function supports partial merge,
 *   4. the input of the exchange does not already satisfy the distribution required by
 *      the grouping keys.
 *
 * Resulting plan shape (the node between the two aggregates is whatever
 * FlinkExpandConversionRule.satisfyDistribution produces -- presumably an exchange):
 *
 *   StreamExecGlobalGroupAggregate
 *   +- (distribution enforcer)
 *      +- StreamExecLocalGroupAggregate
 *         +- input of the original exchange
 */
public class TwoStageOptimizedAggregateRule
extends RelOptRule {
    /** Returns the shared rule instance held by the Scala companion object. */
    public static RelOptRule INSTANCE() {
        return TwoStageOptimizedAggregateRule$.MODULE$.INSTANCE();
    }

    /**
     * Decides whether the matched aggregate should be split into a local and a global phase.
     *
     * @param call the rule call; rel(0) is the group aggregate and rel(2) is the input of the
     *             exchange sitting below it
     * @return {@code true} when mini-batch is enabled, two-phase aggregation is not disabled, all
     *         aggregate functions support partial merge, and the exchange input does not already
     *         satisfy the distribution required by the grouping keys
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
        TableConfig tableConfig = call.getPlanner().getContext().unwrap(FlinkContext.class).getTableConfig();
        StreamExecGroupAggregate agg = call.rel(0);
        RelNode realInput = call.rel(2);

        boolean needRetraction = !ChangelogPlanUtils$.MODULE$.isInsertOnly((StreamPhysicalRel) realInput);
        boolean[] needRetractionArray = this.computeNeedRetractions(call, agg, needRetraction);

        // The aggregate info list is built before the configuration checks on purpose: this keeps
        // the original evaluation order, so any failure while analysing the aggregate calls still
        // surfaces here regardless of the configuration.
        AggregateInfoList aggInfoList = AggregateUtil$.MODULE$.transformToStreamAggregateInfoList(
                agg.aggCalls(),
                agg.getInput().getRowType(),
                needRetractionArray,
                needRetraction,
                true,
                AggregateUtil$.MODULE$.transformToStreamAggregateInfoList$default$6());

        boolean isMiniBatchEnabled = tableConfig.getConfiguration().getBoolean(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED);
        // Enum constants are singletons, so a reference comparison is equivalent to equals()
        // and is also safe when the configured strategy is null.
        boolean isTwoPhaseEnabled = TableConfigUtils.getAggPhaseStrategy(tableConfig) != AggregatePhaseStrategy.ONE_PHASE;

        return isMiniBatchEnabled
                && isTwoPhaseEnabled
                && AggregateUtil$.MODULE$.doAllSupportPartialMerge(aggInfoList.aggInfos())
                && !this.isInputSatisfyRequiredDistribution(realInput, agg.grouping());
    }

    /**
     * Computes the per-aggregate-call retraction flags for the given aggregate, using the
     * modified-monotonicity metadata of the aggregate node.
     *
     * @param call           the current rule call (supplies the metadata query)
     * @param agg            the matched group aggregate
     * @param needRetraction whether the aggregate's real input is not insert-only
     * @return the array produced by {@code AggregateUtil.getNeedRetractions}
     */
    private boolean[] computeNeedRetractions(RelOptRuleCall call, StreamExecGroupAggregate agg, boolean needRetraction) {
        FlinkRelMetadataQuery fmq = FlinkRelMetadataQuery.reuseOrCreate(call.getMetadataQuery());
        RelModifiedMonotonicity monotonicity = fmq.getRelModifiedMonotonicity(agg);
        return AggregateUtil$.MODULE$.getNeedRetractions(agg.grouping().length, needRetraction, monotonicity, agg.aggCalls());
    }

    /**
     * Checks whether the distribution trait of {@code input} already satisfies the distribution
     * required for grouping on {@code keys}.
     *
     * @param input the node whose distribution trait is inspected
     * @param keys  the grouping key indices
     * @return {@code true} if the input's distribution satisfies the required one
     */
    private boolean isInputSatisfyRequiredDistribution(RelNode input, int[] keys) {
        FlinkRelDistribution requiredDistribution = this.createDistribution(keys);
        FlinkRelDistribution inputDistribution = input.getTraitSet().getTrait(FlinkRelDistributionTraitDef$.MODULE$.INSTANCE());
        return inputDistribution.satisfies(requiredDistribution);
    }

    /**
     * Replaces the matched aggregate with the equivalent local + global aggregate pair.
     *
     * @param call the rule call; rel(0) is the group aggregate and rel(2) is the input of the
     *             exchange sitting below it
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
        StreamExecGroupAggregate agg = call.rel(0);
        RelNode realInput = call.rel(2);

        boolean needRetraction = !ChangelogPlanUtils$.MODULE$.isInsertOnly((StreamPhysicalRel) realInput);
        boolean[] needRetractionArray = this.computeNeedRetractions(call, agg, needRetraction);

        // The two info lists differ only in the fifth argument: false for the local phase,
        // true for the global phase.
        AggregateInfoList localAggInfoList = AggregateUtil$.MODULE$.transformToStreamAggregateInfoList(
                agg.aggCalls(),
                realInput.getRowType(),
                needRetractionArray,
                needRetraction,
                false,
                AggregateUtil$.MODULE$.transformToStreamAggregateInfoList$default$6());
        AggregateInfoList globalAggInfoList = AggregateUtil$.MODULE$.transformToStreamAggregateInfoList(
                agg.aggCalls(),
                realInput.getRowType(),
                needRetractionArray,
                needRetraction,
                true,
                AggregateUtil$.MODULE$.transformToStreamAggregateInfoList$default$6());

        StreamExecGlobalGroupAggregate globalHashAgg = this.createTwoStageAgg(realInput, localAggInfoList, globalAggInfoList, agg);
        call.transformTo(globalHashAgg);
    }

    /**
     * Builds the local aggregate on top of {@code input}, enforces the grouping distribution on
     * its output, and places the global aggregate on top.
     *
     * @param input             the input of the original exchange
     * @param localAggInfoList  aggregate info for the local phase
     * @param globalAggInfoList aggregate info for the global phase
     * @param agg               the original single-phase aggregate being replaced
     * @return the global aggregate that becomes the new root of the rewritten sub-plan
     */
    private StreamExecGlobalGroupAggregate createTwoStageAgg(RelNode input, AggregateInfoList localAggInfoList, AggregateInfoList globalAggInfoList, StreamExecGroupAggregate agg) {
        RelDataType localAggRowType = AggregateUtil$.MODULE$.inferLocalAggRowType(
                localAggInfoList,
                input.getRowType(),
                agg.grouping(),
                (FlinkTypeFactory) input.getCluster().getTypeFactory());

        // The local aggregate keeps the input's traits but is marked insert-only with no update kind.
        RelTraitSet localAggTraitSet = input.getTraitSet()
                .plus(ModifyKindSetTrait$.MODULE$.INSERT_ONLY())
                .plus(UpdateKindTrait$.MODULE$.NONE());
        StreamExecLocalGroupAggregate localHashAgg = new StreamExecLocalGroupAggregate(
                agg.getCluster(),
                localAggTraitSet,
                input,
                localAggRowType,
                agg.grouping(),
                agg.aggCalls(),
                localAggInfoList,
                agg.partialFinalType());

        // The global phase groups on positions 0..n-1, where n is the number of original grouping keys.
        int[] globalGrouping = new int[agg.grouping().length];
        for (int i = 0; i < globalGrouping.length; i++) {
            globalGrouping[i] = i;
        }

        FlinkRelDistribution globalDistribution = this.createDistribution(globalGrouping);
        RelNode newInput = FlinkExpandConversionRule$.MODULE$.satisfyDistribution(FlinkConventions$.MODULE$.STREAM_PHYSICAL(), localHashAgg, globalDistribution);
        RelTraitSet globalAggProvidedTraitSet = agg.getTraitSet();

        return new StreamExecGlobalGroupAggregate(
                agg.getCluster(),
                globalAggProvidedTraitSet,
                newInput,
                input.getRowType(),
                agg.getRowType(),
                globalGrouping,
                localAggInfoList,
                globalAggInfoList,
                agg.partialFinalType());
    }

    /**
     * Creates the distribution required for the given grouping keys.
     *
     * @param keys the grouping key indices
     * @return a hash distribution over {@code keys}, or the singleton distribution when
     *         {@code keys} is empty
     */
    private FlinkRelDistribution createDistribution(int[] keys) {
        if (keys.length == 0) {
            return FlinkRelDistribution$.MODULE$.SINGLETON();
        }
        ArrayList<Integer> fields = new ArrayList<>(keys.length);
        for (int key : keys) {
            fields.add(key);
        }
        return FlinkRelDistribution$.MODULE$.hash(fields, FlinkRelDistribution$.MODULE$.hash$default$2());
    }

    /**
     * Registers the operand pattern: a StreamExecGroupAggregate over a StreamExecExchange over
     * any RelNode.
     */
    public TwoStageOptimizedAggregateRule() {
        super(
                RelOptRule.operand(
                        StreamExecGroupAggregate.class,
                        RelOptRule.operand(
                                StreamExecExchange.class,
                                RelOptRule.operand(RelNode.class, RelOptRule.any()))),
                "TwoStageOptimizedAggregateRule");
    }
}

