/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.utils.SerializableSupplier;

public class RestoreAndFailCommittableStateManager<GlobalCommitT>
implements CommittableStateManager<GlobalCommitT> {
    private static final long serialVersionUID = 1L;
    private final SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>> committableSerializer;
    private ListState<GlobalCommitT> streamingCommitterState;

    public RestoreAndFailCommittableStateManager(SerializableSupplier<SimpleVersionedSerializer<GlobalCommitT>> committableSerializer) {
        this.committableSerializer = committableSerializer;
    }

    @Override
    public void initializeState(StateInitializationContext context, Committer<?, GlobalCommitT> committer) throws Exception {
        this.streamingCommitterState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE)), (SimpleVersionedSerializer)this.committableSerializer.get());
        ArrayList restored = new ArrayList();
        ((Iterable)this.streamingCommitterState.get()).forEach(restored::add);
        this.streamingCommitterState.clear();
        this.recover(restored, committer);
    }

    private void recover(List<GlobalCommitT> committables, Committer<?, GlobalCommitT> committer) throws Exception {
        int numCommitted = committer.filterAndCommit(committables);
        if (numCommitted > 0) {
            throw new RuntimeException("This exception is intentionally thrown after committing the restored checkpoints. By restarting the job we hope that writers can start writing based on these new commits.");
        }
    }

    @Override
    public void snapshotState(StateSnapshotContext context, List<GlobalCommitT> committables) throws Exception {
        this.streamingCommitterState.update(committables);
    }
}

