/*
 * Decompiled with CFR 0.152.
 */
package io.druid.server;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
import io.druid.client.selector.Server;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.Smile;
import io.druid.guice.http.DruidHttpClientConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.IAE;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.QueryStats;
import io.druid.server.RequestLogLine;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import org.apache.http.client.utils.URIBuilder;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.proxy.AsyncProxyServlet;
import org.eclipse.jetty.proxy.ProxyServlet;

/**
 * Proxy servlet that forwards incoming requests to a server selected through the
 * {@link QueryHostFinder}.
 *
 * <p>Routing performed in {@link #service(HttpServletRequest, HttpServletResponse)}:
 * <ul>
 *   <li>URIs under {@code /druid/v2/sql/avatica} are routed by the {@code connectionId} found in the
 *       request body;</li>
 *   <li>{@code DELETE} on a {@code /druid/v2} (non-SQL) URI is additionally broadcast to every other
 *       known server;</li>
 *   <li>{@code POST} on a {@code /druid/v2} (non-SQL) URI is parsed as a {@link Query}, routed by that
 *       query, and given a random id when it has none;</li>
 *   <li>everything else is proxied to the default server.</li>
 * </ul>
 *
 * <p>For proxied queries the servlet counts successes/failures, emits query-time metrics and writes
 * request-log lines.
 */
public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider {
    private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);

    /** Legacy Smile content type, still accepted next to {@link #APPLICATION_JACKSON_SMILE}. */
    @Deprecated
    private static final String APPLICATION_SMILE = "application/smile";
    private static final String APPLICATION_JACKSON_SMILE = "application/x-jackson-smile";

    // Request attributes used to hand routing decisions from service() to the proxy callbacks.
    private static final String HOST_ATTRIBUTE = "io.druid.proxy.to.host";
    private static final String SCHEME_ATTRIBUTE = "io.druid.proxy.to.host.scheme";
    private static final String QUERY_ATTRIBUTE = "io.druid.proxy.query";
    private static final String AVATICA_QUERY_ATTRIBUTE = "io.druid.proxy.avaticaQuery";
    private static final String OBJECTMAPPER_ATTRIBUTE = "io.druid.proxy.objectMapper";

    /** Timeout applied to each broadcast cancellation (DELETE) request. */
    private static final int CANCELLATION_TIMEOUT_MILLIS = 500;

    private final AtomicLong successfulQueryCount = new AtomicLong();
    private final AtomicLong failedQueryCount = new AtomicLong();
    private final AtomicLong interruptedQueryCount = new AtomicLong();
    private final QueryToolChestWarehouse warehouse;
    private final ObjectMapper jsonMapper;
    private final ObjectMapper smileMapper;
    private final QueryHostFinder hostFinder;
    private final Provider<HttpClient> httpClientProvider;
    private final DruidHttpClientConfig httpClientConfig;
    private final ServiceEmitter emitter;
    private final RequestLogger requestLogger;
    private final GenericQueryMetricsFactory queryMetricsFactory;

    /** Client used only for broadcasting cancellations; created in {@link #init()}. */
    private HttpClient broadcastClient;

    /**
     * Writes a 500 response whose body is {@code {"error": <message>}} unless the response has
     * already been committed, then flushes the response buffer.
     *
     * @param response     response to write the error to
     * @param objectMapper mapper used to serialize the error body
     * @param exception    the failure being reported; a {@code null} message is reported as
     *                     {@code "null exception"}
     * @throws IOException if writing or flushing the response fails
     */
    private static void handleException(
            HttpServletResponse response,
            ObjectMapper objectMapper,
            Exception exception
    ) throws IOException {
        if (!response.isCommitted()) {
            final String errorMessage = exception.getMessage() == null ? "null exception" : exception.getMessage();
            response.resetBuffer();
            response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
            objectMapper.writeValue(response.getOutputStream(), ImmutableMap.of("error", errorMessage));
        }
        response.flushBuffer();
    }

    @Inject
    public AsyncQueryForwardingServlet(
            QueryToolChestWarehouse warehouse,
            @Json ObjectMapper jsonMapper,
            @Smile ObjectMapper smileMapper,
            QueryHostFinder hostFinder,
            @Router Provider<HttpClient> httpClientProvider,
            @Router DruidHttpClientConfig httpClientConfig,
            ServiceEmitter emitter,
            RequestLogger requestLogger,
            GenericQueryMetricsFactory queryMetricsFactory
    ) {
        this.warehouse = warehouse;
        this.jsonMapper = jsonMapper;
        this.smileMapper = smileMapper;
        this.hostFinder = hostFinder;
        this.httpClientProvider = httpClientProvider;
        this.httpClientConfig = httpClientConfig;
        this.emitter = emitter;
        this.requestLogger = requestLogger;
        this.queryMetricsFactory = queryMetricsFactory;
    }

    /**
     * Initializes the proxy servlet and starts the separate client used for cancellation broadcasts.
     *
     * @throws ServletException if the parent initialization fails or the broadcast client cannot start
     */
    @Override
    public void init() throws ServletException {
        super.init();
        broadcastClient = newHttpClient();
        try {
            broadcastClient.start();
        } catch (Exception e) {
            throw new ServletException(e);
        }
    }

    /** Destroys the proxy servlet and stops the broadcast client; stop failures are only logged. */
    @Override
    public void destroy() {
        super.destroy();
        // init() may never have assigned the client (e.g. super.init() threw), so guard against null.
        if (broadcastClient == null) {
            return;
        }
        try {
            broadcastClient.stop();
        } catch (Exception e) {
            log.warn(e, "Error stopping servlet");
        }
    }

    /**
     * Chooses the target server for the request, records that choice in request attributes and then
     * delegates to the proxying implementation of the superclass.
     *
     * <p>A POST query body that cannot be parsed is answered directly (400 for I/O and parse errors,
     * 500 for anything else) and is not proxied.
     */
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        final String contentType = request.getContentType();
        final boolean isSmile =
                APPLICATION_JACKSON_SMILE.equals(contentType) || APPLICATION_SMILE.equals(contentType);
        final ObjectMapper objectMapper = isSmile ? smileMapper : jsonMapper;
        request.setAttribute(OBJECTMAPPER_ATTRIBUTE, objectMapper);

        // Default target; the branches below override it when they can pick a better server.
        final Server defaultServer = hostFinder.getDefaultServer();
        request.setAttribute(HOST_ATTRIBUTE, defaultServer.getHost());
        request.setAttribute(SCHEME_ATTRIBUTE, defaultServer.getScheme());

        final String requestURI = request.getRequestURI();
        final boolean isQueryEndpoint =
                requestURI.startsWith("/druid/v2") && !requestURI.startsWith("/druid/v2/sql");
        final boolean isAvatica = requestURI.startsWith("/druid/v2/sql/avatica");

        if (isAvatica) {
            prepareAvaticaRequest(request, objectMapper);
        } else if (isQueryEndpoint && HttpMethod.DELETE.is(request.getMethod())) {
            broadcastCancellation(request, defaultServer);
        } else if (isQueryEndpoint && HttpMethod.POST.is(request.getMethod())) {
            if (!prepareQueryRequest(request, response, objectMapper)) {
                // An error response has already been written.
                return;
            }
        }
        super.service(request, response);
    }

    /**
     * Reads the Avatica request body, routes it by its connection id and stashes the re-serialized
     * body in a request attribute so it can be replayed to the target server.
     */
    private void prepareAvaticaRequest(HttpServletRequest request, ObjectMapper objectMapper) throws IOException {
        @SuppressWarnings("unchecked") // JSON object keys are always strings
        final Map<String, Object> requestMap = objectMapper.readValue(request.getInputStream(), Map.class);
        final String connectionId = getAvaticaConnectionId(requestMap);
        final Server targetServer = hostFinder.findServerAvatica(connectionId);
        // The input stream has been consumed above, so keep the bytes for sendProxyRequest().
        final byte[] requestBytes = objectMapper.writeValueAsBytes(requestMap);
        request.setAttribute(HOST_ATTRIBUTE, targetServer.getHost());
        request.setAttribute(SCHEME_ATTRIBUTE, targetServer.getScheme());
        request.setAttribute(AVATICA_QUERY_ATTRIBUTE, requestBytes);
    }

    /**
     * Sends the cancellation asynchronously to every known server except the default one; the default
     * server receives it through the regular proxy path. Failures are logged, not propagated.
     */
    private void broadcastCancellation(HttpServletRequest request, Server defaultServer) {
        for (final Server server : hostFinder.getAllServers()) {
            if (!server.getHost().equals(defaultServer.getHost())) {
                final Response.CompleteListener completeListener = result -> {
                    if (result.isFailed()) {
                        log.warn(
                                result.getFailure(),
                                "Failed to forward cancellation request to [%s]",
                                server.getHost()
                        );
                    }
                };
                final Request broadcastReq = broadcastClient
                        .newRequest(rewriteURI(request, server.getScheme(), server.getHost()))
                        .method(HttpMethod.DELETE)
                        .timeout(CANCELLATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                copyRequestHeaders(request, broadcastReq);
                broadcastReq.send(completeListener);
            }
            // NOTE(review): incremented once per known server (including the default one), not once
            // per cancellation request - confirm this is the intended meaning of the counter.
            interruptedQueryCount.incrementAndGet();
        }
    }

    /**
     * Parses the POSTed query, routes by it and stores it (with an id assigned if it had none) in a
     * request attribute.
     *
     * @return {@code true} if the request should be proxied, {@code false} if an error response has
     *         already been written
     * @throws IOException if writing the error response or the request log fails
     */
    private boolean prepareQueryRequest(
            HttpServletRequest request,
            HttpServletResponse response,
            ObjectMapper objectMapper
    ) throws IOException {
        try {
            Query inputQuery = objectMapper.readValue(request.getInputStream(), Query.class);
            if (inputQuery != null) {
                final Server server = hostFinder.getServer(inputQuery);
                request.setAttribute(HOST_ATTRIBUTE, server.getHost());
                request.setAttribute(SCHEME_ATTRIBUTE, server.getScheme());
                if (inputQuery.getId() == null) {
                    inputQuery = inputQuery.withId(UUID.randomUUID().toString());
                }
            }
            request.setAttribute(QUERY_ATTRIBUTE, inputQuery);
            return true;
        } catch (IOException e) {
            log.warn(e, "Exception parsing query");
            final String errorMessage = e.getMessage() == null ? "no error message" : e.getMessage();
            requestLogger.log(
                    new RequestLogLine(
                            DateTimes.nowUtc(),
                            request.getRemoteAddr(),
                            null,
                            new QueryStats(ImmutableMap.<String, Object>of("success", false, "exception", errorMessage))
                    )
            );
            response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
            response.setContentType("application/json");
            objectMapper.writeValue(response.getOutputStream(), ImmutableMap.of("error", errorMessage));
            return false;
        } catch (Exception e) {
            handleException(response, objectMapper, e);
            return false;
        }
    }

    /**
     * Applies the configured read timeout to the proxy request and, because the original request body
     * was consumed in {@code service}, re-attaches the Avatica bytes or the re-serialized query.
     */
    @Override
    protected void sendProxyRequest(
            HttpServletRequest clientRequest,
            HttpServletResponse proxyResponse,
            Request proxyRequest
    ) {
        final long readTimeoutMillis = httpClientConfig.getReadTimeout().getMillis();
        proxyRequest.timeout(readTimeoutMillis, TimeUnit.MILLISECONDS);
        proxyRequest.idleTimeout(readTimeoutMillis, TimeUnit.MILLISECONDS);

        final byte[] avaticaQuery = (byte[]) clientRequest.getAttribute(AVATICA_QUERY_ATTRIBUTE);
        if (avaticaQuery != null) {
            proxyRequest.content(new BytesContentProvider(avaticaQuery));
        }

        final Query query = (Query) clientRequest.getAttribute(QUERY_ATTRIBUTE);
        if (query != null) {
            final ObjectMapper objectMapper = (ObjectMapper) clientRequest.getAttribute(OBJECTMAPPER_ATTRIBUTE);
            try {
                proxyRequest.content(new BytesContentProvider(objectMapper.writeValueAsBytes(query)));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        }

        clientRequest.setAttribute("Druid-Authorization-Checked", true);
        super.sendProxyRequest(clientRequest, proxyResponse, proxyRequest);
    }

    /**
     * Returns a metrics-emitting listener when the request carries a parsed query, otherwise the
     * default listener of the superclass.
     */
    @Override
    protected Response.Listener newProxyResponseListener(HttpServletRequest request, HttpServletResponse response) {
        final Query query = (Query) request.getAttribute(QUERY_ATTRIBUTE);
        if (query == null) {
            return super.newProxyResponseListener(request, response);
        }
        return newMetricsEmittingProxyResponseListener(request, response, query, System.nanoTime());
    }

    /** Builds the proxy target from the scheme and host attributes set in {@code service}. */
    @Override
    protected String rewriteTarget(HttpServletRequest request) {
        final String scheme = (String) request.getAttribute(SCHEME_ATTRIBUTE);
        final String host = (String) request.getAttribute(HOST_ATTRIBUTE);
        return rewriteURI(request, scheme, host).toString();
    }

    /** Re-targets the request's URI and query string at the given scheme and host. */
    protected URI rewriteURI(HttpServletRequest request, String scheme, String host) {
        return makeURI(scheme, host, request.getRequestURI(), request.getQueryString());
    }

    /**
     * Assembles a URI from its parts.
     *
     * @param scheme         URI scheme
     * @param host           host component passed to the URI builder
     * @param requestURI     path component
     * @param rawQueryString query string, passed to the builder as given
     * @return the assembled URI
     * @throws RuntimeException wrapping the {@link URISyntaxException} if the parts do not form a
     *                          valid URI
     */
    protected static URI makeURI(String scheme, String host, String requestURI, String rawQueryString) {
        try {
            return new URIBuilder()
                    .setScheme(scheme)
                    .setHost(host)
                    .setPath(requestURI)
                    .setQuery(rawQueryString)
                    .build();
        } catch (URISyntaxException e) {
            log.error(e, "Unable to rewrite URI [%s]", e.getMessage());
            throw new RuntimeException(e);
        }
    }

    /** Obtains the HTTP client from the injected provider. */
    @Override
    protected HttpClient newHttpClient() {
        return httpClientProvider.get();
    }

    /** Creates the proxy client and sets the servlet timeout to the configured read timeout. */
    @Override
    protected HttpClient createHttpClient() throws ServletException {
        final HttpClient client = super.createHttpClient();
        setTimeout(httpClientConfig.getReadTimeout().getMillis());
        return client;
    }

    private Response.Listener newMetricsEmittingProxyResponseListener(
            HttpServletRequest request,
            HttpServletResponse response,
            Query query,
            long startNs
    ) {
        return new MetricsEmittingProxyResponseListener(request, response, query, startNs);
    }

    @Override
    public long getSuccessfulQueryCount() {
        return successfulQueryCount.get();
    }

    @Override
    public long getFailedQueryCount() {
        return failedQueryCount.get();
    }

    @Override
    public long getInterruptedQueryCount() {
        return interruptedQueryCount.get();
    }

    /**
     * Extracts the {@code connectionId} entry from a deserialized Avatica request.
     *
     * @param requestMap deserialized request body
     * @return the connection id
     * @throws IAE if the body is missing, has no {@code connectionId}, or the id is not a string
     */
    private static String getAvaticaConnectionId(Map<String, Object> requestMap) throws IOException {
        // A literal JSON "null" body deserializes to a null map; report it like a missing id.
        final Object connectionIdObj = requestMap == null ? null : requestMap.get("connectionId");
        if (connectionIdObj == null) {
            throw new IAE("Received an Avatica request without a connectionId.");
        }
        if (!(connectionIdObj instanceof String)) {
            throw new IAE("Received an Avatica request with a non-String connectionId.");
        }
        return (String) connectionIdObj;
    }

    /**
     * Proxy response listener that updates the query counters, emits the query time metric and writes
     * a request-log line before delegating to the default proxy listener behavior.
     */
    private class MetricsEmittingProxyResponseListener extends ProxyServlet.ProxyResponseListener {
        private final HttpServletRequest req;
        private final Query query;
        private final long startNs;

        public MetricsEmittingProxyResponseListener(
                HttpServletRequest request,
                HttpServletResponse response,
                Query query,
                long startNs
        ) {
            super(request, response);
            this.req = request;
            this.query = query;
            this.startNs = startNs;
        }

        /**
         * Records the outcome of the exchange. Any exception raised while recording is logged so that
         * the superclass callback always runs.
         */
        @Override
        public void onComplete(Result result) {
            final long requestTimeNs = System.nanoTime() - startNs;
            try {
                final boolean success = result.isSucceeded();
                if (success) {
                    successfulQueryCount.incrementAndGet();
                } else {
                    failedQueryCount.incrementAndGet();
                }
                emitQueryTime(requestTimeNs, success);

                // Logged as a boolean, matching the other request-log lines written by this servlet.
                // Compared against the servlet constant so that "Response" in this class only ever
                // needs to mean the Jetty client type.
                final boolean loggedSuccess =
                        success && result.getResponse().getStatus() == HttpServletResponse.SC_OK;
                requestLogger.log(
                        new RequestLogLine(
                                DateTimes.nowUtc(),
                                req.getRemoteAddr(),
                                query,
                                new QueryStats(
                                        ImmutableMap.<String, Object>of(
                                                "query/time", TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
                                                "success", loggedSuccess
                                        )
                                )
                        )
                );
            } catch (Exception e) {
                log.error(e, "Unable to log query [%s]!", query);
            }
            super.onComplete(result);
        }

        /**
         * Records the failure, emits an alert and delegates to the superclass.
         *
         * <p>NOTE(review): if the client also invokes {@link #onComplete(Result)} for a failed
         * exchange, the failure is counted and its query time emitted twice - confirm against the
         * Jetty listener contract.
         */
        @Override
        public void onFailure(Response response, Throwable failure) {
            try {
                final String errorMessage = failure.getMessage();
                failedQueryCount.incrementAndGet();
                emitQueryTime(System.nanoTime() - startNs, false);
                requestLogger.log(
                        new RequestLogLine(
                                DateTimes.nowUtc(),
                                req.getRemoteAddr(),
                                query,
                                new QueryStats(
                                        ImmutableMap.<String, Object>of(
                                                "success", false,
                                                "exception", errorMessage == null ? "no message" : errorMessage
                                        )
                                )
                        )
                );
            } catch (IOException logError) {
                log.error(logError, "Unable to log query [%s]!", query);
            }

            log.makeAlert(failure, "Exception handling request")
               .addData("exception", failure.toString())
               .addData("query", query)
               .addData("peer", req.getRemoteAddr())
               .emit();

            super.onFailure(response, failure);
        }

        /** Emits the query time metric, tagged with the success flag, through the servlet's emitter. */
        private void emitQueryTime(long requestTimeNs, boolean success) throws JsonProcessingException {
            final QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
                    queryMetricsFactory,
                    warehouse.getToolChest(query),
                    query,
                    req.getRemoteAddr()
            );
            queryMetrics.success(success);
            queryMetrics.reportQueryTime(requestTimeNs).emit(emitter);
        }
    }
}

