/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.CapacitySchedulerConf;
import org.apache.hadoop.mapred.CapacitySchedulerQueue;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.JobInProgress;
import org.apache.hadoop.mapred.JobQueueJobInProgressListener;
import org.apache.hadoop.mapred.JobQueuesManager;
import org.apache.hadoop.mapred.TaskTrackerManager;
import org.apache.hadoop.util.StringUtils;

/**
 * Poller thread that, on every cycle, prunes its record of already handed-out
 * jobs, asks each queue for waiting jobs that may be initialized, and passes
 * them to the {@link JobInitializationThread} worker assigned to that queue.
 * The workers in turn call {@code TaskTrackerManager.initJob} on each job.
 */
public class JobInitializationPoller
extends Thread {
    private static final Log LOG = LogFactory.getLog((String)JobInitializationPoller.class.getName());
    /** Supplies the queue names and the per-queue objects consulted on every cycle. */
    private JobQueuesManager jobQueueManager;
    /** Pause between cycles, passed to {@code Thread.sleep} by the poller and by the workers. */
    private long sleepInterval;
    /** Number of worker threads to create; set in {@code init}. */
    private int poolSize;
    /** Jobs already handed to a worker, keyed by job id; pruned by {@code cleanUpInitializedJobsList}. */
    private HashMap<JobID, JobInProgress> initializedJobs = new HashMap();
    /** Loop flag for {@code run()}; cleared by {@code terminate()}. */
    private volatile boolean running;
    /** Manager whose {@code initJob} the workers invoke. */
    private TaskTrackerManager ttm;
    /** Maps a queue name to the worker thread that serves it (one worker may serve several queues). */
    private Map<String, JobInitializationThread> threadsToQueueMap;

    /**
     * Creates the poller. No worker threads exist until {@code init} is called.
     *
     * @param mgr    source of queue names and queue objects
     * @param rmConf not read by this constructor
     * @param queue  not read by this constructor
     * @param ttm    manager the workers will ask to initialize jobs
     */
    public JobInitializationPoller(JobQueuesManager mgr, CapacitySchedulerConf rmConf, Set<String> queue, TaskTrackerManager ttm) {
        setName("JobInitializationPollerThread");
        this.ttm = ttm;
        this.jobQueueManager = mgr;
        this.running = true;
        this.threadsToQueueMap =
            Collections.synchronizedMap(new HashMap<String, JobInitializationThread>());
    }

    /**
     * Replaces the manager that worker threads call {@code initJob} on.
     *
     * @param ttm the manager to use from now on
     */
    void setTaskTrackerManager(TaskTrackerManager ttm) {
        this.ttm = ttm;
    }

    /**
     * Reads the sleep interval and worker limit from the configuration, assigns
     * a worker to every queue, and starts each worker that is not yet running
     * as a daemon thread.
     *
     * @param numQueues    upper bound on the number of workers
     * @param capacityConf configuration supplying the sleep interval and the
     *                     maximum number of worker threads
     */
    void init(int numQueues, CapacitySchedulerConf capacityConf) {
        sleepInterval = capacityConf.getSleepInterval();
        int configuredWorkers = capacityConf.getMaxWorkerThreads();
        poolSize = Math.min(configuredWorkers, numQueues);
        assignThreadsToQueues();
        // A worker serving several queues appears once per queue here; the
        // isAlive() check keeps it from being started twice.
        for (JobInitializationThread worker : threadsToQueueMap.values()) {
            if (!worker.isAlive()) {
                worker.setDaemon(true);
                worker.start();
            }
        }
    }

    /**
     * Assigns every queue in {@code queues} that has no worker yet to one of
     * the existing workers, cycling through the distinct workers in turn.
     * Queues that already have a worker are left untouched.
     *
     * @param queues the complete set of queue names that should be served
     * @throws IllegalStateException if a queue needs a worker but no worker
     *         exists (previously this surfaced as an ArithmeticException
     *         from a modulo by zero)
     */
    void reinit(Set<String> queues) {
        // values() lists a worker once per queue it serves. Collapse that to the
        // distinct workers so busy workers are not favoured when cycling.
        ArrayList<JobInitializationThread> workers = new ArrayList<JobInitializationThread>();
        // Iterating a view of a synchronized map requires holding the map's lock.
        synchronized (this.threadsToQueueMap) {
            for (JobInitializationThread worker : this.threadsToQueueMap.values()) {
                if (!workers.contains(worker)) {
                    workers.add(worker);
                }
            }
        }
        int next = 0;
        for (String newQueue : queues) {
            if (this.threadsToQueueMap.containsKey(newQueue)) {
                continue;
            }
            if (workers.isEmpty()) {
                throw new IllegalStateException("Cannot assign queue " + newQueue
                    + " to an initialization thread: no threads have been created");
            }
            JobInitializationThread worker = workers.get(next++ % workers.size());
            worker.addQueue(newQueue);
            this.threadsToQueueMap.put(newQueue, worker);
        }
    }

    /**
     * Poller loop: while {@code running} is set, prune the list of handed-out
     * jobs, hand newly eligible jobs to the workers, then sleep for
     * {@code sleepInterval} unless the thread's interrupt flag is set.
     */
    @Override
    public void run() {
        while (running) {
            try {
                cleanUpInitializedJobsList();
                selectJobsToInitialize();
                // NOTE(review): isInterrupted() does not clear the flag, so if this
                // thread is interrupted while 'running' stays true the loop appears
                // to keep cycling without ever sleeping — confirm this is intended.
                if (!isInterrupted()) {
                    Thread.sleep(sleepInterval);
                }
            } catch (InterruptedException e) {
                LOG.error("Job Initialization poller interrupted" + StringUtils.stringifyException(e));
            }
        }
    }

    /**
     * For every queue known to the queue manager, selects the jobs that may be
     * initialized now and hands them to the worker assigned to that queue.
     *
     * <p>A queue without an assigned worker is skipped before any job is
     * selected. Selecting first would record the jobs as handed out (see
     * {@code getJobsToInitialize}) and then fail with a NullPointerException,
     * which would end the poller thread.
     */
    void selectJobsToInitialize() {
        for (String queue : this.jobQueueManager.getAllQueues()) {
            JobInitializationThread worker = this.threadsToQueueMap.get(queue);
            if (worker == null) {
                LOG.warn("No initialization thread assigned to queue " + queue
                    + "; skipping job selection for it");
                continue;
            }
            ArrayList<JobInProgress> jobsToInitialize = this.getJobsToInitialize(queue);
            this.printJobs(jobsToInitialize);
            for (JobInProgress job : jobsToInitialize) {
                worker.addJobsToQueue(queue, job);
            }
        }
    }

    /**
     * Logs, at info level, one line per job that is about to be handed to a worker.
     *
     * @param jobsToInitialize jobs selected in the current cycle
     */
    private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
        for (JobInProgress job : jobsToInitialize) {
            StringBuilder line = new StringBuilder("Passing to Initializer Job Id :");
            line.append(job.getJobID());
            line.append(" User: ").append(job.getProfile().getUser());
            line.append(" Queue : ").append(job.getProfile().getQueueName());
            LOG.info(line.toString());
        }
    }

    /**
     * Creates a new, not yet started worker thread bound to this poller.
     * Package-private and non-final — presumably so tests can substitute their
     * own worker; confirm against callers.
     *
     * @return a fresh {@link JobInitializationThread}
     */
    JobInitializationThread createJobInitializationThread() {
        return new JobInitializationThread();
    }

    /**
     * Creates the worker threads and assigns every queue to exactly one of them.
     *
     * <p>At most one worker per queue is created, even if {@code poolSize} is
     * larger. Each worker first receives a contiguous batch of
     * {@code queues / workers} queues; the remaining queues (fewer than the
     * number of workers) are then handed out one per worker so that no worker
     * serves more than one queue above the others.
     *
     * <p>With no queues this is a no-op.
     *
     * @throws IllegalStateException if queues exist but {@code poolSize} is not
     *         positive (previously an ArithmeticException or
     *         NullPointerException, depending on the value)
     */
    private void assignThreadsToQueues() {
        Collection<String> queueNames = this.jobQueueManager.getAllQueues();
        int countOfQueues = queueNames.size();
        if (countOfQueues == 0) {
            return;
        }
        if (this.poolSize <= 0) {
            throw new IllegalStateException("Cannot assign " + countOfQueues
                + " queue(s) to initialization threads: pool size is " + this.poolSize);
        }
        String[] queues = queueNames.toArray(new String[countOfQueues]);
        // Never create a worker that would end up without a queue.
        int workerCount = Math.min(this.poolSize, countOfQueues);
        int queuesPerWorker = countOfQueues / workerCount;
        JobInitializationThread[] workers = new JobInitializationThread[workerCount];
        int next = 0;
        for (int w = 0; w < workerCount; ++w) {
            workers[w] = this.createJobInitializationThread();
            for (int k = 0; k < queuesPerWorker; ++k, ++next) {
                workers[w].addQueue(queues[next]);
                this.threadsToQueueMap.put(queues[next], workers[w]);
            }
        }
        // The remainder is countOfQueues % workerCount, so w stays within bounds.
        for (int w = 0; next < countOfQueues; ++w, ++next) {
            workers[w].addQueue(queues[next]);
            this.threadsToQueueMap.put(queues[next], workers[w]);
        }
    }

    /**
     * Walks the waiting jobs of one queue in the order the queue returns them
     * and picks those that should be initialized now. Each picked job is
     * recorded in {@code initializedJobs} and reported to the queue through
     * {@code addInitializingJob}.
     *
     * <p>The walk stops at the first job the queue refuses via
     * {@code initializeJobForQueue}. Once {@code initializeJobForUser} refuses
     * a job, all later jobs of that user are skipped for this call.
     *
     * @param queueName name of the queue to inspect
     * @return the jobs picked in this call, possibly empty
     */
    ArrayList<JobInProgress> getJobsToInitialize(String queueName) {
        CapacitySchedulerQueue schedulerQueue = jobQueueManager.getQueue(queueName);
        ArrayList<JobInProgress> selected = new ArrayList<JobInProgress>();
        HashSet<String> cappedUsers = new HashSet<String>();
        Iterator<JobInProgress> waiting = schedulerQueue.getWaitingJobs().iterator();
        while (waiting.hasNext()) {
            JobInProgress candidate = waiting.next();
            String owner = candidate.getProfile().getUser();
            if (initializedJobs.containsKey(candidate.getJobID())) {
                // Already handed to a worker in an earlier cycle.
                continue;
            }
            if (!schedulerQueue.initializeJobForQueue(candidate)) {
                break;
            }
            if (cappedUsers.contains(owner)) {
                continue;
            }
            if (!schedulerQueue.initializeJobForUser(candidate)) {
                cappedUsers.add(owner);
                continue;
            }
            // Run state 4 is presumably JobStatus.PREP — TODO confirm.
            if (candidate.getStatus().getRunState() == 4) {
                initializedJobs.put(candidate.getJobID(), candidate);
                selected.add(candidate);
                schedulerQueue.addInitializingJob(candidate);
            }
        }
        return selected;
    }

    /**
     * Drops entries from {@code initializedJobs} that no longer need tracking:
     * jobs in run state 1 for which {@code isScheduled} holds, and jobs that
     * report {@code isComplete()}.
     */
    void cleanUpInitializedJobsList() {
        Iterator<JobInProgress> tracked = initializedJobs.values().iterator();
        while (tracked.hasNext()) {
            JobInProgress job = tracked.next();
            // Run state 1 is presumably JobStatus.RUNNING — TODO confirm.
            boolean runningAndScheduled = job.getStatus().getRunState() == 1 && isScheduled(job);
            if (runningAndScheduled) {
                LOG.info("Removing scheduled jobs from waiting queue" + job.getJobID());
                tracked.remove();
            } else if (job.isComplete()) {
                LOG.info("Removing killed/completed job from initalized jobs list : " + job.getJobID());
                tracked.remove();
            }
        }
    }

    /**
     * Tells whether fewer map tasks or fewer reduce tasks are pending than the
     * job wants in total.
     *
     * @param job the job to inspect
     * @return {@code true} if pending maps are below desired maps, or pending
     *         reduces are below desired reduces
     */
    private boolean isScheduled(JobInProgress job) {
        if (job.pendingMaps() < job.desiredMaps()) {
            return true;
        }
        return job.pendingReduces() < job.desiredReduces();
    }

    /**
     * Stops the poller loop and asks every live worker to stop, interrupting
     * it so that a sleeping worker wakes up.
     */
    void terminate() {
        running = false;
        for (JobInitializationThread worker : threadsToQueueMap.values()) {
            if (worker.isAlive()) {
                worker.terminate();
                worker.interrupt();
            }
        }
    }

    /**
     * Looks up the job that the worker serving {@code queue} is initializing.
     *
     * @param queue queue name
     * @return the worker's current job, or {@code null} if the queue has no
     *         worker or the worker is not initializing a job
     */
    JobInProgress getInitializingJob(String queue) {
        JobInitializationThread worker = threadsToQueueMap.get(queue);
        return worker == null ? null : worker.getInitializingJob();
    }

    /**
     * Returns the ids of the jobs currently tracked in {@code initializedJobs}.
     * This is the map's own key-set view, not a copy, so it reflects later
     * changes to the map.
     *
     * @return live view of the tracked job ids
     */
    Set<JobID> getInitializedJobList() {
        return this.initializedJobs.keySet();
    }

    /**
     * Worker thread that serves one or more queues. It keeps a sorted map of
     * pending jobs per queue and calls {@code initJob} on the poller's
     * {@code TaskTrackerManager} for each of them.
     */
    class JobInitializationThread
    extends Thread {
        /** Job currently being passed to {@code initJob}; {@code null} otherwise. */
        private JobInProgress initializingJob;
        /** Loop flag for {@code run()}; cleared by {@code terminate()}. */
        private volatile boolean startIniting = true;
        /** Incremented in {@code addJobsToQueue} and decremented in {@code getFirstJobInQueue}. */
        private AtomicInteger currentJobCount = new AtomicInteger(0);
        /** Pending jobs per queue name; each inner map is ordered by the queue's comparator. */
        private Map<String, Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>> jobsPerQueue = new ConcurrentHashMap<String, Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress>>();

        /**
         * Worker loop: drain the pending jobs, then sleep for the poller's
         * sleep interval, until {@code terminate()} clears the flag.
         */
        @Override
        public void run() {
            while (startIniting) {
                initializeJobs();
                if (!startIniting) {
                    break;
                }
                try {
                    Thread.sleep(JobInitializationPoller.this.sleepInterval);
                } catch (Throwable ignored) {
                    // Deliberately ignored: whatever interrupts the sleep, the loop
                    // condition decides whether the worker keeps going.
                }
            }
        }

        /**
         * Repeatedly visits every queue served by this worker, taking the first
         * pending job of each and initializing it, until the pending-job counter
         * reaches zero.
         *
         * <p>Once the worker has been told to stop, jobs are still removed from
         * the queues and logged, but no longer passed to {@code initJob}.
         */
        void initializeJobs() {
            while (currentJobCount.get() > 0) {
                for (String queue : jobsPerQueue.keySet()) {
                    JobInProgress job = getFirstJobInQueue(queue);
                    if (job == null) {
                        continue;
                    }
                    LOG.info("Initializing job : " + job.getJobID() + " in Queue " + job.getProfile().getQueueName() + " For user : " + job.getProfile().getUser());
                    if (!startIniting) {
                        // Abandon this pass; the outer loop re-checks the counter.
                        break;
                    }
                    setInitializingJob(job);
                    JobInitializationPoller.this.ttm.initJob(job);
                    setInitializingJob(null);
                }
            }
        }

        /**
         * Removes and returns the first pending job of the given queue, holding
         * that queue's map as the lock. Decrements the pending-job counter when
         * a job is removed.
         *
         * @param queue name of a queue served by this worker
         * @return the removed job, or {@code null} if the queue has none pending
         */
        private JobInProgress getFirstJobInQueue(String queue) {
            Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress> pending = jobsPerQueue.get(queue);
            synchronized (pending) {
                Iterator<JobInProgress> head = pending.values().iterator();
                if (!head.hasNext()) {
                    return null;
                }
                JobInProgress first = head.next();
                head.remove();
                currentJobCount.getAndDecrement();
                return first;
            }
        }

        /**
         * Returns the job this worker is currently initializing.
         *
         * @return the current job, or {@code null} when none is being initialized
         */
        synchronized JobInProgress getInitializingJob() {
            return this.initializingJob;
        }

        /**
         * Records the job this worker is currently initializing.
         *
         * @param job the job being initialized, or {@code null} to clear it
         */
        synchronized void setInitializingJob(JobInProgress job) {
            this.initializingJob = job;
        }

        /**
         * Clears the loop flag so that {@code run()} exits and
         * {@code initializeJobs()} stops passing jobs to {@code initJob}.
         * It does not interrupt the thread itself.
         */
        void terminate() {
            this.startIniting = false;
        }

        /**
         * Adds a job to the pending map of the given queue so that this worker
         * will initialize it.
         *
         * <p>If the queue is not served by this worker the problem is logged and
         * the job is dropped. Without the early return the method would go on to
         * synchronize on {@code null} and throw a NullPointerException.
         *
         * @param queue name of the queue the job belongs to
         * @param job   the job to enqueue
         */
        void addJobsToQueue(String queue, JobInProgress job) {
            Map<JobQueueJobInProgressListener.JobSchedulingInfo, JobInProgress> jobs = this.jobsPerQueue.get(queue);
            if (jobs == null) {
                LOG.error("Invalid queue passed to the thread : " + queue + " For job :: " + job.getJobID());
                return;
            }
            synchronized (jobs) {
                JobQueueJobInProgressListener.JobSchedulingInfo schedInfo = new JobQueueJobInProgressListener.JobSchedulingInfo(job);
                // Count only genuinely new entries. If put() replaced an existing
                // entry the map did not grow, and an inflated counter would keep
                // initializeJobs() looping over empty queues.
                if (jobs.put(schedInfo, job) == null) {
                    this.currentJobCount.getAndIncrement();
                }
            }
        }

        /**
         * Registers a queue with this worker by creating an empty pending-job
         * map ordered by the queue's comparator. Any map previously registered
         * under the same name is replaced.
         *
         * @param queueName name of the queue to serve
         */
        void addQueue(String queueName) {
            CapacitySchedulerQueue schedulerQueue = JobInitializationPoller.this.jobQueueManager.getQueue(queueName);
            this.jobsPerQueue.put(queueName, new TreeMap(schedulerQueue.getComparator()));
        }
    }
}

