/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.kinesis.KinesisRecord;
import org.apache.beam.sdk.io.kinesis.RecordFilter;
import org.apache.beam.sdk.io.kinesis.ShardCheckpoint;
import org.apache.beam.sdk.io.kinesis.SimplifiedKinesisClient;
import org.apache.beam.sdk.io.kinesis.TransientKinesisException;
import org.apache.beam.sdks.java.io.kinesis.repackaged.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ShardRecordsIterator {
    private static final Logger LOG = LoggerFactory.getLogger(ShardRecordsIterator.class);
    private final SimplifiedKinesisClient kinesis;
    private final RecordFilter filter;
    private final String streamName;
    private final String shardId;
    private AtomicReference<ShardCheckpoint> checkpoint;
    private String shardIterator;
    private AtomicLong millisBehindLatest = new AtomicLong(Long.MAX_VALUE);

    ShardRecordsIterator(ShardCheckpoint initialCheckpoint, SimplifiedKinesisClient simplifiedKinesisClient) throws TransientKinesisException {
        this(initialCheckpoint, simplifiedKinesisClient, new RecordFilter());
    }

    ShardRecordsIterator(ShardCheckpoint initialCheckpoint, SimplifiedKinesisClient simplifiedKinesisClient, RecordFilter filter) throws TransientKinesisException {
        this.checkpoint = new AtomicReference<ShardCheckpoint>(Preconditions.checkNotNull(initialCheckpoint, "initialCheckpoint"));
        this.filter = Preconditions.checkNotNull(filter, "filter");
        this.kinesis = Preconditions.checkNotNull(simplifiedKinesisClient, "simplifiedKinesisClient");
        this.streamName = initialCheckpoint.getStreamName();
        this.shardId = initialCheckpoint.getShardId();
        this.shardIterator = initialCheckpoint.getShardIterator(this.kinesis);
    }

    List<KinesisRecord> readNextBatch() throws TransientKinesisException {
        GetKinesisRecordsResult response = this.fetchRecords();
        LOG.debug("Fetched {} new records", (Object)response.getRecords().size());
        List<KinesisRecord> filteredRecords = this.filter.apply(response.getRecords(), this.checkpoint.get());
        this.millisBehindLatest.set(response.getMillisBehindLatest());
        return filteredRecords;
    }

    private GetKinesisRecordsResult fetchRecords() throws TransientKinesisException {
        try {
            GetKinesisRecordsResult response = this.kinesis.getRecords(this.shardIterator, this.streamName, this.shardId);
            this.shardIterator = response.getNextShardIterator();
            return response;
        }
        catch (ExpiredIteratorException e) {
            LOG.info("Refreshing expired iterator", (Throwable)e);
            this.shardIterator = this.checkpoint.get().getShardIterator(this.kinesis);
            return this.fetchRecords();
        }
    }

    ShardCheckpoint getCheckpoint() {
        return this.checkpoint.get();
    }

    boolean isUpToDate() {
        return this.millisBehindLatest.get() == 0L;
    }

    void ackRecord(KinesisRecord record) {
        this.checkpoint.set(this.checkpoint.get().moveAfter(record));
    }
}

