/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.tools.RelBuilder;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.FieldReferenceExpression;
import org.apache.flink.table.expressions.ValueLiteralExpression;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.LogicalWindow;
import org.apache.flink.table.planner.plan.logical.SessionGroupWindow;
import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow;
import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecAggregateBase;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
import org.apache.flink.table.planner.plan.utils.AggregateUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.runtime.generated.GeneratedClass;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedNamespaceTableAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.window.CountWindow;
import org.apache.flink.table.runtime.operators.window.TimeWindow;
import org.apache.flink.table.runtime.operators.window.WindowOperator;
import org.apache.flink.table.runtime.operators.window.WindowOperatorBuilder;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream exec node for group-window aggregation.
 *
 * <p>Supports the three logical window kinds handled below: {@link TumblingGroupWindow}, {@link
 * SlidingGroupWindow} and {@link SessionGroupWindow}. Tumbling and sliding windows may be defined
 * either on a time interval or, for processing time only, on a row-count interval. The node
 * generates the namespace aggregate handler, builds a {@link WindowOperator} and wires it into a
 * keyed {@link OneInputTransformation}.
 */
@ExecNodeMetadata(
        name = "stream-exec-group-window-aggregate",
        version = 1,
        consumedOptions = {
            "table.local-time-zone",
            "table.exec.mini-batch.enabled",
            "table.exec.mini-batch.size"
        },
        producedTransformations = {"group-window-aggregate"},
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecGroupWindowAggregate extends StreamExecAggregateBase {

    private static final Logger LOGGER =
            LoggerFactory.getLogger(StreamExecGroupWindowAggregate.class);

    /** Name of the transformation produced by this node. */
    public static final String GROUP_WINDOW_AGGREGATE_TRANSFORMATION = "group-window-aggregate";

    /** JSON field name of the serialized window definition. */
    public static final String FIELD_NAME_WINDOW = "window";

    /** JSON field name of the serialized named window properties. */
    public static final String FIELD_NAME_NAMED_WINDOW_PROPERTIES = "namedWindowProperties";

    /** Indices of the grouping (key) fields in the input row. */
    @JsonProperty(value = "grouping")
    private final int[] grouping;

    /** Aggregate calls evaluated per window. */
    @JsonProperty(value = FIELD_NAME_WINDOW == null ? "aggCalls" : "aggCalls")
    private final AggregateCall[] aggCalls;

    /** Logical window definition (tumbling, sliding or session). */
    @JsonProperty(value = "window")
    private final LogicalWindow window;

    /** Window properties that are appended to the aggregate result. */
    @JsonProperty(value = "namedWindowProperties")
    private final NamedWindowProperty[] namedWindowProperties;

    /** Whether the aggregate functions must be able to retract input. */
    @JsonProperty(value = "needRetraction")
    private final boolean needRetraction;

    /**
     * Creates a new node with a freshly allocated id, context and persisted configuration.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param grouping indices of the grouping fields; must not be null
     * @param aggCalls aggregate calls; must not be null
     * @param window logical window definition; must not be null
     * @param namedWindowProperties window properties to emit; must not be null
     * @param needRetraction whether the aggregates need to support retraction
     * @param inputProperty property of the single input
     * @param outputType row type produced by this node
     * @param description human readable description of the node
     */
    public StreamExecGroupWindowAggregate(
            ReadableConfig tableConfig,
            int[] grouping,
            AggregateCall[] aggCalls,
            LogicalWindow window,
            NamedWindowProperty[] namedWindowProperties,
            boolean needRetraction,
            InputProperty inputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecGroupWindowAggregate.class),
                ExecNodeContext.newPersistedConfig(
                        StreamExecGroupWindowAggregate.class, tableConfig),
                grouping,
                aggCalls,
                window,
                namedWindowProperties,
                needRetraction,
                Collections.singletonList(inputProperty),
                outputType,
                description);
    }

    /**
     * Creator used for JSON deserialization of a persisted plan.
     *
     * @throws IllegalArgumentException if {@code inputProperties} does not contain exactly one
     *     element
     * @throws NullPointerException if {@code grouping}, {@code aggCalls}, {@code window} or {@code
     *     namedWindowProperties} is null
     */
    @JsonCreator
    public StreamExecGroupWindowAggregate(
            @JsonProperty(value = "id") int id,
            @JsonProperty(value = "type") ExecNodeContext context,
            @JsonProperty(value = "configuration") ReadableConfig persistedConfig,
            @JsonProperty(value = "grouping") int[] grouping,
            @JsonProperty(value = "aggCalls") AggregateCall[] aggCalls,
            @JsonProperty(value = "window") LogicalWindow window,
            @JsonProperty(value = "namedWindowProperties")
                    NamedWindowProperty[] namedWindowProperties,
            @JsonProperty(value = "needRetraction") boolean needRetraction,
            @JsonProperty(value = "inputProperties") List<InputProperty> inputProperties,
            @JsonProperty(value = "outputType") RowType outputType,
            @JsonProperty(value = "description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 1);
        this.grouping = Preconditions.checkNotNull(grouping);
        this.aggCalls = Preconditions.checkNotNull(aggCalls);
        this.window = Preconditions.checkNotNull(window);
        this.namedWindowProperties = Preconditions.checkNotNull(namedWindowProperties);
        this.needRetraction = needRetraction;
    }

    /**
     * Translates this node into a keyed one-input transformation running a {@link WindowOperator}.
     *
     * @throws TableException if the window is defined on a rowtime attribute that cannot be
     *     located in the input row type, or if the window kind is unsupported
     * @throws UnsupportedOperationException if the window/time-attribute combination is not
     *     supported (see {@code createWindowOperator})
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        // A keyed count window with a negative retention time triggers the warning below.
        if (isCountWindow() && grouping.length > 0 && config.getStateRetentionTime() < 0L) {
            LOGGER.warn(
                    "No state retention interval configured for a query which accumulates state. "
                            + "Please provide a query configuration with valid retention interval to prevent excessive state size. "
                            + "You may specify a retention time of 0 to not clean up the state.");
        }

        final ExecEdge inputEdge = getInputEdges().get(0);
        final Transformation<RowData> inputTransform =
                (Transformation<RowData>) inputEdge.translateToPlan(planner);
        final RowType inputRowType = (RowType) inputEdge.getOutputType();

        // Only rowtime windows need the index of the time field; -1 is used otherwise.
        final int inputTimeFieldIndex;
        if (AggregateUtil.isRowtimeAttribute(window.timeAttribute())) {
            inputTimeFieldIndex =
                    AggregateUtil.timeFieldIndex(
                            planner.getTypeFactory().buildRelNodeRowType(inputRowType),
                            planner.createRelBuilder(),
                            window.timeAttribute());
            if (inputTimeFieldIndex < 0) {
                throw new TableException(
                        "Group window must defined on a time attribute, but the time attribute can't be found.\nThis should never happen. Please file an issue.");
            }
        } else {
            inputTimeFieldIndex = -1;
        }

        final ZoneId shiftTimeZone =
                TimeWindowUtil.getShiftTimeZone(
                        window.timeAttribute().getOutputDataType().getLogicalType(),
                        TableConfigUtils.getLocalTimeZone(config));

        // Every aggregate call shares the node-level retraction requirement.
        final boolean[] aggCallNeedRetractions = new boolean[aggCalls.length];
        Arrays.fill(aggCallNeedRetractions, needRetraction);
        final AggregateInfoList aggInfoList =
                AggregateUtil.transformToStreamAggregateInfoList(
                        planner.getTypeFactory(),
                        inputRowType,
                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
                        aggCallNeedRetractions,
                        needRetraction,
                        true,
                        true);

        final ClassLoader classLoader = planner.getFlinkContext().getClassLoader();
        final GeneratedClass<?> aggsHandler =
                createAggsHandler(
                        aggInfoList,
                        config,
                        classLoader,
                        planner.createRelBuilder(),
                        inputRowType.getChildren(),
                        shiftTimeZone);

        // The aggregate value types are needed both for the equaliser and for the operator;
        // compute them once.
        final LogicalType[] aggValueTypes = extractLogicalTypes(aggInfoList.getActualValueTypes());
        final LogicalType[] windowPropertyTypes =
                Arrays.stream(namedWindowProperties)
                        .map(p -> p.getProperty().getResultType())
                        .toArray(LogicalType[]::new);

        // The equaliser compares emitted rows: aggregate values followed by window properties.
        final EqualiserCodeGenerator equaliserGenerator =
                new EqualiserCodeGenerator(
                        ArrayUtils.addAll(aggValueTypes, windowPropertyTypes), classLoader);
        final GeneratedRecordEqualiser equaliser =
                equaliserGenerator.generateRecordEqualiser("WindowValueEqualiser");

        final LogicalType[] accTypes = extractLogicalTypes(aggInfoList.getAccTypes());
        final int inputCountIndex = aggInfoList.getIndexOfCountStar();

        final WindowOperator<?, ?> operator =
                createWindowOperator(
                        config,
                        aggsHandler,
                        equaliser,
                        accTypes,
                        windowPropertyTypes,
                        aggValueTypes,
                        inputRowType.getChildren().toArray(new LogicalType[0]),
                        inputTimeFieldIndex,
                        shiftTimeZone,
                        inputCountIndex);

        final OneInputTransformation<RowData, RowData> transform =
                ExecNodeUtil.createOneInputTransformation(
                        inputTransform,
                        createTransformationMeta(GROUP_WINDOW_AGGREGATE_TRANSFORMATION, config),
                        operator,
                        InternalTypeInfo.of(getOutputType()),
                        inputTransform.getParallelism());

        // Key the operator state by the grouping fields.
        final RowDataKeySelector selector =
                KeySelectorUtil.getRowDataSelector(
                        classLoader, grouping, InternalTypeInfo.of(inputRowType));
        transform.setStateKeySelector(selector);
        transform.setStateKeyType(selector.getProducedType());
        return transform;
    }

    /**
     * Returns whether the window is a tumbling or sliding window whose size is a row-count
     * interval. Session windows are never count windows.
     */
    private boolean isCountWindow() {
        if (window instanceof TumblingGroupWindow) {
            return AggregateUtil.hasRowIntervalType(((TumblingGroupWindow) window).size());
        }
        if (window instanceof SlidingGroupWindow) {
            return AggregateUtil.hasRowIntervalType(((SlidingGroupWindow) window).size());
        }
        return false;
    }

    /** Converts each {@link DataType} to its {@link LogicalType}, preserving order. */
    private LogicalType[] extractLogicalTypes(DataType[] dataTypes) {
        return Arrays.stream(dataTypes)
                .map(LogicalTypeDataTypeConverter::fromDataTypeToLogicalType)
                .toArray(LogicalType[]::new);
    }

    /**
     * Generates the namespace aggregate handler (or table-aggregate handler) for this window.
     *
     * <p>The window class passed to the code generator is {@link CountWindow} for row-interval
     * tumbling/sliding windows and {@link TimeWindow} otherwise. Merging is requested for session
     * windows and for sliding windows with a time-interval size.
     *
     * @throws TableException if the window is not a tumbling, sliding or session window
     */
    private GeneratedClass<?> createAggsHandler(
            AggregateInfoList aggInfoList,
            ExecNodeConfig config,
            ClassLoader classLoader,
            RelBuilder relBuilder,
            List<LogicalType> fieldTypes,
            ZoneId shiftTimeZone) {
        final boolean needMerge;
        // Class<?> because the window class is either CountWindow or TimeWindow.
        final Class<?> windowClass;
        if (window instanceof SlidingGroupWindow) {
            final ValueLiteralExpression size = ((SlidingGroupWindow) window).size();
            needMerge = AggregateUtil.hasTimeIntervalType(size);
            windowClass = AggregateUtil.hasRowIntervalType(size) ? CountWindow.class : TimeWindow.class;
        } else if (window instanceof TumblingGroupWindow) {
            final ValueLiteralExpression size = ((TumblingGroupWindow) window).size();
            needMerge = false;
            windowClass = AggregateUtil.hasRowIntervalType(size) ? CountWindow.class : TimeWindow.class;
        } else if (window instanceof SessionGroupWindow) {
            needMerge = true;
            windowClass = TimeWindow.class;
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

        final AggsHandlerCodeGenerator generator =
                new AggsHandlerCodeGenerator(
                                new CodeGeneratorContext(config, classLoader),
                                relBuilder,
                                JavaScalaConversionUtil.toScala(fieldTypes),
                                false)
                        .needAccumulate();
        if (needMerge) {
            generator.needMerge(0, false, null);
        }
        if (needRetraction) {
            generator.needRetract();
        }

        final List<WindowProperty> windowProperties =
                Arrays.asList(
                        Arrays.stream(namedWindowProperties)
                                .map(NamedWindowProperty::getProperty)
                                .toArray(WindowProperty[]::new));
        final boolean isTableAggregate =
                AggregateUtil.isTableAggregate(
                        Arrays.asList(aggInfoList.getActualAggregateCalls()));
        if (isTableAggregate) {
            return generator.generateNamespaceTableAggsHandler(
                    "GroupingWindowTableAggsHandler",
                    aggInfoList,
                    JavaScalaConversionUtil.toScala(windowProperties),
                    windowClass,
                    shiftTimeZone);
        }
        return generator.generateNamespaceAggsHandler(
                "GroupingWindowAggsHandler",
                aggInfoList,
                JavaScalaConversionUtil.toScala(windowProperties),
                windowClass,
                shiftTimeZone);
    }

    /**
     * Builds the {@link WindowOperator} for the configured window and generated handler.
     *
     * <p>Supported combinations:
     *
     * <ul>
     *   <li>tumbling / sliding: proctime or rowtime with a time-interval size, or proctime with a
     *       row-interval size (count window);
     *   <li>session: proctime or rowtime.
     * </ul>
     *
     * @throws UnsupportedOperationException for tumbling/sliding windows that match none of the
     *     combinations above, or a session window whose time field is neither proctime nor
     *     rowtime
     * @throws TableException if the window kind or the generated handler class is unsupported
     */
    private WindowOperator<?, ?> createWindowOperator(
            ReadableConfig config,
            GeneratedClass<?> aggsHandler,
            GeneratedRecordEqualiser recordEqualiser,
            LogicalType[] accTypes,
            LogicalType[] windowPropertyTypes,
            LogicalType[] aggValueTypes,
            LogicalType[] inputFields,
            int timeFieldIndex,
            ZoneId shiftTimeZone,
            int inputCountIndex) {
        WindowOperatorBuilder builder =
                WindowOperatorBuilder.builder()
                        .withInputFields(inputFields)
                        .withShiftTimezone(shiftTimeZone)
                        .withInputCountIndex(inputCountIndex);

        if (window instanceof TumblingGroupWindow) {
            final TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
            final FieldReferenceExpression timeField = tumblingWindow.timeField();
            final ValueLiteralExpression size = tumblingWindow.size();
            if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                builder = builder.tumble(AggregateUtil.toDuration(size)).withProcessingTime();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                builder =
                        builder.tumble(AggregateUtil.toDuration(size))
                                .withEventTime(timeFieldIndex);
            } else if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasRowIntervalType(size)) {
                builder = builder.countWindow(AggregateUtil.toLong(size));
            } else {
                throw new UnsupportedOperationException(
                        "Event-time grouping windows on row intervals are currently not supported.");
            }
        } else if (window instanceof SlidingGroupWindow) {
            final SlidingGroupWindow slidingWindow = (SlidingGroupWindow) window;
            final FieldReferenceExpression timeField = slidingWindow.timeField();
            final ValueLiteralExpression size = slidingWindow.size();
            final ValueLiteralExpression slide = slidingWindow.slide();
            if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                builder =
                        builder.sliding(
                                        AggregateUtil.toDuration(size),
                                        AggregateUtil.toDuration(slide))
                                .withProcessingTime();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)
                    && AggregateUtil.hasTimeIntervalType(size)) {
                builder =
                        builder.sliding(
                                        AggregateUtil.toDuration(size),
                                        AggregateUtil.toDuration(slide))
                                .withEventTime(timeFieldIndex);
            } else if (AggregateUtil.isProctimeAttribute(timeField)
                    && AggregateUtil.hasRowIntervalType(size)) {
                builder =
                        builder.countWindow(
                                AggregateUtil.toLong(size), AggregateUtil.toLong(slide));
            } else {
                throw new UnsupportedOperationException(
                        "Event-time grouping windows on row intervals are currently not supported.");
            }
        } else if (window instanceof SessionGroupWindow) {
            final SessionGroupWindow sessionWindow = (SessionGroupWindow) window;
            final FieldReferenceExpression timeField = sessionWindow.timeField();
            final ValueLiteralExpression gap = sessionWindow.gap();
            if (AggregateUtil.isProctimeAttribute(timeField)) {
                builder = builder.session(AggregateUtil.toDuration(gap)).withProcessingTime();
            } else if (AggregateUtil.isRowtimeAttribute(timeField)) {
                builder =
                        builder.session(AggregateUtil.toDuration(gap))
                                .withEventTime(timeFieldIndex);
            } else {
                throw new UnsupportedOperationException("This should not happen.");
            }
        } else {
            throw new TableException("Unsupported window: " + window.toString());
        }

        // When the emit strategy produces updates, install its trigger and allowed lateness.
        final WindowEmitStrategy emitStrategy = WindowEmitStrategy.apply(config, window);
        if (emitStrategy.produceUpdates()) {
            builder.produceUpdates()
                    .triggering(emitStrategy.getTrigger())
                    .withAllowedLateness(Duration.ofMillis(emitStrategy.getAllowLateness()));
        }

        if (aggsHandler instanceof GeneratedNamespaceAggsHandleFunction) {
            return builder.aggregate(
                            (GeneratedNamespaceAggsHandleFunction<?>) aggsHandler,
                            recordEqualiser,
                            accTypes,
                            aggValueTypes,
                            windowPropertyTypes)
                    .build();
        } else if (aggsHandler instanceof GeneratedNamespaceTableAggsHandleFunction) {
            return builder.aggregate(
                            (GeneratedNamespaceTableAggsHandleFunction<?>) aggsHandler,
                            accTypes,
                            aggValueTypes,
                            windowPropertyTypes)
                    .build();
        } else {
            throw new TableException(
                    "Unsupported agg handler class: " + aggsHandler.getClass().getSimpleName());
        }
    }
}

