/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.javaapi.consumer.SimpleConsumer;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.SimpleConsumerFactory;
import io.confluent.kafkarest.SimpleFetcher;
import io.confluent.kafkarest.Time;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A bounded pool of {@link SimpleConsumer} instances guarded by this object's monitor.
 *
 * <p>Consumers are created lazily through the {@link SimpleConsumerFactory} until
 * {@code maxPoolSize} is reached ({@code 0} means "no limit"). Callers borrow a consumer with
 * {@link #get(String, int)} and hand it back with {@link #release(SimpleFetcher)}.
 *
 * <p>All methods synchronize on the pool instance, so it may be shared between threads.
 */
public class SimpleConsumerPool {
    private static final Logger log = LoggerFactory.getLogger(SimpleConsumerPool.class);
    /** Maximum number of consumers this pool will create; 0 disables the limit. */
    private final int maxPoolSize;
    /** How long {@link #get(String, int)} may block, in milliseconds; 0 means block indefinitely. */
    private final int poolInstanceAvailabilityTimeoutMs;
    private final Time time;
    private final SimpleConsumerFactory simpleConsumerFactory;
    /** Every consumer ever created by this pool, keyed by its client id. */
    private final Map<String, SimpleConsumer> simpleConsumers;
    /** Client ids of the consumers that are currently idle, in release order. */
    private final Queue<String> availableConsumers;

    /**
     * Creates an empty pool.
     *
     * @param maxPoolSize maximum number of consumers to create; 0 for unlimited
     * @param poolInstanceAvailabilityTimeoutMs maximum time in milliseconds a caller waits for a
     *     consumer once the pool is exhausted; 0 to wait indefinitely
     * @param time clock used to compute the waiting deadline
     * @param simpleConsumerFactory factory used to create new consumers on demand
     */
    public SimpleConsumerPool(int maxPoolSize, int poolInstanceAvailabilityTimeoutMs, Time time, SimpleConsumerFactory simpleConsumerFactory) {
        this.maxPoolSize = maxPoolSize;
        this.poolInstanceAvailabilityTimeoutMs = poolInstanceAvailabilityTimeoutMs;
        this.time = time;
        this.simpleConsumerFactory = simpleConsumerFactory;
        this.simpleConsumers = new HashMap<String, SimpleConsumer>();
        this.availableConsumers = new LinkedList<String>();
    }

    /**
     * Borrows a consumer from the pool, blocking while the pool is exhausted.
     *
     * <p>An idle consumer is preferred; otherwise a new one is created for {@code host:port} if
     * the size limit allows it; otherwise the caller waits for a release. Note that
     * {@code host} and {@code port} are only used when a new consumer is created: an idle
     * consumer is reused whatever broker it was created for (presumably each pool serves a single
     * broker; confirm against the caller).
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged, the wait
     * continues, and the thread's interrupt status is restored before this method exits.
     * A negative timeout is treated as already expired.
     *
     * @param host broker host used if a new consumer has to be created
     * @param port broker port used if a new consumer has to be created
     * @return a fetcher wrapping the borrowed consumer and this pool
     * @throws RuntimeException the error built by {@code Errors.simpleConsumerPoolTimeoutException()}
     *     when no consumer became available before the timeout elapsed
     */
    public synchronized SimpleFetcher get(String host, int port) {
        final boolean waitForever = this.poolInstanceAvailabilityTimeoutMs == 0;
        final long expiration = this.time.milliseconds() + (long) this.poolInstanceAvailabilityTimeoutMs;
        boolean interrupted = false;
        try {
            while (true) {
                // Always look for a consumer first, so that a caller woken up right at the
                // deadline still takes the consumer that was released for it.
                if (!this.availableConsumers.isEmpty()) {
                    String consumerId = this.availableConsumers.remove();
                    return new SimpleFetcher(this.simpleConsumers.get(consumerId), this);
                }
                if (this.maxPoolSize == 0 || this.simpleConsumers.size() < this.maxPoolSize) {
                    SimpleConsumer simpleConsumer = this.simpleConsumerFactory.createConsumer(host, port);
                    this.simpleConsumers.put(simpleConsumer.clientId(), simpleConsumer);
                    return new SimpleFetcher(simpleConsumer, this);
                }

                // wait(0) blocks until notified, which is exactly the "no timeout" contract.
                long waitMs = 0L;
                if (!waitForever) {
                    long remainingMs = expiration - this.time.milliseconds();
                    if (remainingMs < 0L) {
                        throw Errors.simpleConsumerPoolTimeoutException();
                    }
                    // Wait only for what is left of the budget (spurious wake-ups must not
                    // restart the full timeout); never pass 0, which would wait forever.
                    waitMs = Math.max(remainingMs, 1L);
                }
                try {
                    this.wait(waitMs);
                } catch (InterruptedException e) {
                    log.warn("A thread requesting a SimpleConsumer has been interrupted while waiting", e);
                    // Remember the interruption; re-asserting it now would make the next
                    // wait() fail immediately and turn this loop into a busy spin.
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns a previously borrowed consumer to the pool and wakes up one waiting caller.
     *
     * <p>Releases of consumers unknown to this pool, or of consumers that are already idle, are
     * ignored with a warning: queueing them would later hand out a missing consumer or hand the
     * same consumer to two callers at once.
     *
     * @param simpleFetcher the fetcher obtained from {@link #get(String, int)}
     */
    public synchronized void release(SimpleFetcher simpleFetcher) {
        String consumerId = simpleFetcher.clientId();
        log.debug("Releasing into the pool SimpleConsumer with id {}", consumerId);
        if (!this.simpleConsumers.containsKey(consumerId)) {
            log.warn("Ignoring release of SimpleConsumer with id {}: it does not belong to this pool", consumerId);
            return;
        }
        if (this.availableConsumers.contains(consumerId)) {
            log.warn("Ignoring release of SimpleConsumer with id {}: it is already available", consumerId);
            return;
        }
        this.availableConsumers.add(consumerId);
        this.notify();
    }

    /**
     * Closes every consumer created by this pool, whether idle or currently borrowed.
     *
     * <p>Holds the pool monitor so the consumer map cannot be modified by a concurrent
     * {@link #get(String, int)} while it is being iterated.
     */
    public synchronized void shutdown() {
        for (SimpleConsumer simpleConsumer : this.simpleConsumers.values()) {
            simpleConsumer.close();
        }
    }

    /**
     * @return the number of consumers created by this pool so far (idle and borrowed)
     */
    public synchronized int size() {
        return this.simpleConsumers.size();
    }
}

