/*
 * Decompiled with CFR 0.152.
 */
package io.rsocket.broker.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.broker.frames.Address;
import io.rsocket.broker.rsocket.RSocketLocator;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RoutingRSocket
implements RSocket {
    private final RSocketLocator rSocketLocator;
    private final Function<Payload, Address> addressExtractor;

    public RoutingRSocket(RSocketLocator rSocketLocator, Function<Payload, Address> addressExtractor) {
        this.rSocketLocator = rSocketLocator;
        this.addressExtractor = addressExtractor;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        try {
            RSocket located = this.locate(payload);
            return located.fireAndForget(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error((Throwable)this.handleError(t));
        }
    }

    public Mono<Payload> requestResponse(Payload payload) {
        try {
            RSocket located = this.locate(payload);
            return located.requestResponse(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error((Throwable)this.handleError(t));
        }
    }

    public Mono<Void> metadataPush(Payload payload) {
        try {
            RSocket located = this.locate(payload);
            return located.metadataPush(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Mono.error((Throwable)this.handleError(t));
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            RSocket located = this.locate(payload);
            return located.requestStream(payload);
        }
        catch (Throwable t) {
            payload.release();
            return Flux.error((Throwable)this.handleError(t));
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads).switchOnFirst((first, flux) -> {
            if (first.hasValue()) {
                Payload payload = (Payload)first.get();
                try {
                    RSocket located = this.locate(payload);
                    return located.requestChannel((Publisher)flux);
                }
                catch (Throwable t) {
                    payload.release();
                    return Flux.error((Throwable)this.handleError(t));
                }
            }
            return flux;
        });
    }

    private RSocket locate(Payload payload) {
        Address address = this.addressExtractor.apply(payload);
        if (!this.rSocketLocator.supports(address.getRoutingType())) {
            throw new IllegalStateException("No RSocketLocator for RoutingType " + address.getRoutingType());
        }
        return this.rSocketLocator.locate(address);
    }

    private Throwable handleError(Throwable t) {
        return t;
    }
}

