/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.metrics;

import com.google.common.collect.ImmutableMap;
import io.confluent.common.utils.Time;
import io.confluent.ksql.metrics.MetricCollector;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.metrics.MetricUtils;
import io.confluent.ksql.metrics.TopicSensors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.metrics.stats.Rate;

public class ProducerCollector
implements MetricCollector,
ProducerInterceptor<Object, Object> {
    public static final String PRODUCER_MESSAGES_PER_SEC = "messages-per-sec";
    public static final String PRODUCER_TOTAL_MESSAGES = "total-messages";
    private MetricCollectors metricsCollectors;
    private Metrics metrics;
    private final Map<String, TopicSensors<ProducerRecord<Object, Object>>> topicSensors = new HashMap<String, TopicSensors<ProducerRecord<Object, Object>>>();
    private String id;
    private Time time;

    public void configure(Map<String, ?> map) {
        String id = (String)map.get("client.id");
        MetricCollectors collectors = (MetricCollectors)Objects.requireNonNull(map.get("ksql.internal.metric.collectors"));
        this.configure(id, collectors);
    }

    void configure(String id, MetricCollectors metricCollectors) {
        this.metricsCollectors = metricCollectors;
        this.metrics = metricCollectors.getMetrics();
        this.id = metricCollectors.addCollector(id, this);
        this.time = metricCollectors.getTime();
    }

    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    }

    public ProducerRecord<Object, Object> onSend(ProducerRecord<Object, Object> record) {
        this.collect(record, false);
        return record;
    }

    private void collect(ProducerRecord<Object, Object> record, boolean isError) {
        this.collect(isError, record.topic().toLowerCase());
    }

    private void collect(boolean isError, String topic) {
        this.topicSensors.computeIfAbsent(this.getKey(topic), k -> new TopicSensors(topic, this.buildSensors((String)k))).increment(null, isError);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>> buildSensors(String key) {
        ArrayList<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>> sensors = new ArrayList<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>>();
        Metrics metrics = this.metrics;
        synchronized (metrics) {
            this.addSensor(key, PRODUCER_MESSAGES_PER_SEC, (MeasurableStat)new Rate(), sensors);
            this.addSensor(key, PRODUCER_TOTAL_MESSAGES, (MeasurableStat)new CumulativeSum(), sensors);
        }
        return sensors;
    }

    private void addSensor(String key, String metricNameString, MeasurableStat stat, List<TopicSensors.SensorMetric<ProducerRecord<Object, Object>>> results) {
        String name = "prod-" + key + "-" + metricNameString + "-" + this.id;
        MetricName metricName = new MetricName(metricNameString, "producer-metrics", "producer-" + name, (Map)ImmutableMap.of((Object)"key", (Object)key, (Object)"id", (Object)this.id));
        Sensor existingSensor = this.metrics.getSensor(name);
        final Sensor sensor = this.metrics.sensor(name);
        if (existingSensor == null || this.metrics.metrics().get(metricName) == null) {
            sensor.add(metricName, stat);
        }
        KafkaMetric metric = (KafkaMetric)this.metrics.metrics().get(metricName);
        results.add(new TopicSensors.SensorMetric<ProducerRecord<Object, Object>>(sensor, metric, this.time, false){

            @Override
            void record(ProducerRecord<Object, Object> record) {
                sensor.record(1.0);
                super.record(record);
            }
        });
    }

    private String getKey(String topic) {
        return topic;
    }

    public void close() {
        this.metricsCollectors.remove(this.id);
        this.topicSensors.values().forEach(v -> v.close(this.metrics));
    }

    @Override
    public Collection<TopicSensors.Stat> stats(String topic, boolean isError) {
        return MetricUtils.stats(topic, isError, this.topicSensors.values());
    }

    @Override
    public double aggregateStat(String name, boolean isError) {
        return MetricUtils.aggregateStat(name, isError, this.topicSensors.values());
    }

    public String toString() {
        return this.getClass().getSimpleName() + " " + this.id + " " + this.topicSensors.toString();
    }
}

