/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.VersionedSerializer;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.CommitterOperatorTestBase;
import org.apache.paimon.flink.sink.NoopCommittableStateManager;
import org.apache.paimon.flink.sink.RestoreAndFailCommittableStateManager;
import org.apache.paimon.flink.sink.StoreCommitter;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class CommitterOperatorTest
extends CommitterOperatorTestBase {
    protected String initialCommitUser;

    @Override
    @BeforeEach
    public void before() {
        super.before();
        this.initialCommitUser = UUID.randomUUID().toString();
    }

    @Test
    public void testFailIntentionallyAfterRestore() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 20L}));
        long timestamp = 1L;
        for (CommitMessage committable : write.prepareCommit(false, 8L)) {
            testHarness.processElement((Object)new Committable(8L, Committable.Kind.FILE, (Object)committable), timestamp++);
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, timestamp++);
        Assertions.assertThat((Long)table.snapshotManager().latestSnapshotId()).isNull();
        testHarness.close();
        testHarness = this.createRecoverableTestHarness(table);
        try {
            testHarness.initializeState(snapshot);
            testHarness.open();
            Assertions.fail((String)"Expecting intentional exception");
        }
        catch (Exception e) {
            Assertions.assertThat((Throwable)e).hasMessageContaining("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
        this.assertResults(table, "1, 10", "2, 20");
        testHarness.close();
        testHarness = this.createRecoverableTestHarness(table);
        testHarness.initializeState(snapshot);
        testHarness.open();
        this.assertResults(table, "1, 10", "2, 20");
        testHarness.close();
    }

    @Test
    public void testCheckpointAbort() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        long cpId = 0L;
        for (int i = 0; i < 10; ++i) {
            StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
            write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
            write.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 20L}));
            for (CommitMessage committable : write.prepareCommit(false, ++cpId)) {
                testHarness.processElement((Object)new Committable(cpId, Committable.Kind.FILE, (Object)committable), 1L);
            }
        }
        testHarness.snapshot(cpId, 1L);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        SnapshotManager snapshotManager = new SnapshotManager((FileIO)LocalFileIO.create(), this.tablePath);
        Assertions.assertThat((Long)snapshotManager.latestSnapshotId()).isEqualTo(cpId);
        testHarness.close();
    }

    @Test
    public void testSnapshotLostWhenFailed() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createLossyTestHarness(table);
        testHarness.open();
        long timestamp = 1L;
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser);
        StreamTableWrite write = streamWriteBuilder.newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 20L}));
        for (CommitMessage committable : write.prepareCommit(false, 1L)) {
            testHarness.processElement((Object)new Committable(1L, Committable.Kind.FILE, (Object)committable), timestamp++);
        }
        testHarness.snapshot(1L, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(1L);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{3, 30L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{4, 40L}));
        for (CommitMessage committable : write.prepareCommit(false, 2L)) {
            testHarness.processElement((Object)new Committable(2L, Committable.Kind.FILE, (Object)committable), timestamp++);
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(2L, timestamp++);
        write.close();
        testHarness.close();
        testHarness = this.createLossyTestHarness(table);
        testHarness.initializeState(snapshot);
        testHarness.open();
        write = streamWriteBuilder.newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{5, 50L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{6, 60L}));
        for (CommitMessage committable : write.prepareCommit(false, 3L)) {
            testHarness.processElement((Object)new Committable(3L, Committable.Kind.FILE, (Object)committable), timestamp++);
        }
        testHarness.snapshot(3L, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(3L);
        write.close();
        testHarness.close();
        this.assertResults(table, "1, 10", "2, 20", "5, 50", "6, 60");
    }

    @Test
    public void testRestoreCommitUser() throws Exception {
        OperatorSubtaskState snapshot;
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness;
        FileStoreTable table = this.createFileStoreTable();
        String commitUser = UUID.randomUUID().toString();
        ArrayList<OperatorSubtaskState> operatorSubtaskStates = new ArrayList<OperatorSubtaskState>();
        long timestamp = 1L;
        long checkpoint = 1L;
        for (int i = 0; i < 5; ++i) {
            testHarness = this.createLossyTestHarness(table, commitUser);
            testHarness.open();
            snapshot = CommitterOperatorTest.writeAndSnapshot(table, commitUser, timestamp, ++checkpoint, testHarness);
            operatorSubtaskStates.add(snapshot);
            testHarness.close();
        }
        OperatorSubtaskState operatorSubtaskState = AbstractStreamOperatorTestHarness.repackageState((OperatorSubtaskState[])operatorSubtaskStates.toArray(new OperatorSubtaskState[0]));
        testHarness = this.createLossyTestHarness(table);
        testHarness.initializeState(operatorSubtaskState);
        snapshot = CommitterOperatorTest.writeAndSnapshot(table, this.initialCommitUser, timestamp, ++checkpoint, testHarness);
        testHarness.close();
        ArrayList actual = new ArrayList();
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, this.initialCommitUser, (CommittableStateManager<ManifestCommittable>)new NoopCommittableStateManager(), (ThrowingConsumer<StateInitializationContext, Exception>)((ThrowingConsumer)context -> {
            ListState state = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor("commit_user_state", String.class));
            ((Iterable)state.get()).forEach(actual::add);
        }));
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness1 = this.createTestHarness(operator);
        testHarness1.initializeState(snapshot);
        testHarness1.close();
        Assertions.assertThat((int)actual.size()).isEqualTo(1);
        Assertions.assertThat(actual).hasSameElementsAs((Iterable)Lists.newArrayList((Object[])new String[]{commitUser}));
    }

    @Test
    public void testCommitInputEnd() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        String commitUser = UUID.randomUUID().toString();
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, commitUser, (CommittableStateManager<ManifestCommittable>)new NoopCommittableStateManager());
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createTestHarness(operator);
        testHarness.open();
        Assertions.assertThatCode(() -> {
            long time = System.currentTimeMillis();
            long cp = 0L;
            testHarness.processElement((Object)new Committable(Long.MAX_VALUE, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.snapshot(cp++, time++);
            testHarness.processElement((Object)new Committable(Long.MAX_VALUE, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.processElement((Object)new Committable(Long.MAX_VALUE, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.snapshot(cp++, time++);
            testHarness.snapshot(cp, time);
        }).doesNotThrowAnyException();
        if (operator instanceof CommitterOperator) {
            Assertions.assertThat((int)((ManifestCommittable)((CommitterOperator)operator).committablesPerCheckpoint.get(Long.MAX_VALUE)).fileCommittables().size()).isEqualTo(3);
        }
        Assertions.assertThatCode(() -> {
            long time = System.currentTimeMillis();
            long cp = 0L;
            testHarness.processElement((Object)new Committable(0L, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.snapshot(cp++, time++);
            testHarness.processElement((Object)new Committable(0L, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.processElement((Object)new Committable(Long.MAX_VALUE, Committable.Kind.FILE, (Object)new CommitMessageImpl(BinaryRow.EMPTY_ROW, 0, new DataIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), new CompactIncrement(Collections.emptyList(), Collections.emptyList(), Collections.emptyList()))), cp);
            testHarness.snapshot(cp++, time++);
            testHarness.snapshot(cp, time);
        }).hasRootCauseInstanceOf(RuntimeException.class).cause().hasMessageContaining("Repeatedly commit the same checkpoint files.");
    }

    private static OperatorSubtaskState writeAndSnapshot(FileStoreTable table, String commitUser, long timestamp, long checkpoint, OneInputStreamOperatorTestHarness<Committable, Committable> testHarness) throws Exception {
        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
        StreamTableWrite write = streamWriteBuilder.withCommitUser(commitUser).newWrite();
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
        for (CommitMessage committable : write.prepareCommit(false, 1L)) {
            testHarness.processElement((Object)new Committable(checkpoint, Committable.Kind.FILE, (Object)committable), ++timestamp);
        }
        OperatorSubtaskState snapshot = testHarness.snapshot(checkpoint, ++timestamp);
        return snapshot;
    }

    @Test
    public void testWatermarkCommit() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        long timestamp = 0L;
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long cpId = 1L;
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
        testHarness.processElement((Object)new Committable(cpId, Committable.Kind.FILE, write.prepareCommit(true, cpId).get(0)), timestamp++);
        testHarness.processWatermark(new Watermark(1024L));
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat((Long)table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
        cpId = 2L;
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 20L}));
        testHarness.processElement((Object)new Committable(cpId, Committable.Kind.FILE, write.prepareCommit(true, cpId).get(0)), timestamp++);
        testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        testHarness.close();
        write.close();
        Assertions.assertThat((Long)table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
    }

    @Test
    public void testEmptyCommit() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        Snapshot snapshot = table.snapshotManager().latestSnapshot();
        Assertions.assertThat((Object)snapshot).isNull();
    }

    @Test
    public void testForceCreateSnapshotCommit() throws Exception {
        FileStoreTable table = this.createFileStoreTable(options -> options.set(CoreOptions.COMMIT_FORCE_CREATE_SNAPSHOT.key(), "true"));
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        Snapshot snapshot = table.snapshotManager().latestSnapshot();
        Assertions.assertThat((Object)snapshot).isNotNull();
    }

    @Test
    public void testEmptyCommitWithProcessTimeTag() throws Exception {
        FileStoreTable table = this.createFileStoreTable(options -> {
            options.set(CoreOptions.TAG_AUTOMATIC_CREATION, (Object)CoreOptions.TagCreationMode.PROCESS_TIME);
            options.set(CoreOptions.TAG_CREATION_PERIOD, (Object)CoreOptions.TagCreationPeriod.DAILY);
        });
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createRecoverableTestHarness(table);
        testHarness.open();
        testHarness.snapshot(1L, 1L);
        testHarness.notifyOfCompletedCheckpoint(1L);
        Snapshot snapshot = table.snapshotManager().latestSnapshot();
        Assertions.assertThat((Object)snapshot).isNotNull();
        Assertions.assertThat((long)snapshot.id()).isEqualTo(1L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(1L);
        testHarness.snapshot(2L, 2L);
        testHarness.notifyOfCompletedCheckpoint(2L);
        snapshot = table.snapshotManager().latestSnapshot();
        Assertions.assertThat((Object)snapshot).isNotNull();
        Assertions.assertThat((long)snapshot.id()).isEqualTo(1L);
        Assertions.assertThat((long)table.tagManager().tagCount()).isEqualTo(1L);
    }

    @Test
    public void testCalcDataBytesSend() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        TableWriteImpl write = table.newWrite(this.initialCommitUser);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 10L}));
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 20L}));
        List committable = write.prepareCommit(false, 0L);
        write.close();
        ManifestCommittable manifestCommittable = new ManifestCommittable(0L);
        for (CommitMessage commitMessage : committable) {
            manifestCommittable.addFileCommittable(commitMessage);
        }
        TableCommitImpl commit = table.newCommit(this.initialCommitUser);
        OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup();
        StoreCommitter committer = new StoreCommitter((TableCommit)commit, metricGroup);
        committer.commit(Collections.singletonList(manifestCommittable));
        CommitterMetrics metrics = committer.getCommitterMetrics();
        Assertions.assertThat((long)metrics.getNumBytesOutCounter().getCount()).isEqualTo(285L);
        Assertions.assertThat((long)metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2L);
        committer.close();
    }

    @Test
    public void testCommitMetrics() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, null, (CommittableStateManager<ManifestCommittable>)new RestoreAndFailCommittableStateManager((SerializableSupplier & Serializable)() -> new VersionedSerializerWrapper((VersionedSerializer)new ManifestCommittableSerializer())));
        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createTestHarness(operator);
        testHarness.open();
        long timestamp = 0L;
        StreamTableWrite write = table.newStreamWriteBuilder().withCommitUser(this.initialCommitUser).newWrite();
        long cpId = 1L;
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 100L}));
        testHarness.processElement((Object)new Committable(cpId, Committable.Kind.FILE, write.prepareCommit(false, cpId).get(0)), timestamp++);
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        MetricGroup commitMetricGroup = operator.getMetricGroup().addGroup("paimon").addGroup("table", table.name()).addGroup("commit");
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted").getValue()).isEqualTo((Object)0L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended").getValue()).isEqualTo((Object)1L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesCommitCompacted").getValue()).isEqualTo((Object)0L);
        cpId = 2L;
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{1, 101L}));
        write.compact(BinaryRow.EMPTY_ROW, 0, false);
        write.write((InternalRow)GenericRow.of((Object[])new Object[]{2, 200L}));
        write.compact(BinaryRow.EMPTY_ROW, 0, true);
        testHarness.processElement((Object)new Committable(cpId, Committable.Kind.FILE, write.prepareCommit(true, cpId).get(0)), timestamp++);
        testHarness.snapshot(cpId, timestamp++);
        testHarness.notifyOfCompletedCheckpoint(cpId);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAdded").getValue()).isEqualTo((Object)3L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesDeleted").getValue()).isEqualTo((Object)3L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesAppended").getValue()).isEqualTo((Object)2L);
        Assertions.assertThat((Object)TestingMetricUtils.getGauge(commitMetricGroup, "lastTableFilesCommitCompacted").getValue()).isEqualTo((Object)4L);
        testHarness.close();
        write.close();
    }

    @Test
    public void testParallelism() throws Exception {
        FileStoreTable table = this.createFileStoreTable();
        String commitUser = UUID.randomUUID().toString();
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, commitUser, (CommittableStateManager<ManifestCommittable>)new NoopCommittableStateManager());
        try (OneInputStreamOperatorTestHarness<Committable, Committable> testHarness = this.createTestHarness(operator, 10, 10, 3);){
            Assertions.assertThatCode(() -> testHarness.open()).hasMessage("Committer Operator parallelism in paimon MUST be one.");
        }
    }

    protected OneInputStreamOperatorTestHarness<Committable, Committable> createRecoverableTestHarness(FileStoreTable table) throws Exception {
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, null, (CommittableStateManager<ManifestCommittable>)new RestoreAndFailCommittableStateManager((SerializableSupplier & Serializable)() -> new VersionedSerializerWrapper((VersionedSerializer)new ManifestCommittableSerializer())));
        return this.createTestHarness(operator);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(FileStoreTable table) throws Exception {
        return this.createLossyTestHarness(table, null);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(FileStoreTable table, String commitUser) throws Exception {
        OneInputStreamOperator<Committable, Committable> operator = this.createCommitterOperator(table, commitUser, (CommittableStateManager<ManifestCommittable>)new NoopCommittableStateManager());
        return this.createTestHarness(operator);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(OneInputStreamOperator<Committable, Committable> operator) throws Exception {
        return this.createTestHarness(operator, 1, 1, 0);
    }

    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(OneInputStreamOperator<Committable, Committable> operator, int maxParallelism, int parallelism, int subTaskIndex) throws Exception {
        TypeSerializer serializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness(operator, maxParallelism, parallelism, subTaskIndex, serializer, new OperatorID());
        harness.setup(serializer);
        return harness;
    }

    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable table, String commitUser, CommittableStateManager<ManifestCommittable> committableStateManager) {
        return new CommitterOperator(true, true, commitUser == null ? this.initialCommitUser : commitUser, (Committer.Factory & Serializable)(user, metricGroup) -> new StoreCommitter((TableCommit)table.newStreamWriteBuilder().withCommitUser(user).newCommit(), metricGroup), committableStateManager);
    }

    protected OneInputStreamOperator<Committable, Committable> createCommitterOperator(FileStoreTable table, String commitUser, CommittableStateManager<ManifestCommittable> committableStateManager, final ThrowingConsumer<StateInitializationContext, Exception> initializeFunction) {
        return new CommitterOperator<Committable, ManifestCommittable>(true, true, commitUser == null ? this.initialCommitUser : commitUser, (Committer.Factory & Serializable)(user, metricGroup) -> new StoreCommitter((TableCommit)table.newStreamWriteBuilder().withCommitUser(user).newCommit(), metricGroup), committableStateManager){

            public void initializeState(StateInitializationContext context) throws Exception {
                initializeFunction.accept((Object)context);
            }
        };
    }
}

