/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AssignedStreamsTasks;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

abstract class AssignedTasks<T extends Task> {
    final Logger log;
    private final String taskTypeName;
    private final Map<TaskId, T> created = new HashMap<TaskId, T>();
    private final Map<TaskId, T> suspended = new HashMap<TaskId, T>();
    private final Set<TaskId> previousActiveTasks = new HashSet<TaskId>();
    final Map<TaskId, T> running = new ConcurrentHashMap<TaskId, T>();
    private final Map<TopicPartition, T> runningByPartition = new HashMap<TopicPartition, T>();

    AssignedTasks(LogContext logContext, String taskTypeName) {
        this.taskTypeName = taskTypeName;
        this.log = logContext.logger(this.getClass());
    }

    void addNewTask(T task) {
        this.created.put(task.id(), task);
    }

    void initializeNewTasks() {
        if (!this.created.isEmpty()) {
            this.log.debug("Initializing {}s {}", (Object)this.taskTypeName, this.created.keySet());
        }
        Iterator<Map.Entry<TaskId, T>> it = this.created.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, T> entry = it.next();
            try {
                if (!((Task)entry.getValue()).initializeStateStores()) {
                    this.log.debug("Transitioning {} {} to restoring", (Object)this.taskTypeName, (Object)entry.getKey());
                    ((AssignedStreamsTasks)this).addToRestoring((StreamTask)entry.getValue());
                } else {
                    this.transitionToRunning((Task)entry.getValue());
                }
                it.remove();
            }
            catch (LockException e) {
                this.log.trace("Could not create {} {} due to {}; will retry", new Object[]{this.taskTypeName, entry.getKey(), e.toString()});
            }
        }
    }

    boolean allTasksRunning() {
        return this.created.isEmpty() && this.suspended.isEmpty();
    }

    Collection<T> running() {
        return this.running.values();
    }

    RuntimeException suspend() {
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        this.log.trace("Suspending running {} {}", (Object)this.taskTypeName, this.runningTaskIds());
        firstException.compareAndSet(null, this.suspendTasks(this.running.values()));
        this.log.trace("Close created {} {}", (Object)this.taskTypeName, this.created.keySet());
        firstException.compareAndSet(null, this.closeNonRunningTasks(this.created.values()));
        this.previousActiveTasks.clear();
        this.previousActiveTasks.addAll(this.running.keySet());
        this.running.clear();
        this.created.clear();
        this.runningByPartition.clear();
        return firstException.get();
    }

    private RuntimeException closeNonRunningTasks(Collection<T> tasks) {
        RuntimeException exception = null;
        for (Task task : tasks) {
            try {
                task.close(false, false);
            }
            catch (RuntimeException e) {
                this.log.error("Failed to close {}, {}", new Object[]{this.taskTypeName, task.id(), e});
                if (exception != null) continue;
                exception = e;
            }
        }
        return exception;
    }

    private RuntimeException suspendTasks(Collection<T> tasks) {
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        Iterator<T> it = tasks.iterator();
        while (it.hasNext()) {
            Task task = (Task)it.next();
            try {
                task.suspend();
                this.suspended.put(task.id(), task);
            }
            catch (TaskMigratedException closeAsZombieAndSwallow) {
                this.log.info("Failed to suspend {} {} since it got migrated to another thread already. Closing it as zombie and move on.", (Object)this.taskTypeName, (Object)task.id());
                firstException.compareAndSet(null, this.closeZombieTask(task));
                it.remove();
            }
            catch (RuntimeException e) {
                this.log.error("Suspending {} {} failed due to the following error:", new Object[]{this.taskTypeName, task.id(), e});
                firstException.compareAndSet(null, e);
                try {
                    task.close(false, false);
                }
                catch (RuntimeException f) {
                    this.log.error("After suspending failed, closing the same {} {} failed again due to the following error:", new Object[]{this.taskTypeName, task.id(), f});
                }
            }
        }
        return firstException.get();
    }

    RuntimeException closeZombieTask(T task) {
        try {
            task.close(false, true);
        }
        catch (RuntimeException e) {
            this.log.warn("Failed to close zombie {} {} due to {}; ignore and proceed.", new Object[]{this.taskTypeName, task.id(), e.toString()});
            return e;
        }
        return null;
    }

    boolean hasRunningTasks() {
        return !this.running.isEmpty();
    }

    boolean maybeResumeSuspendedTask(TaskId taskId, Set<TopicPartition> partitions) {
        if (this.suspended.containsKey(taskId)) {
            Task task = (Task)this.suspended.get(taskId);
            this.log.trace("Found suspended {} {}", (Object)this.taskTypeName, (Object)taskId);
            if (task.partitions().equals(partitions)) {
                this.suspended.remove(taskId);
                task.resume();
                try {
                    this.transitionToRunning(task);
                }
                catch (TaskMigratedException e) {
                    this.log.info("Failed to resume {} {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)this.taskTypeName, (Object)task.id());
                    RuntimeException fatalException = this.closeZombieTask(task);
                    this.running.remove(task.id());
                    if (fatalException != null) {
                        throw fatalException;
                    }
                    throw e;
                }
                this.log.trace("Resuming suspended {} {}", (Object)this.taskTypeName, (Object)task.id());
                return true;
            }
            this.log.warn("Couldn't resume task {} assigned partitions {}, task partitions {}", new Object[]{taskId, partitions, task.partitions()});
        }
        return false;
    }

    void transitionToRunning(T task) {
        this.log.debug("Transitioning {} {} to running", (Object)this.taskTypeName, (Object)task.id());
        this.running.put(task.id(), task);
        task.initializeTopology();
        for (TopicPartition topicPartition : task.partitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
        for (TopicPartition topicPartition : task.changelogPartitions()) {
            this.runningByPartition.put(topicPartition, task);
        }
    }

    T runningTaskFor(TopicPartition partition) {
        return (T)((Task)this.runningByPartition.get(partition));
    }

    Set<TaskId> runningTaskIds() {
        return this.running.keySet();
    }

    Map<TaskId, T> runningTaskMap() {
        return Collections.unmodifiableMap(this.running);
    }

    public String toString() {
        return this.toString("");
    }

    public String toString(String indent) {
        StringBuilder builder = new StringBuilder();
        this.describe(builder, this.running.values(), indent, "Running:");
        this.describe(builder, this.suspended.values(), indent, "Suspended:");
        this.describe(builder, this.created.values(), indent, "New:");
        return builder.toString();
    }

    void describe(StringBuilder builder, Collection<T> tasks, String indent, String name) {
        builder.append(indent).append(name);
        for (Task t : tasks) {
            builder.append(indent).append(t.toString(indent + "\t\t"));
        }
        builder.append("\n");
    }

    List<T> allTasks() {
        ArrayList<T> tasks = new ArrayList<T>();
        tasks.addAll(this.running.values());
        tasks.addAll(this.suspended.values());
        tasks.addAll(this.created.values());
        return tasks;
    }

    Set<TaskId> allAssignedTaskIds() {
        HashSet<TaskId> taskIds = new HashSet<TaskId>();
        taskIds.addAll(this.running.keySet());
        taskIds.addAll(this.suspended.keySet());
        taskIds.addAll(this.created.keySet());
        return taskIds;
    }

    void clear() {
        this.runningByPartition.clear();
        this.running.clear();
        this.created.clear();
        this.suspended.clear();
    }

    Set<TaskId> previousTaskIds() {
        return this.previousActiveTasks;
    }

    int commit() {
        int committed = 0;
        RuntimeException firstException = null;
        Iterator<T> it = this.running().iterator();
        while (it.hasNext()) {
            Task task = (Task)it.next();
            try {
                if (!task.commitNeeded()) continue;
                task.commit();
                ++committed;
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to commit {} {} since it got migrated to another thread already. Closing it as zombie before triggering a new rebalance.", (Object)this.taskTypeName, (Object)task.id());
                RuntimeException fatalException = this.closeZombieTask(task);
                if (fatalException != null) {
                    throw fatalException;
                }
                it.remove();
                throw e;
            }
            catch (RuntimeException t) {
                this.log.error("Failed to commit {} {} due to the following error:", new Object[]{this.taskTypeName, task.id(), t});
                if (firstException != null) continue;
                firstException = t;
            }
        }
        if (firstException != null) {
            throw firstException;
        }
        return committed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void closeNonAssignedSuspendedTasks(Map<TaskId, Set<TopicPartition>> newAssignment) {
        Iterator<T> standByTaskIterator = this.suspended.values().iterator();
        while (standByTaskIterator.hasNext()) {
            Task suspendedTask = (Task)standByTaskIterator.next();
            if (newAssignment.containsKey(suspendedTask.id()) && suspendedTask.partitions().equals(newAssignment.get(suspendedTask.id()))) continue;
            this.log.debug("Closing suspended and not re-assigned {} {}", (Object)this.taskTypeName, (Object)suspendedTask.id());
            try {
                suspendedTask.closeSuspended(true, false, null);
            }
            catch (Exception e) {
                this.log.error("Failed to remove suspended {} {} due to the following error:", new Object[]{this.taskTypeName, suspendedTask.id(), e});
            }
            finally {
                standByTaskIterator.remove();
            }
        }
    }

    void close(boolean clean) {
        AtomicReference<Object> firstException = new AtomicReference<Object>(null);
        for (Task task : this.allTasks()) {
            try {
                if (this.suspended.containsKey(task.id())) {
                    task.closeSuspended(clean, false, null);
                    continue;
                }
                task.close(clean, false);
            }
            catch (TaskMigratedException e) {
                this.log.info("Failed to close {} {} since it got migrated to another thread already. Closing it as zombie and move on.", (Object)this.taskTypeName, (Object)task.id());
                firstException.compareAndSet(null, this.closeZombieTask(task));
            }
            catch (RuntimeException t) {
                this.log.error("Failed while closing {} {} due to the following error:", new Object[]{task.getClass().getSimpleName(), task.id(), t});
                if (clean) {
                    if (this.closeUnclean(task)) continue;
                    firstException.compareAndSet(null, t);
                    continue;
                }
                firstException.compareAndSet(null, t);
            }
        }
        this.clear();
        RuntimeException fatalException = firstException.get();
        if (fatalException != null) {
            throw fatalException;
        }
    }

    private boolean closeUnclean(T task) {
        this.log.info("Try to close {} {} unclean.", (Object)task.getClass().getSimpleName(), (Object)task.id());
        try {
            task.close(false, false);
        }
        catch (RuntimeException fatalException) {
            this.log.error("Failed while closing {} {} due to the following error:", new Object[]{task.getClass().getSimpleName(), task.id(), fatalException});
            return false;
        }
        return true;
    }
}

