/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ParallelismProvider;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.SinkProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer;
import org.apache.flink.table.runtime.operators.sink.SinkOperator;
import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;

public abstract class CommonExecSink
extends ExecNodeBase<Object>
implements MultipleTransformationTranslator<Object> {
    public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = "dynamicTableSink";
    @JsonProperty(value="dynamicTableSink")
    protected final DynamicTableSinkSpec tableSinkSpec;
    @JsonIgnore
    private final ChangelogMode changelogMode;
    @JsonIgnore
    private final boolean isBounded;

    protected CommonExecSink(DynamicTableSinkSpec tableSinkSpec, ChangelogMode changelogMode, boolean isBounded, int id, List<InputProperty> inputProperties, LogicalType outputType, String description) {
        super(id, inputProperties, outputType, description);
        this.tableSinkSpec = tableSinkSpec;
        this.changelogMode = changelogMode;
        this.isBounded = isBounded;
    }

    public DynamicTableSinkSpec getTableSinkSpec() {
        return this.tableSinkSpec;
    }

    protected Transformation<Object> createSinkTransformation(StreamExecutionEnvironment env, TableConfig tableConfig, Transformation<RowData> inputTransform, int rowtimeFieldIndex, boolean upsertMaterialize) {
        DynamicTableSink tableSink = this.tableSinkSpec.getTableSink();
        ResolvedSchema schema = this.tableSinkSpec.getCatalogTable().getResolvedSchema();
        DynamicTableSink.SinkRuntimeProvider runtimeProvider = tableSink.getSinkRuntimeProvider((DynamicTableSink.Context)new SinkRuntimeProviderContext(this.isBounded));
        RowType physicalRowType = this.getPhysicalRowType(schema);
        int[] primaryKeys = this.getPrimaryKeyIndices(physicalRowType, schema);
        int sinkParallelism = this.deriveSinkParallelism(inputTransform, runtimeProvider);
        Transformation<RowData> sinkTransform = this.applyNotNullEnforcer(inputTransform, tableConfig, physicalRowType);
        sinkTransform = this.applyKeyBy(sinkTransform, primaryKeys, sinkParallelism, upsertMaterialize);
        if (upsertMaterialize) {
            sinkTransform = this.applyUpsertMaterialize(sinkTransform, primaryKeys, sinkParallelism, tableConfig, physicalRowType);
        }
        return this.applySinkProvider(sinkTransform, env, runtimeProvider, rowtimeFieldIndex, sinkParallelism);
    }

    private Transformation<RowData> applyNotNullEnforcer(Transformation<RowData> inputTransform, TableConfig config, RowType physicalRowType) {
        ExecutionConfigOptions.NotNullEnforcer notNullEnforcer = (ExecutionConfigOptions.NotNullEnforcer)config.getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER);
        int[] notNullFieldIndices = this.getNotNullFieldIndices(physicalRowType);
        String[] fieldNames = physicalRowType.getFieldNames().toArray(new String[0]);
        if (notNullFieldIndices.length > 0) {
            SinkNotNullEnforcer enforcer = new SinkNotNullEnforcer(notNullEnforcer, notNullFieldIndices, fieldNames);
            List notNullFieldNames = Arrays.stream(notNullFieldIndices).mapToObj(idx -> fieldNames[idx]).collect(Collectors.toList());
            String operatorName = String.format("NotNullEnforcer(fields=[%s])", String.join((CharSequence)", ", notNullFieldNames));
            return new OneInputTransformation(inputTransform, operatorName, (OneInputStreamOperator)new StreamFilter((FilterFunction)enforcer), this.getInputTypeInfo(), inputTransform.getParallelism());
        }
        return inputTransform;
    }

    private int[] getNotNullFieldIndices(RowType physicalType) {
        return IntStream.range(0, physicalType.getFieldCount()).filter(pos -> !physicalType.getTypeAt(pos).isNullable()).toArray();
    }

    private int deriveSinkParallelism(Transformation<RowData> inputTransform, DynamicTableSink.SinkRuntimeProvider runtimeProvider) {
        int inputParallelism = inputTransform.getParallelism();
        if (!(runtimeProvider instanceof ParallelismProvider)) {
            return inputParallelism;
        }
        ParallelismProvider parallelismProvider = (ParallelismProvider)runtimeProvider;
        return parallelismProvider.getParallelism().map(sinkParallelism -> {
            if (sinkParallelism <= 0) {
                throw new TableException(String.format("Invalid configured parallelism %s for table '%s'.", sinkParallelism, this.tableSinkSpec.getObjectIdentifier().asSummaryString()));
            }
            return sinkParallelism;
        }).orElse(inputParallelism);
    }

    private Transformation<RowData> applyKeyBy(Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, boolean upsertMaterialize) {
        int inputParallelism = inputTransform.getParallelism();
        if ((inputParallelism == sinkParallelism || this.changelogMode.containsOnly(RowKind.INSERT)) && !upsertMaterialize) {
            return inputTransform;
        }
        if (primaryKeys.length == 0) {
            throw new TableException(String.format("The sink for table '%s' has a configured parallelism of %s, while the input parallelism is %s. Since the configured parallelism is different from the input's parallelism and the changelog mode is not insert-only, a primary key is required but could not be found.", this.tableSinkSpec.getObjectIdentifier().asSummaryString(), sinkParallelism, inputParallelism));
        }
        RowDataKeySelector selector = KeySelectorUtil.getRowDataSelector(primaryKeys, this.getInputTypeInfo());
        KeyGroupStreamPartitioner partitioner = new KeyGroupStreamPartitioner((KeySelector)selector, 128);
        PartitionTransformation partitionedTransform = new PartitionTransformation(inputTransform, (StreamPartitioner)partitioner);
        partitionedTransform.setParallelism(sinkParallelism);
        return partitionedTransform;
    }

    private Transformation<RowData> applyUpsertMaterialize(Transformation<RowData> inputTransform, int[] primaryKeys, int sinkParallelism, TableConfig tableConfig, RowType physicalRowType) {
        GeneratedRecordEqualiser equaliser = new EqualiserCodeGenerator(physicalRowType).generateRecordEqualiser("SinkMaterializeEqualiser");
        SinkUpsertMaterializer operator = new SinkUpsertMaterializer(StateConfigUtil.createTtlConfig((long)tableConfig.getIdleStateRetention().toMillis()), (TypeSerializer)InternalSerializers.create((RowType)physicalRowType), equaliser);
        OneInputTransformation materializeTransform = new OneInputTransformation(inputTransform, "SinkMaterializer", (OneInputStreamOperator)operator, inputTransform.getOutputType(), sinkParallelism);
        RowDataKeySelector keySelector = KeySelectorUtil.getRowDataSelector(primaryKeys, (InternalTypeInfo<RowData>)InternalTypeInfo.of((RowType)physicalRowType));
        materializeTransform.setStateKeySelector((KeySelector)keySelector);
        materializeTransform.setStateKeyType((TypeInformation)keySelector.getProducedType());
        return materializeTransform;
    }

    private Transformation<?> applySinkProvider(Transformation<RowData> inputTransform, StreamExecutionEnvironment env, DynamicTableSink.SinkRuntimeProvider runtimeProvider, int rowtimeFieldIndex, int sinkParallelism) {
        if (runtimeProvider instanceof DataStreamSinkProvider) {
            DataStream dataStream = new DataStream(env, inputTransform);
            DataStreamSinkProvider provider = (DataStreamSinkProvider)runtimeProvider;
            return provider.consumeDataStream(dataStream).getTransformation();
        }
        if (runtimeProvider instanceof TransformationSinkProvider) {
            TransformationSinkProvider provider = (TransformationSinkProvider)runtimeProvider;
            return provider.createTransformation(TransformationSinkProvider.Context.of(inputTransform, rowtimeFieldIndex));
        }
        if (runtimeProvider instanceof SinkFunctionProvider) {
            SinkFunction sinkFunction = ((SinkFunctionProvider)runtimeProvider).createSinkFunction();
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof OutputFormatProvider) {
            OutputFormat outputFormat = ((OutputFormatProvider)runtimeProvider).createOutputFormat();
            OutputFormatSinkFunction sinkFunction = new OutputFormatSinkFunction(outputFormat);
            return this.createSinkFunctionTransformation((SinkFunction<RowData>)sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkParallelism);
        }
        if (runtimeProvider instanceof SinkProvider) {
            return new SinkTransformation(inputTransform, ((SinkProvider)runtimeProvider).createSink(), this.getDescription(), sinkParallelism);
        }
        throw new TableException("Unsupported sink runtime provider.");
    }

    private Transformation<?> createSinkFunctionTransformation(SinkFunction<RowData> sinkFunction, StreamExecutionEnvironment env, Transformation<RowData> inputTransformation, int rowtimeFieldIndex, int sinkParallelism) {
        SinkOperator operator = new SinkOperator((SinkFunction)env.clean(sinkFunction), rowtimeFieldIndex);
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable)sinkFunction).setInputType(this.getInputTypeInfo(), env.getConfig());
        }
        return new LegacySinkTransformation(inputTransformation, this.getDescription(), (StreamOperatorFactory)SimpleOperatorFactory.of((StreamOperator)operator), sinkParallelism);
    }

    private InternalTypeInfo<RowData> getInputTypeInfo() {
        return InternalTypeInfo.of((LogicalType)this.getInputEdges().get(0).getOutputType());
    }

    private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema schema) {
        return schema.getPrimaryKey().map(k -> k.getColumns().stream().mapToInt(arg_0 -> ((RowType)sinkRowType).getFieldIndex(arg_0)).toArray()).orElse(new int[0]);
    }

    private RowType getPhysicalRowType(ResolvedSchema schema) {
        return (RowType)schema.toPhysicalRowDataType().getLogicalType();
    }
}

