/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

public class ProcessorNode<K, V> {
    private final List<ProcessorNode<?, ?>> children;
    private final Map<String, ProcessorNode<?, ?>> childByName;
    private final Processor<K, V> processor;
    private final String name;
    private final Time time;
    public final Set<String> stateStores;
    private InternalProcessorContext internalProcessorContext;
    private String threadId;
    private Sensor processSensor;
    private Sensor punctuateSensor;
    private Sensor destroySensor;
    private Sensor createSensor;

    public ProcessorNode(String name) {
        this(name, null, null);
    }

    public ProcessorNode(String name, Processor<K, V> processor, Set<String> stateStores) {
        this.name = name;
        this.processor = processor;
        this.children = new ArrayList();
        this.childByName = new HashMap();
        this.stateStores = stateStores;
        this.time = new SystemTime();
    }

    public final String name() {
        return this.name;
    }

    public final Processor<K, V> processor() {
        return this.processor;
    }

    public List<ProcessorNode<?, ?>> children() {
        return this.children;
    }

    ProcessorNode getChild(String childName) {
        return this.childByName.get(childName);
    }

    public void addChild(ProcessorNode<?, ?> child) {
        this.children.add(child);
        this.childByName.put(child.name, child);
    }

    public void init(InternalProcessorContext context) {
        try {
            this.internalProcessorContext = context;
            this.initSensors();
            StreamsMetricsImpl.maybeMeasureLatency(() -> {
                if (this.processor != null) {
                    this.processor.init(context);
                }
            }, this.time, this.createSensor);
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to initialize processor %s", this.name), e);
        }
    }

    private void initSensors() {
        this.threadId = Thread.currentThread().getName();
        String taskId = this.internalProcessorContext.taskId().toString();
        StreamsMetricsImpl streamsMetrics = this.internalProcessorContext.metrics();
        this.processSensor = ProcessorNodeMetrics.processSensor(this.threadId, taskId, this.name, streamsMetrics);
        this.punctuateSensor = ProcessorNodeMetrics.punctuateSensor(this.threadId, taskId, this.name, streamsMetrics);
        this.createSensor = ProcessorNodeMetrics.createSensor(this.threadId, taskId, this.name, streamsMetrics);
        this.destroySensor = ProcessorNodeMetrics.destroySensor(this.threadId, taskId, this.name, streamsMetrics);
    }

    public void close() {
        try {
            StreamsMetricsImpl.maybeMeasureLatency(() -> {
                if (this.processor != null) {
                    this.processor.close();
                }
            }, this.time, this.destroySensor);
            this.internalProcessorContext.metrics().removeAllNodeLevelSensors(this.threadId, this.internalProcessorContext.taskId().toString(), this.name);
        }
        catch (Exception e) {
            throw new StreamsException(String.format("failed to close processor %s", this.name), e);
        }
    }

    public void process(K key, V value) {
        try {
            StreamsMetricsImpl.maybeMeasureLatency(() -> this.processor.process(key, value), this.time, this.processSensor);
        }
        catch (ClassCastException e) {
            String keyClass = key == null ? "unknown because key is null" : key.getClass().getName();
            String valueClass = value == null ? "unknown because value is null" : value.getClass().getName();
            throw new StreamsException(String.format("ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: %s, and value: %s.%nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.", keyClass, valueClass), e);
        }
    }

    public void punctuate(long timestamp, Punctuator punctuator) {
        StreamsMetricsImpl.maybeMeasureLatency(() -> punctuator.punctuate(timestamp), this.time, this.punctuateSensor);
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder sb = new StringBuilder(indent + this.name + ":\n");
        if (this.stateStores != null && !this.stateStores.isEmpty()) {
            sb.append(indent).append("\tstates:\t\t[");
            for (String store : this.stateStores) {
                sb.append(store);
                sb.append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }
}

