/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.org.apache.kafka.clients.consumer;

import io.confluent.org.apache.kafka.clients.consumer.Consumer;
import io.confluent.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import io.confluent.org.apache.kafka.clients.consumer.ConsumerRecord;
import io.confluent.org.apache.kafka.clients.consumer.ConsumerRecords;
import io.confluent.org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import io.confluent.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import io.confluent.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import io.confluent.org.apache.kafka.clients.consumer.OffsetCommitCallback;
import io.confluent.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import io.confluent.org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import io.confluent.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import io.confluent.org.apache.kafka.common.KafkaException;
import io.confluent.org.apache.kafka.common.Metric;
import io.confluent.org.apache.kafka.common.MetricName;
import io.confluent.org.apache.kafka.common.PartitionInfo;
import io.confluent.org.apache.kafka.common.TopicPartition;
import io.confluent.org.apache.kafka.common.errors.WakeupException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

public class MockConsumer<K, V>
implements Consumer<K, V> {
    private final Map<String, List<PartitionInfo>> partitions;
    private final SubscriptionState subscriptions;
    private final Map<TopicPartition, Long> beginningOffsets;
    private final Map<TopicPartition, List<Long>> endOffsets;
    private final Map<TopicPartition, OffsetAndMetadata> committed;
    private final Queue<Runnable> pollTasks;
    private final Set<TopicPartition> paused;
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
    private KafkaException exception;
    private AtomicBoolean wakeup;
    private boolean closed;

    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        this.partitions = new HashMap<String, List<PartitionInfo>>();
        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        this.paused = new HashSet<TopicPartition>();
        this.closed = false;
        this.beginningOffsets = new HashMap<TopicPartition, Long>();
        this.endOffsets = new HashMap<TopicPartition, List<Long>>();
        this.pollTasks = new LinkedList<Runnable>();
        this.exception = null;
        this.wakeup = new AtomicBoolean(false);
        this.committed = new HashMap<TopicPartition, OffsetAndMetadata>();
    }

    @Override
    public synchronized Set<TopicPartition> assignment() {
        return this.subscriptions.assignedPartitions();
    }

    public synchronized void rebalance(Collection<TopicPartition> newAssignment) {
        this.records.clear();
        this.subscriptions.assignFromSubscribed(newAssignment);
    }

    @Override
    public synchronized Set<String> subscription() {
        return this.subscriptions.subscription();
    }

    @Override
    public synchronized void subscribe(Collection<String> topics) {
        this.subscribe(topics, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public synchronized void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.subscribe(pattern, listener);
        HashSet<String> topicsToSubscribe = new HashSet<String>();
        for (String topic : this.partitions.keySet()) {
            if (!pattern.matcher(topic).matches() || this.subscriptions.subscription().contains(topic)) continue;
            topicsToSubscribe.add(topic);
        }
        this.ensureNotClosed();
        this.subscriptions.subscribeFromPattern(topicsToSubscribe);
        HashSet<TopicPartition> assignedPartitions = new HashSet<TopicPartition>();
        for (String topic : topicsToSubscribe) {
            for (PartitionInfo info : this.partitions.get(topic)) {
                assignedPartitions.add(new TopicPartition(topic, info.partition()));
            }
        }
        this.subscriptions.assignFromSubscribed(assignedPartitions);
    }

    @Override
    public synchronized void subscribe(Pattern pattern) {
        this.subscribe(pattern, (ConsumerRebalanceListener)new NoOpConsumerRebalanceListener());
    }

    @Override
    public synchronized void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.subscribe(new HashSet<String>(topics), listener);
    }

    @Override
    public synchronized void assign(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.assignFromUser(new HashSet<TopicPartition>(partitions));
    }

    @Override
    public synchronized void unsubscribe() {
        this.ensureNotClosed();
        this.committed.clear();
        this.subscriptions.unsubscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public synchronized ConsumerRecords<K, V> poll(long timeout) {
        this.ensureNotClosed();
        Queue<Runnable> queue = this.pollTasks;
        synchronized (queue) {
            Runnable task = this.pollTasks.poll();
            if (task != null) {
                task.run();
            }
        }
        if (this.wakeup.get()) {
            this.wakeup.set(false);
            throw new WakeupException();
        }
        if (this.exception != null) {
            KafkaException exception = this.exception;
            this.exception = null;
            throw exception;
        }
        for (TopicPartition tp : this.subscriptions.assignedPartitions()) {
            if (this.subscriptions.hasValidPosition(tp)) continue;
            this.updateFetchPosition(tp);
        }
        HashMap results = new HashMap();
        for (TopicPartition topicPartition : this.records.keySet()) {
            results.put(topicPartition, new ArrayList());
        }
        for (Map.Entry entry : this.records.entrySet()) {
            if (this.subscriptions.isPaused((TopicPartition)entry.getKey())) continue;
            List recs = (List)entry.getValue();
            for (ConsumerRecord rec : recs) {
                if (!this.assignment().contains(entry.getKey()) || rec.offset() < this.subscriptions.position((TopicPartition)entry.getKey())) continue;
                ((List)results.get(entry.getKey())).add(rec);
                this.subscriptions.position((TopicPartition)entry.getKey(), rec.offset() + 1L);
            }
        }
        this.records.clear();
        return new ConsumerRecords(results);
    }

    public synchronized void addRecord(ConsumerRecord<K, V> record) {
        this.ensureNotClosed();
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        HashSet<TopicPartition> currentAssigned = new HashSet<TopicPartition>(this.subscriptions.assignedPartitions());
        if (!currentAssigned.contains(tp)) {
            throw new IllegalStateException("Cannot add records for a partition that is not assigned to the consumer");
        }
        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
        if (recs == null) {
            recs = new ArrayList<ConsumerRecord<K, V>>();
            this.records.put(tp, recs);
        }
        recs.add(record);
    }

    public synchronized void setException(KafkaException exception) {
        this.exception = exception;
    }

    @Override
    public synchronized void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this.ensureNotClosed();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            this.committed.put(entry.getKey(), entry.getValue());
        }
        if (callback != null) {
            callback.onComplete(offsets, null);
        }
    }

    @Override
    public synchronized void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.commitAsync(offsets, null);
    }

    @Override
    public synchronized void commitAsync() {
        this.commitAsync(null);
    }

    @Override
    public synchronized void commitAsync(OffsetCommitCallback callback) {
        this.ensureNotClosed();
        this.commitAsync(this.subscriptions.allConsumed(), callback);
    }

    @Override
    public synchronized void commitSync() {
        this.commitSync(this.subscriptions.allConsumed());
    }

    @Override
    public synchronized void seek(TopicPartition partition, long offset) {
        this.ensureNotClosed();
        this.subscriptions.seek(partition, offset);
    }

    @Override
    public synchronized OffsetAndMetadata committed(TopicPartition partition) {
        this.ensureNotClosed();
        if (this.subscriptions.isAssigned(partition)) {
            return this.committed.get(partition);
        }
        return new OffsetAndMetadata(0L);
    }

    @Override
    public synchronized long position(TopicPartition partition) {
        this.ensureNotClosed();
        if (!this.subscriptions.isAssigned(partition)) {
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        }
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            this.updateFetchPosition(partition);
            offset = this.subscriptions.position(partition);
        }
        return offset;
    }

    @Override
    public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        for (TopicPartition tp : partitions) {
            this.subscriptions.requestOffsetReset(tp, OffsetResetStrategy.EARLIEST);
        }
    }

    public synchronized void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
        this.beginningOffsets.putAll(newOffsets);
    }

    @Override
    public synchronized void seekToEnd(Collection<TopicPartition> partitions) {
        this.ensureNotClosed();
        for (TopicPartition tp : partitions) {
            this.subscriptions.requestOffsetReset(tp, OffsetResetStrategy.LATEST);
        }
    }

    public synchronized void addEndOffsets(Map<TopicPartition, Long> newOffsets) {
        this.innerUpdateEndOffsets(newOffsets, false);
    }

    public synchronized void updateEndOffsets(Map<TopicPartition, Long> newOffsets) {
        this.innerUpdateEndOffsets(newOffsets, true);
    }

    private void innerUpdateEndOffsets(Map<TopicPartition, Long> newOffsets, boolean replace) {
        for (Map.Entry<TopicPartition, Long> entry : newOffsets.entrySet()) {
            List<Long> offsets = this.endOffsets.get(entry.getKey());
            if (replace || offsets == null) {
                offsets = new ArrayList<Long>();
            }
            offsets.add(entry.getValue());
            this.endOffsets.put(entry.getKey(), offsets);
        }
    }

    @Override
    public synchronized Map<MetricName, ? extends Metric> metrics() {
        this.ensureNotClosed();
        return Collections.emptyMap();
    }

    @Override
    public synchronized List<PartitionInfo> partitionsFor(String topic) {
        this.ensureNotClosed();
        return this.partitions.get(topic);
    }

    @Override
    public synchronized Map<String, List<PartitionInfo>> listTopics() {
        this.ensureNotClosed();
        return this.partitions;
    }

    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
        this.ensureNotClosed();
        this.partitions.put(topic, partitions);
    }

    @Override
    public synchronized void pause(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            this.subscriptions.pause(partition);
            this.paused.add(partition);
        }
    }

    @Override
    public synchronized void resume(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            this.subscriptions.resume(partition);
            this.paused.remove(partition);
        }
    }

    @Override
    public synchronized Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        throw new UnsupportedOperationException("Not implemented yet.");
    }

    @Override
    public synchronized Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        HashMap<TopicPartition, Long> result = new HashMap<TopicPartition, Long>();
        for (TopicPartition tp : partitions) {
            Long beginningOffset = this.beginningOffsets.get(tp);
            if (beginningOffset == null) {
                throw new IllegalStateException("The partition " + tp + " does not have a beginning offset.");
            }
            result.put(tp, beginningOffset);
        }
        return result;
    }

    @Override
    public synchronized Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        HashMap<TopicPartition, Long> result = new HashMap<TopicPartition, Long>();
        for (TopicPartition tp : partitions) {
            Long endOffset = this.getEndOffset(this.endOffsets.get(tp));
            if (endOffset == null) {
                throw new IllegalStateException("The partition " + tp + " does not have an end offset.");
            }
            result.put(tp, endOffset);
        }
        return result;
    }

    @Override
    public synchronized void close() {
        this.close(30000L, TimeUnit.MILLISECONDS);
    }

    @Override
    public synchronized void close(long timeout, TimeUnit unit) {
        this.ensureNotClosed();
        this.closed = true;
    }

    public synchronized boolean closed() {
        return this.closed;
    }

    @Override
    public synchronized void wakeup() {
        this.wakeup.set(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void schedulePollTask(Runnable task) {
        Queue<Runnable> queue = this.pollTasks;
        synchronized (queue) {
            this.pollTasks.add(task);
        }
    }

    public synchronized void scheduleNopPollTask() {
        this.schedulePollTask(new Runnable(){

            @Override
            public void run() {
            }
        });
    }

    @Override
    public synchronized Set<TopicPartition> paused() {
        return Collections.unmodifiableSet(new HashSet<TopicPartition>(this.paused));
    }

    private void ensureNotClosed() {
        if (this.closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void updateFetchPosition(TopicPartition tp) {
        if (this.subscriptions.isOffsetResetNeeded(tp)) {
            this.resetOffsetPosition(tp);
        } else if (!this.committed.containsKey(tp)) {
            this.subscriptions.requestOffsetReset(tp);
            this.resetOffsetPosition(tp);
        } else {
            this.subscriptions.seek(tp, this.committed.get(tp).offset());
        }
    }

    private void resetOffsetPosition(TopicPartition tp) {
        Long offset;
        OffsetResetStrategy strategy = this.subscriptions.resetStrategy(tp);
        if (strategy == OffsetResetStrategy.EARLIEST) {
            offset = this.beginningOffsets.get(tp);
            if (offset == null) {
                throw new IllegalStateException("MockConsumer didn't have beginning offset specified, but tried to seek to beginning");
            }
        } else if (strategy == OffsetResetStrategy.LATEST) {
            offset = this.getEndOffset(this.endOffsets.get(tp));
            if (offset == null) {
                throw new IllegalStateException("MockConsumer didn't have end offset specified, but tried to seek to end");
            }
        } else {
            throw new NoOffsetForPartitionException(tp);
        }
        this.seek(tp, offset);
    }

    private Long getEndOffset(List<Long> offsets) {
        if (offsets == null || offsets.isEmpty()) {
            return null;
        }
        return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
    }
}

