/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction;
import org.apache.flink.shaded.guava18.com.google.common.hash.Hasher;
import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphHasher;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamGraphHasherV2
implements StreamGraphHasher {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV2.class);

    @Override
    public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
        StreamNode currentNode;
        HashFunction hashFunction = Hashing.murmur3_128((int)0);
        HashMap<Integer, byte[]> hashes = new HashMap<Integer, byte[]>();
        HashSet<Integer> visited = new HashSet<Integer>();
        ArrayDeque<StreamNode> remaining = new ArrayDeque<StreamNode>();
        ArrayList<Integer> sources = new ArrayList<Integer>();
        for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
            sources.add(sourceNodeId);
        }
        Collections.sort(sources);
        for (Integer sourceNodeId : sources) {
            remaining.add(streamGraph.getStreamNode(sourceNodeId));
            visited.add(sourceNodeId);
        }
        while ((currentNode = (StreamNode)remaining.poll()) != null) {
            if (this.generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) {
                for (StreamEdge outEdge : currentNode.getOutEdges()) {
                    StreamNode child = outEdge.getTargetVertex();
                    if (visited.contains(child.getId())) continue;
                    remaining.add(child);
                    visited.add(child.getId());
                }
                continue;
            }
            visited.remove(currentNode.getId());
        }
        return hashes;
    }

    private boolean generateNodeHash(StreamNode node, HashFunction hashFunction, Map<Integer, byte[]> hashes, boolean isChainingEnabled) {
        String userSpecifiedHash = node.getTransformationUID();
        if (userSpecifiedHash == null) {
            for (StreamEdge inEdge : node.getInEdges()) {
                if (hashes.containsKey(inEdge.getSourceId())) continue;
                return false;
            }
            Hasher hasher = hashFunction.newHasher();
            byte[] hash = this.generateDeterministicHash(node, hasher, hashes, isChainingEnabled);
            if (hashes.put(node.getId(), hash) != null) {
                throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
            }
            return true;
        }
        Hasher hasher = hashFunction.newHasher();
        byte[] hash = this.generateUserSpecifiedHash(node, hasher);
        for (byte[] previousHash : hashes.values()) {
            if (!Arrays.equals(previousHash, hash)) continue;
            throw new IllegalArgumentException("Hash collision on user-specified ID \"" + userSpecifiedHash + "\". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.");
        }
        if (hashes.put(node.getId(), hash) != null) {
            throw new IllegalStateException("Unexpected state. Tried to add node hash twice. This is probably a bug in the JobGraph generator.");
        }
        return true;
    }

    private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
        hasher.putString((CharSequence)node.getTransformationUID(), Charset.forName("UTF-8"));
        return hasher.hash().asBytes();
    }

    /*
     * WARNING - void declaration
     */
    private byte[] generateDeterministicHash(StreamNode node, Hasher hasher, Map<Integer, byte[]> hashes, boolean isChainingEnabled) {
        this.generateNodeLocalHash(node, hasher, hashes.size());
        for (StreamEdge streamEdge : node.getOutEdges()) {
            if (!this.isChainable(streamEdge, isChainingEnabled)) continue;
            StreamNode chainedNode = streamEdge.getTargetVertex();
            this.generateNodeLocalHash(chainedNode, hasher, hashes.size());
        }
        byte[] hash = hasher.hash().asBytes();
        for (StreamEdge inEdge : node.getInEdges()) {
            byte[] otherHash = hashes.get(inEdge.getSourceId());
            if (otherHash == null) {
                throw new IllegalStateException("Missing hash for input node " + inEdge.getSourceVertex() + ". Cannot generate hash for " + node + ".");
            }
            for (int j = 0; j < hash.length; ++j) {
                hash[j] = (byte)(hash[j] * 37 ^ otherHash[j]);
            }
        }
        if (LOG.isDebugEnabled()) {
            void var6_10;
            String string = "";
            if (node.getOperator() instanceof AbstractUdfStreamOperator) {
                String string2 = ((AbstractUdfStreamOperator)node.getOperator()).getUserFunction().getClass().getName();
            }
            LOG.debug("Generated hash '" + StringUtils.byteToHexString((byte[])hash) + "' for node '" + node.toString() + "' {id: " + node.getId() + ", parallelism: " + node.getParallelism() + ", user function: " + (String)var6_10 + "}");
        }
        return hash;
    }

    private void generateNodeLocalHash(StreamNode node, Hasher hasher, int id) {
        hasher.putInt(id);
    }

    private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();
        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();
        return downStreamVertex.getInEdges().size() == 1 && outOperator != null && headOperator != null && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) && edge.getPartitioner() instanceof ForwardPartitioner && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && isChainingEnabled;
    }
}

