/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest;

import io.confluent.kafka.consumer.ConsumerIterator;
import io.confluent.kafka.consumer.ConsumerTimeoutException;
import io.confluent.kafka.message.MessageAndMetadata;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.ConsumerRecordAndSize;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.ConsumerTopicState;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.List;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ConsumerReadTask<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>
implements Future<List<ConsumerRecord<ClientKeyT, ClientValueT>>> {
    private static final Logger log = LoggerFactory.getLogger(ConsumerReadTask.class);
    private ConsumerState parent;
    private final long maxResponseBytes;
    private final int requestTimeoutMs;
    private final int responseMinBytes;
    private final ConsumerReadCallback<ClientKeyT, ClientValueT> callback;
    private CountDownLatch finished;
    private ConsumerTopicState topicState;
    private ConsumerIterator<KafkaKeyT, KafkaValueT> iter;
    private List<ConsumerRecord<ClientKeyT, ClientValueT>> messages;
    private long bytesConsumed = 0L;
    private final long started;

    public ConsumerReadTask(ConsumerState parent, String topic, long maxBytes, ConsumerReadCallback<ClientKeyT, ClientValueT> callback) {
        KafkaRestConfig conf = parent.getConfig();
        this.parent = parent;
        this.maxResponseBytes = Math.min(maxBytes, conf.getLong("consumer.request.max.bytes"));
        this.callback = callback;
        this.finished = new CountDownLatch(1);
        this.requestTimeoutMs = conf.getInt("consumer.request.timeout.ms");
        int responseMinBytes = conf.getInt("fetch.min.bytes");
        this.responseMinBytes = responseMinBytes < 0 ? Integer.MAX_VALUE : responseMinBytes;
        this.started = conf.getTime().milliseconds();
        try {
            this.topicState = parent.getOrCreateTopicState(topic);
            ConsumerReadTask previousTask = this.topicState.clearFailedTask();
            if (previousTask != null) {
                this.messages = previousTask.messages;
                this.bytesConsumed = previousTask.bytesConsumed;
            }
        }
        catch (RestException e) {
            this.finish(e);
        }
    }

    public void doPartialRead() {
        try {
            boolean requestTimedOut;
            if (this.iter == null) {
                this.parent.startRead(this.topicState);
                this.iter = this.topicState.getIterator();
                this.messages = new Vector<ConsumerRecord<ClientKeyT, ClientValueT>>();
            }
            long roughMsgSize = 0L;
            boolean willExceedMaxResponseBytes = this.bytesConsumed >= this.maxResponseBytes;
            boolean exceededMinResponseBytes = this.bytesConsumed > (long)this.responseMinBytes;
            try {
                while (!willExceedMaxResponseBytes && !exceededMinResponseBytes && this.iter.hasNext()) {
                    MessageAndMetadata msg = (MessageAndMetadata)this.iter.peek();
                    ConsumerRecordAndSize recordAndSize = this.parent.createConsumerRecord(msg);
                    roughMsgSize = recordAndSize.getSize();
                    boolean bl = willExceedMaxResponseBytes = this.bytesConsumed + roughMsgSize >= this.maxResponseBytes;
                    if (!willExceedMaxResponseBytes) {
                        this.iter.next();
                        this.messages.add(recordAndSize.getRecord());
                        this.bytesConsumed += roughMsgSize;
                        exceededMinResponseBytes = this.bytesConsumed > (long)this.responseMinBytes;
                        continue;
                    }
                    break;
                }
            }
            catch (ConsumerTimeoutException cte) {
                log.trace("ConsumerReadTask timed out, using backoff id={}", (Object)this);
            }
            log.trace("ConsumerReadTask exiting read with id={} messages={} bytes={}", new Object[]{this, this.messages.size(), this.bytesConsumed});
            long now = this.parent.getConfig().getTime().milliseconds();
            long elapsed = now - this.started;
            boolean bl = requestTimedOut = elapsed >= (long)this.requestTimeoutMs;
            if (requestTimedOut || willExceedMaxResponseBytes || exceededMinResponseBytes) {
                log.trace("Finishing ConsumerReadTask id={} requestTimedOut={} willExceedMaxResponseBytes={} exceededMinResponseBytes={}", new Object[]{this, requestTimedOut, willExceedMaxResponseBytes, exceededMinResponseBytes});
                this.finish();
            }
        }
        catch (Exception e2) {
            RestServerErrorException e2;
            if (!(e2 instanceof RestException)) {
                e2 = Errors.kafkaErrorException((Throwable)e2);
            }
            this.finish((RestException)e2);
            log.error("Unexpected exception in consumer read task id={} ", (Object)this, (Object)e2);
        }
    }

    public void finish() {
        this.finish(null);
    }

    public void finish(RestException e) {
        log.trace("Finishing ConsumerReadTask id={} exception={}", (Object)this, (Object)e);
        if (e == null) {
            Map<Integer, Long> consumedOffsets = this.topicState.getConsumedOffsets();
            for (ConsumerRecord<ClientKeyT, ClientValueT> msg : this.messages) {
                consumedOffsets.put(msg.getPartition(), msg.getOffset());
            }
        } else if (this.topicState != null && this.messages != null && this.messages.size() > 0) {
            log.trace("Saving failed ConsumerReadTask for subsequent call id={}", (Object)this, (Object)e);
            this.topicState.setFailedTask(this);
        }
        if (this.topicState != null) {
            this.parent.finishRead(this.topicState);
        }
        try {
            this.callback.onCompletion(e == null ? this.messages : null, (Exception)((Object)e));
        }
        catch (Throwable t) {
            log.error("Consumer read callback threw an unhandled exception id={}", (Object)this, (Object)e);
        }
        this.finished.countDown();
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return this.finished.getCount() == 0L;
    }

    @Override
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get() throws InterruptedException, ExecutionException {
        this.finished.await();
        return this.messages;
    }

    @Override
    public List<ConsumerRecord<ClientKeyT, ClientValueT>> get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        this.finished.await(timeout, unit);
        if (this.finished.getCount() > 0L) {
            throw new TimeoutException();
        }
        return this.messages;
    }
}

