/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.spark.sql.connector.write;

import com.mongodb.client.MongoCollection;
import com.mongodb.spark.sql.connector.config.WriteConfig;
import com.mongodb.spark.sql.connector.exceptions.DataException;
import com.mongodb.spark.sql.connector.schema.RowToBsonDocumentConverter;
import com.mongodb.spark.sql.connector.write.MongoDataWriterFactory;
import java.util.Arrays;
import java.util.Objects;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.connector.write.streaming.StreamingDataWriterFactory;
import org.apache.spark.sql.connector.write.streaming.StreamingWrite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoStreamingWrite
implements StreamingWrite {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoStreamingWrite.class);
    private final LogicalWriteInfo info;
    private final WriteConfig writeConfig;
    private final RowToBsonDocumentConverter rowToBsonDocumentConverter;
    private final boolean truncate;

    MongoStreamingWrite(LogicalWriteInfo info, RowToBsonDocumentConverter rowToBsonDocumentConverter, WriteConfig writeConfig, boolean truncate) {
        this.info = info;
        this.rowToBsonDocumentConverter = rowToBsonDocumentConverter;
        this.writeConfig = writeConfig;
        this.truncate = truncate;
    }

    public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
        if (this.truncate) {
            this.writeConfig.doWithCollection(MongoCollection::drop);
        }
        return new MongoDataWriterFactory(this.rowToBsonDocumentConverter, this.writeConfig);
    }

    public void commit(long epochId, WriterCommitMessage[] messages) {
        LOGGER.debug("Write committed for: {}, with {} task(s).", (Object)this.info.queryId(), (Object)messages.length);
    }

    public void abort(long epochId, WriterCommitMessage[] messages) {
        long tasksCompleted = Arrays.stream(messages).filter(Objects::nonNull).count();
        throw new DataException(String.format("Write aborted for: %s. %s/%s tasks completed. EpochId: %s", this.info.queryId(), tasksCompleted, messages.length, epochId));
    }
}

