/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.server.rest.stream;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.DrillExceptionUtil;
import org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import org.apache.drill.exec.physical.resultSet.PushResultSetReader;
import org.apache.drill.exec.physical.resultSet.impl.PushResultSetReaderImpl;
import org.apache.drill.exec.physical.resultSet.util.JsonWriter;
import org.apache.drill.exec.physical.rowSet.RowSetReader;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.server.rest.BaseWebUserConnection;
import org.apache.drill.exec.server.rest.WebSessionResources;
import org.apache.drill.exec.vector.complex.fn.JsonOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamingHttpConnection
extends BaseWebUserConnection {
    private static final Logger logger = LoggerFactory.getLogger(StreamingHttpConnection.class);
    private final CountDownLatch startSignal = new CountDownLatch(1);
    private UserBitShared.QueryId queryId;
    private int rowLimit;
    private int batchCount;
    private int rowCount;
    private OutputStream out;
    private JsonWriter writer;
    private PushResultSetReaderImpl.BatchHolder batchHolder;
    private PushResultSetReader reader;

    public StreamingHttpConnection(WebSessionResources webSessionResources) {
        super(webSessionResources);
    }

    public void onStart(UserBitShared.QueryId queryId, int rowLimit) {
        this.queryId = queryId;
        this.rowLimit = rowLimit;
    }

    public void outputAvailable(OutputStream out) throws IOException {
        this.out = out;
        this.writer = new JsonWriter(out, false, false);
        this.startSignal.countDown();
    }

    @Override
    public void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, QueryDataPackage data) {
        VectorContainer batch = data.batch();
        try {
            if (this.batchCount == 0) {
                this.batchHolder = new PushResultSetReaderImpl.BatchHolder(batch);
                this.reader = new PushResultSetReaderImpl(this.batchHolder);
                this.startSignal.await();
            }
            this.batchHolder.newBatch();
            RowSetReader batchReader = this.reader.start();
            this.emitBatch(batchReader);
            ++this.batchCount;
        }
        catch (IOException e) {
            throw UserException.dataWriteError(e).addContext("Failed to send JSON results to the REST client").build(logger);
        }
        catch (InterruptedException e) {
            throw new DrillRuntimeException("Interrupted", e);
        }
        finally {
            batch.zeroVectors();
            listener.success(Acks.OK, null);
        }
    }

    public void emitBatch(RowSetReader batchReader) throws IOException {
        if (this.batchCount == 0) {
            this.emitHeader(batchReader.tupleSchema());
        }
        if (this.rowLimit == 0 || this.rowCount < this.rowLimit) {
            this.emitRows(batchReader);
        }
    }

    private void emitHeader(TupleMetadata rowSchema) throws IOException {
        this.startHeader();
        JsonOutput gen = this.writer.jsonOutput();
        gen.writeFieldName("columns");
        this.writeColNames(gen, rowSchema);
        this.writeNewline(gen);
        gen.writeFieldName("metadata");
        this.writeColTypes(gen, rowSchema);
        this.writeNewline(gen);
        gen.writeFieldName("attemptedAutoLimit");
        gen.writeInt(this.rowLimit);
        this.writeNewline(gen);
        gen.writeFieldName("rows");
        gen.writeStartArray();
        this.writeNewline(gen);
    }

    private void startHeader() throws IOException {
        JsonOutput gen = this.writer.jsonOutput();
        gen.writeStartObject();
        gen.writeFieldName("queryId");
        gen.writeVarChar(QueryIdHelper.getQueryId(this.queryId));
        this.writeNewline(gen);
    }

    public void writeNewline(JsonOutput gen) throws IOException {
        gen.flush();
        this.out.write(10);
    }

    private void writeColNames(JsonOutput gen, TupleMetadata rowSchema) throws IOException {
        gen.writeStartArray();
        for (ColumnMetadata col : rowSchema) {
            gen.writeVarChar(col.name());
        }
        gen.writeEndArray();
    }

    private void writeColTypes(JsonOutput gen, TupleMetadata rowSchema) throws IOException {
        gen.writeStartArray();
        for (ColumnMetadata col : rowSchema) {
            gen.writeVarChar(this.webDataType(col.majorType()));
        }
        gen.writeEndArray();
    }

    private void emitRows(RowSetReader batchReader) throws IOException {
        while (batchReader.next()) {
            this.writer.writeRow(batchReader);
            this.writeNewline(this.writer.jsonOutput());
            if (this.rowLimit <= 0 || ++this.rowCount < this.rowLimit) continue;
            break;
        }
    }

    public void finish() throws IOException {
        JsonOutput gen = this.writer.jsonOutput();
        if (this.batchCount == 0) {
            this.startHeader();
            if (this.getSession().getOptions().getBoolean("drill.exec.http.rest.errors.verbose")) {
                this.emitErrorInfo();
            }
        } else {
            gen.writeEndArray();
            this.writeNewline(gen);
        }
        gen.writeFieldName("queryState");
        gen.writeVarChar(this.getQueryState());
        this.writeNewline(gen);
        gen.writeEndObject();
        this.writeNewline(gen);
    }

    private void emitErrorInfo() throws IOException {
        JsonOutput gen = this.writer.jsonOutput();
        Throwable exception = DrillExceptionUtil.getThrowable(this.error.getException());
        if (exception != null) {
            gen.writeFieldName("exception");
            gen.writeVarChar(exception.getClass().getName());
            this.writeNewline(gen);
            gen.writeFieldName("errorMessage");
            gen.writeVarChar(exception.getMessage());
            this.writeNewline(gen);
            gen.writeFieldName("stackTrace");
            gen.writeStartArray();
            for (String stackFrame : ExceptionUtils.getStackFrames((Throwable)exception)) {
                gen.writeVarChar(stackFrame);
            }
            gen.writeEndArray();
        } else {
            gen.writeFieldName("errorMessage");
            gen.writeVarChar(this.error.getMessage());
        }
        this.writeNewline(gen);
    }
}

