/*
 * Decompiled with CFR 0.152.
 */
package com.mongodb.spark.sql.connector.read;

import com.mongodb.spark.sql.connector.assertions.Assertions;
import com.mongodb.spark.sql.connector.config.ReadConfig;
import com.mongodb.spark.sql.connector.read.MongoInputPartition;
import com.mongodb.spark.sql.connector.read.MongoStreamPartitionReaderFactory;
import com.mongodb.spark.sql.connector.read.ResumeTokenOffset;
import com.mongodb.spark.sql.connector.read.ResumeTokenPartitionOffset;
import com.mongodb.spark.sql.connector.schema.BsonDocumentToRowConverter;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoContinuousStream
implements ContinuousStream {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoContinuousStream.class);
    private final BsonDocumentToRowConverter bsonDocumentToRowConverter;
    private final ReadConfig readConfig;

    public MongoContinuousStream(StructType schema, ReadConfig readConfig) {
        Assertions.validateConfig(schema, s -> !s.isEmpty(), () -> "Mongo Continuous streams require a schema to be defined");
        this.bsonDocumentToRowConverter = new BsonDocumentToRowConverter(schema);
        this.readConfig = readConfig;
    }

    public InputPartition[] planInputPartitions(Offset start) {
        return new InputPartition[]{new MongoInputPartition(0, this.readConfig.getAggregationPipeline(), new ResumeTokenPartitionOffset(((ResumeTokenOffset)start).getResumeToken()))};
    }

    public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
        return new MongoStreamPartitionReaderFactory(this.bsonDocumentToRowConverter, this.readConfig);
    }

    public Offset mergeOffsets(PartitionOffset[] offsets) {
        Assertions.ensureState(() -> offsets.length == 1, () -> "Multiple offsets found when there should only be one.");
        Assertions.ensureState(() -> offsets[0] instanceof ResumeTokenPartitionOffset, () -> String.format("Unexpected partition offset type. Expected ResumeTokenPartitionOffset` found `%s`", offsets[0].getClass()));
        return new ResumeTokenOffset(((ResumeTokenPartitionOffset)offsets[0]).getResumeToken());
    }

    public Offset initialOffset() {
        return ResumeTokenOffset.INITIAL_RESUME_TOKEN_OFFSET;
    }

    public Offset deserializeOffset(String json) {
        return ResumeTokenOffset.parse(json);
    }

    public void commit(Offset end) {
        LOGGER.info("ContinuousStream commit: {}", (Object)end);
    }

    public void stop() {
        LOGGER.info("ContinuousStream stopped.");
    }
}

