/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
import com.amazonaws.services.kinesis.model.GetRecordsRequest;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.Shard;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.services.kinesis.model.StreamDescription;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.beam.sdk.io.kinesis.AWSClientsProvider;
import org.apache.beam.sdk.io.kinesis.GetKinesisRecordsResult;
import org.apache.beam.sdk.io.kinesis.KinesisClientThrottledException;
import org.apache.beam.sdk.io.kinesis.TransientKinesisException;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;
import org.joda.time.ReadableInstant;

class SimplifiedKinesisClient {
    private static final String KINESIS_NAMESPACE = "AWS/Kinesis";
    private static final String INCOMING_RECORDS_METRIC = "IncomingBytes";
    private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
    private static final String SUM_STATISTIC = "Sum";
    private static final String STREAM_NAME_DIMENSION = "StreamName";
    private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
    private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF = Duration.standardSeconds((long)1L);
    private final AmazonKinesis kinesis;
    private final AmazonCloudWatch cloudWatch;
    private final Integer limit;

    public SimplifiedKinesisClient(AmazonKinesis kinesis, AmazonCloudWatch cloudWatch, Integer limit) {
        this.kinesis = (AmazonKinesis)Preconditions.checkNotNull((Object)kinesis, (Object)"kinesis");
        this.cloudWatch = (AmazonCloudWatch)Preconditions.checkNotNull((Object)cloudWatch, (Object)"cloudWatch");
        this.limit = limit;
    }

    public static SimplifiedKinesisClient from(AWSClientsProvider provider, Integer limit) {
        return new SimplifiedKinesisClient(provider.getKinesisClient(), provider.getCloudWatchClient(), limit);
    }

    public String getShardIterator(String streamName, String shardId, ShardIteratorType shardIteratorType, String startingSequenceNumber, Instant timestamp) throws TransientKinesisException {
        Date date = timestamp != null ? timestamp.toDate() : null;
        return this.wrapExceptions(() -> this.kinesis.getShardIterator(new GetShardIteratorRequest().withStreamName(streamName).withShardId(shardId).withShardIteratorType(shardIteratorType).withStartingSequenceNumber(startingSequenceNumber).withTimestamp(date)).getShardIterator());
    }

    public List<Shard> listShards(String streamName) throws TransientKinesisException {
        return this.wrapExceptions(() -> {
            ArrayList shards = Lists.newArrayList();
            String lastShardId = null;
            FluentBackoff retryBackoff = FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
            StreamDescription description = null;
            do {
                BackOff backoff = retryBackoff.backoff();
                Sleeper sleeper = Sleeper.DEFAULT;
                while (true) {
                    try {
                        description = this.kinesis.describeStream(streamName, lastShardId).getStreamDescription();
                    }
                    catch (LimitExceededException exc) {
                        if (BackOffUtils.next((Sleeper)sleeper, (BackOff)backoff)) continue;
                        throw exc;
                    }
                    break;
                }
                shards.addAll(description.getShards());
                lastShardId = ((Shard)shards.get(shards.size() - 1)).getShardId();
            } while (description.getHasMoreShards().booleanValue());
            return shards;
        });
    }

    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId) throws TransientKinesisException {
        return this.getRecords(shardIterator, streamName, shardId, this.limit);
    }

    public GetKinesisRecordsResult getRecords(String shardIterator, String streamName, String shardId, Integer limit) throws TransientKinesisException {
        return this.wrapExceptions(() -> {
            GetRecordsResult response = this.kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit));
            return new GetKinesisRecordsResult(UserRecord.deaggregate((List)response.getRecords()), response.getNextShardIterator(), response.getMillisBehindLatest(), streamName, shardId);
        });
    }

    public long getBacklogBytes(String streamName, Instant countSince) throws TransientKinesisException {
        return this.getBacklogBytes(streamName, countSince, new Instant());
    }

    public long getBacklogBytes(String streamName, Instant countSince, Instant countTo) throws TransientKinesisException {
        return this.wrapExceptions(() -> {
            Minutes period = Minutes.minutesBetween((ReadableInstant)countSince, (ReadableInstant)countTo);
            if (period.isLessThan(Minutes.ONE)) {
                return 0L;
            }
            GetMetricStatisticsRequest request = this.createMetricStatisticsRequest(streamName, countSince, countTo, period);
            long totalSizeInBytes = 0L;
            GetMetricStatisticsResult result = this.cloudWatch.getMetricStatistics(request);
            for (Datapoint point : result.getDatapoints()) {
                totalSizeInBytes += point.getSum().longValue();
            }
            return totalSizeInBytes;
        });
    }

    GetMetricStatisticsRequest createMetricStatisticsRequest(String streamName, Instant countSince, Instant countTo, Minutes period) {
        return new GetMetricStatisticsRequest().withNamespace(KINESIS_NAMESPACE).withMetricName(INCOMING_RECORDS_METRIC).withPeriod(Integer.valueOf(period.getMinutes() * 60)).withStartTime(countSince.toDate()).withEndTime(countTo.toDate()).withStatistics(Collections.singletonList(SUM_STATISTIC)).withDimensions(Collections.singletonList(new Dimension().withName(STREAM_NAME_DIMENSION).withValue(streamName)));
    }

    private <T> T wrapExceptions(Callable<T> callable) throws TransientKinesisException {
        try {
            return callable.call();
        }
        catch (ExpiredIteratorException e) {
            throw e;
        }
        catch (LimitExceededException | ProvisionedThroughputExceededException e) {
            throw new KinesisClientThrottledException("Too many requests to Kinesis. Wait some time and retry.", (AmazonClientException)e);
        }
        catch (AmazonServiceException e) {
            if (e.getErrorType() == AmazonServiceException.ErrorType.Service) {
                throw new TransientKinesisException("Kinesis backend failed. Wait some time and retry.", (AmazonClientException)((Object)e));
            }
            throw new RuntimeException("Kinesis client side failure", e);
        }
        catch (AmazonClientException e) {
            if (e.isRetryable()) {
                throw new TransientKinesisException("Retryable client failure", e);
            }
            throw new RuntimeException("Not retryable client failure", e);
        }
        catch (Exception e) {
            throw new RuntimeException("Unknown kinesis failure, when trying to reach kinesis", e);
        }
    }
}

