/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.plugins;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.plugins.RSocketInterceptor;
import io.rsocket.util.RSocketProxy;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

public class LimitRateInterceptor
implements RSocketInterceptor {
    private final int highTide;
    private final int lowTide;
    private final boolean requesterProxy;

    private LimitRateInterceptor(int highTide, int lowTide, boolean requesterProxy) {
        this.highTide = highTide;
        this.lowTide = lowTide;
        this.requesterProxy = requesterProxy;
    }

    @Override
    public RSocket apply(RSocket socket) {
        return this.requesterProxy ? new RequesterProxy(socket) : new ResponderProxy(socket);
    }

    public static LimitRateInterceptor forResponder(int prefetchRate) {
        return LimitRateInterceptor.forResponder(prefetchRate, prefetchRate);
    }

    public static LimitRateInterceptor forResponder(int highTide, int lowTide) {
        return new LimitRateInterceptor(highTide, lowTide, false);
    }

    public static LimitRateInterceptor forRequester(int prefetchRate) {
        return LimitRateInterceptor.forRequester(prefetchRate, prefetchRate);
    }

    public static LimitRateInterceptor forRequester(int highTide, int lowTide) {
        return new LimitRateInterceptor(highTide, lowTide, true);
    }

    private class RequesterProxy
    extends RSocketProxy {
        RequesterProxy(RSocket source) {
            super(source);
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return super.requestChannel((Publisher<Payload>)Flux.from(payloads).limitRate(LimitRateInterceptor.this.highTide, LimitRateInterceptor.this.lowTide));
        }
    }

    private class ResponderProxy
    extends RSocketProxy {
        ResponderProxy(RSocket source) {
            super(source);
        }

        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return super.requestStream(payload).limitRate(LimitRateInterceptor.this.highTide, LimitRateInterceptor.this.lowTide);
        }

        @Override
        public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
            return super.requestChannel(payloads).limitRate(LimitRateInterceptor.this.highTide, LimitRateInterceptor.this.lowTide);
        }
    }
}

