/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.common.InvalidConfigException;
import io.confluent.kafka.consumer.Consumer;
import io.confluent.kafka.consumer.ConsumerConfig;
import io.confluent.kafka.javaapi.consumer.ConsumerConnector;
import io.confluent.kafkarest.AvroConsumerState;
import io.confluent.kafkarest.BinaryConsumerState;
import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.ConsumerReadTask;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.JsonConsumerState;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.MetadataObserver;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerManager {
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    private final KafkaRestConfig config;
    private final Time time;
    private final String zookeeperConnect;
    private final MetadataObserver mdObserver;
    private final int iteratorTimeoutMs;
    private final Map<ConsumerInstanceId, ConsumerState> consumers = new HashMap<ConsumerInstanceId, ConsumerState>();
    private final ExecutorService executor;
    private ConsumerFactory consumerFactory;
    final DelayQueue<RunnableReadTask> delayedReadTasks = new DelayQueue();
    private final ReadTaskSchedulerThread readTaskSchedulerThread;
    private final ExpirationThread expirationThread;

    public ConsumerManager(final KafkaRestConfig config, MetadataObserver mdObserver) {
        this.config = config;
        this.time = config.getTime();
        this.zookeeperConnect = config.getString("zookeeper.connect");
        this.mdObserver = mdObserver;
        this.iteratorTimeoutMs = config.getInt("consumer.iterator.timeout.ms");
        int maxThreadCount = config.getInt("consumer.threads") < 0 ? Integer.MAX_VALUE : config.getInt("consumer.threads");
        this.executor = new KafkaConsumerThreadPoolExecutor(0, maxThreadCount, 60L, TimeUnit.SECONDS, new SynchronousQueue(), new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                if (r instanceof ReadFutureTask) {
                    RunnableReadTask readTask = ((ReadFutureTask)r).readTask;
                    int delayMs = ThreadLocalRandom.current().nextInt(25, 76);
                    readTask.waitExpirationMs = config.getTime().milliseconds() + (long)delayMs;
                    ConsumerManager.this.delayedReadTasks.add(readTask);
                } else if (!executor.isShutdown()) {
                    r.run();
                }
            }
        });
        this.consumerFactory = null;
        this.expirationThread = new ExpirationThread();
        this.expirationThread.start();
        this.readTaskSchedulerThread = new ReadTaskSchedulerThread();
        this.readTaskSchedulerThread.start();
    }

    public ConsumerManager(KafkaRestConfig config, MetadataObserver mdObserver, ConsumerFactory consumerFactory) {
        this(config, mdObserver);
        this.consumerFactory = consumerFactory;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public String createConsumer(String group, ConsumerInstanceConfig instanceConfig) {
        String name = instanceConfig.getName();
        if (instanceConfig.getId() != null) {
            name = instanceConfig.getId();
        }
        if (name == null) {
            name = "rest-consumer-";
            String serverId = this.config.getString("id");
            if (!serverId.isEmpty()) {
                name = name + serverId + "-";
            }
            name = name + UUID.randomUUID().toString();
        }
        ConsumerInstanceId cid = new ConsumerInstanceId(group, name);
        ConsumerManager consumerManager = this;
        synchronized (consumerManager) {
            if (this.consumers.containsKey(cid)) {
                throw Errors.consumerAlreadyExistsException();
            }
            this.consumers.put(cid, null);
        }
        boolean succeeded = false;
        try {
            ConsumerConnector consumer;
            log.debug("Creating consumer " + name + " in group " + group);
            Properties props = (Properties)this.config.getOriginalProperties().clone();
            props.setProperty("zookeeper.connect", this.zookeeperConnect);
            props.setProperty("group.id", group);
            if (instanceConfig.getId() != null) {
                props.setProperty("consumer.id", instanceConfig.getId());
            }
            props.setProperty("consumer.timeout.ms", Integer.toString(this.iteratorTimeoutMs));
            if (instanceConfig.getAutoCommitEnable() != null) {
                props.setProperty("auto.commit.enable", instanceConfig.getAutoCommitEnable());
            } else {
                props.setProperty("auto.commit.enable", "false");
            }
            if (instanceConfig.getAutoOffsetReset() != null) {
                props.setProperty("auto.offset.reset", instanceConfig.getAutoOffsetReset());
            }
            try {
                consumer = this.consumerFactory == null ? Consumer.createJavaConsumerConnector(new ConsumerConfig(props)) : this.consumerFactory.createConsumer(new ConsumerConfig(props));
            }
            catch (InvalidConfigException e) {
                throw Errors.invalidConsumerConfigException((String)e.getMessage());
            }
            ConsumerState state = this.createConsumerState(instanceConfig, cid, consumer);
            Object object = this;
            synchronized (object) {
                this.consumers.put(cid, state);
            }
            succeeded = true;
            object = name;
            return object;
        }
        finally {
            if (!succeeded) {
                ConsumerManager consumerManager2 = this;
                synchronized (consumerManager2) {
                    this.consumers.remove(cid);
                }
            }
        }
    }

    private ConsumerState createConsumerState(ConsumerInstanceConfig instanceConfig, ConsumerInstanceId cid, ConsumerConnector consumer) throws RestServerErrorException {
        KafkaRestConfig newConfig = KafkaRestConfig.newConsumerConfig((KafkaRestConfig)this.config, (ConsumerInstanceConfig)instanceConfig);
        switch (instanceConfig.getFormat()) {
            case BINARY: {
                return new BinaryConsumerState(newConfig, cid, consumer);
            }
            case AVRO: {
                return new AvroConsumerState(newConfig, cid, consumer);
            }
            case JSON: {
                return new JsonConsumerState(newConfig, cid, consumer);
            }
        }
        throw new RestServerErrorException(String.format("Invalid embedded format %s for new consumer.", instanceConfig.getFormat()), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> Future<List<ConsumerRecord<ClientKeyT, ClientValueT>>> readTopic(String group, String instance, String topic, Class<? extends ConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType, long maxBytes, ConsumerReadCallback<ClientKeyT, ClientValueT> callback) {
        ConsumerState state;
        try {
            state = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException e) {
            callback.onCompletion(null, (Exception)((Object)e));
            return null;
        }
        if (!consumerStateType.isInstance(state)) {
            callback.onCompletion(null, (Exception)((Object)Errors.consumerFormatMismatch()));
            return null;
        }
        if (!this.mdObserver.topicExists(topic)) {
            callback.onCompletion(null, (Exception)((Object)Errors.topicNotFoundException()));
            return null;
        }
        ConsumerReadTask task = new ConsumerReadTask(state, topic, maxBytes, callback);
        this.executor.submit(new RunnableReadTask(new ReadTaskState(task, state, callback)));
        return task;
    }

    public Future commitOffsets(String group, String instance, final CommitCallback callback) {
        ConsumerState state;
        try {
            state = this.getConsumerInstance(group, instance);
        }
        catch (RestNotFoundException e) {
            callback.onCompletion(null, (Exception)((Object)e));
            return null;
        }
        return this.executor.submit(new Runnable(){

            @Override
            public void run() {
                try {
                    List<TopicPartitionOffset> offsets = state.commitOffsets();
                    callback.onCompletion(offsets, null);
                }
                catch (Exception e) {
                    log.error("Failed to commit offsets for consumer " + state.getId().toString(), (Throwable)e);
                    Exception responseException = e;
                    if (!(e instanceof RestException)) {
                        responseException = Errors.kafkaErrorException((Throwable)e);
                    }
                    callback.onCompletion(null, responseException);
                }
                finally {
                    state.updateExpiration();
                }
            }
        });
    }

    public void deleteConsumer(String group, String instance) {
        log.debug("Destroying consumer " + instance + " in group " + group);
        ConsumerState state = this.getConsumerInstance(group, instance, true);
        state.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();
        log.trace("Shutting down read task scheduler thread");
        this.readTaskSchedulerThread.shutdown();
        ConsumerManager consumerManager = this;
        synchronized (consumerManager) {
            for (Map.Entry<ConsumerInstanceId, ConsumerState> entry : this.consumers.entrySet()) {
                entry.getValue().close();
            }
            this.consumers.clear();
            this.executor.shutdown();
        }
    }

    private synchronized ConsumerState getConsumerInstance(String group, String instance, boolean toRemove) {
        ConsumerState state;
        ConsumerInstanceId id = new ConsumerInstanceId(group, instance);
        ConsumerState consumerState = state = toRemove ? this.consumers.remove(id) : this.consumers.get(id);
        if (state == null) {
            throw Errors.consumerInstanceNotFoundException();
        }
        state.updateExpiration();
        return state;
    }

    ConsumerState getConsumerInstance(String group, String instance) {
        return this.getConsumerInstance(group, instance, false);
    }

    private class ExpirationThread
    extends Thread {
        AtomicBoolean isRunning;
        CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                while (this.isRunning.get()) {
                    ConsumerManager consumerManager = ConsumerManager.this;
                    synchronized (consumerManager) {
                        long now = ConsumerManager.this.time.milliseconds();
                        Iterator itr = ConsumerManager.this.consumers.values().iterator();
                        while (itr.hasNext()) {
                            final ConsumerState state = (ConsumerState)itr.next();
                            if (state == null || !state.expired(now)) continue;
                            log.debug("Removing the expired consumer {}", (Object)state.getId());
                            itr.remove();
                            ConsumerManager.this.executor.submit(new Runnable(){

                                @Override
                                public void run() {
                                    state.close();
                                }
                            });
                        }
                    }
                    Thread.sleep(1000L);
                }
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            this.shutdownLatch.countDown();
        }

        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                throw new Error("Interrupted when shutting down consumer worker thread.");
            }
        }
    }

    private class ReadTaskSchedulerThread
    extends Thread {
        AtomicBoolean isRunning;
        CountDownLatch shutdownLatch;

        ReadTaskSchedulerThread() {
            super("Read Task Scheduler Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            this.setDaemon(true);
        }

        @Override
        public void run() {
            try {
                while (this.isRunning.get()) {
                    RunnableReadTask readTask = (RunnableReadTask)ConsumerManager.this.delayedReadTasks.poll(500L, TimeUnit.MILLISECONDS);
                    if (readTask == null) continue;
                    ConsumerManager.this.executor.submit(readTask);
                }
            }
            catch (InterruptedException interruptedException) {
            }
            finally {
                this.shutdownLatch.countDown();
            }
        }

        public void shutdown() {
            try {
                this.isRunning.set(false);
                this.interrupt();
                this.shutdownLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted when shutting down read task scheduler thread.");
            }
        }
    }

    private static class ReadTaskState {
        final ConsumerReadTask task;
        final ConsumerState consumerState;
        final ConsumerReadCallback callback;

        public ReadTaskState(ConsumerReadTask task, ConsumerState state, ConsumerReadCallback callback) {
            this.task = task;
            this.consumerState = state;
            this.callback = callback;
        }
    }

    class RunnableReadTask
    implements Runnable,
    Delayed {
        private final ReadTaskState taskState;
        private final KafkaRestConfig consumerConfig;
        private final long started;
        private final long requestExpiration;
        private long waitExpirationMs;

        public RunnableReadTask(ReadTaskState taskState) {
            this.taskState = taskState;
            this.started = ConsumerManager.this.config.getTime().milliseconds();
            this.consumerConfig = taskState.consumerState.getConfig();
            this.requestExpiration = this.started + (long)this.consumerConfig.getInt("consumer.request.timeout.ms").intValue();
            this.waitExpirationMs = 0L;
        }

        @Override
        public void run() {
            try {
                log.trace("Executing consumer read task ({})", (Object)this.taskState.task);
                if (this.taskState.task.isDone()) {
                    return;
                }
                this.taskState.task.doPartialRead();
                this.taskState.consumerState.updateExpiration();
                if (!this.taskState.task.isDone()) {
                    long backoffTime = ConsumerManager.this.config.getTime().milliseconds() + (long)this.consumerConfig.getInt("consumer.iterator.backoff.ms").intValue();
                    this.waitExpirationMs = Math.min(backoffTime, this.requestExpiration);
                    ConsumerManager.this.delayedReadTasks.add(this);
                } else {
                    log.trace("Finished executing consumer read task ({})", (Object)this.taskState.task);
                }
            }
            catch (Exception e) {
                log.error("Failed to read records consumer " + this.taskState.consumerState.getId().toString(), (Throwable)e);
                Exception responseException = e;
                if (!(e instanceof RestException)) {
                    responseException = Errors.kafkaErrorException((Throwable)e);
                }
                this.taskState.callback.onCompletion(null, (Exception)((Object)((RestException)((Object)responseException))));
            }
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.waitExpirationMs - ConsumerManager.this.config.getTime().milliseconds(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed o) {
            if (o == null) {
                throw new NullPointerException("Delayed comparator cannot compare with null");
            }
            long otherObjDelay = o.getDelay(TimeUnit.MILLISECONDS);
            long delay = this.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(delay, otherObjDelay);
        }
    }

    class KafkaConsumerThreadPoolExecutor
    extends ThreadPoolExecutor {
        private KafkaConsumerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            if (runnable instanceof RunnableReadTask) {
                return new ReadFutureTask((RunnableReadTask)runnable, value);
            }
            return super.newTaskFor(runnable, value);
        }
    }

    private class ReadFutureTask<V>
    extends FutureTask<V> {
        private final RunnableReadTask readTask;

        private ReadFutureTask(RunnableReadTask runnable, V result) {
            super(runnable, result);
            this.readTask = runnable;
        }
    }

    public static interface ConsumerFactory {
        public ConsumerConnector createConsumer(ConsumerConfig var1);
    }

    public static interface CommitCallback {
        public void onCompletion(List<TopicPartitionOffset> var1, Exception var2);
    }
}

