/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.CounterConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.MembershipManagerImpl;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class AsyncKafkaConsumerTest {
    // Consumer under test. Tests assign it; resetAll() closes it (if non-null) and sets it back to null.
    private AsyncKafkaConsumer<String, String> consumer = null;
    // Clock handed to every consumer built by the newConsumer(...) helpers. Not final because
    // testCommitted() replaces it with a different MockTime before creating its consumer.
    private Time time = new MockTime(0L);
    // Mockito mocks that the newConsumer(...) helpers inject into the consumer under test.
    private final FetchCollector<String, String> fetchCollector = (FetchCollector)Mockito.mock(FetchCollector.class);
    private final ApplicationEventHandler applicationEventHandler = (ApplicationEventHandler)Mockito.mock(ApplicationEventHandler.class);
    private final ConsumerMetadata metadata = (ConsumerMetadata)Mockito.mock(ConsumerMetadata.class);
    // Real (non-mock) queue passed to the consumer; tests may enqueue background events on it directly.
    // It is cleared in resetAll().
    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue();
    private final CompletableEventReaper backgroundEventReaper = (CompletableEventReaper)Mockito.mock(CompletableEventReaper.class);

    /**
     * Per-test cleanup: empties the background event queue, closes the consumer under test
     * (when one was created), and resets Mockito inline mocks and the interceptor counters.
     */
    @AfterEach
    public void resetAll() {
        backgroundEventQueue.clear();

        if (consumer != null) {
            try {
                consumer.close(Duration.ZERO);
            } catch (Exception ignored) {
                // Best-effort cleanup only: a failure to close must not fail the test
                // or prevent the remaining cleanup steps below from running.
            }
        }
        consumer = null;

        Mockito.framework().clearInlineMocks();
        MockConsumerInterceptor.resetCounters();
    }

    /**
     * Builds a consumer from {@code TestUtils.requiredConsumerConfig()} with
     * {@code group.id} set to {@code "group-id"}.
     */
    private AsyncKafkaConsumer<String, String> newConsumer() {
        final Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.id", "group-id");
        return newConsumer(consumerProps);
    }

    /**
     * Builds a consumer from {@code TestUtils.requiredConsumerConfig()} without adding a
     * {@code group.id} entry.
     */
    private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
        final Properties consumerProps = TestUtils.requiredConsumerConfig();
        return newConsumer(consumerProps);
    }

    /**
     * Builds a consumer from the properties returned by
     * {@code requiredConsumerConfigAndGroupId("")}, i.e. with an empty group id argument.
     */
    private AsyncKafkaConsumer<String, String> newConsumerWithEmptyGroupId() {
        final Properties consumerProps = requiredConsumerConfigAndGroupId("");
        return newConsumer(consumerProps);
    }

    /**
     * Wraps the given properties in a {@link ConsumerConfig} and builds a consumer from it.
     *
     * @param props consumer configuration properties
     * @return a consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
        return newConsumer(new ConsumerConfig(props));
    }

    /**
     * Builds a consumer from the given config, using string deserializers, this test's clock,
     * and factory lambdas that ignore their arguments and hand back this test's mocks
     * (event handler, event reaper, fetch collector, metadata) plus the shared background queue.
     *
     * @param config consumer configuration
     * @return a consumer wired to this test's mocks
     */
    private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
        // Diamond instead of a raw constructor call so the key/value types are checked.
        return new AsyncKafkaConsumer<>(
            config,
            new StringDeserializer(),
            new StringDeserializer(),
            time,
            (a, b, c, d, e, f, g) -> applicationEventHandler,
            a -> backgroundEventReaper,
            (a, b, c, d, e, f, g) -> fetchCollector,
            (a, b, c, d) -> metadata,
            backgroundEventQueue);
    }

    /**
     * Builds a consumer through the fully-explicit constructor, so the caller can supply its own
     * fetch buffer, interceptors, rebalance listener invoker and subscription state. All other
     * collaborators are this test's shared mocks; retry backoff is 100 ms, the default API
     * timeout is 1000 ms and auto-commit is enabled.
     *
     * @param fetchBuffer              fetch buffer to inject
     * @param interceptors             consumer interceptors to inject
     * @param rebalanceListenerInvoker rebalance listener invoker to inject
     * @param subscriptions            subscription state to inject
     * @param groupId                  group id passed to the consumer
     * @param clientId                 client id passed to the consumer
     * @return the constructed consumer
     */
    private AsyncKafkaConsumer<String, String> newConsumer(FetchBuffer fetchBuffer, ConsumerInterceptors<String, String> interceptors, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, SubscriptionState subscriptions, String groupId, String clientId) {
        final long retryBackoffMs = 100L;
        final int defaultApiTimeoutMs = 1000;
        final boolean autoCommitEnabled = true;
        // Diamond instead of raw types so the key/value types are checked.
        return new AsyncKafkaConsumer<>(
            new LogContext(),
            clientId,
            new Deserializers<>(new StringDeserializer(), new StringDeserializer()),
            fetchBuffer,
            fetchCollector,
            interceptors,
            time,
            applicationEventHandler,
            backgroundEventQueue,
            backgroundEventReaper,
            rebalanceListenerInvoker,
            new Metrics(),
            subscriptions,
            metadata,
            retryBackoffMs,
            defaultApiTimeoutMs,
            groupId,
            autoCommitEnabled);
    }

    /**
     * A freshly created consumer must close without throwing.
     * NOTE(review): completeUnsubscribeApplicationEventSuccessfully() is defined outside this
     * view; presumably it stubs the unsubscribe event so that close() can complete.
     */
    @Test
    public void testSuccessfulStartupShutdown() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();

        Assertions.assertDoesNotThrow(() -> consumer.close());
    }

    /**
     * Creating a consumer with an empty group id must fail with a {@link KafkaException}
     * whose cause is an {@link InvalidGroupIdException}.
     */
    @Test
    public void testInvalidGroupId() {
        final KafkaException thrown = Assertions.assertThrows(KafkaException.class, this::newConsumerWithEmptyGroupId);
        Assertions.assertInstanceOf(InvalidGroupIdException.class, thrown.getCause());
    }

    /**
     * After close(), calling assignment() must throw {@link IllegalStateException} with the
     * "already been closed" message.
     */
    @Test
    public void testFailOnClosedConsumer() {
        consumer = newConsumer();
        completeUnsubscribeApplicationEventSuccessfully();
        consumer.close();

        final IllegalStateException thrown =
            Assertions.assertThrows(IllegalStateException.class, () -> consumer.assignment());
        Assertions.assertEquals("This consumer has already been closed.", thrown.getMessage());
    }

    /**
     * commitAsync(offsets, null) must enqueue an {@link AsyncCommitEvent} carrying exactly the
     * given offsets, and neither completing that event nor issuing a second null-callback commit
     * may throw.
     *
     * <p>Cleanup is done inline: closing with a zero timeout is asserted to throw a
     * {@link KafkaException} caused by a Kafka {@code TimeoutException}. The field is then set
     * to null so that resetAll() does not attempt to close the consumer a second time.
     */
    @Test
    public void testCommitAsyncWithNullCallback() {
        consumer = newConsumer();
        final TopicPartition t0 = new TopicPartition("t0", 2);
        final TopicPartition t1 = new TopicPartition("t0", 3);
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(t0, new OffsetAndMetadata(10L));
        offsets.put(t1, new OffsetAndMetadata(20L));

        consumer.commitAsync(offsets, null);

        // Typed captor (instead of a raw one) so getValue() needs no cast.
        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
        Mockito.verify(applicationEventHandler).add(commitEventCaptor.capture());
        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
        Assertions.assertEquals(offsets, commitEvent.offsets());
        Assertions.assertDoesNotThrow(() -> commitEvent.future().complete(null));
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, null));

        try {
            final Exception closeFailure =
                Assertions.assertThrows(KafkaException.class, () -> consumer.close(Duration.ZERO));
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, closeFailure.getCause());
        } finally {
            consumer = null;
        }
    }

    /**
     * A user-supplied commit callback must be invoked exactly once, with no exception, when the
     * async commit succeeds.
     * NOTE(review): completeCommitAsyncApplicationEventSuccessfully() and
     * forceCommitCallbackInvocation() are defined outside this view; presumably they make the
     * commit event succeed and trigger the pending callbacks, respectively.
     */
    @Test
    public void testCommitAsyncUserSuppliedCallbackNoException() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitAsyncApplicationEventSuccessfully();

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, (OffsetCommitCallback) callback));
        forceCommitCallbackInvocation();

        // Expected value first, actual second, so a failure message reads correctly.
        Assertions.assertEquals(1, callback.invoked);
        Assertions.assertNull(callback.exception);
    }

    /**
     * When the async commit fails, the user-supplied callback must receive an exception of the
     * same class as the one the commit failed with.
     *
     * @param exception the failure to inject, supplied by {@code commitExceptionSupplier()}
     */
    @ParameterizedTest
    @MethodSource(value={"commitExceptionSupplier"})
    public void testCommitAsyncUserSuppliedCallbackWithException(Exception exception) {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        completeCommitAsyncApplicationEventExceptionally(exception);

        final MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, (OffsetCommitCallback) callback));
        forceCommitCallbackInvocation();

        Assertions.assertSame(exception.getClass(), callback.exception.getClass());
    }

    /**
     * Argument source for {@code testCommitAsyncUserSuppliedCallbackWithException}: one generic
     * {@link KafkaException} and one {@link GroupAuthorizationException}.
     */
    private static Stream<Exception> commitExceptionSupplier() {
        return Stream.of(
            new KafkaException("Test exception"),
            new GroupAuthorizationException("Group authorization exception"));
    }

    /**
     * If an async commit's future completes exceptionally with the FENCED_INSTANCE_ID error,
     * the next commitAsync() call must throw an exception of that error's class.
     */
    @Test
    public void testCommitAsyncWithFencedException() {
        consumer = newConsumer();
        completeCommitSyncApplicationEventSuccessfully();
        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();
        final MockCommitCallback callback = new MockCommitCallback();

        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(offsets, (OffsetCommitCallback) callback));

        // Typed captor (instead of a raw one) so getValue() needs no cast.
        final ArgumentCaptor<AsyncCommitEvent> commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
        Mockito.verify(applicationEventHandler).add(commitEventCaptor.capture());
        final AsyncCommitEvent commitEvent = commitEventCaptor.getValue();
        commitEvent.future().completeExceptionally(Errors.FENCED_INSTANCE_ID.exception());

        Assertions.assertThrows(Errors.FENCED_INSTANCE_ID.exception().getClass(), () -> consumer.commitAsync());
    }

    /**
     * committed(partitions, timeout) must return the offsets the fetch-committed-offsets event
     * completes with, go through {@code addAndGet} with a {@link FetchCommittedOffsetsEvent},
     * and leave the {@code committed-time-ns-total} metric with a value above zero.
     */
    @Test
    public void testCommitted() {
        // Replaces the default clock before the consumer is built, so the consumer uses this one.
        time = new MockTime(1L);
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = mockTopicPartitionOffset();
        completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);

        Assertions.assertEquals(
            topicPartitionOffsets,
            consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000L)));
        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));

        final Metric committedTime = consumer.metrics()
            .get(consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
        Assertions.assertTrue((Double) committedTime.metricValue() > 0.0);
    }

    /**
     * committed() must forward the leader epoch of every non-null committed offset to
     * {@code metadata.updateLastSeenEpochIfNewer}. The partition mapped to a null offset (t1)
     * is included to show that it does not make the call throw.
     */
    @Test
    public void testCommittedLeaderEpochUpdate() {
        consumer = newConsumer();
        final TopicPartition t0 = new TopicPartition("t0", 2);
        final TopicPartition t1 = new TopicPartition("t0", 3);
        final TopicPartition t2 = new TopicPartition("t0", 4);
        final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<>();
        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L, Optional.of(2), ""));
        topicPartitionOffsets.put(t1, null);
        topicPartitionOffsets.put(t2, new OffsetAndMetadata(20L, Optional.of(3), ""));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);

        Assertions.assertDoesNotThrow(
            () -> consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000L)));

        Mockito.verify(metadata).updateLastSeenEpochIfNewer(t0, 2);
        Mockito.verify(metadata).updateLastSeenEpochIfNewer(t2, 3);
        Mockito.verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
    }

    /**
     * A {@link KafkaException} thrown by the event handler while processing the
     * {@link FetchCommittedOffsetsEvent} must propagate out of committed().
     */
    @Test
    public void testCommittedExceptionThrown() {
        consumer = newConsumer();
        final Map<TopicPartition, OffsetAndMetadata> offsets = mockTopicPartitionOffset();

        Mockito.when(applicationEventHandler.addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class)))
            .thenAnswer(invocation -> {
                final CompletableApplicationEvent<?> event = invocation.getArgument(0);
                Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, event);
                throw new KafkaException("Test exception");
            });

        Assertions.assertThrows(
            KafkaException.class,
            () -> consumer.committed(offsets.keySet(), Duration.ofMillis(1000L)));
    }

    /**
     * A wakeup() issued before poll() must make the next poll() throw {@link WakeupException};
     * the poll after that must not throw.
     */
    @Test
    public void testWakeupBeforeCallingPoll() {
        consumer = newConsumer();
        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);

        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> offsets = Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        completeCommitSyncApplicationEventSuccessfully();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        consumer.wakeup();

        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup() triggered while collecting an empty fetch must make that poll() throw
     * {@link WakeupException}; the following poll() must not throw.
     */
    @Test
    public void testWakeupAfterEmptyFetch() {
        consumer = newConsumer();
        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);

        // First collectFetch call wakes the consumer up; later calls just return an empty fetch.
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.empty();
        }).doAnswer(invocation -> Fetch.empty())
            .when(fetchCollector)
            .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> offsets = Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        completeCommitSyncApplicationEventSuccessfully();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * A wakeup() triggered while collecting a non-empty fetch must not interrupt that poll();
     * the {@link WakeupException} is instead thrown by the following poll().
     */
    @Test
    public void testWakeupAfterNonEmptyFetch() {
        consumer = newConsumer();
        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>(topicName, partition, 2L, "key1", "value1"),
            new ConsumerRecord<>(topicName, partition, 3L, "key2", "value2"));

        // Every collectFetch call wakes the consumer up and returns the two records.
        Mockito.doAnswer(invocation -> {
            consumer.wakeup();
            return Fetch.forPartition(tp, records, true);
        }).when(fetchCollector).collectFetch(Mockito.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> offsets = Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        completeCommitSyncApplicationEventSuccessfully();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));
    }

    /**
     * commitSync() called from inside {@code onPartitionsRevoked} must not throw.
     * An ON_PARTITIONS_REVOKED callback-needed event is placed on the background queue before
     * polling, and the test asserts that the listener's revoke callback actually ran.
     */
    @Test
    public void testCommitInRebalanceCallback() {
        consumer = newConsumer();
        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);
        Mockito.doAnswer(invocation -> Fetch.empty())
            .when(fetchCollector)
            .collectFetch(Mockito.any(FetchBuffer.class));

        final SortedSet<TopicPartition> sortedPartitions =
            new TreeSet<>((Comparator<TopicPartition>) MembershipManagerImpl.TOPIC_PARTITION_COMPARATOR);
        sortedPartitions.add(tp);
        final ConsumerRebalanceListenerCallbackNeededEvent revokeEvent =
            new ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, sortedPartitions);
        backgroundEventQueue.add(revokeEvent);
        completeCommitSyncApplicationEventSuccessfully();

        final AtomicBoolean callbackExecuted = new AtomicBoolean(false);
        final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                Assertions.assertDoesNotThrow(
                    () -> AsyncKafkaConsumerTest.this.consumer.commitSync(Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(0L)))));
                callbackExecuted.set(true);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Intentionally empty: only the revoke path is exercised here.
            }
        };

        consumer.subscribe(Collections.singletonList(topicName), listener);
        consumer.poll(Duration.ZERO);

        Assertions.assertTrue(callbackExecuted.get());
    }

    /**
     * With a pattern subscription, the pattern is matched against cluster topics on subscribe(),
     * is not matched again on a poll() issued before the metadata update version is stubbed to
     * a new value, and is matched again on the poll() after {@code metadata.updateVersion()}
     * is stubbed to return 2.
     */
    @Test
    public void testSubscriptionRegexEvalOnPollOnlyIfMetadataChanges() {
        final SubscriptionState subscriptions = Mockito.mock(SubscriptionState.class);
        final Cluster cluster = Mockito.mock(Cluster.class);

        consumer = newConsumer(
            Mockito.mock(FetchBuffer.class),
            Mockito.mock(ConsumerInterceptors.class),
            Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id");

        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(1L));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        Mockito.doReturn(cluster).when(metadata).fetch();
        Mockito.doReturn(Collections.singleton(topicName)).when(cluster).topics();

        // subscribe() requests a metadata update and evaluates the pattern once.
        consumer.subscribe(Pattern.compile("f*"));
        Mockito.verify(metadata).requestUpdateForNewTopics();
        Mockito.verify(subscriptions).matchesSubscribedPattern(topicName);
        Mockito.clearInvocations(subscriptions);

        // Poll with no change to the stubbed metadata version: no re-evaluation expected.
        Mockito.when(subscriptions.hasPatternSubscription()).thenReturn(true);
        consumer.poll(Duration.ZERO);
        Mockito.verify(subscriptions, Mockito.never()).matchesSubscribedPattern(topicName);

        // Poll after the metadata version is stubbed to 2: the pattern is evaluated again.
        Mockito.when(metadata.updateVersion()).thenReturn(2);
        Mockito.when(subscriptions.hasPatternSubscription()).thenReturn(true);
        consumer.poll(Duration.ZERO);
        Mockito.verify(subscriptions).matchesSubscribedPattern(topicName);
    }

    /**
     * After a poll() that returned records, a second poll() must not throw.
     * The test name indicates the case being guarded is a wakeup trigger left over from the
     * first poll.
     */
    @Test
    public void testClearWakeupTriggerAfterPoll() {
        consumer = newConsumer();
        final String topicName = "foo";
        final int partition = 3;
        // Built from the named locals rather than repeating the literals.
        final TopicPartition tp = new TopicPartition(topicName, partition);
        final List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>(topicName, partition, 2L, "key1", "value1"),
            new ConsumerRecord<>(topicName, partition, 3L, "key2", "value2"));

        Mockito.doReturn(Fetch.forPartition(tp, records, true))
            .when(fetchCollector)
            .collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        final Map<TopicPartition, OffsetAndMetadata> offsets = Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        completeCommitSyncApplicationEventSuccessfully();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(ArgumentMatchers.any());
        consumer.assign(Collections.singleton(tp));

        consumer.poll(Duration.ZERO);

        Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
    }

    /**
     * The commit callback must run exactly once, and on the thread that drives the test
     * (compared by thread name via the callback's recorded {@code completionThread}).
     */
    @Test
    public void testEnsureCallbackExecutedByApplicationThread() {
        consumer = newConsumer();
        final String currentThread = Thread.currentThread().getName();
        final MockCommitCallback callback = new MockCommitCallback();
        completeCommitAsyncApplicationEventSuccessfully();

        // Diamond instead of a raw HashMap so the offsets map is type-checked.
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), (OffsetCommitCallback) callback));
        forceCommitCallbackInvocation();

        Assertions.assertEquals(1, callback.invoked);
        Assertions.assertEquals(currentThread, callback.completionThread);
    }

    /**
     * commitSync() must surface an exception thrown by a previously registered commitAsync
     * callback: registering the throwing callback does not throw, but the following
     * commitSync() throws an exception of the callback exception's class.
     */
    @Test
    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
        consumer = newConsumer();
        final KafkaException callbackException = new KafkaException("Async commit callback failed");
        final OffsetCommitCallback callback = (offsets, exception) -> {
            throw callbackException;
        };

        // Diamond instead of a raw HashMap so the offsets map is type-checked.
        Assertions.assertDoesNotThrow(() -> consumer.commitAsync(new HashMap<>(), callback));
        // getClass() is called on the KafkaException-typed reference, so the resulting Class
        // token satisfies assertThrows' Throwable bound (an Object-typed receiver would not).
        Assertions.assertThrows(callbackException.getClass(), () -> consumer.commitSync());
    }

    /**
     * The {@code poll(long)} overload must be rejected with an
     * {@link UnsupportedOperationException} carrying the expected explanatory message.
     */
    @Test
    public void testPollLongThrowsException() {
        consumer = newConsumer();

        final Exception thrown =
            Assertions.assertThrows(UnsupportedOperationException.class, () -> consumer.poll(0L));

        Assertions.assertEquals(
            "Consumer.poll(long) is not supported when \"group.protocol\" is \"consumer\". This method is deprecated and will be removed in the next major release.",
            thrown.getMessage());
    }

    /**
     * A poll() with a zero timeout must enqueue a {@link FetchCommittedOffsetsEvent} and keep it
     * as pending while its future is incomplete. Once that future is completed, the next poll()
     * must not enqueue another such event and the pending flag must be cleared.
     */
    @Test
    public void testOffsetFetchStoresPendingEvent() {
        consumer = newConsumer();
        final long timeoutMs = 0L;
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));

        // First poll: the event is enqueued, is still incomplete, and is remembered as pending.
        consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        // Raw type kept: the declared return type of getLastEnqueuedEvent() is not visible here.
        final CompletableApplicationEvent event = getLastEnqueuedEvent();
        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
        Assertions.assertTrue(consumer.hasPendingOffsetFetchEvent());

        // Second poll after completing the future: no new event, pending flag cleared.
        Mockito.clearInvocations(applicationEventHandler);
        event.future().complete(Collections.emptyMap());
        consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(applicationEventHandler, Mockito.never()).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        Assertions.assertDoesNotThrow(() -> (Map) ConsumerUtils.getResult(event.future(), time.timer(timeoutMs)));
        Assertions.assertFalse(consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * A pending {@link FetchCommittedOffsetsEvent} must not be reused after the assignment
     * changes: the poll() following the re-assignment must enqueue a different event. Once that
     * second event's future is completed, the next poll() must not enqueue another event and the
     * pending flag must be cleared.
     */
    @Test
    public void testOffsetFetchDoesNotReuseMismatchedPendingEvent() {
        consumer = newConsumer();
        final long timeoutMs = 0L;
        Mockito.doReturn(Fetch.empty()).when(fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));

        // First poll: an event for the initial assignment is enqueued and stays pending.
        consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        // Raw type kept: the declared return type of getLastEnqueuedEvent() is not visible here.
        final CompletableApplicationEvent event1 = getLastEnqueuedEvent();
        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> ConsumerUtils.getResult(event1.future(), time.timer(timeoutMs)));
        Assertions.assertTrue(consumer.hasPendingOffsetFetchEvent());

        // Second poll after assigning a different partition: a new, distinct event is enqueued.
        Mockito.clearInvocations(applicationEventHandler);
        consumer.assign(Collections.singleton(new TopicPartition("topic1", 1)));
        consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        final CompletableApplicationEvent event2 = getLastEnqueuedEvent();
        Assertions.assertNotEquals(event1, event2);
        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
        Assertions.assertTrue(consumer.hasPendingOffsetFetchEvent());

        // Third poll after completing the second event: no new event, pending flag cleared.
        Mockito.clearInvocations(applicationEventHandler);
        event2.future().complete(Collections.emptyMap());
        consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(applicationEventHandler, Mockito.never()).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        Assertions.assertDoesNotThrow(() -> (Map) ConsumerUtils.getResult(event2.future(), time.timer(timeoutMs)));
        Assertions.assertFalse(consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * Checks that a pending committed-offsets fetch whose deadline has been reached on the
     * test clock is not reused: the next poll must enqueue a new, distinct
     * {@code FetchCommittedOffsetsEvent}, and the consumer must still report a pending fetch.
     */
    @Test
    public void testOffsetFetchDoesNotReuseExpiredPendingEvent() {
        this.consumer = this.newConsumer();
        long timeoutMs = 0L;
        Duration pollTimeout = Duration.ofMillis(timeoutMs);
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));

        // First poll: an offset fetch is enqueued but cannot complete within a zero timeout.
        this.consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
        this.consumer.poll(pollTimeout);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        CompletableApplicationEvent firstFetch = this.getLastEnqueuedEvent();
        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> ConsumerUtils.getResult((Future) firstFetch.future(), this.time.timer(timeoutMs)));
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());
        Mockito.clearInvocations(this.applicationEventHandler);

        // Move the clock forward to exactly the first event's deadline, then poll again.
        long untilDeadlineMs = firstFetch.deadlineMs() - this.time.milliseconds();
        this.time.sleep(untilDeadlineMs);
        this.consumer.poll(pollTimeout);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        CompletableApplicationEvent secondFetch = this.getLastEnqueuedEvent();
        Assertions.assertNotEquals(firstFetch, secondFetch);
        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> ConsumerUtils.getResult((Future) secondFetch.future(), this.time.timer(timeoutMs)));
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * Checks that when the pending committed-offsets fetch is completed exceptionally with a
     * Kafka {@code TimeoutException}, the next poll does not throw and the consumer still
     * reports a pending offset fetch event.
     */
    @Test
    public void testOffsetFetchTimeoutExceptionKeepsPendingEvent() {
        this.consumer = this.newConsumer();
        long timeoutMs = 0L;
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        TopicPartition partition = new TopicPartition("topic1", 0);
        this.consumer.assign(Collections.singleton(partition));
        this.consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        CompletableApplicationEvent pendingFetch = this.getLastEnqueuedEvent();
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());

        // Fail the in-flight fetch with a timeout and poll once more.
        org.apache.kafka.common.errors.TimeoutException failure =
            new org.apache.kafka.common.errors.TimeoutException("Test error");
        pendingFetch.future().completeExceptionally(failure);
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ofMillis(timeoutMs)));
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * Checks that when the pending committed-offsets fetch is completed exceptionally with an
     * {@code InterruptException}, the next poll throws it, the calling thread is left
     * interrupted, and the consumer still reports a pending offset fetch event.
     */
    @Test
    public void testOffsetFetchInterruptExceptionKeepsPendingEvent() {
        this.consumer = this.newConsumer();
        long timeoutMs = 0L;
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        TopicPartition partition = new TopicPartition("topic1", 0);
        this.consumer.assign(Collections.singleton(partition));
        this.consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        CompletableApplicationEvent pendingFetch = this.getLastEnqueuedEvent();
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());

        InterruptException failure = new InterruptException("Test error");
        pendingFetch.future().completeExceptionally(failure);
        Assertions.assertThrows(InterruptException.class, () -> this.consumer.poll(Duration.ofMillis(timeoutMs)));
        // Thread.interrupted() reads AND clears the flag, so the interrupt does not leak out of this test.
        boolean wasInterrupted = Thread.interrupted();
        Assertions.assertTrue(wasInterrupted);
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * Checks that when the pending committed-offsets fetch is completed exceptionally with an
     * unexpected error (here a {@code NullPointerException}), the next poll throws a
     * {@code KafkaException} and the consumer no longer reports a pending offset fetch event.
     */
    @Test
    public void testOffsetFetchUnexpectedExceptionClearsPendingEvent() {
        this.consumer = this.newConsumer();
        long timeoutMs = 0L;
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        TopicPartition partition = new TopicPartition("topic1", 0);
        this.consumer.assign(Collections.singleton(partition));
        this.consumer.poll(Duration.ofMillis(timeoutMs));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        CompletableApplicationEvent pendingFetch = this.getLastEnqueuedEvent();
        Assertions.assertTrue(this.consumer.hasPendingOffsetFetchEvent());

        NullPointerException failure = new NullPointerException("Test error");
        pendingFetch.future().completeExceptionally(failure);
        Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ofMillis(timeoutMs)));
        Assertions.assertFalse(this.consumer.hasPendingOffsetFetchEvent());
    }

    /**
     * Checks that {@code commitSync(offsets)} forwards the leader epoch carried by each committed
     * offset to {@code metadata.updateLastSeenEpochIfNewer} and enqueues a {@code SyncCommitEvent}.
     */
    @Test
    public void testCommitSyncLeaderEpochUpdate() {
        this.consumer = this.newConsumer();
        TopicPartition firstPartition = new TopicPartition("t0", 2);
        TopicPartition secondPartition = new TopicPartition("t0", 3);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        offsetsToCommit.put(firstPartition, new OffsetAndMetadata(10L, Optional.of(2), ""));
        offsetsToCommit.put(secondPartition, new OffsetAndMetadata(20L, Optional.of(1), ""));
        this.completeCommitSyncApplicationEventSuccessfully();
        this.consumer.assign(Arrays.asList(firstPartition, secondPartition));

        Assertions.assertDoesNotThrow(() -> this.consumer.commitSync(offsetsToCommit));

        // Each partition's epoch from the committed offsets must have been pushed to the metadata.
        Mockito.verify(this.metadata).updateLastSeenEpochIfNewer(firstPartition, 2);
        Mockito.verify(this.metadata).updateLastSeenEpochIfNewer(secondPartition, 1);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    /**
     * Checks that {@code commitAsync(offsets, callback)} forwards the leader epoch carried by each
     * committed offset to {@code metadata.updateLastSeenEpochIfNewer} and enqueues an
     * {@code AsyncCommitEvent}. The consumer is then closed with a zero timeout, which is expected
     * to fail with a {@code KafkaException} caused by a Kafka {@code TimeoutException}.
     *
     * <p>Decompiler note (CFR): "Removed try catching itself - possible behaviour change."
     */
    @Test
    public void testCommitAsyncLeaderEpochUpdate() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerInterceptors<String, String> interceptors =
            (ConsumerInterceptors<String, String>) new ConsumerInterceptors(Collections.emptyList());
        this.consumer = this.newConsumer(
            (FetchBuffer) Mockito.mock(FetchBuffer.class),
            interceptors,
            (ConsumerRebalanceListenerInvoker) Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id");
        this.completeCommitSyncApplicationEventSuccessfully();

        TopicPartition firstPartition = new TopicPartition("t0", 2);
        TopicPartition secondPartition = new TopicPartition("t0", 3);
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        offsetsToCommit.put(firstPartition, new OffsetAndMetadata(10L, Optional.of(2), ""));
        offsetsToCommit.put(secondPartition, new OffsetAndMetadata(20L, Optional.of(1), ""));
        Mockito.when(this.metadata.currentLeader(firstPartition))
            .thenReturn(new Metadata.LeaderAndEpoch(Optional.of(new Node(1, "host", 9000)), Optional.of(1)));
        Mockito.when(this.metadata.currentLeader(secondPartition))
            .thenReturn(new Metadata.LeaderAndEpoch(Optional.of(new Node(1, "host", 9000)), Optional.of(1)));
        this.consumer.assign(Arrays.asList(firstPartition, secondPartition));
        this.consumer.seek(firstPartition, 10L);
        this.consumer.seek(secondPartition, 20L);

        MockCommitCallback commitCallback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(offsetsToCommit, commitCallback));

        Mockito.verify(this.metadata).updateLastSeenEpochIfNewer(firstPartition, 2);
        Mockito.verify(this.metadata).updateLastSeenEpochIfNewer(secondPartition, 1);
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));

        try {
            Exception closeFailure =
                (Exception) Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(Duration.ZERO));
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, closeFailure.getCause());
        } finally {
            // The consumer has already been closed here; drop the reference whatever the outcome.
            this.consumer = null;
        }
    }

    /**
     * Checks that a {@code FENCED_INSTANCE_ID} failure of an earlier {@code commitAsync()} is
     * surfaced as a {@code FencedInstanceIdException} by the next {@code commitAsync()} call,
     * with a message naming the configured group instance id.
     */
    @Test
    public void testCommitAsyncTriggersFencedExceptionFromCommitAsync() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        // Use the named locals rather than repeating the literals, so the configured ids and the
        // expected message cannot drift apart.
        Properties props = this.requiredConsumerConfigAndGroupId(groupId);
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        this.completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        TopicPartition tp = new TopicPartition("foo", 0);
        this.consumer.assign(Collections.singleton(tp));
        this.consumer.seek(tp, 20L);

        // The first async commit only records the failure; the second call is where it surfaces.
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync());
        Exception e = Assertions.assertThrows(FencedInstanceIdException.class, () -> this.consumer.commitAsync());
        Assertions.assertEquals("Get fenced exception for group.instance.id " + groupInstanceId, e.getMessage());
    }

    /**
     * Checks that a {@code FENCED_INSTANCE_ID} failure of an earlier {@code commitAsync()} is
     * surfaced as a {@code FencedInstanceIdException} by a subsequent {@code commitSync()} call,
     * with a message naming the configured group instance id.
     */
    @Test
    public void testCommitSyncTriggersFencedExceptionFromCommitAsync() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        // Use the named locals rather than repeating the literals, so the configured ids and the
        // expected message cannot drift apart.
        Properties props = this.requiredConsumerConfigAndGroupId(groupId);
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        this.completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        TopicPartition tp = new TopicPartition("foo", 0);
        this.consumer.assign(Collections.singleton(tp));
        this.consumer.seek(tp, 20L);

        // The async commit only records the failure; the following sync commit is where it surfaces.
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync());
        Exception e = Assertions.assertThrows(FencedInstanceIdException.class, () -> this.consumer.commitSync());
        Assertions.assertEquals("Get fenced exception for group.instance.id " + groupInstanceId, e.getMessage());
    }

    /**
     * Checks that {@code commitSync} with an empty offsets map times out while an earlier async
     * commit is still incomplete, and succeeds once that async commit has finished, even
     * though it finished exceptionally.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
        TopicPartition partition = new TopicPartition("foo", 0);
        CompletableFuture<Void> pendingAsyncCommit = this.setUpConsumerWithIncompleteAsyncCommit(partition);
        Executable emptySyncCommit = () -> this.consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100L));

        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, emptySyncCommit);
        pendingAsyncCommit.completeExceptionally(new KafkaException("Test exception"));
        Assertions.assertDoesNotThrow(emptySyncCommit);
    }

    /**
     * Checks that {@code commitSync} with explicit offsets times out while an earlier async commit
     * is still incomplete, and succeeds once that async commit has completed normally.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() {
        TopicPartition partition = new TopicPartition("foo", 0);
        CompletableFuture<Void> pendingAsyncCommit = this.setUpConsumerWithIncompleteAsyncCommit(partition);
        this.completeCommitSyncApplicationEventSuccessfully();
        // A fresh offsets map is built on every invocation, exactly as with two separate lambdas.
        Executable syncCommit = () -> this.consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L));

        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, syncCommit);
        pendingAsyncCommit.complete(null);
        Assertions.assertDoesNotThrow(syncCommit);
    }

    /**
     * Checks that {@code commitSync} with explicit offsets waits for an earlier async commit but
     * is not failed by it: it times out while the async commit is pending and succeeds after
     * the async commit has completed exceptionally.
     */
    @Test
    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
        TopicPartition partition = new TopicPartition("foo", 0);
        CompletableFuture<Void> pendingAsyncCommit = this.setUpConsumerWithIncompleteAsyncCommit(partition);
        this.completeCommitSyncApplicationEventSuccessfully();
        // A fresh offsets map is built on every invocation, exactly as with two separate lambdas.
        Executable syncCommit = () -> this.consumer.commitSync(
            Collections.singletonMap(partition, new OffsetAndMetadata(20L)), Duration.ofMillis(100L));

        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, syncCommit);
        pendingAsyncCommit.completeExceptionally(new KafkaException("Test exception"));
        Assertions.assertDoesNotThrow(syncCommit);
    }

    /**
     * Replaces the test clock with {@code new MockTime(1L)}, creates a consumer assigned to and
     * positioned at offset 20 of {@code tp}, issues {@code commitAsync()}, and hands back the
     * future of the last event enqueued on the application event handler.
     *
     * @param tp the partition to assign and seek
     * @return the future of the most recently enqueued completable event
     */
    private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) {
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());

        this.consumer.assign(Collections.singleton(tp));
        this.consumer.seek(tp, 20L);
        this.consumer.commitAsync();

        // Kept as a raw type so the future can be returned as CompletableFuture<Void>.
        CompletableApplicationEvent lastEvent = this.getLastEnqueuedEvent();
        return lastEvent.future();
    }

    /**
     * Captures every {@code CompletableApplicationEvent} passed to
     * {@code applicationEventHandler.add} so far and returns the most recent one. As a side
     * effect it verifies that {@code add} was called at least once.
     *
     * @param <T> result type of the returned event (unchecked)
     * @return the last captured event
     */
    private <T> CompletableApplicationEvent<T> getLastEnqueuedEvent() {
        ArgumentCaptor captor = ArgumentCaptor.forClass(CompletableApplicationEvent.class);
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1)).add((ApplicationEvent) captor.capture());
        List captured = captor.getAllValues();
        int lastIndex = captured.size() - 1;
        return (CompletableApplicationEvent) captured.get(lastIndex);
    }

    /**
     * Checks that a {@code FENCED_INSTANCE_ID} failure of an earlier {@code commitAsync()} is
     * surfaced as a {@code FencedInstanceIdException} by a subsequent {@code poll()} call, with a
     * message naming the configured group instance id.
     */
    @Test
    public void testPollTriggersFencedExceptionFromCommitAsync() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        // Use the named locals rather than repeating the literals, so the configured ids and the
        // expected message cannot drift apart.
        Properties props = this.requiredConsumerConfigAndGroupId(groupId);
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        this.completeCommitAsyncApplicationEventExceptionally(Errors.FENCED_INSTANCE_ID.exception());
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        TopicPartition tp = new TopicPartition("foo", 0);
        this.consumer.assign(Collections.singleton(tp));
        this.consumer.seek(tp, 20L);

        // The async commit only records the failure; the following poll is where it surfaces.
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync());
        Exception e = Assertions.assertThrows(FencedInstanceIdException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals("Get fenced exception for group.instance.id " + groupInstanceId, e.getMessage());
    }

    /**
     * Checks that the callback of a successfully completed {@code commitAsync} is invoked exactly
     * once, without an exception, by a subsequent {@code poll}.
     */
    @Test
    public void testEnsurePollExecutedCommitAsyncCallbacks() {
        this.consumer = this.newConsumer();
        MockCommitCallback commitCallback = new MockCommitCallback();
        this.completeCommitAsyncApplicationEventSuccessfully();
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(Utils.mkMap());
        this.consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));

        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), (OffsetCommitCallback) commitCallback));
        Executable pollOnce = () -> this.consumer.poll(Duration.ZERO);
        this.assertMockCommitCallbackInvoked(pollOnce, commitCallback, null);
    }

    /**
     * Checks that the callback of a successfully completed {@code commitAsync} is invoked exactly
     * once, without an exception, when the consumer is closed.
     */
    @Test
    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        MockCommitCallback commitCallback = new MockCommitCallback();
        this.completeCommitAsyncApplicationEventSuccessfully();

        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), (OffsetCommitCallback) commitCallback));
        Executable closeConsumer = () -> this.consumer.close();
        this.assertMockCommitCallbackInvoked(closeConsumer, commitCallback, null);
    }

    /**
     * Checks that closing the consumer enqueues both an {@code UnsubscribeEvent} and a
     * {@code CommitOnCloseEvent} on the application event handler.
     */
    @Test
    public void testVerifyApplicationEventOnShutdown() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        // Any blocking addAndGet call made during close() returns null from the mocked handler.
        Mockito.doReturn(null)
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        this.consumer.close();

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(UnsubscribeEvent.class));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(CommitOnCloseEvent.class));
    }

    /**
     * Closes a spied consumer built around a mocked {@code SubscriptionState} with a zero timeout
     * and delegates to {@code verifyUnsubscribeEvent} to check the unsubscribe handling.
     */
    @Test
    public void testUnsubscribeOnClose() {
        SubscriptionState subscriptions = (SubscriptionState) Mockito.mock(SubscriptionState.class);
        ConsumerInterceptors<String, String> interceptors =
            (ConsumerInterceptors<String, String>) ((ConsumerInterceptors) Mockito.mock(ConsumerInterceptors.class));
        this.consumer = (AsyncKafkaConsumer) Mockito.spy(this.newConsumer(
            (FetchBuffer) Mockito.mock(FetchBuffer.class),
            interceptors,
            (ConsumerRebalanceListenerInvoker) Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id"));
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.close(Duration.ZERO);

        this.verifyUnsubscribeEvent(subscriptions);
    }

    /**
     * Checks that when {@code processBackgroundEvents} throws during close, {@code close} throws a
     * {@code KafkaException}, the unsubscribe handling is still verified by
     * {@code verifyUnsubscribeEvent}, and the application event handler is still closed.
     */
    @Test
    public void testFailedPartitionRevocationOnClose() {
        SubscriptionState subscriptions = (SubscriptionState) Mockito.mock(SubscriptionState.class);
        ConsumerInterceptors<String, String> interceptors =
            (ConsumerInterceptors<String, String>) new ConsumerInterceptors(Collections.emptyList());
        this.consumer = (AsyncKafkaConsumer) Mockito.spy(this.newConsumer(
            (FetchBuffer) Mockito.mock(FetchBuffer.class),
            interceptors,
            (ConsumerRebalanceListenerInvoker) Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id"));
        // Make background event processing on the spy fail for any arguments.
        Mockito.doThrow(new KafkaException())
            .when(this.consumer)
            .processBackgroundEvents((Future) ArgumentMatchers.any(), (Timer) ArgumentMatchers.any());

        Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(Duration.ZERO));

        this.verifyUnsubscribeEvent(subscriptions);
        Mockito.verify(this.applicationEventHandler).close(ArgumentMatchers.any(Duration.class));
    }

    /**
     * Checks that {@code commitSyncAllConsumed} enqueues a {@code SyncCommitEvent} when the
     * consumer has a subscribed, assigned partition with a position.
     */
    @Test
    public void testAutoCommitSyncEnabled() {
        this.completeCommitSyncApplicationEventSuccessfully();
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerInterceptors<String, String> interceptors =
            (ConsumerInterceptors<String, String>) ((ConsumerInterceptors) Mockito.mock(ConsumerInterceptors.class));
        this.consumer = this.newConsumer(
            (FetchBuffer) Mockito.mock(FetchBuffer.class),
            interceptors,
            (ConsumerRebalanceListenerInvoker) Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id");
        this.consumer.subscribe(Collections.singleton("topic"), (ConsumerRebalanceListener) Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        subscriptions.seek(new TopicPartition("topic", 0), 100L);

        this.consumer.commitSyncAllConsumed(this.time.timer(100L));

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(SyncCommitEvent.class));
    }

    /**
     * Checks that merely subscribing, assigning and seeking, without any commit call, does
     * not enqueue a {@code SyncCommitEvent}.
     */
    @Test
    public void testAutoCommitSyncDisabled() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerInterceptors<String, String> interceptors =
            (ConsumerInterceptors<String, String>) ((ConsumerInterceptors) Mockito.mock(ConsumerInterceptors.class));
        this.consumer = this.newConsumer(
            (FetchBuffer) Mockito.mock(FetchBuffer.class),
            interceptors,
            (ConsumerRebalanceListenerInvoker) Mockito.mock(ConsumerRebalanceListenerInvoker.class),
            subscriptions,
            "group-id",
            "client-id");
        this.consumer.subscribe(Collections.singleton("topic"), (ConsumerRebalanceListener) Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        subscriptions.seek(new TopicPartition("topic", 0), 100L);

        Mockito.verify(this.applicationEventHandler, Mockito.never()).add(ArgumentMatchers.any(SyncCommitEvent.class));
    }

    /**
     * Runs {@code task}, asserting it does not throw, then asserts that {@code callback} was
     * invoked exactly once and inspects the exception it recorded.
     *
     * @param task     the action expected to trigger the commit callback
     * @param callback the callback whose {@code invoked} count and {@code exception} are checked
     * @param errors   {@code null} to require that no exception was recorded; otherwise, if the
     *                 error maps to a {@code RetriableException}, the recorded exception must be a
     *                 {@code RetriableCommitFailedException} (other errors are not checked)
     */
    private void assertMockCommitCallbackInvoked(Executable task, MockCommitCallback callback, Errors errors) {
        Assertions.assertDoesNotThrow(task);
        Assertions.assertEquals(1, callback.invoked);
        if (errors == null) {
            Assertions.assertNull(callback.exception);
            return;
        }
        boolean retriable = errors.exception() instanceof RetriableException;
        if (retriable) {
            Assertions.assertInstanceOf(RetriableCommitFailedException.class, callback.exception);
        }
    }

    /**
     * Checks that {@code assign} leaves the subscription empty, records the partition in the
     * assignment, and enqueues both an {@code AssignmentChangeEvent} and a
     * {@code NewTopicsMetadataUpdateRequestEvent}.
     */
    @Test
    public void testAssign() {
        this.consumer = this.newConsumer();
        TopicPartition partition = new TopicPartition("foo", 3);

        this.consumer.assign(Collections.singleton(partition));

        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().contains(partition));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(AssignmentChangeEvent.class));
        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.any(NewTopicsMetadataUpdateRequestEvent.class));
    }

    /** Checks that {@code assign(null)} is rejected with an {@code IllegalArgumentException}. */
    @Test
    public void testAssignOnNullTopicPartition() {
        this.consumer = this.newConsumer();
        Executable assignNull = () -> this.consumer.assign(null);
        Assertions.assertThrows(IllegalArgumentException.class, assignNull);
    }

    /**
     * Checks that assigning an empty partition list leaves both the subscription and the
     * assignment empty.
     */
    @Test
    public void testAssignOnEmptyTopicPartition() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.assign(Collections.emptyList());

        boolean noSubscription = this.consumer.subscription().isEmpty();
        Assertions.assertTrue(noSubscription);
        boolean noAssignment = this.consumer.assignment().isEmpty();
        Assertions.assertTrue(noAssignment);
    }

    /**
     * Checks that assigning a partition whose topic name is {@code null} is rejected with an
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testAssignOnNullTopicInPartition() {
        this.consumer = this.newConsumer();
        // The partition is built inside the lambda, so construction is covered by the assertion too.
        Executable assignNullTopic = () -> this.consumer.assign(Collections.singleton(new TopicPartition(null, 0)));
        Assertions.assertThrows(IllegalArgumentException.class, assignNullTopic);
    }

    /**
     * Checks that assigning a partition whose topic name is only whitespace is rejected with an
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testAssignOnEmptyTopicInPartition() {
        this.consumer = this.newConsumer();
        // The partition is built inside the lambda, so construction is covered by the assertion too.
        Executable assignBlankTopic = () -> this.consumer.assign(Collections.singleton(new TopicPartition("  ", 0)));
        Assertions.assertThrows(IllegalArgumentException.class, assignBlankTopic);
    }

    /**
     * Checks that {@code beginningOffsets} with a {@code null} partition collection throws a
     * {@code NullPointerException}.
     */
    @Test
    public void testBeginningOffsetsFailsIfNullPartitions() {
        this.consumer = this.newConsumer();
        Executable lookupWithNull = () -> this.consumer.beginningOffsets(null, Duration.ofMillis(1L));
        Assertions.assertThrows(NullPointerException.class, lookupWithNull);
    }

    /**
     * Checks that {@code beginningOffsets} returns, for every requested partition, the offset
     * supplied by the stubbed {@code ListOffsetsEvent} result, that the event is submitted via
     * {@code addAndGet}, and that the event's deadline leaves a non-zero remaining time.
     */
    @Test
    public void testBeginningOffsets() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = this.mockOffsetAndTimestamp();
        Mockito.when(this.applicationEventHandler.addAndGet(ArgumentMatchers.any(ListOffsetsEvent.class)))
            .thenAnswer(invocation -> {
                ListOffsetsEvent submitted = (ListOffsetsEvent) invocation.getArgument(0);
                // Rebuild a timer from the event's deadline to check how much time it was given.
                Timer remaining = this.time.timer(submitted.deadlineMs() - this.time.milliseconds());
                if (remaining.remainingMs() == 0L) {
                    Assertions.fail("Timer duration should not be zero.");
                }
                return expectedOffsets;
            });

        Map result = (Map) Assertions.assertDoesNotThrow(
            () -> this.consumer.beginningOffsets(expectedOffsets.keySet(), Duration.ofMillis(1L)));

        expectedOffsets.forEach((partition, offsetAndTimestamp) -> {
            Assertions.assertTrue(result.containsKey(partition));
            Assertions.assertEquals(offsetAndTimestamp.offset(), (Long) result.get(partition));
        });
        Mockito.verify(this.applicationEventHandler).addAndGet(ArgumentMatchers.any(ListOffsetsEvent.class));
    }

    /**
     * Checks that a {@code KafkaException} thrown while processing the {@code ListOffsetsEvent} is
     * propagated unchanged by {@code beginningOffsets}.
     */
    @Test
    public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() {
        this.consumer = this.newConsumer();
        Set<TopicPartition> partitions = this.mockTopicPartitionOffset().keySet();
        KafkaException eventProcessingFailure = new KafkaException("Unexpected failure processing List Offsets event");
        Mockito.doThrow(eventProcessingFailure)
            .when(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(ListOffsetsEvent.class));

        Throwable thrown = Assertions.assertThrows(
            KafkaException.class,
            () -> this.consumer.beginningOffsets(partitions, Duration.ofMillis(1L)));

        Assertions.assertEquals(eventProcessingFailure, thrown);
        Mockito.verify(this.applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Checks that a Kafka {@code TimeoutException} raised by the event handler makes
     * {@code beginningOffsets} throw a Kafka {@code TimeoutException}, and that a
     * {@code ListOffsetsEvent} was submitted via {@code addAndGet}.
     */
    @Test
    public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
        this.consumer = this.newConsumer();
        Mockito.doThrow(new org.apache.kafka.common.errors.TimeoutException())
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        Executable lookup = () -> this.consumer.beginningOffsets(
            Collections.singletonList(new TopicPartition("t1", 0)), Duration.ofMillis(1L));
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, lookup);

        Mockito.verify(this.applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Checks that {@code offsetsForTimes} with a {@code null} search map throws a
     * {@code NullPointerException}.
     */
    @Test
    public void testOffsetsForTimesOnNullPartitions() {
        this.consumer = this.newConsumer();
        Executable lookupWithNull = () -> this.consumer.offsetsForTimes(null, Duration.ofMillis(1L));
        Assertions.assertThrows(NullPointerException.class, lookupWithNull);
    }

    /**
     * Checks that {@code offsetsForTimes} rejects the negative target timestamps -2, -1 and -3
     * with an {@code IllegalArgumentException}.
     */
    @Test
    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
        this.consumer = this.newConsumer();
        // Same values and order as three separate assertions; the first failure stops the test.
        long[] rejectedTimestamps = {-2L, -1L, -3L};
        for (long timestamp : rejectedTimestamps) {
            Assertions.assertThrows(
                IllegalArgumentException.class,
                () -> this.consumer.offsetsForTimes(
                    Collections.singletonMap(new TopicPartition("topic1", 1), timestamp),
                    Duration.ofMillis(1L)));
        }
    }

    /**
     * Checks that {@code offsetsForTimes} converts each stubbed internal result into the
     * {@code OffsetAndTimestamp} returned to the caller and submits a {@code ListOffsetsEvent}
     * via {@code addAndGet}.
     */
    @Test
    public void testOffsetsForTimes() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> stubbedResult = this.mockOffsetAndTimestamp();
        Map<TopicPartition, Long> timestampsToSearch = this.mockTimestampToSearch();
        Mockito.doReturn(stubbedResult)
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        Map result = (Map) Assertions.assertDoesNotThrow(
            () -> this.consumer.offsetsForTimes(timestampsToSearch, Duration.ofMillis(1L)));

        stubbedResult.forEach((partition, internal) -> {
            OffsetAndTimestamp expected = internal.buildOffsetAndTimestamp();
            Assertions.assertEquals((Object) expected, result.get(partition));
        });
        Mockito.verify(this.applicationEventHandler).addAndGet(ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Checks that when the event handler reports a timeout, {@code offsetsForTimes} throws a
     * Kafka {@code TimeoutException} whose message reports the caller's timeout in milliseconds.
     */
    @Test
    public void testOffsetsForTimesTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        org.apache.kafka.common.errors.TimeoutException reaperTimeout =
            new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper");
        Mockito.doThrow(reaperTimeout)
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        Throwable thrown = Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> this.consumer.offsetsForTimes(this.mockTimestampToSearch(), Duration.ofMillis(timeout)));

        String expectedMessage = "Failed to get offsets by times in " + timeout + "ms";
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Checks that when the event handler reports a timeout, {@code beginningOffsets} throws a
     * Kafka {@code TimeoutException} whose message reports the caller's timeout in milliseconds.
     */
    @Test
    public void testBeginningOffsetsTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        org.apache.kafka.common.errors.TimeoutException reaperTimeout =
            new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper");
        Mockito.doThrow(reaperTimeout)
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        Throwable thrown = Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> this.consumer.beginningOffsets(Collections.singleton(new TopicPartition("topic", 5)), Duration.ofMillis(timeout)));

        String expectedMessage = "Failed to get offsets by times in " + timeout + "ms";
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Checks that when the event handler reports a timeout, {@code endOffsets} throws a Kafka
     * {@code TimeoutException} whose message reports the caller's timeout in milliseconds.
     */
    @Test
    public void testEndOffsetsTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        org.apache.kafka.common.errors.TimeoutException reaperTimeout =
            new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper");
        Mockito.doThrow(reaperTimeout)
            .when(this.applicationEventHandler)
            .addAndGet((CompletableApplicationEvent) ArgumentMatchers.any());

        Throwable thrown = Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> this.consumer.endOffsets(Collections.singleton(new TopicPartition("topic", 5)), Duration.ofMillis(timeout)));

        String expectedMessage = "Failed to get offsets by times in " + timeout + "ms";
        Assertions.assertEquals(expectedMessage, thrown.getMessage());
    }

    /**
     * Calls {@code beginningOffsets} with a zero timeout and checks that it does not throw,
     * that the result maps the requested partition to {@code null}, and that a
     * {@code ListOffsetsEvent} was passed to the handler's {@code add}.
     */
    @Test
    public void testBeginningOffsetsWithZeroTimeout() {
        this.consumer = this.newConsumer();
        TopicPartition partition = new TopicPartition("topic1", 0);
        List<TopicPartition> requested = Collections.singletonList(partition);
        Map offsets = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.beginningOffsets(requested, Duration.ZERO));
        // The key must be present even though no offset could be resolved.
        Assertions.assertTrue((boolean)offsets.containsKey(partition));
        Assertions.assertNull(offsets.get(partition));
        ApplicationEventHandler verifiedHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler);
        verifiedHandler.add((ApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Calls {@code offsetsForTimes} with a zero timeout and checks that it does not throw,
     * that the result maps the requested partition to {@code null}, and that no
     * {@code ListOffsetsEvent} was passed to the handler's {@code addAndGet}.
     */
    @Test
    public void testOffsetsForTimesWithZeroTimeout() {
        this.consumer = this.newConsumer();
        TopicPartition partition = new TopicPartition("topic1", 0);
        Map<TopicPartition, Object> expected = Collections.singletonMap(partition, null);
        Map<TopicPartition, Long> searchTimestamps = Collections.singletonMap(partition, 5L);
        Map actual = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.offsetsForTimes(searchTimestamps, Duration.ZERO));
        Assertions.assertEquals(expected, (Object)actual);
        ApplicationEventHandler neverCalledHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never());
        neverCalledHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    /**
     * Calls {@code wakeup()} before {@code committed()} and checks that {@code committed()}
     * throws {@code WakeupException}. The stubbed handler asserts that the submitted
     * {@code FetchCommittedOffsetsEvent} already has an exceptionally completed future,
     * and the test finally checks that the wakeup trigger has no pending task.
     */
    @Test
    public void testWakeupCommitted() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = this.mockTopicPartitionOffset();
        Set<TopicPartition> requestedPartitions = committedOffsets.keySet();
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            CompletableApplicationEvent submitted = (CompletableApplicationEvent)invocation.getArgument(0);
            Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, (Object)submitted);
            Assertions.assertTrue((boolean)submitted.future().isCompletedExceptionally());
            // Resolve the submitted event's own future through ConsumerUtils.getResult.
            return ConsumerUtils.getResult((Future)submitted.future());
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        this.consumer.wakeup();
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.committed(requestedPartitions));
        Assertions.assertNull((Object)this.consumer.wakeupTrigger().getPendingTask());
    }

    /**
     * Calls {@code wakeup()} and then {@code close(Duration.ZERO)} on a consumer with an
     * assigned, seeked partition. The stubbed handler records any {@code SyncCommitEvent}
     * passed to {@code add}; the test asserts one was captured and that its future is not
     * completed exceptionally.
     */
    @Test
    public void testNoWakeupInCloseCommit() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        this.consumer = this.newConsumer();
        this.consumer.assign(Collections.singleton(tp));
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.consumer.seek(tp, 10L);
        this.consumer.wakeup();
        // Typed holder so the captured event can be read back without a downcast.
        AtomicReference<SyncCommitEvent> capturedEvent = new AtomicReference<>();
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            ApplicationEvent event = (ApplicationEvent)invocation.getArgument(0);
            if (event instanceof SyncCommitEvent) {
                capturedEvent.set((SyncCommitEvent)event);
            }
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any());
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close(Duration.ZERO);
        Assertions.assertNotNull(capturedEvent.get());
        Assertions.assertFalse((boolean)capturedEvent.get().future().isCompletedExceptionally());
    }

    /**
     * Issues a {@code commitAsync()} with no completion arranged in this test, then checks
     * that {@code close(10 ms)} throws a {@code KafkaException} whose cause is a Kafka
     * {@code TimeoutException}.
     */
    @Test
    public void testCloseAwaitPendingAsyncCommitIncomplete() {
        final Duration closeTimeout = Duration.ofMillis(10L);
        final TopicPartition partition = new TopicPartition("foo", 0);
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.consumer.assign(Collections.singleton(partition));
        this.consumer.seek(partition, 20L);
        this.consumer.commitAsync();
        Exception closeFailure = (Exception)Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(closeTimeout));
        Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, (Object)closeFailure.getCause());
    }

    /**
     * Arranges successful completion of the async-commit and unsubscribe events, then checks
     * that {@code close(10 ms)} does not throw and that the commit callback was invoked
     * exactly once.
     */
    @Test
    public void testCloseAwaitPendingAsyncCommitComplete() {
        final Duration closeTimeout = Duration.ofMillis(10L);
        final TopicPartition partition = new TopicPartition("foo", 0);
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        MockCommitCallback callback = new MockCommitCallback();
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.consumer.assign(Collections.singleton(partition));
        this.consumer.seek(partition, 20L);
        this.completeCommitAsyncApplicationEventSuccessfully();
        this.consumer.commitAsync((OffsetCommitCallback)callback);
        this.completeUnsubscribeApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> this.consumer.close(closeTimeout));
        Assertions.assertEquals((int)1, (int)callback.invoked);
    }

    /**
     * With auto-commit enabled and {@code MockConsumerInterceptor} configured, checks that
     * the interceptor's init count is 1 after construction and that its on-commit and close
     * counts are both 1 after {@code close(Duration.ZERO)}.
     */
    @Test
    public void testInterceptorAutoCommitOnClose() {
        String interceptorClassName = MockConsumerInterceptor.class.getName();
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("test-id");
        consumerProps.setProperty("interceptor.classes", interceptorClassName);
        consumerProps.setProperty("enable.auto.commit", "true");
        this.consumer = this.newConsumer(consumerProps);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());

        this.completeCommitSyncApplicationEventSuccessfully();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close(Duration.ZERO);

        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.CLOSE_COUNT.get());
    }

    /**
     * With auto-commit disabled and {@code MockConsumerInterceptor} configured, checks that a
     * {@code commitSync} whose event is arranged to complete successfully raises the
     * interceptor's on-commit count to 1.
     */
    @Test
    public void testInterceptorCommitSync() {
        String interceptorClassName = MockConsumerInterceptor.class.getName();
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("test-id");
        consumerProps.setProperty("interceptor.classes", interceptorClassName);
        consumerProps.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(consumerProps);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());

        this.completeCommitSyncApplicationEventSuccessfully();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.mockTopicPartitionOffset();
        this.consumer.commitSync(offsetsToCommit);

        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * Arranges the sync-commit event to complete with a {@code KafkaException} and checks
     * that {@code commitSync} throws an exception equal to it while the interceptor's
     * on-commit count stays at 0.
     */
    @Test
    public void testNoInterceptorCommitSyncFailed() {
        String interceptorClassName = MockConsumerInterceptor.class.getName();
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("test-id");
        consumerProps.setProperty("interceptor.classes", interceptorClassName);
        consumerProps.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(consumerProps);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());

        KafkaException injected = new KafkaException("Test exception");
        this.completeCommitSyncApplicationEventExceptionally((Exception)((Object)injected));
        KafkaException thrown = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.commitSync(this.mockTopicPartitionOffset()));

        Assertions.assertEquals((Object)((Object)injected), (Object)((Object)thrown));
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * Checks that, after a {@code commitAsync} whose event is arranged to complete
     * successfully, the interceptor's on-commit count is still 0 until
     * {@code forceCommitCallbackInvocation()} is called, after which it is 1.
     */
    @Test
    public void testInterceptorCommitAsync() {
        String interceptorClassName = MockConsumerInterceptor.class.getName();
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("test-id");
        consumerProps.setProperty("interceptor.classes", interceptorClassName);
        consumerProps.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(consumerProps);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());

        this.completeCommitAsyncApplicationEventSuccessfully();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.mockTopicPartitionOffset();
        OffsetCommitCallback callback = (OffsetCommitCallback)new MockCommitCallback();
        this.consumer.commitAsync(offsetsToCommit, callback);
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());

        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * Arranges the async-commit event to complete with a {@code KafkaException} and checks
     * that the interceptor's on-commit count is 0 both before and after
     * {@code forceCommitCallbackInvocation()}.
     */
    @Test
    public void testNoInterceptorCommitAsyncFailed() {
        String interceptorClassName = MockConsumerInterceptor.class.getName();
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("test-id");
        consumerProps.setProperty("interceptor.classes", interceptorClassName);
        consumerProps.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(consumerProps);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());

        KafkaException injected = new KafkaException("Test exception");
        this.completeCommitAsyncApplicationEventExceptionally((Exception)((Object)injected));
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.mockTopicPartitionOffset();
        OffsetCommitCallback callback = (OffsetCommitCallback)new MockCommitCallback();
        this.consumer.commitAsync(offsetsToCommit, callback);
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());

        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    /**
     * Runs the shared refresh-committed-offsets scenario for partition {@code t1-1} with a
     * committed offset of 10.
     */
    @Test
    public void testRefreshCommittedOffsetsSuccess() {
        this.consumer = this.newConsumer();
        this.completeCommitSyncApplicationEventSuccessfully();
        TopicPartition tp = new TopicPartition("t1", 1);
        OffsetAndMetadata committed = new OffsetAndMetadata(10L);
        Set<TopicPartition> assigned = Collections.singleton(tp);
        Map<TopicPartition, OffsetAndMetadata> committedByPartition = Collections.singletonMap(tp, committed);
        this.testRefreshCommittedOffsetsSuccess(assigned, committedByPartition);
    }

    /**
     * Runs the shared refresh-committed-offsets scenario for partition {@code t1-1} with an
     * empty map of committed offsets.
     */
    @Test
    public void testRefreshCommittedOffsetsSuccessButNoCommittedOffsetsFound() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("t1", 1);
        Set<TopicPartition> assigned = Collections.singleton(tp);
        Map<TopicPartition, OffsetAndMetadata> noCommittedOffsets = Collections.emptyMap();
        this.testRefreshCommittedOffsetsSuccess(assigned, noCommittedOffsets);
    }

    /**
     * Runs the fetch-committed-offsets timeout scenario on a default consumer, passing
     * {@code true} for the committed-offsets-enabled flag.
     */
    @Test
    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
        final boolean committedOffsetsEnabled = true;
        this.consumer = this.newConsumer();
        this.testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(committedOffsetsEnabled);
    }

    /**
     * Runs the fetch-committed-offsets timeout scenario on a consumer built without a group
     * id, passing {@code false} for the committed-offsets-enabled flag.
     */
    @Test
    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
        final boolean committedOffsetsEnabled = false;
        this.consumer = this.newConsumerWithoutGroupId();
        this.testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(committedOffsetsEnabled);
    }

    /**
     * Subscribes to a single topic and checks that the subscription contains it, the
     * assignment is empty, and a {@code SubscriptionChangeEvent} was passed to the handler's
     * {@code add}.
     */
    @Test
    public void testSubscribeGeneratesEvent() {
        final String topicName = "topic1";
        this.consumer = this.newConsumer();
        this.consumer.subscribe(Collections.singletonList(topicName));

        Assertions.assertEquals(Collections.singleton(topicName), (Object)this.consumer.subscription());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ApplicationEventHandler verifiedHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler);
        verifiedHandler.add((ApplicationEvent)ArgumentMatchers.isA(SubscriptionChangeEvent.class));
    }

    /**
     * Calls {@code unsubscribe()} and checks that subscription and assignment are empty and
     * that an {@code UnsubscribeEvent} was passed to the handler's {@code add}.
     */
    @Test
    public void testUnsubscribeGeneratesUnsubscribeEvent() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ApplicationEventHandler verifiedHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler);
        verifiedHandler.add((ApplicationEvent)ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * Subscribes to an empty topic list and checks that subscription and assignment are
     * empty and that an {@code UnsubscribeEvent} was passed to the handler's {@code add}.
     */
    @Test
    public void testSubscribeToEmptyListActsAsUnsubscribe() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.subscribe(Collections.emptyList());

        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ApplicationEventHandler verifiedHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler);
        verifiedHandler.add((ApplicationEvent)ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * Checks that subscribing with a {@code null} collection throws
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testSubscribeToNullTopicCollection() {
        this.consumer = this.newConsumer();
        // The cast selects the Collection overload of subscribe.
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe((Collection)null));
    }

    /**
     * Checks that subscribing with a list containing a single {@code null} topic throws
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testSubscriptionOnNullTopic() {
        this.consumer = this.newConsumer();
        List<String> topicsWithNull = Collections.singletonList(null);
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe(topicsWithNull));
    }

    /**
     * Checks that subscribing with a whitespace-only topic name throws
     * {@code IllegalArgumentException}.
     */
    @Test
    public void testSubscriptionOnEmptyTopic() {
        this.consumer = this.newConsumer();
        List<String> blankTopics = Collections.singletonList("  ");
        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.consumer.subscribe(blankTopics));
    }

    /**
     * Builds a consumer from the required config only (no {@code group.id} is added here)
     * and checks that two config keys are not reported as unused and that
     * {@code groupMetadata()} throws {@code InvalidGroupIdException} with the expected
     * message.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
        final String expectedMessage = "To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.";
        ConsumerConfig consumerConfig = new ConsumerConfig(TestUtils.requiredConsumerConfig());
        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertFalse((boolean)consumerConfig.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse((boolean)consumerConfig.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));

        Throwable thrown = Assertions.assertThrows(InvalidGroupIdException.class, () -> this.consumer.groupMetadata());
        Assertions.assertEquals((Object)expectedMessage, (Object)thrown.getMessage());
    }

    /**
     * Builds a consumer with a group id and checks the initial group metadata: the
     * configured group id, no group instance id, generation id -1 and an empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
        String groupId = "consumerGroupA";
        // Use the named local rather than repeating the literal at each use.
        ConsumerConfig config = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        this.consumer = this.newConsumer(config);
        ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
        Assertions.assertEquals((Object)groupId, (Object)groupMetadata.groupId());
        Assertions.assertEquals(Optional.empty(), (Object)groupMetadata.groupInstanceId());
        Assertions.assertEquals((int)-1, (int)groupMetadata.generationId());
        Assertions.assertEquals((Object)"", (Object)groupMetadata.memberId());
    }

    /**
     * Builds a consumer with both a group id and a group instance id and checks the initial
     * group metadata: the configured group id and instance id, generation id -1 and an
     * empty member id.
     */
    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        // Use the named locals rather than repeating the literals at each use.
        Properties props = this.requiredConsumerConfigAndGroupId(groupId);
        props.put("group.instance.id", groupInstanceId);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
        Assertions.assertEquals((Object)groupId, (Object)groupMetadata.groupId());
        Assertions.assertEquals(Optional.of(groupInstanceId), (Object)groupMetadata.groupInstanceId());
        Assertions.assertEquals((int)-1, (int)groupMetadata.generationId());
        Assertions.assertEquals((Object)"", (Object)groupMetadata.memberId());
    }

    /**
     * Verifies that the statically mocked {@code RequestManagers.supplier(...)} was invoked
     * and returns the {@code MemberStateListener} passed as its last argument.
     *
     * @param requestManagers the active static mock of {@code RequestManagers}
     * @return the listener value captured from the verified call
     */
    private MemberStateListener captureGroupMetadataUpdateListener(MockedStatic<RequestManagers> requestManagers) {
        ArgumentCaptor listenerCaptor = ArgumentCaptor.forClass(MemberStateListener.class);
        // All arguments are wildcards except the last, which is captured.
        requestManagers.verify(() -> RequestManagers.supplier(
            (Time)((Time)ArgumentMatchers.any()),
            (LogContext)((LogContext)ArgumentMatchers.any()),
            (BackgroundEventHandler)((BackgroundEventHandler)ArgumentMatchers.any()),
            (ConsumerMetadata)((ConsumerMetadata)ArgumentMatchers.any()),
            (SubscriptionState)((SubscriptionState)ArgumentMatchers.any()),
            (FetchBuffer)((FetchBuffer)ArgumentMatchers.any()),
            (ConsumerConfig)((ConsumerConfig)ArgumentMatchers.any()),
            (GroupRebalanceConfig)((GroupRebalanceConfig)ArgumentMatchers.any()),
            (ApiVersions)((ApiVersions)ArgumentMatchers.any()),
            (FetchMetricsManager)((FetchMetricsManager)ArgumentMatchers.any()),
            (Supplier)((Supplier)ArgumentMatchers.any()),
            (Optional)((Optional)ArgumentMatchers.any()),
            (Metrics)((Metrics)ArgumentMatchers.any()),
            (OffsetCommitCallbackInvoker)((OffsetCommitCallbackInvoker)ArgumentMatchers.any()),
            (MemberStateListener)((MemberStateListener)listenerCaptor.capture())));
        return (MemberStateListener)listenerCaptor.getValue();
    }

    /**
     * Pushes a member epoch and member id through the captured {@code MemberStateListener}
     * and checks that {@code groupMetadata()} then reports them as generation id and member
     * id, while group id and group instance id are unchanged.
     */
    @Test
    public void testGroupMetadataUpdate() {
        String groupId = "consumerGroupA";
        ConsumerConfig config = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        // Typed static mock: no cast is needed when handing it to the capture helper.
        try (MockedStatic<RequestManagers> requestManagers = Mockito.mockStatic(RequestManagers.class)) {
            this.consumer = this.newConsumer(config);
            ConsumerGroupMetadata oldGroupMetadata = this.consumer.groupMetadata();
            MemberStateListener groupMetadataUpdateListener = this.captureGroupMetadataUpdateListener(requestManagers);
            int expectedMemberEpoch = 42;
            String expectedMemberId = "memberId";
            groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(expectedMemberEpoch), Optional.of(expectedMemberId));
            ConsumerGroupMetadata newGroupMetadata = this.consumer.groupMetadata();
            Assertions.assertEquals((Object)oldGroupMetadata.groupId(), (Object)newGroupMetadata.groupId());
            Assertions.assertEquals((Object)expectedMemberId, (Object)newGroupMetadata.memberId());
            Assertions.assertEquals((int)expectedMemberEpoch, (int)newGroupMetadata.generationId());
            Assertions.assertEquals((Object)oldGroupMetadata.groupInstanceId(), (Object)newGroupMetadata.groupInstanceId());
        }
    }

    /**
     * Updates the group metadata through the captured {@code MemberStateListener}, then
     * unsubscribes and checks that {@code groupMetadata()} equals a fresh metadata object
     * with generation id -1, an empty member id and no group instance id.
     */
    @Test
    public void testGroupMetadataIsResetAfterUnsubscribe() {
        String groupId = "consumerGroupA";
        ConsumerConfig config = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        // Typed static mock: no cast is needed when handing it to the capture helper.
        try (MockedStatic<RequestManagers> requestManagers = Mockito.mockStatic(RequestManagers.class)) {
            this.consumer = this.newConsumer(config);
            MemberStateListener groupMetadataUpdateListener = this.captureGroupMetadataUpdateListener(requestManagers);
            this.consumer.subscribe(Collections.singletonList("topic"));
            int memberEpoch = 42;
            String memberId = "memberId";
            groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), Optional.of(memberId));
            ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
            // Sanity check: the update moved the metadata away from its initial values.
            Assertions.assertNotEquals((int)-1, (int)groupMetadata.generationId());
            Assertions.assertNotEquals((Object)"", (Object)groupMetadata.memberId());
        }
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        ConsumerGroupMetadata groupMetadataAfterUnsubscription = new ConsumerGroupMetadata(groupId, -1, "", Optional.empty());
        Assertions.assertEquals((Object)groupMetadataAfterUnsubscription, (Object)this.consumer.groupMetadata());
    }

    /**
     * Enqueues one {@code ConsumerRebalanceListenerCallbackNeededEvent} per requested method
     * name, polls once with a zero timeout, and checks both the poll outcome and how often
     * each listener callback ran.
     *
     * @param methodNames listener methods to request, in order
     * @param revokedError error passed to the counting listener for the revoked callback
     * @param assignedError error passed to the counting listener for the assigned callback
     * @param lostError error passed to the counting listener for the lost callback
     * @param expectedRevokedCount expected value of the listener's revoked count
     * @param expectedAssignedCount expected value of the listener's assigned count
     * @param expectedLostCount expected value of the listener's lost count
     * @param expectedException exception {@code poll} must throw, or empty if it must not throw
     */
    @ParameterizedTest
    @MethodSource(value={"listenerCallbacksInvokeSource"})
    public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> methodNames, Optional<RuntimeException> revokedError, Optional<RuntimeException> assignedError, Optional<RuntimeException> lostError, int expectedRevokedCount, int expectedAssignedCount, int expectedLostCount, Optional<RuntimeException> expectedException) {
        this.consumer = this.newConsumer();
        CounterConsumerRebalanceListener countingListener = new CounterConsumerRebalanceListener(revokedError, assignedError, lostError);
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        this.consumer.subscribe(Collections.singletonList("topic"), (ConsumerRebalanceListener)countingListener);

        SortedSet noPartitions = Collections.emptySortedSet();
        methodNames.forEach(name -> this.backgroundEventQueue.add((BackgroundEvent)new ConsumerRebalanceListenerCallbackNeededEvent(name, noPartitions)));

        if (!expectedException.isPresent()) {
            Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
        } else {
            RuntimeException expected = expectedException.get();
            Exception thrown = (Exception)Assertions.assertThrows(expected.getClass(), () -> this.consumer.poll(Duration.ZERO));
            Assertions.assertEquals((Object)expected.getMessage(), (Object)thrown.getMessage());
            Assertions.assertEquals((Object)expected.getCause(), (Object)thrown.getCause());
        }

        Assertions.assertEquals((int)expectedRevokedCount, (int)countingListener.revokedCount());
        Assertions.assertEquals((int)expectedAssignedCount, (int)countingListener.assignedCount());
        Assertions.assertEquals((int)expectedLostCount, (int)countingListener.lostCount());
    }

    /**
     * Supplies the argument rows for {@code testListenerCallbacksInvoke}. Each row holds:
     * method names, revoked/assigned/lost errors, expected revoked/assigned/lost counts,
     * and the expected exception.
     */
    private static Stream<Arguments> listenerCallbacksInvokeSource() {
        Optional none = Optional.empty();
        Optional<RuntimeException> runtimeError = Optional.of(new RuntimeException("Intentional error"));
        Optional<KafkaException> kafkaError = Optional.of(new KafkaException("Intentional error"));
        Optional<KafkaException> wrappedError = Optional.of(new KafkaException("User rebalance callback throws an error", (Throwable)runtimeError.get()));

        List<ConsumerRebalanceListenerMethodName> revokedOnly = Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED);
        List<ConsumerRebalanceListenerMethodName> assignedOnly = Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED);
        List<ConsumerRebalanceListenerMethodName> lostOnly = Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST);
        List<ConsumerRebalanceListenerMethodName> revokedThenAssigned = Arrays.asList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED);

        return Stream.of(
            // No callbacks requested.
            Arguments.of((Object[])new Object[]{Collections.emptyList(), none, none, none, 0, 0, 0, none}),
            // Single callback, no error.
            Arguments.of((Object[])new Object[]{revokedOnly, none, none, none, 1, 0, 0, none}),
            Arguments.of((Object[])new Object[]{assignedOnly, none, none, none, 0, 1, 0, none}),
            Arguments.of((Object[])new Object[]{lostOnly, none, none, none, 0, 0, 1, none}),
            // Single callback failing with a RuntimeException: the wrapped error is expected.
            Arguments.of((Object[])new Object[]{revokedOnly, runtimeError, none, none, 1, 0, 0, wrappedError}),
            Arguments.of((Object[])new Object[]{assignedOnly, none, runtimeError, none, 0, 1, 0, wrappedError}),
            Arguments.of((Object[])new Object[]{lostOnly, none, none, runtimeError, 0, 0, 1, wrappedError}),
            // Single callback failing with a KafkaException: the same exception is expected.
            Arguments.of((Object[])new Object[]{revokedOnly, kafkaError, none, none, 1, 0, 0, kafkaError}),
            Arguments.of((Object[])new Object[]{assignedOnly, none, kafkaError, none, 0, 1, 0, kafkaError}),
            Arguments.of((Object[])new Object[]{lostOnly, none, none, kafkaError, 0, 0, 1, kafkaError}),
            // Two callbacks: both are counted, and the expected exception matches the revoked error.
            Arguments.of((Object[])new Object[]{revokedThenAssigned, runtimeError, none, none, 1, 1, 0, wrappedError}),
            Arguments.of((Object[])new Object[]{revokedThenAssigned, kafkaError, runtimeError, none, 1, 1, 0, kafkaError}));
    }

    /**
     * Places a single {@code ErrorEvent} on the background event queue and checks that
     * {@code poll} throws a {@code KafkaException} carrying the same message.
     */
    @Test
    public void testBackgroundError() {
        String groupId = "consumerGroupA";
        // Use the named local rather than repeating the literal.
        ConsumerConfig config = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        this.consumer = this.newConsumer(config);
        KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
        ErrorEvent errorEvent = new ErrorEvent((Throwable)expectedException);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent);
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        KafkaException exception = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals((Object)expectedException.getMessage(), (Object)exception.getMessage());
    }

    /**
     * Places two {@code ErrorEvent}s on the background event queue and checks that
     * {@code poll} throws a {@code KafkaException} with the first event's message and
     * leaves the queue empty.
     */
    @Test
    public void testMultipleBackgroundErrors() {
        String groupId = "consumerGroupA";
        // Use the named local rather than repeating the literal.
        ConsumerConfig config = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        this.consumer = this.newConsumer(config);
        KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
        ErrorEvent errorEvent1 = new ErrorEvent((Throwable)expectedException1);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent1);
        KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
        ErrorEvent errorEvent2 = new ErrorEvent((Throwable)expectedException2);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent2);
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        KafkaException exception = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals((Object)expectedException1.getMessage(), (Object)exception.getMessage());
        Assertions.assertTrue((boolean)this.backgroundEventQueue.isEmpty());
    }

    /**
     * With the consumer group protocol and no {@code group.id} added in this test, checks
     * that {@code group.remote.assignor} is reported as unused after the consumer is built.
     */
    @Test
    public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
        String consumerProtocol = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
        Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.remote.assignor", "someAssignor");
        consumerProps.put("group.protocol", consumerProtocol);
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertTrue((boolean)consumerConfig.unused().contains("group.remote.assignor"));
    }

    /**
     * Checks that constructing a {@code ConsumerConfig} with the classic group protocol and
     * {@code group.remote.assignor} set throws {@code ConfigException}.
     */
    @Test
    public void testGroupRemoteAssignorInClassicProtocol() {
        String classicProtocol = GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT);
        Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.id", "consumerGroupA");
        consumerProps.put("group.protocol", classicProtocol);
        consumerProps.put("group.remote.assignor", "someAssignor");

        Assertions.assertThrows(ConfigException.class, () -> new ConsumerConfig(consumerProps));
    }

    /**
     * With a group id and the consumer group protocol, checks that
     * {@code group.remote.assignor} is not reported as unused after the consumer is built.
     */
    @Test
    public void testGroupRemoteAssignorUsedInConsumerProtocol() {
        String consumerProtocol = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
        Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.id", "consumerGroupA");
        consumerProps.put("group.protocol", consumerProtocol);
        consumerProps.put("group.remote.assignor", "someAssignor");
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertFalse((boolean)consumerConfig.unused().contains("group.remote.assignor"));
    }

    /**
     * With a group id and the consumer group protocol, checks that
     * {@code partition.assignment.strategy} is reported as unused after the consumer is
     * built.
     */
    @Test
    public void testPartitionAssignmentStrategyUnusedInAsyncConsumer() {
        String consumerProtocol = GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT);
        Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.id", "consumerGroup1");
        consumerProps.put("group.protocol", consumerProtocol);
        consumerProps.put("partition.assignment.strategy", "CooperativeStickyAssignor");
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertTrue((boolean)consumerConfig.unused().contains("partition.assignment.strategy"));
    }

    /**
     * Builds a consumer from the required config plus two extra settings (no
     * {@code group.id} is added here) and asserts that neither setting is reported as
     * unused.
     */
    @Test
    public void testGroupIdNull() {
        Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("auto.commit.interval.ms", (Object)10000);
        consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", (Object)true);
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertFalse((boolean)consumerConfig.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse((boolean)consumerConfig.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    /**
     * Builds a consumer with group id {@code consumerGroupA} plus two extra settings and
     * asserts that both settings are reported as unused.
     */
    @Test
    public void testGroupIdNotNullAndValid() {
        Properties consumerProps = this.requiredConsumerConfigAndGroupId("consumerGroupA");
        consumerProps.put("auto.commit.interval.ms", (Object)10000);
        consumerProps.put("internal.throw.on.fetch.stable.offset.unsupported", (Object)true);
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);

        this.consumer = this.newConsumer(consumerConfig);

        Assertions.assertTrue((boolean)consumerConfig.unused().contains("auto.commit.interval.ms"));
        Assertions.assertTrue((boolean)consumerConfig.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    /**
     * Runs the invalid-group-id scenario with an empty group id.
     */
    @Test
    public void testGroupIdEmpty() {
        final String emptyGroupId = "";
        this.testInvalidGroupId(emptyGroupId);
    }

    /**
     * Runs the invalid-group-id scenario with a group id made only of spaces.
     */
    @Test
    public void testGroupIdOnlyWhitespaces() {
        final String blankGroupId = "       ";
        this.testInvalidGroupId(blankGroupId);
    }

    /**
     * Builds a consumer around mocked collaborators, stubs the fetch collector to return one
     * record, polls once, and checks that a {@code PollEvent} was passed to the handler's
     * {@code add}.
     */
    @Test
    public void testEnsurePollEventSentOnConsumerPoll() {
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        FetchBuffer mockedFetchBuffer = (FetchBuffer)Mockito.mock(FetchBuffer.class);
        ConsumerInterceptors<String, String> noInterceptors = (ConsumerInterceptors<String, String>)new ConsumerInterceptors(Collections.emptyList());
        ConsumerRebalanceListenerInvoker mockedInvoker = (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class);
        this.consumer = this.newConsumer(mockedFetchBuffer, noInterceptors, mockedInvoker, subscriptionState, "group-id", "client-id");

        TopicPartition partition = new TopicPartition("topic", 0);
        List<ConsumerRecord> fetchedRecords = Collections.singletonList(new ConsumerRecord("topic", 0, 2L, (Object)"key1", (Object)"value1"));
        ((FetchCollector)Mockito.doAnswer(invocation -> Fetch.forPartition((TopicPartition)partition, (List)fetchedRecords, (boolean)true)).when(this.fetchCollector)).collectFetch((FetchBuffer)Mockito.any(FetchBuffer.class));

        this.consumer.subscribe(Collections.singletonList("topic1"));
        this.consumer.poll(Duration.ofMillis(100L));

        ApplicationEventHandler verifiedHandler = (ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler);
        verifiedHandler.add((ApplicationEvent)ArgumentMatchers.any(PollEvent.class));
    }

    /**
     * Asserts that building a consumer with the given group id throws a
     * {@code KafkaException} with the message "Failed to construct kafka consumer".
     *
     * @param groupId the group id to place in the consumer configuration
     */
    private void testInvalidGroupId(String groupId) {
        final String expectedMessage = "Failed to construct kafka consumer";
        ConsumerConfig consumerConfig = new ConsumerConfig(this.requiredConsumerConfigAndGroupId(groupId));
        Exception thrown = (Exception)Assertions.assertThrows(KafkaException.class, () -> {
            this.consumer = this.newConsumer(consumerConfig);
        });
        Assertions.assertEquals((Object)expectedMessage, (Object)thrown.getMessage());
    }

    /**
     * Returns the properties from {@code TestUtils.requiredConsumerConfig()} with
     * {@code group.id} set to the given value.
     *
     * @param groupId value stored under the {@code group.id} key
     * @return the properties including the group id entry
     */
    private Properties requiredConsumerConfigAndGroupId(String groupId) {
        final Properties consumerProps = TestUtils.requiredConsumerConfig();
        consumerProps.put("group.id", groupId);
        return consumerProps;
    }

    /**
     * Arranges the fetch-committed-offsets event to fail with a Kafka
     * {@code TimeoutException}, assigns partition {@code t1-1}, polls once, and verifies
     * which events reached the application event handler.
     *
     * @param committedOffsetsEnabled when {@code true}, expects a
     *        {@code FetchCommittedOffsetsEvent} via {@code add} and no
     *        {@code ResetPositionsEvent} via {@code addAndGet}; when {@code false}, expects
     *        the opposite
     */
    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout(boolean committedOffsetsEnabled) {
        org.apache.kafka.common.errors.TimeoutException fetchTimeout = new org.apache.kafka.common.errors.TimeoutException();
        this.completeFetchedCommittedOffsetApplicationEventExceptionally((Exception)((Object)fetchTimeout));
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        this.consumer.assign(Collections.singleton(new TopicPartition("t1", 1)));
        this.consumer.poll(Duration.ZERO);

        // Position validation is expected in both modes.
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ValidatePositionsEvent.class));
        if (!committedOffsetsEnabled) {
            ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never())).add((ApplicationEvent)ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
            ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ResetPositionsEvent.class));
            return;
        }
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).add((ApplicationEvent)ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never())).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ResetPositionsEvent.class));
    }

    /**
     * Assigns the given partitions, polls once with the committed-offset lookup
     * stubbed to succeed with {@code committedOffsets}, and verifies that the
     * validate-positions, fetch-committed-offsets and reset-positions events were
     * each submitted at least once.
     *
     * @param partitions       partitions to assign to the consumer
     * @param committedOffsets offsets the stubbed committed-offset lookup yields
     */
    private void testRefreshCommittedOffsetsSuccess(Set<TopicPartition> partitions, Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(committedOffsets);
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());

        this.consumer.assign(partitions);
        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
            .addAndGet(ArgumentMatchers.isA(ValidatePositionsEvent.class));
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
            .add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
        Mockito.verify(this.applicationEventHandler, Mockito.atLeast(1))
            .addAndGet(ArgumentMatchers.isA(ResetPositionsEvent.class));
    }

    /**
     * Subscribes to a topic and polls with a 10 second timeout while the fetch
     * collector is stubbed so that its first call assigns the partition and
     * returns an empty fetch and its second call returns two records. The test
     * asserts that the poll returns both records and that the subscription and
     * assignment reflect the topic and partition.
     */
    @Test
    public void testLongPollWaitIsLimited() {
        this.consumer = this.newConsumer();
        String topicName = "topic1";
        this.consumer.subscribe(Collections.singletonList(topicName));
        Assertions.assertEquals(Collections.singleton(topicName), this.consumer.subscription());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());

        int partition = 3;
        TopicPartition tp = new TopicPartition(topicName, partition);
        List<ConsumerRecord<String, String>> records = Arrays.asList(
            new ConsumerRecord<>(topicName, partition, 2L, "key1", "value1"),
            new ConsumerRecord<>(topicName, partition, 3L, "key2", "value2"));

        Mockito.doAnswer(invocation -> {
            // First collectFetch call: hand the partition to the consumer, but no data yet.
            this.consumer.subscriptions().assignFromSubscribed(Collections.singleton(tp));
            return Fetch.empty();
        }).doAnswer(invocation ->
            // Second collectFetch call: deliver the two prepared records.
            Fetch.forPartition(tp, records, true)
        ).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));

        ConsumerRecords<?, ?> returnedRecords = this.consumer.poll(Duration.ofMillis(10000L));

        Assertions.assertEquals(2, returnedRecords.count());
        Assertions.assertEquals(Collections.singleton(topicName), this.consumer.subscription());
        Assertions.assertEquals(Collections.singleton(tp), this.consumer.assignment());
    }

    /**
     * Runs {@code processBackgroundEvents} against a mocked future whose timed
     * {@code get} times out on its first two invocations and returns on the
     * third, then asserts that 800 ms remain on a timer that started at 1000 ms.
     *
     * @throws Exception declared because stubbing the timed {@code get} names its checked exceptions
     */
    @Test
    public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
        this.consumer = this.newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);
        CountDownLatch latch = new CountDownLatch(3);

        Mockito.doAnswer(invocation -> {
            // Count this attempt first; the latch only reaches zero on the third call.
            latch.countDown();
            if (latch.getCount() > 0L) {
                // Consume the requested wait on the test timer, then report a timeout.
                long timeout = invocation.getArgument(0, Long.class);
                timer.sleep(timeout);
                throw new TimeoutException("Intentional timeout");
            }
            future.complete(null);
            return null;
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        this.consumer.processBackgroundEvents(future, timer);

        Assertions.assertEquals(800L, timer.remainingMs());
    }

    /**
     * Passes an already-completed future to {@code processBackgroundEvents} and
     * asserts that the full 1000 ms is still left on the timer afterwards.
     */
    @Test
    public void testProcessBackgroundEventsWithoutDelay() {
        this.consumer = this.newConsumer();
        Timer pollTimer = this.time.timer(1000L);
        CompletableFuture<Object> alreadyDone = CompletableFuture.completedFuture(null);

        this.consumer.processBackgroundEvents(alreadyDone, pollTimer);

        Assertions.assertEquals(1000L, pollTimer.remainingMs());
    }

    /**
     * Runs {@code processBackgroundEvents} against a mocked future whose timed
     * {@code get} always times out, and asserts that the call surfaces a Kafka
     * {@code TimeoutException} with no time left on the timer.
     *
     * @throws Exception declared because stubbing the timed {@code get} names its checked exceptions
     */
    @Test
    public void testProcessBackgroundEventsTimesOut() throws Exception {
        this.consumer = this.newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<?> future = Mockito.mock(CompletableFuture.class);

        Mockito.doAnswer(invocation -> {
            // Consume the requested wait on the test timer, then report a timeout.
            long timeout = invocation.getArgument(0, Long.class);
            timer.sleep(timeout);
            throw new TimeoutException("Intentional timeout");
        }).when(future).get(ArgumentMatchers.any(Long.class), ArgumentMatchers.any(TimeUnit.class));

        Assertions.assertThrows(
            org.apache.kafka.common.errors.TimeoutException.class,
            () -> this.consumer.processBackgroundEvents(future, timer));
        Assertions.assertEquals(0L, timer.remainingMs());
    }

    /**
     * Asserts that {@code poll} throws {@code InterruptException} when the
     * calling thread's interrupt flag is set, and that a subsequent poll with the
     * flag cleared does not throw.
     *
     * <p>NOTE(review): the decompiler reported removing a self-catching
     * {@code try} block from this method ("possible behaviour change"); the
     * try/finally below is what it emitted. Confirm against the original source.
     */
    @Test
    public void testPollThrowsInterruptExceptionIfInterrupted() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("foo", 3);
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        Map<TopicPartition, OffsetAndMetadata> offsets = Utils.mkMap(Utils.mkEntry(tp, new OffsetAndMetadata(1L)));
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(offsets);
        Mockito.doReturn(Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when(this.metadata).currentLeader(ArgumentMatchers.any());
        this.consumer.assign(Collections.singleton(tp));

        try {
            // Set the interrupt flag on the current thread before polling.
            Thread.currentThread().interrupt();
            Assertions.assertThrows(InterruptException.class, () -> this.consumer.poll(Duration.ZERO));
        } finally {
            // Thread.interrupted() clears the flag so it does not outlive this block.
            Thread.interrupted();
        }

        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    /**
     * Closes the consumer and verifies that the background event reaper was
     * asked to reap the background event queue.
     */
    @Test
    void testReaperInvokedInClose() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.close();

        Mockito.verify(this.backgroundEventReaper).reap(this.backgroundEventQueue);
    }

    /**
     * Unsubscribes the consumer and verifies that the background event reaper
     * was invoked with the test clock's current time in milliseconds.
     */
    @Test
    void testReaperInvokedInUnsubscribe() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        Mockito.verify(this.backgroundEventReaper).reap(this.time.milliseconds());
    }

    /**
     * Polls a subscribed consumer once (with an empty fetch stubbed) and verifies
     * that the background event reaper was invoked with the test clock's current
     * time in milliseconds.
     */
    @Test
    void testReaperInvokedInPoll() {
        this.consumer = this.newConsumer();
        Mockito.doReturn(Fetch.empty()).when(this.fetchCollector).collectFetch(ArgumentMatchers.any(FetchBuffer.class));
        this.consumer.subscribe(Collections.singletonList("topic"));

        this.consumer.poll(Duration.ZERO);

        Mockito.verify(this.backgroundEventReaper).reap(this.time.milliseconds());
    }

    /**
     * Unsubscribes a consumer created without a group id and verifies that an
     * {@code UnsubscribeEvent} was added to the application event handler.
     */
    @Test
    public void testUnsubscribeWithoutGroupId() {
        this.consumer = this.newConsumerWithoutGroupId();
        this.completeUnsubscribeApplicationEventSuccessfully();

        this.consumer.unsubscribe();

        Mockito.verify(this.applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * Verifies the interactions expected from an unsubscribe: an
     * {@code UnsubscribeEvent} was added to the application event handler,
     * {@code processBackgroundEvents} was called on the consumer, and
     * {@code assignFromSubscribed} was never called on the given subscription state.
     *
     * @param subscriptions the (mock or spy) subscription state to check
     */
    private void verifyUnsubscribeEvent(SubscriptionState subscriptions) {
        Mockito.verify(this.applicationEventHandler)
            .add(ArgumentMatchers.any(UnsubscribeEvent.class));
        Mockito.verify(this.consumer)
            .processBackgroundEvents((Future)ArgumentMatchers.any(), (Timer)ArgumentMatchers.any());
        Mockito.verify(subscriptions, Mockito.never())
            .assignFromSubscribed((Collection)ArgumentMatchers.any());
    }

    /**
     * Builds a fixture map of committed offsets: partition 2 of topic "t0" at
     * offset 10 and partition 3 of topic "t0" at offset 20.
     *
     * @return a new mutable map holding the two entries
     */
    private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        Map<TopicPartition, OffsetAndMetadata> committed = new HashMap<>();
        committed.put(new TopicPartition("t0", 2), new OffsetAndMetadata(10L));
        committed.put(new TopicPartition("t0", 3), new OffsetAndMetadata(20L));
        return committed;
    }

    /**
     * Builds a fixture map with one {@code OffsetAndTimestampInternal} for each
     * of partitions 2 and 3 of topic "t0", both created with an empty optional
     * as the third constructor argument.
     *
     * @return a new mutable map holding the two entries
     */
    private Map<TopicPartition, OffsetAndTimestampInternal> mockOffsetAndTimestamp() {
        Map<TopicPartition, OffsetAndTimestampInternal> result = new HashMap<>();
        result.put(new TopicPartition("t0", 2), new OffsetAndTimestampInternal(5L, 1L, Optional.empty()));
        result.put(new TopicPartition("t0", 3), new OffsetAndTimestampInternal(6L, 3L, Optional.empty()));
        return result;
    }

    /**
     * Builds a fixture map of search timestamps: partition 2 of topic "t0" maps
     * to 1 and partition 3 of topic "t0" maps to 2.
     *
     * @return a new mutable map holding the two entries
     */
    private Map<TopicPartition, Long> mockTimestampToSearch() {
        Map<TopicPartition, Long> searchTimestamps = new HashMap<>();
        searchTimestamps.put(new TopicPartition("t0", 2), 1L);
        searchTimestamps.put(new TopicPartition("t0", 3), 2L);
        return searchTimestamps;
    }

    /**
     * Stubs the application event handler so that every {@code AsyncCommitEvent}
     * passed to {@code add} has its future completed exceptionally with {@code ex}.
     *
     * @param ex the exception used to fail the event's future
     */
    private void completeCommitAsyncApplicationEventExceptionally(Exception ex) {
        Mockito.doAnswer(invocation -> {
            AsyncCommitEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().completeExceptionally(ex);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@code SyncCommitEvent}
     * passed to {@code add} has its future completed exceptionally with {@code ex}.
     *
     * @param ex the exception used to fail the event's future
     */
    private void completeCommitSyncApplicationEventExceptionally(Exception ex) {
        Mockito.doAnswer(invocation -> {
            SyncCommitEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().completeExceptionally(ex);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@code AsyncCommitEvent}
     * passed to {@code add} has its future completed normally with {@code null}.
     */
    private void completeCommitAsyncApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            AsyncCommitEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().complete(null);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@code SyncCommitEvent}
     * passed to {@code add} has its future completed normally with {@code null}.
     */
    private void completeCommitSyncApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            SyncCommitEvent commitEvent = invocation.getArgument(0);
            commitEvent.future().complete(null);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    /**
     * Stubs both submission paths for {@code FetchCommittedOffsetsEvent} on the
     * application event handler: {@code addAndGet} returns {@code committedOffsets}
     * directly, and {@code add} completes the event's future with the same map.
     *
     * @param committedOffsets the offsets the stubbed lookup yields
     */
    private void completeFetchedCommittedOffsetApplicationEventSuccessfully(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        // Blocking path: addAndGet hands the offsets straight back.
        Mockito.doReturn(committedOffsets)
            .when(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));

        // Non-blocking path: add completes the future carried by the event.
        Mockito.doAnswer(invocation -> {
            FetchCommittedOffsetsEvent fetchEvent = invocation.getArgument(0);
            fetchEvent.future().complete(committedOffsets);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
    }

    /**
     * Stubs the application event handler so that {@code addAndGet} throws
     * {@code ex} for any {@code FetchCommittedOffsetsEvent}.
     *
     * <p>NOTE(review): only {@code addAndGet} is stubbed here; {@code add} is left
     * untouched for this event type. Confirm that matches the path the consumer uses.
     *
     * @param ex the exception the stubbed call throws
     */
    private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
        Mockito.doThrow(ex)
            .when(this.applicationEventHandler)
            .addAndGet(ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
    }

    /**
     * Stubs the application event handler so that every {@code UnsubscribeEvent}
     * passed to {@code add} has its future completed normally with {@code null}.
     */
    private void completeUnsubscribeApplicationEventSuccessfully() {
        Mockito.doAnswer(invocation -> {
            UnsubscribeEvent unsubscribeEvent = invocation.getArgument(0);
            unsubscribeEvent.future().complete(null);
            return null;
        }).when(this.applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    /**
     * Issues a no-argument {@code commitAsync()} on the consumer under test.
     * NOTE(review): the name suggests this is used to trigger pending commit
     * callbacks; that effect is not visible from this method alone.
     */
    private void forceCommitCallbackInvocation() {
        consumer.commitAsync();
    }

    private static class MockCommitCallback
    implements OffsetCommitCallback {
        public int invoked = 0;
        public Exception exception = null;
        public String completionThread;

        private MockCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            ++this.invoked;
            this.completionThread = Thread.currentThread().getName();
            this.exception = exception;
        }
    }
}

