/*
 * Decompiled with CFR 0.152.
 */
package ratpack.stream.internal;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscriber;
import ratpack.exec.Promise;
import ratpack.func.Function;
import ratpack.stream.TransformablePublisher;
import ratpack.stream.YieldRequest;
import ratpack.stream.internal.DefaultYieldRequest;
import ratpack.stream.internal.SubscriptionSupport;

public class FlatYieldingPublisher<T>
implements TransformablePublisher<T> {
    private final Function<? super YieldRequest, ? extends Promise<? extends T>> producer;
    private final AtomicLong subscriptionCounter = new AtomicLong();

    public FlatYieldingPublisher(Function<? super YieldRequest, ? extends Promise<? extends T>> producer) {
        this.producer = producer;
    }

    public void subscribe(Subscriber<? super T> subscriber) {
        new Subscription(subscriber);
    }

    private class Subscription
    extends SubscriptionSupport<T> {
        private final long subscriptionNum;
        private final AtomicInteger requestCounter;
        private final AtomicLong waiting;
        private final AtomicBoolean draining;

        public Subscription(Subscriber<? super T> subscriber) {
            super(subscriber);
            this.subscriptionNum = FlatYieldingPublisher.this.subscriptionCounter.getAndIncrement();
            this.requestCounter = new AtomicInteger();
            this.waiting = new AtomicLong();
            this.draining = new AtomicBoolean();
            this.start();
        }

        @Override
        protected void doRequest(long n) {
            this.waiting.addAndGet(n);
            this.drain();
        }

        private void drain() {
            if (!this.isStopped() && this.draining.compareAndSet(false, true)) {
                long l = this.waiting.getAndDecrement();
                if (l > 0L) {
                    try {
                        Promise promise = (Promise)FlatYieldingPublisher.this.producer.apply(new DefaultYieldRequest(this.subscriptionNum, this.requestCounter.getAndIncrement()));
                        promise.wiretap(r -> this.draining.set(false)).onError(this::onError).then(t -> {
                            if (t == null) {
                                this.onComplete();
                            } else {
                                this.onNext(t);
                                this.drain();
                            }
                        });
                    }
                    catch (Throwable e) {
                        this.draining.set(false);
                        this.onError(e);
                        return;
                    }
                } else {
                    this.waiting.addAndGet(1L);
                    this.draining.set(false);
                }
                if (this.waiting.get() > 0L) {
                    this.drain();
                }
            }
        }
    }
}

