/*
 * Decompiled with CFR 0.152.
 */
package kamon.instrumentation.kafka.client;

import com.typesafe.config.Config;
import java.io.Serializable;
import kamon.ClassLoading$;
import kamon.Kamon$;
import kamon.context.Context;
import kamon.context.Context$;
import kamon.context.Storage;
import kamon.instrumentation.kafka.client.ConsumedRecordData;
import kamon.instrumentation.kafka.client.KafkaInstrumentation;
import kamon.instrumentation.kafka.client.KafkaInstrumentation$Syntax$;
import kamon.instrumentation.kafka.client.KafkaPropagator;
import kamon.instrumentation.kafka.client.SpanPropagation$KCtxHeader$;
import kamon.instrumentation.kafka.client.SpanPropagation$W3CTraceContext$;
import kamon.trace.Span;
import kamon.trace.Span$;
import kamon.trace.SpanBuilder;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.Function1;
import scala.Option;
import scala.Option$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.sys.package$;
import scala.util.control.NonFatal$;

public final class KafkaInstrumentation$ {
    public static KafkaInstrumentation$ MODULE$;
    private volatile KafkaInstrumentation.Settings _settings;
    private final Logger log;

    static {
        new KafkaInstrumentation$();
    }

    private KafkaInstrumentation.Settings _settings() {
        return this._settings;
    }

    private void _settings_$eq(KafkaInstrumentation.Settings x$1) {
        this._settings = x$1;
    }

    private Logger log() {
        return this.log;
    }

    public KafkaInstrumentation.Settings settings() {
        return this._settings();
    }

    private KafkaInstrumentation.Settings readSettings(Config config) {
        KafkaPropagator kafkaPropagator;
        Config kafkaConfig = config.getConfig("kamon.instrumentation.kafka.client");
        String identifierScheme = config.getString("kamon.trace.identifier-scheme");
        boolean bl = kafkaConfig.getBoolean("tracing.start-trace-on-producer");
        boolean bl2 = kafkaConfig.getBoolean("tracing.continue-trace-on-consumer");
        boolean bl3 = kafkaConfig.getBoolean("tracing.use-delayed-spans");
        String string = kafkaConfig.getString("tracing.propagator");
        if ("kctx".equals(string)) {
            kafkaPropagator = SpanPropagation$KCtxHeader$.MODULE$.apply();
        } else if ("w3c".equals(string)) {
            String string2 = identifierScheme;
            String string3 = "double";
            if (string2 == null || !string2.equals(string3)) {
                this.log().warn("W3C TraceContext propagation should be used only with identifier-scheme = double");
            }
            kafkaPropagator = SpanPropagation$W3CTraceContext$.MODULE$.apply();
        } else {
            kafkaPropagator = KafkaInstrumentation$.liftedTree1$1(string);
        }
        return new KafkaInstrumentation.Settings(bl, bl2, bl3, kafkaPropagator);
    }

    public <K, V> Context extractContext(ConsumerRecord<K, V> consumerRecord) {
        return KafkaInstrumentation$Syntax$.MODULE$.context$extension(this.Syntax(consumerRecord));
    }

    public ConsumerRecord<?, ?> Syntax(ConsumerRecord<?, ?> cr) {
        return cr;
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> record, Function0<T> f) {
        return this.runWithConsumerSpan(record, "consumer.process", true, f);
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> record, String operationName, Function0<T> f) {
        return this.runWithConsumerSpan(record, operationName, true, f);
    }

    public <T> T runWithConsumerSpan(ConsumerRecord<?, ?> record, String operationName, boolean finishSpan, Function0<T> f) {
        Object object;
        Context incomingContext = KafkaInstrumentation$Syntax$.MODULE$.context$extension(this.Syntax(record));
        Context operationContext = incomingContext.nonEmpty() ? incomingContext : Kamon$.MODULE$.currentContext();
        Span span = this.consumerSpan(record, operationName);
        Storage.Scope scope = Kamon$.MODULE$.storeContext(operationContext.withEntry(Span$.MODULE$.Key(), (Object)span));
        try {
            try {
                object = f.apply();
            }
            catch (Throwable throwable) {
                Option option = NonFatal$.MODULE$.unapply(throwable);
                if (!option.isEmpty()) {
                    Throwable t = (Throwable)option.get();
                    span.fail(t.getMessage(), t);
                    throw t;
                }
                throw throwable;
            }
        }
        finally {
            if (finishSpan) {
                span.finish();
            }
            scope.close();
        }
        return (T)object;
    }

    public Span consumerSpan(ConsumerRecord<?, ?> record) {
        return this.consumerSpan(record, "consumer.process");
    }

    public Span consumerSpan(ConsumerRecord<?, ?> record, String operationName) {
        ConsumedRecordData consumerRecordData;
        SpanBuilder consumerSpan = Kamon$.MODULE$.consumerSpanBuilder(operationName, "kafka.consumer").tag("kafka.topic", record.topic()).tag("kafka.partition", (long)record.partition()).tag("kafka.offset", record.offset()).tag("kafka.timestamp", record.timestamp()).tag("kafka.timestamp-type", record.timestampType().name);
        Option$.MODULE$.apply(record.key()).foreach((Function1 & Serializable & scala.Serializable)k -> consumerSpan.tag("kafka.key", k.toString()));
        Span incomingSpan = (Span)this.settings().propagator().read(record.headers(), Context$.MODULE$.Empty()).get(Span$.MODULE$.Key());
        if (!incomingSpan.isEmpty()) {
            if (this.settings().continueTraceOnConsumer()) {
                consumerSpan.asChildOf(incomingSpan);
            } else {
                consumerSpan.link(incomingSpan, (Span.Link$.Kind)Span.Link$.Kind$.FollowsFrom$.MODULE$);
            }
        }
        if (record instanceof ConsumedRecordData && (consumerRecordData = (ConsumedRecordData)record).consumerInfo() != null) {
            consumerSpan.tag("kafka.group-id", (String)consumerRecordData.consumerInfo().groupId().getOrElse((Function0 & Serializable & scala.Serializable)() -> "unknown")).tag("kafka.client-id", consumerRecordData.consumerInfo().clientId()).tag("kafka.poll-time", consumerRecordData.nanosSincePollStart());
        }
        if (this.settings().useDelayedSpans()) {
            return consumerSpan.delay(Kamon$.MODULE$.clock().toInstant(record.timestamp() * 1000000L)).start();
        }
        return consumerSpan.start();
    }

    public ConsumerRecord<?, ?> copyHiddenState(ConsumerRecord<?, ?> from, ConsumerRecord<?, ?> to) {
        if (from != null && to != null && from instanceof ConsumedRecordData) {
            ConsumedRecordData fromRecordData = (ConsumedRecordData)from;
            ((ConsumedRecordData)to).set(fromRecordData.nanosSincePollStart(), fromRecordData.consumerInfo());
        }
        return to;
    }

    private static final /* synthetic */ KafkaPropagator liftedTree1$1(String x1$1) {
        try {
            return (KafkaPropagator)ClassLoading$.MODULE$.createInstance(x1$1, ClassTag$.MODULE$.apply(KafkaPropagator.class));
        }
        catch (Throwable t) {
            throw package$.MODULE$.error(new StringBuilder(65).append("Failed to create kafka propagator instance from FQCN [").append(x1$1).append("]. Reason: ").append(t.getMessage()).toString());
        }
    }

    private KafkaInstrumentation$() {
        MODULE$ = this;
        this._settings = this.readSettings(Kamon$.MODULE$.config());
        Kamon$.MODULE$.onReconfigure((Function1 & Serializable & scala.Serializable)newConfig -> {
            KafkaInstrumentation$.MODULE$._settings_$eq(KafkaInstrumentation$.MODULE$.readSettings(newConfig));
            return BoxedUnit.UNIT;
        });
        this.log = LoggerFactory.getLogger(KafkaInstrumentation.Settings.class);
    }
}

