/*
 * Decompiled with CFR 0.152.
 */
package fs2.aws.kinesis;

import cats.Applicative;
import cats.ApplicativeError;
import cats.Functor;
import cats.effect.Blocker;
import cats.effect.Blocker$;
import cats.effect.Bracket;
import cats.effect.Concurrent;
import cats.effect.ConcurrentEffect;
import cats.effect.ConcurrentEffect$;
import cats.effect.ContextShift;
import cats.effect.Sync;
import cats.effect.Sync$;
import cats.effect.Timer;
import cats.implicits$;
import eu.timepit.refined.api.RefType$;
import eu.timepit.refined.api.Refined;
import eu.timepit.refined.auto$;
import fs2.Chunk;
import fs2.Stream;
import fs2.Stream$;
import fs2.aws.core.package$;
import fs2.aws.kinesis.ChunkedRecordProcessor;
import fs2.aws.kinesis.CommittableRecord;
import fs2.aws.kinesis.CommittableRecord$;
import fs2.aws.kinesis.KinesisCheckpointSettings;
import fs2.aws.kinesis.KinesisCheckpointSettings$;
import fs2.aws.kinesis.KinesisConsumerSettings;
import fs2.aws.kinesis.KinesisConsumerSettings$;
import fs2.aws.kinesis.Polling$;
import fs2.aws.kinesis.RetrievalMode;
import fs2.concurrent.Queue;
import fs2.concurrent.Queue$;
import fs2.internal.FreeC;
import java.io.Serializable;
import java.util.Date;
import java.util.UUID;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.client.builder.SdkAsyncClientBuilder;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClientBuilder;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.awssdk.utils.builder.SdkBuilder;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.retrieval.RetrievalSpecificConfig;
import software.amazon.kinesis.retrieval.polling.PollingConfig;

public final class consumer$ {
    /** The single instance of this class; the lambdas and static helpers in this file call instance methods through it. */
    public static final consumer$ MODULE$ = new consumer$();

    /**
     * Builds a {@link KinesisAsyncClient} from the given consumer settings.
     *
     * <p>What the builder chain does, in order:
     * <ul>
     *   <li>applies {@code endpointOverride} only if {@code settings.endpoint()} holds a value;</li>
     *   <li>sets the region from {@code settings.region()};</li>
     *   <li>sets the credentials provider: an {@link StsAssumeRoleCredentialsProvider} when
     *       {@code settings.stsAssumeRole()} holds a value, otherwise
     *       {@link DefaultCredentialsProvider#create()};</li>
     *   <li>sets a {@link NettyNioAsyncHttpClient} builder whose max concurrency is
     *       {@code settings.maxConcurrency()}.</li>
     * </ul>
     *
     * @param settings consumer settings supplying endpoint, region, STS and concurrency values
     * @return the built Kinesis async client
     */
    public KinesisAsyncClient mkDefaultKinesisClient(KinesisConsumerSettings settings) {
        KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
        // Optional endpoint override: only applied when the setting is present.
        settings.endpoint().foreach((Function1 & Serializable)x$1 -> (KinesisAsyncClientBuilder)builder.endpointOverride(x$1));
        return (KinesisAsyncClient)((SdkBuilder)((SdkAsyncClientBuilder)builder.region(settings.region()).credentialsProvider((AwsCredentialsProvider)settings.stsAssumeRole().map((Function1 & Serializable)stsSettings -> {
            // Role ARN and session name are always set; external id and duration only when present.
            AssumeRoleRequest.Builder assumeRoleRequest = AssumeRoleRequest.builder().roleArn(stsSettings.roleArn()).roleSessionName(stsSettings.roleSessionName());
            stsSettings.externalId().foreach((Function1 & Serializable)x$1 -> assumeRoleRequest.externalId(x$1));
            stsSettings.durationSeconds().foreach((Function1 & Serializable)d -> assumeRoleRequest.durationSeconds(Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt((Object)d))));
            // A new StsClient is built with builder defaults each time this lambda runs.
            return ((StsAssumeRoleCredentialsProvider.Builder)StsAssumeRoleCredentialsProvider.builder().stsClient((StsClient)StsClient.builder().build())).refreshRequest((AssumeRoleRequest)assumeRoleRequest.build()).build();
        }).getOrElse((Function0 & Serializable)() -> DefaultCredentialsProvider.create()))).httpClientBuilder((SdkAsyncHttpClient.Builder)NettyNioAsyncHttpClient.builder().maxConcurrency(Predef$.MODULE$.int2Integer(BoxesRunTime.unboxToInt((Object)settings.maxConcurrency()))))).build();
    }

    /**
     * Assembles a {@link Scheduler} for the given record-processor factory.
     *
     * <p>DynamoDB and CloudWatch async clients are built from the settings' region, with the
     * endpoint override applied only when {@code settings.endpoint()} holds a value. They are
     * passed to a {@link ConfigsBuilder} together with the stream name, application name, the
     * Kinesis client, a random UUID string and the factory. The retrieval config is switched to a
     * {@link PollingConfig} when the retrieval mode equals {@code Polling$.MODULE$}, and its
     * initial position is taken from {@code settings.initialPositionInStream()}.
     *
     * <p>NOTE(review): unlike {@code mkDefaultKinesisClient}, no credentials provider is set on the
     * DynamoDB and CloudWatch clients here, so they use whatever the SDK builders default to.
     *
     * @param recordProcessorFactory factory handed to the {@link ConfigsBuilder}
     * @param settings consumer settings (stream/app names, region, endpoint, retrieval mode, start position)
     * @param kinesisClient Kinesis client used by the configs builder and the polling config
     * @return a scheduler built from the configs builder's configs and the adjusted retrieval config
     * @throws MatchError if the initial position is neither a {@code Left} nor a {@code Right}
     */
    private Scheduler defaultScheduler(ShardRecordProcessorFactory recordProcessorFactory, KinesisConsumerSettings settings, KinesisAsyncClient kinesisClient) {
        // DynamoDB client: optional endpoint override, then region.
        DynamoDbAsyncClientBuilder dynamoBuilder = DynamoDbAsyncClient.builder();
        settings.endpoint().foreach((Function1 & Serializable)x$1 -> (DynamoDbAsyncClientBuilder)dynamoBuilder.endpointOverride(x$1));
        DynamoDbAsyncClient dynamo = (DynamoDbAsyncClient)dynamoBuilder.region(settings.region()).build();

        // CloudWatch client: configured the same way.
        CloudWatchAsyncClientBuilder metricsBuilder = CloudWatchAsyncClient.builder();
        settings.endpoint().foreach((Function1 & Serializable)x$1 -> (CloudWatchAsyncClientBuilder)metricsBuilder.endpointOverride(x$1));
        CloudWatchAsyncClient metrics = (CloudWatchAsyncClient)metricsBuilder.region(settings.region()).build();

        ConfigsBuilder configs = new ConfigsBuilder(settings.streamName(), settings.appName(), kinesisClient, dynamo, metrics, UUID.randomUUID().toString(), recordProcessorFactory);
        RetrievalConfig retrieval = configs.retrievalConfig();

        // Only the polling mode changes the retrieval-specific config; any other mode leaves it as built.
        RetrievalMode mode = settings.retrievalMode();
        if (Polling$.MODULE$.equals(mode)) {
            retrieval.retrievalSpecificConfig((RetrievalSpecificConfig)new PollingConfig(settings.streamName(), kinesisClient));
        }

        // Left carries a named stream position, Right carries a timestamp.
        Either<InitialPositionInStream, Date> startAt = settings.initialPositionInStream();
        boolean namedPosition = startAt instanceof Left;
        if (!namedPosition && !(startAt instanceof Right)) {
            throw new MatchError(startAt);
        }
        InitialPositionInStreamExtended initialPosition = namedPosition
            ? InitialPositionInStreamExtended.newInitialPosition((InitialPositionInStream)((Left)startAt).value())
            : InitialPositionInStreamExtended.newInitialPositionAtTimestamp((Date)((Right)startAt).value());
        retrieval.initialPositionInStreamExtended(initialPosition);

        return new Scheduler(configs.checkpointConfig(), configs.coordinatorConfig(), configs.leaseManagementConfig(), configs.lifecycleConfig(), configs.metricsConfig(), configs.processorConfig(), retrieval);
    }

    /**
     * Reads records from a Kinesis stream identified only by application and stream name.
     *
     * <p>Settings are created through {@code KinesisConsumerSettings$.apply}, passing
     * {@code streamName} first and {@code appName} second and using the companion's
     * {@code apply$default$3} to {@code apply$default$9} values for the rest. The call then
     * delegates to the settings-based overload.
     *
     * @param appName application name passed as the second argument of the settings factory
     * @param streamName stream name passed as the first argument of the settings factory
     * @param evidence$1 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$2 {@link ContextShift} instance for {@code F}
     * @return the record stream in its {@link FreeC} form
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(String appName, String streamName, ConcurrentEffect<F> evidence$1, ContextShift<F> evidence$2) {
        KinesisConsumerSettings$ settingsFactory = KinesisConsumerSettings$.MODULE$;
        return this.readFromKinesisStream(
            settingsFactory.apply(
                streamName,
                appName,
                settingsFactory.apply$default$3(),
                settingsFactory.apply$default$4(),
                settingsFactory.apply$default$5(),
                settingsFactory.apply$default$6(),
                settingsFactory.apply$default$7(),
                settingsFactory.apply$default$8(),
                settingsFactory.apply$default$9()),
            evidence$1,
            evidence$2);
    }

    /**
     * Reads records from Kinesis using the supplied settings and the default scheduler.
     *
     * <p>The scheduler factory calls {@code defaultScheduler} with a client obtained from
     * {@code mkDefaultKinesisClient(consumerConfig)}, then delegates to the scheduler-factory
     * overload.
     *
     * @param consumerConfig consumer settings, also captured by the scheduler factory
     * @param evidence$3 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$4 {@link ContextShift} instance for {@code F}
     * @return the record stream in its {@link FreeC} form
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings consumerConfig, ConcurrentEffect<F> evidence$3, ContextShift<F> evidence$4) {
        // The Kinesis client is created inside the factory, i.e. each time the factory is applied.
        Function1<ShardRecordProcessorFactory, Scheduler> defaultFactory = (Function1<ShardRecordProcessorFactory, Scheduler>)(Function1 & Serializable)processorFactory -> MODULE$.defaultScheduler((ShardRecordProcessorFactory)processorFactory, consumerConfig, MODULE$.mkDefaultKinesisClient(consumerConfig));
        return this.readFromKinesisStream(consumerConfig, defaultFactory, evidence$3, evidence$4);
    }

    /**
     * Reads records from a Kinesis stream identified by application and stream name, using a
     * caller-supplied Kinesis client.
     *
     * <p>Settings are created through {@code KinesisConsumerSettings$.apply}, passing
     * {@code streamName} first and {@code appName} second and using the companion's
     * {@code apply$default$3} to {@code apply$default$9} values for the rest. The call then
     * delegates to the settings-plus-client overload.
     *
     * @param appName application name passed as the second argument of the settings factory
     * @param streamName stream name passed as the first argument of the settings factory
     * @param kinesisClient Kinesis client forwarded to the delegate overload
     * @param evidence$5 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$6 {@link ContextShift} instance for {@code F}
     * @return the record stream in its {@link FreeC} form
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(String appName, String streamName, KinesisAsyncClient kinesisClient, ConcurrentEffect<F> evidence$5, ContextShift<F> evidence$6) {
        KinesisConsumerSettings$ settingsFactory = KinesisConsumerSettings$.MODULE$;
        return this.readFromKinesisStream(
            settingsFactory.apply(
                streamName,
                appName,
                settingsFactory.apply$default$3(),
                settingsFactory.apply$default$4(),
                settingsFactory.apply$default$5(),
                settingsFactory.apply$default$6(),
                settingsFactory.apply$default$7(),
                settingsFactory.apply$default$8(),
                settingsFactory.apply$default$9()),
            kinesisClient,
            evidence$5,
            evidence$6);
    }

    /**
     * Reads records from Kinesis using the supplied settings and a caller-supplied client.
     *
     * <p>The scheduler factory calls {@code defaultScheduler} with {@code kinesisClient}, then
     * delegates to the scheduler-factory overload.
     *
     * @param consumerConfig consumer settings, also captured by the scheduler factory
     * @param kinesisClient Kinesis client captured by the scheduler factory
     * @param evidence$7 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$8 {@link ContextShift} instance for {@code F}
     * @return the record stream in its {@link FreeC} form
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings consumerConfig, KinesisAsyncClient kinesisClient, ConcurrentEffect<F> evidence$7, ContextShift<F> evidence$8) {
        Function1<ShardRecordProcessorFactory, Scheduler> clientBoundFactory = (Function1<ShardRecordProcessorFactory, Scheduler>)(Function1 & Serializable)processorFactory -> MODULE$.defaultScheduler((ShardRecordProcessorFactory)processorFactory, consumerConfig, kinesisClient);
        return this.readFromKinesisStream(consumerConfig, clientBoundFactory, evidence$7, evidence$8);
    }

    /**
     * Reads record chunks from Kinesis using the supplied settings and the default scheduler.
     *
     * <p>The scheduler factory calls {@code defaultScheduler} with a client obtained from
     * {@code mkDefaultKinesisClient(consumerConfig)}, then delegates to
     * {@code readChunksFromKinesisStream}.
     *
     * @param consumerConfig consumer settings, also captured by the scheduler factory
     * @param evidence$9 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$10 {@link ContextShift} instance for {@code F}
     * @return the chunk stream in its {@link FreeC} form
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunkedFromKinesisStream(KinesisConsumerSettings consumerConfig, ConcurrentEffect<F> evidence$9, ContextShift<F> evidence$10) {
        // The Kinesis client is created inside the factory, i.e. each time the factory is applied.
        Function1<ShardRecordProcessorFactory, Scheduler> defaultFactory = (Function1<ShardRecordProcessorFactory, Scheduler>)(Function1 & Serializable)processorFactory -> MODULE$.defaultScheduler((ShardRecordProcessorFactory)processorFactory, consumerConfig, MODULE$.mkDefaultKinesisClient(consumerConfig));
        return this.readChunksFromKinesisStream(consumerConfig, defaultFactory, evidence$9, evidence$10);
    }

    /**
     * Reads record chunks from Kinesis using the supplied settings and a caller-supplied client.
     *
     * <p>The scheduler factory calls {@code defaultScheduler} with {@code kinesisClient}, then
     * delegates to {@code readChunksFromKinesisStream}.
     *
     * @param consumerConfig consumer settings, also captured by the scheduler factory
     * @param kinesisClient Kinesis client captured by the scheduler factory
     * @param evidence$11 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$12 {@link ContextShift} instance for {@code F}
     * @return the chunk stream in its {@link FreeC} form
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunkedFromKinesisStream(KinesisConsumerSettings consumerConfig, KinesisAsyncClient kinesisClient, ConcurrentEffect<F> evidence$11, ContextShift<F> evidence$12) {
        Function1<ShardRecordProcessorFactory, Scheduler> clientBoundFactory = (Function1<ShardRecordProcessorFactory, Scheduler>)(Function1 & Serializable)processorFactory -> MODULE$.defaultScheduler((ShardRecordProcessorFactory)processorFactory, consumerConfig, kinesisClient);
        return this.readChunksFromKinesisStream(consumerConfig, clientBoundFactory, evidence$11, evidence$12);
    }

    /**
     * Reads individual records from Kinesis with a caller-supplied scheduler factory.
     *
     * <p>Obtains the chunk stream from {@code readChunksFromKinesisStream} and flat-maps each
     * chunk through {@code Stream$.chunk}.
     *
     * @param streamConfig consumer settings forwarded to {@code readChunksFromKinesisStream}
     * @param schedulerFactory builds the {@link Scheduler} from a record-processor factory
     * @param evidence$13 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$14 {@link ContextShift} instance for {@code F}
     * @return the record stream in its {@link FreeC} form
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings streamConfig, Function1<ShardRecordProcessorFactory, Scheduler> schedulerFactory, ConcurrentEffect<F> evidence$13, ContextShift<F> evidence$14) {
        FreeC<F, Chunk<CommittableRecord>, BoxedUnit> chunked = this.readChunksFromKinesisStream(streamConfig, schedulerFactory, evidence$13, evidence$14);
        return Stream$.MODULE$.flatMap$extension(chunked, (Function1 & Serializable)os -> new Stream(Stream$.MODULE$.chunk(os)));
    }

    /**
     * Builds the stream of record chunks fed by a scheduler obtained from {@code schedulerFactory}.
     *
     * <p>As written, the single expression below:
     * <ol>
     *   <li>evaluates a bounded {@link Queue} whose capacity is the unwrapped
     *       {@code streamConfig.bufferSize()};</li>
     *   <li>obtains the scheduler from {@code instantiateWorker$1(buffer, schedulerFactory, ...)};</li>
     *   <li>emits {@code buffer.dequeue()} while concurrently evaluating {@code scheduler.run()}
     *       through {@code Blocker$.delay$extension} on a {@link Blocker} acquired with {@code use};</li>
     *   <li>attaches {@code scheduler.shutdown()} (wrapped in {@code Sync.delay}) via
     *       {@code onFinalize$extension}.</li>
     * </ol>
     *
     * <p>NOTE(review): the lambdas refer to {@code evidence$15$1} and {@code evidence$16$1}, which
     * are not declared in this method; this looks like a decompiler artifact for the captured
     * {@code evidence$15} / {@code evidence$16} parameters. Confirm against the original source.
     *
     * @param streamConfig consumer settings; only {@code bufferSize()} is read here
     * @param schedulerFactory builds the {@link Scheduler} from a record-processor factory
     * @param evidence$15 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$16 {@link ContextShift} instance for {@code F}
     * @return the chunk stream in its {@link FreeC} form
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunksFromKinesisStream(KinesisConsumerSettings streamConfig, Function1<ShardRecordProcessorFactory, Scheduler> schedulerFactory, ConcurrentEffect<F> evidence$15, ContextShift<F> evidence$16) {
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(Queue$.MODULE$.bounded(BoxesRunTime.unboxToInt((Object)auto$.MODULE$.autoUnwrap((Object)new Refined((Object)streamConfig.bufferSize()), RefType$.MODULE$.refinedRefType())), evidence$15)), (Function1 & Serializable)buffer -> new Stream(Stream$.MODULE$.flatMap$extension(consumer$.instantiateWorker$1(buffer, schedulerFactory, evidence$15), (Function1 & Serializable)scheduler -> new Stream(Stream$.MODULE$.onFinalize$extension(Stream$.MODULE$.concurrently$extension(buffer.dequeue(), Stream$.MODULE$.eval(Blocker$.MODULE$.apply((Sync)evidence$15$1).use((Function1 & Serializable)blocker -> Blocker$.MODULE$.delay$extension(((Blocker)blocker).blockingContext(), (Function0)(JFunction0.mcV.sp & Serializable)() -> scheduler.run(), (Sync)evidence$15$1, evidence$16$1), (Bracket)evidence$15$1)), (Concurrent)evidence$15$1), Sync$.MODULE$.apply((Sync)evidence$15$1).delay((Function0)(JFunction0.mcV.sp & Serializable)() -> scheduler.shutdown()), (Applicative)evidence$15$1)))));
    }

    /**
     * Returns a stream transformation from {@link CommittableRecord}s to {@link KinesisClientRecord}s.
     *
     * <p>As written, the returned function sends its input through {@code package$.groupBy}, keyed
     * by each record's {@code shardId()} lifted with {@code pure}. It maps every resulting pair
     * through {@code $anonfun$checkpointRecords$7} and joins the resulting streams with
     * {@code parJoinUnbounded$extension}.
     *
     * <p>NOTE(review): the lambdas refer to {@code checkpointSettings$2}, {@code evidence$17$1} and
     * {@code evidence$18$1}, which are not declared in this method; this looks like a decompiler
     * artifact for the captured {@code checkpointSettings}, {@code evidence$17} and
     * {@code evidence$18} parameters. The {@code (.less.colon.less)} casts are also not valid Java.
     * Confirm against the original source.
     *
     * @param checkpointSettings settings forwarded to the per-group helper
     * @param evidence$17 {@link ConcurrentEffect} instance for {@code F}
     * @param evidence$18 {@link Timer} instance for {@code F}
     * @return the stream-to-stream function described above
     */
    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, KinesisClientRecord>> checkpointRecords(KinesisCheckpointSettings checkpointSettings, ConcurrentEffect<F> evidence$17, Timer<F> evidence$18) {
        return (Function1 & Serializable)x$7 -> new Stream(Stream$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.map$extension(Stream$.MODULE$.through$extension(((Stream)x$7).fs2$Stream$$free(), package$.MODULE$.groupBy((Function1 & Serializable)r -> Sync$.MODULE$.apply((Sync)evidence$17$1).pure((Object)r.shardId()), (Concurrent)evidence$17)), (Function1 & Serializable)x0$1 -> new Stream(consumer$.$anonfun$checkpointRecords$7(checkpointSettings$2, evidence$17$1, evidence$18$1, x0$1))), (.less.colon.less)$less$colon$less$.MODULE$.refl(), (.less.colon.less)$less$colon$less$.MODULE$.refl(), (Concurrent)evidence$17));
    }

    /**
     * Returns {@code KinesisCheckpointSettings$.defaultInstance()}.
     *
     * <p>Going by the {@code name$default$N} naming, this is presumably the default value for the
     * first parameter of {@code checkpointRecords}; confirm against the original source.
     *
     * @return the default checkpoint settings instance
     */
    public <F> KinesisCheckpointSettings checkpointRecords$default$1() {
        KinesisCheckpointSettings defaults = KinesisCheckpointSettings$.MODULE$.defaultInstance();
        return defaults;
    }

    /**
     * Returns a stream transformation from {@link CommittableRecord}s to {@link BoxedUnit} values.
     *
     * <p>The returned function unwraps its input stream and hands it, together with the captured
     * arguments, to {@code $anonfun$checkpointRecords_$1}, wrapping the result in a new
     * {@link Stream}.
     *
     * @param checkpointSettings settings forwarded to the helper
     * @param F {@link ConcurrentEffect} instance for the effect type
     * @param timer {@link Timer} instance for the effect type
     * @return the stream-to-stream function described above
     */
    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, BoxedUnit>> checkpointRecords_(KinesisCheckpointSettings checkpointSettings, ConcurrentEffect<F> F, Timer<F> timer) {
        return (Function1 & Serializable)x$8 -> {
            Stream upstream = (Stream)x$8;
            return new Stream(consumer$.$anonfun$checkpointRecords_$1(checkpointSettings, F, timer, upstream.fs2$Stream$$free()));
        };
    }

    /**
     * Returns {@code KinesisCheckpointSettings$.defaultInstance()}.
     *
     * <p>Going by the {@code name$default$N} naming, this is presumably the default value for the
     * first parameter of {@code checkpointRecords_}; confirm against the original source.
     *
     * @return the default checkpoint settings instance
     */
    public <F> KinesisCheckpointSettings checkpointRecords_$default$1() {
        KinesisCheckpointSettings defaults = KinesisCheckpointSettings$.MODULE$.defaultInstance();
        return defaults;
    }

    /**
     * Enqueues one chunk of records onto the queue and blocks until the enqueue has run.
     *
     * <p>The enqueue effect is converted with {@code toIO} and executed with
     * {@code unsafeRunSync()}, so the calling thread waits for it to complete.
     *
     * @param evidence$15$1 {@link ConcurrentEffect} instance used for the {@code toIO} conversion
     * @param queue$1 queue receiving the chunk
     * @param records chunk to enqueue
     */
    public static final /* synthetic */ void $anonfun$readChunksFromKinesisStream$2(ConcurrentEffect evidence$15$1, Queue queue$1, Chunk records) {
        ConcurrentEffect effect = ConcurrentEffect$.MODULE$.apply(evidence$15$1);
        Object enqueueAction = queue$1.enqueue1((Object)records);
        effect.toIO(enqueueAction).unsafeRunSync();
    }

    /**
     * Emits, as a single-element stream, the result of applying {@code schedulerFactory$1} to a
     * record-processor factory.
     *
     * <p>The factory passed in creates a new {@link ChunkedRecordProcessor} on each call; its
     * callback forwards every chunk to {@code $anonfun$readChunksFromKinesisStream$2} with the
     * given queue.
     *
     * @param queue queue that receives the chunks
     * @param schedulerFactory$1 function applied to the record-processor factory
     * @param evidence$15$1 {@link ConcurrentEffect} instance forwarded to the enqueue helper
     * @return the stream, in {@link FreeC} form, emitting the factory's result
     */
    private static final FreeC instantiateWorker$1(Queue queue, Function1 schedulerFactory$1, ConcurrentEffect evidence$15$1) {
        Object scheduler = schedulerFactory$1.apply(() -> new ChunkedRecordProcessor((Function1<Chunk<CommittableRecord>, BoxedUnit>)(Function1 & Serializable)records -> {
            consumer$.$anonfun$readChunksFromKinesisStream$2(evidence$15$1, queue, records);
            return BoxedUnit.UNIT;
        }));
        return Stream$.MODULE$.emit(scheduler);
    }

    /**
     * Returns the stream transformation that checkpoints records in batches.
     *
     * <p>As written, the returned function:
     * <ol>
     *   <li>groups the input with {@code groupWithin$extension}, using
     *       {@code checkpointSettings.maxBatchSize()} and {@code checkpointSettings.maxBatchWait()};</li>
     *   <li>keeps only chunks with {@code size() > 0}, replacing each with its maximum element
     *       under {@code CommittableRecord$.orderBySequenceNumber()};</li>
     *   <li>for that record, passes {@code cr.checkpoint(...)} mapped with {@code as(cr.record())}
     *       to {@code Stream$.eval_}.</li>
     * </ol>
     *
     * <p>NOTE(review): presumably {@code eval_} runs the effect and emits nothing, so this branch
     * checkpoints without producing output. Confirm against the fs2 API.
     *
     * @param checkpointSettings supplies the batch size and batch wait
     * @param evidence$18$1 {@link Timer} instance passed to {@code groupWithin$extension}
     * @param evidence$17$1 {@link ConcurrentEffect} instance used for grouping and checkpointing
     * @return the stream-to-stream function described above
     */
    private static final Function1 checkpoint$1(KinesisCheckpointSettings checkpointSettings, Timer evidence$18$1, ConcurrentEffect evidence$17$1) {
        // NOTE(review): the anonymous class is declared as `new Serializable()` but cast to
        // PartialFunction; this looks like a decompiler artifact for a partial-function literal.
        return (Function1 & Serializable)x$5 -> new Stream(Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.collect$extension(Stream$.MODULE$.groupWithin$extension(((Stream)x$5).fs2$Stream$$free(), checkpointSettings.maxBatchSize(), checkpointSettings.maxBatchWait(), evidence$18$1, (Concurrent)evidence$17$1), (PartialFunction)new Serializable(){
            private static final long serialVersionUID = 0L;

            // Non-empty chunk: yield its maximum record by sequence-number ordering; otherwise defer to the fallback.
            public final <A1 extends Chunk<CommittableRecord>, B1> B1 applyOrElse(A1 x1, Function1<A1, B1> function1) {
                A1 A1 = x1;
                Object object = A1.size() > 0 ? A1.toList().max(CommittableRecord$.MODULE$.orderBySequenceNumber()) : function1.apply(x1);
                return (B1)object;
            }

            // Defined exactly for chunks that contain at least one record.
            public final boolean isDefinedAt(Chunk<CommittableRecord> x1) {
                Chunk<CommittableRecord> chunk = x1;
                boolean bl = chunk.size() > 0;
                return bl;
            }
        }), (Function1 & Serializable)cr -> new Stream(Stream$.MODULE$.eval_(implicits$.MODULE$.toFunctorOps(cr.checkpoint(evidence$17$1), (Functor)evidence$17$1).as((Object)cr.record())))));
    }

    /**
     * Returns the stream transformation that maps every element to its {@code record()}.
     *
     * @return a function from a stream to a new {@link Stream} of the mapped records
     */
    private static final Function1 bypass$1() {
        return (Function1 & Serializable)x$6 -> {
            Stream upstream = (Stream)x$6;
            return new Stream(Stream$.MODULE$.map$extension(upstream.fs2$Stream$$free(), (Function1 & Serializable)r -> r.record()));
        };
    }

    /**
     * Broadcasts one grouped stream through both the checkpointing pipe and the bypass pipe.
     *
     * <p>Only the second element of the pair, a {@link Stream}, is used; the first element is
     * ignored. It is presumably the shard-id key produced by the grouping in
     * {@code checkpointRecords}; confirm against the caller.
     *
     * @param checkpointSettings$2 settings forwarded to {@code checkpoint$1}
     * @param evidence$17$1 {@link ConcurrentEffect} instance used by the pipes and the broadcast
     * @param evidence$18$1 {@link Timer} instance forwarded to {@code checkpoint$1}
     * @param x0$1 pair whose second element is the stream to broadcast
     * @return the broadcast stream in its {@link FreeC} form
     * @throws MatchError if {@code x0$1} is {@code null}
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$7(KinesisCheckpointSettings checkpointSettings$2, ConcurrentEffect evidence$17$1, Timer evidence$18$1, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError((Object)x0$1);
        }
        FreeC shardRecords = ((Stream)x0$1._2()).fs2$Stream$$free();
        // Pipe order matters to broadcastThrough: checkpointing first, bypass second.
        Function1[] pipes = new Function1[]{consumer$.checkpoint$1(checkpointSettings$2, evidence$18$1, evidence$17$1), consumer$.bypass$1()};
        return Stream$.MODULE$.broadcastThrough$extension(shardRecords, (Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])pipes), (Concurrent)evidence$17$1);
    }

    /**
     * Runs the given stream through {@code checkpointRecords} and discards the emitted values.
     *
     * <p>The input is sent through {@code MODULE$.checkpointRecords(...)} with
     * {@code through$extension}. The result is wrapped in functor ops, using the stream
     * monad-error instance built from {@code F$1}, and {@code void()} is called on it.
     *
     * <p>NOTE(review): {@code void} is a Java keyword, so the {@code .void()} call is decompiler
     * output for the Scala functor {@code void} method rather than compilable Java.
     *
     * @param checkpointSettings$3 settings forwarded to {@code checkpointRecords}
     * @param F$1 {@link ConcurrentEffect} instance for the effect type
     * @param timer$1 {@link Timer} instance forwarded to {@code checkpointRecords}
     * @param x$8 input stream in its {@link FreeC} form
     * @return the resulting stream in its {@link FreeC} form
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords_$1(KinesisCheckpointSettings checkpointSettings$3, ConcurrentEffect F$1, Timer timer$1, FreeC x$8) {
        return ((Stream)implicits$.MODULE$.toFunctorOps((Object)new Stream(Stream$.MODULE$.through$extension(x$8, MODULE$.checkpointRecords(checkpointSettings$3, F$1, timer$1))), (Functor)Stream$.MODULE$.monadErrorInstance((ApplicativeError)F$1)).void()).fs2$Stream$$free();
    }

    /** Private constructor: within this file the only instantiation is the one assigned to {@code MODULE$}. */
    private consumer$() {
    }
}

