/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.multipleinput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.MultipleInputTestBase;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapper;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
import org.apache.flink.table.runtime.operators.multipleinput.TestingOneInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.TestingTwoInputStreamOperator;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSpec;
import org.apache.flink.table.runtime.operators.multipleinput.input.OneInput;
import org.apache.flink.table.runtime.operators.multipleinput.input.SecondInputOfTwoInput;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Tests for {@link BatchMultipleInputStreamOperator}.
 *
 * <p>Every test runs against the operator graph built by
 * {@code createMultipleInputStreamOperator()}:
 *
 * <pre>
 * source1 -> agg1 --+
 *                   +--> join1 --+
 * source2 -> agg2 --+            +--> join2 (tail)
 * source3 -----------------------+
 * </pre>
 */
public class BatchMultipleInputStreamOperatorTest extends MultipleInputTestBase {

    /** Verifies that opening the multiple-input operator opens every wrapped operator. */
    @Test
    public void testOpen() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
        WrappedOperators ops = new WrappedOperators(op);

        // Nothing may be opened before open() is called on the outer operator.
        Assertions.assertThat(ops.aggOp1.isOpened()).isFalse();
        Assertions.assertThat(ops.aggOp2.isOpened()).isFalse();
        Assertions.assertThat(ops.joinOp1.isOpened()).isFalse();
        Assertions.assertThat(ops.joinOp2.isOpened()).isFalse();

        op.open();

        Assertions.assertThat(ops.aggOp1.isOpened()).isTrue();
        Assertions.assertThat(ops.aggOp2.isOpened()).isTrue();
        Assertions.assertThat(ops.joinOp1.isOpened()).isTrue();
        Assertions.assertThat(ops.joinOp2.isOpened()).isTrue();
    }

    /**
     * Verifies the sequence of {@code nextSelection()} results (input 3, then 1, then 2, then
     * {@link InputSelection#ALL}) and how each {@code endInput(id)} call is forwarded to the
     * wrapped operators.
     */
    @Test
    public void testNextSelectionAndEndInput() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
        WrappedOperators ops = new WrappedOperators(op);

        Assertions.assertThat(ops.aggOp1.isEnd()).isFalse();
        Assertions.assertThat(ops.aggOp2.isEnd()).isFalse();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).isEmpty();
        Assertions.assertThat(ops.joinOp2.getEndInputs()).isEmpty();
        Assertions.assertThat(op.nextSelection())
                .isEqualTo(new InputSelection.Builder().select(3).build(3));

        // Ending input 3 is seen by join2 as the end of its input 2; nothing else is affected.
        op.endInput(3);
        Assertions.assertThat(ops.aggOp1.isEnd()).isFalse();
        Assertions.assertThat(ops.aggOp2.isEnd()).isFalse();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).isEmpty();
        Assertions.assertThat(ops.joinOp2.getEndInputs()).containsExactly(2);
        Assertions.assertThat(op.nextSelection())
                .isEqualTo(new InputSelection.Builder().select(1).build(3));

        // Ending input 1 ends agg1, which in turn ends input 1 of join1.
        op.endInput(1);
        Assertions.assertThat(ops.aggOp1.isEnd()).isTrue();
        Assertions.assertThat(ops.aggOp2.isEnd()).isFalse();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).containsExactly(1);
        Assertions.assertThat(ops.joinOp2.getEndInputs()).containsExactly(2);
        Assertions.assertThat(op.nextSelection())
                .isEqualTo(new InputSelection.Builder().select(2).build(3));

        // Ending input 2 ends agg2 and input 2 of join1; join2 then also sees its input 1 end.
        op.endInput(2);
        Assertions.assertThat(ops.aggOp1.isEnd()).isTrue();
        Assertions.assertThat(ops.aggOp2.isEnd()).isTrue();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).isEqualTo(Arrays.asList(1, 2));
        Assertions.assertThat(ops.joinOp2.getEndInputs()).isEqualTo(Arrays.asList(2, 1));
        Assertions.assertThat(op.nextSelection()).isEqualTo(InputSelection.ALL);
    }

    /** Verifies that closing the multiple-input operator closes every wrapped operator. */
    @Test
    public void testClose() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
        WrappedOperators ops = new WrappedOperators(op);

        // Nothing may be closed before close() is called on the outer operator.
        Assertions.assertThat(ops.aggOp1.isClosed()).isFalse();
        Assertions.assertThat(ops.aggOp2.isClosed()).isFalse();
        Assertions.assertThat(ops.joinOp1.isClosed()).isFalse();
        Assertions.assertThat(ops.joinOp2.isClosed()).isFalse();

        op.close();

        Assertions.assertThat(ops.aggOp1.isClosed()).isTrue();
        Assertions.assertThat(ops.aggOp2.isClosed()).isTrue();
        Assertions.assertThat(ops.joinOp1.isClosed()).isTrue();
        Assertions.assertThat(ops.joinOp2.isClosed()).isTrue();
    }

    /**
     * Feeds one record into each of the three inputs (in the order 3, 1, 2), ending each input
     * after its record, and checks which wrapped operator has seen which record at every step.
     */
    @Test
    public void testProcess() throws Exception {
        TestingBatchMultipleInputStreamOperator op = createMultipleInputStreamOperator();
        List<StreamElement> outputData = op.getOutputData();
        WrappedOperators ops = new WrappedOperators(op);

        List<?> inputs = op.getInputs();
        Assertions.assertThat(inputs).hasSize(3);
        Input<RowData> input1 = inputAt(inputs, 0);
        Input<RowData> input2 = inputAt(inputs, 1);
        Input<RowData> input3 = inputAt(inputs, 2);
        Assertions.assertThat(input1).isInstanceOf(OneInput.class);
        Assertions.assertThat(input2).isInstanceOf(OneInput.class);
        Assertions.assertThat(input3).isInstanceOf(SecondInputOfTwoInput.class);

        // Initially no operator has seen a record and nothing has been emitted.
        Assertions.assertThat(ops.joinOp2.getCurrentElement1()).isNull();
        Assertions.assertThat(ops.joinOp2.getCurrentElement2()).isNull();
        Assertions.assertThat(ops.joinOp1.getCurrentElement1()).isNull();
        Assertions.assertThat(ops.joinOp1.getCurrentElement2()).isNull();
        Assertions.assertThat(ops.aggOp1.getCurrentElement()).isNull();
        Assertions.assertThat(ops.aggOp2.getCurrentElement()).isNull();
        Assertions.assertThat(outputData).isEmpty();

        // Input 3 delivers straight to the second input of join2.
        StreamRecord<RowData> element1 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("123")), 456L);
        input3.processElement(element1);
        Assertions.assertThat(ops.joinOp2.getCurrentElement2()).isEqualTo(element1);
        Assertions.assertThat(ops.joinOp2.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(ops.joinOp2.getEndInputs()).isEmpty();
        op.endInput(3);
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(ops.joinOp2.getEndInputs()).containsExactly(2);

        // Input 1 delivers to agg1; join1 only sees the record once input 1 has ended.
        StreamRecord<RowData> element2 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("124")), 457L);
        input1.processElement(element2);
        Assertions.assertThat(ops.aggOp1.getCurrentElement()).isEqualTo(element2);
        Assertions.assertThat(ops.joinOp1.getCurrentElement1()).isNull();
        Assertions.assertThat(ops.joinOp2.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).isEmpty();
        op.endInput(1);
        Assertions.assertThat(ops.joinOp1.getEndInputs()).containsExactly(1);
        Assertions.assertThat(ops.joinOp2.getEndInputs()).containsExactly(2);
        Assertions.assertThat(ops.joinOp1.getCurrentElement1()).isEqualTo(element2);
        Assertions.assertThat(outputData).isEmpty();

        // Input 2 delivers to agg2; join1 only sees the record once input 2 has ended.
        StreamRecord<RowData> element3 =
                new StreamRecord<>(GenericRowData.of(StringData.fromString("125")), 458L);
        input2.processElement(element3);
        Assertions.assertThat(ops.aggOp2.getCurrentElement()).isEqualTo(element3);
        Assertions.assertThat(ops.joinOp1.getCurrentElement2()).isNull();
        Assertions.assertThat(ops.joinOp2.getCurrentElement1()).isNull();
        Assertions.assertThat(outputData).isEmpty();
        Assertions.assertThat(ops.joinOp1.getEndInputs()).containsExactly(1);
        op.endInput(2);
        Assertions.assertThat(ops.joinOp1.getEndInputs()).isEqualTo(Arrays.asList(1, 2));
        Assertions.assertThat(ops.joinOp2.getEndInputs()).isEqualTo(Arrays.asList(2, 1));
        Assertions.assertThat(ops.joinOp1.getCurrentElement2()).isEqualTo(element3);
        // With all inputs ended, the collected output holds three elements.
        Assertions.assertThat(outputData).hasSize(3);
    }

    /**
     * Builds the operator under test: three sources, two one-input "agg" operators, and two
     * chained two-input "join" operators, wrapped by {@link TableOperatorWrapperGenerator}.
     *
     * <p>The {@code {1, 2, 0}} array is passed through to the generator unchanged; it is
     * presumably the per-source read order, which would match the selection sequence
     * (3, 1, 2) asserted in {@code testNextSelectionAndEndInput} -- confirm against the
     * generator's constructor.
     *
     * @return a new operator whose emitted records are collected in its {@code getOutputData()} list
     * @throws Exception if any of the base-class factory helpers fails
     */
    private TestingBatchMultipleInputStreamOperator createMultipleInputStreamOperator()
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Transformation<RowData> source1 = createSource(env, "source1");
        Transformation<RowData> source2 = createSource(env, "source2");
        Transformation<RowData> source3 = createSource(env, "source3");

        OneInputTransformation<RowData, RowData> agg1 =
                createOneInputTransform(
                        source1, "agg1", new TestingOneInputStreamOperator(true), stringRowTypeInfo());
        OneInputTransformation<RowData, RowData> agg2 =
                createOneInputTransform(
                        source2, "agg2", new TestingOneInputStreamOperator(true), stringRowTypeInfo());
        TwoInputTransformation<RowData, RowData, RowData> join1 =
                createTwoInputTransform(
                        agg1, agg2, "join1", new TestingTwoInputStreamOperator(true), stringRowTypeInfo());
        TwoInputTransformation<RowData, RowData, RowData> join2 =
                createTwoInputTransform(
                        join1, source3, "join2", new TestingTwoInputStreamOperator(true), stringRowTypeInfo());

        TableOperatorWrapperGenerator generator =
                new TableOperatorWrapperGenerator(
                        Arrays.asList(source1, source2, source3), join2, new int[] {1, 2, 0});
        generator.generate();

        // Chain directly on the generator's typed result so Pair::getValue resolves to InputSpec.
        List<InputSpec> inputSpecs =
                generator.getInputTransformAndInputSpecPairs().stream()
                        .map(Pair::getValue)
                        .collect(Collectors.toList());

        // The same list instance backs both the operator's output and getOutputData().
        List<StreamElement> outputData = new ArrayList<>();
        return new TestingBatchMultipleInputStreamOperator(
                createStreamOperatorParameters(new TestingOutput(outputData)),
                inputSpecs,
                generator.getHeadWrappers(),
                generator.getTailWrapper(),
                outputData);
    }

    /** Creates a fresh type info describing a row with a single STRING column. */
    private static TypeInformation<RowData> stringRowTypeInfo() {
        return InternalTypeInfo.of(RowType.of(DataTypes.STRING().getLogicalType()));
    }

    /**
     * Returns the input at {@code index} typed for {@link RowData} records.
     *
     * @param inputs the list returned by the operator's {@code getInputs()}
     * @param index zero-based position in that list
     */
    @SuppressWarnings("unchecked") // element type is erased; these tests only feed RowData records
    private static Input<RowData> inputAt(List<?> inputs, int index) {
        return (Input<RowData>) inputs.get(index);
    }

    /**
     * The four wrapped operators of the test graph, resolved by walking the wrapper chain
     * backwards from the tail wrapper (join2).
     */
    private static final class WrappedOperators {
        private final TestingOneInputStreamOperator aggOp1;
        private final TestingOneInputStreamOperator aggOp2;
        private final TestingTwoInputStreamOperator joinOp1;
        private final TestingTwoInputStreamOperator joinOp2;

        WrappedOperators(TestingBatchMultipleInputStreamOperator op) {
            TableOperatorWrapper<?> joinWrapper2 = op.getTailWrapper();
            // join2's first input wrapper is join1; join1's two input wrappers are agg1 and agg2.
            TableOperatorWrapper<?> joinWrapper1 =
                    (TableOperatorWrapper<?>) joinWrapper2.getInputWrappers().get(0);
            TableOperatorWrapper<?> aggWrapper1 =
                    (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(0);
            TableOperatorWrapper<?> aggWrapper2 =
                    (TableOperatorWrapper<?>) joinWrapper1.getInputWrappers().get(1);

            this.joinOp2 = (TestingTwoInputStreamOperator) joinWrapper2.getStreamOperator();
            this.joinOp1 = (TestingTwoInputStreamOperator) joinWrapper1.getStreamOperator();
            this.aggOp1 = (TestingOneInputStreamOperator) aggWrapper1.getStreamOperator();
            this.aggOp2 = (TestingOneInputStreamOperator) aggWrapper2.getStreamOperator();
        }
    }

    /** Output that appends each collected record, as the very same instance, to the given list. */
    private static class TestingOutput extends CollectorOutput<RowData> {
        private final List<StreamElement> list;

        public TestingOutput(List<StreamElement> list) {
            super(list);
            this.list = list;
        }

        @Override
        public void collect(StreamRecord<RowData> record) {
            // Store the record itself so tests can compare emitted elements by identity/equality.
            this.list.add(record);
        }
    }

    /**
     * {@link BatchMultipleInputStreamOperator} that additionally exposes its tail wrapper and the
     * list its output writes into, so tests can inspect the wrapped operators and emitted data.
     */
    private static class TestingBatchMultipleInputStreamOperator
            extends BatchMultipleInputStreamOperator {
        private final TableOperatorWrapper<?> tailWrapper;
        private final List<StreamElement> outputData;

        public TestingBatchMultipleInputStreamOperator(
                StreamOperatorParameters<RowData> parameters,
                List<InputSpec> inputSpecs,
                List<TableOperatorWrapper<?>> headWrapper,
                TableOperatorWrapper<?> tailWrapper,
                List<StreamElement> outputData) {
            super(parameters, inputSpecs, headWrapper, tailWrapper);
            this.tailWrapper = tailWrapper;
            this.outputData = outputData;
        }

        /** Returns the list that the operator's output appends emitted elements to. */
        public List<StreamElement> getOutputData() {
            return this.outputData;
        }

        /** Returns the wrapper of the last operator in the graph (join2 in these tests). */
        public TableOperatorWrapper<?> getTailWrapper() {
            return this.tailWrapper;
        }
    }
}

