/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.share.persister;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.PersisterStateManager;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class PersisterStateManagerTest {
    private static final KafkaClient CLIENT = (KafkaClient)Mockito.mock(KafkaClient.class);
    private static final Time MOCK_TIME = new MockTime();
    private static final Timer MOCK_TIMER = new MockTimer((MockTime)MOCK_TIME);
    private static final ShareCoordinatorMetadataCacheHelper CACHE_HELPER = (ShareCoordinatorMetadataCacheHelper)Mockito.mock(ShareCoordinatorMetadataCacheHelper.class);
    private static final int MAX_RPC_RETRY_ATTEMPTS = 5;
    public static final long REQUEST_BACKOFF_MS = 100L;
    public static final long REQUEST_BACKOFF_MAX_MS = 3000L;
    private static final String HOST = "localhost";
    private static final int PORT = 9092;
    private static Timer mockTimer;

    PersisterStateManagerTest() {
    }

    private ShareCoordinatorMetadataCacheHelper getDefaultCacheHelper(final Node suppliedNode) {
        return new ShareCoordinatorMetadataCacheHelper(){

            public boolean containsTopic(String topic) {
                return false;
            }

            public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
                return Node.noNode();
            }

            public List<Node> getClusterNodes() {
                return Collections.singletonList(suppliedNode);
            }
        };
    }

    private ShareCoordinatorMetadataCacheHelper getCoordinatorCacheHelper(final Node coordinatorNode) {
        return new ShareCoordinatorMetadataCacheHelper(){

            public boolean containsTopic(String topic) {
                return true;
            }

            public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
                return coordinatorNode;
            }

            public List<Node> getClusterNodes() {
                return Collections.emptyList();
            }
        };
    }

    @BeforeEach
    public void setUp() {
        mockTimer = new SystemTimerReaper("persisterStateManagerTestTimer", (Timer)new SystemTimer("persisterStateManagerTestTimer"));
    }

    @AfterEach
    public void tearDown() throws Exception {
        Utils.closeQuietly((AutoCloseable)mockTimer, (String)"persisterStateManagerTestTimer");
    }

    @Test
    public void testFindCoordinatorFatalError() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, 5){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result().get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)1)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorAttemptsExhausted() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setKey(coordinatorKey).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setHost(Node.noNode().host()).setNodeId(Node.noNode().id()).setPort(Node.noNode().port())))), suppliedNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        int maxAttempts = 2;
        TestStateHandler handler = (TestStateHandler)((Object)Mockito.spy((Object)((Object)new TestStateHandler(stateManager, groupId, topicId, partition, future, 100L, 3000L, maxAttempts){

            protected AbstractRequest.Builder<? extends AbstractRequest> requestBuilder() {
                return null;
            }
        })));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        TestStateHandler.TestHandlerResponse result = null;
        try {
            result = handler.result.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        Assertions.assertEquals((short)Errors.COORDINATOR_NOT_AVAILABLE.code(), (short)((WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0)).errorCode());
        ((TestStateHandler)((Object)Mockito.verify((Object)((Object)handler), (VerificationMode)Mockito.times((int)2)))).findShareCoordinatorBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testFindCoordinatorSuccess() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws InterruptedException, ExecutionException {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        WriteShareGroupStateResponse result = (WriteShareGroupStateResponse)resultFuture.get();
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestWithRetryAndCoordinatorNodeLookup() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestFailedMaxRetriesExhausted() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("")))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 2));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        WriteShareGroupStateResponse result = null;
        try {
            result = (WriteShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        WriteShareGroupStateResponseData.PartitionResult partitionResult = (WriteShareGroupStateResponseData.PartitionResult)((WriteShareGroupStateResponseData.WriteStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.WriteStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testWriteStateRequestBatchingWithCoordinatorNodeLookup() throws ExecutionException, Exception {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        List<PersisterStateBatch> stateBatches = Arrays.asList(new PersisterStateBatch(0L, 9L, 0, 1), new PersisterStateBatch(10L, 19L, 1, 1));
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            WriteShareGroupStateRequest request = (WriteShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((WriteShareGroupStateRequestData.PartitionData)((WriteShareGroupStateRequestData.WriteStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new WriteShareGroupStateResponse(new WriteShareGroupStateResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("")))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        AtomicBoolean isBatchingSuccess = new AtomicBoolean(false);
        stateManager.setGenerateCallback(() -> {
            Map handlersPerType = (Map)stateManager.nodeRPCMap().get(coordinatorNode);
            if (handlersPerType != null && handlersPerType.containsKey(PersisterStateManager.RPCType.WRITE) && ((Map)handlersPerType.get(PersisterStateManager.RPCType.WRITE)).containsKey(groupId) && ((List)((Map)handlersPerType.get(PersisterStateManager.RPCType.WRITE)).get(groupId)).size() > 2) {
                isBatchingSuccess.set(true);
            }
        });
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        ArrayList<PersisterStateManager.WriteStateHandler> handlers = new ArrayList<PersisterStateManager.WriteStateHandler>();
        for (int i = 0; i < 5; ++i) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            PersisterStateManager.WriteStateHandler handler = (PersisterStateManager.WriteStateHandler)Mockito.spy((Object)new PersisterStateManager.WriteStateHandler(persisterStateManager, groupId, topicId, partition, 0, 0, 0L, stateBatches, future, 100L, 3000L, 5));
            handlers.add(handler);
            stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        }
        CompletableFuture.allOf((CompletableFuture[])handlers.stream().map(PersisterStateManager.WriteStateHandler::result).toArray(CompletableFuture[]::new)).get();
        TestUtils.waitForCondition(isBatchingSuccess::get, (long)15000L, (long)10L, () -> "unable to verify batching");
    }

    @Test
    public void testReadStateRequestCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestIllegalStateCoordinatorFoundSuccessfully() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(Uuid.randomUuid()).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(500).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((short)Errors.UNKNOWN_SERVER_ERROR.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestRetryWithNotCoordinatorSuccessfulOnRetry() throws ExecutionException, InterruptedException {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        TestUtils.waitForCondition(resultFuture::isDone, (long)15000L, (long)10L, () -> "Failed to get result from future");
        ReadShareGroupStateResponse result = (ReadShareGroupStateResponse)resultFuture.get();
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestCoordinatorFoundOnRetry() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node suppliedNode = new Node(0, HOST, 9092);
        Node coordinatorNode = new Node(1, HOST, 9092);
        String coordinatorKey = SharePartitionKey.asCoordinatorKey((String)groupId, (Uuid)topicId, (int)partition);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setErrorCode(Errors.NOT_COORDINATOR.code())))), suppliedNode);
        client.prepareResponseFrom(body -> body instanceof FindCoordinatorRequest && ((FindCoordinatorRequest)body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id() && ((String)((FindCoordinatorRequest)body).data().coordinatorKeys().get(0)).equals(coordinatorKey), (AbstractResponse)new FindCoordinatorResponse(new FindCoordinatorResponseData().setCoordinators(Collections.singletonList(new FindCoordinatorResponseData.Coordinator().setNodeId(1).setHost(HOST).setPort(9092).setErrorCode(Errors.NONE.code())))), suppliedNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getDefaultCacheHelper(suppliedNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestWithCoordinatorNodeLookup() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)1))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestRetryWithCoordinatorNodeLookup() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.NONE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 5, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)partitionResult.errorCode());
        Assertions.assertEquals((int)1, (int)partitionResult.stateEpoch());
        Assertions.assertEquals((long)0L, (long)partitionResult.startOffset());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testReadStateRequestFailureMaxRetriesExhausted() {
        MockClient client = new MockClient(MOCK_TIME);
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 10;
        Node coordinatorNode = new Node(1, HOST, 9092);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        client.prepareResponseFrom(body -> {
            ReadShareGroupStateRequest request = (ReadShareGroupStateRequest)body;
            String requestGroupId = request.data().groupId();
            Uuid requestTopicId = ((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).topicId();
            int requestPartition = ((ReadShareGroupStateRequestData.PartitionData)((ReadShareGroupStateRequestData.ReadStateData)request.data().topics().get(0)).partitions().get(0)).partition();
            return requestGroupId.equals(groupId) && requestTopicId == topicId && requestPartition == partition;
        }, (AbstractResponse)new ReadShareGroupStateResponse(new ReadShareGroupStateResponseData().setResults(Collections.singletonList(new ReadShareGroupStateResponseData.ReadStateResult().setTopicId(topicId).setPartitions(Collections.singletonList(new ReadShareGroupStateResponseData.PartitionResult().setPartition(partition).setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()).setErrorMessage("").setStateEpoch(1).setStartOffset(0L).setStateBatches(Collections.emptyList())))))), coordinatorNode);
        ShareCoordinatorMetadataCacheHelper cacheHelper = this.getCoordinatorCacheHelper(coordinatorNode);
        PersisterStateManager stateManager = PersisterStateManagerBuilder.builder().withKafkaClient((KafkaClient)client).withTimer(mockTimer).withCacheHelper(cacheHelper).build();
        stateManager.start();
        CompletableFuture future = new CompletableFuture();
        PersisterStateManager persisterStateManager = stateManager;
        Objects.requireNonNull(persisterStateManager);
        PersisterStateManager.ReadStateHandler handler = (PersisterStateManager.ReadStateHandler)Mockito.spy((Object)new PersisterStateManager.ReadStateHandler(persisterStateManager, groupId, topicId, partition, 0, future, 100L, 3000L, 2, null));
        stateManager.enqueue((PersisterStateManager.PersisterStateManagerHandler)handler);
        CompletableFuture resultFuture = handler.result();
        ReadShareGroupStateResponse result = null;
        try {
            result = (ReadShareGroupStateResponse)resultFuture.get();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to get result from future", (Throwable)e);
        }
        ReadShareGroupStateResponseData.PartitionResult partitionResult = (ReadShareGroupStateResponseData.PartitionResult)((ReadShareGroupStateResponseData.ReadStateResult)result.data().results().get(0)).partitions().get(0);
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).findShareCoordinatorBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)0))).requestBuilder();
        ((PersisterStateManager.ReadStateHandler)Mockito.verify((Object)handler, (VerificationMode)Mockito.times((int)2))).onComplete((ClientResponse)ArgumentMatchers.any());
        Assertions.assertEquals((Object)coordinatorNode, (Object)handler.getCoordinatorNode());
        Assertions.assertEquals((int)partition, (int)partitionResult.partition());
        Assertions.assertEquals((short)Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), (short)partitionResult.errorCode());
        try {
            stateManager.stop();
        }
        catch (Exception e) {
            Assertions.fail((String)"Failed to stop state manager", (Throwable)e);
        }
    }

    @Test
    public void testPersisterStateManagerClose() {
        KafkaClient client = (KafkaClient)Mockito.mock(KafkaClient.class);
        Timer timer = (Timer)Mockito.mock(Timer.class);
        PersisterStateManager psm = PersisterStateManagerBuilder.builder().withTimer(timer).withKafkaClient(client).build();
        try {
            ((KafkaClient)Mockito.verify((Object)client, (VerificationMode)Mockito.times((int)0))).close();
            ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)0))).close();
            psm.start();
            psm.stop();
            ((KafkaClient)Mockito.verify((Object)client, (VerificationMode)Mockito.times((int)1))).close();
            ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)1))).close();
        }
        catch (Exception e) {
            Assertions.fail((String)"unexpected exception", (Throwable)e);
        }
    }

    private static class PersisterStateManagerBuilder {
        private KafkaClient client = CLIENT;
        private Time time = MOCK_TIME;
        private Timer timer = MOCK_TIMER;
        private ShareCoordinatorMetadataCacheHelper cacheHelper = CACHE_HELPER;

        private PersisterStateManagerBuilder() {
        }

        private PersisterStateManagerBuilder withKafkaClient(KafkaClient client) {
            this.client = client;
            return this;
        }

        private PersisterStateManagerBuilder withCacheHelper(ShareCoordinatorMetadataCacheHelper cacheHelper) {
            this.cacheHelper = cacheHelper;
            return this;
        }

        private PersisterStateManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private PersisterStateManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        public static PersisterStateManagerBuilder builder() {
            return new PersisterStateManagerBuilder();
        }

        public PersisterStateManager build() {
            return new PersisterStateManager(this.client, this.cacheHelper, this.time, this.timer);
        }
    }

    private abstract class TestStateHandler
    extends PersisterStateManager.PersisterStateManagerHandler {
        private final CompletableFuture<TestHandlerResponse> result;

        TestStateHandler(PersisterStateManager stateManager, String groupId, Uuid topicId, int partition, CompletableFuture<TestHandlerResponse> result, long backoffMs, long backoffMaxMs, int maxFindCoordAttempts) {
            PersisterStateManager persisterStateManager = stateManager;
            Objects.requireNonNull(persisterStateManager);
            super(persisterStateManager, groupId, topicId, partition, backoffMs, backoffMaxMs, maxFindCoordAttempts);
            this.result = result;
        }

        protected void handleRequestResponse(ClientResponse response) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(this.partitionKey().partition()).setErrorMessage(Errors.NONE.message()).setErrorCode(Errors.NONE.code())))))));
        }

        protected boolean isResponseForRequest(ClientResponse response) {
            return true;
        }

        protected void findCoordinatorErrorResponse(Errors error, Exception exception) {
            this.result.complete(new TestHandlerResponse(new TestHandlerResponseData().setResults(Collections.singletonList(new WriteShareGroupStateResponseData.WriteStateResult().setTopicId(this.partitionKey().topicId()).setPartitions(Collections.singletonList(new WriteShareGroupStateResponseData.PartitionResult().setPartition(this.partitionKey().partition()).setErrorMessage(exception == null ? error.message() : exception.getMessage()).setErrorCode(error.code())))))));
        }

        protected String name() {
            return "TestStateHandler";
        }

        protected boolean isBatchable() {
            return false;
        }

        protected PersisterStateManager.RPCType rpcType() {
            return PersisterStateManager.RPCType.UNKNOWN;
        }

        protected CompletableFuture<TestHandlerResponse> result() {
            return this.result;
        }

        private class TestHandlerResponse
        extends WriteShareGroupStateResponse {
            public TestHandlerResponse(WriteShareGroupStateResponseData data) {
                super(data);
            }
        }

        private class TestHandlerResponseData
        extends WriteShareGroupStateResponseData {
            private TestHandlerResponseData() {
            }
        }
    }
}

