/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KStreamImpl;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.TableSourceNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InternalStreamsBuilder
implements InternalNameProvider {
    private static final String TABLE_SOURCE_SUFFIX = "-source";
    final InternalTopologyBuilder internalTopologyBuilder;
    private final AtomicInteger index = new AtomicInteger(0);
    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
    private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap();
    private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet();
    private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet();
    private static final String TOPOLOGY_ROOT = "root";
    private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
    protected final StreamsGraphNode root = new StreamsGraphNode("root"){

        @Override
        public void writeToTopology(InternalTopologyBuilder topologyBuilder) {
        }
    };

    public InternalStreamsBuilder(InternalTopologyBuilder internalTopologyBuilder) {
        this.internalTopologyBuilder = internalTopologyBuilder;
    }

    public <K, V> KStream<K, V> stream(Collection<String> topics, ConsumedInternal<K, V> consumed) {
        String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, "KSTREAM-SOURCE-");
        StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<K, V>(name, topics, consumed);
        this.addGraphNode(this.root, streamSourceNode);
        return new KStreamImpl<K, V>(name, consumed.keySerde(), consumed.valueSerde(), Collections.singleton(name), false, streamSourceNode, this);
    }

    public <K, V> KStream<K, V> stream(Pattern topicPattern, ConsumedInternal<K, V> consumed) {
        String name = this.newProcessorName("KSTREAM-SOURCE-");
        StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<K, V>(name, topicPattern, consumed);
        this.addGraphNode(this.root, streamPatternSourceNode);
        return new KStreamImpl<K, V>(name, consumed.keySerde(), consumed.valueSerde(), Collections.singleton(name), false, streamPatternSourceNode, this);
    }

    public <K, V> KTable<K, V> table(String topic, ConsumedInternal<K, V> consumed, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        NamedInternal named = new NamedInternal(consumed.name());
        String sourceName = named.suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, "KSTREAM-SOURCE-");
        String tableSourceName = named.orElseGenerateWithPrefix(this, "KTABLE-SOURCE-");
        KTableSource tableSource = new KTableSource(materialized.storeName(), materialized.queryableStoreName());
        ProcessorParameters processorParameters = new ProcessorParameters(tableSource, tableSourceName);
        TableSourceNode tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withTopic(topic).withSourceName(sourceName).withNodeName(tableSourceName).withConsumedInternal(consumed).withMaterializedInternal(materialized).withProcessorParameters(processorParameters).build();
        this.addGraphNode(this.root, tableSourceNode);
        return new KTableImpl(tableSourceName, consumed.keySerde(), consumed.valueSerde(), Collections.singleton(sourceName), materialized.queryableStoreName(), tableSource, tableSourceNode, this);
    }

    public <K, V> GlobalKTable<K, V> globalTable(String topic, ConsumedInternal<K, V> consumed, MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(consumed, "consumed can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        materialized.withLoggingDisabled();
        NamedInternal named = new NamedInternal(consumed.name());
        String sourceName = named.suffixWithOrElseGet(TABLE_SOURCE_SUFFIX, this, "KSTREAM-SOURCE-");
        String processorName = named.orElseGenerateWithPrefix(this, "KTABLE-SOURCE-");
        String storeName = materialized.storeName();
        KTableSource tableSource = new KTableSource(storeName, storeName);
        ProcessorParameters processorParameters = new ProcessorParameters(tableSource, processorName);
        TableSourceNode tableSourceNode = TableSourceNode.tableSourceNodeBuilder().withTopic(topic).isGlobalKTable(true).withSourceName(sourceName).withConsumedInternal(consumed).withMaterializedInternal(materialized).withProcessorParameters(processorParameters).build();
        this.addGraphNode(this.root, tableSourceNode);
        return new GlobalKTableImpl(new KTableSourceValueGetterSupplier(storeName), materialized.queryableStoreName());
    }

    @Override
    public String newProcessorName(String prefix) {
        return prefix + String.format("%010d", this.index.getAndIncrement());
    }

    @Override
    public String newStoreName(String prefix) {
        return prefix + String.format("STATE-STORE-%010d", this.index.getAndIncrement());
    }

    public synchronized void addStateStore(StoreBuilder builder) {
        this.addGraphNode(this.root, (StreamsGraphNode)new StateStoreNode(builder));
    }

    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String sourceName, String topic, ConsumedInternal consumed, String processorName, ProcessorSupplier stateUpdateSupplier) {
        GlobalStoreNode globalStoreNode = new GlobalStoreNode(storeBuilder, sourceName, topic, consumed, processorName, stateUpdateSupplier);
        this.addGraphNode(this.root, (StreamsGraphNode)globalStoreNode);
    }

    public synchronized void addGlobalStore(StoreBuilder<KeyValueStore> storeBuilder, String topic, ConsumedInternal consumed, ProcessorSupplier stateUpdateSupplier) {
        storeBuilder.withLoggingDisabled();
        String sourceName = this.newProcessorName("KSTREAM-SOURCE-");
        String processorName = this.newProcessorName("KTABLE-SOURCE-");
        this.addGlobalStore(storeBuilder, sourceName, topic, consumed, processorName, stateUpdateSupplier);
    }

    void addGraphNode(StreamsGraphNode parent, StreamsGraphNode child) {
        Objects.requireNonNull(parent, "parent node can't be null");
        Objects.requireNonNull(child, "child node can't be null");
        parent.addChild(child);
        this.maybeAddNodeForOptimizationMetadata(child);
    }

    void addGraphNode(Collection<StreamsGraphNode> parents, StreamsGraphNode child) {
        Objects.requireNonNull(parents, "parent node can't be null");
        Objects.requireNonNull(child, "child node can't be null");
        if (parents.isEmpty()) {
            throw new StreamsException("Parent node collection can't be empty");
        }
        for (StreamsGraphNode parent : parents) {
            this.addGraphNode(parent, child);
        }
    }

    private void maybeAddNodeForOptimizationMetadata(StreamsGraphNode node) {
        node.setBuildPriority(this.buildPriorityIndex.getAndIncrement());
        if (node.parentNodes().isEmpty() && !node.nodeName().equals(TOPOLOGY_ROOT)) {
            throw new IllegalStateException("Nodes should not have a null parent node.  Name: " + node.nodeName() + " Type: " + node.getClass().getSimpleName());
        }
        if (node.isKeyChangingOperation()) {
            this.keyChangingOperationsToOptimizableRepartitionNodes.put(node, new LinkedHashSet());
        } else if (node instanceof OptimizableRepartitionNode) {
            StreamsGraphNode parentNode = this.getKeyChangingParentNode(node);
            if (parentNode != null) {
                this.keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode)node);
            }
        } else if (node.isMergeNode()) {
            this.mergeNodes.add(node);
        } else if (node instanceof TableSourceNode) {
            this.tableSourceNodes.add(node);
        }
    }

    public void buildAndOptimizeTopology() {
        this.buildAndOptimizeTopology(null);
    }

    public void buildAndOptimizeTopology(Properties props) {
        this.maybePerformOptimizations(props);
        PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<StreamsGraphNode>(5, Comparator.comparing(StreamsGraphNode::buildPriority));
        graphNodePriorityQueue.offer(this.root);
        while (!graphNodePriorityQueue.isEmpty()) {
            StreamsGraphNode streamGraphNode = (StreamsGraphNode)graphNodePriorityQueue.remove();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Adding nodes to topology {} child nodes {}", (Object)streamGraphNode, streamGraphNode.children());
            }
            if (streamGraphNode.allParentsWrittenToTopology() && !streamGraphNode.hasWrittenToTopology()) {
                streamGraphNode.writeToTopology(this.internalTopologyBuilder);
                streamGraphNode.setHasWrittenToTopology(true);
            }
            for (StreamsGraphNode graphNode : streamGraphNode.children()) {
                graphNodePriorityQueue.offer(graphNode);
            }
        }
    }

    private void maybePerformOptimizations(Properties props) {
        if (props != null && "all".equals(props.getProperty("topology.optimization"))) {
            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
            this.optimizeKTableSourceTopics();
            this.maybeOptimizeRepartitionOperations();
        }
    }

    private void optimizeKTableSourceTopics() {
        LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
        this.tableSourceNodes.forEach(node -> ((TableSourceNode)node).reuseSourceTopicForChangeLog(true));
    }

    private void maybeOptimizeRepartitionOperations() {
        this.maybeUpdateKeyChangingRepartitionNodeMap();
        Iterator<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entryIterator = this.keyChangingOperationsToOptimizableRepartitionNodes.entrySet().iterator();
        while (entryIterator.hasNext()) {
            Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry = entryIterator.next();
            StreamsGraphNode keyChangingNode = entry.getKey();
            if (entry.getValue().isEmpty()) continue;
            GroupedInternal groupedInternal = new GroupedInternal(this.getRepartitionSerdes((Collection)entry.getValue()));
            String repartitionTopicName = this.getFirstRepartitionTopicName((Collection)entry.getValue());
            OptimizableRepartitionNode optimizedSingleRepartition = this.createRepartitionNode(repartitionTopicName, groupedInternal.keySerde(), groupedInternal.valueSerde());
            optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
            for (OptimizableRepartitionNode optimizableRepartitionNode : entry.getValue()) {
                StreamsGraphNode keyChangingNodeChild = this.findParentNodeMatching(optimizableRepartitionNode, gn -> gn.parentNodes().contains(keyChangingNode));
                if (keyChangingNodeChild == null) {
                    throw new StreamsException(String.format("Found a null keyChangingChild node for %s", optimizableRepartitionNode));
                }
                LOG.debug("Found the child node of the key changer {} from the repartition {}.", (Object)keyChangingNodeChild, (Object)optimizableRepartitionNode);
                optimizedSingleRepartition.addChild(keyChangingNodeChild);
                LOG.debug("Removing {} from {}  children {}", new Object[]{keyChangingNodeChild, keyChangingNode, keyChangingNode.children()});
                keyChangingNode.removeChild(keyChangingNodeChild);
                Collection<StreamsGraphNode> repartitionNodeToBeReplacedChildren = optimizableRepartitionNode.children();
                Collection<StreamsGraphNode> parentsOfRepartitionNodeToBeReplaced = optimizableRepartitionNode.parentNodes();
                for (StreamsGraphNode repartitionNodeToBeReplacedChild : repartitionNodeToBeReplacedChildren) {
                    for (StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) {
                        parentNode.addChild(repartitionNodeToBeReplacedChild);
                    }
                }
                for (StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) {
                    parentNode.removeChild(optimizableRepartitionNode);
                }
                optimizableRepartitionNode.clearChildren();
                LOG.debug("Updated node {} children {}", optimizedSingleRepartition, optimizedSingleRepartition.children());
            }
            keyChangingNode.addChild(optimizedSingleRepartition);
            entryIterator.remove();
        }
    }

    private void maybeUpdateKeyChangingRepartitionNodeMap() {
        HashMap mergeNodesToKeyChangers = new HashMap();
        HashSet<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<StreamsGraphNode>();
        for (StreamsGraphNode streamsGraphNode : this.mergeNodes) {
            mergeNodesToKeyChangers.put(streamsGraphNode, new LinkedHashSet());
            Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>>> entrySet = this.keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
            for (Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>>> entry : entrySet) {
                StreamsGraphNode maybeParentKey;
                if (!this.mergeNodeHasRepartitionChildren(streamsGraphNode, entry.getValue()) || (maybeParentKey = this.findParentNodeMatching(streamsGraphNode, node -> node.parentNodes().contains(entry.getKey()))) == null) continue;
                ((Set)mergeNodesToKeyChangers.get(streamsGraphNode)).add(entry.getKey());
            }
        }
        for (Map.Entry entry : mergeNodesToKeyChangers.entrySet()) {
            StreamsGraphNode mergeKey = (StreamsGraphNode)entry.getKey();
            Collection keyChangingParents = (Collection)entry.getValue();
            LinkedHashSet repartitionNodes = new LinkedHashSet();
            for (StreamsGraphNode keyChangingParent : keyChangingParents) {
                repartitionNodes.addAll(this.keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
                mergeNodeKeyChangingParentsToRemove.add(keyChangingParent);
            }
            this.keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
        }
        for (StreamsGraphNode streamsGraphNode : mergeNodeKeyChangingParentsToRemove) {
            this.keyChangingOperationsToOptimizableRepartitionNodes.remove(streamsGraphNode);
        }
    }

    private boolean mergeNodeHasRepartitionChildren(StreamsGraphNode mergeNode, LinkedHashSet<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
        return repartitionNodes.stream().allMatch(n -> this.findParentNodeMatching((StreamsGraphNode)n, gn -> gn.parentNodes().contains(mergeNode)) != null);
    }

    private <K, V> OptimizableRepartitionNode<K, V> createRepartitionNode(String repartitionTopicName, Serde<K> keySerde, Serde<V> valueSerde) {
        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        KStreamImpl.createRepartitionedSource(this, keySerde, valueSerde, repartitionTopicName, repartitionNodeBuilder);
        repartitionNodeBuilder.withRepartitionTopic(repartitionTopicName);
        return repartitionNodeBuilder.build();
    }

    private StreamsGraphNode getKeyChangingParentNode(StreamsGraphNode repartitionNode) {
        StreamsGraphNode shouldBeKeyChangingNode = this.findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation());
        StreamsGraphNode keyChangingNode = this.findParentNodeMatching(repartitionNode, StreamsGraphNode::isKeyChangingOperation);
        if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) {
            return keyChangingNode;
        }
        return null;
    }

    private String getFirstRepartitionTopicName(Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
        return repartitionNodes.iterator().next().repartitionTopic();
    }

    private GroupedInternal getRepartitionSerdes(Collection<OptimizableRepartitionNode<?, ?>> repartitionNodes) {
        Serde<?> keySerde = null;
        Serde<?> valueSerde = null;
        for (OptimizableRepartitionNode<?, ?> repartitionNode : repartitionNodes) {
            if (keySerde == null && repartitionNode.keySerde() != null) {
                keySerde = repartitionNode.keySerde();
            }
            if (valueSerde == null && repartitionNode.valueSerde() != null) {
                valueSerde = repartitionNode.valueSerde();
            }
            if (keySerde == null || valueSerde == null) continue;
            break;
        }
        return new GroupedInternal(Grouped.with(keySerde, valueSerde));
    }

    private StreamsGraphNode findParentNodeMatching(StreamsGraphNode startSeekingNode, Predicate<StreamsGraphNode> parentNodePredicate) {
        if (parentNodePredicate.test(startSeekingNode)) {
            return startSeekingNode;
        }
        StreamsGraphNode foundParentNode = null;
        for (StreamsGraphNode parentNode : startSeekingNode.parentNodes()) {
            if (parentNodePredicate.test(parentNode)) {
                return parentNode;
            }
            foundParentNode = this.findParentNodeMatching(parentNode, parentNodePredicate);
        }
        return foundParentNode;
    }

    public StreamsGraphNode root() {
        return this.root;
    }
}

