/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ConsumerInterceptors}.
 *
 * <p>Each test wires up a chain of two {@link FilterConsumerInterceptor}s and checks, through the
 * shared invocation counters, that every interceptor in the chain is called even after an
 * earlier one has thrown.
 */
public class ConsumerInterceptorsTest {
    private final int filterPartition1 = 5;
    private final int filterPartition2 = 6;
    private final String topic = "test";
    private final int partition = 1;
    private final TopicPartition tp = new TopicPartition(topic, partition);
    private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
    private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
    private final ConsumerRecord<Integer, Integer> consumerRecord = newRecord(topic, partition);

    // Incremented by every FilterConsumerInterceptor created by this test instance.
    private int onCommitCount = 0;
    private int onConsumeCount = 0;

    /** Builds a record for the given topic/partition with offset 0, timestamp 0, key 1 and value 1. */
    private static ConsumerRecord<Integer, Integer> newRecord(String topic, int partition) {
        return new ConsumerRecord<>(topic, partition, 0L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, 1);
    }

    /**
     * Verifies that {@code onConsume} is applied by every interceptor in the chain, and that the
     * records coming out of the chain reflect only the interceptors that did not throw.
     */
    @Test
    public void testOnConsumeChain() {
        // Two differently configured instances of the same interceptor class stand in for
        // two distinct interceptors.
        List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
        interceptorList.add(interceptor1);
        interceptorList.add(interceptor2);
        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);

        // One record on a partition that is kept, and one on each partition that gets filtered.
        List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
        list1.add(consumerRecord);
        List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
        list2.add(newRecord(filterTopicPart1.topic(), filterTopicPart1.partition()));
        List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
        list3.add(newRecord(filterTopicPart2.topic(), filterTopicPart2.partition()));

        Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = new HashMap<>();
        records.put(tp, list1);
        records.put(filterTopicPart1, list2);
        records.put(filterTopicPart2, list3);
        ConsumerRecords<Integer, Integer> consumerRecords = new ConsumerRecords<>(records);

        // Both interceptors succeed: both filtered partitions are dropped.
        ConsumerRecords<Integer, Integer> interceptedRecords = interceptors.onConsume(consumerRecords);
        Assertions.assertEquals(1, interceptedRecords.count());
        Assertions.assertTrue(interceptedRecords.partitions().contains(tp));
        Assertions.assertFalse(interceptedRecords.partitions().contains(filterTopicPart1));
        Assertions.assertFalse(interceptedRecords.partitions().contains(filterTopicPart2));
        Assertions.assertEquals(2, onConsumeCount);

        // First interceptor throws: its partition is not filtered, but the second one still runs.
        interceptor1.injectOnConsumeError(true);
        ConsumerRecords<Integer, Integer> partInterceptedRecs = interceptors.onConsume(consumerRecords);
        Assertions.assertEquals(2, partInterceptedRecs.count());
        Assertions.assertTrue(partInterceptedRecs.partitions().contains(filterTopicPart1));
        Assertions.assertFalse(partInterceptedRecs.partitions().contains(filterTopicPart2));
        Assertions.assertEquals(4, onConsumeCount);

        // Both interceptors throw: the input records are expected back unchanged.
        interceptor2.injectOnConsumeError(true);
        ConsumerRecords<Integer, Integer> noneInterceptedRecs = interceptors.onConsume(consumerRecords);
        Assertions.assertEquals(consumerRecords, noneInterceptedRecs);
        Assertions.assertEquals(3, noneInterceptedRecs.count());
        Assertions.assertEquals(6, onConsumeCount);

        interceptors.close();
    }

    /**
     * Verifies that {@code onCommit} reaches every interceptor in the chain, including after the
     * first interceptor has been set to throw.
     */
    @Test
    public void testOnCommitChain() {
        List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
        interceptorList.add(interceptor1);
        interceptorList.add(interceptor2);
        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);

        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(tp, new OffsetAndMetadata(0L));

        // Both interceptors succeed.
        interceptors.onCommit(offsets);
        Assertions.assertEquals(2, onCommitCount);

        // First interceptor throws: the counter still advances by two, so the second one ran too.
        interceptor1.injectOnCommitError(true);
        interceptors.onCommit(offsets);
        Assertions.assertEquals(4, onCommitCount);

        interceptors.close();
    }

    /**
     * Test interceptor that removes every partition whose partition number equals
     * {@code filterPartition}, and that can be switched to throw from either callback.
     *
     * <p>Deliberately a non-static inner class: each callback increments the counters of the
     * enclosing test instance.
     *
     * @param <K> record key type
     * @param <V> record value type
     */
    private class FilterConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
        private final int filterPartition;
        private boolean throwExceptionOnConsume = false;
        private boolean throwExceptionOnCommit = false;

        /**
         * @param filterPartition partition number whose records are dropped by {@link #onConsume}
         */
        FilterConsumerInterceptor(int filterPartition) {
            this.filterPartition = filterPartition;
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // no configuration needed
        }

        /**
         * Counts the invocation, then returns a copy of {@code records} without the filtered partition.
         *
         * @throws KafkaException if consume-error injection is enabled
         */
        @Override
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
            // Count before the optional throw so failed invocations are counted as well.
            ConsumerInterceptorsTest.this.onConsumeCount++;
            if (throwExceptionOnConsume) {
                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume.");
            }
            Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>();
            for (TopicPartition topicPartition : records.partitions()) {
                if (topicPartition.partition() != filterPartition) {
                    recordMap.put(topicPartition, records.records(topicPartition));
                }
            }
            return new ConsumerRecords<>(recordMap);
        }

        /**
         * Counts the invocation.
         *
         * @throws KafkaException if commit-error injection is enabled
         */
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Count before the optional throw so failed invocations are counted as well.
            ConsumerInterceptorsTest.this.onCommitCount++;
            if (throwExceptionOnCommit) {
                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onCommit.");
            }
        }

        @Override
        public void close() {
            // nothing to release
        }

        /** Enables or disables throwing from {@link #onConsume}. */
        public void injectOnConsumeError(boolean on) {
            this.throwExceptionOnConsume = on;
        }

        /** Enables or disables throwing from {@link #onCommit}. */
        public void injectOnCommitError(boolean on) {
            this.throwExceptionOnCommit = on;
        }
    }
}

