/*
 * Decompiled with CFR 0.152.
 */
package org.jboss.resteasy.reactive.server.handlers;

import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.sse.OutboundSseEvent;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Flow;
import mutiny.zero.flow.adapters.AdaptersToFlow;
import org.jboss.logging.Logger;
import org.jboss.resteasy.reactive.RestMulti;
import org.jboss.resteasy.reactive.common.util.RestMediaType;
import org.jboss.resteasy.reactive.common.util.ServerMediaType;
import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext;
import org.jboss.resteasy.reactive.server.core.SseUtil;
import org.jboss.resteasy.reactive.server.core.StreamingUtil;
import org.jboss.resteasy.reactive.server.jaxrs.OutboundSseEventImpl;
import org.jboss.resteasy.reactive.server.jaxrs.SseEventSinkImpl;
import org.jboss.resteasy.reactive.server.spi.ServerRestHandler;
import org.jboss.resteasy.reactive.server.spi.StreamingResponse;
import org.reactivestreams.Publisher;

public class PublisherResponseHandler
implements ServerRestHandler {
    private static final String JSON = "json";
    private static final ServerMediaType REST_MULTI_DEFAULT_SERVER_MEDIA_TYPE = new ServerMediaType(List.of(MediaType.APPLICATION_OCTET_STREAM_TYPE), StandardCharsets.UTF_8.name(), false);
    private List<StreamingResponseCustomizer> streamingResponseCustomizers = Collections.emptyList();
    private static final Logger log = Logger.getLogger(PublisherResponseHandler.class);
    private static final ServerRestHandler[] AWOL = new ServerRestHandler[]{requestContext -> {
        throw new IllegalStateException("FAILURE: should never be restarted");
    }};

    public void setStreamingResponseCustomizers(List<StreamingResponseCustomizer> streamingResponseCustomizers) {
        this.streamingResponseCustomizers = streamingResponseCustomizers;
    }

    @Override
    public void handle(ResteasyReactiveRequestContext requestContext) throws Exception {
        Object requestContextResult = requestContext.getResult();
        if (requestContextResult instanceof Publisher) {
            requestContextResult = AdaptersToFlow.publisher((Publisher)((Publisher)requestContextResult));
        }
        if (requestContextResult instanceof Flow.Publisher) {
            MediaType[] mediaTypes;
            Flow.Publisher result = (Flow.Publisher)requestContextResult;
            ServerMediaType produces = requestContext.getTarget().getProduces();
            if (produces == null) {
                if (result instanceof RestMulti) {
                    produces = REST_MULTI_DEFAULT_SERVER_MEDIA_TYPE;
                } else {
                    throw new IllegalStateException("Negotiation or dynamic media type resolution for Multi is only supported when using 'org.jboss.resteasy.reactive.RestMulti'");
                }
            }
            if ((mediaTypes = produces.getSortedOriginalMediaTypes()).length != 1) {
                throw new IllegalStateException("Negotiation or dynamic media type resolution for Multi is only supported when using 'org.jboss.resteasy.reactive.RestMulti'");
            }
            MediaType mediaType = mediaTypes[0];
            requestContext.setResponseContentType(mediaType);
            requestContext.setGenericReturnType(requestContext.getTarget().getReturnType());
            if (mediaType.isCompatible(MediaType.SERVER_SENT_EVENTS_TYPE)) {
                this.handleSse(requestContext, result);
            } else {
                requestContext.suspend();
                boolean json = mediaType.toString().contains(JSON);
                if (this.requiresChunkedStream(mediaType)) {
                    this.handleChunkedStreaming(requestContext, result, json);
                } else {
                    this.handleStreaming(requestContext, result, json);
                }
            }
        }
    }

    private boolean requiresChunkedStream(MediaType mediaType) {
        return mediaType.isCompatible((MediaType)RestMediaType.APPLICATION_NDJSON_TYPE) || mediaType.isCompatible((MediaType)RestMediaType.APPLICATION_STREAM_JSON_TYPE);
    }

    private void handleChunkedStreaming(ResteasyReactiveRequestContext requestContext, Flow.Publisher<?> result, boolean json) {
        long demand = 1L;
        if (result instanceof RestMulti.SyncRestMulti) {
            RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti)result;
            demand = rest.getDemand();
        }
        result.subscribe(new StreamingMultiSubscriber(requestContext, this.streamingResponseCustomizers, result, json, demand, false));
    }

    private void handleStreaming(ResteasyReactiveRequestContext requestContext, Flow.Publisher<?> result, boolean json) {
        long demand = 1L;
        boolean encodeAsJsonArray = true;
        if (result instanceof RestMulti.SyncRestMulti) {
            RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti)result;
            demand = rest.getDemand();
            encodeAsJsonArray = rest.encodeAsJsonArray();
        }
        result.subscribe(new StreamingMultiSubscriber(requestContext, this.streamingResponseCustomizers, result, json, demand, encodeAsJsonArray));
    }

    private void handleSse(ResteasyReactiveRequestContext requestContext, Flow.Publisher<?> result) {
        long demand;
        if (result instanceof RestMulti.SyncRestMulti) {
            RestMulti.SyncRestMulti rest = (RestMulti.SyncRestMulti)result;
            demand = rest.getDemand();
        } else {
            demand = 1L;
        }
        SseUtil.setHeaders(requestContext, requestContext.serverResponse(), this.streamingResponseCustomizers);
        requestContext.suspend();
        requestContext.serverResponse().write(SseEventSinkImpl.EMPTY_BUFFER, throwable -> {
            if (throwable == null) {
                result.subscribe(new SseMultiSubscriber(requestContext, this.streamingResponseCustomizers, demand));
            } else {
                requestContext.resume((Throwable)throwable);
            }
        });
    }

    private static class StreamingMultiSubscriber
    extends AbstractMultiSubscriber {
        private static final String LINE_SEPARATOR = "\n";
        private final Flow.Publisher publisher;
        private final boolean json;
        private final boolean encodeAsJsonArray;
        private volatile String nextJsonPrefix;
        private volatile boolean hadItem;

        StreamingMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> staticCustomizers, Flow.Publisher publisher, boolean json, long demand, boolean encodeAsJsonArray) {
            super(requestContext, staticCustomizers, demand);
            this.publisher = publisher;
            this.json = json;
            this.encodeAsJsonArray = encodeAsJsonArray;
            this.nextJsonPrefix = encodeAsJsonArray ? "[" : null;
            this.hadItem = false;
        }

        @Override
        public void onNext(Object item) {
            List<StreamingResponseCustomizer> customizers = this.determineCustomizers(!this.hadItem);
            this.hadItem = true;
            StreamingUtil.send(this.requestContext, customizers, item, this.messagePrefix(), this.messageSuffix()).handle((v, t) -> {
                if (t != null) {
                    try {
                        this.subscription.cancel();
                    }
                    catch (Throwable t2) {
                        t2.printStackTrace();
                    }
                    this.handleException(this.requestContext, (Throwable)t);
                } else {
                    this.nextJsonPrefix = this.encodeAsJsonArray ? "," : null;
                    this.subscription.request(this.demand);
                }
                return null;
            });
        }

        private List<StreamingResponseCustomizer> determineCustomizers(boolean isFirst) {
            Flow.Publisher publisher;
            if (isFirst && (publisher = this.publisher) instanceof RestMulti) {
                RestMulti restMulti = (RestMulti)publisher;
                Map headers = restMulti.getHeaders();
                Integer status = restMulti.getStatus();
                if (headers.isEmpty() && status == null) {
                    return this.staticCustomizers;
                }
                ArrayList<StreamingResponseCustomizer> result = new ArrayList<StreamingResponseCustomizer>(this.staticCustomizers.size() + 2);
                result.addAll(this.staticCustomizers);
                if (!headers.isEmpty()) {
                    result.add(new StreamingResponseCustomizer.AddHeadersCustomizer(headers));
                }
                if (status != null) {
                    result.add(new StreamingResponseCustomizer.StatusCustomizer(status));
                }
                return result;
            }
            return this.staticCustomizers;
        }

        @Override
        public void onComplete() {
            if (!this.hadItem) {
                StreamingUtil.setHeaders(this.requestContext, this.requestContext.serverResponse(), this.determineCustomizers(true));
            }
            if (this.json) {
                String postfix = this.onCompleteText();
                if (postfix != null) {
                    byte[] postfixBytes = postfix.getBytes(StandardCharsets.US_ASCII);
                    this.requestContext.serverResponse().write(postfixBytes).handle((v, t) -> {
                        super.onComplete();
                        return null;
                    });
                } else {
                    super.onComplete();
                }
            } else {
                super.onComplete();
            }
        }

        protected String onCompleteText() {
            if (!this.encodeAsJsonArray) {
                return null;
            }
            String postfix = !this.hadItem ? "[]" : "]";
            return postfix;
        }

        protected String messagePrefix() {
            return this.json ? this.nextJsonPrefix : null;
        }

        protected String messageSuffix() {
            return !this.encodeAsJsonArray ? LINE_SEPARATOR : null;
        }
    }

    private static class SseMultiSubscriber
    extends AbstractMultiSubscriber {
        SseMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> staticCustomizers, long demand) {
            super(requestContext, staticCustomizers, demand);
        }

        @Override
        public void onNext(Object item) {
            Object event = item instanceof OutboundSseEvent ? (OutboundSseEvent)item : new OutboundSseEventImpl.BuilderImpl().data(item).build();
            SseUtil.send(this.requestContext, event, this.staticCustomizers).whenComplete((v, t) -> {
                if (t != null) {
                    this.subscription.cancel();
                    this.handleException(this.requestContext, (Throwable)t);
                } else {
                    this.subscription.request(this.demand);
                }
            });
        }
    }

    public static interface StreamingResponseCustomizer {
        public void customize(StreamingResponse<?> var1);

        public static class AddHeadersCustomizer
        implements StreamingResponseCustomizer {
            private Map<String, List<String>> headers;

            public AddHeadersCustomizer(Map<String, List<String>> headers) {
                this.headers = headers;
            }

            public AddHeadersCustomizer() {
            }

            public Map<String, List<String>> getHeaders() {
                return this.headers;
            }

            public void setHeaders(Map<String, List<String>> headers) {
                this.headers = headers;
            }

            @Override
            public void customize(StreamingResponse<?> streamingResponse) {
                for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {
                    streamingResponse.setResponseHeader((CharSequence)entry.getKey(), (Iterable<CharSequence>)entry.getValue());
                }
            }
        }

        public static class StatusCustomizer
        implements StreamingResponseCustomizer {
            private int status;

            public StatusCustomizer(int status) {
                this.status = status;
            }

            public StatusCustomizer() {
            }

            public int getStatus() {
                return this.status;
            }

            public void setStatus(int status) {
                this.status = status;
            }

            @Override
            public void customize(StreamingResponse<?> streamingResponse) {
                streamingResponse.setStatusCode(this.status);
            }
        }
    }

    static abstract class AbstractMultiSubscriber
    implements Flow.Subscriber<Object> {
        protected final ResteasyReactiveRequestContext requestContext;
        protected final List<StreamingResponseCustomizer> staticCustomizers;
        protected final long demand;
        protected volatile Flow.Subscription subscription;
        private volatile boolean weClosed = false;

        AbstractMultiSubscriber(ResteasyReactiveRequestContext requestContext, List<StreamingResponseCustomizer> staticCustomizers, long demand) {
            this.requestContext = requestContext;
            this.staticCustomizers = staticCustomizers;
            this.demand = demand;
            requestContext.restart(AWOL, true);
            requestContext.serverResponse().addCloseHandler(() -> {
                if (!this.weClosed && this.subscription != null) {
                    this.subscription.cancel();
                }
            });
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(this.demand);
        }

        @Override
        public void onComplete() {
            this.weClosed = true;
            this.requestContext.serverResponse().end();
            this.requestContext.close();
        }

        @Override
        public void onError(Throwable t) {
            this.handleException(this.requestContext, t);
        }

        protected void handleException(ResteasyReactiveRequestContext requestContext, Throwable t) {
            if (requestContext.serverResponse().headWritten()) {
                log.error((Object)"Exception in SSE server handling, impossible to send it to client", t);
            } else {
                requestContext.resume(t, true);
            }
        }
    }
}

