/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.thrift;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.thrift.CallQueue;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * A Thrift {@link TServer} that hands every accepted connection to a
 * {@link ThreadPoolExecutor} whose work queue is a {@link CallQueue}.
 *
 * <p>When {@code maxQueuedRequests} is positive the call queue is backed by a
 * {@link LinkedBlockingQueue} of that capacity; otherwise it is backed by a
 * {@link SynchronousQueue}. Connections that the executor rejects are closed
 * immediately by the accept loop.
 */
@InterfaceAudience.Private
public class TBoundedThreadPoolServer
extends TServer {
    private static final String QUEUE_FULL_MSG = "Queue is full, closing connection";

    /** Configuration key for the core size of the worker thread pool. */
    public static final String MIN_WORKER_THREADS_CONF_KEY = "hbase.thrift.minWorkerThreads";
    public static final int DEFAULT_MIN_WORKER_THREADS = 16;

    /** Configuration key for the maximum size of the worker thread pool. */
    public static final String MAX_WORKER_THREADS_CONF_KEY = "hbase.thrift.maxWorkerThreads";
    public static final int DEFAULT_MAX_WORKER_THREADS = 1000;

    /** Configuration key for the capacity of the queue in front of the pool. */
    public static final String MAX_QUEUED_REQUESTS_CONF_KEY = "hbase.thrift.maxQueuedRequests";
    public static final int DEFAULT_MAX_QUEUED_REQUESTS = 1000;

    /** Configuration key for the keep-alive time (seconds) passed to the thread pool. */
    public static final String THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY = "hbase.thrift.threadKeepAliveTimeSec";
    private static final int DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC = 60;

    /** Time to sleep after {@code shutdownNow()} when the pool did not terminate in time. */
    public static final int TIME_TO_WAIT_AFTER_SHUTDOWN_MS = 5000;

    private static final Log LOG = LogFactory.getLog(TBoundedThreadPoolServer.class.getName());

    /** Queue placed in front of the executor; also receives the metrics object. */
    private final CallQueue callQueue;
    private final ExecutorService executorService;
    /** Set by {@link #stop()}; read by the accept loop and by every worker. */
    private volatile boolean stopped;
    private final Args serverOptions;

    /**
     * Builds the call queue and the worker pool from the supplied options.
     *
     * @param options server arguments (pool sizes, queue capacity, keep-alive, stop timeout)
     * @param metrics metrics object handed to the {@link CallQueue}
     */
    public TBoundedThreadPoolServer(Args options, ThriftMetrics metrics) {
        super(options);
        if (options.maxQueuedRequests > 0) {
            this.callQueue = new CallQueue(
                new LinkedBlockingQueue<CallQueue.Call>(options.maxQueuedRequests), metrics);
        } else {
            // No queueing requested: every submission must be handed straight to a worker.
            this.callQueue = new CallQueue(new SynchronousQueue<CallQueue.Call>(), metrics);
        }
        ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
        tfb.setDaemon(true);
        tfb.setNameFormat("thrift-worker-%d");
        this.executorService = new ThreadPoolExecutor(
            options.minWorkerThreads,
            options.maxWorkerThreads,
            (long)options.threadKeepAliveTimeSec,
            TimeUnit.SECONDS,
            (BlockingQueue<Runnable>)this.callQueue,
            tfb.build());
        this.serverOptions = options;
    }

    /**
     * Listens on the server transport and dispatches each accepted connection
     * to the worker pool until {@link #stop()} is called or the calling thread
     * is interrupted, then shuts the pool down.
     *
     * <p>If listening fails the error is logged and the method returns
     * without serving. A JVM shutdown hook that calls {@link #stop()} is
     * registered each time listening succeeds.
     */
    public void serve() {
        try {
            this.serverTransport_.listen();
        }
        catch (TTransportException ttx) {
            LOG.error("Error occurred during listening.", ttx);
            return;
        }
        Runtime.getRuntime().addShutdownHook(new Thread(getClass().getSimpleName() + "-shutdown-hook"){

            @Override
            public void run() {
                TBoundedThreadPoolServer.this.stop();
            }
        });
        this.stopped = false;
        // Thread.interrupted() clears the flag; an interrupt simply ends the accept loop.
        while (!this.stopped && !Thread.interrupted()) {
            TTransport client;
            try {
                client = this.serverTransport_.accept();
            }
            catch (TTransportException ttx) {
                if (this.stopped) {
                    // stop() interrupts the transport, so a failure here is expected.
                    break;
                }
                LOG.warn("Transport error when accepting message", ttx);
                continue;
            }
            ClientConnnection command = new ClientConnnection(client);
            try {
                this.executorService.execute(command);
            }
            catch (RejectedExecutionException rex) {
                if (client instanceof TSocket) {
                    LOG.warn(QUEUE_FULL_MSG + " from " + ((TSocket)client).getSocket().getRemoteSocketAddress());
                } else {
                    LOG.warn(QUEUE_FULL_MSG, rex);
                }
                client.close();
            }
        }
        this.shutdownServer();
    }

    /**
     * Shuts the worker pool down, waiting up to the configured stop timeout
     * for in-flight work. Only if the pool has not terminated by then are the
     * workers interrupted, followed by a fixed
     * {@link #TIME_TO_WAIT_AFTER_SHUTDOWN_MS} ms sleep. The caller's interrupt
     * status is restored if an interrupt was received while waiting.
     */
    private void shutdownServer() {
        this.executorService.shutdown();
        long msLeftToWait = this.serverOptions.stopTimeoutUnit.toMillis(this.serverOptions.stopTimeoutVal);
        long timeMillis = System.currentTimeMillis();
        LOG.info("Waiting for up to " + msLeftToWait + " ms to finish processing" + " pending requests");
        boolean interrupted = false;
        boolean terminated = false;
        while (msLeftToWait >= 0L) {
            try {
                terminated = this.executorService.awaitTermination(msLeftToWait, TimeUnit.MILLISECONDS);
                break;
            }
            catch (InterruptedException ix) {
                // Keep waiting for whatever is left of the timeout; remember the interrupt.
                long timePassed = System.currentTimeMillis() - timeMillis;
                msLeftToWait -= timePassed;
                timeMillis += timePassed;
                interrupted = true;
            }
        }
        if (!terminated) {
            LOG.info("Interrupting all worker threads and waiting for " + TIME_TO_WAIT_AFTER_SHUTDOWN_MS + " ms longer");
            // Interrupts every worker, including those currently running a task.
            this.executorService.shutdownNow();
            Threads.sleepWithoutInterrupt(TIME_TO_WAIT_AFTER_SHUTDOWN_MS);
        }
        if (interrupted) {
            // Preserve the interrupted status for the caller.
            Thread.currentThread().interrupt();
        }
        LOG.info("Thrift server shutdown complete");
    }

    /**
     * Requests the accept loop to exit and interrupts the server transport so
     * a blocked {@code accept()} returns.
     */
    public void stop() {
        this.stopped = true;
        this.serverTransport_.interrupt();
    }

    /**
     * Task run by a worker thread: processes requests from one client
     * transport until the processor reports completion, the server is
     * stopped, or an exception occurs, then closes the transports.
     */
    private class ClientConnnection
    implements Runnable {
        private final TTransport client;

        private ClientConnnection(TTransport client) {
            this.client = client;
        }

        @Override
        public void run() {
            TTransport inputTransport = null;
            TTransport outputTransport = null;
            try {
                TProcessor processor = TBoundedThreadPoolServer.this.processorFactory_.getProcessor(this.client);
                inputTransport = TBoundedThreadPoolServer.this.inputTransportFactory_.getTransport(this.client);
                outputTransport = TBoundedThreadPoolServer.this.outputTransportFactory_.getTransport(this.client);
                TProtocol inputProtocol = TBoundedThreadPoolServer.this.inputProtocolFactory_.getProtocol(inputTransport);
                TProtocol outputProtocol = TBoundedThreadPoolServer.this.outputProtocolFactory_.getProtocol(outputTransport);
                while (!TBoundedThreadPoolServer.this.stopped && processor.process(inputProtocol, outputProtocol)) {
                    // Keep serving requests on this connection.
                }
            }
            catch (TTransportException ignored) {
                // Deliberately not logged (presumably the peer disconnected); fall through to cleanup.
            }
            catch (TException tx) {
                LOG.error("Thrift error occurred during processing of message.", tx);
            }
            catch (Exception x) {
                LOG.error("Error occurred during processing of message.", x);
            }
            finally {
                // Cleanup runs on every exit path so the connection is never leaked.
                try {
                    if (inputTransport != null) {
                        inputTransport.close();
                    }
                }
                finally {
                    if (outputTransport != null) {
                        outputTransport.close();
                    } else {
                        // Setup failed before both wrappers existed; close the raw
                        // client transport directly so it is always released.
                        this.client.close();
                    }
                }
            }
        }
    }

    /**
     * Server arguments extended with the queue capacity and thread keep-alive
     * time, all populated from a {@link Configuration}.
     */
    public static class Args
    extends TThreadPoolServer.Args {
        int maxQueuedRequests;
        int threadKeepAliveTimeSec;

        /**
         * Reads pool sizes, queue capacity and keep-alive time from the
         * configuration, falling back to the {@code DEFAULT_*} constants of
         * {@link TBoundedThreadPoolServer}.
         *
         * @param transport server transport to listen on
         * @param conf configuration to read the settings from
         */
        public Args(TServerTransport transport, Configuration conf) {
            super(transport);
            this.minWorkerThreads = conf.getInt(
                TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
                TBoundedThreadPoolServer.DEFAULT_MIN_WORKER_THREADS);
            this.maxWorkerThreads = conf.getInt(
                TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
                TBoundedThreadPoolServer.DEFAULT_MAX_WORKER_THREADS);
            this.maxQueuedRequests = conf.getInt(
                TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
                TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
            this.threadKeepAliveTimeSec = conf.getInt(
                TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY,
                TBoundedThreadPoolServer.DEFAULT_THREAD_KEEP_ALIVE_TIME_SEC);
        }

        /** Returns a one-line summary of the pool sizes and queue capacity. */
        public String toString() {
            return "min worker threads=" + this.minWorkerThreads + ", max worker threads=" + this.maxWorkerThreads + ", max queued requests=" + this.maxQueuedRequests;
        }
    }
}

