/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.HashCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
import org.apache.flink.table.runtime.partitioner.BinaryHashPartitioner;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/**
 * Batch exec node that redistributes the rows of its single input according to the
 * required distribution declared on that input's {@link InputProperty}.
 *
 * <p>The node is translated into a {@link PartitionTransformation}. The partitioner is
 * chosen from the distribution type (any, broadcast, singleton or hash); the shuffle mode
 * is {@link ShuffleMode#BATCH} when that mode was explicitly required or when the table
 * configuration asks for all edges to be blocking, and {@link ShuffleMode#UNDEFINED}
 * otherwise.
 */
public class BatchExecExchange extends CommonExecExchange implements BatchExecNode<RowData> {

    /**
     * Parallelism passed to the transformation when this node does not force a value.
     * NOTE(review): -1 is presumably Flink's "use the default parallelism" sentinel
     * (ExecutionConfig.PARALLELISM_DEFAULT) - confirm before relying on it elsewhere.
     */
    private static final int UNSET_PARALLELISM = -1;

    /** Shuffle mode demanded for this exchange, or {@code null} if none was requested. */
    @Nullable
    private ShuffleMode requiredShuffleMode;

    /**
     * Creates an exchange node with a freshly allocated node id and a single input.
     *
     * @param inputProperty property (including the required distribution) of the only input
     * @param outputType row type produced by this node
     * @param description description forwarded to the parent class
     */
    public BatchExecExchange(InputProperty inputProperty, RowType outputType, String description) {
        super(
                BatchExecExchange.getNewNodeId(),
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Sets the shuffle mode this exchange must use.
     *
     * @param requiredShuffleMode the required mode, or {@code null} to clear the requirement
     */
    public void setRequiredShuffleMode(@Nullable ShuffleMode requiredShuffleMode) {
        this.requiredShuffleMode = requiredShuffleMode;
    }

    /**
     * Builds a description of the form {@code Exchange(distribution=[...])}.
     *
     * <p>For hash distributions the key field names are appended to the distribution type,
     * and {@code , shuffle_mode=[BATCH]} is appended when BATCH shuffle mode is required.
     *
     * @return the textual description of this exchange
     */
    @Override
    public String getDescription() {
        InputProperty.RequiredDistribution requiredDistribution =
                getInputProperties().get(0).getRequiredDistribution();
        InputProperty.DistributionType distributionType = requiredDistribution.getType();

        // Compare the enum constant and lower-case with Locale.ROOT so the text does not
        // depend on the JVM default locale (e.g. Turkish dotless 'i' would break a
        // string comparison against "singleton").
        String typeName =
                distributionType == InputProperty.DistributionType.SINGLETON
                        ? "single"
                        : distributionType.name().toLowerCase(Locale.ROOT);

        StringBuilder sb = new StringBuilder();
        sb.append("distribution=[").append(typeName);
        if (distributionType == InputProperty.DistributionType.HASH) {
            RowType inputRowType = (RowType) getInputEdges().get(0).getOutputType();
            int[] keys = ((InputProperty.HashDistribution) requiredDistribution).getKeys();
            sb.append("[")
                    .append(String.join(", ", resolveKeyFieldNames(inputRowType, keys)))
                    .append("]");
        }
        sb.append("]");
        if (requiredShuffleMode == ShuffleMode.BATCH) {
            sb.append(", shuffle_mode=[BATCH]");
        }
        return String.format("Exchange(%s)", sb);
    }

    /**
     * Translates this node into a {@link PartitionTransformation} on top of the translated
     * input.
     *
     * @param planner planner used to translate the input and to read the table configuration
     * @return the partition transformation producing {@link RowData}
     * @throws TableException if the required distribution type is not one of ANY, BROADCAST,
     *     SINGLETON or HASH
     */
    @Override
    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
        ExecEdge inputEdge = getInputEdges().get(0);
        // The edge only exposes a wildcard transformation; this node is declared as
        // BatchExecNode<RowData>, so the input is treated as producing RowData.
        @SuppressWarnings("unchecked")
        Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);

        InputProperty inputProperty = getInputProperties().get(0);
        InputProperty.DistributionType distributionType =
                inputProperty.getRequiredDistribution().getType();

        // Declared as the common supertype: each branch assigns a different partitioner
        // implementation (or none at all for ANY).
        StreamPartitioner<RowData> partitioner;
        int parallelism;
        switch (distributionType) {
            case ANY:
                partitioner = null;
                parallelism = UNSET_PARALLELISM;
                break;
            case BROADCAST:
                partitioner = new BroadcastPartitioner<>();
                parallelism = UNSET_PARALLELISM;
                break;
            case SINGLETON:
                partitioner = new GlobalPartitioner<>();
                // A singleton distribution is pinned to exactly one parallel instance.
                parallelism = 1;
                break;
            case HASH:
                int[] keys =
                        ((InputProperty.HashDistribution) inputProperty.getRequiredDistribution())
                                .getKeys();
                RowType inputType = (RowType) inputEdge.getOutputType();
                partitioner =
                        new BinaryHashPartitioner(
                                HashCodeGenerator.generateRowHash(
                                        new CodeGeneratorContext(planner.getTableConfig()),
                                        inputEdge.getOutputType(),
                                        "HashPartitioner",
                                        keys),
                                resolveKeyFieldNames(inputType, keys));
                parallelism = UNSET_PARALLELISM;
                break;
            default:
                throw new TableException(distributionType + " is not supported now!");
        }

        ShuffleMode shuffleMode =
                getShuffleMode(planner.getTableConfig().getConfiguration(), requiredShuffleMode);
        PartitionTransformation<RowData> transformation =
                new PartitionTransformation<>(inputTransform, partitioner, shuffleMode);
        transformation.setParallelism(parallelism);
        transformation.setOutputType(InternalTypeInfo.of((LogicalType) getOutputType()));
        return transformation;
    }

    /**
     * Determines the shuffle mode to use for an exchange.
     *
     * @param config configuration holding {@link ExecutionConfigOptions#TABLE_EXEC_SHUFFLE_MODE}
     * @param requiredShuffleMode shuffle mode demanded by the exchange, may be {@code null}
     * @return {@link ShuffleMode#BATCH} if BATCH is required or the configured shuffle mode
     *     equals (ignoring case) {@link GlobalDataExchangeMode#ALL_EDGES_BLOCKING};
     *     {@link ShuffleMode#UNDEFINED} otherwise
     */
    public static ShuffleMode getShuffleMode(
            Configuration config, @Nullable ShuffleMode requiredShuffleMode) {
        if (requiredShuffleMode == ShuffleMode.BATCH) {
            return ShuffleMode.BATCH;
        }
        // Constant on the left so a missing configuration value cannot cause an NPE.
        String configuredMode = config.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE);
        if (GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString().equalsIgnoreCase(configuredMode)) {
            return ShuffleMode.BATCH;
        }
        return ShuffleMode.UNDEFINED;
    }

    /**
     * Returns the explicitly required shuffle mode, if any.
     *
     * @return the required shuffle mode, or an empty optional when none was set
     */
    @VisibleForTesting
    public Optional<ShuffleMode> getRequiredShuffleMode() {
        return Optional.ofNullable(requiredShuffleMode);
    }

    /** Maps hash key indices to the corresponding field names of the given row type. */
    private static String[] resolveKeyFieldNames(RowType rowType, int[] keys) {
        // Fetch the name list once instead of once per key.
        List<String> fieldNames = rowType.getFieldNames();
        return Arrays.stream(keys).mapToObj(fieldNames::get).toArray(String[]::new);
    }
}

