/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.impl.TezRootInputInitializerContextImpl;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;

public class RootInputInitializerManager {
    private static final Log LOG = LogFactory.getLog(RootInputInitializerManager.class);
    private final ExecutorService rawExecutor;
    private final ListeningExecutorService executor;
    private final EventHandler eventHandler;
    private volatile boolean isStopped = false;
    private final UserGroupInformation dagUgi;
    private final Vertex vertex;
    private final AppContext appContext;
    private final Map<String, InitializerWrapper> initializerMap = new HashMap<String, InitializerWrapper>();

    public RootInputInitializerManager(Vertex vertex, AppContext appContext, UserGroupInformation dagUgi) {
        this.appContext = appContext;
        this.vertex = vertex;
        this.eventHandler = appContext.getEventHandler();
        this.rawExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("InputInitializer [" + this.vertex.getName() + "] #%d").build());
        this.executor = MoreExecutors.listeningDecorator((ExecutorService)this.rawExecutor);
        this.dagUgi = dagUgi;
    }

    public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
        for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input : inputs) {
            TezRootInputInitializerContextImpl context = new TezRootInputInitializerContextImpl(input, this.vertex, this.appContext);
            InputInitializer initializer = this.createInitializer(input, context);
            InitializerWrapper initializerWrapper = new InitializerWrapper(input, initializer, context, this.vertex);
            this.initializerMap.put(input.getName(), initializerWrapper);
            ListenableFuture future = this.executor.submit((Callable)new InputInitializerCallable(initializerWrapper, this.dagUgi));
            Futures.addCallback((ListenableFuture)future, (FutureCallback)this.createInputInitializerCallback(initializerWrapper));
        }
    }

    @VisibleForTesting
    protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, InputInitializerContext context) {
        InputInitializer initializer = (InputInitializer)ReflectionUtils.createClazzInstance((String)((InputInitializerDescriptor)input.getControllerDescriptor()).getClassName(), (Class[])new Class[]{InputInitializerContext.class}, (Object[])new Object[]{context});
        return initializer;
    }

    public void handleInitializerEvent(InputInitializerEvent event) {
        Preconditions.checkState((boolean)this.vertex.getName().equals(event.getTargetVertexName()), (Object)"Received event for incorrect vertex");
        Preconditions.checkNotNull((Object)event.getTargetInputName(), (Object)"target input name must be set");
        InitializerWrapper initializer = this.initializerMap.get(event.getTargetInputName());
        Preconditions.checkState((initializer != null ? 1 : 0) != 0, (Object)("Received event for unknown input : " + event.getTargetInputName()));
        if (this.isStopped) {
            LOG.warn((Object)("InitializerManager already stopped for " + this.vertex.getLogIdentifier() + " Dropping event. [" + event + "]"));
            return;
        }
        if (initializer.isComplete()) {
            LOG.warn((Object)("Event targeted at vertex " + this.vertex.getLogIdentifier() + ", initializerWrapper for Input: " + initializer.getInput().getName() + " will be dropped, since Input has already been initialized. [" + event + "]"));
        }
        try {
            initializer.getInitializer().handleInputInitializerEvent((List)Lists.newArrayList((Object[])new InputInitializerEvent[]{event}));
        }
        catch (Exception e) {
            throw new TezUncheckedException("Initializer for input: " + event.getTargetInputName() + " failed to process event", (Throwable)e);
        }
    }

    @VisibleForTesting
    protected InputInitializerCallback createInputInitializerCallback(InitializerWrapper initializer) {
        return new InputInitializerCallback(initializer, this.eventHandler, this.vertex.getVertexId());
    }

    public void shutdown() {
        if (this.executor != null && !this.isStopped) {
            this.executor.shutdownNow();
            this.isStopped = true;
        }
    }

    private static class InitializerWrapper {
        private final RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input;
        private final InputInitializer initializer;
        private final InputInitializerContext context;
        private final AtomicBoolean isComplete = new AtomicBoolean(false);
        private final String vertexLogIdentifier;

        InitializerWrapper(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, InputInitializer initializer, InputInitializerContext context, Vertex vertex) {
            this.input = input;
            this.initializer = initializer;
            this.context = context;
            this.vertexLogIdentifier = vertex.getLogIdentifier();
        }

        public RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> getInput() {
            return this.input;
        }

        public InputInitializer getInitializer() {
            return this.initializer;
        }

        public String getVertexLogIdentifier() {
            return this.vertexLogIdentifier;
        }

        public boolean isComplete() {
            return this.isComplete.get();
        }

        public void setComplete() {
            this.isComplete.set(true);
        }
    }

    @VisibleForTesting
    private static class InputInitializerCallback
    implements FutureCallback<List<org.apache.tez.runtime.api.Event>> {
        private final InitializerWrapper initializer;
        private final EventHandler eventHandler;
        private final TezVertexID vertexID;

        public InputInitializerCallback(InitializerWrapper initializer, EventHandler eventHandler, TezVertexID vertexID) {
            this.initializer = initializer;
            this.eventHandler = eventHandler;
            this.vertexID = vertexID;
        }

        public void onSuccess(List<org.apache.tez.runtime.api.Event> result) {
            this.initializer.setComplete();
            LOG.info((Object)("Succeeded InputInitializer for Input: " + this.initializer.getInput().getName() + " on vertex " + this.initializer.getVertexLogIdentifier()));
            this.eventHandler.handle((Event)new VertexEventRootInputInitialized(this.vertexID, this.initializer.getInput().getName(), result));
        }

        public void onFailure(Throwable t) {
            this.initializer.setComplete();
            LOG.info((Object)("Failed InputInitializer for Input: " + this.initializer.getInput().getName() + " on vertex " + this.initializer.getVertexLogIdentifier()));
            this.eventHandler.handle((Event)new VertexEventRootInputFailed(this.vertexID, this.initializer.getInput().getName(), t));
        }
    }

    private static class InputInitializerCallable
    implements Callable<List<org.apache.tez.runtime.api.Event>> {
        private final InitializerWrapper initializerWrapper;
        private final UserGroupInformation ugi;

        public InputInitializerCallable(InitializerWrapper initializer, UserGroupInformation ugi) {
            this.initializerWrapper = initializer;
            this.ugi = ugi;
        }

        @Override
        public List<org.apache.tez.runtime.api.Event> call() throws Exception {
            List events = (List)this.ugi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<List<org.apache.tez.runtime.api.Event>>(){

                @Override
                public List<org.apache.tez.runtime.api.Event> run() throws Exception {
                    LOG.info((Object)("Starting InputInitializer for Input: " + InputInitializerCallable.this.initializerWrapper.getInput().getName() + " on vertex " + InputInitializerCallable.this.initializerWrapper.getVertexLogIdentifier()));
                    return InputInitializerCallable.this.initializerWrapper.getInitializer().initialize();
                }
            });
            return events;
        }
    }
}

