/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.influxdb;

import com.google.gson.Gson;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.influxdb.AbstractInfluxDBProcessor;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@SupportsBatching
@Tags(value={"influxdb", "measurement", "get", "read", "query", "timeseries"})
@CapabilityDescription(value="Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query.  Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
@WritesAttributes(value={@WritesAttribute(attribute="influxdb.error.message", description="InfluxDB error message"), @WritesAttribute(attribute="influxdb.executed.query", description="InfluxDB executed query")})
public class ExecuteInfluxDBQuery
extends AbstractInfluxDBProcessor {
    public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
    private static final int DEFAULT_INFLUX_RESPONSE_CHUNK_SIZE = 0;
    public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder().name("influxdb-query-result-time-unit").displayName("Query Result Time Units").description("The time unit of query results from the InfluxDB").defaultValue(TimeUnit.NANOSECONDS.name()).required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).allowableValues(Arrays.stream(TimeUnit.values()).map(v -> v.name()).collect(Collectors.toSet())).sensitive(false).build();
    public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder().name("influxdb-query").displayName("InfluxDB Query").description("The InfluxDB query to execute. Note: If there are incoming connections, then the query is created from incoming FlowFile's content otherwise it is created from this property.").required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final Integer MAX_CHUNK_SIZE = 10000;
    public static final PropertyDescriptor INFLUX_DB_QUERY_CHUNK_SIZE = new PropertyDescriptor.Builder().name("influxdb-query-chunk-size").displayName("Results chunk size").description("Chunking can be used to return results in a stream of smaller batches (each has a partial results up to a chunk size) rather than as a single response. Chunking queries can return an unlimited number of rows. Note: Chunking is enable when result chunk size is greater than 0").defaultValue(String.valueOf(0)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.createLongValidator((long)0L, (long)MAX_CHUNK_SIZE.intValue(), (boolean)true)).required(true).build();
    static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Successful InfluxDB queries are routed to this relationship").build();
    static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Falied InfluxDB queries are routed to this relationship").build();
    static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("Failed queries that are retryable exception are routed to this relationship").build();
    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;
    protected Gson gson = new Gson();

    public Set<Relationship> getRelationships() {
        return relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        if (!context.getProperty(INFLUX_DB_QUERY).isSet() && !context.hasIncomingConnection()) {
            String error = "The InfluxDB Query processor requires input connection or scheduled InfluxDB query";
            this.getLogger().error(error);
            throw new ProcessException(error);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        String query = null;
        String database = null;
        TimeUnit queryResultTimeunit = null;
        Charset charset = null;
        FlowFile outgoingFlowFile = null;
        if (context.hasIncomingConnection()) {
            FlowFile incomingFlowFile = session.get();
            if (incomingFlowFile == null && context.hasNonLoopConnection()) {
                return;
            }
            charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(incomingFlowFile).getValue());
            if (incomingFlowFile.getSize() == 0L) {
                if (!context.getProperty(INFLUX_DB_QUERY).isSet()) {
                    String message = "FlowFile query is empty and no scheduled query is set";
                    this.getLogger().error(message);
                    incomingFlowFile = session.putAttribute(incomingFlowFile, "influxdb.error.message", message);
                    session.transfer(incomingFlowFile, REL_FAILURE);
                    return;
                }
                query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(incomingFlowFile).getValue();
            } else {
                try {
                    query = this.getQuery(session, charset, incomingFlowFile);
                }
                catch (IOException ioe) {
                    this.getLogger().error("Exception while reading from FlowFile " + ioe.getLocalizedMessage(), (Throwable)ioe);
                    throw new ProcessException((Throwable)ioe);
                }
            }
            outgoingFlowFile = incomingFlowFile;
        } else {
            outgoingFlowFile = session.create();
            charset = Charset.forName(context.getProperty(CHARSET).evaluateAttributeExpressions(outgoingFlowFile).getValue());
            query = context.getProperty(INFLUX_DB_QUERY).evaluateAttributeExpressions(outgoingFlowFile).getValue();
        }
        database = context.getProperty(DB_NAME).evaluateAttributeExpressions(outgoingFlowFile).getValue();
        queryResultTimeunit = TimeUnit.valueOf(context.getProperty(INFLUX_DB_QUERY_RESULT_TIMEUNIT).evaluateAttributeExpressions(outgoingFlowFile).getValue());
        try {
            String json;
            long startTimeMillis = System.currentTimeMillis();
            int chunkSize = context.getProperty(INFLUX_DB_QUERY_CHUNK_SIZE).evaluateAttributeExpressions(outgoingFlowFile).asInteger();
            List<QueryResult> result = this.executeQuery(context, database, query, queryResultTimeunit, chunkSize);
            String string = json = result.size() == 1 ? this.gson.toJson((Object)result.get(0)) : this.gson.toJson(result);
            if (this.getLogger().isDebugEnabled()) {
                this.getLogger().debug("Query result {} ", new Object[]{result});
            }
            ByteArrayInputStream bais = new ByteArrayInputStream(json.getBytes(charset));
            session.importFrom((InputStream)bais, outgoingFlowFile);
            bais.close();
            long endTimeMillis = System.currentTimeMillis();
            if (!this.hasErrors(result).booleanValue()) {
                outgoingFlowFile = session.putAttribute(outgoingFlowFile, INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
                session.getProvenanceReporter().send(outgoingFlowFile, this.makeProvenanceUrl(context, database), endTimeMillis - startTimeMillis);
                session.transfer(outgoingFlowFile, REL_SUCCESS);
                return;
            }
            outgoingFlowFile = this.populateErrorAttributes(session, outgoingFlowFile, query, this.queryErrors(result));
            session.transfer(outgoingFlowFile, REL_FAILURE);
            return;
        }
        catch (Exception exception) {
            outgoingFlowFile = this.populateErrorAttributes(session, outgoingFlowFile, query, exception.getMessage());
            if (exception.getCause() instanceof SocketTimeoutException) {
                this.getLogger().error("Failed to read from InfluxDB due SocketTimeoutException to {} and retrying", new Object[]{exception.getCause().getLocalizedMessage()}, exception.getCause());
                session.transfer(outgoingFlowFile, REL_RETRY);
            } else {
                this.getLogger().error("Failed to read from InfluxDB due to {}", new Object[]{exception.getLocalizedMessage()}, (Throwable)exception);
                session.transfer(outgoingFlowFile, REL_FAILURE);
            }
            context.yield();
        }
    }

    protected String getQuery(ProcessSession session, Charset charset, FlowFile incomingFlowFile) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        session.exportTo(incomingFlowFile, (OutputStream)baos);
        baos.close();
        return new String(baos.toByteArray(), charset);
    }

    protected String makeProvenanceUrl(ProcessContext context, String database) {
        return "influxdb://" + context.getProperty(INFLUX_DB_URL).evaluateAttributeExpressions().getValue() + "/" + database;
    }

    protected List<QueryResult> executeQuery(ProcessContext context, String database, String query, TimeUnit timeunit, int chunkSize) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        InfluxDB influx = this.getInfluxDB(context);
        Query influxQuery = new Query(query, database);
        if (chunkSize > 0) {
            LinkedList<QueryResult> results = new LinkedList<QueryResult>();
            influx.query(influxQuery, chunkSize, result -> {
                if (this.isQueryDone(result.getError())) {
                    latch.countDown();
                } else {
                    results.add((QueryResult)result);
                }
            });
            latch.await();
            return results;
        }
        return Collections.singletonList(influx.query(influxQuery, timeunit));
    }

    private boolean isQueryDone(String error) {
        return error != null && error.equals("DONE");
    }

    protected FlowFile populateErrorAttributes(ProcessSession session, FlowFile flowFile, String query, String message) {
        HashMap<String, String> attributes = new HashMap<String, String>();
        attributes.put("influxdb.error.message", String.valueOf(message));
        attributes.put(INFLUX_DB_EXECUTED_QUERY, String.valueOf(query));
        flowFile = session.putAllAttributes(flowFile, attributes);
        return flowFile;
    }

    private Boolean hasErrors(List<QueryResult> results) {
        for (QueryResult result : results) {
            if (!result.hasError()) continue;
            return true;
        }
        return false;
    }

    private String queryErrors(List<QueryResult> results) {
        return results.stream().filter(QueryResult::hasError).map(QueryResult::getError).collect(Collectors.joining("\n"));
    }

    @Override
    @OnStopped
    public void close() {
        super.close();
    }

    static {
        HashSet<Relationship> tempRelationships = new HashSet<Relationship>();
        tempRelationships.add(REL_SUCCESS);
        tempRelationships.add(REL_FAILURE);
        tempRelationships.add(REL_RETRY);
        relationships = Collections.unmodifiableSet(tempRelationships);
        ArrayList<PropertyDescriptor> tempDescriptors = new ArrayList<PropertyDescriptor>();
        tempDescriptors.add(DB_NAME);
        tempDescriptors.add(INFLUX_DB_URL);
        tempDescriptors.add(INFLUX_DB_CONNECTION_TIMEOUT);
        tempDescriptors.add(INFLUX_DB_QUERY_RESULT_TIMEUNIT);
        tempDescriptors.add(INFLUX_DB_QUERY);
        tempDescriptors.add(INFLUX_DB_QUERY_CHUNK_SIZE);
        tempDescriptors.add(USERNAME);
        tempDescriptors.add(PASSWORD);
        tempDescriptors.add(CHARSET);
        propertyDescriptors = Collections.unmodifiableList(tempDescriptors);
    }
}

