/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work.filter;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.SendingAccountor;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RuntimeFilterRouter {
    private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);

    // Root wrapper of the query work unit; the search for a join's fragment starts here.
    private Wrapper rootWrapper;
    // Passed to the sink at construction; waitForComplete() blocks on it before closing the sink.
    private SendingAccountor sendingAccountor = new SendingAccountor();
    // Takes the registered runtime filters and the routing maps collected from the plan.
    private RuntimeFilterSink runtimeFilterSink;

    /**
     * Builds a router over the fragment tree of the given work unit.
     *
     * @param workUnit        source of the root {@link Wrapper}
     * @param drillbitContext passed, together with this router's {@link SendingAccountor},
     *                        to the {@link RuntimeFilterSink} created here
     */
    public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
        rootWrapper = workUnit.getRootWrapper();
        runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor);
    }

    /**
     * Runs a {@link RuntimeFilterParallelismCollector} starting at the root fragment's root
     * operator and publishes the result to the sink.
     *
     * <p>For every collected holder whose runtime filter definition is flagged send-to-foreman,
     * three entries are recorded under the join's major fragment id: the probe-side scan
     * endpoints, the probe-side scan major fragment id, and the build-side runtime filter
     * count. The three maps are then set on the {@link RuntimeFilterSink}.
     */
    public void collectRuntimeFilterParallelAndControlInfo() {
        RuntimeFilterParallelismCollector collector = new RuntimeFilterParallelismCollector();
        this.rootWrapper.getNode().getRoot().accept(collector, null);

        HashMap<Integer, List<CoordinationProtos.DrillbitEndpoint>> probeEndpointsByJoin = new HashMap<Integer, List<CoordinationProtos.DrillbitEndpoint>>();
        HashMap<Integer, Integer> scanMajorIdByJoin = new HashMap<Integer, Integer>();
        HashMap<Integer, Integer> rfCountByJoin = new HashMap<Integer, Integer>();

        for (RFHelperHolder entry : collector.getHolders()) {
            RuntimeFilterDef def = entry.getRuntimeFilterDef();
            if (def.isSendToForeman()) {
                int joinMajorId = entry.getJoinMajorId();
                probeEndpointsByJoin.put(joinMajorId, entry.getProbeSideScanEndpoints());
                scanMajorIdByJoin.put(joinMajorId, entry.getProbeSideScanMajorId());
                rfCountByJoin.put(joinMajorId, entry.getBuildSideRfNumber());
            }
        }

        this.runtimeFilterSink.setJoinMjId2probeScanEps(probeEndpointsByJoin);
        this.runtimeFilterSink.setJoinMjId2rfNumber(rfCountByJoin);
        this.runtimeFilterSink.setJoinMjId2ScanMjId(scanMajorIdByJoin);
    }

    /**
     * Blocks until the {@link SendingAccountor} reports that all sends have completed, then
     * closes the {@link RuntimeFilterSink}.
     *
     * <p>The sink is closed in a {@code finally} block so it is still released if the wait
     * throws; the exception from the wait then propagates to the caller.
     */
    public void waitForComplete() {
        try {
            this.sendingAccountor.waitForSendComplete();
        }
        finally {
            this.runtimeFilterSink.close();
        }
    }

    /**
     * Hands a runtime filter to the {@link RuntimeFilterSink}.
     *
     * @param srcRuntimeFilterWritable the filter to add to the sink
     */
    public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
        runtimeFilterSink.add(srcRuntimeFilterWritable);
    }

    /**
     * Searches {@code wrapper} and then, depth-first, its fragment dependencies for the first
     * wrapper in which {@code targetOpVisitor} reports a match.
     *
     * <p>The visitor is pointed at each wrapper's fragment before that fragment's root operator
     * is visited. Anything thrown during the visit is rethrown as a system-error
     * {@link UserException}.
     *
     * @param wrapper         wrapper to inspect first
     * @param targetOpVisitor visitor whose {@code isContain()} signals a match
     * @return the matching wrapper, or {@code null} if neither this wrapper nor any of its
     *         dependencies matches
     */
    private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
        Fragment fragment = wrapper.getNode();
        targetOpVisitor.setCurrentFragment(fragment);
        try {
            fragment.getRoot().accept(targetOpVisitor, null);
        }
        catch (Throwable e) {
            throw UserException.systemError(e).build();
        }
        if (targetOpVisitor.isContain()) {
            return wrapper;
        }
        List<Wrapper> children = wrapper.getFragmentDependencies();
        if (!CollectionUtils.isEmpty(children)) {
            for (Wrapper child : children) {
                Wrapper found = this.findTargetWrapper(child, targetOpVisitor);
                if (found != null) {
                    return found;
                }
            }
        }
        return null;
    }

    /**
     * Base for visitors that look for something inside a single fragment. Before each visit
     * the caller sets the fragment being searched, and the visitor remembers that fragment's
     * sending exchange for use by subclasses.
     */
    private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable>
    extends AbstractPhysicalVisitor<T, X, E> {
        // Sending exchange of the fragment most recently passed to setCurrentFragment().
        protected Exchange sendingExchange;

        private TargetPhysicalOperatorVisitor() {
        }

        /** @return whether the visitor has found its target */
        public abstract boolean isContain();

        /**
         * Records the sending exchange of the fragment about to be visited.
         *
         * @param fragment fragment whose operator tree will be visited next
         */
        public void setCurrentFragment(Fragment fragment) {
            Exchange outgoing = fragment.getSendingExchange();
            sendingExchange = outgoing;
        }
    }

    /**
     * Plan visitor that gathers one {@link RFHelperHolder} per {@link HashJoinPOP} whose
     * {@link RuntimeFilterDef} is present and flagged send-to-foreman. Each holder records the
     * join's major fragment id, the number of endpoints assigned to that fragment, and the
     * major fragment id and endpoints of the fragment holding the matching
     * {@link RuntimeFilterPOP}.
     */
    protected class RuntimeFilterParallelismCollector
    extends AbstractPhysicalVisitor<Void, RFHelperHolder, RuntimeException> {
        // Holders in the order their joins were first encountered.
        private List<RFHelperHolder> holders = new ArrayList<RFHelperHolder>();

        protected RuntimeFilterParallelismCollector() {
        }

        /**
         * Collects routing information when {@code op} is a qualifying hash join, then continues
         * into the operator's children with the (possibly new) holder.
         *
         * @param op     operator being visited
         * @param holder holder inherited from the parent visit; may be {@code null}
         * @throws IllegalStateException if no wrapper contains the join, or none contains the
         *                               matching runtime filter operator
         */
        @Override
        public Void visitOp(PhysicalOperator op, RFHelperHolder holder) throws RuntimeException {
            if (op instanceof HashJoinPOP) {
                HashJoinPOP hashJoinPOP = (HashJoinPOP)op;
                RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef();
                if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) {
                    holder = this.collectJoinInfo(hashJoinPOP, runtimeFilterDef, holder);
                }
            }
            return (Void)this.visitChildren(op, holder);
        }

        /**
         * Fills in the holder for one hash join, creating and registering a new holder when the
         * inherited one is absent or belongs to a different join operator.
         *
         * @return the holder that now describes {@code hashJoinPOP}
         */
        private RFHelperHolder collectJoinInfo(HashJoinPOP hashJoinPOP, RuntimeFilterDef runtimeFilterDef, RFHelperHolder holder) {
            int hashJoinOpId = hashJoinPOP.getOperatorId();
            if (holder == null || holder.getJoinOpId() != hashJoinOpId) {
                holder = new RFHelperHolder(hashJoinOpId);
                this.holders.add(holder);
            }
            holder.setRuntimeFilterDef(runtimeFilterDef);
            long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();

            // Locate the fragment that contains this join, searching from the root wrapper.
            WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP);
            Wrapper container = RuntimeFilterRouter.this.findTargetWrapper(RuntimeFilterRouter.this.rootWrapper, operatorsVisitor);
            if (container == null) {
                throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinOpId));
            }
            holder.setBuildSideRfNumber(container.getAssignedEndpoints().size());
            holder.setJoinMajorId(container.getMajorFragmentId());

            // Locate the fragment holding the RuntimeFilterPOP with the same identifier,
            // searching from the join's own fragment.
            WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier);
            Wrapper probeSideScanContainer = RuntimeFilterRouter.this.findTargetWrapper(container, runtimeFilterOperatorsVisitor);
            if (probeSideScanContainer == null) {
                // Report the identifier that was searched for; the join's operator id alone
                // does not identify the missing RuntimeFilterPOP.
                throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with identifier=%d (HashJoinPOP id=%d)", runtimeFilterIdentifier, hashJoinOpId));
            }
            holder.setProbeSideScanEndpoints(probeSideScanContainer.getAssignedEndpoints());
            holder.setProbeSideScanMajorId(probeSideScanContainer.getMajorFragmentId());
            return holder;
        }

        /** @return the live list of holders collected so far */
        public List<RFHelperHolder> getHolders() {
            return this.holders;
        }
    }

    /**
     * Mutable value holder describing one hash join: its operator id and major fragment id,
     * its runtime filter definition, the build-side runtime filter count, and the major
     * fragment id and endpoints recorded for the probe-side scan.
     */
    private static class RFHelperHolder {
        private int joinOpId;
        private int joinMajorId;
        private RuntimeFilterDef runtimeFilterDef;
        private int buildSideRfNumber;
        private int probeSideScanMajorId;
        private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;

        /** @param joinOpId operator id of the hash join this holder describes */
        public RFHelperHolder(int joinOpId) {
            this.joinOpId = joinOpId;
        }

        // --- join operator id ---

        public int getJoinOpId() {
            return joinOpId;
        }

        public void setJoinOpId(int joinOpId) {
            this.joinOpId = joinOpId;
        }

        // --- probe-side scan endpoints ---

        public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
            return probeSideScanEndpoints;
        }

        public void setProbeSideScanEndpoints(List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints) {
            this.probeSideScanEndpoints = probeSideScanEndpoints;
        }

        // --- join major fragment id ---

        public int getJoinMajorId() {
            return joinMajorId;
        }

        public void setJoinMajorId(int joinMajorId) {
            this.joinMajorId = joinMajorId;
        }

        // --- probe-side scan major fragment id ---

        public int getProbeSideScanMajorId() {
            return probeSideScanMajorId;
        }

        public void setProbeSideScanMajorId(int probeSideScanMajorId) {
            this.probeSideScanMajorId = probeSideScanMajorId;
        }

        // --- runtime filter definition ---

        public RuntimeFilterDef getRuntimeFilterDef() {
            return runtimeFilterDef;
        }

        public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) {
            this.runtimeFilterDef = runtimeFilterDef;
        }

        // --- build-side runtime filter count ---

        public int getBuildSideRfNumber() {
            return buildSideRfNumber;
        }

        public void setBuildSideRfNumber(int buildSideRfNumber) {
            this.buildSideRfNumber = buildSideRfNumber;
        }
    }

    /**
     * Reports whether the current fragment contains a {@link RuntimeFilterPOP} whose identifier
     * equals the one given at construction. Once a match is seen, {@link #isContain()} stays
     * {@code true}.
     */
    private class WrapperRuntimeFilterOperatorsVisitor
    extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
        private final long identifier;
        private boolean contain;

        public WrapperRuntimeFilterOperatorsVisitor(long identifier) {
            this.identifier = identifier;
        }

        @Override
        public boolean isContain() {
            return contain;
        }

        /**
         * Descends through the current fragment's sending exchange only; any other exchange
         * ends the traversal of that branch.
         */
        @Override
        public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
            if (exchange == sendingExchange) {
                return exchange.getChild().accept(this, value);
            }
            return null;
        }

        @Override
        public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
            if (op instanceof HashJoinPOP) {
                // Below a hash join only the left input is followed.
                ((HashJoinPOP)op).getLeft().accept(this, value);
                return null;
            }
            if (op instanceof RuntimeFilterPOP && ((RuntimeFilterPOP)op).getIdentifier() == identifier) {
                contain = true;
                return null;
            }
            // Not the target (or a runtime filter with a different identifier): keep descending.
            for (PhysicalOperator child : op) {
                child.accept(this, value);
            }
            return null;
        }
    }

    /**
     * Reports whether the current fragment contains a specific operator instance, compared by
     * reference. Once the operator is seen, {@link #isContain()} stays {@code true}.
     */
    private class WrapperOperatorsVisitor
    extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
        private final PhysicalOperator targetOp;
        private boolean contain;

        public WrapperOperatorsVisitor(PhysicalOperator targetOp) {
            this.targetOp = targetOp;
        }

        @Override
        public boolean isContain() {
            return contain;
        }

        /**
         * Descends through the current fragment's sending exchange only; any other exchange
         * ends the traversal of that branch.
         */
        @Override
        public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
            if (exchange == sendingExchange) {
                return exchange.getChild().accept(this, value);
            }
            return null;
        }

        @Override
        public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
            if (op == targetOp) {
                // Found it; the subtree below the target is not visited.
                contain = true;
                return null;
            }
            for (PhysicalOperator child : op) {
                child.accept(this, value);
            }
            return null;
        }
    }
}

