/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.share.persister;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FindCoordinatorResponseData;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.ReadShareGroupStateRequest;
import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.WriteShareGroupStateRequest;
import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.persister.DefaultStatePersister;
import org.apache.kafka.server.share.persister.GroupTopicPartitionData;
import org.apache.kafka.server.share.persister.PartitionData;
import org.apache.kafka.server.share.persister.PartitionFactory;
import org.apache.kafka.server.share.persister.PersisterStateBatch;
import org.apache.kafka.server.share.persister.PersisterStateManager;
import org.apache.kafka.server.share.persister.ReadShareGroupStateParameters;
import org.apache.kafka.server.share.persister.ReadShareGroupStateResult;
import org.apache.kafka.server.share.persister.ShareCoordinatorMetadataCacheHelper;
import org.apache.kafka.server.share.persister.TopicData;
import org.apache.kafka.server.share.persister.WriteShareGroupStateParameters;
import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class DefaultStatePersisterTest {
    // Default mocked network client; the builder uses it unless a test calls withKafkaClient.
    private static final KafkaClient CLIENT = (KafkaClient)Mockito.mock(KafkaClient.class);
    // Shared mock clock; used by the builder and by the MockClient instances created in the success tests.
    private static final Time MOCK_TIME = new MockTime();
    // Shared mock timer handed to PersisterStateManager by the builder.
    private static final Timer MOCK_TIMER = new MockTimer();
    // Default mocked metadata cache helper; the builder uses it unless a test calls withCacheHelper.
    private static final ShareCoordinatorMetadataCacheHelper CACHE_HELPER = (ShareCoordinatorMetadataCacheHelper)Mockito.mock(ShareCoordinatorMetadataCacheHelper.class);
    // Host name used for every Node and coordinator entry built in this class.
    private static final String HOST = "localhost";
    // NOTE(review): the tests below pass the literal 9092 rather than this constant —
    // presumably the constant was inlined by the compiler before decompilation; confirm.
    private static final int PORT = 9092;

    // Package-private no-argument constructor; performs no work.
    DefaultStatePersisterTest() {
    }

    /**
     * Creates a stub {@link ShareCoordinatorMetadataCacheHelper} for tests that drive a real
     * {@code MockClient}.
     *
     * <p>The stub reports that no topic is known, always answers {@link Node#noNode()} when asked
     * for a share coordinator, and exposes {@code suppliedNode} as the only cluster node.
     * Presumably this makes the persister look the coordinator up through {@code suppliedNode},
     * which is where the success tests register their FindCoordinator responses.
     *
     * @param suppliedNode the single node returned by {@code getClusterNodes()}
     * @return a cache helper stub backed by {@code suppliedNode}
     */
    private ShareCoordinatorMetadataCacheHelper getDefaultCacheHelper(final Node suppliedNode) {
        return new ShareCoordinatorMetadataCacheHelper() {

            @Override
            public boolean containsTopic(String topic) {
                return false;
            }

            @Override
            public Node getShareCoordinator(SharePartitionKey key, String internalTopicName) {
                return Node.noNode();
            }

            @Override
            public List<Node> getClusterNodes() {
                return Collections.singletonList(suppliedNode);
            }
        };
    }

    /**
     * Checks that {@code writeState} rejects malformed parameters: for each invalid input the
     * returned future must already be completed exceptionally with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testWriteStateValidate() {
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 0;
        int incorrectPartition = -1;

        // NOTE(review): GroupTopicPartitionData.Builder and TopicData are used as raw types below,
        // exactly as in the decompiled original; tighten once the element type is confirmed.

        // The request parameters themselves are null.
        assertWriteStateRejected(null);

        // The group/topic/partition payload is null.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(null)
            .build());

        // The group id is null.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(null)
                .build())
            .build());

        // The topics list is empty.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.emptyList())
                .build())
            .build());

        // The topic id is null.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(
                    null,
                    Collections.singletonList(PartitionFactory.newPartitionStateBatchData(partition, 1, 0L, 0, null)))))
                .build())
            .build());

        // The partitions list is empty.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(topicId, Collections.emptyList())))
                .build())
            .build());

        // The partition number is negative.
        assertWriteStateRejected(new WriteShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(
                    topicId,
                    Collections.singletonList(PartitionFactory.newPartitionStateBatchData(incorrectPartition, 1, 0L, 0, null)))))
                .build())
            .build());
    }

    /**
     * Calls {@code writeState} on a freshly built persister and asserts that the returned future
     * is already done, completed exceptionally, and fails with {@link IllegalArgumentException}.
     *
     * @param parameters the (invalid) parameters to submit; may be {@code null}
     */
    private static void assertWriteStateRejected(WriteShareGroupStateParameters parameters) {
        DefaultStatePersister persister = DefaultStatePersisterBuilder.builder().build();
        CompletableFuture<?> result = persister.writeState(parameters);
        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        TestUtils.assertFutureThrows(result, IllegalArgumentException.class);
    }

    /**
     * Checks that {@code readState} rejects malformed parameters: for each invalid input the
     * returned future must already be completed exceptionally with an
     * {@link IllegalArgumentException}.
     */
    @Test
    public void testReadStateValidate() {
        String groupId = "group1";
        Uuid topicId = Uuid.randomUuid();
        int partition = 0;
        int incorrectPartition = -1;

        // NOTE(review): GroupTopicPartitionData.Builder and TopicData are used as raw types below,
        // exactly as in the decompiled original; tighten once the element type is confirmed.

        // The request parameters themselves are null.
        assertReadStateRejected(null);

        // The group/topic/partition payload is null.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(null)
            .build());

        // The group id is null.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(null)
                .build())
            .build());

        // The topics list is empty.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.emptyList())
                .build())
            .build());

        // The topic id is null.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(
                    null,
                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(partition, 1)))))
                .build())
            .build());

        // The partitions list is empty.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(topicId, Collections.emptyList())))
                .build())
            .build());

        // The partition number is negative.
        assertReadStateRejected(new ReadShareGroupStateParameters.Builder()
            .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder()
                .setGroupId(groupId)
                .setTopicsData(Collections.singletonList(new TopicData(
                    topicId,
                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(incorrectPartition, 1)))))
                .build())
            .build());
    }

    /**
     * Calls {@code readState} on a freshly built persister and asserts that the returned future
     * is already done, completed exceptionally, and fails with {@link IllegalArgumentException}.
     *
     * @param parameters the (invalid) parameters to submit; may be {@code null}
     */
    private static void assertReadStateRejected(ReadShareGroupStateParameters parameters) {
        DefaultStatePersister persister = DefaultStatePersisterBuilder.builder().build();
        CompletableFuture<?> result = persister.readState(parameters);
        Assertions.assertTrue(result.isDone());
        Assertions.assertTrue(result.isCompletedExceptionally());
        TestUtils.assertFutureThrows(result, IllegalArgumentException.class);
    }

    /**
     * Happy path for {@code writeState} with two topic-partitions.
     *
     * <p>A {@code MockClient} is primed with one FindCoordinator response per share-partition key
     * (both served by {@code suppliedNode}) and one WriteShareGroupState response per partition
     * (served by the respective coordinator node). The test then checks that the combined result
     * contains two topics and one error-free partition entry for each partition.
     */
    @Test
    public void testWriteStateSuccess() {
        MockClient client = new MockClient(MOCK_TIME);

        String groupId = "group1";
        Uuid topicId1 = Uuid.randomUuid();
        int partition1 = 10;
        Uuid topicId2 = Uuid.randomUuid();
        int partition2 = 8;

        Node suppliedNode = new Node(0, HOST, PORT);
        Node coordinatorNode1 = new Node(5, HOST, PORT);
        Node coordinatorNode2 = new Node(6, HOST, PORT);

        String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1);
        String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2);

        // Coordinator lookup for the first share-partition resolves to node 5.
        client.prepareResponseFrom(
            body -> body instanceof FindCoordinatorRequest
                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1),
            new FindCoordinatorResponse(
                new FindCoordinatorResponseData()
                    .setCoordinators(Collections.singletonList(
                        new FindCoordinatorResponseData.Coordinator()
                            .setNodeId(5)
                            .setHost(HOST)
                            .setPort(PORT)
                            .setErrorCode(Errors.NONE.code())))),
            suppliedNode);

        // Coordinator lookup for the second share-partition resolves to node 6.
        client.prepareResponseFrom(
            body -> body instanceof FindCoordinatorRequest
                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2),
            new FindCoordinatorResponse(
                new FindCoordinatorResponseData()
                    .setCoordinators(Collections.singletonList(
                        new FindCoordinatorResponseData.Coordinator()
                            .setNodeId(6)
                            .setHost(HOST)
                            .setPort(PORT)
                            .setErrorCode(Errors.NONE.code())))),
            suppliedNode);

        // Write response for the first partition; Uuids are compared by value, not by reference.
        client.prepareResponseFrom(
            body -> {
                WriteShareGroupStateRequest writeRequest = (WriteShareGroupStateRequest) body;
                WriteShareGroupStateRequestData.WriteStateData topic = writeRequest.data().topics().get(0);
                return groupId.equals(writeRequest.data().groupId())
                    && topicId1.equals(topic.topicId())
                    && partition1 == topic.partitions().get(0).partition();
            },
            new WriteShareGroupStateResponse(WriteShareGroupStateResponse.toResponseData(topicId1, partition1)),
            coordinatorNode1);

        // Write response for the second partition.
        client.prepareResponseFrom(
            body -> {
                WriteShareGroupStateRequest writeRequest = (WriteShareGroupStateRequest) body;
                WriteShareGroupStateRequestData.WriteStateData topic = writeRequest.data().topics().get(0);
                return groupId.equals(writeRequest.data().groupId())
                    && topicId2.equals(topic.topicId())
                    && partition2 == topic.partitions().get(0).partition();
            },
            new WriteShareGroupStateResponse(WriteShareGroupStateResponse.toResponseData(topicId2, partition2)),
            coordinatorNode2);

        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
        DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder()
            .withKafkaClient(client)
            .withCacheHelper(cacheHelper)
            .build();

        WriteShareGroupStateParameters parameters = WriteShareGroupStateParameters.from(
            new WriteShareGroupStateRequestData()
                .setGroupId(groupId)
                .setTopics(Arrays.asList(
                    new WriteShareGroupStateRequestData.WriteStateData()
                        .setTopicId(topicId1)
                        .setPartitions(Collections.singletonList(
                            new WriteShareGroupStateRequestData.PartitionData()
                                .setPartition(partition1)
                                .setStateEpoch(0)
                                .setLeaderEpoch(1)
                                .setStartOffset(0L)
                                .setStateBatches(Collections.singletonList(
                                    new WriteShareGroupStateRequestData.StateBatch()
                                        .setFirstOffset(0L)
                                        .setLastOffset(10L)
                                        .setDeliveryCount((short) 1)
                                        .setDeliveryState((byte) 0))))),
                    new WriteShareGroupStateRequestData.WriteStateData()
                        .setTopicId(topicId2)
                        .setPartitions(Collections.singletonList(
                            new WriteShareGroupStateRequestData.PartitionData()
                                .setPartition(partition2)
                                .setStateEpoch(0)
                                .setLeaderEpoch(1)
                                .setStartOffset(0L)
                                .setStateBatches(Arrays.asList(
                                    new WriteShareGroupStateRequestData.StateBatch()
                                        .setFirstOffset(0L)
                                        .setLastOffset(10L)
                                        .setDeliveryCount((short) 1)
                                        .setDeliveryState((byte) 0),
                                    new WriteShareGroupStateRequestData.StateBatch()
                                        .setFirstOffset(11L)
                                        .setLastOffset(20L)
                                        .setDeliveryCount((short) 1)
                                        .setDeliveryState((byte) 0))))))));

        CompletableFuture<WriteShareGroupStateResult> resultFuture = defaultStatePersister.writeState(parameters);

        WriteShareGroupStateResult result = null;
        try {
            // Bounded wait so a missing mock response fails the test instead of hanging it.
            result = resultFuture.get(10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            Assertions.fail("Unexpected exception", e);
        }

        // Flatten the per-topic partition entries so the comparison ignores topic ordering.
        HashSet<PartitionData> actualPartitions = new HashSet<>();
        result.topicsData().forEach(topicData ->
            topicData.partitions().forEach(partitionData -> actualPartitions.add((PartitionData) partitionData)));

        HashSet<PartitionData> expectedPartitions = new HashSet<>();
        expectedPartitions.add((PartitionData) PartitionFactory.newPartitionErrorData(partition1, Errors.NONE.code(), null));
        expectedPartitions.add((PartitionData) PartitionFactory.newPartitionErrorData(partition2, Errors.NONE.code(), null));

        Assertions.assertEquals(2, result.topicsData().size());
        Assertions.assertEquals(expectedPartitions, actualPartitions);
    }

    /**
     * Happy path for {@code readState} with two topic-partitions.
     *
     * <p>A {@code MockClient} is primed with one FindCoordinator response per share-partition key
     * (both served by {@code suppliedNode}) and one ReadShareGroupState response per partition
     * (served by the respective coordinator node). The test then checks that the combined result
     * contains two topics and that each partition carries the state batches returned for it.
     */
    @Test
    public void testReadStateSuccess() {
        MockClient client = new MockClient(MOCK_TIME);

        String groupId = "group1";
        Uuid topicId1 = Uuid.randomUuid();
        int partition1 = 10;
        Uuid topicId2 = Uuid.randomUuid();
        int partition2 = 8;

        Node suppliedNode = new Node(0, HOST, PORT);
        Node coordinatorNode1 = new Node(5, HOST, PORT);
        Node coordinatorNode2 = new Node(6, HOST, PORT);

        String coordinatorKey1 = SharePartitionKey.asCoordinatorKey(groupId, topicId1, partition1);
        String coordinatorKey2 = SharePartitionKey.asCoordinatorKey(groupId, topicId2, partition2);

        // Coordinator lookup for the first share-partition resolves to node 5.
        client.prepareResponseFrom(
            body -> body instanceof FindCoordinatorRequest
                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey1),
            new FindCoordinatorResponse(
                new FindCoordinatorResponseData()
                    .setCoordinators(Collections.singletonList(
                        new FindCoordinatorResponseData.Coordinator()
                            .setNodeId(5)
                            .setHost(HOST)
                            .setPort(PORT)
                            .setErrorCode(Errors.NONE.code())))),
            suppliedNode);

        // Coordinator lookup for the second share-partition resolves to node 6.
        client.prepareResponseFrom(
            body -> body instanceof FindCoordinatorRequest
                && ((FindCoordinatorRequest) body).data().keyType() == FindCoordinatorRequest.CoordinatorType.SHARE.id()
                && ((FindCoordinatorRequest) body).data().coordinatorKeys().get(0).equals(coordinatorKey2),
            new FindCoordinatorResponse(
                new FindCoordinatorResponseData()
                    .setCoordinators(Collections.singletonList(
                        new FindCoordinatorResponseData.Coordinator()
                            .setNodeId(6)
                            .setHost(HOST)
                            .setPort(PORT)
                            .setErrorCode(Errors.NONE.code())))),
            suppliedNode);

        // Read response for the first partition; Uuids are compared by value, not by reference.
        client.prepareResponseFrom(
            body -> {
                ReadShareGroupStateRequest readRequest = (ReadShareGroupStateRequest) body;
                ReadShareGroupStateRequestData.ReadStateData topic = readRequest.data().topics().get(0);
                return groupId.equals(readRequest.data().groupId())
                    && topicId1.equals(topic.topicId())
                    && partition1 == topic.partitions().get(0).partition();
            },
            new ReadShareGroupStateResponse(
                ReadShareGroupStateResponse.toResponseData(
                    topicId1,
                    partition1,
                    0L,
                    1,
                    Collections.singletonList(
                        new ReadShareGroupStateResponseData.StateBatch()
                            .setFirstOffset(0L)
                            .setLastOffset(10L)
                            .setDeliveryCount((short) 1)
                            .setDeliveryState((byte) 0)))),
            coordinatorNode1);

        // Read response for the second partition, carrying two state batches.
        client.prepareResponseFrom(
            body -> {
                ReadShareGroupStateRequest readRequest = (ReadShareGroupStateRequest) body;
                ReadShareGroupStateRequestData.ReadStateData topic = readRequest.data().topics().get(0);
                return groupId.equals(readRequest.data().groupId())
                    && topicId2.equals(topic.topicId())
                    && partition2 == topic.partitions().get(0).partition();
            },
            new ReadShareGroupStateResponse(
                ReadShareGroupStateResponse.toResponseData(
                    topicId2,
                    partition2,
                    0L,
                    1,
                    Arrays.asList(
                        new ReadShareGroupStateResponseData.StateBatch()
                            .setFirstOffset(0L)
                            .setLastOffset(10L)
                            .setDeliveryCount((short) 1)
                            .setDeliveryState((byte) 0),
                        new ReadShareGroupStateResponseData.StateBatch()
                            .setFirstOffset(11L)
                            .setLastOffset(20L)
                            .setDeliveryCount((short) 1)
                            .setDeliveryState((byte) 0)))),
            coordinatorNode2);

        ShareCoordinatorMetadataCacheHelper cacheHelper = getDefaultCacheHelper(suppliedNode);
        DefaultStatePersister defaultStatePersister = DefaultStatePersisterBuilder.builder()
            .withKafkaClient(client)
            .withCacheHelper(cacheHelper)
            .build();

        ReadShareGroupStateParameters parameters = ReadShareGroupStateParameters.from(
            new ReadShareGroupStateRequestData()
                .setGroupId(groupId)
                .setTopics(Arrays.asList(
                    new ReadShareGroupStateRequestData.ReadStateData()
                        .setTopicId(topicId1)
                        .setPartitions(Collections.singletonList(
                            new ReadShareGroupStateRequestData.PartitionData()
                                .setPartition(partition1)
                                .setLeaderEpoch(1))),
                    new ReadShareGroupStateRequestData.ReadStateData()
                        .setTopicId(topicId2)
                        .setPartitions(Collections.singletonList(
                            new ReadShareGroupStateRequestData.PartitionData()
                                .setPartition(partition2)
                                .setLeaderEpoch(1))))));

        CompletableFuture<ReadShareGroupStateResult> resultFuture = defaultStatePersister.readState(parameters);

        ReadShareGroupStateResult result = null;
        try {
            // Bounded wait so a missing mock response fails the test instead of hanging it.
            result = resultFuture.get(10L, TimeUnit.SECONDS);
        } catch (Exception e) {
            Assertions.fail("Unexpected exception", e);
        }

        // Flatten the per-topic partition entries so the comparison ignores topic ordering.
        HashSet<PartitionData> actualPartitions = new HashSet<>();
        result.topicsData().forEach(topicData ->
            topicData.partitions().forEach(partitionData -> actualPartitions.add((PartitionData) partitionData)));

        HashSet<PartitionData> expectedPartitions = new HashSet<>();
        expectedPartitions.add((PartitionData) PartitionFactory.newPartitionAllData(
            partition1, 1, 0L, Errors.NONE.code(), null,
            Collections.singletonList(new PersisterStateBatch(0L, 10L, (byte) 0, (short) 1))));
        expectedPartitions.add((PartitionData) PartitionFactory.newPartitionAllData(
            partition2, 1, 0L, Errors.NONE.code(), null,
            Arrays.asList(
                new PersisterStateBatch(0L, 10L, (byte) 0, (short) 1),
                new PersisterStateBatch(11L, 20L, (byte) 0, (short) 1))));

        Assertions.assertEquals(2, result.topicsData().size());
        Assertions.assertEquals(expectedPartitions, actualPartitions);
    }

    /**
     * Checks {@code writeResponsesToResult} when one partition future holds a successful response
     * and the other holds an error response: both topics must appear in the result, each with the
     * error code and message carried by its own response.
     */
    @Test
    public void testWriteStateResponseToResultPartialResults() {
        // topicId -> partition -> pending response; fully typed instead of a raw inner Map.
        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);

        // tp1 completes with a successful write response.
        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
            .put(tp1.partition(), CompletableFuture.completedFuture(
                new WriteShareGroupStateResponse(
                    WriteShareGroupStateResponse.toResponseData(tp1.topicId(), tp1.partition()))));

        // tp2 completes with an error response.
        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
            .put(tp2.partition(), CompletableFuture.completedFuture(
                new WriteShareGroupStateResponse(
                    WriteShareGroupStateResponse.toErrorResponseData(
                        tp2.topicId(), tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION, "unknown tp"))));

        PersisterStateManager psm = Mockito.mock(PersisterStateManager.class);
        DefaultStatePersister dsp = new DefaultStatePersister(psm);

        WriteShareGroupStateResult results = dsp.writeResponsesToResult(futureMap);

        Assertions.assertEquals(2, results.topicsData().size());
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp1.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null)))));
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp2.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionErrorData(
                        tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp")))));
    }

    /**
     * Checks {@code writeResponsesToResult} when one partition future succeeded and the other
     * failed with an exception: the failed partition must be reported with
     * {@code UNKNOWN_SERVER_ERROR} and a message that embeds the exception text.
     */
    @Test
    public void testWriteStateResponseToResultFailedFuture() {
        // topicId -> partition -> pending response; fully typed instead of a raw inner Map.
        Map<Uuid, Map<Integer, CompletableFuture<WriteShareGroupStateResponse>>> futureMap = new HashMap<>();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);

        // tp1 completes with a successful write response.
        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
            .put(tp1.partition(), CompletableFuture.completedFuture(
                new WriteShareGroupStateResponse(
                    WriteShareGroupStateResponse.toResponseData(tp1.topicId(), tp1.partition()))));

        // tp2's future fails outright.
        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
            .put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff")));

        PersisterStateManager psm = Mockito.mock(PersisterStateManager.class);
        DefaultStatePersister dsp = new DefaultStatePersister(psm);

        WriteShareGroupStateResult results = dsp.writeResponsesToResult(futureMap);

        Assertions.assertEquals(2, results.topicsData().size());
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp1.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionErrorData(tp1.partition(), Errors.NONE.code(), null)))));
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp2.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionErrorData(
                        tp2.partition(),
                        Errors.UNKNOWN_SERVER_ERROR.code(),
                        "Error writing state to share coordinator: java.lang.Exception: scary stuff")))));
    }

    /**
     * Checks {@code readResponsesToResult} when one partition future holds a successful response
     * and the other holds an error response: both topics must appear in the result, each built
     * from its own response.
     */
    @Test
    public void testReadStateResponseToResultPartialResults() {
        // topicId -> partition -> pending response; fully typed instead of a raw inner Map.
        Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);

        // tp1 completes with a successful read response (start offset 1, state epoch 2, no batches).
        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
            .put(tp1.partition(), CompletableFuture.completedFuture(
                new ReadShareGroupStateResponse(
                    ReadShareGroupStateResponse.toResponseData(
                        tp1.topicId(), tp1.partition(), 1L, 2, Collections.emptyList()))));

        // tp2 completes with an error response.
        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
            .put(tp2.partition(), CompletableFuture.completedFuture(
                new ReadShareGroupStateResponse(
                    ReadShareGroupStateResponse.toErrorResponseData(
                        tp2.topicId(), tp2.partition(), Errors.UNKNOWN_TOPIC_OR_PARTITION, "unknown tp"))));

        PersisterStateManager psm = Mockito.mock(PersisterStateManager.class);
        DefaultStatePersister dsp = new DefaultStatePersister(psm);

        ReadShareGroupStateResult results = dsp.readResponsesToResult(futureMap);

        Assertions.assertEquals(2, results.topicsData().size());
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp1.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionAllData(
                        tp1.partition(), 2, 1L, Errors.NONE.code(), null, Collections.emptyList())))));
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp2.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionAllData(
                        tp2.partition(), 0, 0L, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "unknown tp",
                        Collections.emptyList())))));
    }

    /**
     * Checks {@code readResponsesToResult} when one partition future succeeded and the other
     * failed with an exception: the failed partition must be reported with
     * {@code UNKNOWN_SERVER_ERROR}, -1 placeholders for the numeric fields, and a message that
     * embeds the exception text.
     */
    @Test
    public void testReadStateResponseToResultFailedFuture() {
        // topicId -> partition -> pending response; fully typed instead of a raw inner Map.
        Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateResponse>>> futureMap = new HashMap<>();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), 1, null);
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), 1, null);

        // tp1 completes with a successful read response (start offset 1, state epoch 2, no batches).
        futureMap.computeIfAbsent(tp1.topicId(), k -> new HashMap<>())
            .put(tp1.partition(), CompletableFuture.completedFuture(
                new ReadShareGroupStateResponse(
                    ReadShareGroupStateResponse.toResponseData(
                        tp1.topicId(), tp1.partition(), 1L, 2, Collections.emptyList()))));

        // tp2's future fails outright.
        futureMap.computeIfAbsent(tp2.topicId(), k -> new HashMap<>())
            .put(tp2.partition(), CompletableFuture.failedFuture(new Exception("scary stuff")));

        PersisterStateManager psm = Mockito.mock(PersisterStateManager.class);
        DefaultStatePersister dsp = new DefaultStatePersister(psm);

        ReadShareGroupStateResult results = dsp.readResponsesToResult(futureMap);

        Assertions.assertEquals(2, results.topicsData().size());
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp1.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionAllData(
                        tp1.partition(), 2, 1L, Errors.NONE.code(), null, Collections.emptyList())))));
        Assertions.assertTrue(results.topicsData().contains(
            new TopicData<>(
                tp2.topicId(),
                Collections.singletonList(
                    PartitionFactory.newPartitionAllData(
                        tp2.partition(), -1, -1L, Errors.UNKNOWN_SERVER_ERROR.code(),
                        "Error reading state from share coordinator: java.lang.Exception: scary stuff",
                        Collections.emptyList())))));
    }

    /**
     * Checks that stopping the persister stops the wrapped state manager exactly once, and that
     * the manager has not been stopped before {@code stop()} is called on the persister.
     */
    @Test
    public void testDefaultPersisterClose() {
        PersisterStateManager stateManager = Mockito.mock(PersisterStateManager.class);
        DefaultStatePersister persister = new DefaultStatePersister(stateManager);
        try {
            // Nothing may have stopped the manager yet.
            Mockito.verify(stateManager, Mockito.times(0)).stop();

            persister.stop();

            // Stopping the persister must have been forwarded to the manager once.
            Mockito.verify(stateManager, Mockito.times(1)).stop();
        } catch (Exception e) {
            Assertions.fail("Unexpected exception", e);
        }
    }

    /**
     * Test-only builder for {@link DefaultStatePersister}.
     *
     * <p>Every collaborator defaults to the shared class-level mock ({@code CLIENT},
     * {@code MOCK_TIME}, {@code MOCK_TIMER}, {@code CACHE_HELPER}); individual tests override only
     * what they need through the {@code with...} methods.
     */
    private static class DefaultStatePersisterBuilder {
        private KafkaClient client = CLIENT;
        private Time time = MOCK_TIME;
        private Timer timer = MOCK_TIMER;
        private ShareCoordinatorMetadataCacheHelper cacheHelper = CACHE_HELPER;

        // Instances are obtained through builder().
        private DefaultStatePersisterBuilder() {
        }

        /** Returns a new builder preloaded with the default mocks. */
        public static DefaultStatePersisterBuilder builder() {
            return new DefaultStatePersisterBuilder();
        }

        /** Replaces the Kafka client handed to the state manager. */
        private DefaultStatePersisterBuilder withKafkaClient(KafkaClient kafkaClient) {
            client = kafkaClient;
            return this;
        }

        /** Replaces the metadata cache helper handed to the state manager. */
        private DefaultStatePersisterBuilder withCacheHelper(ShareCoordinatorMetadataCacheHelper helper) {
            cacheHelper = helper;
            return this;
        }

        /** Replaces the clock handed to the state manager. */
        private DefaultStatePersisterBuilder withTime(Time clock) {
            time = clock;
            return this;
        }

        /** Replaces the timer handed to the state manager. */
        private DefaultStatePersisterBuilder withTimer(Timer newTimer) {
            timer = newTimer;
            return this;
        }

        /** Creates a persister backed by a new PersisterStateManager built from the current settings. */
        public DefaultStatePersister build() {
            return new DefaultStatePersister(new PersisterStateManager(client, cacheHelper, time, timer));
        }
    }
}

