/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.work;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkManager
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(WorkManager.class);
    private static final int EXIT_TIMEOUT_MS = 5000;
    private final ConcurrentMap<ExecProtos.FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
    private final ConcurrentMap<UserBitShared.QueryId, Foreman> queries = Maps.newConcurrentMap();
    private final BootStrapContext bContext;
    private DrillbitContext dContext;
    private final ControlMessageHandler controlMessageWorker;
    private final UserWorker userWorker;
    private final WorkerBee bee;
    private final WorkEventBus workBus;
    private final Executor executor;
    private final StatusThread statusThread;
    private final Lock isEmptyLock = new ReentrantLock();
    private Condition isEmptyCondition;
    private static final int STATUS_PERIOD_SECONDS = 5;

    public WorkManager(BootStrapContext context) {
        this.bContext = context;
        this.bee = new WorkerBee();
        this.workBus = new WorkEventBus();
        this.executor = context.getExecutor();
        this.controlMessageWorker = new ControlMessageHandler(this.bee);
        this.userWorker = new UserWorker(this.bee);
        this.statusThread = new StatusThread();
    }

    public void start(CoordinationProtos.DrillbitEndpoint endpoint, Controller controller, DataConnectionCreator data, ClusterCoordinator coord, PersistentStoreProvider provider, PersistentStoreProvider profilesProvider) {
        this.dContext = new DrillbitContext(endpoint, this.bContext, coord, controller, data, this.workBus, provider, profilesProvider);
        this.statusThread.start();
        DrillMetrics.register("drill.fragments.running", this.runningFragments::size);
    }

    public Executor getExecutor() {
        return this.executor;
    }

    public WorkEventBus getWorkBus() {
        return this.workBus;
    }

    public ControlMessageHandler getControlMessageHandler() {
        return this.controlMessageWorker;
    }

    public UserWorker getUserWorker() {
        return this.userWorker;
    }

    public WorkerBee getBee() {
        return this.bee;
    }

    @Override
    public void close() throws Exception {
        this.statusThread.interrupt();
        long numRunningFragments = this.runningFragments.size();
        if (numRunningFragments != 0L) {
            logger.warn("Closing WorkManager but there are {} running fragments.", (Object)numRunningFragments);
            if (logger.isDebugEnabled()) {
                for (ExecProtos.FragmentHandle handle : this.runningFragments.keySet()) {
                    logger.debug("Fragment still running: {} status: {}", (Object)QueryIdHelper.getQueryIdentifier(handle), (Object)((FragmentExecutor)this.runningFragments.get(handle)).getStatus());
                }
            }
        }
        if (this.getContext() != null) {
            this.getContext().close();
        }
    }

    public DrillbitContext getContext() {
        return this.dContext;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void waitToExit(boolean forcefulShutdown) {
        this.isEmptyLock.lock();
        this.isEmptyCondition = this.isEmptyLock.newCondition();
        try {
            if (forcefulShutdown) {
                long currentTime;
                long startTime = System.currentTimeMillis();
                long endTime = startTime + 5000L;
                while (!this.areQueriesAndFragmentsEmpty() && (currentTime = System.currentTimeMillis()) < endTime) {
                    try {
                        if (this.isEmptyCondition.await(endTime - currentTime, TimeUnit.MILLISECONDS)) continue;
                        break;
                    }
                    catch (InterruptedException e) {
                        logger.error("Interrupted while waiting to exit");
                    }
                }
                if (!this.areQueriesAndFragmentsEmpty()) {
                    logger.warn("Timed out after {} millis. Shutting down before all fragments and foremen have completed.", (Object)5000);
                    for (UserBitShared.QueryId queryId : this.queries.keySet()) {
                        logger.warn("Query {} is still running.", (Object)QueryIdHelper.getQueryId(queryId));
                    }
                    for (ExecProtos.FragmentHandle fragmentHandle : this.runningFragments.keySet()) {
                        logger.warn("Fragment {} is still running.", (Object)QueryIdHelper.getQueryIdentifier(fragmentHandle));
                    }
                }
            } else {
                while (!this.areQueriesAndFragmentsEmpty()) {
                    this.isEmptyCondition.awaitUninterruptibly();
                }
            }
        }
        finally {
            this.isEmptyLock.unlock();
        }
    }

    private boolean areQueriesAndFragmentsEmpty() {
        return this.queries.isEmpty() && this.runningFragments.isEmpty();
    }

    private void indicateIfSafeToExit() {
        this.isEmptyLock.lock();
        try {
            if (this.isEmptyCondition != null) {
                logger.info("Waiting for {} running queries before shutting down.", (Object)this.queries.size());
                logger.info("Waiting for {} running fragments before shutting down.", (Object)this.runningFragments.size());
                if (this.areQueriesAndFragmentsEmpty()) {
                    this.isEmptyCondition.signal();
                }
            }
        }
        finally {
            this.isEmptyLock.unlock();
        }
    }

    public synchronized Map<String, Integer> getRemainingQueries() {
        HashMap<String, Integer> queriesInfo = new HashMap<String, Integer>();
        queriesInfo.put("queriesCount", this.queries.size());
        queriesInfo.put("fragmentsCount", this.runningFragments.size());
        return queriesInfo;
    }

    public class WorkerBee {
        public void addNewForeman(Foreman foreman) {
            WorkManager.this.queries.put(foreman.getQueryId(), foreman);
            WorkManager.this.executor.execute(foreman);
        }

        public void addNewWork(Runnable runnable) {
            WorkManager.this.executor.execute(runnable);
        }

        public boolean cancelForeman(UserBitShared.QueryId queryId, DrillUserPrincipal principal) {
            Preconditions.checkNotNull(queryId);
            Foreman foreman = (Foreman)WorkManager.this.queries.get(queryId);
            if (foreman == null) {
                return false;
            }
            String queryIdString = QueryIdHelper.getQueryId(queryId);
            if (principal != null && !principal.canManageQueryOf(foreman.getQueryContext().getQueryUserName())) {
                throw UserException.permissionError().message("Not authorized to cancel the query '%s'", queryIdString).build(logger);
            }
            WorkManager.this.executor.execute(() -> {
                Thread currentThread = Thread.currentThread();
                String originalName = currentThread.getName();
                try {
                    currentThread.setName(queryIdString + ":foreman:cancel");
                    logger.debug("Canceling foreman. Thread: {}", (Object)originalName);
                    foreman.cancel();
                }
                catch (Throwable t) {
                    logger.warn("Exception while canceling foreman", t);
                }
                finally {
                    currentThread.setName(originalName);
                }
            });
            return true;
        }

        public void retireForeman(Foreman foreman) {
            Preconditions.checkNotNull(foreman);
            UserBitShared.QueryId queryId = foreman.getQueryId();
            boolean wasRemoved = WorkManager.this.queries.remove(queryId, foreman);
            if (!wasRemoved) {
                logger.warn("Couldn't find retiring Foreman for query " + queryId);
            }
            WorkManager.this.indicateIfSafeToExit();
        }

        public Foreman getForemanForQueryId(UserBitShared.QueryId queryId) {
            return (Foreman)WorkManager.this.queries.get(queryId);
        }

        public DrillbitContext getContext() {
            return WorkManager.this.dContext;
        }

        public void addFragmentRunner(FragmentExecutor fragmentExecutor) {
            final ExecProtos.FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
            WorkManager.this.runningFragments.put(fragmentHandle, fragmentExecutor);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(fragmentExecutor){

                @Override
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(fragmentHandle);
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public void startFragmentPendingRemote(final FragmentManager fragmentManager) {
            final ExecProtos.FragmentHandle fragmentHandle = fragmentManager.getHandle();
            FragmentExecutor fragmentExecutor = fragmentManager.getRunnable();
            if (fragmentExecutor == null) {
                return;
            }
            WorkManager.this.runningFragments.put(fragmentHandle, fragmentExecutor);
            WorkManager.this.executor.execute(new SelfCleaningRunnable(fragmentExecutor){

                @Override
                protected void cleanup() {
                    WorkManager.this.runningFragments.remove(fragmentHandle);
                    if (!fragmentManager.isCancelled()) {
                        WorkManager.this.workBus.removeFragmentManager(fragmentHandle, false);
                    }
                    WorkManager.this.indicateIfSafeToExit();
                }
            });
        }

        public FragmentExecutor getFragmentRunner(ExecProtos.FragmentHandle handle) {
            return (FragmentExecutor)WorkManager.this.runningFragments.get(handle);
        }

        public void receiveRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
            BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
            boolean toForeman = runtimeFilterDef.getToForeman();
            UserBitShared.QueryId queryId = runtimeFilterDef.getQueryId();
            String queryIdStr = QueryIdHelper.getQueryId(queryId);
            runtimeFilter.retainBuffers(1);
            if (toForeman) {
                Foreman foreman = (Foreman)WorkManager.this.queries.get(queryId);
                if (foreman != null) {
                    WorkManager.this.executor.execute(() -> {
                        Thread currentThread = Thread.currentThread();
                        String originalName = currentThread.getName();
                        currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
                        try {
                            foreman.getRuntimeFilterRouter().register(runtimeFilter);
                        }
                        catch (Exception e) {
                            logger.warn("Exception while registering the RuntimeFilter", (Throwable)e);
                        }
                        finally {
                            currentThread.setName(originalName);
                            runtimeFilter.close();
                        }
                    });
                }
            } else {
                int majorId = runtimeFilterDef.getMajorFragmentId();
                int minorId = runtimeFilterDef.getMinorFragmentId();
                ExecProtos.FragmentHandle fragmentHandle = ExecProtos.FragmentHandle.newBuilder().setMajorFragmentId(majorId).setMinorFragmentId(minorId).setQueryId(queryId).build();
                FragmentExecutor fragmentExecutor = (FragmentExecutor)WorkManager.this.runningFragments.get(fragmentHandle);
                if (fragmentExecutor != null) {
                    fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
                }
            }
        }
    }

    private class StatusThread
    extends Thread {
        StatusThread() {
            this.setName("WorkManager.StatusThread");
        }

        @Override
        public void run() {
            Controller controller = WorkManager.this.dContext.getController();
            CoordinationProtos.DrillbitEndpoint localBitEndPoint = WorkManager.this.dContext.getEndpoint();
            while (true) {
                ArrayList<DrillRpcFuture<GeneralRPCProtos.Ack>> futures = Lists.newArrayList();
                for (FragmentExecutor fragmentExecutor : WorkManager.this.runningFragments.values()) {
                    BitControl.FragmentStatus status = fragmentExecutor.getStatus();
                    if (status == null) continue;
                    CoordinationProtos.DrillbitEndpoint foremanEndpoint = fragmentExecutor.getContext().getForemanEndpoint();
                    if (localBitEndPoint.equals(foremanEndpoint)) {
                        WorkManager.this.workBus.statusUpdate(status);
                        continue;
                    }
                    futures.add(controller.getTunnel(foremanEndpoint).sendFragmentStatus(status));
                }
                for (DrillRpcFuture drillRpcFuture : futures) {
                    try {
                        drillRpcFuture.checkedGet();
                    }
                    catch (RpcException ex) {
                        logger.info("Failure while sending intermediate fragment status to Foreman", (Throwable)ex);
                    }
                }
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

