/*
 * Decompiled with CFR 0.152.
 */
package reactor.kafka.sender.internals;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaOutbound;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import reactor.kafka.sender.TransactionManager;
import reactor.kafka.sender.internals.DefaultKafkaOutbound;
import reactor.kafka.sender.internals.DefaultTransactionManager;
import reactor.kafka.sender.internals.ProducerFactory;
import reactor.kafka.sender.internals.SendSubscriber;

/**
 * {@link KafkaSender} implementation that lazily creates a single Kafka {@link Producer}
 * (cached in {@link #producerMono}) and sends records through it.
 *
 * <p>The instance also acts as the {@link Sinks.EmitFailureHandler} used when emitting
 * transaction-boundary signals, see {@link #onEmitFailure(SignalType, Sinks.EmitResult)}.
 *
 * @param <K> key type of the records handled by the underlying producer
 * @param <V> value type of the records handled by the underlying producer
 */
public class DefaultKafkaSender<K, V> implements KafkaSender<K, V>, Sinks.EmitFailureHandler {
    static final Logger log = LoggerFactory.getLogger(DefaultKafkaSender.class.getName());

    /** Names of the only {@link Producer} methods that the proxy handed to {@link #doOnProducer(Function)} forwards. */
    private static final Set<String> DELEGATE_METHODS = new HashSet<>(Arrays.asList(
            "sendOffsetsToTransaction", "partitionsFor", "metrics", "flush"));

    /** Single-threaded scheduler owned by this sender; records are handed to the producer on it. */
    private final Scheduler scheduler = Schedulers.newSingle(new ThreadFactory() {

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            // NOTE: `this` is the ThreadFactory, so the suffix is the factory's identity hash, not the sender's.
            thread.setName("reactor-kafka-sender-" + System.identityHashCode(this));
            return thread;
        }
    });

    /** Lazily creates the producer on first subscription and replays it to later subscribers. */
    private final Mono<Producer<K, V>> producerMono;

    /** Set to {@code true} once the producer has been created and back to {@code false} by {@link #close()}. */
    private final AtomicBoolean hasProducer = new AtomicBoolean();

    final SenderOptions<K, V> senderOptions;

    /** {@code null} when the sender options are not transactional. */
    private final TransactionManager transactionManager;

    /** Restricted producer view, created on first use by {@link #producerProxy(Producer)}. */
    private Producer<K, V> producerProxy;

    /**
     * Creates a sender. No producer is created here; creation is deferred until
     * {@link #producerMono} is first subscribed.
     *
     * @param producerFactory factory used to create the Kafka producer
     * @param options sender configuration; when transactional, its scheduler is replaced by a
     *                new single-threaded scheduler named after the transactional id
     */
    public DefaultKafkaSender(ProducerFactory producerFactory, SenderOptions<K, V> options) {
        Scheduler resultScheduler = options.isTransactional()
                ? Schedulers.newSingle(options.transactionalId())
                : options.scheduler();
        this.senderOptions = options.scheduler(resultScheduler);

        boolean transactional = this.senderOptions.isTransactional();
        Scheduler producerScheduler = transactional ? this.scheduler : this.senderOptions.scheduler();

        this.producerMono = Mono
                .<Producer<K, V>>fromCallable(() -> {
                    Producer<K, V> producer = producerFactory.createProducer(this.senderOptions);
                    if (this.senderOptions.isTransactional()) {
                        log.info("Initializing transactions for producer {}", this.senderOptions.transactionalId());
                        producer.initTransactions();
                    }
                    this.hasProducer.set(true);
                    return producer;
                })
                .publishOn(producerScheduler)
                .cache()
                // For transactional senders the cached value is re-published on the sender's own scheduler.
                .as(cached -> transactional ? cached.publishOn(producerScheduler) : cached);

        this.transactionManager = transactional
                ? new DefaultTransactionManager<K, V>(this.producerMono, this.senderOptions)
                : null;
    }

    /**
     * Sends the given records; delegates to {@link #doSend(Publisher)}.
     *
     * @param records records to send
     * @param <T> type of the correlation metadata carried by the records and results
     * @return the results emitted for the sent records
     */
    @Override
    public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>> records) {
        return this.doSend(records);
    }

    /**
     * Subscribes a {@link SendSubscriber} bound to the producer to {@code records}.
     * Records are consumed on this sender's own scheduler and results are published on the
     * configured scheduler with a prefetch of {@code maxInFlight}.
     *
     * @param records records to send
     * @param <T> type of the correlation metadata in the results
     * @return the results emitted by the {@link SendSubscriber}
     */
    <T> Flux<SenderResult<T>> doSend(Publisher<? extends ProducerRecord<K, V>> records) {
        return this.producerMono
                .<SenderResult<T>>flatMapMany(producer -> Flux.<ProducerRecord<K, V>>from(records)
                        // presumably because handing records to the producer may block -- TODO confirm
                        .publishOn(this.scheduler)
                        .as(upstream -> new FluxOperator<ProducerRecord<K, V>, SenderResult<T>>(upstream) {

                            @Override
                            public void subscribe(CoreSubscriber<? super SenderResult<T>> actual) {
                                this.source.subscribe(
                                        new SendSubscriber<>(DefaultKafkaSender.this.senderOptions, producer, actual));
                            }
                        }))
                .doOnError(e -> log.trace("Send failed with exception", e))
                .publishOn(this.senderOptions.scheduler(), this.senderOptions.maxInFlight());
    }

    /**
     * @return a new {@link DefaultKafkaOutbound} backed by this sender
     */
    @Override
    public KafkaOutbound<K, V> createOutbound() {
        return new DefaultKafkaOutbound<>(this);
    }

    /**
     * Sends each inner publisher of {@code transactionRecords} in its own transaction, one at a
     * time, and splits the combined result stream into one window per boundary signal emitted by
     * {@link #transaction(Publisher, Sinks.Many)}.
     *
     * @param transactionRecords publisher of record batches, one batch per transaction
     * @param <T> type of the correlation metadata carried by the records and results
     * @return a flux of result windows
     * @throws IllegalStateException if transactions are not enabled (raised when a batch is processed)
     */
    @Override
    public <T> Flux<Flux<SenderResult<T>>> sendTransactionally(Publisher<? extends Publisher<? extends SenderRecord<K, V, T>>> transactionRecords) {
        Sinks.Many<Object> boundary = Sinks.many().unicast().onBackpressureBuffer();
        return Flux.from(transactionRecords)
                .publishOn(this.senderOptions.scheduler(), false, 1)
                .concatMapDelayError(records -> this.transaction(records, boundary), false, 1)
                .window(boundary.asFlux())
                .doOnTerminate(() -> boundary.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST))
                .doOnCancel(() -> boundary.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST));
    }

    /**
     * @return the transaction manager of this sender
     * @throws IllegalStateException if the sender options are not transactional
     */
    @Override
    public TransactionManager transactionManager() {
        if (this.transactionManager == null) {
            throw new IllegalStateException("Transactions are not enabled");
        }
        return this.transactionManager;
    }

    /**
     * Applies {@code function} to a restricted view of the producer. Only the methods listed in
     * {@link #DELEGATE_METHODS} are forwarded; any other call on the view throws
     * {@link UnsupportedOperationException}.
     *
     * <p>A {@code null} result from {@code function} completes the returned {@link Mono} empty
     * instead of failing it, so void-like callbacks (e.g. one that only calls {@code flush()})
     * are supported.
     *
     * @param function callback invoked with the producer view when the returned Mono is subscribed
     * @param <T> result type of the callback
     * @return a Mono emitting the callback's result, or completing empty if the result is {@code null}
     */
    @Override
    public <T> Mono<T> doOnProducer(Function<Producer<K, V>, ? extends T> function) {
        return this.producerMono.flatMap(producer ->
                Mono.<T>fromCallable(() -> function.apply(this.producerProxy(producer))));
    }

    /**
     * Closes the producer with the configured close timeout and disposes the schedulers owned by
     * this sender. Does nothing if no producer has been created or if the sender was already closed.
     */
    @Override
    public void close() {
        if (!this.hasProducer.getAndSet(false)) {
            return;
        }
        try {
            this.producerMono.doOnNext(producer -> producer.close(this.senderOptions.closeTimeout())).block();
        } finally {
            // Dispose the schedulers even when closing the producer fails, so their threads are not leaked.
            if (this.senderOptions.isTransactional()) {
                // In transactional mode this scheduler was created by the constructor.
                this.senderOptions.scheduler().dispose();
            }
            this.scheduler.dispose();
        }
    }

    /**
     * Runs a single transaction: begin, send, commit, then emit a boundary signal. On error the
     * transaction is aborted and the original error is propagated.
     *
     * @param transactionRecords records to send within the transaction
     * @param transactionBoundary sink that receives one signal after each successful commit
     * @param <T> type of the correlation metadata carried by the records and results
     * @return the send results of this transaction
     */
    private <T> Flux<SenderResult<T>> transaction(Publisher<? extends SenderRecord<K, V, T>> transactionRecords, Sinks.Many<Object> transactionBoundary) {
        return this.transactionManager()
                .begin()
                .thenMany(this.send(transactionRecords))
                .concatWith(this.transactionManager().commit())
                // The emitted value is just a marker; this instance doubles as the emit-failure handler.
                .concatWith(Mono.fromRunnable(() -> transactionBoundary.emitNext(this, this)))
                .onErrorResume(e -> this.transactionManager().abort().then(Mono.error(e)))
                .publishOn(this.senderOptions.scheduler());
    }

    /**
     * Returns the restricted producer view, creating it on first call. The view is cached, so it
     * stays bound to the producer passed on that first call.
     *
     * @param producer producer the view delegates to
     * @return a {@link Producer} proxy that forwards only {@link #DELEGATE_METHODS}
     */
    private synchronized Producer<K, V> producerProxy(Producer<K, V> producer) {
        if (this.producerProxy == null) {
            Class<?>[] interfaces = new Class<?>[]{Producer.class};
            InvocationHandler handler = (proxy, method, args) -> {
                if (!DELEGATE_METHODS.contains(method.getName())) {
                    throw new UnsupportedOperationException("Method is not supported: " + method);
                }
                try {
                    return method.invoke(producer, args);
                } catch (InvocationTargetException e) {
                    // Surface the producer's own exception; never `throw null` if the cause is absent.
                    Throwable cause = e.getCause();
                    throw cause != null ? cause : e;
                }
            };
            // Safe: the proxy implements exactly the Producer interface.
            @SuppressWarnings("unchecked")
            Producer<K, V> created = (Producer<K, V>) Proxy.newProxyInstance(
                    Producer.class.getClassLoader(), interfaces, handler);
            this.producerProxy = created;
        }
        return this.producerProxy;
    }

    /**
     * Emit-failure policy for the transaction-boundary sink.
     *
     * @param signalType type of the signal whose emission failed
     * @param emitResult reason of the failure
     * @return {@code true} while a producer exists and the sender has not been closed
     */
    @Override
    public boolean onEmitFailure(SignalType signalType, Sinks.EmitResult emitResult) {
        return this.hasProducer.get();
    }
}

