/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMembershipManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestState;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ConsumerHeartbeatRequestManagerTest {
    // Default values shared by the tests in this class.
    private static final String DEFAULT_GROUP_ID = "groupId";
    private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
    private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
    private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
    private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 80L;
    private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000L;
    private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
    private static final String DEFAULT_MEMBER_ID = "member-id";
    private static final int DEFAULT_MEMBER_EPOCH = 1;

    // Test fixture; every field below is (re)assigned in setUp() before each test.
    private Time time;
    private Timer pollTimer;
    private CoordinatorRequestManager coordinatorRequestManager;
    private SubscriptionState subscriptions;
    private Metadata metadata;
    // The object under test.
    private ConsumerHeartbeatRequestManager heartbeatRequestManager;
    private ConsumerMembershipManager membershipManager;
    // Created as a Mockito spy so tests can both stub and observe it.
    private HeartbeatRequestState heartbeatRequestState;
    // A mock by default; replaced by a real instance in createHeartbeatStateAndRequestManager().
    private ConsumerHeartbeatRequestManager.HeartbeatState heartbeatState;
    private BackgroundEventHandler backgroundEventHandler;
    private LogContext logContext;

    /**
     * Builds a fresh fixture before every test: a mock clock, mocked collaborators, a spied
     * {@link HeartbeatRequestState} configured with the default interval/backoff/jitter values,
     * and the {@link ConsumerHeartbeatRequestManager} under test. The coordinator lookup is
     * stubbed to return a (mocked) node.
     */
    @BeforeEach
    public void setUp() {
        time = new MockTime();
        logContext = new LogContext();
        pollTimer = Mockito.spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS));

        // Collaborators are plain mocks; individual tests stub what they need.
        coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
        heartbeatState = Mockito.mock(ConsumerHeartbeatRequestManager.HeartbeatState.class);
        backgroundEventHandler = Mockito.mock(BackgroundEventHandler.class);
        subscriptions = Mockito.mock(SubscriptionState.class);
        membershipManager = Mockito.mock(ConsumerMembershipManager.class);
        metadata = Mockito.mock(ConsumerMetadata.class);
        Metrics metrics = new Metrics(time);
        ConsumerConfig config = Mockito.mock(ConsumerConfig.class);

        HeartbeatRequestState defaultRequestState = new HeartbeatRequestState(
                logContext,
                time,
                DEFAULT_HEARTBEAT_INTERVAL_MS,
                DEFAULT_RETRY_BACKOFF_MS,
                DEFAULT_RETRY_BACKOFF_MAX_MS,
                DEFAULT_HEARTBEAT_JITTER_MS);
        heartbeatRequestState = Mockito.spy(defaultRequestState);

        heartbeatRequestManager = new ConsumerHeartbeatRequestManager(
                logContext,
                pollTimer,
                config,
                coordinatorRequestManager,
                membershipManager,
                heartbeatState,
                heartbeatRequestState,
                backgroundEventHandler,
                metrics);

        Node coordinatorNode = Mockito.mock(Node.class);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinatorNode));
    }

    /**
     * Replaces the spied request state with one whose heartbeat interval is 0 ms (other values
     * stay at their defaults) and rebuilds the manager under test around it.
     */
    private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
        HeartbeatRequestState zeroIntervalState = new HeartbeatRequestState(
                logContext,
                time,
                0L,
                DEFAULT_RETRY_BACKOFF_MS,
                DEFAULT_RETRY_BACKOFF_MAX_MS,
                DEFAULT_HEARTBEAT_JITTER_MS);
        heartbeatRequestState = Mockito.spy(zeroIntervalState);
        heartbeatRequestManager = createHeartbeatRequestManager(
                coordinatorRequestManager,
                membershipManager,
                heartbeatState,
                heartbeatRequestState,
                backgroundEventHandler);
    }

    /**
     * Swaps the mocked heartbeat state for a real
     * {@link ConsumerHeartbeatRequestManager.HeartbeatState} built from the (mocked) subscriptions
     * and membership manager, then rebuilds the manager under test around it.
     */
    private void createHeartbeatStateAndRequestManager() {
        heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(
                subscriptions,
                membershipManager,
                DEFAULT_MAX_POLL_INTERVAL_MS);
        heartbeatRequestManager = createHeartbeatRequestManager(
                coordinatorRequestManager,
                membershipManager,
                heartbeatState,
                heartbeatRequestState,
                backgroundEventHandler);
    }

    /**
     * Checks that {@code HeartbeatRequestState.toStringBase()} equals the base
     * {@code RequestState} string followed by the remaining-time and interval fields, and that
     * {@code toString()} does not throw.
     */
    @Test
    public void testHeartBeatRequestStateToStringBase() {
        final long backoffMs = 100L;
        final long backoffMaxMs = 1000L;
        LogContext localLogContext = new LogContext();

        HeartbeatRequestState stateUnderTest = new HeartbeatRequestState(
                localLogContext,
                time,
                DEFAULT_HEARTBEAT_INTERVAL_MS,
                backoffMs,
                backoffMaxMs,
                0.2);
        RequestState baseState = new RequestState(
                localLogContext,
                HeartbeatRequestState.class.getName(),
                backoffMs,
                backoffMaxMs);

        String expected = baseState.toStringBase() + ", remainingMs=1000, heartbeatIntervalMs=1000";

        Assertions.assertDoesNotThrow(stateUnderTest::toString);
        Assertions.assertEquals(expected, stateUnderTest.toStringBase());
    }

    /**
     * With the default interval the first poll produces no request. Once the interval is set to
     * zero, the manager reports no wait time, the next poll yields exactly one heartbeat, and an
     * immediate follow-up poll yields none.
     */
    @Test
    public void testHeartbeatOnStartup() {
        NetworkClientDelegate.PollResult beforeIntervalExpires = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, beforeIntervalExpires.unsentRequests.size());

        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        Assertions.assertEquals(0L, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));

        NetworkClientDelegate.PollResult firstPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());

        // Polling again right away must not produce a second request.
        NetworkClientDelegate.PollResult secondPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, secondPoll.unsentRequests.size());
    }

    /**
     * Walks through one successful heartbeat cycle and checks the timer at each step: nothing is
     * sent before the interval expires, a request is sent once it does, the timer is reset when
     * the request is generated, and a successful response leaves the remaining time untouched.
     */
    @Test
    public void testSuccessfulHeartbeatTiming() {
        NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, pollResult.unsentRequests.size(),
                "No heartbeat should be sent while interval has not expired");
        Assertions.assertEquals(
                heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
                pollResult.timeUntilNextPollMs);
        // Also advances the mock clock by a full interval.
        assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);

        pollResult = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, pollResult.unsentRequests.size(),
                "A heartbeat should be sent when interval expires");
        NetworkClientDelegate.UnsentRequest inflight = pollResult.unsentRequests.get(0);
        Assertions.assertEquals(
                DEFAULT_HEARTBEAT_INTERVAL_MS,
                heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
                "Heartbeat timer was not reset to the interval when the heartbeat request was sent.");

        final long elapsedMs = 333L;
        final long remainingMs = DEFAULT_HEARTBEAT_INTERVAL_MS - elapsedMs;
        time.sleep(elapsedMs);
        pollResult = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, pollResult.unsentRequests.size(),
                "No heartbeat should be sent while only part of the interval has passed");
        Assertions.assertEquals(
                remainingMs,
                heartbeatRequestState.timeToNextHeartbeatMs(time.milliseconds()),
                "Time to next interval was not properly updated.");

        // Completing the in-flight request successfully must not change the remaining time.
        inflight.handler().onComplete(createHeartbeatResponse(inflight, Errors.NONE));
        assertNextHeartbeatTiming(remainingMs);
    }

    /**
     * For every ConsumerGroupHeartbeat API version, checks the content of the first heartbeat
     * generated for a joining member: non-empty member id, epoch 0, the subscribed topic, the
     * rebalance timeout, the group id and the group instance id.
     *
     * @param version request version used to build the heartbeat request
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) {
        createHeartbeatStateAndRequestManager();
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

        final String topicName = "topic1";
        Set<String> subscribedTopics = Collections.singleton(topicName);
        Mockito.when(subscriptions.subscription()).thenReturn(subscribedTopics);
        subscriptions.subscribe(subscribedTopics, Optional.empty());
        mockJoiningMemberData(DEFAULT_GROUP_INSTANCE_ID);

        Assertions.assertEquals(0L, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest unsent = polled.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsent.requestBuilder());
        ConsumerGroupHeartbeatRequest heartbeat =
                (ConsumerGroupHeartbeatRequest) unsent.requestBuilder().build(version);
        ConsumerGroupHeartbeatRequestData data = heartbeat.data();

        String sentMemberId = data.memberId();
        Assertions.assertNotNull(sentMemberId);
        Assertions.assertFalse(sentMemberId.isEmpty());
        Assertions.assertEquals(0, data.memberEpoch());
        Assertions.assertEquals(Collections.singletonList(topicName), data.subscribedTopicNames());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
        Assertions.assertEquals(DEFAULT_GROUP_ID, data.groupId());
        Assertions.assertEquals(DEFAULT_GROUP_INSTANCE_ID, data.instanceId());
    }

    /**
     * With a zero heartbeat interval, a poll yields one request (and a zero wait) unless the
     * membership manager asks to skip heartbeats, in which case nothing is sent and the wait is
     * {@code Long.MAX_VALUE}.
     *
     * @param shouldSkipHeartbeat value stubbed for {@code membershipManager.shouldSkipHeartbeat()}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testSkippingHeartbeat(boolean shouldSkipHeartbeat) {
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(shouldSkipHeartbeat);

        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());

        if (shouldSkipHeartbeat) {
            Assertions.assertEquals(0, polled.unsentRequests.size());
            Assertions.assertEquals(Long.MAX_VALUE, polled.timeUntilNextPollMs);
        } else {
            Assertions.assertEquals(1, polled.unsentRequests.size());
            Assertions.assertEquals(0L, polled.timeUntilNextPollMs);
        }
    }

    /**
     * Before the interval has elapsed no request is generated and the reported wait equals the
     * time left in the interval; once heartbeats are to be skipped the wait becomes
     * {@code Long.MAX_VALUE}.
     */
    @Test
    public void testTimerNotDue() {
        final long elapsedMs = 100L;
        final long remainingMs = DEFAULT_HEARTBEAT_INTERVAL_MS - elapsedMs;
        time.sleep(elapsedMs);

        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, polled.unsentRequests.size());
        Assertions.assertEquals(remainingMs, polled.timeUntilNextPollMs);
        Assertions.assertEquals(remainingMs, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));

        Mockito.when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(Long.MAX_VALUE, polled.timeUntilNextPollMs);
    }

    /**
     * While a heartbeat is in flight no further heartbeat is generated, even across several
     * expired intervals; after the late response arrives, the next one is sent once the retry
     * backoff has elapsed.
     */
    @Test
    public void testHeartbeatNotSentIfAnotherOneInFlight() {
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest inflight = polled.unsentRequests.get(0);

        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, polled.unsentRequests.size(),
                "No heartbeat should be sent while a previous one is in-flight");

        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, polled.unsentRequests.size(),
                "No heartbeat should be sent when the interval expires if there is a previous HB request in-flight");

        // Deliver the (late) successful response, then wait out the retry backoff.
        inflight.handler().onComplete(createHeartbeatResponse(inflight, Errors.NONE));
        time.sleep(DEFAULT_RETRY_BACKOFF_MS);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size(),
                "A next heartbeat should be sent on the first poll after receiving a response that took longer than the interval, waiting only for the minimal backoff.");
    }

    /**
     * When the membership manager requests an immediate heartbeat, a request is generated even
     * though the interval has not expired, the timer restarts at the full interval, and the
     * membership manager is told a heartbeat request was generated.
     */
    @Test
    public void testHeartbeatOutsideInterval() {
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        Mockito.when(membershipManager.shouldHeartbeatNow()).thenReturn(true);

        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());

        Assertions.assertEquals(1, polled.unsentRequests.size());
        Assertions.assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS, polled.timeUntilNextPollMs);
        Assertions.assertEquals(
                DEFAULT_HEARTBEAT_INTERVAL_MS,
                heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
        Mockito.verify(membershipManager).onHeartbeatRequestGenerated();
    }

    /**
     * A {@link TimeoutException} on the heartbeat request is reported through
     * {@code onHeartbeatFailure(true)}, raises no background event, and the next request is only
     * generated once the retry backoff has fully elapsed.
     */
    @Test
    public void testNetworkTimeout() {
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest sent = polled.unsentRequests.get(0);
        sent.handler().onFailure(time.milliseconds(), new TimeoutException("timeout"));

        Mockito.verify(membershipManager).onHeartbeatFailure(true);
        Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());

        // One millisecond short of the backoff: still nothing to send.
        time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, polled.unsentRequests.size());

        time.sleep(1L);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());
    }

    /**
     * A disconnect on the heartbeat request is reported through {@code onHeartbeatFailure(true)},
     * is forwarded to the coordinator request manager, raises no background event, and the next
     * request is only generated after the retry backoff.
     */
    @Test
    public void testDisconnect() {
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest sent = polled.unsentRequests.get(0);
        sent.handler().onFailure(time.milliseconds(), DisconnectException.INSTANCE);

        Mockito.verify(membershipManager).onHeartbeatFailure(true);
        Mockito.verify(coordinatorRequestManager)
                .handleCoordinatorDisconnect(ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());

        time.sleep(DEFAULT_RETRY_BACKOFF_MS - 1);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, polled.unsentRequests.size(),
                "No request should be generated before the backoff expires");

        time.sleep(1L);
        polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size(),
                "A new request should be generated after the backoff expires");
    }

    /**
     * A plain {@link KafkaException} on the heartbeat request results in
     * {@code onHeartbeatFailure(false)}, a transition to fatal, and an event being added to the
     * background event handler.
     */
    @Test
    public void testFailureOnFatalException() {
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest sent = polled.unsentRequests.get(0);
        sent.handler().onFailure(time.milliseconds(), new KafkaException("fatal"));

        Mockito.verify(membershipManager).onHeartbeatFailure(false);
        Mockito.verify(membershipManager).transitionToFatal();
        Mockito.verify(backgroundEventHandler).add(ArgumentMatchers.any());
    }

    /**
     * For an error carried in the heartbeat response (GROUP_AUTHORIZATION_FAILED), the
     * {@link ErrorEvent} must be added to the background handler before the membership manager
     * receives {@code onHeartbeatFailure(false)}.
     */
    @Test
    public void testHeartbeatResponseErrorNotifiedToGroupManagerAfterErrorPropagated() {
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest sent = polled.unsentRequests.get(0);
        ClientResponse errorResponse = createHeartbeatResponse(sent, Errors.GROUP_AUTHORIZATION_FAILED);
        polled.unsentRequests.get(0).handler().onComplete(errorResponse);

        // Order matters: error propagation first, membership notification second.
        InOrder ordered = Mockito.inOrder(backgroundEventHandler, membershipManager);
        ordered.verify(backgroundEventHandler).add(ArgumentMatchers.any(ErrorEvent.class));
        ordered.verify(membershipManager).onHeartbeatFailure(false);
    }

    /**
     * For a request-level failure ({@link AuthenticationException}), the {@link ErrorEvent} must
     * be added to the background handler before the membership manager receives
     * {@code onHeartbeatFailure(false)}.
     */
    @Test
    public void testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagated() {
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest sent = polled.unsentRequests.get(0);
        // NOTE(review): the response built here is discarded and never delivered; the call looks
        // redundant, but confirm createHeartbeatResponse has no side effects before removing it.
        createHeartbeatResponse(sent, Errors.GROUP_AUTHORIZATION_FAILED);
        polled.unsentRequests.get(0).handler()
                .onFailure(time.milliseconds(), new AuthenticationException("Fatal error in HB"));

        // Order matters: error propagation first, membership notification second.
        InOrder ordered = Mockito.inOrder(backgroundEventHandler, membershipManager);
        ordered.verify(backgroundEventHandler).add(ArgumentMatchers.any(ErrorEvent.class));
        ordered.verify(membershipManager).onHeartbeatFailure(false);
    }

    /**
     * Without a known coordinator the poll produces no request and reports
     * {@code Long.MAX_VALUE} as the time until the next poll, while
     * {@code maximumTimeToWait} still reports the heartbeat interval.
     */
    @Test
    public void testNoCoordinator() {
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());

        Assertions.assertEquals(Long.MAX_VALUE, polled.timeUntilNextPollMs);
        Assertions.assertEquals(
                DEFAULT_HEARTBEAT_INTERVAL_MS,
                heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
        Assertions.assertEquals(0, polled.unsentRequests.size());
    }

    /**
     * For every ConsumerGroupHeartbeat API version, checks the fields of the heartbeat request
     * built after {@code mockStableMemberData}: group id, member id, member epoch, rebalance
     * timeout, subscribed topic, instance id and server assignor.
     *
     * @param version request version used to build the heartbeat request
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    public void testValidateConsumerGroupHeartbeatRequest(short version) {
        createHeartbeatStateAndRequestManager();
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);

        final String topicName = "topic";
        Mockito.when(subscriptions.subscription()).thenReturn(Collections.singleton(topicName));

        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData()
                .setMemberId(DEFAULT_MEMBER_ID)
                .setMemberEpoch(DEFAULT_MEMBER_EPOCH);
        ConsumerGroupHeartbeatResponse successResponse = new ConsumerGroupHeartbeatResponse(responseData);
        membershipManager.onHeartbeatSuccess(successResponse);
        mockStableMemberData(DEFAULT_GROUP_INSTANCE_ID);

        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest unsent = polled.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsent.requestBuilder());

        ConsumerGroupHeartbeatRequest heartbeat =
                (ConsumerGroupHeartbeatRequest) unsent.requestBuilder().build(version);
        ConsumerGroupHeartbeatRequestData data = heartbeat.data();
        Assertions.assertEquals(DEFAULT_GROUP_ID, data.groupId());
        Assertions.assertEquals(DEFAULT_MEMBER_ID, data.memberId());
        Assertions.assertEquals(DEFAULT_MEMBER_EPOCH, data.memberEpoch());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs());
        Assertions.assertEquals(topicName, data.subscribedTopicNames().get(0));
        Assertions.assertEquals(DEFAULT_GROUP_INSTANCE_ID, data.instanceId());
        Assertions.assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor());
    }

    /**
     * Checks when the assignment is included in the heartbeat: it is present in the first
     * request, absent ({@code null}) in the next request while the local assignment epoch is
     * unchanged, and present again once the local epoch changes.
     *
     * @param version request version used to build the heartbeat requests
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    public void testValidateConsumerGroupHeartbeatRequestAssignmentSentWhenLocalEpochChanges(short version) {
        createHeartbeatStateAndRequestManager();
        Mockito.when(membershipManager.shouldHeartbeatNow()).thenReturn(true);

        Uuid topicId = Uuid.randomUuid();
        Map<Uuid, SortedSet<Integer>> assignment = Collections.singletonMap(topicId, Utils.mkSortedSet(0));
        ConsumerGroupHeartbeatRequestData.TopicPartitions expected =
                new ConsumerGroupHeartbeatRequestData.TopicPartitions();
        expected.setTopicId(topicId);
        expected.setPartitions(Collections.singletonList(0));

        // First heartbeat: assignment with local epoch 0 is sent.
        Mockito.when(membershipManager.currentAssignment())
                .thenReturn(new AbstractMembershipManager.LocalAssignment(0L, assignment));
        ConsumerGroupHeartbeatRequest first = getHeartbeatRequest(heartbeatRequestManager, version);
        Assertions.assertEquals(Collections.singletonList(expected), first.data().topicPartitions());

        // Same local epoch: the assignment is left out of the request.
        Mockito.when(heartbeatRequestState.canSendRequest(ArgumentMatchers.anyLong())).thenReturn(true);
        ConsumerGroupHeartbeatRequest second = getHeartbeatRequest(heartbeatRequestManager, version);
        Assertions.assertNull(second.data().topicPartitions());

        // Local epoch bumped to 1: the assignment is sent again.
        Mockito.when(membershipManager.currentAssignment())
                .thenReturn(new AbstractMembershipManager.LocalAssignment(1L, assignment));
        ConsumerGroupHeartbeatRequest third = getHeartbeatRequest(heartbeatRequestManager, version);
        Assertions.assertEquals(Collections.singletonList(expected), third.data().topicPartitions());
    }

    /**
     * Polls the given manager at the current mock time, asserts that exactly one
     * ConsumerGroupHeartbeat request was produced, and builds it.
     *
     * @param heartbeatRequestManager manager to poll
     * @param version request version to build
     * @return the built heartbeat request
     */
    private ConsumerGroupHeartbeatRequest getHeartbeatRequest(ConsumerHeartbeatRequestManager heartbeatRequestManager, short version) {
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest unsent = polled.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsent.requestBuilder());
        return (ConsumerGroupHeartbeatRequest) unsent.requestBuilder().build(version);
    }

    /**
     * Delivers a heartbeat response carrying {@code error} and checks, per error code, whether a
     * background event is raised, whether the coordinator is marked unknown, and how long until
     * the next heartbeat. Every non-NONE error must also reach
     * {@code onHeartbeatFailure(false)}; for non-fatal errors a new heartbeat must be produced
     * after a further interval.
     *
     * @param error error code placed in the heartbeat response
     * @param isFatal whether the error is expected to be handled as fatal
     */
    @ParameterizedTest
    @MethodSource("errorProvider")
    public void testHeartbeatResponseOnErrorHandling(Errors error, boolean isFatal) {
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult polled = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());

        Mockito.when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
        ClientResponse response = createHeartbeatResponse(polled.unsentRequests.get(0), error);
        polled.unsentRequests.get(0).handler().onComplete(response);
        ConsumerGroupHeartbeatResponse heartbeatResponse = (ConsumerGroupHeartbeatResponse) response.responseBody();

        switch (error) {
            case NONE: {
                Mockito.verify(membershipManager).onHeartbeatSuccess(heartbeatResponse);
                assertNextHeartbeatTiming(DEFAULT_HEARTBEAT_INTERVAL_MS);
                break;
            }
            case COORDINATOR_LOAD_IN_PROGRESS: {
                Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());
                assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
                break;
            }
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR: {
                Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());
                Mockito.verify(coordinatorRequestManager)
                        .markCoordinatorUnknown(ArgumentMatchers.any(), ArgumentMatchers.anyLong());
                assertNextHeartbeatTiming(0L);
                break;
            }
            case UNKNOWN_MEMBER_ID:
            case FENCED_MEMBER_EPOCH: {
                Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());
                assertNextHeartbeatTiming(0L);
                break;
            }
            case TOPIC_AUTHORIZATION_FAILED: {
                Mockito.verify(backgroundEventHandler).add(ArgumentMatchers.any(ErrorEvent.class));
                assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
                Mockito.verify(membershipManager, Mockito.never()).transitionToFatal();
                break;
            }
            default: {
                if (!isFatal) {
                    Mockito.verify(backgroundEventHandler, Mockito.never()).add(ArgumentMatchers.any());
                    assertNextHeartbeatTiming(0L);
                } else {
                    Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
                    ensureFatalError(error);
                }
                break;
            }
        }

        if (error != Errors.NONE) {
            Mockito.verify(membershipManager).onHeartbeatFailure(false);
        }

        if (!isFatal) {
            // Non-fatal errors must not stop heartbeating: a new request is due after an interval.
            time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
            polled = heartbeatRequestManager.poll(time.milliseconds());
            Assertions.assertEquals(1, polled.unsentRequests.size());
        }
    }

    /**
     * Delivers an {@link UnsupportedVersionException} through {@code mockResponseWithException}
     * with the from-broker flag set, and checks that a single {@link ErrorEvent} carrying an
     * unsupported-version error with the given message is added to the background handler.
     *
     * @param errorMsg message of the exception and expected message of the propagated error
     */
    @ParameterizedTest
    @ValueSource(strings = {"The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded."})
    public void testUnsupportedVersionFromBroker(String errorMsg) {
        mockResponseWithException(new UnsupportedVersionException(errorMsg), true);

        ArgumentCaptor<ErrorEvent> eventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
        Mockito.verify(backgroundEventHandler).add(eventCaptor.capture());
        ErrorEvent propagated = eventCaptor.getValue();

        Assertions.assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), propagated.error());
        Assertions.assertEquals(errorMsg, propagated.error().getMessage());
        Mockito.clearInvocations(backgroundEventHandler);
    }

    /**
     * Delivers an {@link UnsupportedVersionException} through {@code mockResponseWithException}
     * with the from-broker flag cleared, and checks that a single {@link ErrorEvent} carrying an
     * unsupported-version error with the given message is added to the background handler.
     *
     * @param errorMsg message of the exception and expected message of the propagated error
     */
    @ParameterizedTest
    @ValueSource(strings = {"The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded.", "The cluster does not support regular expressions resolution on ConsumerGroupHeartbeat API version 0. It must be upgraded to use ConsumerGroupHeartbeat API version >= 1 to allow to subscribe to a SubscriptionPattern."})
    public void testUnsupportedVersionFromClient(String errorMsg) {
        mockResponseWithException(new UnsupportedVersionException(errorMsg), false);

        ArgumentCaptor<ErrorEvent> eventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
        Mockito.verify(backgroundEventHandler).add(eventCaptor.capture());
        ErrorEvent propagated = eventCaptor.getValue();

        Assertions.assertInstanceOf(Errors.UNSUPPORTED_VERSION.exception().getClass(), propagated.error());
        Assertions.assertEquals(errorMsg, propagated.error().getMessage());
        Mockito.clearInvocations(backgroundEventHandler);
    }

    /**
     * Advances the clock by one second, polls the manager for the single pending heartbeat
     * request and completes it with a response that carries the given exception.
     *
     * @param exception    exception to attach to the response
     * @param isFromBroker forwarded to {@code createHeartbeatResponseWithException}; when
     *                     {@code true} the response also carries a response object
     */
    private void mockResponseWithException(UnsupportedVersionException exception, boolean isFromBroker) {
        time.sleep(1000L);
        long nowMs = time.milliseconds();
        NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(nowMs);
        Assertions.assertEquals(1, pollResult.unsentRequests.size());

        Mockito.when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);

        NetworkClientDelegate.UnsentRequest heartbeat = pollResult.unsentRequests.get(0);
        ClientResponse failedResponse = createHeartbeatResponseWithException(heartbeat, exception, isFromBroker);
        pollResult.unsentRequests.get(0).handler().onComplete(failedResponse);
    }

    /**
     * Asserts that the heartbeat request state reports the expected delay until the next
     * heartbeat. For a non-zero delay it also asserts that no request can be sent yet, then
     * advances the clock by that delay. In all cases it finally asserts that a request can be sent.
     *
     * @param expectedTimeToNextHeartbeatMs expected remaining time, in milliseconds
     */
    private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) {
        long nowMs = time.milliseconds();
        long actualTimeToNextHeartbeatMs = heartbeatRequestState.timeToNextHeartbeatMs(nowMs);
        Assertions.assertEquals(expectedTimeToNextHeartbeatMs, actualTimeToNextHeartbeatMs);

        boolean mustWait = expectedTimeToNextHeartbeatMs != 0L;
        if (mustWait) {
            Assertions.assertFalse(heartbeatRequestState.canSendRequest(nowMs));
            time.sleep(expectedTimeToNextHeartbeatMs);
        }

        Assertions.assertTrue(heartbeatRequestState.canSendRequest(time.milliseconds()));
    }

    /**
     * Walks a real {@code HeartbeatState} (built on the mocked subscriptions and membership
     * manager) through three stubbed member situations and asserts the fields of the request data
     * it builds in each: joining with epoch 0, stable with epoch 1, and joining again with epoch 0
     * after a topic subscription was stubbed.
     */
    @Test
    public void testHeartbeatState() {
        this.mockJoiningMemberData(null);
        this.heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(this.subscriptions, this.membershipManager, 10000);
        this.createHeartbeatRequestStateWithZeroHeartbeatInterval();
        // 1) Member stubbed as JOINING with epoch 0: rebalance timeout, assignor and (empty)
        //    subscription/assignment lists are all expected on the request.
        ConsumerGroupHeartbeatRequestData data = this.heartbeatState.buildRequestData();
        Assertions.assertEquals((Object)DEFAULT_GROUP_ID, (Object)data.groupId());
        Assertions.assertEquals((Object)DEFAULT_MEMBER_ID, (Object)data.memberId());
        Assertions.assertEquals((int)0, (int)data.memberEpoch());
        Assertions.assertNull((Object)data.instanceId());
        Assertions.assertEquals((int)10000, (int)data.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.emptyList(), (Object)data.subscribedTopicNames());
        Assertions.assertEquals((Object)DEFAULT_REMOTE_ASSIGNOR, (Object)data.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), (Object)data.topicPartitions());
        // 2) Member stubbed as STABLE with epoch 1: the request is expected to carry -1 for the
        //    rebalance timeout and null for subscribed topics and assignor.
        Mockito.when((Object)this.membershipManager.state()).thenReturn((Object)MemberState.STABLE);
        Mockito.when((Object)this.subscriptions.hasAutoAssignedPartitions()).thenReturn((Object)true);
        Mockito.when((Object)this.subscriptions.rebalanceListener()).thenReturn(Optional.empty());
        this.mockStableMemberData(null);
        data = this.heartbeatState.buildRequestData();
        Assertions.assertEquals((Object)DEFAULT_GROUP_ID, (Object)data.groupId());
        Assertions.assertEquals((Object)DEFAULT_MEMBER_ID, (Object)data.memberId());
        Assertions.assertEquals((int)1, (int)data.memberEpoch());
        Assertions.assertNull((Object)data.instanceId());
        Assertions.assertEquals((int)-1, (int)data.rebalanceTimeoutMs());
        Assertions.assertNull((Object)data.subscribedTopicNames());
        Assertions.assertNull((Object)data.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), (Object)data.topicPartitions());
        // 3) Subscription stubbed to "topic1" and member stubbed back to JOINING with epoch 0:
        //    the full set of fields is expected again, now including the topic name.
        String topic = "topic1";
        this.subscriptions.subscribe(Collections.singleton(topic), Optional.empty());
        Mockito.when((Object)this.subscriptions.subscription()).thenReturn(Collections.singleton(topic));
        this.mockRejoiningMemberData();
        data = this.heartbeatState.buildRequestData();
        Assertions.assertEquals((Object)DEFAULT_GROUP_ID, (Object)data.groupId());
        Assertions.assertEquals((Object)DEFAULT_MEMBER_ID, (Object)data.memberId());
        Assertions.assertEquals((int)0, (int)data.memberEpoch());
        Assertions.assertNull((Object)data.instanceId());
        Assertions.assertEquals((int)10000, (int)data.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.singletonList(topic), (Object)data.subscribedTopicNames());
        Assertions.assertEquals((Object)DEFAULT_REMOTE_ASSIGNOR, (Object)data.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), (Object)data.topicPartitions());
        // Building a second request with no stub changes in between must yield the same fields.
        data = this.heartbeatState.buildRequestData();
        Assertions.assertEquals((Object)DEFAULT_GROUP_ID, (Object)data.groupId());
        Assertions.assertEquals((Object)DEFAULT_MEMBER_ID, (Object)data.memberId());
        Assertions.assertEquals((int)0, (int)data.memberEpoch());
        Assertions.assertNull((Object)data.instanceId());
        Assertions.assertEquals((int)10000, (int)data.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.singletonList(topic), (Object)data.subscribedTopicNames());
        Assertions.assertEquals((Object)DEFAULT_REMOTE_ASSIGNOR, (Object)data.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), (Object)data.topicPartitions());
        // NOTE(review): the assignment objects and the topicNames() stub below are never used by
        // an assertion or passed anywhere in this method - looks like leftover setup; confirm
        // whether a final "assignment received" step was meant to follow.
        ConsumerGroupHeartbeatResponseData.TopicPartitions tpTopic1 = new ConsumerGroupHeartbeatResponseData.TopicPartitions();
        Uuid topicId = Uuid.randomUuid();
        tpTopic1.setTopicId(topicId);
        tpTopic1.setPartitions(Collections.singletonList(0));
        ConsumerGroupHeartbeatResponseData.Assignment assignmentTopic1 = new ConsumerGroupHeartbeatResponseData.Assignment();
        assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1));
        Mockito.when((Object)this.metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));
    }

    /**
     * Lets the poll timer (10000 ms, see {@code createHeartbeatRequestManager}) run out and checks
     * that the next poll still produces a heartbeat while the member is moved to "sending leave
     * group" and both heartbeat states are reset. Then checks that no heartbeat is produced while
     * the membership manager asks to skip, and that heartbeats resume after the poll timer is
     * reset and skipping is turned off.
     */
    @Test
    public void testPollTimerExpiration() {
        heartbeatRequestManager = createHeartbeatRequestManager(coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler);
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);

        // Expire the poll timer, then poll.
        time.sleep(10000L);
        assertHeartbeat(heartbeatRequestManager, 1000);
        Mockito.verify(membershipManager).transitionToSendingLeaveGroup(true);
        Mockito.verify(heartbeatState).reset();
        Mockito.verify(heartbeatRequestState).reset();
        Mockito.verify(membershipManager).onHeartbeatRequestGenerated();

        // While the membership manager asks to skip, nothing must be sent.
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
        assertNoHeartbeat(heartbeatRequestManager);

        // Resetting the poll timer is expected to trigger the rejoin attempt.
        heartbeatRequestManager.resetPollTimer(time.milliseconds());
        Assertions.assertTrue(pollTimer.notExpired());
        Mockito.verify(membershipManager).maybeRejoinStaleMember();

        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        assertHeartbeat(heartbeatRequestManager, 1000);
    }

    /**
     * Polls while the member is stubbed as LEAVING. A member without a group instance id whose
     * leave operation is REMAIN_IN_GROUP must not produce a heartbeat; every other combination
     * must produce exactly one.
     *
     * @param groupInstanceId group instance id returned by the membership manager stub
     * @param operation       leave-group operation returned by the membership manager stub
     */
    @ParameterizedTest
    @MethodSource(value={"pollOnLeavingMatrix"})
    public void testPollOnLeaving(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
        heartbeatRequestManager = createHeartbeatRequestManager(coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler);
        Mockito.when(membershipManager.state()).thenReturn(MemberState.LEAVING);
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
        Mockito.when(membershipManager.leaveGroupOperation()).thenReturn(operation);

        boolean heartbeatExpected = !(groupInstanceId.isEmpty() && CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == operation);
        if (heartbeatExpected) {
            assertHeartbeat(heartbeatRequestManager, 1000);
            Mockito.verify(membershipManager).onHeartbeatRequestGenerated();
            return;
        }

        assertNoHeartbeat(heartbeatRequestManager);
        Mockito.verify(membershipManager, Mockito.never()).onHeartbeatRequestGenerated();
    }

    /**
     * With the member already stubbed as leaving, advancing the clock by 10000 ms and polling
     * must not trigger another transition to "sending leave group", but must still produce the
     * heartbeat request.
     */
    @Test
    public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() {
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        Mockito.when(membershipManager.isLeavingGroup()).thenReturn(true);

        time.sleep(10000L);
        NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds());

        Mockito.verify(membershipManager, Mockito.never()).transitionToSendingLeaveGroup(ArgumentMatchers.anyBoolean());
        int generatedRequests = pollResult.unsentRequests.size();
        Assertions.assertEquals(1, generatedRequests, "A heartbeat request should be generated to complete the ongoing leaving operation that was triggered before the poll timer expired.");
    }

    /**
     * Checks when {@code Timer.isExpiredBy()} is consulted: not during the poll issued 10005 ms
     * after the start (which moves the member to "sending leave group"), but during the
     * subsequent {@code resetPollTimer} call.
     */
    @Test
    public void testisExpiredByUsedForLogging() {
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);

        int exceededTimeMs = 5;
        long sleepMs = (long) (10000 + exceededTimeMs);
        time.sleep(sleepMs);

        Mockito.when(membershipManager.isLeavingGroup()).thenReturn(false);
        NetworkClientDelegate.PollResult expiredPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, expiredPoll.unsentRequests.size());
        Mockito.verify(membershipManager).transitionToSendingLeaveGroup(true);
        Mockito.verify(pollTimer, Mockito.never()).isExpiredBy();

        // Start from a clean invocation record before checking the reset path.
        Mockito.clearInvocations(pollTimer);
        heartbeatRequestManager.resetPollTimer(time.milliseconds());
        Mockito.verify(pollTimer).isExpiredBy();
    }

    /**
     * Completes a heartbeat with {@code FENCED_MEMBER_EPOCH} and checks that the member is
     * transitioned to fenced and the request state records a failed attempt and is reset. Then
     * checks that polling yields no request while the membership manager asks to skip heartbeats,
     * and one request once it stops asking.
     */
    @Test
    public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
        heartbeatRequestManager = createHeartbeatRequestManager(coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler);

        time.sleep(1000L);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        // Answer the heartbeat with a fencing error.
        Mockito.when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
        ClientResponse fencedResponse = createHeartbeatResponse(result.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH);
        result.unsentRequests.get(0).handler().onComplete(fencedResponse);

        Mockito.verify(membershipManager).transitionToFenced();
        Mockito.verify(heartbeatRequestState).onFailedAttempt(ArgumentMatchers.anyLong());
        Mockito.verify(heartbeatRequestState).reset();

        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
        result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, result.unsentRequests.size(), "Member should not send heartbeats while FENCED");

        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING");
    }

    /**
     * Leaves one heartbeat unanswered, confirms a second poll sends nothing, then stubs the member
     * as LEAVING and checks that a heartbeat with member epoch -1 is still obtained. Finally
     * checks that nothing more is sent once the membership manager asks to skip heartbeats.
     *
     * @param version API version passed to {@code getHeartbeatRequest}
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(short version) {
        time.sleep(1000L);
        NetworkClientDelegate.PollResult firstPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());

        // The first request is never completed, so it stays in flight.
        NetworkClientDelegate.PollResult secondPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, secondPoll.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight");

        Mockito.when(membershipManager.state()).thenReturn(MemberState.LEAVING);
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(Optional.empty());
        ConsumerGroupHeartbeatRequestData leaveData = new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1);
        Mockito.when(heartbeatState.buildRequestData()).thenReturn(leaveData);

        ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version);
        Assertions.assertEquals(-1, heartbeatToLeave.data().memberEpoch());

        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(true);
        NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, pollAgain.unsentRequests.size());
    }

    /**
     * Fails the first heartbeat of a RECONCILING member with a timeout, lets 10000 ms pass and,
     * after one successful heartbeat exchange, checks that the following request still contains
     * the subscribed topic and the stubbed assignment (topic id and partition).
     *
     * @param version API version used to build the request that is inspected
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    public void testConsumerAcksReconciledAssignmentAfterAckLost(short version) {
        String topic = "topic1";
        Set<String> topics = Collections.singleton(topic);
        Uuid topicId = Uuid.randomUuid();
        int partition = 0;
        Map<Uuid, SortedSet<Integer>> testAssignment = Collections.singletonMap(topicId, Utils.mkSortedSet(partition));

        createHeartbeatStateAndRequestManager();
        Mockito.when(subscriptions.subscription()).thenReturn(topics);
        subscriptions.subscribe(topics, Optional.empty());
        mockReconcilingMemberData(testAssignment);

        // First heartbeat: simulate that it is lost by failing it with a timeout.
        time.sleep(1000L);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertFalse(result.unsentRequests.isEmpty());
        result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new TimeoutException("timeout"));

        time.sleep(10000L);
        assertHeartbeat(heartbeatRequestManager, 1000);
        Mockito.verify(heartbeatRequestState).reset();

        // Inspect the content of the next request.
        result = heartbeatRequestManager.poll(time.milliseconds());
        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        ConsumerGroupHeartbeatRequest heartbeatRequest = (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);
        ConsumerGroupHeartbeatRequestData sentData = heartbeatRequest.data();
        Assertions.assertEquals(Collections.singletonList(topic), sentData.subscribedTopicNames());
        Assertions.assertEquals(testAssignment.size(), sentData.topicPartitions().size());

        ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions = (ConsumerGroupHeartbeatRequestData.TopicPartitions) sentData.topicPartitions().get(0);
        Assertions.assertEquals(topicId, topicPartitions.topicId());
        Assertions.assertEquals(Collections.singletonList(partition), topicPartitions.partitions());
    }

    /**
     * Calls {@code pollOnClose} for every combination of group instance id and leave operation.
     * A member without a group instance id using REMAIN_IN_GROUP is stubbed as not leaving and
     * must produce no request; every other combination is stubbed as leaving and must produce
     * exactly one.
     *
     * @param groupInstanceId group instance id returned by the membership manager stub
     * @param operation       leave-group operation returned by the membership manager stub
     */
    @ParameterizedTest
    @MethodSource(value={"pollOnLeavingMatrix"})
    public void testPollOnCloseGeneratesRequestIfNeeded(Optional<String> groupInstanceId, CloseOptions.GroupMembershipOperation operation) {
        boolean dynamicMemberRemaining = groupInstanceId.isEmpty() && CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP == operation;
        Mockito.when(membershipManager.isLeavingGroup()).thenReturn(!dynamicMemberRemaining);
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
        Mockito.when(membershipManager.leaveGroupOperation()).thenReturn(operation);

        String membership;
        if (groupInstanceId.isEmpty()) {
            membership = "dynamic";
        } else {
            membership = "static";
        }

        NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.pollOnClose(time.milliseconds());

        String context = membership + " is still leaving when closing the manager and GroupMembershipOperation is " + operation.name();
        if (dynamicMemberRemaining) {
            Assertions.assertTrue(pollResult.unsentRequests.isEmpty(), "A request to leave the group should not be generated if the " + context);
        } else {
            Assertions.assertEquals(1, pollResult.unsentRequests.size(), "A request to leave the group should be generated if the " + context);
        }
    }

    /**
     * Follows the {@code subscribedTopicRegex} field across consecutive requests built by a real
     * {@code HeartbeatState}: sent while joining, null when nothing changed once stable, sent
     * again when the pattern changes, sent as an empty string when the pattern is removed, and
     * null again afterwards.
     */
    @Test
    public void testRegexInHeartbeatLifecycle() {
        heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 10000);
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        mockJoiningMemberData(null);

        // Joining with a pattern: the regex is included.
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t1.*"));
        ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
        Assertions.assertEquals("t1.*", data.subscribedTopicRegex());

        // Stable and unchanged: the regex is left out.
        Mockito.when(membershipManager.state()).thenReturn(MemberState.STABLE);
        data = heartbeatState.buildRequestData();
        Assertions.assertNull(data.subscribedTopicRegex());

        // Pattern changed: the new regex is included.
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t2.*"));
        data = heartbeatState.buildRequestData();
        Assertions.assertEquals("t2.*", data.subscribedTopicRegex());

        // Pattern removed: an empty regex is expected once...
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(null);
        data = heartbeatState.buildRequestData();
        Assertions.assertEquals("", data.subscribedTopicRegex());

        // ...and then the field is left out again.
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(null);
        data = heartbeatState.buildRequestData();
        Assertions.assertNull(data.subscribedTopicRegex());
    }

    /**
     * Follows the {@code subscribedTopicRegex} field for a member that stays JOINING: the pattern
     * is sent first, an empty string is sent once the pattern is removed, and the field is null on
     * the next request with the pattern still absent.
     */
    @Test
    public void testRegexInJoiningHeartbeat() {
        heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 10000);
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        mockJoiningMemberData(null);

        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(new SubscriptionPattern("t1.*"));
        ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
        Assertions.assertEquals("t1.*", data.subscribedTopicRegex());

        // Pattern removed: an empty regex is expected on the next request.
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(null);
        data = heartbeatState.buildRequestData();
        Assertions.assertEquals("", data.subscribedTopicRegex());

        // Still JOINING and still no pattern: the field is left out.
        Mockito.when(membershipManager.state()).thenReturn(MemberState.JOINING);
        Mockito.when(subscriptions.subscriptionPattern()).thenReturn(null);
        data = heartbeatState.buildRequestData();
        Assertions.assertNull(data.subscribedTopicRegex());
    }

    /**
     * Follows the {@code rackId} field across requests built by a real {@code HeartbeatState}:
     * present while JOINING, null once STABLE, present again when back in JOINING, and null for a
     * fresh {@code HeartbeatState} when the membership manager reports no rack id.
     */
    @Test
    public void testRackIdInHeartbeatLifecycle() {
        heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 10000);
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        mockJoiningMemberData(null);

        Mockito.when(membershipManager.rackId()).thenReturn(Optional.of("rack1"));
        ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
        Assertions.assertEquals("rack1", data.rackId());

        Mockito.when(membershipManager.state()).thenReturn(MemberState.STABLE);
        data = heartbeatState.buildRequestData();
        Assertions.assertNull(data.rackId());

        Mockito.when(membershipManager.state()).thenReturn(MemberState.JOINING);
        data = heartbeatState.buildRequestData();
        Assertions.assertEquals("rack1", data.rackId());

        // No rack id configured: use a fresh state object so nothing from earlier requests remains.
        Mockito.when(membershipManager.rackId()).thenReturn(Optional.empty());
        heartbeatState = new ConsumerHeartbeatRequestManager.HeartbeatState(subscriptions, membershipManager, 10000);
        data = heartbeatState.buildRequestData();
        Assertions.assertNull(data.rackId());
    }

    /**
     * Polls the given manager, asserts that exactly one request was produced and that the
     * reported time until the next poll matches, then completes that request with a
     * {@code Errors.NONE} response.
     *
     * @param hrm        manager to poll
     * @param nextPollMs expected {@code timeUntilNextPollMs} of the poll result
     */
    private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs) {
        NetworkClientDelegate.PollResult polled = hrm.poll(time.milliseconds());
        Assertions.assertEquals(1, polled.unsentRequests.size());
        Assertions.assertEquals((long) nextPollMs, polled.timeUntilNextPollMs);

        ClientResponse okResponse = createHeartbeatResponse(polled.unsentRequests.get(0), Errors.NONE);
        polled.unsentRequests.get(0).handler().onComplete(okResponse);
    }

    /**
     * Polls the given manager and asserts that no request was produced.
     *
     * @param hrm manager to poll
     */
    private void assertNoHeartbeat(ConsumerHeartbeatRequestManager hrm) {
        long nowMs = time.milliseconds();
        int generatedRequests = hrm.poll(nowMs).unsentRequests.size();
        Assertions.assertEquals(0, generatedRequests);
    }

    /**
     * Asserts the effects expected after a fatal heartbeat error: the membership manager was
     * transitioned to fatal, an {@link ErrorEvent} whose error is an instance of the expected
     * exception class was added to the background event handler, and no further heartbeat is
     * produced.
     *
     * @param expectedError error whose exception class the propagated error must be an instance of
     */
    private void ensureFatalError(Errors expectedError) {
        Mockito.verify(this.membershipManager).transitionToFatal();

        // Typed captor: avoids the raw type and the unchecked casts on capture()/getValue().
        ArgumentCaptor<ErrorEvent> errorEventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
        Mockito.verify(this.backgroundEventHandler).add(errorEventCaptor.capture());
        ErrorEvent errorEvent = errorEventCaptor.getValue();

        Assertions.assertInstanceOf(expectedError.exception().getClass(), errorEvent.error(), "The fatal error propagated to the app thread does not match the error received in the heartbeat response.");
        this.ensureHeartbeatStopped();
    }

    /**
     * Advances the clock by one second and asserts that polling the manager produces no request.
     */
    private void ensureHeartbeatStopped() {
        time.sleep(1000L);
        long nowMs = time.milliseconds();
        NetworkClientDelegate.PollResult afterStop = heartbeatRequestManager.poll(nowMs);
        Assertions.assertEquals(0, afterStop.unsentRequests.size());
    }

    /**
     * Supplies (error, flag) pairs for a parameterized test. The boolean is presumably the
     * "is fatal" flag of the consuming test - confirm against the test that references this
     * provider.
     *
     * @return the argument sets, in a fixed order
     */
    private static Collection<Arguments> errorProvider() {
        return Arrays.asList(
            Arguments.of(Errors.NONE, false),
            Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false),
            Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false),
            Arguments.of(Errors.NOT_COORDINATOR, false),
            Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true),
            Arguments.of(Errors.INVALID_REQUEST, true),
            Arguments.of(Errors.UNKNOWN_MEMBER_ID, false),
            Arguments.of(Errors.FENCED_MEMBER_EPOCH, false),
            Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
            Arguments.of(Errors.UNSUPPORTED_VERSION, true),
            Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
            Arguments.of(Errors.FENCED_INSTANCE_ID, true),
            Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true),
            Arguments.of(Errors.TOPIC_AUTHORIZATION_FAILED, false));
    }

    /**
     * Builds a heartbeat response for the given request and error, using a fixed placeholder
     * error message.
     *
     * @param request request the response answers
     * @param error   error to put on the response
     * @return the response produced by the three-argument overload
     */
    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request, Errors error) {
        final String placeholderMessage = "stubbed error message";
        return createHeartbeatResponse(request, error, placeholderMessage);
    }

    /**
     * Builds a {@code ClientResponse} wrapping a heartbeat response with the given error code, a
     * 1000 ms heartbeat interval, the default member id and member epoch 1. The error message is
     * only set when the error is not {@code Errors.NONE}.
     *
     * @param request request whose handler is attached to the response
     * @param error   error to put on the response
     * @param msg     error message used for non-NONE errors
     * @return the assembled client response
     */
    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest request, Errors error, String msg) {
        ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData();
        data.setErrorCode(error.code());
        data.setHeartbeatIntervalMs(1000);
        data.setMemberId(DEFAULT_MEMBER_ID);
        data.setMemberEpoch(1);
        if (error != Errors.NONE) {
            data.setErrorMessage(msg);
        }

        ConsumerGroupHeartbeatResponse response = new ConsumerGroupHeartbeatResponse(data);
        RequestHeader header = new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1);
        // The clock is read twice on purpose, once per timestamp argument.
        return new ClientResponse(header, request.handler(), "0", time.milliseconds(), time.milliseconds(), false, null, null, response);
    }

    /**
     * Builds a {@code ClientResponse} that carries the given exception. The wrapped response is a
     * {@code ConsumerGroupHeartbeatResponse} built from null data when {@code isFromBroker} is
     * true, and null otherwise.
     *
     * @param request      request whose handler is attached to the response
     * @param exception    exception to attach
     * @param isFromBroker whether a response object is attached at all
     * @return the assembled client response
     */
    private ClientResponse createHeartbeatResponseWithException(NetworkClientDelegate.UnsentRequest request, UnsupportedVersionException exception, boolean isFromBroker) {
        ConsumerGroupHeartbeatResponse response = isFromBroker ? new ConsumerGroupHeartbeatResponse(null) : null;
        RequestHeader header = new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1);
        // The clock is read twice on purpose, once per timestamp argument.
        return new ClientResponse(header, request.handler(), "0", time.milliseconds(), time.milliseconds(), false, exception, null, response);
    }

    /**
     * Builds the consumer configuration used by the manager under test: string deserializers, a
     * local bootstrap address, a 10000 ms max poll interval, and retry backoff of 80 ms capped at
     * 1000 ms.
     *
     * @return a new {@code ConsumerConfig} built from those properties
     */
    private ConsumerConfig config() {
        Properties settings = new Properties();
        settings.put("key.deserializer", StringDeserializer.class);
        settings.put("value.deserializer", StringDeserializer.class);
        settings.setProperty("bootstrap.servers", "localhost:9999");
        settings.setProperty("max.poll.interval.ms", Integer.toString(10000));
        settings.setProperty("retry.backoff.ms", Long.toString(80L));
        settings.setProperty("retry.backoff.max.ms", Long.toString(1000L));
        return new ConsumerConfig(settings);
    }

    /**
     * Creates a {@code ConsumerHeartbeatRequestManager} from the given collaborators. As a side
     * effect it replaces the {@code pollTimer} field with a fresh 10000 ms timer obtained from
     * the test clock, and uses that timer for the new manager.
     *
     * @return the newly created manager
     */
    private ConsumerHeartbeatRequestManager createHeartbeatRequestManager(CoordinatorRequestManager coordinatorRequestManager, ConsumerMembershipManager membershipManager, ConsumerHeartbeatRequestManager.HeartbeatState heartbeatState, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler) {
        LogContext logContext = new LogContext();
        this.pollTimer = this.time.timer(10000L);
        return new ConsumerHeartbeatRequestManager(
            logContext,
            this.pollTimer,
            this.config(),
            coordinatorRequestManager,
            membershipManager,
            heartbeatState,
            heartbeatRequestState,
            backgroundEventHandler,
            new Metrics());
    }

    /**
     * Stubs the membership manager as a JOINING member: epoch 0, default group and member ids, no
     * local assignment, the default remote assignor and the given (possibly absent) instance id.
     *
     * @param instanceId group instance id to report, or null for none
     */
    private void mockJoiningMemberData(String instanceId) {
        Optional<String> groupInstanceId = Optional.ofNullable(instanceId);
        Mockito.when(membershipManager.state()).thenReturn(MemberState.JOINING);
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
        Mockito.when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
        Mockito.when(membershipManager.memberEpoch()).thenReturn(0);
        Mockito.when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
        Mockito.when(membershipManager.currentAssignment()).thenReturn(AbstractMembershipManager.LocalAssignment.NONE);
        Mockito.when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
    }

    /**
     * Stubs the membership manager as a member that is JOINING again: state JOINING, epoch 0 and
     * no group instance id. Other stubs are left as they were.
     */
    private void mockRejoiningMemberData() {
        Mockito.when(membershipManager.state()).thenReturn(MemberState.JOINING);
        Mockito.when(membershipManager.memberEpoch()).thenReturn(0);
        Optional<String> noInstanceId = Optional.empty();
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(noInstanceId);
    }

    /**
     * Stubs the membership manager data for a member with epoch 1: default group and member ids,
     * an empty local assignment, the default remote assignor and the given (possibly absent)
     * instance id. The member state itself is not stubbed here.
     *
     * @param instanceId group instance id to report, or null for none
     */
    private void mockStableMemberData(String instanceId) {
        Optional<String> groupInstanceId = Optional.ofNullable(instanceId);
        Mockito.when(membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
        AbstractMembershipManager.LocalAssignment emptyAssignment = new AbstractMembershipManager.LocalAssignment(0L, Collections.emptyMap());
        Mockito.when(membershipManager.currentAssignment()).thenReturn(emptyAssignment);
        Mockito.when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
        Mockito.when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
        Mockito.when(membershipManager.memberEpoch()).thenReturn(1);
        Mockito.when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
    }

    /**
     * Stubs the membership manager as a RECONCILING member with epoch 1, default group and member
     * ids, the default remote assignor and a local assignment built from the given partitions.
     *
     * @param assignment partitions per topic id to report as the current assignment
     */
    private void mockReconcilingMemberData(Map<Uuid, SortedSet<Integer>> assignment) {
        Mockito.when(membershipManager.state()).thenReturn(MemberState.RECONCILING);
        AbstractMembershipManager.LocalAssignment localAssignment = new AbstractMembershipManager.LocalAssignment(0L, assignment);
        Mockito.when(membershipManager.currentAssignment()).thenReturn(localAssignment);
        Mockito.when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
        Mockito.when(membershipManager.memberEpoch()).thenReturn(1);
        Mockito.when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
        Mockito.when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
    }

    /**
     * Supplies every combination of group instance id (absent, then {@code "groupInstanceId"})
     * and leave operation (DEFAULT, LEAVE_GROUP, REMAIN_IN_GROUP), with the instance id varying
     * slowest.
     *
     * @return a stream of six argument pairs
     */
    private static Stream<Arguments> pollOnLeavingMatrix() {
        return Stream.of(Optional.<String>empty(), Optional.of("groupInstanceId"))
            .flatMap(instanceId -> Stream.of(
                    CloseOptions.GroupMembershipOperation.DEFAULT,
                    CloseOptions.GroupMembershipOperation.LEAVE_GROUP,
                    CloseOptions.GroupMembershipOperation.REMAIN_IN_GROUP)
                .map(operation -> Arguments.of(instanceId, operation)));
    }
}

