/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;

class ActiveTaskCreator {
    private final InternalTopologyBuilder builder;
    private final StreamsConfig config;
    private final StreamsMetricsImpl streamsMetrics;
    private final StateDirectory stateDirectory;
    private final ChangelogReader storeChangelogReader;
    private final ThreadCache cache;
    private final Time time;
    private final KafkaClientSupplier clientSupplier;
    private final String threadId;
    private final Logger log;
    private final Sensor createTaskSensor;
    private final StreamsProducer threadProducer;
    private final Map<TaskId, StreamsProducer> taskProducers;
    private final StreamThread.ProcessingMode processingMode;

    ActiveTaskCreator(InternalTopologyBuilder builder, StreamsConfig config, StreamsMetricsImpl streamsMetrics, StateDirectory stateDirectory, ChangelogReader storeChangelogReader, ThreadCache cache, Time time, KafkaClientSupplier clientSupplier, String threadId, UUID processId, Logger log) {
        this.builder = builder;
        this.config = config;
        this.streamsMetrics = streamsMetrics;
        this.stateDirectory = stateDirectory;
        this.storeChangelogReader = storeChangelogReader;
        this.cache = cache;
        this.time = time;
        this.clientSupplier = clientSupplier;
        this.threadId = threadId;
        this.log = log;
        this.createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics);
        this.processingMode = StreamThread.processingMode(config);
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            this.threadProducer = null;
            this.taskProducers = new HashMap<TaskId, StreamsProducer>();
        } else {
            log.info("Creating thread producer client");
            String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
            LogContext logContext = new LogContext(threadIdPrefix);
            this.threadProducer = new StreamsProducer(config, threadId, clientSupplier, null, processId, logContext);
            this.taskProducers = Collections.emptyMap();
        }
    }

    public void reInitializeThreadProducer() {
        this.threadProducer.resetProducer();
    }

    StreamsProducer streamsProducerForTask(TaskId taskId) {
        if (this.processingMode != StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            throw new IllegalStateException("Expected EXACTLY_ONCE to be enabled, but the processing mode was " + (Object)((Object)this.processingMode));
        }
        StreamsProducer taskProducer = this.taskProducers.get(taskId);
        if (taskProducer == null) {
            throw new IllegalStateException("Unknown TaskId: " + taskId);
        }
        return taskProducer;
    }

    StreamsProducer threadProducer() {
        if (this.processingMode != StreamThread.ProcessingMode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was " + (Object)((Object)this.processingMode));
        }
        return this.threadProducer;
    }

    Collection<Task> createTasks(Consumer<byte[], byte[]> consumer, Map<TaskId, Set<TopicPartition>> tasksToBeCreated) {
        ArrayList<Task> createdTasks = new ArrayList<Task>();
        for (Map.Entry<TaskId, Set<TopicPartition>> newTaskAndPartitions : tasksToBeCreated.entrySet()) {
            TaskId taskId = newTaskAndPartitions.getKey();
            Set<TopicPartition> partitions = newTaskAndPartitions.getValue();
            LogContext logContext = this.getLogContext(taskId);
            ProcessorTopology topology = this.builder.buildSubtopology(taskId.subtopology());
            ProcessorStateManager stateManager = new ProcessorStateManager(taskId, Task.TaskType.ACTIVE, StreamThread.eosEnabled(this.config), logContext, this.stateDirectory, this.storeChangelogReader, topology.storeToChangelogTopic(), partitions);
            ProcessorContextImpl context = new ProcessorContextImpl(taskId, this.config, stateManager, this.streamsMetrics, this.cache);
            createdTasks.add(this.createActiveTask(taskId, partitions, consumer, logContext, topology, stateManager, context));
        }
        return createdTasks;
    }

    StreamTask createActiveTaskFromStandby(StandbyTask standbyTask, Set<TopicPartition> inputPartitions, Consumer<byte[], byte[]> consumer) {
        InternalProcessorContext context = standbyTask.processorContext();
        ProcessorStateManager stateManager = standbyTask.stateMgr;
        LogContext logContext = this.getLogContext(standbyTask.id);
        standbyTask.closeCleanAndRecycleState();
        stateManager.transitionTaskType(Task.TaskType.ACTIVE, logContext);
        return this.createActiveTask(standbyTask.id, inputPartitions, consumer, logContext, this.builder.buildSubtopology(standbyTask.id.subtopology()), stateManager, context);
    }

    private StreamTask createActiveTask(TaskId taskId, Set<TopicPartition> inputPartitions, Consumer<byte[], byte[]> consumer, LogContext logContext, ProcessorTopology topology, ProcessorStateManager stateManager, InternalProcessorContext context) {
        StreamsProducer streamsProducer;
        if (this.processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) {
            this.log.info("Creating producer client for task {}", (Object)taskId);
            streamsProducer = new StreamsProducer(this.config, this.threadId, this.clientSupplier, taskId, null, logContext);
            this.taskProducers.put(taskId, streamsProducer);
        } else {
            streamsProducer = this.threadProducer;
        }
        RecordCollectorImpl recordCollector = new RecordCollectorImpl(logContext, taskId, streamsProducer, this.config.defaultProductionExceptionHandler(), this.streamsMetrics);
        StreamTask task = new StreamTask(taskId, inputPartitions, topology, consumer, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, stateManager, recordCollector, context, logContext);
        this.log.trace("Created task {} with assigned partitions {}", (Object)taskId, inputPartitions);
        this.createTaskSensor.record();
        return task;
    }

    void closeThreadProducerIfNeeded() {
        if (this.threadProducer != null) {
            try {
                this.threadProducer.close();
            }
            catch (RuntimeException e) {
                throw new StreamsException("Thread producer encounter error trying to close.", e);
            }
        }
    }

    void closeAndRemoveTaskProducerIfNeeded(TaskId id) {
        StreamsProducer taskProducer = this.taskProducers.remove(id);
        if (taskProducer != null) {
            try {
                taskProducer.close();
            }
            catch (RuntimeException e) {
                throw new StreamsException("[" + id + "] task producer encounter error trying to close.", e);
            }
        }
    }

    Map<MetricName, Metric> producerMetrics() {
        Collection<StreamsProducer> producers = this.threadProducer != null ? Collections.singleton(this.threadProducer) : this.taskProducers.values();
        return ClientUtils.producerMetrics(producers);
    }

    Set<String> producerClientIds() {
        if (this.threadProducer != null) {
            return Collections.singleton(ClientUtils.getThreadProducerClientId(this.threadId));
        }
        return this.taskProducers.keySet().stream().map(taskId -> ClientUtils.getTaskProducerClientId(this.threadId, taskId)).collect(Collectors.toSet());
    }

    private LogContext getLogContext(TaskId taskId) {
        String threadIdPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
        String logPrefix = threadIdPrefix + String.format("%s [%s] ", "task", taskId);
        return new LogContext(logPrefix);
    }
}

