/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.metrics;

import io.confluent.ksql.metrics.MetricCollector;
import io.confluent.ksql.metrics.TopicSensors;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

public final class MetricCollectors {
    private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
    public static final String RESOURCE_LABEL_PREFIX = "metrics.context.resource.";
    private static final String KSQL_RESOURCE_TYPE = "ksql";
    public static final String RESOURCE_LABEL_TYPE = "metrics.context.resource.type";
    public static final String RESOURCE_LABEL_VERSION = "metrics.context.resource.version";
    public static final String RESOURCE_LABEL_COMMIT_ID = "metrics.context.resource.commit.id";
    public static final String RESOURCE_LABEL_CLUSTER_ID = "metrics.context.resource.cluster.id";
    public static final String RESOURCE_LABEL_KAFKA_CLUSTER_ID = "metrics.context.resource.kafka.cluster.id";
    private static Map<String, MetricCollector> collectorMap;
    private static Metrics metrics;
    private static final io.confluent.common.utils.Time time;

    private MetricCollectors() {
    }

    public static void initialize() {
        MetricConfig metricConfig = new MetricConfig().samples(100).timeWindow(1000L, TimeUnit.MILLISECONDS);
        ArrayList<JmxReporter> reporters = new ArrayList<JmxReporter>();
        reporters.add(new JmxReporter());
        KafkaMetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX);
        metrics = new Metrics(metricConfig, reporters, (Time)new SystemTime(), (MetricsContext)metricsContext);
        collectorMap = new ConcurrentHashMap<String, MetricCollector>();
    }

    public static void cleanUp() {
        if (metrics != null) {
            metrics.close();
        }
        collectorMap.clear();
    }

    static String addCollector(String id, MetricCollector collector) {
        StringBuilder builtId = new StringBuilder(id);
        while (collectorMap.containsKey(builtId.toString())) {
            builtId.append("-").append(collectorMap.size());
        }
        String finalId = builtId.toString();
        collectorMap.put(finalId, collector);
        return finalId;
    }

    public static void addConfigurableReporter(KsqlConfig ksqlConfig) {
        String ksqlServiceId = ksqlConfig.getString("ksql.service.id");
        List reporters = ksqlConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("ksql.service.id", ksqlServiceId));
        if (reporters.size() > 0) {
            KafkaMetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX, ksqlConfig.originalsWithPrefix("metrics.context."));
            for (MetricsReporter reporter : reporters) {
                reporter.contextChange((MetricsContext)metricsContext);
                metrics.addReporter(reporter);
            }
        }
    }

    public static Map<String, Object> addConfluentMetricsContextConfigs(String ksqlServiceId, String kafkaClusterId) {
        HashMap<String, Object> updatedProps = new HashMap<String, Object>();
        updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
        updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
        updatedProps.put(RESOURCE_LABEL_KAFKA_CLUSTER_ID, kafkaClusterId);
        updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
        updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
        return updatedProps;
    }

    static void remove(String id) {
        collectorMap.remove(id);
    }

    static Map<String, TopicSensors.Stat> getStatsFor(String topic, boolean isError) {
        return MetricCollectors.getAggregateMetrics(collectorMap.values().stream().flatMap(c -> c.stats(topic.toLowerCase(), isError).stream()).collect(Collectors.toList()));
    }

    public static String getAndFormatStatsFor(String topic, boolean isError) {
        return MetricCollectors.format(MetricCollectors.getStatsFor(topic, isError).values(), isError ? "last-failed" : "last-message");
    }

    static Map<String, TopicSensors.Stat> getAggregateMetrics(List<TopicSensors.Stat> allStats) {
        TreeMap<String, TopicSensors.Stat> results = new TreeMap<String, TopicSensors.Stat>();
        allStats.forEach(stat -> {
            results.computeIfAbsent(stat.name(), k -> new TopicSensors.Stat(stat.name(), 0.0, stat.getTimestamp()));
            ((TopicSensors.Stat)results.get(stat.name())).aggregate(stat.getValue());
        });
        return results;
    }

    private static String format(Collection<TopicSensors.Stat> stats, String lastEventTimestampMsg) {
        StringBuilder results = new StringBuilder();
        stats.forEach(stat -> results.append(stat.formatted()).append(" "));
        if (stats.size() > 0) {
            results.append(String.format("%16s: ", lastEventTimestampMsg)).append(String.format("%9s", stats.iterator().next().timestamp()));
        }
        return results.toString();
    }

    public static Collection<Double> currentConsumptionRateByQuery() {
        return collectorMap.values().stream().filter(collector -> collector.getGroupId() != null).collect(Collectors.groupingBy(MetricCollector::getGroupId, Collectors.summingDouble(m -> m.aggregateStat("consumer-messages-per-sec", false)))).values();
    }

    public static double aggregateStat(String name, boolean isError) {
        return collectorMap.values().stream().mapToDouble(m -> m.aggregateStat(name, isError)).sum();
    }

    public static double currentProductionRate() {
        return MetricCollectors.aggregateStat("messages-per-sec", false);
    }

    public static double currentConsumptionRate() {
        return MetricCollectors.aggregateStat("consumer-messages-per-sec", false);
    }

    public static double totalMessageConsumption() {
        return MetricCollectors.aggregateStat("consumer-total-messages", false);
    }

    public static double totalBytesConsumption() {
        return MetricCollectors.aggregateStat("consumer-total-bytes", false);
    }

    public static double currentErrorRate() {
        return collectorMap.values().stream().mapToDouble(MetricCollector::errorRate).sum();
    }

    public static Metrics getMetrics() {
        return metrics;
    }

    public static io.confluent.common.utils.Time getTime() {
        return time;
    }

    static {
        MetricCollectors.initialize();
        time = new io.confluent.common.utils.SystemTime();
    }
}

