/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.structuredstreaming.translation.batch;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.spark.structuredstreaming.translation.SchemaHelpers;
import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.RowHelpers;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.types.StructType;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Instant;

public class DatasetSourceBatch
implements DataSourceV2,
ReadSupport {
    static final @UnknownKeyFor @NonNull @Initialized String BEAM_SOURCE_OPTION = "beam-source";
    static final @UnknownKeyFor @NonNull @Initialized String DEFAULT_PARALLELISM = "default-parallelism";
    static final @UnknownKeyFor @NonNull @Initialized String PIPELINE_OPTIONS = "pipeline-options";

    public @UnknownKeyFor @NonNull @Initialized DataSourceReader createReader(@UnknownKeyFor @NonNull @Initialized DataSourceOptions options) {
        return new DatasetReader(options);
    }

    private static class DatasetPartitionReader<@UnknownKeyFor T>
    implements InputPartitionReader<InternalRow> {
        private @UnknownKeyFor @NonNull @Initialized boolean started = false;
        private @UnknownKeyFor @NonNull @Initialized boolean closed = false;
        private final @UnknownKeyFor @NonNull @Initialized BoundedSource<T> source;
        private // Could not load outer class - annotation placement on inner may be incorrect
        @UnknownKeyFor @NonNull @Initialized BoundedSource.BoundedReader<T> reader;

        DatasetPartitionReader(@UnknownKeyFor @NonNull @Initialized BoundedSource<T> source, @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializablePipelineOptions) {
            this.source = source;
            try {
                this.reader = source.createReader(serializablePipelineOptions.get().as(PipelineOptions.class));
            }
            catch (IOException e) {
                throw new RuntimeException("Error creating BoundedReader ", e);
            }
        }

        public @UnknownKeyFor @NonNull @Initialized boolean next() throws @UnknownKeyFor @NonNull @Initialized IOException {
            if (!this.started) {
                this.started = true;
                return this.reader.start();
            }
            return !this.closed && this.reader.advance();
        }

        public @UnknownKeyFor @NonNull @Initialized InternalRow get() {
            WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow((Object)this.reader.getCurrent(), (Instant)this.reader.getCurrentTimestamp());
            return RowHelpers.storeWindowedValueInRow(windowedValue, this.source.getOutputCoder());
        }

        public void close() throws @UnknownKeyFor @NonNull @Initialized IOException {
            this.closed = true;
            this.reader.close();
        }
    }

    private static class DatasetReader<@UnknownKeyFor T>
    implements DataSourceReader,
    Serializable {
        private @UnknownKeyFor @NonNull @Initialized int numPartitions;
        private @UnknownKeyFor @NonNull @Initialized BoundedSource<T> source;
        private @UnknownKeyFor @NonNull @Initialized SerializablePipelineOptions serializablePipelineOptions;

        private DatasetReader(@UnknownKeyFor @NonNull @Initialized DataSourceOptions options) {
            if (!options.get(DatasetSourceBatch.BEAM_SOURCE_OPTION).isPresent()) {
                throw new RuntimeException("Beam source was not set in DataSource options");
            }
            this.source = (BoundedSource)Base64Serializer.deserializeUnchecked((String)((String)options.get(DatasetSourceBatch.BEAM_SOURCE_OPTION).get()), BoundedSource.class);
            if (!options.get(DatasetSourceBatch.DEFAULT_PARALLELISM).isPresent()) {
                throw new RuntimeException("Spark default parallelism was not set in DataSource options");
            }
            this.numPartitions = Integer.parseInt((String)options.get(DatasetSourceBatch.DEFAULT_PARALLELISM).get());
            Preconditions.checkArgument((this.numPartitions > 0 ? 1 : 0) != 0, (Object)"Number of partitions must be greater than zero.");
            if (!options.get(DatasetSourceBatch.PIPELINE_OPTIONS).isPresent()) {
                throw new RuntimeException("Beam pipelineOptions were not set in DataSource options");
            }
            this.serializablePipelineOptions = new SerializablePipelineOptions((String)options.get(DatasetSourceBatch.PIPELINE_OPTIONS).get());
        }

        public @UnknownKeyFor @NonNull @Initialized StructType readSchema() {
            return SchemaHelpers.binarySchema();
        }

        public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized InputPartition<@UnknownKeyFor @NonNull @Initialized InternalRow>> planInputPartitions() {
            PipelineOptions options = this.serializablePipelineOptions.get();
            ArrayList<InputPartition<InternalRow>> result = new ArrayList<InputPartition<InternalRow>>();
            try {
                long desiredSizeBytes = this.source.getEstimatedSizeBytes(options) / (long)this.numPartitions;
                List splits = this.source.split(desiredSizeBytes, options);
                for (BoundedSource split : splits) {
                    result.add((InputPartition<InternalRow>)(InputPartition & Serializable)() -> new DatasetPartitionReader(split, this.serializablePipelineOptions));
                }
                return result;
            }
            catch (Exception e) {
                throw new RuntimeException("Error in splitting BoundedSource " + this.source.getClass().getCanonicalName(), e);
            }
        }
    }
}

