/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.operators.sink.BatchCommitterOperator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

/**
 * Factory that builds a {@link BatchCommitterOperator} around the {@link Committer} obtained
 * from a {@link Sink}.
 *
 * @param <CommT> the committable type that the created operator both consumes and emits
 */
public final class BatchCommitterOperatorFactory<CommT>
extends AbstractStreamOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<CommT, CommT> {
    /** Sink whose committer is requested each time an operator is created. */
    private final Sink<?, CommT, ?, ?> sink;

    /**
     * Creates a factory bound to the given sink.
     *
     * @param sink the sink that supplies the committer; a {@code null} value is rejected by
     *     {@link Preconditions#checkNotNull(Object)}
     */
    public BatchCommitterOperatorFactory(Sink<?, CommT, ?, ?> sink) {
        this.sink = Preconditions.checkNotNull(sink);
    }

    /**
     * Asks the sink for its committer, wraps it in a {@link BatchCommitterOperator} and calls
     * {@code setup} on that operator with the containing task, stream config and output taken
     * from {@code parameters}.
     *
     * @param parameters source of the containing task, stream config and output
     * @param <T> the operator type expected by the caller
     * @return the newly created operator, after {@code setup} has been called on it
     * @throws IllegalStateException if the sink returns an empty committer
     * @throws FlinkRuntimeException if creating the committer fails with an {@link IOException}
     */
    @Override
    public <T extends StreamOperator<CommT>> T createStreamOperator(StreamOperatorParameters<CommT> parameters) {
        final BatchCommitterOperator<CommT> committerOperator;
        try {
            final Committer<CommT> committer = this.sink.createCommitter()
                    .orElseThrow(() -> new IllegalStateException("Could not create committer from the sink"));
            committerOperator = new BatchCommitterOperator<>(committer);
        }
        catch (IOException e) {
            // Surface the checked failure as unchecked while keeping the original cause attached.
            throw new FlinkRuntimeException("Could not create the Committer.", e);
        }
        committerOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());

        // T is chosen by the caller, so the compiler cannot verify this cast; the suppression is
        // scoped to this single declaration instead of the whole method.
        @SuppressWarnings("unchecked")
        final T operator = (T) committerOperator;
        return operator;
    }

    /**
     * Reports the class of operator this factory creates.
     *
     * @param classLoader not used by this implementation
     * @return {@code BatchCommitterOperator.class}
     */
    @Override
    @SuppressWarnings("rawtypes") // the raw StreamOperator return type is part of the overridden signature
    public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
        return BatchCommitterOperator.class;
    }
}

