/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.recovery;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.records.TezDAGID;

public class RecoveryService
extends AbstractService {
    private static final Log LOG = LogFactory.getLog(RecoveryService.class);
    private final AppContext appContext;
    private LinkedBlockingQueue<DAGHistoryEvent> eventQueue = new LinkedBlockingQueue();
    private Set<TezDAGID> completedDAGs = new HashSet<TezDAGID>();
    private Thread eventHandlingThread;
    private AtomicBoolean stopped = new AtomicBoolean(false);
    private AtomicBoolean started = new AtomicBoolean(false);
    private int eventCounter = 0;
    private int eventsProcessed = 0;
    private final Object lock = new Object();
    private FileSystem recoveryDirFS;
    Path recoveryPath;
    Map<TezDAGID, FSDataOutputStream> outputStreamMap = new HashMap<TezDAGID, FSDataOutputStream>();
    private int bufferSize;
    private FSDataOutputStream summaryStream;

    public RecoveryService(AppContext appContext) {
        super(RecoveryService.class.getName());
        this.appContext = appContext;
    }

    public void serviceInit(Configuration conf) throws Exception {
        LOG.info((Object)"Initializing RecoveryService");
        this.recoveryPath = this.appContext.getCurrentRecoveryDir();
        this.recoveryDirFS = FileSystem.get((URI)this.recoveryPath.toUri(), (Configuration)conf);
        this.bufferSize = conf.getInt("tez.dag.recovery.io.buffer.size", 8192);
    }

    public void serviceStart() {
        LOG.info((Object)"Starting RecoveryService");
        this.eventHandlingThread = new Thread(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                while (!RecoveryService.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    DAGHistoryEvent event;
                    if (RecoveryService.this.eventCounter != 0 && RecoveryService.this.eventCounter % 1000 == 0) {
                        LOG.info((Object)("Event queue stats, eventsProcessedSinceLastUpdate=" + RecoveryService.this.eventsProcessed + ", eventQueueSize=" + RecoveryService.this.eventQueue.size()));
                        RecoveryService.this.eventCounter = 0;
                        RecoveryService.this.eventsProcessed = 0;
                    } else {
                        ++RecoveryService.this.eventCounter;
                    }
                    try {
                        event = (DAGHistoryEvent)((Object)RecoveryService.this.eventQueue.take());
                    }
                    catch (InterruptedException e) {
                        LOG.info((Object)"EventQueue take interrupted. Returning");
                        return;
                    }
                    Object object = RecoveryService.this.lock;
                    synchronized (object) {
                        try {
                            ++RecoveryService.this.eventsProcessed;
                            RecoveryService.this.handleEvent(event);
                        }
                        catch (Exception e) {
                            LOG.warn((Object)"Error handling recovery event", (Throwable)e);
                        }
                    }
                }
            }
        }, "RecoveryEventHandlingThread");
        this.eventHandlingThread.start();
        this.started.set(true);
    }

    public void serviceStop() {
        LOG.info((Object)"Stopping RecoveryService");
        this.stopped.set(true);
        if (this.eventHandlingThread != null) {
            this.eventHandlingThread.interrupt();
        }
        if (this.summaryStream != null) {
            try {
                this.summaryStream.flush();
                this.summaryStream.close();
            }
            catch (IOException ioe) {
                LOG.warn((Object)"Error when closing summary stream", (Throwable)ioe);
            }
        }
        for (FSDataOutputStream outputStream : this.outputStreamMap.values()) {
            try {
                outputStream.flush();
                outputStream.close();
            }
            catch (IOException ioe) {
                LOG.warn((Object)"Error when closing output stream", (Throwable)ioe);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void handle(DAGHistoryEvent event) {
        if (this.stopped.get()) {
            LOG.warn((Object)("Igoring event as service stopped, eventType" + (Object)((Object)event.getHistoryEvent().getEventType())));
            return;
        }
        if (!this.started.get()) {
            this.eventQueue.add(event);
            return;
        }
        HistoryEventType eventType = event.getHistoryEvent().getEventType();
        if (eventType.equals((Object)HistoryEventType.DAG_SUBMITTED) || eventType.equals((Object)HistoryEventType.DAG_FINISHED)) {
            Object object = this.lock;
            synchronized (object) {
                try {
                    this.handleEvent(event);
                    this.summaryStream.flush();
                    if (eventType.equals((Object)HistoryEventType.DAG_SUBMITTED)) {
                        this.outputStreamMap.get(event.getDagID()).flush();
                    } else if (eventType.equals((Object)HistoryEventType.DAG_FINISHED)) {
                        this.completedDAGs.add(event.getDagID());
                        if (this.outputStreamMap.containsKey(event.getDagID())) {
                            try {
                                this.outputStreamMap.get(event.getDagID()).flush();
                                this.outputStreamMap.get(event.getDagID()).close();
                                this.outputStreamMap.remove(event.getDagID());
                            }
                            catch (IOException ioe) {
                                LOG.warn((Object)("Error when trying to flush/close recovery file for dag, dagId=" + event.getDagID()));
                            }
                        }
                    }
                }
                catch (Exception e) {
                    LOG.warn((Object)"Error handling recovery event", (Throwable)e);
                }
            }
            LOG.info((Object)("DAG completed, dagId=" + event.getDagID() + ", queueSize=" + this.eventQueue.size()));
        } else {
            this.eventQueue.add(event);
        }
    }

    private void handleEvent(DAGHistoryEvent event) {
        HistoryEventType eventType = event.getHistoryEvent().getEventType();
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Handling recovery event of type " + (Object)((Object)event.getHistoryEvent().getEventType())));
        }
        if (event.getDagID() == null) {
            return;
        }
        TezDAGID dagID = event.getDagID();
        if (this.completedDAGs.contains(dagID)) {
            return;
        }
        try {
            FSDataOutputStream outputStream;
            if (eventType.equals((Object)HistoryEventType.DAG_SUBMITTED) || eventType.equals((Object)HistoryEventType.DAG_FINISHED)) {
                if (this.summaryStream == null) {
                    Path summaryPath = new Path(this.recoveryPath, this.appContext.getApplicationID() + ".summary");
                    this.summaryStream = this.recoveryDirFS.create(summaryPath, false, this.bufferSize);
                }
                if (eventType.equals((Object)HistoryEventType.DAG_SUBMITTED)) {
                    DAGSubmittedEvent dagSubmittedEvent = (DAGSubmittedEvent)event.getHistoryEvent();
                    String dagName = dagSubmittedEvent.getDAGName();
                    if (dagName != null && dagName.startsWith("TezPreWarmDAG")) {
                        return;
                    }
                    Path dagFilePath = new Path(this.recoveryPath, dagID.toString() + ".recovery");
                    FSDataOutputStream outputStream2 = this.recoveryDirFS.create(dagFilePath, false, this.bufferSize);
                    this.outputStreamMap.put(dagID, outputStream2);
                }
                if (this.outputStreamMap.containsKey(dagID)) {
                    SummaryEvent summaryEvent = (SummaryEvent)((Object)event.getHistoryEvent());
                    summaryEvent.toSummaryProtoStream((OutputStream)this.summaryStream);
                }
            }
            if ((outputStream = this.outputStreamMap.get(dagID)) == null) {
                return;
            }
            outputStream.write(event.getHistoryEvent().getEventType().ordinal());
            event.getHistoryEvent().toProtoStream((OutputStream)outputStream);
        }
        catch (IOException ioe) {
            LOG.warn((Object)"Failed to write to stream", (Throwable)ioe);
        }
    }
}

