/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.util.List;
import java.util.function.IntSupplier;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkTestUtil;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

abstract class CommitterOperatorTestBase {
    CommitterOperatorTestBase() {
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testEmitCommittables(boolean withPostCommitTopology) throws Exception {
        SinkAndCounters sinkAndCounters = withPostCommitTopology ? this.sinkWithPostCommit() : this.sinkWithoutPostCommit();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new CommitterOperatorFactory(sinkAndCounters.sink, false, true));
        testHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 1, Long.valueOf(1L), 1, 1, 0);
        testHarness.processElement(new StreamRecord((Object)committableSummary));
        CommittableWithLineage committableWithLineage = new CommittableWithLineage((Object)"1", Long.valueOf(1L), 1);
        testHarness.processElement(new StreamRecord((Object)committableWithLineage));
        testHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat((int)sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
        if (withPostCommitTopology) {
            List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
            SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(this.copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        } else {
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
        }
        testHarness.close();
    }

    @Test
    void testWaitForCommittablesOfLatestCheckpointBeforeCommitting() throws Exception {
        SinkAndCounters sinkAndCounters = this.sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> testHarness = this.createTestHarness(sinkAndCounters.sink, false, true);
        testHarness.open();
        testHarness.setProcessingTime(0L);
        CommittableSummary committableSummary = new CommittableSummary(1, 1, Long.valueOf(1L), 2, 2, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary));
        CommittableWithLineage first = new CommittableWithLineage((Object)"1", Long.valueOf(1L), 1);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)first));
        testHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat((int)sinkAndCounters.commitCounter.getAsInt()).isZero();
        CommittableWithLineage second = new CommittableWithLineage((Object)"2", Long.valueOf(1L), 1);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)second));
        testHarness.getProcessingTimeService().setCurrentTime(2000L);
        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(3);
        Assertions.assertThat((int)sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(this.copyCommittableWithDifferentOrigin(first, 0));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(2))).isEqualTo(this.copyCommittableWithDifferentOrigin(second, 0));
        testHarness.close();
    }

    @Test
    void testImmediatelyCommitLateCommittables() throws Exception {
        SinkAndCounters sinkAndCounters = this.sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> testHarness = this.createTestHarness(sinkAndCounters.sink, false, true);
        testHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 1, Long.valueOf(1L), 1, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary));
        testHarness.notifyOfCompletedCheckpoint(1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        CommittableWithLineage first = new CommittableWithLineage((Object)"1", Long.valueOf(1L), 1);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)first));
        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(2);
        Assertions.assertThat((int)sinkAndCounters.commitCounter.getAsInt()).isEqualTo(1);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasFailedCommittables(committableSummary.getNumberOfFailedCommittables()).hasOverallCommittables(committableSummary.getNumberOfCommittables()).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(this.copyCommittableWithDifferentOrigin(first, 0));
        testHarness.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testEmitAllCommittablesOnEndOfInput(boolean isBatchMode) throws Exception {
        SinkAndCounters sinkAndCounters = this.sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> testHarness = this.createTestHarness(sinkAndCounters.sink, isBatchMode, !isBatchMode);
        testHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 2, null, 1, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary));
        CommittableSummary committableSummary2 = new CommittableSummary(2, 2, null, 1, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary2));
        CommittableWithLineage first = new CommittableWithLineage((Object)"1", null, 1);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)first));
        CommittableWithLineage second = new CommittableWithLineage((Object)"1", null, 2);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)second));
        testHarness.endInput();
        if (!isBatchMode) {
            Assertions.assertThat(testHarness.getOutput()).isEmpty();
            testHarness.notifyOfCompletedCheckpoint(1L);
        }
        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(3);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasFailedCommittables(0).hasOverallCommittables(2).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(this.copyCommittableWithDifferentOrigin(first, 0));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(2))).isEqualTo(this.copyCommittableWithDifferentOrigin(second, 0));
        testHarness.close();
    }

    @Test
    void testStateRestore() throws Exception {
        boolean originalSubtaskId = false;
        int subtaskIdAfterRecovery = 9;
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> testHarness = this.createTestHarness(this.sinkWithPostCommitWithRetry().sink, false, true, 1, 1, 0);
        testHarness.open();
        long checkpointId = 0L;
        CommittableSummary committableSummary = new CommittableSummary(0, 1, Long.valueOf(checkpointId), 1, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary));
        CommittableWithLineage first = new CommittableWithLineage((Object)"1", Long.valueOf(checkpointId), 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)first));
        CommittableSummary committableSummary2 = new CommittableSummary(1, 1, Long.valueOf(checkpointId), 1, 1, 0);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)committableSummary2));
        CommittableWithLineage second = new CommittableWithLineage((Object)"2", Long.valueOf(checkpointId), 1);
        testHarness.processElement((StreamRecord<CommittableMessage<String>>)new StreamRecord((Object)second));
        OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 2L);
        testHarness.notifyOfCompletedCheckpoint(0L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        testHarness.close();
        SinkAndCounters sinkAndCounters = this.sinkWithPostCommit();
        OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> restored = this.createTestHarness(sinkAndCounters.sink, false, true, 10, 10, 9);
        restored.initializeState(snapshot);
        restored.open();
        List<StreamElement> output = SinkTestUtil.fromOutput(restored.getOutput());
        Assertions.assertThat(output).hasSize(3);
        Assertions.assertThat((int)sinkAndCounters.commitCounter.getAsInt()).isEqualTo(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasCheckpointId(checkpointId).hasFailedCommittables(0).hasOverallCommittables(2).hasPendingCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(new CommittableWithLineage(first.getCommittable(), Long.valueOf(checkpointId), 9));
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(2))).isEqualTo(new CommittableWithLineage(second.getCommittable(), Long.valueOf(checkpointId), 9));
        restored.close();
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        SinkAndCounters sinkAndCounters = this.sinkWithPostCommit();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(new CommitterOperatorFactory(sinkAndCounters.sink, false, isCheckpointingEnabled));
        testHarness.open();
        CommittableSummary committableSummary = new CommittableSummary(1, 1, Long.valueOf(1L), 1, 1, 0);
        testHarness.processElement(new StreamRecord((Object)committableSummary));
        CommittableWithLineage committableWithLineage = new CommittableWithLineage((Object)"1", Long.valueOf(1L), 1);
        testHarness.processElement(new StreamRecord((Object)committableWithLineage));
        testHarness.endInput();
        if (isCheckpointingEnabled) {
            testHarness.notifyOfCompletedCheckpoint(1L);
        }
        List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(2);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableSummary(output.get(0))).hasCheckpointId(1L).hasPendingCommittables(0).hasOverallCommittables(1).hasFailedCommittables(0);
        SinkV2Assertions.assertThat(SinkTestUtil.toCommittableWithLinage(output.get(1))).isEqualTo(this.copyCommittableWithDifferentOrigin(committableWithLineage, 0));
        testHarness.notifyOfCompletedCheckpoint(2L);
        testHarness.endInput();
        Assertions.assertThat(testHarness.getOutput()).hasSize(2);
    }

    CommittableWithLineage<?> copyCommittableWithDifferentOrigin(CommittableWithLineage<?> committable, int subtaskId) {
        return new CommittableWithLineage(committable.getCommittable(), committable.getCheckpointId().isPresent() ? Long.valueOf(committable.getCheckpointId().getAsLong()) : null, subtaskId);
    }

    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness(SupportsCommitter<String> sink, boolean isBatchMode, boolean isCheckpointingEnabled) throws Exception {
        return new OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>((OneInputStreamOperatorFactory<CommittableMessage<String>, CommittableMessage<String>>)new CommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled));
    }

    private OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>> createTestHarness(SupportsCommitter<String> sink, boolean isBatchMode, boolean isCheckpointingEnabled, int maxParallelism, int parallelism, int subtaskId) throws Exception {
        return new OneInputStreamOperatorTestHarness<CommittableMessage<String>, CommittableMessage<String>>((OneInputStreamOperatorFactory<CommittableMessage<String>, CommittableMessage<String>>)new CommitterOperatorFactory(sink, isBatchMode, isCheckpointingEnabled), maxParallelism, parallelism, subtaskId);
    }

    abstract SinkAndCounters sinkWithPostCommit();

    abstract SinkAndCounters sinkWithPostCommitWithRetry();

    abstract SinkAndCounters sinkWithoutPostCommit();

    static class SinkAndCounters {
        SupportsCommitter<String> sink;
        IntSupplier commitCounter;

        public SinkAndCounters(SupportsCommitter<String> sink, IntSupplier commitCounter) {
            this.sink = sink;
            this.commitCounter = commitCounter;
        }
    }
}

