/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.metrics;

import com.google.common.base.Functions;
import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.metrics.MetricCollector;
import io.confluent.ksql.metrics.TopicSensors;
import io.confluent.ksql.util.AppInfo;
import io.confluent.ksql.util.KsqlConfig;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;

@SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="should be mutable")
public final class MetricCollectors {
    public static final String RESOURCE_LABEL_PREFIX = "metrics.context.resource.";
    public static final String RESOURCE_LABEL_TYPE = "metrics.context.resource.type";
    public static final String RESOURCE_LABEL_VERSION = "metrics.context.resource.version";
    public static final String RESOURCE_LABEL_COMMIT_ID = "metrics.context.resource.commit.id";
    public static final String RESOURCE_LABEL_CLUSTER_ID = "metrics.context.resource.cluster.id";
    public static final String RESOURCE_LABEL_KAFKA_CLUSTER_ID = "metrics.context.resource.kafka.cluster.id";
    private static final String KSQL_JMX_PREFIX = "io.confluent.ksql.metrics";
    private static final String KSQL_RESOURCE_TYPE = "ksql";
    private final io.confluent.common.utils.Time time = new io.confluent.common.utils.SystemTime();
    private Map<String, MetricCollector> collectorMap;
    private Metrics metrics;

    public MetricCollectors() {
        this(new Metrics(new MetricConfig().samples(100).timeWindow(1000L, TimeUnit.MILLISECONDS), new LinkedList<JmxReporter>(Collections.singletonList(new JmxReporter())), (Time)new SystemTime(), (MetricsContext)new KafkaMetricsContext(KSQL_JMX_PREFIX)));
    }

    @SuppressFBWarnings(value={"EI_EXPOSE_REP2"}, justification="should be mutable")
    public MetricCollectors(Metrics metrics) {
        this.metrics = metrics;
        this.collectorMap = new ConcurrentHashMap<String, MetricCollector>();
    }

    String addCollector(String id, MetricCollector collector) {
        StringBuilder builtId = new StringBuilder(id);
        while (this.collectorMap.containsKey(builtId.toString())) {
            builtId.append("-").append(this.collectorMap.size());
        }
        String finalId = builtId.toString();
        this.collectorMap.put(finalId, collector);
        return finalId;
    }

    public void addConfigurableReporter(KsqlConfig ksqlConfig) {
        String ksqlServiceId = ksqlConfig.getString("ksql.service.id");
        List reporters = ksqlConfig.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("ksql.service.id", ksqlServiceId));
        if (reporters.size() > 0) {
            KafkaMetricsContext metricsContext = new KafkaMetricsContext(KSQL_JMX_PREFIX, ksqlConfig.originalsWithPrefix("metrics.context."));
            for (MetricsReporter reporter : reporters) {
                reporter.contextChange((MetricsContext)metricsContext);
                this.metrics.addReporter(reporter);
            }
        }
    }

    public Map<String, Object> addConfluentMetricsContextConfigs(String ksqlServiceId, String kafkaClusterId) {
        HashMap<String, Object> updatedProps = new HashMap<String, Object>();
        updatedProps.put(RESOURCE_LABEL_TYPE, KSQL_RESOURCE_TYPE);
        updatedProps.put(RESOURCE_LABEL_CLUSTER_ID, ksqlServiceId);
        updatedProps.put(RESOURCE_LABEL_KAFKA_CLUSTER_ID, kafkaClusterId);
        updatedProps.put(RESOURCE_LABEL_VERSION, AppInfo.getVersion());
        updatedProps.put(RESOURCE_LABEL_COMMIT_ID, AppInfo.getCommitId());
        return updatedProps;
    }

    void remove(String id) {
        this.collectorMap.remove(id);
    }

    public Collection<TopicSensors.Stat> getStatsFor(String topic, boolean isError) {
        return this.getAggregateMetrics(this.collectorMap.values().stream().flatMap(c -> c.stats(topic.toLowerCase(), isError).stream()).collect(Collectors.toList()));
    }

    public String getAndFormatStatsFor(String topic, boolean isError) {
        return MetricCollectors.format(this.getStatsFor(topic, isError), isError ? "last-failed" : "last-message");
    }

    Collection<TopicSensors.Stat> getAggregateMetrics(List<TopicSensors.Stat> allStats) {
        return ((ImmutableMap)allStats.stream().collect(ImmutableMap.toImmutableMap(TopicSensors.Stat::name, (Function)Functions.identity(), (first, other) -> first.aggregate(other.getValue())))).values();
    }

    public static String format(Collection<TopicSensors.Stat> stats, String lastEventTimestampMsg) {
        StringBuilder results = new StringBuilder();
        stats.forEach(stat -> results.append(stat.formatted()).append(" "));
        if (stats.size() > 0) {
            results.append(String.format("%16s: ", lastEventTimestampMsg)).append(String.format("%9s", stats.iterator().next().timestamp()));
        }
        return results.toString();
    }

    public Collection<Double> currentConsumptionRateByQuery() {
        return this.collectorMap.values().stream().filter(collector -> collector.getGroupId() != null).collect(Collectors.groupingBy(MetricCollector::getGroupId, Collectors.summingDouble(m -> m.aggregateStat("consumer-messages-per-sec", false)))).values();
    }

    public double aggregateStat(String name, boolean isError) {
        return this.collectorMap.values().stream().mapToDouble(m -> m.aggregateStat(name, isError)).sum();
    }

    public double currentProductionRate() {
        return this.aggregateStat("messages-per-sec", false);
    }

    public double currentConsumptionRate() {
        return this.aggregateStat("consumer-messages-per-sec", false);
    }

    public double totalMessageConsumption() {
        return this.aggregateStat("consumer-total-messages", false);
    }

    public double totalBytesConsumption() {
        return this.aggregateStat("consumer-total-bytes", false);
    }

    public double currentErrorRate() {
        return this.collectorMap.values().stream().mapToDouble(MetricCollector::errorRate).sum();
    }

    @SuppressFBWarnings(value={"MS_EXPOSE_REP", "EI_EXPOSE_REP"}, justification="should be mutable")
    public Metrics getMetrics() {
        return this.metrics;
    }

    public io.confluent.common.utils.Time getTime() {
        return this.time;
    }
}

