/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestState;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.StreamsGroupHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatRequest;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
class StreamsGroupHeartbeatRequestManagerTest {
    // ---- Shared, immutable fixtures --------------------------------------------------------
    // Note: the String literals repeated inside the collections below carry the same values as
    // the named String constants declared next to them.
    private static final LogContext LOG_CONTEXT = new LogContext("test");
    // Same value (1200) that testSendingHeartbeatRequest expects to be passed to
    // HeartbeatRequestState#updateHeartbeatIntervalMs.
    private static final long RECEIVED_HEARTBEAT_INTERVAL_MS = 1200L;
    private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
    // Identity values stubbed on the membership manager mock and asserted on built requests.
    private static final String GROUP_ID = "group-id";
    private static final String MEMBER_ID = "member-id";
    private static final int MEMBER_EPOCH = 1;
    private static final String INSTANCE_ID = "instance-id";
    private static final UUID PROCESS_ID = UUID.randomUUID();
    private static final StreamsRebalanceData.HostInfo ENDPOINT = new StreamsRebalanceData.HostInfo("localhost", 8080);
    // ---- Topics making up the first subtopology --------------------------------------------
    private static final String SOURCE_TOPIC_1 = "sourceTopic1";
    private static final String SOURCE_TOPIC_2 = "sourceTopic2";
    private static final Set<String> SOURCE_TOPICS = Set.of("sourceTopic1", "sourceTopic2");
    private static final String REPARTITION_SINK_TOPIC_1 = "repartitionSinkTopic1";
    private static final String REPARTITION_SINK_TOPIC_2 = "repartitionSinkTopic2";
    private static final String REPARTITION_SINK_TOPIC_3 = "repartitionSinkTopic3";
    private static final Set<String> REPARTITION_SINK_TOPICS = Set.of("repartitionSinkTopic1", "repartitionSinkTopic2", "repartitionSinkTopic3");
    private static final String REPARTITION_SOURCE_TOPIC_1 = "repartitionSourceTopic1";
    private static final String REPARTITION_SOURCE_TOPIC_2 = "repartitionSourceTopic2";
    // Repartition source topics keyed by name; one entry carries topic configs, the other an empty config map.
    private static final Map<String, StreamsRebalanceData.TopicInfo> REPARTITION_SOURCE_TOPICS = Map.of("repartitionSourceTopic1", new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short)1), Map.of("config3", "value3", "config1", "value1")), "repartitionSourceTopic2", new StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short)3), Collections.emptyMap()));
    private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
    private static final String CHANGELOG_TOPIC_2 = "changelogTopic2";
    private static final String CHANGELOG_TOPIC_3 = "changelogTopic3";
    // Changelog topics keyed by name; each TopicInfo is built with an empty first Optional argument.
    private static final Map<String, StreamsRebalanceData.TopicInfo> CHANGELOG_TOPICS = Map.of("changelogTopic1", new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short)1), Map.of()), "changelogTopic2", new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short)2), Map.of()), "changelogTopic3", new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short)3), Map.of("config4", "value4", "config2", "value2")));
    private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(Set.of("sourceTopic1", "repartitionSourceTopic2"), Set.of("sourceTopic2", "repartitionSourceTopic1"));
    private static final String SUBTOPOLOGY_NAME_1 = "subtopology1";
    private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_1 = new StreamsRebalanceData.Subtopology(SOURCE_TOPICS, REPARTITION_SINK_TOPICS, REPARTITION_SOURCE_TOPICS, CHANGELOG_TOPICS, COPARTITION_GROUP);
    // ---- Second, minimal subtopology: one source topic and one changelog topic -------------
    private static final String SUBTOPOLOGY_NAME_2 = "subtopology2";
    private static final String SOURCE_TOPIC_3 = "sourceTopic3";
    private static final String CHANGELOG_TOPIC_4 = "changelogTopic4";
    private static final StreamsRebalanceData.Subtopology SUBTOPOLOGY_2 = new StreamsRebalanceData.Subtopology(Set.of("sourceTopic3"), Set.of(), Map.of(), Map.of("changelogTopic4", new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short)1), Map.of())), Collections.emptyList());
    // Both subtopologies keyed by name; passed to the StreamsRebalanceData instance used by the tests.
    private static final Map<String, StreamsRebalanceData.Subtopology> SUBTOPOLOGIES = Map.of("subtopology1", SUBTOPOLOGY_1, "subtopology2", SUBTOPOLOGY_2);
    private static final String CLIENT_TAG_1 = "client-tag1";
    private static final String VALUE_1 = "value1";
    private static final Map<String, String> CLIENT_TAGS = Map.of("client-tag1", "value1");
    // Single endpoint ("localhost":8080) owning partition 0 of "topic"; testSendingHeartbeatRequest
    // compares the rebalance data's partitions-by-host entry against this fixture.
    private static final List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> ENDPOINT_TO_PARTITIONS = List.of(new StreamsGroupHeartbeatResponseData.EndpointToPartitions().setUserEndpoint(new StreamsGroupHeartbeatResponseData.Endpoint().setHost("localhost").setPort(8080)).setActivePartitions(List.of(new StreamsGroupHeartbeatResponseData.TopicPartition().setTopic("topic").setPartitions(List.of(Integer.valueOf(0))))));
    // ---- Per-test-instance state -----------------------------------------------------------
    // Rebalance data handed to the manager under test, assembled from the static fixtures.
    private final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(PROCESS_ID, Optional.of(ENDPOINT), SUBTOPOLOGIES, CLIENT_TAGS);
    // Mock clock; tests read it via milliseconds() and advance it via sleep().
    private final Time time = new MockTime();
    // Built by the static config() helper (defined elsewhere in this class).
    private final ConsumerConfig config = StreamsGroupHeartbeatRequestManagerTest.config();
    // Collaborators of the manager under test, injected as Mockito mocks.
    @Mock
    private CoordinatorRequestManager coordinatorRequestManager;
    @Mock
    private StreamsMembershipManager membershipManager;
    @Mock
    private BackgroundEventHandler backgroundEventHandler;
    // Metrics registry driven by the mock clock above.
    private final Metrics metrics = new Metrics(this.time);
    // Node returned by the stubbed coordinator lookup in tests that need a known coordinator.
    private final Node coordinatorNode = new Node(1, "localhost", 9092);

    // Explicit, empty, package-private constructor: all state is set up by the field
    // initializers and @Mock annotations above.
    StreamsGroupHeartbeatRequestManagerTest() {
    }

    /**
     * Passing {@code null} as the coordinator request manager must be rejected with a
     * {@link NullPointerException} carrying a descriptive message.
     */
    @Test
    public void testConstructWithNullCoordinatorRequestManager() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsGroupHeartbeatRequestManager(
                new LogContext("test"),
                this.time,
                this.config,
                null,
                this.membershipManager,
                this.backgroundEventHandler,
                this.metrics,
                this.streamsRebalanceData
            )
        );

        Assertions.assertEquals("Coordinator request manager cannot be null", thrown.getMessage());
    }

    /**
     * Passing {@code null} as the membership manager must be rejected with a
     * {@link NullPointerException} carrying a descriptive message.
     */
    @Test
    public void testConstructWithNullMembershipManager() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsGroupHeartbeatRequestManager(
                new LogContext("test"),
                this.time,
                this.config,
                this.coordinatorRequestManager,
                null,
                this.backgroundEventHandler,
                this.metrics,
                this.streamsRebalanceData
            )
        );

        Assertions.assertEquals("Streams membership manager cannot be null", thrown.getMessage());
    }

    /**
     * Passing {@code null} as the background event handler must be rejected with a
     * {@link NullPointerException} carrying a descriptive message.
     */
    @Test
    public void testConstructWithNullBackgroundEventHandler() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsGroupHeartbeatRequestManager(
                new LogContext("test"),
                this.time,
                this.config,
                this.coordinatorRequestManager,
                this.membershipManager,
                null,
                this.metrics,
                this.streamsRebalanceData
            )
        );

        Assertions.assertEquals("Background event handler cannot be null", thrown.getMessage());
    }

    /**
     * Passing {@code null} as the metrics registry must be rejected with a
     * {@link NullPointerException} carrying a descriptive message.
     */
    @Test
    public void testConstructWithNullMetrics() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsGroupHeartbeatRequestManager(
                new LogContext("test"),
                this.time,
                this.config,
                this.coordinatorRequestManager,
                this.membershipManager,
                this.backgroundEventHandler,
                null,
                this.streamsRebalanceData
            )
        );

        Assertions.assertEquals("Metrics cannot be null", thrown.getMessage());
    }

    /**
     * Passing {@code null} as the streams rebalance data must be rejected with a
     * {@link NullPointerException} carrying a descriptive message.
     */
    @Test
    public void testConstructWithNullStreamsRebalanceData() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsGroupHeartbeatRequestManager(
                new LogContext("test"),
                this.time,
                this.config,
                this.coordinatorRequestManager,
                this.membershipManager,
                this.backgroundEventHandler,
                this.metrics,
                null
            )
        );

        Assertions.assertEquals("Streams rebalance data cannot be null", thrown.getMessage());
    }

    /**
     * With no known coordinator, {@code poll} must produce no requests, report the skipped
     * heartbeat to the membership manager, and leave the poll timer untouched.
     */
    @Test
    public void testNoHeartbeatIfCoordinatorUnknown() {
        try (MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)) {
            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());

            Assertions.assertEquals(0, result.unsentRequests.size());
            Mockito.verify(this.membershipManager).onHeartbeatRequestSkipped();
            // Check both overloads. The other tests in this class verify update(long) after
            // poll, so checking only the no-arg update() could never fail.
            Mockito.verify(pollTimer, Mockito.never()).update();
            Mockito.verify(pollTimer, Mockito.never()).update(ArgumentMatchers.anyLong());
        }
    }

    /**
     * When the membership manager asks to skip the heartbeat, {@code poll} must produce no
     * requests, report the skip, and leave the poll timer untouched even though a
     * coordinator is known.
     */
    @Test
    public void testNoHeartbeatIfHeartbeatSkipped() {
        try (MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)) {
            final StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.shouldSkipHeartbeat()).thenReturn(true);

            final NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());

            Assertions.assertEquals(0, result.unsentRequests.size());
            Mockito.verify(this.membershipManager).onHeartbeatRequestSkipped();
            // Check both overloads. The other tests in this class verify update(long) after
            // poll, so checking only the no-arg update() could never fail.
            Mockito.verify(pollTimer, Mockito.never()).update();
            Mockito.verify(pollTimer, Mockito.never()).update(ArgumentMatchers.anyLong());
        }
    }

    /**
     * A fatal error held by the coordinator request manager must be forwarded to the
     * background event handler as an {@link ErrorEvent} wrapping that very exception,
     * while the heartbeat itself is skipped.
     */
    @Test
    public void testPropagateCoordinatorFatalErrorToApplicationThread() {
        final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
        final RuntimeException fatalError = new RuntimeException("KABOOM");
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
        Mockito.doReturn(Optional.of(fatalError)).when(this.coordinatorRequestManager).getAndClearFatalError();

        final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

        Assertions.assertEquals(0, pollResult.unsentRequests.size());
        Mockito.verify(this.membershipManager).onHeartbeatRequestSkipped();
        // Identity comparison on purpose: the exact exception instance must be propagated.
        Mockito.verify(this.backgroundEventHandler).add(
            (BackgroundEvent) ArgumentMatchers.argThat(
                event -> event instanceof ErrorEvent && ((ErrorEvent) event).error() == fatalError
            )
        );
    }

    /**
     * A member in state LEAVING gets a heartbeat sent even though the request state reports
     * that no request can be sent, regardless of whether a request is already in flight.
     * The next poll is expected after one heartbeat interval.
     */
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    public void testSendingHeartbeatIfMemberIsLeaving(boolean requestInFlight) {
        final long heartbeatIntervalMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(false);
                    Mockito.when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
                    Mockito.when(mock.requestInFlight()).thenReturn(requestInFlight);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.state()).thenReturn(MemberState.LEAVING);

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            Assertions.assertEquals(heartbeatIntervalMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * A member in state JOINING or ACKNOWLEDGING gets a heartbeat sent even though the
     * request state reports that no request can be sent. The next poll is expected after
     * one heartbeat interval.
     */
    @ParameterizedTest
    @EnumSource(value = MemberState.class, names = {"JOINING", "ACKNOWLEDGING"})
    public void testSendingHeartbeatIfMemberIsJoiningOrAcknowledging(MemberState memberState) {
        final long heartbeatIntervalMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(false);
                    Mockito.when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.state()).thenReturn(memberState);

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            Assertions.assertEquals(heartbeatIntervalMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * A member in state JOINING or ACKNOWLEDGING does not get another heartbeat while one is
     * already in flight; the poll result then reports the time to the next heartbeat.
     */
    @ParameterizedTest
    @EnumSource(value = MemberState.class, names = {"JOINING", "ACKNOWLEDGING"})
    public void testNotSendingHeartbeatIfMemberIsJoiningOrAcknowledgingWhenHeartbeatInFlight(MemberState memberState) {
        final long timeToNextHeartbeatMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(false);
                    Mockito.when(mock.timeToNextHeartbeatMs(this.time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
                    Mockito.when(mock.requestInFlight()).thenReturn(true);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.state()).thenReturn(memberState);

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(0, pollResult.unsentRequests.size());
            Assertions.assertEquals(timeToNextHeartbeatMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * For a STABLE member whose request state allows sending, {@code poll} yields exactly
     * one request and schedules the next poll after one heartbeat interval.
     */
    @Test
    public void testSendingHeartbeatIfHeartbeatCanBeSent() {
        final long heartbeatIntervalMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true);
                    Mockito.when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.state()).thenReturn(MemberState.STABLE);

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            Assertions.assertEquals(heartbeatIntervalMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * When the request state does not allow sending, {@code poll} yields no request and
     * reports the remaining time until the next heartbeat.
     */
    @Test
    public void testNotSendingHeartbeatIfHeartbeatCannotBeSent() {
        final long timeToNextHeartbeatMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(false);
                    Mockito.when(mock.timeToNextHeartbeatMs(this.time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(Timer.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(0, pollResult.unsentRequests.size());
            Assertions.assertEquals(timeToNextHeartbeatMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * With an expired poll timer, {@code poll} yields one request, notifies the membership
     * manager of the expiry, and resets both the heartbeat request state and the heartbeat
     * state.
     */
    @Test
    public void testSendingLeaveHeartbeatIfPollTimerExpired() {
        final long heartbeatIntervalMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.heartbeatIntervalMs()).thenReturn(heartbeatIntervalMs);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(
                Timer.class,
                (mock, context) -> {
                    Mockito.when(mock.isExpired()).thenReturn(true);
                }
            );
            MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction =
                Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            Assertions.assertEquals(heartbeatIntervalMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
            Mockito.verify(this.membershipManager).onPollTimerExpired();
            Mockito.verify(heartbeatRequestState).reset();
            Mockito.verify(heartbeatState).reset();
        }
    }

    /**
     * With an expired poll timer but a member that is already leaving the group (state
     * PREPARE_LEAVING), {@code poll} yields no request, does not signal the poll timer
     * expiry, and resets neither the heartbeat request state nor the heartbeat state.
     */
    @Test
    public void testNotSendingLeaveHeartbeatIfPollTimerExpiredAndMemberIsLeaving() {
        final long timeToNextHeartbeatMs = 1234L;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.timeToNextHeartbeatMs(this.time.milliseconds())).thenReturn(timeToNextHeartbeatMs);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(
                Timer.class,
                (mock, context) -> {
                    Mockito.when(mock.isExpired()).thenReturn(true);
                }
            );
            MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateMockedConstruction =
                Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = heartbeatStateMockedConstruction.constructed().get(0);
            final Timer pollTimer = pollTimerMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.isLeavingGroup()).thenReturn(true);
            Mockito.when(this.membershipManager.state()).thenReturn(MemberState.PREPARE_LEAVING);

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            Assertions.assertEquals(0, pollResult.unsentRequests.size());
            Assertions.assertEquals(timeToNextHeartbeatMs, pollResult.timeUntilNextPollMs);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
            Mockito.verify(this.membershipManager, Mockito.never()).onPollTimerExpired();
            Mockito.verify(heartbeatRequestState, Mockito.never()).reset();
            Mockito.verify(heartbeatState, Mockito.never()).reset();
        }
    }

    /**
     * With an expired poll timer, the generated request is addressed to the coordinator and
     * carries the stubbed group id, member id, member epoch (-1) and instance id. Completing
     * that request must not update the heartbeat interval, record a successful attempt, or
     * report a heartbeat success to the membership manager.
     */
    @Test
    public void testSendingLeaveHeartbeatRequestWhenPollTimerExpired() {
        final int leaveEpoch = -1;
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true);
                }
            );
            MockedConstruction<Timer> pollTimerMockedConstruction = Mockito.mockConstruction(
                Timer.class,
                (mock, context) -> {
                    Mockito.when(mock.isExpired()).thenReturn(true);
                }
            )
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
            Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
            Mockito.when(this.membershipManager.memberEpoch()).thenReturn(leaveEpoch);
            Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            // Shape of the poll result.
            Assertions.assertEquals(0L, pollResult.timeUntilNextPollMs);
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest networkRequest =
                (NetworkClientDelegate.UnsentRequest) pollResult.unsentRequests.get(0);
            Assertions.assertEquals(Optional.of(this.coordinatorNode), networkRequest.node());

            // Content of the request that was built.
            final StreamsGroupHeartbeatRequest streamsRequest =
                (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
            Assertions.assertEquals(GROUP_ID, streamsRequest.data().groupId());
            Assertions.assertEquals(MEMBER_ID, streamsRequest.data().memberId());
            Assertions.assertEquals(leaveEpoch, streamsRequest.data().memberEpoch());
            Assertions.assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
            Mockito.verify(heartbeatRequestState).onSendAttempt(this.time.milliseconds());
            Mockito.verify(this.membershipManager).onHeartbeatRequestGenerated();

            // Completing the request must not be treated as a regular heartbeat success.
            final ClientResponse response = this.buildClientResponse();
            networkRequest.handler().onComplete(response);
            Mockito.verify(heartbeatRequestState, Mockito.never()).updateHeartbeatIntervalMs(ArgumentMatchers.anyLong());
            Mockito.verify(heartbeatRequestState, Mockito.never()).onSuccessfulAttempt(ArgumentMatchers.anyLong());
            Mockito.verify(this.membershipManager, Mockito.never())
                .onHeartbeatSuccess((StreamsGroupHeartbeatResponse) ArgumentMatchers.any());
        }
    }

    /**
     * End-to-end check of a regular heartbeat: the request is addressed to the coordinator
     * and carries the stubbed identity fields; after completing it, the membership manager
     * and request state are updated, the partitions-by-host data reflects the
     * ENDPOINT_TO_PARTITIONS fixture, and the heartbeat metrics report the expected values.
     */
    @Test
    public void testSendingHeartbeatRequest() {
        final String metricGroup = "consumer-coordinator-metrics";
        try (
            MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                HeartbeatRequestState.class,
                (mock, context) -> {
                    Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true);
                }
            )
        ) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final HeartbeatRequestState heartbeatRequestState = heartbeatRequestStateMockedConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
            Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
            Mockito.when(this.membershipManager.memberEpoch()).thenReturn(1);
            Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());

            // Shape of the poll result.
            Assertions.assertEquals(0L, pollResult.timeUntilNextPollMs);
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest networkRequest =
                (NetworkClientDelegate.UnsentRequest) pollResult.unsentRequests.get(0);
            Assertions.assertEquals(Optional.of(this.coordinatorNode), networkRequest.node());

            // Content of the request that was built.
            final StreamsGroupHeartbeatRequest streamsRequest =
                (StreamsGroupHeartbeatRequest) networkRequest.requestBuilder().build();
            Assertions.assertEquals(GROUP_ID, streamsRequest.data().groupId());
            Assertions.assertEquals(MEMBER_ID, streamsRequest.data().memberId());
            Assertions.assertEquals(1, streamsRequest.data().memberEpoch());
            Assertions.assertEquals(INSTANCE_ID, streamsRequest.data().instanceId());
            Mockito.verify(heartbeatRequestState).onSendAttempt(this.time.milliseconds());
            Mockito.verify(this.membershipManager).onHeartbeatRequestGenerated();

            // Two seconds on the mock clock must show up in the "seconds ago" metric.
            this.time.sleep(2000L);
            final Object lastHeartbeatSecondsAgo =
                this.metrics.metric(this.metrics.metricName("last-heartbeat-seconds-ago", metricGroup)).metricValue();
            Assertions.assertEquals(2.0, lastHeartbeatSecondsAgo);

            // Complete the request and check the success handling.
            final ClientResponse response = this.buildClientResponse();
            networkRequest.handler().onComplete(response);
            Mockito.verify(this.membershipManager).onHeartbeatSuccess((StreamsGroupHeartbeatResponse) response.responseBody());
            Mockito.verify(heartbeatRequestState).updateHeartbeatIntervalMs(1200L);
            Mockito.verify(heartbeatRequestState).onSuccessfulAttempt(networkRequest.handler().completionTimeMs());
            Mockito.verify(heartbeatRequestState).resetTimer();

            // The rebalance data must now expose the partitions announced for the fixture endpoint.
            final StreamsGroupHeartbeatResponseData.EndpointToPartitions expectedEndpoint = ENDPOINT_TO_PARTITIONS.get(0);
            final StreamsRebalanceData.HostInfo expectedHost = new StreamsRebalanceData.HostInfo(
                expectedEndpoint.userEndpoint().host(),
                expectedEndpoint.userEndpoint().port()
            );
            final StreamsRebalanceData.EndpointPartitions endpointPartitions =
                (StreamsRebalanceData.EndpointPartitions) this.streamsRebalanceData.partitionsByHost().get(expectedHost);
            final List<?> topicPartitions = endpointPartitions.activePartitions();
            final StreamsGroupHeartbeatResponseData.TopicPartition expectedPartition =
                (StreamsGroupHeartbeatResponseData.TopicPartition) expectedEndpoint.activePartitions().get(0);
            final TopicPartition actualPartition = (TopicPartition) topicPartitions.get(0);
            Assertions.assertEquals(expectedPartition.topic(), actualPartition.topic());
            Assertions.assertEquals((Integer) expectedPartition.partitions().get(0), actualPartition.partition());

            final Object heartbeatTotal =
                this.metrics.metric(this.metrics.metricName("heartbeat-total", metricGroup)).metricValue();
            Assertions.assertEquals(1.0, heartbeatTotal);
        }
    }

    /**
     * Verifies that the group id, member id, member epoch and instance id are written into
     * every request built by the heartbeat state, i.e. into the first and the second request alike.
     *
     * @param instanceIdPresent whether the mocked membership manager exposes a group instance id;
     *                          when it does not, the request's instance id is expected to be {@code null}
     */
    @ParameterizedTest
    @ValueSource(booleans={false, true})
    public void testBuildingHeartbeatRequestFieldsThatAreAlwaysSent(boolean instanceIdPresent) {
        Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
        Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
        Mockito.when(this.membershipManager.memberEpoch()).thenReturn(1);
        final Optional<String> groupInstanceId = instanceIdPresent ? Optional.of(INSTANCE_ID) : Optional.empty();
        Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(groupInstanceId);
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1000);

        // Two consecutive builds: the always-sent fields must show up in both of them.
        for (int build = 0; build < 2; build++) {
            final StreamsGroupHeartbeatRequestData requestData = heartbeatState.buildRequestData();
            Assertions.assertEquals(GROUP_ID, requestData.groupId());
            Assertions.assertEquals(MEMBER_ID, requestData.memberId());
            Assertions.assertEquals(1, requestData.memberEpoch());
            if (!instanceIdPresent) {
                Assertions.assertNull(requestData.instanceId());
            } else {
                Assertions.assertEquals(INSTANCE_ID, requestData.instanceId());
            }
        }
    }

    /**
     * Verifies that the topology is part of the request built while the mocked membership manager
     * reports {@code JOINING}, and that the request built afterwards in the supplied state carries
     * a {@code null} topology.
     *
     * @param memberState the state the membership manager is switched to for the second request;
     *                    supplied by {@code provideNonJoiningStates}
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatRequestTopologySentWhenJoining(MemberState memberState) {
        StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1000);
        // First request: built while the member is joining.
        Mockito.when((Object)this.membershipManager.state()).thenReturn((Object)MemberState.JOINING);
        StreamsGroupHeartbeatRequestData requestData1 = heartbeatState.buildRequestData();
        Assertions.assertEquals((int)this.streamsRebalanceData.topologyEpoch(), (int)requestData1.topology().epoch());
        List subtopologies = requestData1.topology().subtopologies();
        Assertions.assertEquals((int)2, (int)subtopologies.size());
        // --- first subtopology: source topics, repartition sink/source topics, changelogs, copartition groups ---
        StreamsGroupHeartbeatRequestData.Subtopology subtopology1 = (StreamsGroupHeartbeatRequestData.Subtopology)subtopologies.get(0);
        Assertions.assertEquals((Object)SUBTOPOLOGY_NAME_1, (Object)subtopology1.subtopologyId());
        Assertions.assertEquals(List.of(SOURCE_TOPIC_1, SOURCE_TOPIC_2), (Object)subtopology1.sourceTopics());
        Assertions.assertEquals(List.of(REPARTITION_SINK_TOPIC_1, REPARTITION_SINK_TOPIC_2, REPARTITION_SINK_TOPIC_3), (Object)subtopology1.repartitionSinkTopics());
        Assertions.assertEquals((int)REPARTITION_SOURCE_TOPICS.size(), (int)subtopology1.repartitionSourceTopics().size());
        // Each repartition source topic in the request is compared with the fixture entry of the same name.
        subtopology1.repartitionSourceTopics().forEach(topicInfo -> {
            StreamsRebalanceData.TopicInfo repartitionTopic = REPARTITION_SOURCE_TOPICS.get(topicInfo.name());
            Assertions.assertEquals((Integer)((Integer)repartitionTopic.numPartitions().get()), (int)topicInfo.partitions());
            Assertions.assertEquals((Short)((Short)repartitionTopic.replicationFactor().get()), (short)topicInfo.replicationFactor());
            Assertions.assertEquals((int)repartitionTopic.topicConfigs().size(), (int)topicInfo.topicConfigs().size());
            // Topic configs in the request must be ordered by key.
            Assertions.assertTrue((boolean)this.isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
        });
        Assertions.assertEquals((int)CHANGELOG_TOPICS.size(), (int)subtopology1.stateChangelogTopics().size());
        subtopology1.stateChangelogTopics().forEach(topicInfo -> {
            Assertions.assertTrue((boolean)CHANGELOG_TOPICS.containsKey(topicInfo.name()));
            // Changelog topics are expected to be sent with a partition count of 0.
            Assertions.assertEquals((int)0, (int)topicInfo.partitions());
            StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
            Assertions.assertEquals((Short)((Short)changelogTopic.replicationFactor().get()), (short)topicInfo.replicationFactor());
            Assertions.assertEquals((int)changelogTopic.topicConfigs().size(), (int)topicInfo.topicConfigs().size());
            Assertions.assertTrue((boolean)this.isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
        });
        // Both expected copartition groups must be present; their position in the list is not checked.
        Assertions.assertEquals((int)2, (int)subtopology1.copartitionGroups().size());
        StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 = new StreamsGroupHeartbeatRequestData.CopartitionGroup().setRepartitionSourceTopics(Collections.singletonList((short)0)).setSourceTopics(Collections.singletonList((short)1));
        StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData2 = new StreamsGroupHeartbeatRequestData.CopartitionGroup().setRepartitionSourceTopics(Collections.singletonList((short)1)).setSourceTopics(Collections.singletonList((short)0));
        Assertions.assertTrue((boolean)subtopology1.copartitionGroups().contains(expectedCopartitionGroupData1));
        Assertions.assertTrue((boolean)subtopology1.copartitionGroups().contains(expectedCopartitionGroupData2));
        // --- second subtopology: one source topic, no repartition topics, a single changelog topic ---
        StreamsGroupHeartbeatRequestData.Subtopology subtopology2 = (StreamsGroupHeartbeatRequestData.Subtopology)subtopologies.get(1);
        Assertions.assertEquals((Object)SUBTOPOLOGY_NAME_2, (Object)subtopology2.subtopologyId());
        Assertions.assertEquals(List.of(SOURCE_TOPIC_3), (Object)subtopology2.sourceTopics());
        Assertions.assertEquals(Collections.emptyList(), (Object)subtopology2.repartitionSinkTopics());
        Assertions.assertEquals(Collections.emptyList(), (Object)subtopology2.repartitionSourceTopics());
        Assertions.assertEquals((int)1, (int)subtopology2.stateChangelogTopics().size());
        Assertions.assertEquals((Object)CHANGELOG_TOPIC_4, (Object)((StreamsGroupHeartbeatRequestData.TopicInfo)subtopology2.stateChangelogTopics().get(0)).name());
        Assertions.assertEquals((int)0, (int)((StreamsGroupHeartbeatRequestData.TopicInfo)subtopology2.stateChangelogTopics().get(0)).partitions());
        Assertions.assertEquals((int)1, (int)((StreamsGroupHeartbeatRequestData.TopicInfo)subtopology2.stateChangelogTopics().get(0)).replicationFactor());
        Assertions.assertEquals((int)0, (int)((StreamsGroupHeartbeatRequestData.TopicInfo)subtopology2.stateChangelogTopics().get(0)).topicConfigs().size());
        // Second request: built in the parameterized state; the topology must be absent.
        Mockito.when((Object)this.membershipManager.state()).thenReturn((Object)memberState);
        StreamsGroupHeartbeatRequestData nonJoiningRequestData = heartbeatState.buildRequestData();
        Assertions.assertNull((Object)nonJoiningRequestData.topology());
    }

    /**
     * Tells whether the given list is in non-descending order with respect to the comparator.
     * Lists with fewer than two elements are always considered sorted.
     *
     * @param collection the elements to inspect, in list order
     * @param comparator the ordering to check against
     * @return {@code false} as soon as an element compares greater than its successor, {@code true} otherwise
     */
    private <V> boolean isSorted(List<V> collection, Comparator<V> comparator) {
        V predecessor = null;
        boolean hasPredecessor = false;
        for (V element : collection) {
            // The very first element has nothing to be compared with.
            if (hasPredecessor && comparator.compare(predecessor, element) > 0) {
                return false;
            }
            predecessor = element;
            hasPredecessor = true;
        }
        return true;
    }

    /**
     * Verifies that the configured rebalance timeout is sent in the request built while the mocked
     * membership manager reports {@code JOINING}, and that {@code -1} is sent in the request built
     * afterwards in the supplied state.
     *
     * @param memberState the state the membership manager is switched to for the second request
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(MemberState memberState) {
        final int rebalanceTimeoutMs = 1234;
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, rebalanceTimeoutMs);

        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        final StreamsGroupHeartbeatRequestData joiningRequest = heartbeatState.buildRequestData();
        Assertions.assertEquals(rebalanceTimeoutMs, joiningRequest.rebalanceTimeoutMs());

        Mockito.when(this.membershipManager.state()).thenReturn(memberState);
        final StreamsGroupHeartbeatRequestData followUpRequest = heartbeatState.buildRequestData();
        Assertions.assertEquals(-1, followUpRequest.rebalanceTimeoutMs());
    }

    /**
     * Verifies that the process id is sent in the request built while the mocked membership manager
     * reports {@code JOINING}, and is {@code null} in the request built afterwards in the supplied state.
     *
     * @param memberState the state the membership manager is switched to for the second request
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatProcessIdSentWhenJoining(MemberState memberState) {
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);

        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        final StreamsGroupHeartbeatRequestData joiningRequest = heartbeatState.buildRequestData();
        Assertions.assertEquals(PROCESS_ID.toString(), joiningRequest.processId());

        Mockito.when(this.membershipManager.state()).thenReturn(memberState);
        final StreamsGroupHeartbeatRequestData followUpRequest = heartbeatState.buildRequestData();
        Assertions.assertNull(followUpRequest.processId());
    }

    /**
     * Verifies that the user endpoint (host and port) is sent in the request built while the mocked
     * membership manager reports {@code JOINING}, and is {@code null} in the request built afterwards
     * in the supplied state.
     *
     * @param memberState the state the membership manager is switched to for the second request
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatEndpointSentWhenJoining(MemberState memberState) {
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);

        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        final StreamsGroupHeartbeatRequestData joiningRequest = heartbeatState.buildRequestData();
        Assertions.assertEquals(ENDPOINT.host(), joiningRequest.userEndpoint().host());
        final int expectedPort = ENDPOINT.port();
        final int sentPort = joiningRequest.userEndpoint().port();
        Assertions.assertEquals(expectedPort, sentPort);

        Mockito.when(this.membershipManager.state()).thenReturn(memberState);
        final StreamsGroupHeartbeatRequestData followUpRequest = heartbeatState.buildRequestData();
        Assertions.assertNull(followUpRequest.userEndpoint());
    }

    /**
     * Verifies that the client tags are sent in the request built while the mocked membership manager
     * reports {@code JOINING} (the first tag is checked for key and value), and are {@code null} in
     * the request built afterwards in the supplied state.
     *
     * @param memberState the state the membership manager is switched to for the second request
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatClientTagsSentWhenJoining(MemberState memberState) {
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);

        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        final StreamsGroupHeartbeatRequestData joiningRequest = heartbeatState.buildRequestData();
        final StreamsGroupHeartbeatRequestData.KeyValue firstTag = joiningRequest.clientTags().get(0);
        Assertions.assertEquals(CLIENT_TAG_1, firstTag.key());
        Assertions.assertEquals(VALUE_1, firstTag.value());

        Mockito.when(this.membershipManager.state()).thenReturn(memberState);
        final StreamsGroupHeartbeatRequestData followUpRequest = heartbeatState.buildRequestData();
        Assertions.assertNull(followUpRequest.clientTags());
    }

    /**
     * Verifies when the task lists (active, standby, warm-up) are included in a heartbeat request:
     * as empty lists while joining, in full on the first request after a reconciled assignment was
     * set, as {@code null} when nothing changed since the previous request, and again once the
     * reconciled assignment is replaced.
     *
     * <p>The requests are built from one shared {@code HeartbeatState}, so the order of the
     * {@code buildRequestData()} calls below matters.
     *
     * @param memberState the state the membership manager is switched to after the joining request;
     *                    supplied by {@code provideNonJoiningStates}
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testBuildingHeartbeatAssignmentSentWhenChanged(MemberState memberState) {
        StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);
        // Joining request: all three task lists are expected to be empty (not null).
        Mockito.when((Object)this.membershipManager.state()).thenReturn((Object)MemberState.JOINING);
        StreamsGroupHeartbeatRequestData joiningRequestData = heartbeatState.buildRequestData();
        Assertions.assertEquals(List.of(), (Object)joiningRequestData.activeTasks());
        Assertions.assertEquals(List.of(), (Object)joiningRequestData.standbyTasks());
        Assertions.assertEquals(List.of(), (Object)joiningRequestData.warmupTasks());
        // Switch to the parameterized state and install a reconciled assignment (active, standby, warm-up).
        Mockito.when((Object)this.membershipManager.state()).thenReturn((Object)memberState);
        this.streamsRebalanceData.setReconciledAssignment(new StreamsRebalanceData.Assignment(Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2)), Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)), Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5))));
        // First request after the assignment was set: tasks are reported grouped by subtopology id.
        StreamsGroupHeartbeatRequestData firstNonJoiningRequestData = heartbeatState.buildRequestData();
        StreamsGroupHeartbeatRequestManagerTest.assertTaskIdsEquals(List.of(new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_1).setPartitions(List.of(Integer.valueOf(0), Integer.valueOf(1))), new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_2).setPartitions(List.of(Integer.valueOf(2)))), firstNonJoiningRequestData.activeTasks());
        StreamsGroupHeartbeatRequestManagerTest.assertTaskIdsEquals(List.of(new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_1).setPartitions(List.of(Integer.valueOf(2)))), firstNonJoiningRequestData.standbyTasks());
        StreamsGroupHeartbeatRequestManagerTest.assertTaskIdsEquals(List.of(new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_1).setPartitions(List.of(Integer.valueOf(3), Integer.valueOf(4), Integer.valueOf(5)))), firstNonJoiningRequestData.warmupTasks());
        // Second request with an unchanged assignment: the task lists are omitted (null).
        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithoutChanges = heartbeatState.buildRequestData();
        Assertions.assertNull((Object)nonJoiningRequestDataWithoutChanges.activeTasks());
        Assertions.assertNull((Object)nonJoiningRequestDataWithoutChanges.standbyTasks());
        Assertions.assertNull((Object)nonJoiningRequestDataWithoutChanges.warmupTasks());
        // Replace the assignment: the task lists are sent again; warm-up tasks become an empty list.
        this.streamsRebalanceData.setReconciledAssignment(new StreamsRebalanceData.Assignment(Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0)), Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2)), Set.of()));
        StreamsGroupHeartbeatRequestData nonJoiningRequestDataWithChanges = heartbeatState.buildRequestData();
        StreamsGroupHeartbeatRequestManagerTest.assertTaskIdsEquals(List.of(new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_1).setPartitions(List.of(Integer.valueOf(0)))), nonJoiningRequestDataWithChanges.activeTasks());
        StreamsGroupHeartbeatRequestManagerTest.assertTaskIdsEquals(List.of(new StreamsGroupHeartbeatRequestData.TaskIds().setSubtopologyId(SUBTOPOLOGY_NAME_1).setPartitions(List.of(Integer.valueOf(2)))), nonJoiningRequestDataWithChanges.standbyTasks());
        Assertions.assertEquals(List.of(), (Object)nonJoiningRequestDataWithChanges.warmupTasks());
    }

    /**
     * Verifies that after {@code reset()} the heartbeat state builds a request with the same
     * identity fields and the same task lists as the request built before the reset.
     *
     * @param memberState the non-joining state reported by the mocked membership manager
     */
    @ParameterizedTest
    @MethodSource(value={"provideNonJoiningStates"})
    public void testResettingHeartbeatState(MemberState memberState) {
        Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
        Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
        Mockito.when(this.membershipManager.memberEpoch()).thenReturn(1);
        Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.of(INSTANCE_ID));
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);
        Mockito.when(this.membershipManager.state()).thenReturn(memberState);

        // Reconciled assignment with non-empty active, standby and warm-up task sets.
        final Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 0),
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 1),
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_2, 2));
        final Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 2));
        final Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 3),
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 4),
            new StreamsRebalanceData.TaskId(SUBTOPOLOGY_NAME_1, 5));
        this.streamsRebalanceData.setReconciledAssignment(
            new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks));

        final StreamsGroupHeartbeatRequestData beforeReset = heartbeatState.buildRequestData();
        Assertions.assertEquals(GROUP_ID, beforeReset.groupId());
        Assertions.assertEquals(MEMBER_ID, beforeReset.memberId());
        Assertions.assertEquals(1, beforeReset.memberEpoch());
        Assertions.assertEquals(INSTANCE_ID, beforeReset.instanceId());
        Assertions.assertFalse(beforeReset.activeTasks().isEmpty());
        Assertions.assertFalse(beforeReset.standbyTasks().isEmpty());
        Assertions.assertFalse(beforeReset.warmupTasks().isEmpty());

        heartbeatState.reset();

        // The request built after the reset must repeat everything, including the task lists.
        final StreamsGroupHeartbeatRequestData afterReset = heartbeatState.buildRequestData();
        Assertions.assertEquals(GROUP_ID, afterReset.groupId());
        Assertions.assertEquals(MEMBER_ID, afterReset.memberId());
        Assertions.assertEquals(1, afterReset.memberEpoch());
        Assertions.assertEquals(INSTANCE_ID, afterReset.instanceId());
        Assertions.assertEquals(beforeReset.activeTasks(), afterReset.activeTasks());
        Assertions.assertEquals(beforeReset.standbyTasks(), afterReset.standbyTasks());
        Assertions.assertEquals(beforeReset.warmupTasks(), afterReset.warmupTasks());
    }

    /**
     * Argument source for the parameterized tests: one single-element argument set per member
     * state listed here (ACKNOWLEDGING, RECONCILING, STABLE, PREPARE_LEAVING, LEAVING).
     *
     * @return a stream of arguments, each wrapping exactly one {@code MemberState}
     */
    private static Stream<Arguments> provideNonJoiningStates() {
        return Stream.of(
                MemberState.ACKNOWLEDGING,
                MemberState.RECONCILING,
                MemberState.STABLE,
                MemberState.PREPARE_LEAVING,
                MemberState.LEAVING)
            .map(state -> Arguments.of(new Object[]{state}));
    }

    /**
     * Verifies that the shutdown-application flag of the built request is {@code false} until a
     * shutdown is requested on the rebalance data, and {@code true} afterwards.
     *
     * @param memberState the state reported by the mocked membership manager for both requests
     */
    @ParameterizedTest
    @EnumSource(value=MemberState.class, names={"JOINING", "ACKNOWLEDGING", "RECONCILING", "STABLE", "PREPARE_LEAVING", "LEAVING"})
    public void testBuildingHeartbeatShutdownRequested(MemberState memberState) {
        final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
            new StreamsGroupHeartbeatRequestManager.HeartbeatState(this.streamsRebalanceData, this.membershipManager, 1234);
        Mockito.when(this.membershipManager.state()).thenReturn(memberState);

        final StreamsGroupHeartbeatRequestData beforeShutdownRequest = heartbeatState.buildRequestData();
        Assertions.assertFalse(beforeShutdownRequest.shutdownApplication());

        this.streamsRebalanceData.requestShutdown();

        final StreamsGroupHeartbeatRequestData afterShutdownRequest = heartbeatState.buildRequestData();
        Assertions.assertTrue(afterShutdownRequest.shutdownApplication());
    }

    /**
     * Verifies the handling of a {@link DisconnectException} passed to the failure callback of a
     * polled heartbeat request: the request state records a failed attempt, the heartbeat state is
     * reset, the coordinator request manager is told about the disconnect and the membership
     * manager is notified of a retriable heartbeat failure.
     */
    @Test
    public void testCoordinatorDisconnectFailureWhileSending() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
                heartbeatStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest unsentHeartbeat = pollResult.unsentRequests.get(0);

            // Let some time pass so the completion time differs from the send time.
            this.time.sleep(1234L);
            final long completionTimeMs = this.time.milliseconds();
            final DisconnectException disconnect = DisconnectException.INSTANCE;
            unsentHeartbeat.handler().onFailure(completionTimeMs, disconnect);

            final HeartbeatRequestState requestState = requestStateConstruction.constructed().get(0);
            Mockito.verify(requestState).onFailedAttempt(completionTimeMs);
            Mockito.verify(heartbeatState).reset();
            Mockito.verify(this.coordinatorRequestManager).handleCoordinatorDisconnect(disconnect, completionTimeMs);
            Mockito.verify(this.membershipManager).onRetriableHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of an {@link UnsupportedVersionException} passed to the failure callback
     * of a polled heartbeat request: failed attempt recorded, heartbeat state reset, membership
     * manager notified of a fatal failure and moved to the fatal state, and an {@link ErrorEvent}
     * with the expected message and exception type handed to the background event handler.
     */
    @Test
    public void testUnsupportedVersionFailureWhileSending() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
                heartbeatStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest unsentHeartbeat = pollResult.unsentRequests.get(0);

            this.time.sleep(1234L);
            final long completionTimeMs = this.time.milliseconds();
            final UnsupportedVersionException unsupportedVersion = new UnsupportedVersionException("message");
            unsentHeartbeat.handler().onFailure(completionTimeMs, unsupportedVersion);

            final HeartbeatRequestState requestState = requestStateConstruction.constructed().get(0);
            Mockito.verify(requestState).onFailedAttempt(completionTimeMs);
            Mockito.verify(heartbeatState).reset();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();

            // The error surfaced to the application replaces the original exception message.
            final ArgumentCaptor<ErrorEvent> capturedEvent = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(capturedEvent.capture());
            Assertions.assertEquals(
                "The cluster does not support the STREAMS group protocol or does not support the versions of the STREAMS group protocol used by this client (used versions: 0 to 0).",
                capturedEvent.getValue().error().getMessage());
            Assertions.assertInstanceOf(UnsupportedVersionException.class, capturedEvent.getValue().error());
            Mockito.verify(this.membershipManager).transitionToFatal();
        }
    }

    /**
     * Verifies the handling of a plain {@link RuntimeException} passed to the failure callback of a
     * polled heartbeat request: failed attempt recorded, heartbeat state reset, membership manager
     * notified of a fatal failure and moved to the fatal state, and the very same exception
     * forwarded to the background event handler inside an {@link ErrorEvent}.
     */
    @Test
    public void testFatalFailureWhileSending() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
                heartbeatStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest unsentHeartbeat = pollResult.unsentRequests.get(0);

            this.time.sleep(1234L);
            final long completionTimeMs = this.time.milliseconds();
            final RuntimeException failure = new RuntimeException();
            unsentHeartbeat.handler().onFailure(completionTimeMs, failure);

            final HeartbeatRequestState requestState = requestStateConstruction.constructed().get(0);
            Mockito.verify(requestState).onFailedAttempt(completionTimeMs);
            Mockito.verify(heartbeatState).reset();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();

            final ArgumentCaptor<ErrorEvent> capturedEvent = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(capturedEvent.capture());
            Assertions.assertEquals(failure, capturedEvent.getValue().error());
            Mockito.verify(this.membershipManager).transitionToFatal();
        }
    }

    /**
     * Verifies the handling of an error response carrying {@code NOT_COORDINATOR} or
     * {@code COORDINATOR_NOT_AVAILABLE}: the coordinator is marked unknown with the response's
     * error message and the completion time, both the heartbeat state and the heartbeat request
     * state are reset, and the membership manager is notified through {@code onFatalHeartbeatFailure()}.
     *
     * @param error the error code placed into the heartbeat response
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"NOT_COORDINATOR", "COORDINATOR_NOT_AVAILABLE"})
    public void testNotCoordinatorAndCoordinatorNotAvailableErrorResponse(Errors error) {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
                heartbeatStateConstruction.constructed().get(0);
            final HeartbeatRequestState requestState = requestStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest unsentHeartbeat = pollResult.unsentRequests.get(0);

            this.time.sleep(1234L);
            final long completionTimeMs = this.time.milliseconds();
            final ClientResponse errorResponse = this.buildClientErrorResponse(error, "error message");
            unsentHeartbeat.handler().onComplete(errorResponse);

            Mockito.verify(this.coordinatorRequestManager).markCoordinatorUnknown(
                ((StreamsGroupHeartbeatResponse) errorResponse.responseBody()).data().errorMessage(),
                completionTimeMs);
            Mockito.verify(heartbeatState).reset();
            Mockito.verify(requestState).reset();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of a {@code COORDINATOR_LOAD_IN_PROGRESS} error response: the heartbeat
     * state is reset and the membership manager is notified through {@code onFatalHeartbeatFailure()},
     * while the heartbeat request state is never reset.
     */
    @Test
    public void testCoordinatorLoadInProgressErrorResponse() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> heartbeatStateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            final StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            final StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState =
                heartbeatStateConstruction.constructed().get(0);
            final HeartbeatRequestState requestState = requestStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            final NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            final NetworkClientDelegate.UnsentRequest unsentHeartbeat = pollResult.unsentRequests.get(0);

            final ClientResponse errorResponse =
                this.buildClientErrorResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS, "message");
            unsentHeartbeat.handler().onComplete(errorResponse);

            Mockito.verify(heartbeatState).reset();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
            Mockito.verify(requestState, Mockito.never()).reset();
        }
    }

    /**
     * Verifies the handling of a {@code GROUP_AUTHORIZATION_FAILED} heartbeat error: an error line is
     * logged, the heartbeat state is reset, a {@link GroupAuthorizationException} is handed to the
     * background event handler, and the membership manager is transitioned to fatal and notified of
     * the failed heartbeat.
     */
    @Test
    public void testGroupAuthorizationFailedErrorResponse() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> stateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
             LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            StreamsGroupHeartbeatRequestManager.HeartbeatState state = stateConstruction.constructed().get(0);
            Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));

            NetworkClientDelegate.PollResult pollResult = manager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, pollResult.unsentRequests.size());
            NetworkClientDelegate.UnsentRequest heartbeat = pollResult.unsentRequests.get(0);
            ClientResponse errorResponse = this.buildClientErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED, "message");
            heartbeat.handler().onComplete(errorResponse);

            String expectedLog = "StreamsGroupHeartbeatRequest failed due to group authorization failure: Not authorized to access group: group-id";
            Assertions.assertTrue(logAppender.getMessages("ERROR").stream().anyMatch(line -> line.contains(expectedLog)));
            Mockito.verify(state).reset();

            // The failure must be surfaced as exactly one error event carrying the authorization exception.
            ArgumentCaptor<ErrorEvent> eventCaptor = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(eventCaptor.capture());
            Throwable reported = eventCaptor.getValue().error();
            Assertions.assertEquals(GroupAuthorizationException.forGroupId(GROUP_ID).getMessage(), reported.getMessage());
            Assertions.assertInstanceOf(GroupAuthorizationException.class, reported);

            Mockito.verify(this.membershipManager).transitionToFatal();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of a {@code TOPIC_AUTHORIZATION_FAILED} heartbeat error: an error line
     * naming the member, its state and the response message is logged, the heartbeat state is reset,
     * a {@link TopicAuthorizationException} is handed to the background event handler, and the
     * membership manager is notified of the failed heartbeat.
     */
    @Test
    public void testTopicAuthorizationFailedErrorResponse() {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> stateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
             LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)) {
            StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = stateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            Mockito.when(this.membershipManager.state()).thenReturn(MemberState.STABLE);
            Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
            NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, result.unsentRequests.size());
            NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);

            // Single source of truth for the message sent in the response and expected in the log.
            String errorMessage = "message";
            ClientResponse response = this.buildClientErrorResponse(Errors.TOPIC_AUTHORIZATION_FAILED, errorMessage);
            networkRequest.handler().onComplete(response);

            String expectedLog = "StreamsGroupHeartbeatRequest failed for member member-id with state " + MemberState.STABLE
                + " due to " + Errors.TOPIC_AUTHORIZATION_FAILED + ": " + errorMessage;
            Assertions.assertTrue(logAppender.getMessages("ERROR").stream().anyMatch(m -> m.contains(expectedLog)));
            Mockito.verify(heartbeatState).reset();

            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(errorEvent.capture());
            // The propagated exception carries the generic message of the error code, not the response message.
            Assertions.assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.message(), errorEvent.getValue().error().getMessage());
            Assertions.assertInstanceOf(TopicAuthorizationException.class, errorEvent.getValue().error());
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of heartbeat errors that are expected to be fatal: the heartbeat state is
     * reset, an error line is logged, an exception of the type mapped to the error code is handed to
     * the background event handler, and the membership manager is transitioned to fatal and notified
     * of the failed heartbeat.
     *
     * @param error the error code carried by the simulated heartbeat response
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"INVALID_REQUEST", "GROUP_MAX_SIZE_REACHED", "UNSUPPORTED_VERSION", "STREAMS_INVALID_TOPOLOGY", "STREAMS_INVALID_TOPOLOGY_EPOCH", "STREAMS_TOPOLOGY_FENCED"})
    public void testKnownFatalErrorResponse(Errors error) {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> stateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
             LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)) {
            StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = stateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, result.unsentRequests.size());
            NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);

            String errorMessageInResponse = "message";
            ClientResponse response = this.buildClientErrorResponse(error, errorMessageInResponse);
            networkRequest.handler().onComplete(response);

            Mockito.verify(heartbeatState).reset();
            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(errorEvent.capture());

            // UNSUPPORTED_VERSION is expected to be reported with a fixed client-side message instead of
            // the message carried by the response; every other error echoes the response message.
            String expectedMessage = error == Errors.UNSUPPORTED_VERSION
                ? "The cluster does not support the STREAMS group protocol or does not support the versions of the STREAMS group protocol used by this client (used versions: 0 to 0)."
                : errorMessageInResponse;
            String expectedLog = "StreamsGroupHeartbeatRequest failed due to " + error + ": " + expectedMessage;
            Assertions.assertTrue(logAppender.getMessages("ERROR").stream().anyMatch(m -> m.contains(expectedLog)));
            Assertions.assertEquals(expectedMessage, errorEvent.getValue().error().getMessage());

            Assertions.assertInstanceOf(error.exception().getClass(), errorEvent.getValue().error());
            Mockito.verify(this.membershipManager).transitionToFatal();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of {@code FENCED_MEMBER_EPOCH} and {@code UNKNOWN_MEMBER_ID} heartbeat
     * errors: both the heartbeat state and the heartbeat request state are reset, and the membership
     * manager is notified through {@code onFenced()} and {@code onFatalHeartbeatFailure()}.
     *
     * @param error the error code carried by the simulated heartbeat response
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"FENCED_MEMBER_EPOCH", "UNKNOWN_MEMBER_ID"})
    public void testFencedMemberOrUnknownMemberIdErrorResponse(Errors error) {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> stateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class)) {
            StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = stateConstruction.constructed().get(0);
            HeartbeatRequestState heartbeatRequestState = requestStateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, result.unsentRequests.size());
            NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);

            String errorMessage = "message";
            ClientResponse response = this.buildClientErrorResponse(error, errorMessage);
            networkRequest.handler().onComplete(response);

            Mockito.verify(heartbeatState).reset();
            Mockito.verify(heartbeatRequestState).reset();
            Mockito.verify(this.membershipManager).onFenced();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Verifies the handling of every error supplied by {@code provideOtherErrors()}: an
     * "unexpected error" line is logged, the heartbeat state is reset, an exception of the type
     * mapped to the error code and carrying the response message is handed to the background event
     * handler, and the membership manager is transitioned to fatal and notified of the failed
     * heartbeat.
     *
     * @param error the error code carried by the simulated heartbeat response
     */
    @ParameterizedTest
    @MethodSource(value={"provideOtherErrors"})
    public void testOtherErrorResponse(Errors error) {
        try (MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.canSendRequest(this.time.milliseconds())).thenReturn(true));
             MockedConstruction<StreamsGroupHeartbeatRequestManager.HeartbeatState> stateConstruction =
                 Mockito.mockConstruction(StreamsGroupHeartbeatRequestManager.HeartbeatState.class);
             LogCaptureAppender logAppender = LogCaptureAppender.createAndRegister(StreamsGroupHeartbeatRequestManager.class)) {
            StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            StreamsGroupHeartbeatRequestManager.HeartbeatState heartbeatState = stateConstruction.constructed().get(0);
            Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(this.coordinatorNode));
            NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(this.time.milliseconds());
            Assertions.assertEquals(1, result.unsentRequests.size());
            NetworkClientDelegate.UnsentRequest networkRequest = result.unsentRequests.get(0);

            // Single source of truth for the message sent in the response and expected on the exception.
            String errorMessage = "message";
            ClientResponse response = this.buildClientErrorResponse(error, errorMessage);
            networkRequest.handler().onComplete(response);

            Assertions.assertTrue(logAppender.getMessages("ERROR").stream().anyMatch(m -> m.contains("StreamsGroupHeartbeatRequest failed due to unexpected error")));
            Mockito.verify(heartbeatState).reset();
            ArgumentCaptor<ErrorEvent> errorEvent = ArgumentCaptor.forClass(ErrorEvent.class);
            Mockito.verify(this.backgroundEventHandler).add(errorEvent.capture());
            Assertions.assertEquals(errorMessage, errorEvent.getValue().error().getMessage());
            Assertions.assertInstanceOf(error.exception().getClass(), errorEvent.getValue().error());
            Mockito.verify(this.membershipManager).transitionToFatal();
            Mockito.verify(this.membershipManager).onFatalHeartbeatFailure();
        }
    }

    /**
     * Supplies, as JUnit arguments, every {@link Errors} value that is not part of the explicit
     * exclusion set below.
     *
     * @return one {@link Arguments} instance per remaining error code
     */
    private static Stream<Arguments> provideOtherErrors() {
        Set<Errors> excludedErrors = Set.of(
            Errors.NONE,
            Errors.NOT_COORDINATOR,
            Errors.COORDINATOR_NOT_AVAILABLE,
            Errors.COORDINATOR_LOAD_IN_PROGRESS,
            Errors.GROUP_AUTHORIZATION_FAILED,
            Errors.TOPIC_AUTHORIZATION_FAILED,
            Errors.INVALID_REQUEST,
            Errors.GROUP_MAX_SIZE_REACHED,
            Errors.FENCED_MEMBER_EPOCH,
            Errors.UNKNOWN_MEMBER_ID,
            Errors.UNSUPPORTED_VERSION,
            Errors.STREAMS_INVALID_TOPOLOGY,
            Errors.STREAMS_INVALID_TOPOLOGY_EPOCH,
            Errors.STREAMS_TOPOLOGY_FENCED);
        return Arrays.stream(Errors.values())
            .filter(candidate -> !excludedErrors.contains(candidate))
            .map(Arguments::of);
    }

    /**
     * Without the membership manager being stubbed as leaving the group, {@code pollOnClose} is
     * expected to return {@code PollResult.EMPTY}.
     */
    @Test
    public void testPollOnCloseWhenIsNotLeaving() {
        StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();

        NetworkClientDelegate.PollResult pollOnCloseResult = manager.pollOnClose(this.time.milliseconds());

        Assertions.assertEquals(NetworkClientDelegate.PollResult.EMPTY, pollOnCloseResult);
    }

    /**
     * When the membership manager reports that it is leaving the group, {@code pollOnClose} is
     * expected to return a single heartbeat request carrying the stubbed group ID, member ID and
     * member epoch.
     */
    @Test
    public void testPollOnCloseWhenIsLeaving() {
        StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
        Mockito.when(this.membershipManager.isLeavingGroup()).thenReturn(true);
        Mockito.when(this.membershipManager.groupId()).thenReturn(GROUP_ID);
        Mockito.when(this.membershipManager.memberId()).thenReturn(MEMBER_ID);
        Mockito.when(this.membershipManager.memberEpoch()).thenReturn(-1);

        NetworkClientDelegate.PollResult pollOnCloseResult = manager.pollOnClose(this.time.milliseconds());

        Assertions.assertEquals(1, pollOnCloseResult.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest leaveHeartbeat = pollOnCloseResult.unsentRequests.get(0);
        StreamsGroupHeartbeatRequest builtRequest = (StreamsGroupHeartbeatRequest) leaveHeartbeat.requestBuilder().build();
        StreamsGroupHeartbeatRequestData requestData = builtRequest.data();
        Assertions.assertEquals(GROUP_ID, requestData.groupId());
        Assertions.assertEquals(MEMBER_ID, requestData.memberId());
        Assertions.assertEquals(-1, requestData.memberEpoch());
    }

    /**
     * With every constructed {@code Timer} reporting itself as expired, {@code maximumTimeToWait}
     * is expected to return 0 after updating the timer with the current time.
     */
    @Test
    public void testMaximumTimeToWaitPollTimerExpired() {
        try (MockedConstruction<Timer> timerConstruction = Mockito.mockConstruction(
                 Timer.class,
                 (mock, context) -> Mockito.when(mock.isExpired()).thenReturn(true));
             MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.requestInFlight()).thenReturn(false))) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            // NOTE(review): the first constructed Timer is treated as the manager's poll timer - confirm.
            Timer firstTimer = timerConstruction.constructed().get(0);
            this.time.sleep(1234L);

            long waitMs = manager.maximumTimeToWait(this.time.milliseconds());

            Assertions.assertEquals(0L, waitMs);
            Mockito.verify(firstTimer).update(this.time.milliseconds());
        }
    }

    /**
     * When the membership manager asks not to wait for the heartbeat interval and no request is in
     * flight, {@code maximumTimeToWait} is expected to return 0 after updating the timer with the
     * current time.
     */
    @Test
    public void testMaximumTimeToWaitWhenHeartbeatShouldBeSentImmediately() {
        try (MockedConstruction<Timer> timerConstruction = Mockito.mockConstruction(Timer.class);
             MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.requestInFlight()).thenReturn(false))) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            // NOTE(review): the first constructed Timer is treated as the manager's poll timer - confirm.
            Timer firstTimer = timerConstruction.constructed().get(0);
            Mockito.when(this.membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(true);
            this.time.sleep(1234L);

            long waitMs = manager.maximumTimeToWait(this.time.milliseconds());

            Assertions.assertEquals(0L, waitMs);
            Mockito.verify(firstTimer).update(this.time.milliseconds());
        }
    }

    /**
     * For the parameter combinations below, {@code maximumTimeToWait} is expected to return the
     * stubbed time to the next heartbeat rather than 0, after updating the timer with the current
     * time.
     *
     * @param isRequestInFlight value returned by the mocked {@code requestInFlight()}
     * @param shouldNotWaitForHeartbeatInterval value returned by the mocked
     *        {@code shouldNotWaitForHeartbeatInterval()}
     */
    @ParameterizedTest
    @CsvSource(value={"true, false", "false, false", "true, true"})
    public void testMaximumTimeToWaitWhenHeartbeatShouldBeNotSentImmediately(boolean isRequestInFlight, boolean shouldNotWaitForHeartbeatInterval) {
        long remainingMs = 12L;
        long timeToNextHeartbeatMs = 6L;
        try (MockedConstruction<Timer> timerMockedConstruction = Mockito.mockConstruction(
                 Timer.class,
                 (mock, context) -> Mockito.when(mock.remainingMs()).thenReturn(remainingMs));
             MockedConstruction<HeartbeatRequestState> heartbeatRequestStateMockedConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> {
                     Mockito.when(mock.requestInFlight()).thenReturn(isRequestInFlight);
                     Mockito.when(mock.timeToNextHeartbeatMs(ArgumentMatchers.anyLong())).thenReturn(timeToNextHeartbeatMs);
                 })) {
            StreamsGroupHeartbeatRequestManager heartbeatRequestManager = this.createStreamsGroupHeartbeatRequestManager();
            Timer pollTimer = timerMockedConstruction.constructed().get(0);
            Mockito.when(this.membershipManager.shouldNotWaitForHeartbeatInterval()).thenReturn(shouldNotWaitForHeartbeatInterval);
            this.time.sleep(1234L);

            long maximumTimeToWait = heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds());

            Assertions.assertEquals(timeToNextHeartbeatMs, maximumTimeToWait);
            Mockito.verify(pollTimer).update(this.time.milliseconds());
        }
    }

    /**
     * For both parameter rows, {@code maximumTimeToWait} is expected to return 5 after updating the
     * timer with the current time.
     *
     * @param remainingMs value returned by the mocked {@code Timer.remainingMs()}
     * @param timeToNextHeartbeatMs value returned by the mocked {@code timeToNextHeartbeatMs(long)}
     */
    @ParameterizedTest
    @CsvSource(value={"12, 5", "10, 6"})
    public void testMaximumTimeToWaitSelectingMinimumWaitTime(long remainingMs, long timeToNextHeartbeatMs) {
        try (MockedConstruction<Timer> timerConstruction = Mockito.mockConstruction(
                 Timer.class,
                 (mock, context) -> Mockito.when(mock.remainingMs()).thenReturn(remainingMs));
             MockedConstruction<HeartbeatRequestState> requestStateConstruction = Mockito.mockConstruction(
                 HeartbeatRequestState.class,
                 (mock, context) -> Mockito.when(mock.timeToNextHeartbeatMs(ArgumentMatchers.anyLong())).thenReturn(timeToNextHeartbeatMs))) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            Timer firstTimer = timerConstruction.constructed().get(0);
            this.time.sleep(1234L);

            long waitMs = manager.maximumTimeToWait(this.time.milliseconds());

            // NOTE(review): 5 for the "10, 6" row presumably means the manager only uses part of the
            // timer's remaining time - confirm against maximumTimeToWait.
            Assertions.assertEquals(5L, waitMs);
            Mockito.verify(firstTimer).update(this.time.milliseconds());
        }
    }

    /**
     * {@code resetPollTimer} is expected to update the timer with the current time, check whether
     * it expired, and reset it to 10000 ms.
     */
    @Test
    public void testResetPollTimer() {
        try (MockedConstruction<Timer> timerConstruction = Mockito.mockConstruction(Timer.class)) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            // NOTE(review): the second constructed Timer is treated as the poll timer here - confirm
            // which component constructs the first one.
            Timer secondTimer = timerConstruction.constructed().get(1);

            manager.resetPollTimer(this.time.milliseconds());

            Mockito.verify(secondTimer).update(this.time.milliseconds());
            Mockito.verify(secondTimer).isExpired();
            // NOTE(review): 10000 matches the max.poll.interval.ms set by config() - confirm the manager uses it.
            Mockito.verify(secondTimer).reset(10000L);
        }
    }

    /**
     * When the timer reports itself as expired, {@code resetPollTimer} is additionally expected to
     * query by how much it expired, read the member ID, and ask the membership manager to rejoin the
     * stale member before resetting the timer to 10000 ms.
     */
    @Test
    public void testResetPollTimerWhenExpired() {
        try (MockedConstruction<Timer> timerConstruction = Mockito.mockConstruction(Timer.class)) {
            StreamsGroupHeartbeatRequestManager manager = this.createStreamsGroupHeartbeatRequestManager();
            // NOTE(review): the second constructed Timer is treated as the poll timer here - confirm
            // which component constructs the first one.
            Timer secondTimer = timerConstruction.constructed().get(1);
            Mockito.when(secondTimer.isExpired()).thenReturn(true);

            manager.resetPollTimer(this.time.milliseconds());

            // Interactions with the timer itself.
            Mockito.verify(secondTimer).update(this.time.milliseconds());
            Mockito.verify(secondTimer).isExpired();
            Mockito.verify(secondTimer).isExpiredBy();
            Mockito.verify(secondTimer).reset(10000L);
            // Interactions with the membership manager triggered by the expiration.
            Mockito.verify(this.membershipManager).memberId();
            Mockito.verify(this.membershipManager).maybeRejoinStaleMember();
        }
    }

    /**
     * Builds a {@link ConsumerConfig} with string key/value deserializers and a
     * {@code max.poll.interval.ms} of 10000.
     *
     * @return the consumer configuration built from those properties
     */
    private static ConsumerConfig config() {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("max.poll.interval.ms", Integer.toString(10000));
        consumerProps.put("value.deserializer", StringDeserializer.class);
        consumerProps.put("key.deserializer", StringDeserializer.class);
        return new ConsumerConfig(consumerProps);
    }

    /**
     * Creates the manager under test, wired to this test instance's collaborators.
     *
     * @return a new {@link StreamsGroupHeartbeatRequestManager}
     */
    private StreamsGroupHeartbeatRequestManager createStreamsGroupHeartbeatRequestManager() {
        StreamsGroupHeartbeatRequestManager manager = new StreamsGroupHeartbeatRequestManager(
            LOG_CONTEXT,
            this.time,
            this.config,
            this.coordinatorRequestManager,
            this.membershipManager,
            this.backgroundEventHandler,
            this.metrics,
            this.streamsRebalanceData);
        return manager;
    }

    /**
     * Builds a heartbeat {@link ClientResponse} whose body carries {@code ENDPOINT_TO_PARTITIONS}
     * and a heartbeat interval of 1200 ms, with both timestamps set to the current time.
     *
     * @return the client response wrapping that heartbeat response body
     */
    private ClientResponse buildClientResponse() {
        RequestHeader header = new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, 1, "", 1);
        StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData();
        responseData.setPartitionsByUserEndpoint(ENDPOINT_TO_PARTITIONS);
        responseData.setHeartbeatIntervalMs(1200);
        return new ClientResponse(
            header,
            null,
            "-1",
            this.time.milliseconds(),
            this.time.milliseconds(),
            false,
            null,
            null,
            new StreamsGroupHeartbeatResponse(responseData));
    }

    /**
     * Builds a heartbeat {@link ClientResponse} whose body carries the given error code and error
     * message, with both timestamps set to the current time.
     *
     * @param error the error whose code is set on the response body
     * @param errorMessage the error message set on the response body
     * @return the client response wrapping that heartbeat response body
     */
    private ClientResponse buildClientErrorResponse(Errors error, String errorMessage) {
        RequestHeader header = new RequestHeader(ApiKeys.STREAMS_GROUP_HEARTBEAT, 1, "", 1);
        StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData();
        responseData.setErrorCode(error.code());
        responseData.setErrorMessage(errorMessage);
        return new ClientResponse(
            header,
            null,
            "-1",
            this.time.milliseconds(),
            this.time.milliseconds(),
            false,
            null,
            null,
            new StreamsGroupHeartbeatResponse(responseData));
    }

    /**
     * Asserts that two task ID lists are equal regardless of the order of the entries and of the
     * partitions inside each entry.
     *
     * @param expected the expected task IDs
     * @param actual the task IDs to check
     */
    private static void assertTaskIdsEquals(List<StreamsGroupHeartbeatRequestData.TaskIds> expected, List<StreamsGroupHeartbeatRequestData.TaskIds> actual) {
        Assertions.assertEquals(normalizedTaskIds(expected), normalizedTaskIds(actual));
    }

    /**
     * Returns a copy of the given task IDs with each entry's partitions sorted and the entries
     * themselves ordered by subtopology ID; the input list and its entries are not modified.
     */
    private static List<StreamsGroupHeartbeatRequestData.TaskIds> normalizedTaskIds(List<StreamsGroupHeartbeatRequestData.TaskIds> taskIds) {
        return taskIds.stream()
            .map(ids -> new StreamsGroupHeartbeatRequestData.TaskIds()
                .setSubtopologyId(ids.subtopologyId())
                .setPartitions(ids.partitions().stream().sorted().collect(Collectors.toList())))
            .sorted(Comparator.comparing(StreamsGroupHeartbeatRequestData.TaskIds::subtopologyId))
            .collect(Collectors.toList());
    }
}

