/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.Storage;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public class BigQueryStorageStreamSource<T>
extends BoundedSource<T> {
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageStreamSource.class);
    private final Storage.ReadSession readSession;
    private final Storage.Stream stream;
    private final String jsonTableSchema;
    private final SerializableFunction<SchemaAndRecord, T> parseFn;
    private final Coder<T> outputCoder;
    private final BigQueryServices bqServices;

    public static <T> BigQueryStorageStreamSource<T> create(Storage.ReadSession readSession, Storage.Stream stream, TableSchema tableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        return new BigQueryStorageStreamSource<T>(readSession, stream, BigQueryHelpers.toJsonString(Preconditions.checkNotNull((Object)tableSchema, (Object)"tableSchema")), parseFn, outputCoder, bqServices);
    }

    public BigQueryStorageStreamSource<T> fromExisting(Storage.Stream newStream) {
        return new BigQueryStorageStreamSource<T>(this.readSession, newStream, this.jsonTableSchema, this.parseFn, this.outputCoder, this.bqServices);
    }

    private BigQueryStorageStreamSource(Storage.ReadSession readSession, Storage.Stream stream, String jsonTableSchema, SerializableFunction<SchemaAndRecord, T> parseFn, Coder<T> outputCoder, BigQueryServices bqServices) {
        this.readSession = (Storage.ReadSession)Preconditions.checkNotNull((Object)readSession, (Object)"readSession");
        this.stream = (Storage.Stream)Preconditions.checkNotNull((Object)stream, (Object)"stream");
        this.jsonTableSchema = (String)Preconditions.checkNotNull((Object)jsonTableSchema, (Object)"jsonTableSchema");
        this.parseFn = (SerializableFunction)Preconditions.checkNotNull(parseFn, (Object)"parseFn");
        this.outputCoder = (Coder)Preconditions.checkNotNull(outputCoder, (Object)"outputCoder");
        this.bqServices = (BigQueryServices)Preconditions.checkNotNull((Object)bqServices, (Object)"bqServices");
    }

    public Coder<T> getOutputCoder() {
        return this.outputCoder;
    }

    public void populateDisplayData(DisplayData.Builder builder) {
        super.populateDisplayData(builder);
        builder.addIfNotNull(DisplayData.item((String)"table", (String)BigQueryHelpers.toTableSpec(this.readSession.getTableReference())).withLabel("Table")).add(DisplayData.item((String)"readSession", (String)this.readSession.getName()).withLabel("Read session")).add(DisplayData.item((String)"stream", (String)this.stream.getName()).withLabel("Stream"));
    }

    public long getEstimatedSizeBytes(PipelineOptions options) {
        return 0L;
    }

    public List<? extends BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options) {
        return ImmutableList.of((Object)((Object)this));
    }

    public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException {
        return new BigQueryStorageStreamReader(this, (BigQueryOptions)options.as(BigQueryOptions.class));
    }

    public String toString() {
        return this.stream.toString();
    }

    @Experimental(value=Experimental.Kind.SOURCE_SINK)
    public static class BigQueryStorageStreamReader<T>
    extends BoundedSource.BoundedReader<T> {
        private final DatumReader<GenericRecord> datumReader;
        private final SerializableFunction<SchemaAndRecord, T> parseFn;
        private final BigQueryServices.StorageClient storageClient;
        private final TableSchema tableSchema;
        private BigQueryStorageStreamSource<T> source;
        private BigQueryServices.BigQueryServerStream<Storage.ReadRowsResponse> responseStream;
        private Iterator<Storage.ReadRowsResponse> responseIterator;
        private BinaryDecoder decoder;
        private GenericRecord record;
        private T current;
        private long currentOffset;
        private double fractionConsumed;
        private double fractionConsumedFromPreviousResponse;
        private double fractionConsumedFromCurrentResponse;
        private long rowsReadFromCurrentResponse;
        private long totalRowCountFromCurrentResponse;

        private BigQueryStorageStreamReader(BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
            this.source = source;
            this.datumReader = new GenericDatumReader(new Schema.Parser().parse(((BigQueryStorageStreamSource)source).readSession.getAvroSchema().getSchema()));
            this.parseFn = ((BigQueryStorageStreamSource)source).parseFn;
            this.storageClient = ((BigQueryStorageStreamSource)source).bqServices.getStorageClient(options);
            this.tableSchema = BigQueryHelpers.fromJsonString(((BigQueryStorageStreamSource)source).jsonTableSchema, TableSchema.class);
            this.fractionConsumed = 0.0;
            this.fractionConsumedFromPreviousResponse = 0.0;
            this.fractionConsumedFromCurrentResponse = 0.0;
            this.rowsReadFromCurrentResponse = 0L;
            this.totalRowCountFromCurrentResponse = 0L;
        }

        public synchronized boolean start() throws IOException {
            BoundedSource source = this.getCurrentSource();
            Storage.ReadRowsRequest request = Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(((BigQueryStorageStreamSource)source).stream).setOffset(this.currentOffset)).build();
            this.responseStream = this.storageClient.readRows(request);
            this.responseIterator = this.responseStream.iterator();
            LOG.info("Started BigQuery Storage API read from stream {}.", (Object)((BigQueryStorageStreamSource)source).stream.getName());
            return this.readNextRecord();
        }

        public synchronized boolean advance() throws IOException {
            ++this.currentOffset;
            return this.readNextRecord();
        }

        private synchronized boolean readNextRecord() throws IOException {
            while (this.decoder == null || this.decoder.isEnd()) {
                if (!this.responseIterator.hasNext()) {
                    this.fractionConsumed = 1.0;
                    return false;
                }
                this.fractionConsumedFromPreviousResponse = this.fractionConsumedFromCurrentResponse;
                Storage.ReadRowsResponse currentResponse = this.responseIterator.next();
                this.decoder = DecoderFactory.get().binaryDecoder(currentResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), this.decoder);
                this.rowsReadFromCurrentResponse = 0L;
                this.totalRowCountFromCurrentResponse = currentResponse.getAvroRows().getRowCount();
                this.fractionConsumedFromCurrentResponse = BigQueryStorageStreamReader.getFractionConsumed(currentResponse);
                Preconditions.checkArgument((this.totalRowCountFromCurrentResponse >= 0L ? 1 : 0) != 0, (String)"Row count from current response (%s) must be greater than or equal to zero.", (long)this.totalRowCountFromCurrentResponse);
                Preconditions.checkArgument((0.0 <= this.fractionConsumedFromCurrentResponse && this.fractionConsumedFromCurrentResponse <= 1.0 ? 1 : 0) != 0, (String)"Fraction consumed from current response (%s) is not in the range [0.0, 1.0].", (Object)this.fractionConsumedFromCurrentResponse);
                Preconditions.checkArgument((this.fractionConsumedFromPreviousResponse <= this.fractionConsumedFromCurrentResponse ? 1 : 0) != 0, (String)"Fraction consumed from the current response (%s) has to be larger than or equal to the fraction consumed from the previous response (%s).", (Object)this.fractionConsumedFromCurrentResponse, (Object)this.fractionConsumedFromPreviousResponse);
            }
            this.record = (GenericRecord)this.datumReader.read((Object)this.record, (Decoder)this.decoder);
            this.current = this.parseFn.apply((Object)new SchemaAndRecord(this.record, this.tableSchema));
            ++this.rowsReadFromCurrentResponse;
            this.fractionConsumed = this.fractionConsumedFromPreviousResponse + (this.fractionConsumedFromCurrentResponse - this.fractionConsumedFromPreviousResponse) * (double)this.rowsReadFromCurrentResponse * 1.0 / (double)this.totalRowCountFromCurrentResponse;
            return true;
        }

        public T getCurrent() throws NoSuchElementException {
            return this.current;
        }

        public synchronized void close() {
            this.storageClient.close();
        }

        public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
            return this.source;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public BoundedSource<T> splitAtFraction(double fraction) {
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls").inc();
            LOG.debug("Received BigQuery Storage API split request for stream {} at fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).stream.getName(), (Object)fraction);
            Storage.SplitReadStreamRequest splitRequest = Storage.SplitReadStreamRequest.newBuilder().setOriginalStream(((BigQueryStorageStreamSource)this.source).stream).setFraction((float)fraction).build();
            Storage.SplitReadStreamResponse splitResponse = this.storageClient.splitReadStream(splitRequest);
            if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
                Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-impossible-split-point").inc();
                LOG.info("BigQuery Storage API stream {} cannot be split at {}.", (Object)((BigQueryStorageStreamSource)this.source).stream.getName(), (Object)fraction);
                return null;
            }
            BigQueryStorageStreamReader bigQueryStorageStreamReader = this;
            synchronized (bigQueryStorageStreamReader) {
                Iterator newResponseIterator;
                BigQueryServices.BigQueryServerStream<Storage.ReadRowsResponse> newResponseStream;
                try {
                    newResponseStream = this.storageClient.readRows(Storage.ReadRowsRequest.newBuilder().setReadPosition(Storage.StreamPosition.newBuilder().setStream(splitResponse.getPrimaryStream()).setOffset(this.currentOffset + 1L)).build());
                    newResponseIterator = newResponseStream.iterator();
                    newResponseIterator.hasNext();
                }
                catch (FailedPreconditionException e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-bad-split-point").inc();
                    LOG.info("BigQuery Storage API split of stream {} abandoned because the primary stream is to the left of the split fraction {}.", (Object)((BigQueryStorageStreamSource)this.source).stream.getName(), (Object)fraction);
                    return null;
                }
                catch (Exception e) {
                    Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-failed-due-to-other-reasons").inc();
                    LOG.error("BigQuery Storage API stream split failed.", (Throwable)e);
                    return null;
                }
                this.responseStream.cancel();
                this.source = this.source.fromExisting(splitResponse.getPrimaryStream());
                this.responseStream = newResponseStream;
                this.responseIterator = newResponseIterator;
                this.fractionConsumedFromCurrentResponse = this.fractionConsumed;
                this.decoder = null;
            }
            Metrics.counter(BigQueryStorageStreamReader.class, (String)"split-at-fraction-calls-successful").inc();
            LOG.info("Successfully split BigQuery Storage API stream at {}. Split response: {}", (Object)fraction, (Object)splitResponse);
            return this.source.fromExisting(splitResponse.getRemainderStream());
        }

        public synchronized Double getFractionConsumed() {
            return this.fractionConsumed;
        }

        private static float getFractionConsumed(Storage.ReadRowsResponse response) {
            return response.getStatus().getFractionConsumed();
        }
    }
}

