/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskEventAddTezEvent;
import org.apache.tez.dag.app.dag.impl.BroadcastEdgeManager;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;

/**
 * Runtime representation of a single edge between a source vertex and a
 * destination vertex.
 *
 * <p>The edge owns an {@link EdgeManager} chosen from the data movement type of
 * its {@link EdgeProperty}, and uses it to route events between the tasks of
 * the two vertices. While buffering is enabled (see
 * {@link #startEventBuffering()}), incoming events are queued instead of being
 * routed and are replayed by {@link #stopEventBuffering()}.
 *
 * <p>Only {@link #setCustomEdgeManager(EdgeManagerDescriptor)} is
 * {@code synchronized}; the event buffers are plain {@link ArrayList}s and the
 * other methods take no lock.
 */
public class Edge {
    private EdgeProperty edgeProperty;
    private EdgeManagerContext edgeManagerContext;
    private EdgeManager edgeManager;
    // Raw type kept on purpose: events of different concrete types are dispatched through it.
    private final EventHandler eventHandler;
    private final AtomicBoolean bufferEvents = new AtomicBoolean(false);
    private final List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
    private final List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
    private Vertex sourceVertex;
    private Vertex destinationVertex;
    private EventMetaData destinationMetaInfo;

    /**
     * Creates the edge and instantiates the edge manager matching the data
     * movement type of {@code edgeProperty}.
     *
     * @param edgeProperty description of the edge
     * @param eventHandler handler that receives the task and task-attempt events
     *                     produced when routing
     * @throws TezUncheckedException if the data movement type is not recognised
     */
    public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
        this.edgeProperty = edgeProperty;
        this.eventHandler = eventHandler;
        this.createEdgeManager();
    }

    /**
     * Instantiates {@link #edgeManager} for the current data movement type.
     * For {@code CUSTOM} the class named by the edge manager descriptor is
     * created through {@link RuntimeUtils#createClazzInstance}.
     */
    private void createEdgeManager() {
        switch (this.edgeProperty.getDataMovementType()) {
            case ONE_TO_ONE:
                this.edgeManager = new OneToOneEdgeManager();
                break;
            case BROADCAST:
                this.edgeManager = new BroadcastEdgeManager();
                break;
            case SCATTER_GATHER:
                this.edgeManager = new ScatterGatherEdgeManager();
                break;
            case CUSTOM: {
                String managerClassName = this.edgeProperty.getEdgeManagerDescriptor().getClassName();
                this.edgeManager = (EdgeManager) RuntimeUtils.createClazzInstance(managerClassName);
                break;
            }
            default:
                throw new TezUncheckedException(
                        "Unknown edge data movement type: " + this.edgeProperty.getDataMovementType());
        }
    }

    /**
     * Builds the edge manager context and initializes the edge manager with it,
     * then prepares the destination meta info attached to routed events.
     *
     * <p>Both the source and the destination vertex must have been set, since
     * their names are read here. The user payload handed to the context is taken
     * from the edge manager descriptor for {@code CUSTOM} edges and is
     * {@code null} otherwise.
     */
    public void initialize() {
        byte[] userPayload = null;
        if (this.edgeProperty.getDataMovementType() == EdgeProperty.DataMovementType.CUSTOM) {
            userPayload = this.edgeProperty.getEdgeManagerDescriptor().getUserPayload();
        }
        String sourceName = this.sourceVertex.getName();
        String destinationName = this.destinationVertex.getName();
        this.edgeManagerContext = new EdgeManagerContextImpl(sourceName, destinationName, userPayload);
        this.edgeManager.initialize(this.edgeManagerContext);
        this.destinationMetaInfo = new EventMetaData(
                EventMetaData.EventProducerConsumerType.INPUT,
                this.destinationVertex.getName(),
                this.sourceVertex.getName(),
                null);
    }

    /**
     * Replaces the edge property with one built from {@code descriptor} (all
     * other attributes are carried over from the current property), then
     * re-creates and re-initializes the edge manager.
     *
     * @param descriptor descriptor of the edge manager to install
     */
    public synchronized void setCustomEdgeManager(EdgeManagerDescriptor descriptor) {
        // NOTE(review): presumably this EdgeProperty constructor yields a CUSTOM
        // data movement type, so createEdgeManager() picks up the descriptor.
        this.edgeProperty = new EdgeProperty(
                descriptor,
                this.edgeProperty.getDataSourceType(),
                this.edgeProperty.getSchedulingType(),
                this.edgeProperty.getEdgeSource(),
                this.edgeProperty.getEdgeDestination());
        this.createEdgeManager();
        this.initialize();
    }

    /** @return the current edge property */
    public EdgeProperty getEdgeProperty() {
        return this.edgeProperty;
    }

    /** @return the edge manager currently used for routing */
    public EdgeManager getEdgeManager() {
        return this.edgeManager;
    }

    /**
     * Sets the source vertex. Setting the same instance again is allowed.
     *
     * @param sourceVertex the source vertex of this edge
     * @throws TezUncheckedException if a different source vertex is already set;
     *         the message carries the name of the vertex passed in
     */
    public void setSourceVertex(Vertex sourceVertex) {
        boolean conflictsWithExisting = this.sourceVertex != null && this.sourceVertex != sourceVertex;
        if (conflictsWithExisting) {
            throw new TezUncheckedException("Source vertex exists: " + sourceVertex.getName());
        }
        this.sourceVertex = sourceVertex;
    }

    /**
     * Sets the destination vertex. Setting the same instance again is allowed.
     *
     * @param destinationVertex the destination vertex of this edge
     * @throws TezUncheckedException if a different destination vertex is already
     *         set; the message carries the name of the vertex passed in
     */
    public void setDestinationVertex(Vertex destinationVertex) {
        boolean conflictsWithExisting =
                this.destinationVertex != null && this.destinationVertex != destinationVertex;
        if (conflictsWithExisting) {
            throw new TezUncheckedException("Destination vertex exists: " + destinationVertex.getName());
        }
        this.destinationVertex = destinationVertex;
    }

    /**
     * Builds the input spec seen by a destination task.
     *
     * @param destinationTaskIndex index of the destination task
     * @return spec naming the source vertex, with the physical input count the
     *         edge manager reports for the source vertex's total task count
     */
    public InputSpec getDestinationSpec(int destinationTaskIndex) {
        int physicalInputs = this.edgeManager.getNumDestinationTaskPhysicalInputs(
                this.sourceVertex.getTotalTasks(), destinationTaskIndex);
        return new InputSpec(this.sourceVertex.getName(), this.edgeProperty.getEdgeDestination(), physicalInputs);
    }

    /**
     * Builds the output spec seen by a source task.
     *
     * @param sourceTaskIndex index of the source task
     * @return spec naming the destination vertex, with the physical output count
     *         the edge manager reports for the destination vertex's total task count
     */
    public OutputSpec getSourceSpec(int sourceTaskIndex) {
        int physicalOutputs = this.edgeManager.getNumSourceTaskPhysicalOutputs(
                this.destinationVertex.getTotalTasks(), sourceTaskIndex);
        return new OutputSpec(this.destinationVertex.getName(), this.edgeProperty.getEdgeSource(), physicalOutputs);
    }

    /** Starts queueing incoming events instead of routing them. */
    public void startEventBuffering() {
        this.bufferEvents.set(true);
    }

    /**
     * Stops buffering and replays the queued events: first everything queued for
     * destination tasks, then everything queued for source tasks. Each buffer is
     * cleared after it has been replayed.
     */
    public void stopEventBuffering() {
        // The flag must be cleared first, otherwise the replay would re-queue the events.
        this.bufferEvents.set(false);
        for (TezEvent buffered : this.destinationEventBuffer) {
            this.sendTezEventToDestinationTasks(buffered);
        }
        this.destinationEventBuffer.clear();
        for (TezEvent buffered : this.sourceEventBuffer) {
            this.sendTezEventToSourceTasks(buffered);
        }
        this.sourceEventBuffer.clear();
    }

    /**
     * Routes an event coming from a destination task back to the source vertex,
     * or queues it while buffering is enabled.
     *
     * <p>Only {@code INPUT_READ_ERROR_EVENT} is supported: the edge manager maps
     * it to a source task index, and a {@link TaskAttemptEventOutputFailed} is
     * dispatched for the attempt of that task identified by the event's version.
     *
     * @param tezEvent the event to route
     * @throws TezUncheckedException if the event type is unsupported or the
     *         source vertex has no task at the routed index
     */
    public void sendTezEventToSourceTasks(TezEvent tezEvent) {
        if (this.bufferEvents.get()) {
            this.sourceEventBuffer.add(tezEvent);
            return;
        }
        switch (tezEvent.getEventType()) {
            case INPUT_READ_ERROR_EVENT: {
                InputReadErrorEvent readError = (InputReadErrorEvent) tezEvent.getEvent();
                TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                int destTaskIndex = destAttemptId.getTaskID().getId();
                int srcTaskIndex = this.edgeManager.routeInputErrorEventToSource(readError, destTaskIndex);
                int numConsumers = this.edgeManager.getNumDestinationConsumerTasks(
                        srcTaskIndex, this.destinationVertex.getTotalTasks());
                Task srcTask = this.sourceVertex.getTask(srcTaskIndex);
                if (srcTask == null) {
                    throw new TezUncheckedException("Unexpected null task. sourceVertex=" + this.sourceVertex.getVertexId() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + destAttemptId + " destIndex=" + destTaskIndex + " edgeManager=" + this.edgeManager.getClass().getName());
                }
                // The event's version is used as the attempt number of the failed source attempt.
                TezTaskAttemptID srcTaskAttemptId =
                        TezTaskAttemptID.getInstance(srcTask.getTaskId(), readError.getVersion());
                this.eventHandler.handle(new TaskAttemptEventOutputFailed(srcTaskAttemptId, tezEvent, numConsumers));
                break;
            }
            default:
                throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
        }
    }

    /**
     * Unpacks a composite data movement event and routes each contained
     * {@link DataMovementEvent} separately, reusing the composite's source info.
     */
    private void handleCompositeDataMovementEvent(TezEvent tezEvent) {
        CompositeDataMovementEvent composite = (CompositeDataMovementEvent) tezEvent.getEvent();
        EventMetaData sourceInfo = tezEvent.getSourceInfo();
        for (DataMovementEvent single : composite.getEvents()) {
            this.sendTezEventToDestinationTasks(new TezEvent(single, sourceInfo));
        }
    }

    /**
     * Delivers a data movement or input failed event to every destination task
     * listed in the routing table.
     *
     * <p>One event instance is used per input index. For all entries but the
     * last a copy is created with the entry's input index as target index; for
     * the last entry the original {@code tezEvent} is reused and its target
     * index is overwritten in place.
     *
     * @param tezEvent event to deliver; its payload is mutated for the last entry
     * @param srcTaskIndex index of the producing source task (used in error text only)
     * @param isDataMovementEvent {@code true} if the payload is a
     *        {@link DataMovementEvent}, {@code false} if it is an {@link InputFailedEvent}
     * @param ifInputIndicesToTaskIndices input index mapped to the destination
     *        task indices that receive the event at that input index
     * @throws TezUncheckedException if the destination vertex has no task at a listed index
     */
    void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex, boolean isDataMovementEvent, Map<Integer, List<Integer>> ifInputIndicesToTaskIndices) {
        int visited = 0;
        org.apache.tez.runtime.api.Event payload = tezEvent.getEvent();
        for (Map.Entry<Integer, List<Integer>> entry : ifInputIndicesToTaskIndices.entrySet()) {
            int inputIndex = entry.getKey();
            boolean isLastEntry = ++visited == ifInputIndicesToTaskIndices.size();
            TezEvent tezEventToSend;
            if (isLastEntry) {
                // Reuse the incoming event for the final entry to save one allocation.
                if (isDataMovementEvent) {
                    ((DataMovementEvent) payload).setTargetIndex(inputIndex);
                } else {
                    ((InputFailedEvent) payload).setTargetIndex(inputIndex);
                }
                tezEventToSend = tezEvent;
            } else {
                // Declared as the common Tez runtime Event type: the copy is either a
                // DataMovementEvent or an InputFailedEvent. Fully qualified because the
                // simple name Event refers to the YARN event interface in this file.
                org.apache.tez.runtime.api.Event copy;
                if (isDataMovementEvent) {
                    DataMovementEvent dmEvent = (DataMovementEvent) payload;
                    copy = new DataMovementEvent(dmEvent.getSourceIndex(), inputIndex, dmEvent.getVersion(), dmEvent.getUserPayload());
                } else {
                    InputFailedEvent ifEvent = (InputFailedEvent) payload;
                    copy = new InputFailedEvent(inputIndex, ifEvent.getVersion());
                }
                tezEventToSend = new TezEvent(copy, tezEvent.getSourceInfo());
            }
            tezEventToSend.setDestinationInfo(this.destinationMetaInfo);
            for (Integer destTaskIndex : entry.getValue()) {
                Task destTask = this.destinationVertex.getTask(destTaskIndex);
                if (destTask == null) {
                    // Note: the "destAttemptId=" label actually carries the destination
                    // vertex id; the text is left unchanged so existing log matching keeps working.
                    throw new TezUncheckedException("Unexpected null task. sourceVertex=" + this.sourceVertex.getVertexId() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + this.destinationVertex.getVertexId() + " destIndex=" + destTaskIndex + " edgeManager=" + this.edgeManager.getClass().getName());
                }
                this.sendEventToTask(destTask.getTaskId(), tezEventToSend);
            }
        }
    }

    /**
     * Routes an event coming from a source task to the destination vertex, or
     * queues it while buffering is enabled.
     *
     * <p>Composite data movement events are split and routed one by one. Data
     * movement and input failed events are routed through the edge manager and
     * must reach at least one destination.
     *
     * @param tezEvent the event to route
     * @throws TezUncheckedException if the event type is unsupported or the edge
     *         manager routes the event to no destination
     */
    public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
        if (this.bufferEvents.get()) {
            this.destinationEventBuffer.add(tezEvent);
            return;
        }
        boolean isDataMovementEvent = true;
        switch (tezEvent.getEventType()) {
            case COMPOSITE_DATA_MOVEMENT_EVENT:
                this.handleCompositeDataMovementEvent(tezEvent);
                break;
            case INPUT_FAILED_EVENT:
                isDataMovementEvent = false;
                // Intentional fall through: both event kinds share the routing below.
            case DATA_MOVEMENT_EVENT: {
                Map<Integer, List<Integer>> inputIndicesToTaskIndices = Maps.newHashMap();
                TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                int srcTaskIndex = srcAttemptId.getTaskID().getId();
                int numDestinationTasks = this.destinationVertex.getTotalTasks();
                if (isDataMovementEvent) {
                    DataMovementEvent dmEvent = (DataMovementEvent) tezEvent.getEvent();
                    this.edgeManager.routeDataMovementEventToDestination(
                            dmEvent, srcTaskIndex, numDestinationTasks, inputIndicesToTaskIndices);
                } else {
                    this.edgeManager.routeInputSourceTaskFailedEventToDestination(
                            srcTaskIndex, numDestinationTasks, inputIndicesToTaskIndices);
                }
                if (inputIndicesToTaskIndices.isEmpty()) {
                    // Note: the "destAttemptId=" label actually carries the destination vertex id.
                    throw new TezUncheckedException("Event must be routed. sourceVertex=" + this.sourceVertex.getVertexId() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + this.destinationVertex.getVertexId() + " edgeManager=" + this.edgeManager.getClass().getName() + " Event type=" + tezEvent.getEventType());
                }
                this.sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, inputIndicesToTaskIndices);
                break;
            }
            default:
                throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
        }
    }

    /** Wraps the event in a {@link TaskEventAddTezEvent} for the given task and dispatches it. */
    private void sendEventToTask(TezTaskID taskId, TezEvent tezEvent) {
        this.eventHandler.handle(new TaskEventAddTezEvent(taskId, tezEvent));
    }

    /** @return name of the source vertex; the source vertex must have been set */
    public String getSourceVertexName() {
        return this.sourceVertex.getName();
    }

    /** @return name of the destination vertex; the destination vertex must have been set */
    public String getDestinationVertexName() {
        return this.destinationVertex.getName();
    }

    /**
     * Immutable {@link EdgeManagerContext} holding the two vertex names and the
     * user payload. The payload array is stored and returned without copying.
     */
    static class EdgeManagerContextImpl
    implements EdgeManagerContext {
        private final String srcVertexName;
        private final String destVertexName;
        private final byte[] userPayload;

        EdgeManagerContextImpl(String srcVertexName, String destVertexName, byte[] userPayload) {
            this.srcVertexName = srcVertexName;
            this.destVertexName = destVertexName;
            this.userPayload = userPayload;
        }

        /** @return the user payload given at construction, possibly {@code null} */
        public byte[] getUserPayload() {
            return this.userPayload;
        }

        /** @return name of the source vertex */
        public String getSrcVertexName() {
            return this.srcVertexName;
        }

        /** @return name of the destination vertex */
        public String getDestVertexName() {
            return this.destVertexName;
        }
    }
}

