/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Optional;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.ShareInFlightBatch;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

public class ShareCompletedFetch {
    // Id of the node this fetch is associated with; passed through to every ShareInFlightBatch built here.
    final int nodeId;
    // Topic-id partition whose data this fetch holds.
    final TopicIdPartition partition;
    // Raw partition section of the ShareFetch response: source of the record batches and the acquired ranges.
    final ShareFetchResponseData.PartitionData partitionData;
    // Version of the request that produced this response; stored but not read inside this class.
    final short requestVersion;
    private final Logger log;
    // Handed to RecordBatch.streamingIterator when a batch's records are opened.
    private final BufferSupplier decompressionBufferSupplier;
    // Cursor over the record batches obtained from ShareFetchResponse.recordsOrFail(partitionData).
    private final Iterator<? extends RecordBatch> batches;
    // Running totals of records/bytes added to in-flight batches; reported to the metric aggregator by drain().
    private int recordsRead;
    private int bytesRead;
    // Batch most recently pulled from `batches`.
    private RecordBatch currentBatch;
    // Last non-control record produced by nextFetchedRecord, or null once the batches are exhausted.
    private Record lastRecord;
    // Streaming iterator over currentBatch's records; null when no stream is open.
    private CloseableIterator<Record> records;
    // Batch-level corruption error held over so a later fetchRecords call can reject the batch and report it.
    private KafkaException cachedBatchException = null;
    // Record-level (de)serialization error held over so a later fetchRecords call can release the record and report it.
    private KafkaException cachedRecordException = null;
    // Set by drain(); once true, fetchRecords returns empty batches.
    private boolean isConsumed = false;
    // Flag exposed via isInitialized()/setInitialized(); not otherwise consulted inside this class.
    private boolean initialized = false;
    // One entry per acquired offset, expanded from the response's acquired ranges.
    private final List<OffsetAndDeliveryCount> acquiredRecordList;
    // Cursor over acquiredRecordList; created lazily and rewound to the start by rejectRecordBatch.
    private ListIterator<OffsetAndDeliveryCount> acquiredRecordIterator;
    // Next acquired offset still to be matched against a fetched record; null when none remain.
    private OffsetAndDeliveryCount nextAcquired;
    private final ShareFetchMetricsAggregator metricAggregator;

    /**
     * Wraps one partition's portion of a ShareFetch response.
     *
     * @param logContext                  supplies the logger for this instance
     * @param decompressionBufferSupplier buffer supplier used when streaming records out of a batch
     * @param nodeId                      id of the node associated with this fetch
     * @param partition                   topic-id partition the data belongs to
     * @param partitionData               partition section of the response (records and acquired ranges)
     * @param metricAggregator            receives the bytes/records totals when the fetch is drained
     * @param requestVersion              version of the request that produced the response
     */
    ShareCompletedFetch(LogContext logContext, BufferSupplier decompressionBufferSupplier, int nodeId, TopicIdPartition partition, ShareFetchResponseData.PartitionData partitionData, ShareFetchMetricsAggregator metricAggregator, short requestVersion) {
        // Plain pass-through state.
        this.nodeId = nodeId;
        this.partition = partition;
        this.partitionData = partitionData;
        this.requestVersion = requestVersion;
        this.metricAggregator = metricAggregator;
        this.decompressionBufferSupplier = decompressionBufferSupplier;
        this.log = logContext.logger(ShareCompletedFetch.class);

        // Derived state: the batch cursor first, then the per-offset view of the acquired ranges.
        this.batches = ShareFetchResponse.recordsOrFail(partitionData).batches().iterator();
        this.acquiredRecordList = this.buildAcquiredRecordList(partitionData.acquiredRecords());
        // The acquired-record cursor is positioned lazily on first use.
        this.nextAcquired = null;
    }

    /**
     * Expands the response's acquired ranges into one entry per offset.
     *
     * @param partitionAcquiredRecords acquired ranges, each with an inclusive first/last offset and a delivery count
     * @return a list with one {@link OffsetAndDeliveryCount} per offset, in the order the ranges were given
     */
    private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
        List<OffsetAndDeliveryCount> expanded = new LinkedList<>();
        for (ShareFetchResponseData.AcquiredRecords range : partitionAcquiredRecords) {
            long last = range.lastOffset();
            short deliveryCount = range.deliveryCount();
            long offset = range.firstOffset();
            // Both ends of the range are inclusive.
            while (offset <= last) {
                expanded.add(new OffsetAndDeliveryCount(offset, deliveryCount));
                offset++;
            }
        }
        return expanded;
    }

    /**
     * @return {@code true} once {@link #setInitialized()} has been called
     */
    boolean isInitialized() {
        return initialized;
    }

    /**
     * Flags this fetch as initialized. The flag is only stored here; what
     * "initialized" entails is decided by the caller.
     */
    void setInitialized() {
        initialized = true;
    }

    /**
     * @return {@code true} once {@link #drain()} has run, i.e. no further records will be produced
     */
    public boolean isConsumed() {
        return isConsumed;
    }

    /**
     * Marks this fetch as fully consumed: closes any open record stream, discards
     * cached exceptions and reports the accumulated byte/record totals. Calling it
     * again after the first time is a no-op.
     */
    void drain() {
        if (isConsumed) {
            return;
        }
        maybeCloseRecordStream();
        cachedBatchException = null;
        cachedRecordException = null;
        // Flip the flag before reporting so the totals are recorded at most once.
        isConsumed = true;
        recordAggregatedMetrics(bytesRead, recordsRead);
    }

    /**
     * Forwards the given totals for this fetch's partition to the metric aggregator.
     *
     * @param bytes   number of bytes to report
     * @param records number of records to report
     */
    void recordAggregatedMetrics(int bytes, int records) {
        TopicPartition topicPartition = partition.topicPartition();
        metricAggregator.record(topicPartition, bytes, records);
    }

    /**
     * Builds the next in-flight batch from this fetch.
     *
     * <p>Order of business on each call:
     * <ol>
     *   <li>If a batch-level corruption error was held over from the previous call, the
     *       acquired offsets of the corrupt batch are rejected and the error is attached
     *       to an otherwise empty result.</li>
     *   <li>If a record-level (de)serialization error was held over, the offending record is
     *       released and the error is attached to an otherwise empty result.</li>
     *   <li>If the fetch is already consumed, an empty result is returned.</li>
     *   <li>Otherwise fetched records are walked in step with the acquired offsets: a record
     *       whose offset is acquired is deserialized and added; a record that is not acquired
     *       is skipped; an acquired offset with no matching record is recorded as a gap.</li>
     * </ol>
     *
     * <p>An error hit while the result is still empty is reported immediately on that result.
     * An error hit after records have already been added is cached and reported by the next
     * call, so the records collected so far can be returned first.
     *
     * @param deserializers key/value deserializers applied to each acquired record
     * @param maxRecords    soft limit on the number of records added; once reached, reading
     *                      still continues until the current record batch has no more records
     * @param checkCrcs     whether batches and records are validated as they are read
     * @return the batch of records, gaps and acknowledgements gathered by this call (never null)
     */
    <K, V> ShareInFlightBatch<K, V> fetchRecords(Deserializers<K, V> deserializers, int maxRecords, boolean checkCrcs) {
        ShareInFlightBatch<K, V> inFlightBatch = new ShareInFlightBatch<>(this.nodeId, this.partition);

        if (this.cachedBatchException != null) {
            // A corrupt batch was detected on the previous call: reject all of its acquired offsets now.
            this.rejectRecordBatch(inFlightBatch, this.currentBatch);
            inFlightBatch.setException(this.cachedBatchException);
            this.cachedBatchException = null;
            return inFlightBatch;
        }

        if (this.cachedRecordException != null) {
            // The record that failed to deserialize on the previous call is released.
            inFlightBatch.addAcknowledgement(this.lastRecord.offset(), AcknowledgeType.RELEASE);
            inFlightBatch.setException(this.cachedRecordException);
            this.cachedRecordException = null;
            return inFlightBatch;
        }

        if (this.isConsumed) {
            return inFlightBatch;
        }

        this.initializeNextAcquired();

        try {
            int recordsInBatch = 0;
            boolean currentBatchHasMoreRecords = false;

            while (recordsInBatch < maxRecords || currentBatchHasMoreRecords) {
                currentBatchHasMoreRecords = this.nextFetchedRecord(checkCrcs);

                if (this.lastRecord == null) {
                    // No more fetched records: every acquired offset still pending is a gap.
                    while (this.nextAcquired != null) {
                        inFlightBatch.addGap(this.nextAcquired.offset);
                        this.nextAcquired = this.nextAcquiredRecord();
                    }
                    break;
                }

                while (this.nextAcquired != null) {
                    if (this.lastRecord.offset() == this.nextAcquired.offset) {
                        // The record is acquired: deserialize it and add it to the result.
                        Optional<Integer> leaderEpoch = this.maybeLeaderEpoch(this.currentBatch.partitionLeaderEpoch());
                        TimestampType timestampType = this.currentBatch.timestampType();
                        ConsumerRecord<K, V> record = this.parseRecord(deserializers, this.partition, leaderEpoch, timestampType, this.lastRecord, this.nextAcquired.deliveryCount);
                        inFlightBatch.addRecord(record);
                        this.recordsRead++;
                        this.bytesRead += this.lastRecord.sizeInBytes();
                        recordsInBatch++;
                        this.nextAcquired = this.nextAcquiredRecord();
                        break;
                    }
                    if (this.lastRecord.offset() < this.nextAcquired.offset) {
                        // The record is not acquired: skip it and move on to the next fetched record.
                        break;
                    }
                    // The acquired offset lies before the fetched record, so nothing was fetched for it.
                    inFlightBatch.addGap(this.nextAcquired.offset);
                    this.nextAcquired = this.nextAcquiredRecord();
                }
            }
        } catch (SerializationException se) {
            // Step past the acquired entry of the record that failed to deserialize.
            this.nextAcquired = this.nextAcquiredRecord();
            if (inFlightBatch.isEmpty()) {
                inFlightBatch.addAcknowledgement(this.lastRecord.offset(), AcknowledgeType.RELEASE);
                inFlightBatch.setException(se);
            } else {
                this.cachedRecordException = se;
                inFlightBatch.setHasCachedException(true);
            }
        } catch (CorruptRecordException e) {
            if (inFlightBatch.isEmpty()) {
                // Nothing collected yet: reject the corrupt batch and report the error right away.
                this.rejectRecordBatch(inFlightBatch, this.currentBatch);
                inFlightBatch.setException(e);
            } else {
                // Only defer when it was NOT reported above; caching unconditionally would make the
                // next call reject the same batch and surface the same error a second time.
                this.cachedBatchException = e;
                inFlightBatch.setHasCachedException(true);
            }
        }

        return inFlightBatch;
    }

    /**
     * Ensures {@code nextAcquired} points at an acquired entry if one is available.
     * Does nothing when it is already set; otherwise creates the cursor over the
     * acquired list if needed and advances it by one.
     */
    private void initializeNextAcquired() {
        if (nextAcquired != null) {
            return;
        }
        if (acquiredRecordIterator == null) {
            acquiredRecordIterator = acquiredRecordList.listIterator();
        }
        if (acquiredRecordIterator.hasNext()) {
            nextAcquired = acquiredRecordIterator.next();
        }
    }

    /**
     * Advances the acquired-record cursor.
     *
     * @return the next acquired entry, or {@code null} when the cursor is exhausted
     */
    private OffsetAndDeliveryCount nextAcquiredRecord() {
        return acquiredRecordIterator.hasNext() ? acquiredRecordIterator.next() : null;
    }

    /**
     * Adds a REJECT acknowledgement for each offset of the given batch that matches an
     * acquired entry.
     *
     * <p>The acquired-record cursor is rewound to the start of the list first and is left
     * wherever this walk stops; the {@code nextAcquired} field is not touched.
     *
     * @param inFlightBatch result to which the acknowledgements are added
     * @param currentBatch  batch whose offset range (base to last, inclusive) is walked
     */
    private <K, V> void rejectRecordBatch(ShareInFlightBatch<K, V> inFlightBatch, RecordBatch currentBatch) {
        // Rewind so the walk starts from a known position.
        this.acquiredRecordIterator = this.acquiredRecordList.listIterator();
        OffsetAndDeliveryCount candidate = this.nextAcquiredRecord();

        long offset = currentBatch.baseOffset();
        while (offset <= currentBatch.lastOffset() && candidate != null) {
            if (offset == candidate.offset) {
                // Acquired offset inside the batch: reject it and move to the next acquired entry.
                inFlightBatch.addAcknowledgement(offset, AcknowledgeType.REJECT);
                candidate = this.nextAcquiredRecord();
            } else if (offset > candidate.offset) {
                // Acquired entry is behind the current offset: step past it. Note that the
                // batch offset below advances in the same iteration.
                candidate = this.nextAcquiredRecord();
            }
            // offset < candidate.offset: this batch offset is not acquired, nothing to do.
            offset++;
        }
    }

    /**
     * Deserializes one fetched record into a {@link ConsumerRecord}.
     *
     * @param deserializers key and value deserializers
     * @param partition     topic-id partition the record belongs to
     * @param leaderEpoch   leader epoch to attach to the record, if known
     * @param timestampType timestamp type of the batch the record came from
     * @param record        the raw record
     * @param deliveryCount delivery count to attach to the record
     * @return the deserialized record; a null key or value yields {@code null} and a size of -1
     * @throws RecordDeserializationException if the key or value deserializer throws any
     *         {@link RuntimeException}; the original exception is kept as the cause
     */
    <K, V> ConsumerRecord<K, V> parseRecord(Deserializers<K, V> deserializers, TopicIdPartition partition, Optional<Integer> leaderEpoch, TimestampType timestampType, Record record, short deliveryCount) {
        Headers headers = new RecordHeaders(record.headers());
        ByteBuffer keyBytes = record.key();
        ByteBuffer valueBytes = record.value();
        K key;
        V value;
        try {
            key = keyBytes == null ? null : deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
        } catch (RuntimeException e) {
            this.log.error("Key Deserializers with error: {}", deserializers);
            throw ShareCompletedFetch.newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY, partition.topicPartition(), timestampType, record, e, headers);
        }
        try {
            value = valueBytes == null ? null : deserializers.valueDeserializer.deserialize(partition.topic(), headers, valueBytes);
        } catch (RuntimeException e) {
            this.log.error("Value Deserializers with error: {}", deserializers);
            throw ShareCompletedFetch.newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, partition.topicPartition(), timestampType, record, e, headers);
        }
        // -1 is reported as the serialized size when the key/value is absent.
        int keySize = keyBytes == null ? -1 : keyBytes.remaining();
        int valueSize = valueBytes == null ? -1 : valueBytes.remaining();
        return new ConsumerRecord<>(partition.topic(), partition.partition(), record.offset(), record.timestamp(), timestampType, keySize, valueSize, key, value, headers, leaderEpoch, Optional.of(deliveryCount));
    }

    /**
     * Creates the exception thrown when a record's key or value cannot be deserialized.
     *
     * @param origin        whether the key or the value failed
     * @param partition     partition of the record
     * @param timestampType timestamp type of the record's batch
     * @param record        the raw record; its offset, timestamp, key and value are copied into the exception
     * @param e             the deserializer failure, kept as the cause
     * @param headers       headers of the record
     * @return the populated exception
     */
    private static RecordDeserializationException newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin origin, TopicPartition partition, TimestampType timestampType, Record record, RuntimeException e, Headers headers) {
        long offset = record.offset();
        String message = "Error deserializing " + origin.name()
                + " for partition " + partition
                + " at offset " + offset
                + ". The record has been released.";
        return new RecordDeserializationException(
                origin,
                partition,
                offset,
                record.timestamp(),
                timestampType,
                record.key(),
                record.value(),
                headers,
                message,
                e);
    }

    /**
     * Moves {@code lastRecord} to the next non-control record, opening further batches as
     * needed. When no batches remain the fetch is drained and {@code lastRecord} is set to
     * {@code null}.
     *
     * @param checkCrcs whether each batch and record is validated as it is read
     * @return {@code true} if the currently open record stream still has records after this call
     * @throws CorruptRecordException if validation of a batch or record fails
     */
    private boolean nextFetchedRecord(boolean checkCrcs) {
        while (true) {
            if (records != null && records.hasNext()) {
                Record candidate = records.next();
                maybeEnsureValid(candidate, checkCrcs);
                if (!currentBatch.isControlBatch()) {
                    lastRecord = candidate;
                    break;
                }
                // Records of control batches are read but never handed out.
                continue;
            }

            // The open stream (if any) is exhausted: close it and try the next batch.
            maybeCloseRecordStream();
            if (!batches.hasNext()) {
                drain();
                lastRecord = null;
                break;
            }
            currentBatch = batches.next();
            maybeEnsureValid(currentBatch, checkCrcs);
            records = currentBatch.streamingIterator(decompressionBufferSupplier);
        }
        return records != null && records.hasNext();
    }

    /**
     * Wraps a leader epoch in an {@link Optional}.
     *
     * @param leaderEpoch raw epoch value
     * @return empty when {@code leaderEpoch} is -1, otherwise the epoch
     */
    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
        // -1 is treated as "no epoch" (presumably the batch's no-leader-epoch sentinel; confirm against RecordBatch).
        if (leaderEpoch == -1) {
            return Optional.empty();
        }
        return Optional.of(leaderEpoch);
    }

    /**
     * Validates a record batch when CRC checking is enabled and the batch uses magic v2 or later.
     *
     * @param batch     batch to validate
     * @param checkCrcs whether validation is enabled
     * @throws CorruptRecordException if the batch is invalid; the message names the partition and
     *         base offset, and the original failure is attached as the cause
     */
    private void maybeEnsureValid(RecordBatch batch, boolean checkCrcs) {
        if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
            try {
                batch.ensureValid();
            } catch (CorruptRecordException e) {
                // Keep the original exception as the cause so its details are not lost.
                throw new CorruptRecordException("Record batch for partition " + this.partition.topicPartition() + " at offset " + batch.baseOffset() + " is invalid, cause: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Validates a single record when CRC checking is enabled.
     *
     * @param record    record to validate
     * @param checkCrcs whether validation is enabled
     * @throws CorruptRecordException if the record is invalid; the message names the partition and
     *         record offset, and the original failure is attached as the cause
     */
    private void maybeEnsureValid(Record record, boolean checkCrcs) {
        if (checkCrcs) {
            try {
                record.ensureValid();
            } catch (CorruptRecordException e) {
                // Keep the original exception as the cause so its details are not lost.
                throw new CorruptRecordException("Record for partition " + this.partition.topicPartition() + " at offset " + record.offset() + " is invalid, cause: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Closes the open record stream, if there is one, and clears the reference to it.
     */
    private void maybeCloseRecordStream() {
        if (records == null) {
            return;
        }
        records.close();
        records = null;
    }

    /**
     * Immutable pair of an acquired record offset and its delivery count.
     */
    private static class OffsetAndDeliveryCount {
        final long offset;
        final short deliveryCount;

        OffsetAndDeliveryCount(long offset, short deliveryCount) {
            this.offset = offset;
            this.deliveryCount = deliveryCount;
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("OffsetAndDeliveryCount{offset=");
            text.append(offset);
            text.append(", deliveryCount=");
            text.append(deliveryCount);
            text.append("}");
            return text.toString();
        }
    }
}

