/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure2;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hbase.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteProcedureDispatcher.class);
    public static final String THREAD_POOL_SIZE_CONF_KEY = "hbase.procedure.remote.dispatcher.threadpool.size";
    private static final int DEFAULT_THREAD_POOL_SIZE = 128;
    public static final String DISPATCH_DELAY_CONF_KEY = "hbase.procedure.remote.dispatcher.delay.msec";
    private static final int DEFAULT_DISPATCH_DELAY = 150;
    public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = "hbase.procedure.remote.dispatcher.max.queue.size";
    private static final int DEFAULT_MAX_QUEUE_SIZE = 32;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = new ConcurrentHashMap();
    private final int operationDelay;
    private final int queueMaxSize;
    private final int corePoolSize;
    private TimeoutExecutorThread timeoutExecutor;
    private ThreadPoolExecutor threadPool;

    protected RemoteProcedureDispatcher(Configuration conf) {
        this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, 128);
        this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, 150);
        this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, 32);
    }

    public boolean start() {
        if (this.running.getAndSet(true)) {
            LOG.warn("Already running");
            return false;
        }
        LOG.info("Instantiated, coreThreads={} (allowCoreThreadTimeOut=true), queueMaxSize={}, operationDelay={}", new Object[]{this.corePoolSize, this.queueMaxSize, this.operationDelay});
        this.timeoutExecutor = new TimeoutExecutorThread();
        this.timeoutExecutor.start();
        this.threadPool = Threads.getBoundedCachedThreadPool((int)this.corePoolSize, (long)60L, (TimeUnit)TimeUnit.SECONDS, (ThreadFactory)Threads.newDaemonThreadFactory((String)this.getClass().getSimpleName(), (Thread.UncaughtExceptionHandler)this.getUncaughtExceptionHandler()));
        return true;
    }

    public boolean stop() {
        if (!this.running.getAndSet(false)) {
            return false;
        }
        LOG.info("Stopping procedure remote dispatcher");
        this.timeoutExecutor.sendStopSignal();
        this.threadPool.shutdownNow();
        return true;
    }

    public void join() {
        assert (!this.running.get()) : "expected not running";
        this.timeoutExecutor.awaitTermination();
        this.timeoutExecutor = null;
        this.threadPool.shutdownNow();
        try {
            while (!this.threadPool.awaitTermination(60L, TimeUnit.SECONDS)) {
                LOG.warn("Waiting for thread-pool to terminate");
            }
        }
        catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for thread-pool termination", (Throwable)e);
        }
    }

    protected abstract Thread.UncaughtExceptionHandler getUncaughtExceptionHandler();

    public void addNode(TRemote key) {
        assert (key != null) : "Tried to add a node with a null key";
        BufferNode newNode = new BufferNode(this, key);
        this.nodeMap.putIfAbsent(key, newNode);
    }

    public void addOperationToNode(TRemote key, RemoteProcedure rp) throws NullTargetServerDispatchException, NoServerDispatchException, NoNodeDispatchException {
        if (key == null) {
            throw new NullTargetServerDispatchException(rp.toString());
        }
        BufferNode node = this.nodeMap.get(key);
        if (node == null) {
            throw new NoServerDispatchException(key.toString() + "; " + rp.toString());
        }
        node.add(rp);
        if (!this.nodeMap.containsValue(node)) {
            throw new NoNodeDispatchException(key.toString() + "; " + rp.toString());
        }
    }

    public void removeCompletedOperation(TRemote key, RemoteProcedure rp) {
        BufferNode node = this.nodeMap.get(key);
        if (node == null) {
            LOG.warn("since no node for this key {}, we can't removed the finished remote procedure", key);
            return;
        }
        node.operationCompleted(rp);
    }

    public boolean removeNode(TRemote key) {
        BufferNode node = this.nodeMap.remove(key);
        if (node == null) {
            return false;
        }
        node.abortOperationsInQueue();
        return true;
    }

    protected final void submitTask(Runnable task) {
        this.threadPool.execute(task);
    }

    protected final void submitTask(Runnable task, long delay, TimeUnit unit) {
        this.timeoutExecutor.add(new DelayedTask(task, delay, unit));
    }

    protected abstract void remoteDispatch(TRemote var1, Set<RemoteProcedure> var2);

    protected abstract void abortPendingOperations(TRemote var1, Set<RemoteProcedure> var2);

    protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(TEnv env, TRemote remote, Set<RemoteProcedure> remoteProcedures) {
        ArrayListMultimap requestByType = ArrayListMultimap.create();
        for (RemoteProcedure proc : remoteProcedures) {
            Optional<RemoteOperation> operation = proc.remoteCallBuild(env, remote);
            operation.ifPresent(op -> requestByType.put(op.getClass(), op));
        }
        return requestByType;
    }

    protected <T extends RemoteOperation> List<T> fetchType(ArrayListMultimap<Class<?>, RemoteOperation> requestByType, Class<T> type) {
        return requestByType.removeAll(type);
    }

    private static final class DelayedTask
    extends DelayedUtil.DelayedContainerWithTimestamp<Runnable> {
        public DelayedTask(Runnable task, long delay, TimeUnit unit) {
            super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay));
        }
    }

    protected static final class BufferNode
    extends DelayedUtil.DelayedContainerWithTimestamp<TRemote>
    implements RemoteNode<TEnv, TRemote> {
        private Set<RemoteProcedure> operations;
        private final Set<RemoteProcedure> dispatchedOperations;
        final /* synthetic */ RemoteProcedureDispatcher this$0;

        protected BufferNode(TRemote key) {
            this.this$0 = this$0;
            super(key, 0L);
            this.dispatchedOperations = new HashSet<RemoteProcedure>();
        }

        @Override
        public TRemote getKey() {
            return (Comparable)this.getObject();
        }

        @Override
        public synchronized void add(RemoteProcedure operation) {
            if (this.operations == null) {
                this.operations = new HashSet<RemoteProcedure>();
                this.setTimeout(EnvironmentEdgeManager.currentTime() + (long)this.this$0.operationDelay);
                this.this$0.timeoutExecutor.add(this);
            }
            this.operations.add(operation);
            if (this.operations.size() > this.this$0.queueMaxSize) {
                this.this$0.timeoutExecutor.remove(this);
                this.dispatch();
            }
        }

        @Override
        public synchronized void dispatch() {
            if (this.operations != null) {
                this.this$0.remoteDispatch(this.getKey(), this.operations);
                this.operations.stream().filter(operation -> operation.storeInDispatchedQueue()).forEach(operation -> this.dispatchedOperations.add((RemoteProcedure)operation));
                this.operations = null;
            }
        }

        public synchronized void abortOperationsInQueue() {
            if (this.operations != null) {
                this.this$0.abortPendingOperations(this.getKey(), this.operations);
                this.operations = null;
            }
            this.this$0.abortPendingOperations(this.getKey(), this.dispatchedOperations);
            this.dispatchedOperations.clear();
        }

        public synchronized void operationCompleted(RemoteProcedure remoteProcedure) {
            this.dispatchedOperations.remove(remoteProcedure);
        }

        @Override
        public String toString() {
            return super.toString() + ", operations=" + this.operations;
        }
    }

    private final class TimeoutExecutorThread
    extends Thread {
        private final DelayQueue<DelayedUtil.DelayedWithTimeout> queue;

        public TimeoutExecutorThread() {
            super("ProcedureDispatcherTimeoutThread");
            this.queue = new DelayQueue();
        }

        @Override
        public void run() {
            while (RemoteProcedureDispatcher.this.running.get()) {
                DelayedUtil.DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(this.queue);
                if (task == null || task == DelayedUtil.DELAYED_POISON) continue;
                if (task instanceof DelayedTask) {
                    RemoteProcedureDispatcher.this.threadPool.execute((Runnable)((DelayedTask)task).getObject());
                    continue;
                }
                ((BufferNode)task).dispatch();
            }
        }

        public void add(DelayedUtil.DelayedWithTimeout delayed) {
            this.queue.add(delayed);
        }

        public void remove(DelayedUtil.DelayedWithTimeout delayed) {
            this.queue.remove(delayed);
        }

        public void sendStopSignal() {
            this.queue.add(DelayedUtil.DELAYED_POISON);
        }

        public void awaitTermination() {
            try {
                long startTime = EnvironmentEdgeManager.currentTime();
                int i = 0;
                while (this.isAlive()) {
                    this.sendStopSignal();
                    this.join(250L);
                    if (i > 0 && i % 8 == 0) {
                        LOG.warn("Waiting termination of thread " + this.getName() + ", " + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime));
                    }
                    ++i;
                }
            }
            catch (InterruptedException e) {
                LOG.warn(this.getName() + " join wait got interrupted", (Throwable)e);
            }
        }
    }

    public static interface RemoteNode<TEnv, TRemote> {
        public TRemote getKey();

        public void add(RemoteProcedure<TEnv, TRemote> var1);

        public void dispatch();
    }

    public static interface RemoteProcedure<TEnv, TRemote> {
        public Optional<RemoteOperation> remoteCallBuild(TEnv var1, TRemote var2);

        public void remoteCallFailed(TEnv var1, TRemote var2, IOException var3);

        public void remoteOperationCompleted(TEnv var1);

        public void remoteOperationFailed(TEnv var1, RemoteProcedureException var2);

        default public boolean storeInDispatchedQueue() {
            return true;
        }
    }

    public static abstract class RemoteOperation {
        private final RemoteProcedure remoteProcedure;

        protected RemoteOperation(RemoteProcedure remoteProcedure) {
            this.remoteProcedure = remoteProcedure;
        }

        public RemoteProcedure getRemoteProcedure() {
            return this.remoteProcedure;
        }
    }
}

