/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ipc;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.ipc.ProcessingDetails;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.RpcServerException;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link BlockingQueue} facade over a swappable backing queue.
 *
 * <p>Producers go through {@code putRef} and consumers through {@code takeRef};
 * both normally point at the same backing queue. {@link #swapQueue} replaces the
 * backing queue and the {@link RpcScheduler} at runtime: it first redirects
 * producers to the new queue, waits until the old queue has been observed empty
 * for a number of consecutive checkpoints, and only then redirects consumers.
 *
 * <p>When client backoff is enabled, {@link #put} and {@link #add} ask the
 * scheduler whether the call should be rejected and, if so, throw
 * {@link CallQueueOverflowException}.
 *
 * @param <E> the type of queued calls
 */
public class CallQueueManager<E extends Schedulable>
extends AbstractQueue<E>
implements BlockingQueue<E> {
    public static final Logger LOG = LoggerFactory.getLogger(CallQueueManager.class);

    /** Number of consecutive empty observations required before the old queue counts as drained. */
    private static final int CHECKPOINT_NUM = 20;
    /** Pause between two emptiness observations, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10L;
    /** Upper bound, in milliseconds, on a single blocking poll performed by {@link #take()}. */
    private static final long TAKE_POLL_INTERVAL_MS = 1000L;

    /** Configuration key suffix (deprecated) for the number of priority levels. */
    private static final String DEPRECATED_PRIORITY_LEVELS_KEY = "faircallqueue.priority-levels";
    /** Configuration key suffix for the number of priority levels. */
    private static final String PRIORITY_LEVELS_KEY = "scheduler.priority.levels";
    /** Number of priority levels used when neither key is configured. */
    private static final int DEFAULT_PRIORITY_LEVELS = 4;

    private volatile boolean clientBackOffEnabled;
    /** Queue that producers insert into. */
    private final AtomicReference<BlockingQueue<E>> putRef;
    /** Queue that consumers remove from. */
    private final AtomicReference<BlockingQueue<E>> takeRef;
    /**
     * Current scheduler. Volatile because {@link #swapQueue} replaces it while other
     * threads read it without holding this object's monitor.
     */
    private volatile RpcScheduler scheduler;

    /**
     * Narrows an untyped class token to a queue class token.
     *
     * @param queueClass the class to narrow; not checked at runtime
     * @param elementClass element type token, used only for type inference
     * @return {@code queueClass}, statically typed as a blocking-queue class
     */
    @SuppressWarnings("unchecked")
    static <E> Class<? extends BlockingQueue<E>> convertQueueClass(Class<?> queueClass, Class<E> elementClass) {
        // Unchecked: the caller is responsible for passing a BlockingQueue implementation.
        return (Class<? extends BlockingQueue<E>>) queueClass;
    }

    /**
     * Narrows an untyped class token to a scheduler class token.
     *
     * @param schedulerClass the class to narrow; not checked at runtime
     * @return {@code schedulerClass}, statically typed as an {@link RpcScheduler} class
     */
    @SuppressWarnings("unchecked")
    static Class<? extends RpcScheduler> convertSchedulerClass(Class<?> schedulerClass) {
        // Unchecked: the caller is responsible for passing an RpcScheduler implementation.
        return (Class<? extends RpcScheduler>) schedulerClass;
    }

    /**
     * Creates a manager whose scheduler and backing queue are instantiated reflectively.
     *
     * @param backingClass queue implementation to instantiate
     * @param schedulerClass scheduler implementation to instantiate
     * @param clientBackOffEnabled whether inserts consult the scheduler for backoff
     * @param maxQueueSize capacity passed to the queue constructor
     * @param namespace configuration key prefix, also passed to the constructors
     * @param conf configuration to read the priority level count from
     * @throws IllegalArgumentException if the configured number of levels is below 1
     * @throws RuntimeException if the scheduler or queue cannot be constructed
     */
    public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass, Class<? extends RpcScheduler> schedulerClass, boolean clientBackOffEnabled, int maxQueueSize, String namespace, Configuration conf) {
        int priorityLevels = parseNumLevels(namespace, conf);
        this.scheduler = createScheduler(schedulerClass, priorityLevels, namespace, conf);
        BlockingQueue<E> bq = createCallQueueInstance(backingClass, priorityLevels, maxQueueSize, namespace, conf);
        this.clientBackOffEnabled = clientBackOffEnabled;
        this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
        this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
        LOG.info("Using callQueue: {}, queueCapacity: {}, scheduler: {}, ipcBackoff: {}.",
            backingClass, maxQueueSize, schedulerClass, clientBackOffEnabled);
    }

    /**
     * Creates a manager around an already constructed queue and scheduler.
     *
     * @param queue backing queue used for both inserts and removals
     * @param scheduler scheduler to consult
     * @param clientBackOffEnabled whether inserts consult the scheduler for backoff
     */
    @VisibleForTesting
    CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler, boolean clientBackOffEnabled) {
        this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
        this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
        this.scheduler = scheduler;
        this.clientBackOffEnabled = clientBackOffEnabled;
    }

    /**
     * Instantiates {@code theClass} using the first declared constructor that matches
     * one of the given signatures, trying them in order.
     *
     * @param theClass class to instantiate
     * @param signatures candidate constructor parameter lists, most specific first
     * @param argLists arguments for each candidate, parallel to {@code signatures}
     * @return the new instance
     * @throws RuntimeException if a matching constructor itself throws (the cause is the
     *         constructor's exception), or if no candidate could be used (the cause is
     *         the last reflective failure)
     */
    private static <T> T instantiate(Class<T> theClass, Class<?>[][] signatures, Object[][] argLists) {
        ReflectiveOperationException lastFailure = null;
        for (int i = 0; i < signatures.length; i++) {
            try {
                Constructor<T> ctor = theClass.getDeclaredConstructor(signatures[i]);
                return ctor.newInstance(argLists[i]);
            } catch (InvocationTargetException e) {
                // The constructor exists and ran but failed: do not fall back to another one.
                throw new RuntimeException(theClass.getName() + " could not be constructed.", e.getCause());
            } catch (ReflectiveOperationException e) {
                // Missing, inaccessible or non-instantiable: try the next signature.
                lastFailure = e;
            }
        }
        throw new RuntimeException(theClass.getName() + " could not be constructed.", lastFailure);
    }

    /**
     * Builds a scheduler, trying the constructors
     * {@code (int, String, Configuration)}, {@code (int)} and {@code ()} in that order.
     *
     * @throws RuntimeException if none of the constructors can be used or one of them throws
     */
    private static <T extends RpcScheduler> T createScheduler(Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
        return instantiate(theClass,
            new Class<?>[][] {
                {int.class, String.class, Configuration.class},
                {int.class},
                {}
            },
            new Object[][] {
                {priorityLevels, ns, conf},
                {priorityLevels},
                {}
            });
    }

    /**
     * Builds a backing queue, trying the constructors
     * {@code (int, int, String, Configuration)}, {@code (int)} and {@code ()} in that order.
     * The single-int constructor receives {@code maxLen}.
     *
     * @throws RuntimeException if none of the constructors can be used or one of them throws
     */
    private <T extends BlockingQueue<E>> T createCallQueueInstance(Class<T> theClass, int priorityLevels, int maxLen, String ns, Configuration conf) {
        return instantiate(theClass,
            new Class<?>[][] {
                {int.class, int.class, String.class, Configuration.class},
                {int.class},
                {}
            },
            new Object[][] {
                {priorityLevels, maxLen, ns, conf},
                {maxLen},
                {}
            });
    }

    /** @return whether inserts currently consult the scheduler for backoff */
    boolean isClientBackoffEnabled() {
        return this.clientBackOffEnabled;
    }

    /** @return the current scheduler's backoff decision for {@code e} */
    boolean shouldBackOff(Schedulable e) {
        return this.scheduler.shouldBackOff(e);
    }

    /** Forwards processing details of a completed call to the current scheduler. */
    void addResponseTime(String name, Schedulable e, ProcessingDetails details) {
        this.scheduler.addResponseTime(name, e, details);
    }

    /** @return the priority level the current scheduler assigns to {@code e} */
    int getPriorityLevel(Schedulable e) {
        return this.scheduler.getPriorityLevel(e);
    }

    /**
     * @return the priority level of {@code user} when the current scheduler is a
     *         {@link DecayRpcScheduler}; {@code 0} for any other scheduler
     */
    int getPriorityLevel(UserGroupInformation user) {
        // Read the field once so a concurrent swapQueue cannot change the type
        // between the instanceof check and the cast.
        RpcScheduler current = this.scheduler;
        if (current instanceof DecayRpcScheduler) {
            return ((DecayRpcScheduler) current).getPriorityLevel(user);
        }
        return 0;
    }

    /**
     * Sets the priority level of {@code user} when the current scheduler is a
     * {@link DecayRpcScheduler}; does nothing for any other scheduler.
     */
    void setPriorityLevel(UserGroupInformation user, int priority) {
        // Single read for the same reason as in getPriorityLevel(UserGroupInformation).
        RpcScheduler current = this.scheduler;
        if (current instanceof DecayRpcScheduler) {
            ((DecayRpcScheduler) current).setPriorityLevel(user, priority);
        }
    }

    /** Enables or disables scheduler-driven client backoff. */
    void setClientBackoffEnabled(boolean value) {
        this.clientBackOffEnabled = value;
    }

    /**
     * Inserts {@code e}. With backoff disabled this blocks until space is available.
     * With backoff enabled it never blocks: the call is rejected if the scheduler says
     * so or if the queue refuses it.
     *
     * @throws CallQueueOverflowException if backoff is enabled and the call is rejected
     * @throws InterruptedException if interrupted while blocked on the backing queue
     */
    @Override
    public void put(E e) throws InterruptedException {
        if (!isClientBackoffEnabled()) {
            this.putRef.get().put(e);
        } else if (shouldBackOff(e)) {
            throwBackoff();
        } else {
            // The backoff criteria were just evaluated; do not evaluate them again.
            addInternal(e, false);
        }
    }

    /**
     * Inserts {@code e} without blocking.
     *
     * @throws CallQueueOverflowException if the call is rejected by the scheduler or the queue
     */
    @Override
    public boolean add(E e) {
        return addInternal(e, true);
    }

    /**
     * Non-blocking insert shared by {@link #put} and {@link #add}.
     *
     * @param e element to insert
     * @param checkBackoff whether to consult the scheduler (only done if backoff is enabled)
     * @return the result of the backing queue's {@code add}
     * @throws CallQueueOverflowException the queue's own overflow exception if it threw
     *         one, otherwise {@link CallQueueOverflowException#DISCONNECT} when the
     *         scheduler requests backoff or the queue throws {@link IllegalStateException}
     */
    @VisibleForTesting
    boolean addInternal(E e, boolean checkBackoff) {
        if (checkBackoff && isClientBackoffEnabled() && shouldBackOff(e)) {
            throwBackoff();
        }
        try {
            return this.putRef.get().add(e);
        } catch (CallQueueOverflowException ex) {
            // The queue chose the overflow exception itself; pass it through unchanged.
            throw ex;
        } catch (IllegalStateException ise) {
            throwBackoff();
        }
        // Not reached: throwBackoff() always throws. Present to satisfy the compiler.
        return true;
    }

    /** Always throws {@link CallQueueOverflowException#DISCONNECT}. */
    private void throwBackoff() throws IllegalStateException {
        throw CallQueueOverflowException.DISCONNECT;
    }

    /** Delegates to the insert-side queue; performs no backoff check. */
    @Override
    public boolean offer(E e) {
        return this.putRef.get().offer(e);
    }

    /** Delegates to the insert-side queue; performs no backoff check. */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return this.putRef.get().offer(e, timeout, unit);
    }

    /** Delegates to the removal-side queue. */
    @Override
    public E peek() {
        return this.takeRef.get().peek();
    }

    /** Delegates to the removal-side queue. */
    @Override
    public E poll() {
        return this.takeRef.get().poll();
    }

    /** Delegates to the removal-side queue. */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.takeRef.get().poll(timeout, unit);
    }

    /**
     * Retrieves the next element, waiting as long as necessary.
     *
     * <p>Implemented as repeated bounded polls. {@code takeRef} is re-read on every
     * iteration, so a queue installed by {@link #swapQueue} is picked up after at most
     * one poll timeout instead of the caller staying blocked on the old queue.
     */
    @Override
    public E take() throws InterruptedException {
        E e = null;
        while (e == null) {
            e = this.takeRef.get().poll(TAKE_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
        return e;
    }

    /** @return the size of the removal-side queue */
    @Override
    public int size() {
        return this.takeRef.get().size();
    }

    /** @return the remaining capacity of the removal-side queue */
    @Override
    public int remainingCapacity() {
        return this.takeRef.get().remainingCapacity();
    }

    /**
     * Reads the number of priority levels from {@code conf}. The deprecated key wins
     * when it is set to a non-zero value (a warning is logged); otherwise the current
     * key is used with a default of {@value #DEFAULT_PRIORITY_LEVELS}.
     *
     * @param ns configuration key prefix
     * @param conf configuration to read from
     * @return the number of priority levels, at least 1
     * @throws IllegalArgumentException if the resulting value is below 1
     */
    private static int parseNumLevels(String ns, Configuration conf) {
        String deprecatedKey = ns + "." + DEPRECATED_PRIORITY_LEVELS_KEY;
        String currentKey = ns + "." + PRIORITY_LEVELS_KEY;
        int retval = conf.getInt(deprecatedKey, 0);
        if (retval == 0) {
            retval = conf.getInt(currentKey, DEFAULT_PRIORITY_LEVELS);
        } else {
            LOG.warn("{} is deprecated. Please use {}.", deprecatedKey, currentKey);
        }
        if (retval < 1) {
            throw new IllegalArgumentException("numLevels must be at least 1");
        }
        return retval;
    }

    /**
     * Replaces the scheduler and the backing queue with freshly constructed ones.
     *
     * <p>Producers are redirected to the new queue immediately. Consumers keep
     * draining the old queue until it has been observed empty for
     * {@value #CHECKPOINT_NUM} consecutive checks, and are redirected afterwards. This
     * method blocks for that whole period. If the calling thread is interrupted while
     * waiting, the wait continues and the interrupt status is restored before returning.
     *
     * @param schedulerClass scheduler implementation to instantiate
     * @param queueClassToUse queue implementation to instantiate
     * @param maxSize capacity passed to the queue constructor
     * @param ns configuration key prefix, also passed to the constructors
     * @param conf configuration to read the priority level count from
     * @throws IllegalArgumentException if the configured number of levels is below 1
     * @throws RuntimeException if the scheduler or queue cannot be constructed
     */
    public synchronized void swapQueue(Class<? extends RpcScheduler> schedulerClass, Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize, String ns, Configuration conf) {
        int priorityLevels = parseNumLevels(ns, conf);
        // NOTE(review): the old scheduler is stopped before the replacement is built.
        // This order is kept as-is; confirm before reordering, since the two scheduler
        // instances may share state keyed by the namespace.
        this.scheduler.stop();
        RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels, ns, conf);
        BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, priorityLevels, maxSize, ns, conf);

        // Redirect producers first so new calls stop landing in the old queue.
        BlockingQueue<E> oldQ = this.putRef.get();
        this.putRef.set(newQ);

        // Let consumers drain the old queue before redirecting them.
        boolean interrupted = false;
        try {
            boolean drained = false;
            while (!drained) {
                try {
                    drained = queueIsReallyEmpty(oldQ);
                } catch (InterruptedException ie) {
                    // Keep waiting (a half-done swap would strand queued calls), but
                    // remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        this.takeRef.set(newQ);
        this.scheduler = newScheduler;
        LOG.info("Old Queue: {}, Replacement: {}", stringRepr(oldQ), stringRepr(newQ));
    }

    /**
     * Checks that {@code q} stays empty across {@value #CHECKPOINT_NUM} observations
     * spaced {@value #CHECKPOINT_INTERVAL_MS} ms apart.
     *
     * @return {@code true} if every observation found the queue empty, {@code false}
     *         as soon as one observation finds an element
     * @throws InterruptedException if interrupted while sleeping between observations
     */
    private static boolean queueIsReallyEmpty(BlockingQueue<?> q) throws InterruptedException {
        for (int i = 0; i < CHECKPOINT_NUM; i++) {
            Thread.sleep(CHECKPOINT_INTERVAL_MS);
            if (!q.isEmpty()) {
                return false;
            }
        }
        return true;
    }

    /** @return {@code className@hexHashCode} for {@code o} */
    private static String stringRepr(Object o) {
        return o.getClass().getName() + '@' + Integer.toHexString(o.hashCode());
    }

    /** Delegates to the removal-side queue. */
    @Override
    public int drainTo(Collection<? super E> c) {
        return this.takeRef.get().drainTo(c);
    }

    /** Delegates to the removal-side queue. */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return this.takeRef.get().drainTo(c, maxElements);
    }

    /** @return an iterator over the removal-side queue */
    @Override
    public Iterator<E> iterator() {
        return this.takeRef.get().iterator();
    }

    /**
     * Thrown when a call cannot be queued. The cause is always an
     * {@link RpcServerException} wrapping the supplied {@link IOException} and
     * reporting the supplied RPC status.
     *
     * <p>{@link #KEEPALIVE} and {@link #DISCONNECT} are shared instances created once
     * at class initialization, so their stack traces do not identify the throw site.
     */
    static class CallQueueOverflowException
    extends IllegalStateException {
        private static final String TOO_BUSY = "Server too busy";
        /** Overflow reported with RPC status {@code ERROR}. */
        static final CallQueueOverflowException KEEPALIVE = new CallQueueOverflowException(new RetriableException(TOO_BUSY), RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.ERROR);
        /** Overflow reported with RPC status {@code FATAL}. */
        static final CallQueueOverflowException DISCONNECT = new CallQueueOverflowException(new RetriableException(TOO_BUSY + " - disconnecting"), RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto.FATAL);

        /**
         * @param ioe exception to wrap; its message becomes the message of the cause
         * @param status RPC status the cause reports from {@code getRpcStatusProto()}
         */
        CallQueueOverflowException(IOException ioe, final RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto status) {
            super("Queue full", new RpcServerException(ioe.getMessage(), ioe){

                @Override
                public RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto getRpcStatusProto() {
                    return status;
                }
            });
        }

        /** @return the wrapping {@link RpcServerException}, typed as {@link IOException} */
        @Override
        public IOException getCause() {
            // Safe: the only constructor always installs an RpcServerException as the cause.
            return (IOException) super.getCause();
        }
    }
}

