/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.receiver.internals;

import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.common.TopicPartition;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.receiver.internals.AckMode;
import reactor.kafka.receiver.internals.CommittableBatch;
import reactor.kafka.receiver.internals.ConsumerFactory;
import reactor.kafka.receiver.internals.ConsumerHandler;
import reactor.kafka.sender.TransactionManager;

public class DefaultKafkaReceiver<K, V>
implements KafkaReceiver<K, V> {
    private static final Logger log = LoggerFactory.getLogger(DefaultKafkaReceiver.class);
    private final ConsumerFactory consumerFactory;
    private final ReceiverOptions<K, V> receiverOptions;
    Predicate<Throwable> isRetriableException = RetriableCommitFailedException.class::isInstance;
    ConsumerHandler<K, V> consumerHandler;

    public DefaultKafkaReceiver(ConsumerFactory consumerFactory, ReceiverOptions<K, V> receiverOptions) {
        this.consumerFactory = consumerFactory;
        this.receiverOptions = receiverOptions;
    }

    @Override
    public Flux<ReceiverRecord<K, V>> receive(Integer prefetch) {
        return this.withHandler(AckMode.MANUAL_ACK, (scheduler, handler) -> handler.receive().publishOn(scheduler, this.preparePublishOnQueueSize(prefetch)).flatMapIterable(it -> it).map(record -> new ReceiverRecord(record, handler.toCommittableOffset(record))));
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
        return this.withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler.receive().filter(it -> !it.isEmpty()).publishOn(scheduler, this.preparePublishOnQueueSize(prefetch)).map(consumerRecords -> Flux.fromIterable((Iterable)consumerRecords).doAfterTerminate(() -> {
            for (ConsumerRecord r : consumerRecords) {
                handler.acknowledge(r);
            }
        })));
    }

    @Override
    public Flux<ConsumerRecord<K, V>> receiveAtmostOnce(Integer prefetch) {
        return this.withHandler(AckMode.ATMOST_ONCE, (scheduler, handler) -> handler.receive().concatMap(records -> Flux.fromIterable((Iterable)records).concatMap(r -> handler.commit(r).thenReturn(r)).publishOn(scheduler, 1), this.preparePublishOnQueueSize(prefetch)));
    }

    @Override
    public Flux<Flux<ConsumerRecord<K, V>>> receiveExactlyOnce(TransactionManager transactionManager, Integer prefetch) {
        return this.withHandler(AckMode.EXACTLY_ONCE, (scheduler, handler) -> {
            Flux resultFlux = handler.receive().filter(it -> !it.isEmpty()).map(consumerRecords -> {
                CommittableBatch offsetBatch = new CommittableBatch();
                for (ConsumerRecord r : consumerRecords) {
                    offsetBatch.updateOffset(new TopicPartition(r.topic(), r.partition()), r.offset());
                }
                return transactionManager.begin().thenMany((Publisher)Flux.defer(() -> {
                    handler.awaitingTransaction.getAndSet(true);
                    return Flux.fromIterable((Iterable)consumerRecords);
                })).concatWith(transactionManager.sendOffsets(offsetBatch.getAndClearOffsets().offsets(), handler.consumer.groupMetadata())).doAfterTerminate(() -> handler.awaitingTransaction.set(false));
            });
            return resultFlux.publishOn(transactionManager.scheduler(), this.preparePublishOnQueueSize(prefetch));
        });
    }

    @Override
    public <T> Mono<T> doOnConsumer(Function<Consumer<K, V>, ? extends T> function) {
        if (this.consumerHandler == null) {
            return Mono.error((Throwable)new IllegalStateException("You must call one of receive*() methods before using doOnConsumer"));
        }
        return this.consumerHandler.doOnConsumer(function);
    }

    private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
        return Flux.usingWhen((Publisher)Mono.fromCallable(() -> {
            this.consumerHandler = new ConsumerHandler<K, V>(this.receiverOptions, this.consumerFactory.createConsumer(this.receiverOptions), e -> this.isRetriableException.test((Throwable)e), ackMode);
            return this.consumerHandler;
        }), handler -> Flux.using(() -> Schedulers.single((Scheduler)this.receiverOptions.schedulerSupplier().get()), scheduler -> (Flux)function.apply((Scheduler)scheduler, (ConsumerHandler)handler), Scheduler::dispose), handler -> handler.close().doFinally(__ -> {
            this.consumerHandler = null;
        }));
    }

    private int preparePublishOnQueueSize(Integer prefetch) {
        return prefetch != null ? prefetch : 1;
    }
}

