/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Function;
import kafka.cluster.Partition;
import kafka.cluster.PartitionListener;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.DelayedShareFetch;
import kafka.server.share.DelayedShareFetchTest;
import kafka.server.share.SharePartition;
import kafka.server.share.SharePartitionCache;
import kafka.server.share.SharePartitionManager;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.FencedStateEpochException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidShareSessionEpochException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.ShareSessionLimitReachedException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.requests.ShareRequestMetadata;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.common.ShareVersion;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.CachedSharePartition;
import org.apache.kafka.server.share.ErroneousAndValidPartitionData;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.context.FinalContext;
import org.apache.kafka.server.share.context.ShareFetchContext;
import org.apache.kafka.server.share.context.ShareSessionContext;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.fetch.ShareFetchTestUtils;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.share.persister.NoOpStatePersister;
import org.apache.kafka.server.share.persister.Persister;
import org.apache.kafka.server.share.session.ShareSession;
import org.apache.kafka.server.share.session.ShareSessionCache;
import org.apache.kafka.server.share.session.ShareSessionKey;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

@Timeout(value=120L)
public class SharePartitionManagerTest {
    private static final int DEFAULT_RECORD_LOCK_DURATION_MS = 30000;
    private static final int MAX_DELIVERY_COUNT = 5;
    private static final short MAX_IN_FLIGHT_MESSAGES = 200;
    private static final short MAX_FETCH_RECORDS = 500;
    private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
    private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
    private static final int BATCH_SIZE = 500;
    private static final FetchParams FETCH_PARAMS = new FetchParams(-1, -1L, 2000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
    private static final String TIMER_NAME_PREFIX = "share-partition-manager";
    private static final String CONNECTION_ID = "id-1";
    static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
    static final long REMOTE_FETCH_MAX_WAIT_MS = 6000L;
    private MockTime time;
    private ReplicaManager mockReplicaManager;
    private BrokerTopicStats brokerTopicStats;
    private SharePartitionManager sharePartitionManager;
    private static final List<TopicIdPartition> EMPTY_PART_LIST = List.of();
    private static final List<ShareFetchResponseData.AcquiredRecords> EMPTY_ACQUIRED_RECORDS = List.of();

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        kafka.utils.TestUtils.clearYammerMetrics();
        this.brokerTopicStats = new BrokerTopicStats();
        this.mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Partition partition = this.mockPartition();
        Mockito.when((Object)this.mockReplicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any())).thenReturn((Object)partition);
    }

    @AfterEach
    public void tearDown() throws Exception {
        if (this.sharePartitionManager != null) {
            this.sharePartitionManager.close();
        }
        this.brokerTopicStats.close();
        this.assertNoReaperThreadsPendingClose();
    }

    @Test
    public void testNewContextReturnsFinalContextWithoutRequestData() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context2.getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextWithRequestData() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context2.getClass());
    }

    @Test
    public void testNewContextReturnsFinalContextWhenTopicPartitionsArePresentInRequestData() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId, -1);
        List<TopicIdPartition> reqData2 = List.of(new TopicIdPartition(tpId1, new TopicPartition("foo", 0)));
        Assertions.assertInstanceOf(FinalContext.class, (Object)this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(true), CONNECTION_ID));
    }

    @Test
    public void testNewContextThrowsErrorWhenShareSessionNotFoundOnFinalEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> this.sharePartitionManager.newContext("grp", EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(Uuid.randomUuid(), -1), Boolean.valueOf(false), CONNECTION_ID));
    }

    @Test
    public void testNewContextThrowsErrorWhenAcknowledgeDataPresentOnInitialEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        Assertions.assertThrows(InvalidRequestException.class, () -> this.sharePartitionManager.newContext("grp", List.of(tp0, tp1), EMPTY_PART_LIST, new ShareRequestMetadata(Uuid.randomUuid(), 0), Boolean.valueOf(true), CONNECTION_ID));
    }

    @Test
    public void testNewContextThrowsErrorWhenShareSessionCacheIsFullOnInitialEpoch() {
        ShareSessionCache cache = new ShareSessionCache(1);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid memberId2 = Uuid.randomUuid();
        List<TopicIdPartition> reqData = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, 0);
        Assertions.assertThrows(ShareSessionLimitReachedException.class, () -> this.sharePartitionManager.newContext("grp", reqData, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false), "id-2"));
    }

    @Test
    public void testNewContextExistingSessionNewRequestWithInitialEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        List<TopicIdPartition> reqData = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata = new ShareRequestMetadata(memberId, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        Assertions.assertEquals((int)1, (int)cache.size());
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData, EMPTY_PART_LIST, reqMetadata, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        Assertions.assertEquals((int)1, (int)cache.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNewContext() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        topicNames.put(tpId0, "foo");
        topicNames.put(tpId1, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        ((ShareSessionContext)context2).shareFetchData().forEach(topicIdPartition -> Assertions.assertTrue((boolean)reqData2.contains(topicIdPartition)));
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(respData2, (Object)resp2.responseData(topicNames));
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), "id-2"));
        Uuid memberId4 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(memberId4, 1), Boolean.valueOf(true), "id-3"));
        ShareFetchContext context5 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context5);
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        ShareSessionContext shareSessionContext5 = (ShareSessionContext)context5;
        ShareSession shareSession = shareSessionContext5.session();
        synchronized (shareSession) {
            shareSessionContext5.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                Assertions.assertTrue((boolean)reqData2.contains(topicIdPartition));
            });
        }
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertEquals((int)0, (int)resp5.responseData(topicNames).size());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), CONNECTION_ID));
        ShareFetchContext context7 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true), CONNECTION_ID);
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        ShareFetchContext context8 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData8.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        CompletableFuture releaseResponse = this.sharePartitionManager.releaseSession(groupId, reqMetadata2.memberId().toString());
        Assertions.assertTrue((boolean)releaseResponse.isDone());
        Assertions.assertFalse((boolean)releaseResponse.isCompletedExceptionally());
        Assertions.assertEquals((int)0, (int)cache.size());
    }

    @Test
    public void testAcknowledgeSessionUpdateThrowsOnInitialEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.acknowledgeSessionUpdate("grp", new ShareRequestMetadata(Uuid.randomUuid(), 0)));
    }

    @Test
    public void testAcknowledgeSessionUpdateThrowsWhenShareSessionNotFound() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> this.sharePartitionManager.acknowledgeSessionUpdate("grp", new ShareRequestMetadata(Uuid.randomUuid(), ShareRequestMetadata.nextEpoch((int)0))));
    }

    @Test
    public void testAcknowledgeSessionUpdateThrowsInvalidShareSessionEpochException() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, new ShareRequestMetadata(memberId, 0), Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.acknowledgeSessionUpdate("grp", new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch((int)ShareRequestMetadata.nextEpoch((int)0)))));
    }

    @Test
    public void testAcknowledgeSessionUpdateSuccessOnSubsequentEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, new ShareRequestMetadata(memberId, 0), Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        Assertions.assertDoesNotThrow(() -> this.sharePartitionManager.acknowledgeSessionUpdate("grp", new ShareRequestMetadata(memberId, ShareRequestMetadata.nextEpoch((int)0))));
    }

    @Test
    public void testAcknowledgeSessionUpdateSuccessOnFinalEpoch() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, List.of(tp0, tp1), EMPTY_PART_LIST, new ShareRequestMetadata(memberId, 0), Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        Assertions.assertDoesNotThrow(() -> this.sharePartitionManager.acknowledgeSessionUpdate("grp", new ShareRequestMetadata(memberId, -1)));
    }

    @Test
    public void testSubsequentShareSession() {
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        topicNames.put(fooId, "foo");
        topicNames.put(barId, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
        String groupId = "grp";
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp0.partition()));
        respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        List<TopicIdPartition> reqData2 = List.of(tp2);
        ArrayList<TopicIdPartition> removed2 = new ArrayList<TopicIdPartition>();
        removed2.add(tp0);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, removed2, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        HashSet<TopicIdPartition> expectedTopicIdPartitions2 = new HashSet<TopicIdPartition>();
        expectedTopicIdPartitions2.add(tp1);
        expectedTopicIdPartitions2.add(tp2);
        HashSet actualTopicIdPartitions2 = new HashSet();
        ShareSessionContext shareSessionContext = (ShareSessionContext)context2;
        shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
            TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
            actualTopicIdPartitions2.add(topicIdPartition);
        });
        Assertions.assertEquals(expectedTopicIdPartitions2, actualTopicIdPartitions2);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp1.partition()));
        respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(tp2.partition()));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals((int)1, (int)resp2.data().responses().size());
        Assertions.assertEquals((Object)barId, (Object)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().stream().findFirst().get()).topicId());
        Assertions.assertEquals((int)1, (int)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().stream().findFirst().get()).partitions().size());
        Assertions.assertEquals((int)0, (int)((ShareFetchResponseData.PartitionData)((ShareFetchResponseData.ShareFetchableTopicResponse)resp2.data().responses().stream().findFirst().get()).partitions().get(0)).partitionIndex());
        Assertions.assertEquals((int)1, (int)resp2.responseData(topicNames).size());
    }

    @Test
    public void testZeroSizeShareSession() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid fooId = Uuid.randomUuid();
        topicNames.put(fooId, "foo");
        TopicIdPartition foo0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition foo1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        List<TopicIdPartition> reqData1 = List.of(foo0, foo1);
        String groupId = "grp";
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(foo0, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo0.partition()));
        respData1.put(foo1, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo1.partition()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        ArrayList<TopicIdPartition> removed2 = new ArrayList<TopicIdPartition>();
        removed2.add(foo0);
        removed2.add(foo1);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, removed2, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        LinkedHashMap respData2 = new LinkedHashMap();
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertTrue((boolean)resp2.responseData(topicNames).isEmpty());
        Assertions.assertEquals((int)1, (int)cache.size());
    }

    @Test
    public void testToForgetPartitions() {
        String groupId = "grp";
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        List<TopicIdPartition> reqData1 = List.of(foo, bar);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        this.assertPartitionsPresent((ShareSessionContext)context1, List.of(foo, bar));
        this.mockUpdateAndGenerateResponseData(context1, groupId, reqMetadata1.memberId());
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(foo), new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        this.assertPartitionsPresent((ShareSessionContext)context2, List.of(bar));
        this.mockUpdateAndGenerateResponseData(context2, groupId, reqMetadata1.memberId());
        ShareFetchContext context3 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(bar), new ShareRequestMetadata(reqMetadata1.memberId(), 2), Boolean.valueOf(true), CONNECTION_ID);
        this.assertPartitionsPresent((ShareSessionContext)context3, EMPTY_PART_LIST);
    }

    @Test
    public void testShareSessionUpdateTopicIdsBrokerSide() {
        String groupId = "grp";
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition foo = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition bar = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        topicNames.put(fooId, "foo");
        topicNames.put(barId, "bar");
        List<TopicIdPartition> reqData1 = List.of(foo, bar);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(bar, new ShareFetchResponseData.PartitionData().setPartitionIndex(bar.partition()));
        respData1.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals((int)2, (int)resp1.responseData(topicNames).size());
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertTrue((boolean)((ShareSessionContext)context2).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(foo, new ShareFetchResponseData.PartitionData().setPartitionIndex(foo.partition()).setErrorCode(Errors.INCONSISTENT_TOPIC_ID.code()));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals((short)Errors.INCONSISTENT_TOPIC_ID.code(), (short)((ShareFetchResponseData.PartitionData)resp2.responseData(topicNames).get(foo)).errorCode());
    }

    @Test
    public void testGetErroneousAndValidTopicIdPartitions() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        Uuid tpId0 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tpNull1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 0));
        TopicIdPartition tpNull2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(null, 1));
        String groupId = "grp";
        List<TopicIdPartition> reqData2 = List.of(tp0, tp1, tpNull1);
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        this.assertErroneousAndValidTopicIdPartitions(context2.getErroneousAndValidTopicIdPartitions(), List.of(tpNull1), List.of(tp0, tp1));
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        respData2.put(tpNull1, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        ShareFetchResponse resp2Throttle = context2.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2Throttle.error());
        Assertions.assertEquals((int)100, (int)resp2Throttle.throttleTimeMs());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), CONNECTION_ID));
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(Uuid.randomUuid(), 1), Boolean.valueOf(true), CONNECTION_ID));
        ShareFetchContext context5 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context5);
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        this.assertErroneousAndValidTopicIdPartitions(context5.getErroneousAndValidTopicIdPartitions(), List.of(tpNull1), List.of(tp0, tp1));
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), CONNECTION_ID));
        List<TopicIdPartition> reqData7 = List.of(tpNull2);
        ShareFetchContext context7 = this.sharePartitionManager.newContext(groupId, reqData7, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true), CONNECTION_ID);
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        this.assertErroneousAndValidTopicIdPartitions(context7.getErroneousAndValidTopicIdPartitions(), List.of(tpNull1, tpNull2), List.of(tp0, tp1));
        ShareFetchContext context8 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        this.assertErroneousAndValidTopicIdPartitions(context8.getErroneousAndValidTopicIdPartitions(), EMPTY_PART_LIST, EMPTY_PART_LIST);
        ShareFetchResponse resp8 = context8.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        Assertions.assertEquals((int)100, (int)resp8.throttleTimeMs());
        CompletableFuture releaseResponse = this.sharePartitionManager.releaseSession(groupId, reqMetadata2.memberId().toString());
        Assertions.assertTrue((boolean)releaseResponse.isDone());
        Assertions.assertFalse((boolean)releaseResponse.isCompletedExceptionally());
        Assertions.assertEquals((int)0, (int)cache.size());
    }

    @Test
    public void testShareFetchContextResponseSize() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        HashMap<Uuid, String> topicNames = new HashMap<Uuid, String>();
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        topicNames.put(tpId0, "foo");
        topicNames.put(tpId1, "bar");
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        List<TopicIdPartition> reqData2 = List.of(tp0, tp1);
        ObjectSerializationCache objectSerializationCache = new ObjectSerializationCache();
        short version = ApiKeys.SHARE_FETCH.latestVersion();
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(Uuid.randomUuid(), 0);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData2.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int respSize2 = context2.responseSize(respData2, version);
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(respData2, (Object)resp2.responseData(topicNames));
        Assertions.assertEquals((int)(4 + resp2.data().size(objectSerializationCache, version)), (int)respSize2);
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), CONNECTION_ID));
        Uuid memberId4 = Uuid.randomUuid();
        Assertions.assertThrows(ShareSessionNotFoundException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(memberId4, 1), Boolean.valueOf(true), CONNECTION_ID));
        List<TopicIdPartition> reqData5 = List.of(tp2);
        ShareFetchContext context5 = this.sharePartitionManager.newContext(groupId, reqData5, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context5);
        Assertions.assertTrue((boolean)((ShareSessionContext)context5).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData5 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData5.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        int respSize5 = context5.responseSize(respData5, version);
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData5);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertEquals((int)(4 + resp5.data().size(objectSerializationCache, version)), (int)respSize5);
        Assertions.assertThrows(InvalidShareSessionEpochException.class, () -> this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 5), Boolean.valueOf(true), CONNECTION_ID));
        ShareFetchContext context7 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true), CONNECTION_ID);
        int respSize7 = context7.responseSize(respData2, version);
        ShareFetchResponse resp7 = context7.throttleResponse(100);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp7.error());
        Assertions.assertEquals((int)100, (int)resp7.throttleTimeMs());
        Assertions.assertEquals((int)(4 + new ShareFetchResponseData().size(objectSerializationCache, version)), (int)respSize7);
        ShareFetchContext context8 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata2.memberId(), -1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context8.getClass());
        Assertions.assertEquals((int)1, (int)cache.size());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData8 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData8.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        int respSize8 = context8.responseSize(respData8, version);
        ShareFetchResponse resp8 = context8.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData8);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp8.error());
        Assertions.assertEquals((int)(4 + resp8.data().size(objectSerializationCache, version)), (int)respSize8);
    }

    @Test
    public void testCachedTopicPartitionsWithNoTopicPartitions() {
        ShareSessionCache cache = new ShareSessionCache(10);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        List result = this.sharePartitionManager.cachedTopicIdPartitionsInShareSession("grp", Uuid.randomUuid());
        Assertions.assertTrue((boolean)result.isEmpty());
    }

    @Test
    public void testCachedTopicPartitionsForValidShareSessions() {
        ShareSessionCache cache = new ShareSessionCache(10);
        Uuid tpId0 = Uuid.randomUuid();
        Uuid tpId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(tpId0, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(tpId0, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(tpId1, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(tpId1, new TopicPartition("bar", 1));
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid memberId2 = Uuid.randomUuid();
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp1.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)String.valueOf(memberId1)))).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withPartitionCache(partitionCache).build();
        List<TopicIdPartition> reqData1 = List.of(tp0, tp1);
        ShareRequestMetadata reqMetadata1 = new ShareRequestMetadata(memberId1, 0);
        ShareFetchContext context1 = this.sharePartitionManager.newContext(groupId, reqData1, EMPTY_PART_LIST, reqMetadata1, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context1);
        Assertions.assertFalse((boolean)((ShareSessionContext)context1).isSubsequent());
        ShareSessionKey shareSessionKey1 = new ShareSessionKey(groupId, reqMetadata1.memberId());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData1 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData1.put(tp0, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        respData1.put(tp1, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp1 = context1.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData1);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp1.error());
        Assertions.assertEquals(new HashSet<TopicIdPartition>(List.of(tp0, tp1)), new HashSet(this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
        List<TopicIdPartition> reqData2 = List.of(tp2);
        ShareRequestMetadata reqMetadata2 = new ShareRequestMetadata(memberId2, 0);
        ShareFetchContext context2 = this.sharePartitionManager.newContext(groupId, reqData2, EMPTY_PART_LIST, reqMetadata2, Boolean.valueOf(false), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context2);
        Assertions.assertFalse((boolean)((ShareSessionContext)context2).isSubsequent());
        ShareSessionKey shareSessionKey2 = new ShareSessionKey(groupId, reqMetadata2.memberId());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData2 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData2.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp2 = context2.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData2);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp2.error());
        Assertions.assertEquals(List.of(tp2), (Object)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
        List<TopicIdPartition> reqData3 = List.of(tp2);
        ShareFetchContext context3 = this.sharePartitionManager.newContext(groupId, reqData3, EMPTY_PART_LIST, new ShareRequestMetadata(shareSessionKey1.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context3);
        Assertions.assertTrue((boolean)((ShareSessionContext)context3).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData3 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData3.put(tp2, new ShareFetchResponseData.PartitionData().setPartitionIndex(0));
        ShareFetchResponse resp3 = context3.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData3);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp3.error());
        Assertions.assertEquals(new HashSet<TopicIdPartition>(List.of(tp0, tp1, tp2)), new HashSet(this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1)));
        List<TopicIdPartition> reqData4 = List.of(tp3);
        ShareFetchContext context4 = this.sharePartitionManager.newContext(groupId, reqData4, List.of(tp2), new ShareRequestMetadata(shareSessionKey2.memberId(), 1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context4);
        Assertions.assertTrue((boolean)((ShareSessionContext)context4).isSubsequent());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> respData4 = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        respData4.put(tp3, new ShareFetchResponseData.PartitionData().setPartitionIndex(1));
        ShareFetchResponse resp4 = context4.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData4);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp4.error());
        Assertions.assertEquals(List.of(tp3), (Object)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
        ShareFetchContext context5 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, EMPTY_PART_LIST, new ShareRequestMetadata(reqMetadata1.memberId(), -1), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertEquals(FinalContext.class, context5.getClass());
        LinkedHashMap respData5 = new LinkedHashMap();
        ShareFetchResponse resp5 = context5.updateAndGenerateResponseData(groupId, reqMetadata1.memberId(), respData5);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp5.error());
        Assertions.assertFalse((boolean)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1).isEmpty());
        this.sharePartitionManager.releaseSession(groupId, reqMetadata1.memberId().toString());
        Assertions.assertTrue((boolean)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId1).isEmpty());
        ShareFetchContext context6 = this.sharePartitionManager.newContext(groupId, EMPTY_PART_LIST, List.of(tp3), new ShareRequestMetadata(shareSessionKey2.memberId(), 2), Boolean.valueOf(true), CONNECTION_ID);
        Assertions.assertInstanceOf(ShareSessionContext.class, (Object)context6);
        Assertions.assertTrue((boolean)((ShareSessionContext)context6).isSubsequent());
        LinkedHashMap respData6 = new LinkedHashMap();
        ShareFetchResponse resp6 = context6.updateAndGenerateResponseData(groupId, reqMetadata2.memberId(), respData6);
        Assertions.assertEquals((Object)Errors.NONE, (Object)resp6.error());
        Assertions.assertEquals(EMPTY_PART_LIST, (Object)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, memberId2));
    }

    @Test
    public void testSharePartitionKey() {
        SharePartitionKey sharePartitionKey1 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        SharePartitionKey sharePartitionKey2 = new SharePartitionKey("mock-group-2", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        SharePartitionKey sharePartitionKey3 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(1L, 1L), new TopicPartition("test-1", 0)));
        SharePartitionKey sharePartitionKey4 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 1)));
        SharePartitionKey sharePartitionKey5 = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 0L), new TopicPartition("test-2", 0)));
        SharePartitionKey sharePartitionKey1Copy = new SharePartitionKey("mock-group-1", new TopicIdPartition(new Uuid(0L, 1L), new TopicPartition("test", 0)));
        Assertions.assertEquals((Object)sharePartitionKey1, (Object)sharePartitionKey1Copy);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey2);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey3);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey4);
        Assertions.assertNotEquals((Object)sharePartitionKey1, (Object)sharePartitionKey5);
        Assertions.assertNotNull((Object)sharePartitionKey1);
    }

    @Test
    public void testMultipleSequentialShareFetches() {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        Uuid barId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(fooId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(barId, new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(barId, new TopicPartition("bar", 1));
        TopicIdPartition tp4 = new TopicIdPartition(fooId, new TopicPartition("foo", 2));
        TopicIdPartition tp5 = new TopicIdPartition(barId, new TopicPartition("bar", 2));
        TopicIdPartition tp6 = new TopicIdPartition(fooId, new TopicPartition("foo", 3));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2, tp3, tp4, tp5, tp6);
        this.mockFetchOffsetForTimestamp(this.mockReplicaManager);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp0, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp2, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp3, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp4, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp5, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp6, 1);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(topicIdPartitions)).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 1, 500, 500, topicIdPartitions);
        Assertions.assertTrue((boolean)future.isDone());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 3, 500, 500, topicIdPartitions);
        Assertions.assertTrue((boolean)future.isDone());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)2))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 10, 500, 500, topicIdPartitions);
        Assertions.assertTrue((boolean)future.isDone());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)3))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(6L, 0L, 0L, 0L), Map.of("foo", new TopicMetrics(3L, 0L, 0L, 0L), "bar", new TopicMetrics(3L, 0L, 0L, 0L)));
    }

    @Test
    public void testReplicaManagerFetchShouldNotProceed() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Map result = (Map)future.join();
        Assertions.assertEquals((int)0, (int)result.size());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(1L, 0L, 0L, 0L), Map.of("foo", new TopicMetrics(1L, 0L, 0L, 0L)));
    }

    @Test
    public void testReplicaManagerFetchShouldProceed() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        this.mockFetchOffsetForTimestamp(this.mockReplicaManager);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp0, 1);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(topicIdPartitions)).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().count());
        Assertions.assertEquals((int)1, (int)this.brokerTopicStats.numTopics());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.topicStats(tp0.topic()).totalShareFetchRequestRate().count());
    }

    @Test
    public void testCloseSharePartitionManager() throws Exception {
        Timer timer = (Timer)Mockito.mock(SystemTimerReaper.class);
        ShareGroupMetrics shareGroupMetrics = (ShareGroupMetrics)Mockito.mock(ShareGroupMetrics.class);
        SharePartitionManager sharePartitionManager = SharePartitionManagerBuilder.builder().withTimer(timer).withShareGroupMetrics(shareGroupMetrics).build();
        ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)0))).close();
        ((ShareGroupMetrics)Mockito.verify((Object)shareGroupMetrics, (VerificationMode)Mockito.times((int)0))).close();
        sharePartitionManager.close();
        ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.times((int)1))).close();
        ((ShareGroupMetrics)Mockito.verify((Object)shareGroupMetrics, (VerificationMode)Mockito.times((int)1))).close();
    }

    @Test
    public void testReleaseSessionSuccess() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("baz", 4));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId.toString()))).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId.toString()))).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRecordStateException("Unable to release acquired records for the batch")));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp2));
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp3));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture resultFuture = this.sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)3, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp1));
        Assertions.assertTrue((boolean)result.containsKey(tp2));
        Assertions.assertTrue((boolean)result.containsKey(tp3));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).errorCode());
        Assertions.assertEquals((int)2, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).partitionIndex());
        Assertions.assertEquals((short)Errors.INVALID_RECORD_STATE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorCode());
        Assertions.assertEquals((Object)"Unable to release acquired records for the batch", (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorMessage());
        Assertions.assertEquals((int)4, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorMessage());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 0L, 0L), Map.of());
    }

    @Test
    public void testReleaseSessionWithIncorrectGroupId() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = this.sharePartitionManager.releaseSession("grp-2", memberId.toString());
        Assertions.assertTrue((boolean)resultFuture.isDone());
        Assertions.assertTrue((boolean)resultFuture.isCompletedExceptionally());
        Throwable exception = Assertions.assertThrows(ExecutionException.class, resultFuture::get);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, (Object)exception.getCause());
    }

    @Test
    public void testReleaseSessionWithIncorrectMemberId() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp1 = new TopicIdPartition(memberId, new TopicPartition("foo", 0));
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, Uuid.randomUuid()))).thenReturn((Object)shareSession);
        ImplicitLinkedHashCollection partitionMap = new ImplicitLinkedHashCollection(3);
        partitionMap.add((ImplicitLinkedHashCollection.Element)new CachedSharePartition(tp1));
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)partitionMap);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = this.sharePartitionManager.releaseSession(groupId, memberId.toString());
        Assertions.assertTrue((boolean)resultFuture.isDone());
        Assertions.assertTrue((boolean)resultFuture.isCompletedExceptionally());
        Throwable exception = Assertions.assertThrows(ExecutionException.class, resultFuture::get);
        Assertions.assertInstanceOf(ShareSessionNotFoundException.class, (Object)exception.getCause());
    }

    @Test
    public void testReleaseSessionWithEmptyTopicPartitions() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)shareSession);
        Mockito.when((Object)shareSession.partitionMap()).thenReturn((Object)new ImplicitLinkedHashCollection());
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = this.sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)0, (int)result.size());
    }

    @Test
    public void testReleaseSessionWithNullShareSession() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        Mockito.when((Object)cache.get(new ShareSessionKey(groupId, memberId))).thenReturn(null);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, memberId))).thenReturn((Object)((ShareSession)Mockito.mock(ShareSession.class)));
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).build();
        CompletableFuture resultFuture = this.sharePartitionManager.releaseSession(groupId, memberId.toString());
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)0, (int)result.size());
    }

    @Test
    public void testAcknowledgeSinglePartition() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp), sp2);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 1L, 0L), Map.of("foo", new TopicMetrics(0L, 0L, 1L, 0L)));
    }

    @Test
    public void testAcknowledgeMultiplePartition() throws Exception {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp3.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics((Time)this.time);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withShareGroupMetrics(shareGroupMetrics).withBrokerTopicStats(this.brokerTopicStats).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp1, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        acknowledgeTopics.put(tp2, List.of(new ShareAcknowledgementBatch(15L, 26L, List.of(Byte.valueOf((byte)2))), new ShareAcknowledgementBatch(34L, 56L, List.of(Byte.valueOf((byte)2)))));
        acknowledgeTopics.put(tp3, List.of(new ShareAcknowledgementBatch(4L, 15L, List.of(Byte.valueOf((byte)3))), new ShareAcknowledgementBatch(16L, 21L, List.of(Byte.valueOf((byte)3)))));
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)3, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp1));
        Assertions.assertTrue((boolean)result.containsKey(tp2));
        Assertions.assertTrue((boolean)result.containsKey(tp3));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).errorCode());
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorCode());
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp3)).errorCode());
        Assertions.assertEquals((long)42L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
        Assertions.assertEquals((long)35L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
        Assertions.assertEquals((long)18L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).meanRate() > 0.0 ? 1 : 0) != 0);
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).meanRate() > 0.0 ? 1 : 0) != 0);
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).meanRate() > 0.0 ? 1 : 0) != 0);
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 3L, 0L), Map.of(tp1.topic(), new TopicMetrics(0L, 0L, 1L, 0L), tp2.topic(), new TopicMetrics(0L, 0L, 1L, 0L), tp3.topic(), new TopicMetrics(0L, 0L, 1L, 0L)));
        shareGroupMetrics.close();
    }

    @Test
    public void testAcknowledgeIndividualOffsets() throws Exception {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        List<ShareAcknowledgementBatch> ack1 = List.of(new ShareAcknowledgementBatch(12L, 12L, List.of(Byte.valueOf((byte)1))));
        List<ShareAcknowledgementBatch> ack2 = List.of(new ShareAcknowledgementBatch(15L, 20L, List.of(Byte.valueOf((byte)2), Byte.valueOf((byte)3), Byte.valueOf((byte)2), Byte.valueOf((byte)2), Byte.valueOf((byte)3), Byte.valueOf((byte)2))));
        Mockito.when((Object)sp1.acknowledge(memberId, ack1)).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp2.acknowledge(memberId, ack2)).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics((Time)this.time);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withShareGroupMetrics(shareGroupMetrics).withBrokerTopicStats(this.brokerTopicStats).build();
        Map<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = Map.of(tp1, ack1, tp2, ack2);
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)2, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp1));
        Assertions.assertTrue((boolean)result.containsKey(tp2));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp1)).errorCode());
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).partitionIndex());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp2)).errorCode());
        Assertions.assertEquals((long)1L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
        Assertions.assertEquals((long)4L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
        Assertions.assertEquals((long)2L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).meanRate() > 0.0 ? 1 : 0) != 0);
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).meanRate() > 0.0 ? 1 : 0) != 0);
        Assertions.assertTrue((shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).meanRate() > 0.0 ? 1 : 0) != 0);
        shareGroupMetrics.close();
    }

    @Test
    public void testAcknowledgeIncorrectGroupId() {
        String groupId = "grp";
        String groupId2 = "grp2";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp), sp2);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics((Time)this.time);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).withShareGroupMetrics(shareGroupMetrics).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId2, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
        Assertions.assertEquals((long)0L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.ACCEPT.id).count());
        Assertions.assertEquals((long)0L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.RELEASE.id).count());
        Assertions.assertEquals((long)0L, (long)shareGroupMetrics.recordAcknowledgementMeter(AcknowledgeType.REJECT.id).count());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 1L, 1L), Map.of(tp.topic(), new TopicMetrics(0L, 0L, 1L, 1L)));
    }

    @Test
    public void testAcknowledgeIncorrectMemberId() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp2.acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRequestException("Member is not the owner of batch record")));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp), sp2);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)0, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.INVALID_REQUEST.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)"Member is not the owner of batch record", (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 1L, 1L), Map.of(tp.topic(), new TopicMetrics(0L, 0L, 1L, 1L)));
    }

    @Test
    public void testAcknowledgeEmptyPartitionCacheMap() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 3));
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withBrokerTopicStats(this.brokerTopicStats).build();
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp, List.of(new ShareAcknowledgementBatch(78L, 90L, List.of(Byte.valueOf((byte)2))), new ShareAcknowledgementBatch(94L, 99L, List.of(Byte.valueOf((byte)2)))));
        CompletableFuture resultFuture = this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Map result = (Map)resultFuture.join();
        Assertions.assertEquals((int)1, (int)result.size());
        Assertions.assertTrue((boolean)result.containsKey(tp));
        Assertions.assertEquals((int)3, (int)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).partitionIndex());
        Assertions.assertEquals((short)Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), (short)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorCode());
        Assertions.assertEquals((Object)Errors.UNKNOWN_TOPIC_OR_PARTITION.message(), (Object)((ShareAcknowledgeResponseData.PartitionData)result.get(tp)).errorMessage());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(0L, 0L, 1L, 1L), Map.of(tp.topic(), new TopicMetrics(0L, 0L, 1L, 1L)));
    }

    @Test
    public void testAcknowledgeCompletesDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp1, tp2);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), topicIdPartitions, 500, 100, this.brokerTopicStats);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 2);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareAcquiredRecords.empty());
        Mockito.when((Object)sp2.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareAcquiredRecords.empty());
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type((PartitionMaxBytesStrategy.StrategyType)PartitionMaxBytesStrategy.StrategyType.UNIFORM)).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp1))).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp1, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Assertions.assertEquals((int)1, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)1))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().count());
        Assertions.assertEquals((int)1, (int)this.brokerTopicStats.numTopics());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.topicStats(tp1.topic()).totalShareAcknowledgementRequestRate().count());
    }

    @Test
    public void testAcknowledgeDoesNotCompleteDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp1, tp2);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp3)).acknowledge((String)ArgumentMatchers.eq((Object)memberId), (List)ArgumentMatchers.any());
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), topicIdPartitions, 500, 100, this.brokerTopicStats);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp3.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        sharePartitions.put(tp3, sp3);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>> acknowledgeTopics = new HashMap<TopicIdPartition, List<ShareAcknowledgementBatch>>();
        acknowledgeTopics.put(tp3, List.of(new ShareAcknowledgementBatch(12L, 20L, List.of(Byte.valueOf((byte)1))), new ShareAcknowledgementBatch(24L, 56L, List.of(Byte.valueOf((byte)1)))));
        this.sharePartitionManager.acknowledge(memberId, groupId, acknowledgeTopics);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().count());
        Assertions.assertEquals((int)1, (int)this.brokerTopicStats.numTopics());
        Assertions.assertEquals((long)1L, (long)this.brokerTopicStats.topicStats(tp3.topic()).totalShareAcknowledgementRequestRate().count());
    }

    @Test
    public void testReleaseSessionCompletesDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp1, tp2);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, Uuid.fromString((String)memberId)))).thenReturn((Object)shareSession);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        Mockito.when((Object)sp3.releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId))).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), topicIdPartitions, 500, 100, this.brokerTopicStats);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(this.mockReplicaManager, tp1, 1);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        this.sharePartitionManager = (SharePartitionManager)Mockito.spy((Object)SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withCache(cache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type((PartitionMaxBytesStrategy.StrategyType)PartitionMaxBytesStrategy.StrategyType.UNIFORM)).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        Mockito.when((Object)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString((String)memberId))).thenReturn(List.of(tp1, tp3));
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp1))).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)new ShareAcquiredRecords(EMPTY_ACQUIRED_RECORDS, 0));
        this.sharePartitionManager.releaseSession(groupId, memberId);
        Assertions.assertEquals((int)1, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)1))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testReleaseSessionDoesNotCompleteDelayedShareFetchRequest() {
        String groupId = "grp";
        String memberId = Uuid.randomUuid().toString();
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp1, tp2);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = (ShareSessionCache)Mockito.mock(ShareSessionCache.class);
        ShareSession shareSession = (ShareSession)Mockito.mock(ShareSession.class);
        Mockito.when((Object)cache.remove(new ShareSessionKey(groupId, Uuid.fromString((String)memberId)))).thenReturn((Object)shareSession);
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp1)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp2)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        ((SharePartition)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)true);
            return CompletableFuture.completedFuture(Optional.empty());
        }).when((Object)sp3)).releaseAcquiredRecords((String)ArgumentMatchers.eq((Object)memberId));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        partitionCache.put(new SharePartitionKey(groupId, tp3), sp3);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), topicIdPartitions, 500, 100, this.brokerTopicStats);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp3.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)false);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        this.sharePartitionManager = (SharePartitionManager)Mockito.spy((Object)SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withCache(cache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).build());
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        sharePartitions.put(tp3, sp3);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchTest.DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(this.mockReplicaManager).withSharePartitions(sharePartitions).build();
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        Mockito.when((Object)this.sharePartitionManager.cachedTopicIdPartitionsInShareSession(groupId, Uuid.fromString((String)memberId))).thenReturn(List.of(tp3));
        this.sharePartitionManager.releaseSession(groupId, memberId);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        ((SharePartition)Mockito.verify((Object)sp2, (VerificationMode)Mockito.times((int)0))).nextFetchOffset();
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    @Test
    public void testPendingInitializationShouldCompleteFetchRequest() throws Exception {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        CompletableFuture pendingInitializationFuture = new CompletableFuture();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(pendingInitializationFuture);
        Mockito.when((Object)sp0.loadStartTimeMs()).thenReturn((Object)10L);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Time time = (Time)Mockito.mock(Time.class);
        Mockito.when((Object)time.hiResClockMs()).thenReturn((Object)100L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTime(time).withShareGroupMetrics(shareGroupMetrics).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        Assertions.assertTrue((boolean)((Map)future.join()).isEmpty());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertFalse((boolean)pendingInitializationFuture.isDone());
        Assertions.assertEquals((long)0L, (long)shareGroupMetrics.partitionLoadTimeMs().count());
        pendingInitializationFuture.complete(null);
        Assertions.assertEquals((long)1L, (long)shareGroupMetrics.partitionLoadTimeMs().count());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().min());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().max());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().sum());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(1L, 0L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(1L, 0L, 0L, 0L)));
        shareGroupMetrics.close();
    }

    @Test
    public void testPartitionLoadTimeMetricWithMultiplePartitions() throws Exception {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        CompletableFuture pendingInitializationFuture1 = new CompletableFuture();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(pendingInitializationFuture1);
        Mockito.when((Object)sp0.loadStartTimeMs()).thenReturn((Object)10L);
        CompletableFuture pendingInitializationFuture2 = new CompletableFuture();
        Mockito.when((Object)sp1.maybeInitialize()).thenReturn(pendingInitializationFuture2);
        Mockito.when((Object)sp1.loadStartTimeMs()).thenReturn((Object)40L);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Time time = (Time)Mockito.mock(Time.class);
        Mockito.when((Object)time.hiResClockMs()).thenReturn((Object)100L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTime(time).withShareGroupMetrics(shareGroupMetrics).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        Assertions.assertFalse((boolean)pendingInitializationFuture1.isDone());
        Assertions.assertFalse((boolean)pendingInitializationFuture2.isDone());
        Assertions.assertEquals((long)0L, (long)shareGroupMetrics.partitionLoadTimeMs().count());
        pendingInitializationFuture1.complete(null);
        Assertions.assertEquals((long)1L, (long)shareGroupMetrics.partitionLoadTimeMs().count());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().min());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().max());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().sum());
        pendingInitializationFuture2.complete(null);
        Assertions.assertEquals((long)2L, (long)shareGroupMetrics.partitionLoadTimeMs().count());
        Assertions.assertEquals((double)60.0, (double)shareGroupMetrics.partitionLoadTimeMs().min());
        Assertions.assertEquals((double)90.0, (double)shareGroupMetrics.partitionLoadTimeMs().max());
        Assertions.assertEquals((double)150.0, (double)shareGroupMetrics.partitionLoadTimeMs().sum());
        shareGroupMetrics.close();
    }

    @Test
    public void testDelayedInitializationShouldCompleteFetchRequest() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        CompletableFuture<Object> pendingInitializationFuture1 = new CompletableFuture<Object>();
        CompletableFuture<Object> pendingInitializationFuture2 = new CompletableFuture<Object>();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(pendingInitializationFuture1).thenReturn(pendingInitializationFuture2).thenReturn(CompletableFuture.failedFuture((Throwable)new LeaderNotAvailableException("Leader not available")));
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory shareFetchPurgatorySpy = (DelayedOperationPurgatory)Mockito.spy((Object)new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true));
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)shareFetchPurgatorySpy);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future1 = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        CompletableFuture future2 = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        CompletableFuture future3 = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)3))).maybeInitialize();
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)3))).addDelayedShareFetchRequest((DelayedShareFetch)ArgumentMatchers.any(), (List)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)3))).tryCompleteElseWatch((DelayedOperation)((DelayedShareFetch)ArgumentMatchers.any()), (List)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)0))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        Assertions.assertFalse((boolean)future1.isDone());
        Assertions.assertFalse((boolean)future2.isDone());
        Assertions.assertFalse((boolean)future3.isDone());
        pendingInitializationFuture1.complete(null);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)1))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        pendingInitializationFuture2.complete(null);
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)2))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((DelayedOperationPurgatory)Mockito.verify((Object)shareFetchPurgatorySpy, (VerificationMode)Mockito.times((int)2))).checkAndComplete((DelayedOperationKey)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)0))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)3L, (long)this.brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().count());
        Assertions.assertEquals((int)1, (int)this.brokerTopicStats.numTopics());
        Assertions.assertEquals((long)3L, (long)this.brokerTopicStats.topicStats(tp0.topic()).totalShareFetchRequestRate().count());
    }

    @Test
    public void testSharePartitionInitializationExceptions() throws Exception {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        Uuid fooId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(fooId, new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new LeaderNotAvailableException("Leader not available")));
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Assertions.assertTrue((boolean)((Map)future.join()).isEmpty());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)0))).markFenced();
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new IllegalStateException("Illegal state")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Illegal state");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new CoordinatorNotAvailableException("Coordinator not available")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.COORDINATOR_NOT_AVAILABLE, "Coordinator not available");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)2))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new InvalidRequestException("Invalid request")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.INVALID_REQUEST, "Invalid request");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)3))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new FencedStateEpochException("Fenced state epoch")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced state epoch");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)4))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new NotLeaderOrFollowerException("Not leader or follower")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Not leader or follower");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)5))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn((Object)FutureUtils.failedFuture((Throwable)new RuntimeException("Runtime exception")));
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing in delayed share fetch queue never ended.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Runtime exception");
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)6))).markFenced();
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(7L, 6L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(7L, 6L, 0L, 0L)));
    }

    @Test
    public void testShareFetchProcessingExceptions() throws Exception {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartitionCache partitionCache = (SharePartitionCache)Mockito.mock(SharePartitionCache.class);
        Mockito.when((Object)partitionCache.computeIfAbsent((SharePartitionKey)ArgumentMatchers.any(), (Function)ArgumentMatchers.any())).thenThrow(new Throwable[]{new RuntimeException("Error creating instance")});
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Error creating instance");
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(1L, 1L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(1L, 1L, 0L, 0L)));
    }

    @Test
    public void testSharePartitionInitializationFailure() throws Exception {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartitionCache partitionCache = new SharePartitionCache();
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.isLeader()).thenReturn((Object)false);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any(TopicPartition.class))).thenThrow(new Throwable[]{new KafkaStorageException("Exception")}).thenReturn((Object)partition);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.KAFKA_STORAGE_ERROR, "Exception");
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        future = this.sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        TestUtils.waitForCondition(future::isDone, (long)3000L, () -> "Processing for delayed share fetch request not finished.");
        this.validateShareFetchFutureException(future, tp0, Errors.NOT_LEADER_OR_FOLLOWER);
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(2L, 2L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(2L, 2L, 0L, 0L)));
    }

    @Test
    public void testSharePartitionPartialInitializationFailure() throws Exception {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(memberId1, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(memberId1, new TopicPartition("foo", 2));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2);
        Partition partition0 = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition0.isLeader()).thenReturn((Object)false);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.getPartitionOrException((TopicPartition)ArgumentMatchers.any(TopicPartition.class))).thenReturn((Object)partition0);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)new ShareAcquiredRecords(EMPTY_ACQUIRED_RECORDS, 0));
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        partitionCache.put(new SharePartitionKey(groupId, tp2), sp2);
        Mockito.when((Object)sp2.maybeInitialize()).thenReturn(CompletableFuture.failedFuture((Throwable)new FencedStateEpochException("Fenced state epoch")));
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, replicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(replicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp1))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withPartitionCache(partitionCache).withBrokerTopicStats(this.brokerTopicStats).withTimer(mockTimer).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, Uuid.randomUuid().toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Map partitionDataMap = (Map)future.get();
        Assertions.assertEquals((int)3, (int)partitionDataMap.size());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp0));
        Assertions.assertEquals((short)Errors.NOT_LEADER_OR_FOLLOWER.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp0)).errorCode());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp1));
        Assertions.assertEquals((short)Errors.NONE.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp1)).errorCode());
        Assertions.assertTrue((boolean)partitionDataMap.containsKey(tp2));
        Assertions.assertEquals((short)Errors.FENCED_STATE_EPOCH.code(), (short)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp2)).errorCode());
        Assertions.assertEquals((Object)"Fenced state epoch", (Object)((ShareFetchResponseData.PartitionData)partitionDataMap.get(tp2)).errorMessage());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)0))).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(1L, 1L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(1L, 1L, 0L, 0L)));
    }

    @Test
    public void testReplicaManagerFetchException() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.UNKNOWN_SERVER_ERROR, "Exception");
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new NotLeaderOrFollowerException("Leader exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.NOT_LEADER_OR_FOLLOWER, "Leader exception");
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(2L, 2L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(2L, 2L, 0L, 0L)));
    }

    @Test
    public void testReplicaManagerFetchMultipleSharePartitionsException() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp0.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)false);
        Mockito.when((Object)sp1.maybeInitialize()).thenReturn(CompletableFuture.completedFuture(null));
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        partitionCache.put(new SharePartitionKey(groupId, tp1), sp1);
        Timer mockTimer = this.systemTimerReaper();
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", mockTimer, this.mockReplicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(this.mockReplicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new FencedStateEpochException("Fenced exception")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).withTimer(mockTimer).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, tp0, Errors.FENCED_STATE_EPOCH, "Fenced exception");
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        Assertions.assertEquals((Object)sp1, (Object)partitionCache.get(new SharePartitionKey(groupId, tp1)));
        Mockito.when((Object)sp1.maybeAcquireFetchLock((Uuid)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        partitionCache.put(new SharePartitionKey(groupId, tp0), sp0);
        ((ReplicaManager)Mockito.doThrow((Throwable[])new Throwable[]{new FencedStateEpochException("Fenced exception again")}).when((Object)this.mockReplicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        this.validateShareFetchFutureException((CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>>)future, List.of(tp0, tp1), Errors.FENCED_STATE_EPOCH, "Fenced exception again");
        Assertions.assertTrue((boolean)partitionCache.isEmpty());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(4L, 3L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(2L, 2L, 0L, 0L), tp1.topic(), new TopicMetrics(2L, 1L, 0L, 0L)));
    }

    @Test
    public void testListenerRegistration() {
        String groupId = "grp";
        Uuid memberId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1);
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Partition partition = this.mockPartition();
        Mockito.when((Object)mockReplicaManager.getPartitionOrException((TopicPartition)Mockito.any())).thenReturn((Object)partition);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(mockReplicaManager).withBrokerTopicStats(this.brokerTopicStats).build();
        CompletableFuture future = this.sharePartitionManager.fetchMessages(groupId, memberId.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        Assertions.assertTrue((boolean)future.isDone());
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)2))).maybeAddListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
        this.validateBrokerTopicStatsMetrics(this.brokerTopicStats, new TopicMetrics(2L, 2L, 0L, 0L), Map.of(tp0.topic(), new TopicMetrics(1L, 1L, 0L, 0L), tp1.topic(), new TopicMetrics(1L, 1L, 0L, 0L)));
    }

    @Test
    public void testSharePartitionListenerOnFailed() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        SharePartitionCache partitionCache = new SharePartitionCache();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
        this.testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onFailed(arg_0));
    }

    @Test
    public void testSharePartitionListenerOnDeleted() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        SharePartitionCache partitionCache = new SharePartitionCache();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
        this.testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onDeleted(arg_0));
    }

    @Test
    public void testSharePartitionListenerOnBecomingFollower() {
        SharePartitionKey sharePartitionKey = new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)));
        SharePartitionCache partitionCache = new SharePartitionCache();
        ReplicaManager mockReplicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        SharePartitionManager.SharePartitionListener partitionListener = new SharePartitionManager.SharePartitionListener(sharePartitionKey, mockReplicaManager, partitionCache);
        this.testSharePartitionListener(sharePartitionKey, partitionCache, mockReplicaManager, arg_0 -> ((SharePartitionManager.SharePartitionListener)partitionListener).onBecomingFollower(arg_0));
    }

    @Test
    public void testFetchMessagesRotatePartitions() {
        String groupId = "grp";
        Uuid memberId1 = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 0));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 1));
        TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        TopicIdPartition tp5 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("bar", 2));
        TopicIdPartition tp6 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
        List<TopicIdPartition> topicIdPartitions = List.of(tp0, tp1, tp2, tp3, tp4, tp5, tp6);
        this.sharePartitionManager = (SharePartitionManager)Mockito.spy((Object)SharePartitionManagerBuilder.builder().withBrokerTopicStats(this.brokerTopicStats).build());
        ArgumentCaptor captor = ArgumentCaptor.forClass(ShareFetch.class);
        this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 0, 500, 500, topicIdPartitions);
        ((SharePartitionManager)Mockito.verify((Object)this.sharePartitionManager, (VerificationMode)Mockito.times((int)1))).processShareFetch((ShareFetch)captor.capture());
        ShareFetch resultShareFetch = (ShareFetch)captor.getValue();
        ShareFetchTestUtils.validateRotatedListEquals((List)resultShareFetch.topicIdPartitions(), topicIdPartitions, (int)0);
        this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 1, 500, 500, topicIdPartitions);
        ((SharePartitionManager)Mockito.verify((Object)this.sharePartitionManager, (VerificationMode)Mockito.times((int)2))).processShareFetch((ShareFetch)captor.capture());
        resultShareFetch = (ShareFetch)captor.getValue();
        ShareFetchTestUtils.validateRotatedListEquals(topicIdPartitions, (List)resultShareFetch.topicIdPartitions(), (int)1);
        this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 3, 500, 500, topicIdPartitions);
        ((SharePartitionManager)Mockito.verify((Object)this.sharePartitionManager, (VerificationMode)Mockito.times((int)3))).processShareFetch((ShareFetch)captor.capture());
        resultShareFetch = (ShareFetch)captor.getValue();
        ShareFetchTestUtils.validateRotatedListEquals(topicIdPartitions, (List)resultShareFetch.topicIdPartitions(), (int)3);
        this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, 12, 500, 500, topicIdPartitions);
        ((SharePartitionManager)Mockito.verify((Object)this.sharePartitionManager, (VerificationMode)Mockito.times((int)4))).processShareFetch((ShareFetch)captor.capture());
        resultShareFetch = (ShareFetch)captor.getValue();
        ShareFetchTestUtils.validateRotatedListEquals(topicIdPartitions, (List)resultShareFetch.topicIdPartitions(), (int)5);
        this.sharePartitionManager.fetchMessages(groupId, memberId1.toString(), FETCH_PARAMS, Integer.MAX_VALUE, 500, 500, topicIdPartitions);
        ((SharePartitionManager)Mockito.verify((Object)this.sharePartitionManager, (VerificationMode)Mockito.times((int)5))).processShareFetch((ShareFetch)captor.capture());
        resultShareFetch = (ShareFetch)captor.getValue();
        ShareFetchTestUtils.validateRotatedListEquals(topicIdPartitions, (List)resultShareFetch.topicIdPartitions(), (int)1);
    }

    @Test
    public void testCreateIdleShareFetchTask() throws Exception {
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        MockTimer mockTimer = new MockTimer(this.time);
        long maxWaitMs = 1000L;
        ((ReplicaManager)Mockito.doAnswer(invocation -> {
            TimerTask timerTask = (TimerTask)invocation.getArgument(0);
            mockTimer.add(timerTask);
            return null;
        }).when((Object)replicaManager)).addShareFetchTimerRequest((TimerTask)Mockito.any(TimerTask.class));
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withReplicaManager(replicaManager).withTime((Time)this.time).withTimer((Timer)mockTimer).build();
        CompletableFuture future = this.sharePartitionManager.createIdleShareFetchTimerTask(maxWaitMs);
        Assertions.assertFalse((boolean)future.isDone());
        mockTimer.advanceClock(maxWaitMs / 2L);
        Assertions.assertFalse((boolean)future.isDone());
        mockTimer.advanceClock(maxWaitMs / 2L + 1L);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
    }

    @Test
    public void testOnShareVersionToggle() {
        String groupId = "grp";
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo1", 0))), sp0);
        partitionCache.put(new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo2", 0))), sp1);
        partitionCache.put(new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo3", 0))), sp2);
        partitionCache.put(new SharePartitionKey(groupId, new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo4", 0))), sp3);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).build();
        Assertions.assertEquals((int)4, (int)partitionCache.size());
        this.sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
        Assertions.assertEquals((int)0, (int)partitionCache.size());
        ((SharePartition)Mockito.verify((Object)sp0)).markFenced();
        ((SharePartition)Mockito.verify((Object)sp1)).markFenced();
        ((SharePartition)Mockito.verify((Object)sp2)).markFenced();
        ((SharePartition)Mockito.verify((Object)sp3)).markFenced();
    }

    @Test
    public void testOnShareVersionToggleWhenEnabledFromConfig() {
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.put(new SharePartitionKey("grp", new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))), sp0);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withPartitionCache(partitionCache).build();
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        this.sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, true);
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)0))).markFenced();
    }

    @Test
    public void testShareGroupListener() {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        Uuid memberId1 = Uuid.randomUuid();
        Uuid memberId2 = Uuid.randomUuid();
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = new ShareSessionCache(10);
        cache.maybeCreateSession(groupId, memberId1, new ImplicitLinkedHashCollection(), CONNECTION_ID);
        cache.maybeCreateSession(groupId, memberId2, new ImplicitLinkedHashCollection(), "id-2");
        SharePartitionCache partitionCache = new SharePartitionCache();
        partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp0), k -> sp0);
        partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp1), k -> sp1);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).build();
        Assertions.assertEquals((int)2, (int)cache.size());
        Assertions.assertEquals((int)2, (int)partitionCache.size());
        cache.connectionDisconnectListener().onDisconnect(CONNECTION_ID);
        Assertions.assertEquals((int)1, (int)cache.size());
        Assertions.assertEquals((int)2, (int)partitionCache.size());
        Assertions.assertNotNull((Object)cache.get(new ShareSessionKey(groupId, memberId2)));
        cache.connectionDisconnectListener().onDisconnect("id-2");
        Assertions.assertEquals((int)0, (int)cache.size());
        Assertions.assertEquals((int)0, (int)partitionCache.size());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)1))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)2))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
    }

    @Test
    public void testShareGroupListenerWithEmptyCache() {
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        Uuid memberId1 = Uuid.randomUuid();
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        ShareSessionCache cache = new ShareSessionCache(10);
        cache.maybeCreateSession(groupId, memberId1, new ImplicitLinkedHashCollection(), CONNECTION_ID);
        SharePartitionCache partitionCache = (SharePartitionCache)Mockito.spy((Object)new SharePartitionCache());
        partitionCache.computeIfAbsent(new SharePartitionKey(groupId, tp0), k -> sp0);
        this.sharePartitionManager = SharePartitionManagerBuilder.builder().withCache(cache).withPartitionCache(partitionCache).withReplicaManager(this.mockReplicaManager).build();
        Assertions.assertEquals((int)1, (int)cache.size());
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        this.sharePartitionManager.onShareVersionToggle(ShareVersion.SV_0, false);
        Assertions.assertEquals((int)0, (int)cache.size());
        Assertions.assertEquals((int)0, (int)partitionCache.size());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)this.mockReplicaManager, (VerificationMode)Mockito.times((int)1))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
        ((SharePartitionCache)Mockito.verify((Object)partitionCache, (VerificationMode)Mockito.times((int)0))).topicIdPartitionsForGroup(groupId);
        cache.connectionDisconnectListener().onDisconnect(CONNECTION_ID);
        ((SharePartitionCache)Mockito.verify((Object)partitionCache, (VerificationMode)Mockito.times((int)1))).topicIdPartitionsForGroup(groupId);
    }

    private Timer systemTimerReaper() {
        return new SystemTimerReaper("share-partition-manager-test-reaper", (Timer)new SystemTimer("share-partition-manager-test-timer"));
    }

    private void assertNoReaperThreadsPendingClose() throws InterruptedException {
        TestUtils.waitForCondition(() -> Thread.getAllStackTraces().keySet().stream().noneMatch(t -> t.getName().contains(TIMER_NAME_PREFIX)), (String)"Found unexpected reaper threads with name containing: share-partition-manager");
    }

    private void testSharePartitionListener(SharePartitionKey sharePartitionKey, SharePartitionCache partitionCache, ReplicaManager mockReplicaManager, Consumer<TopicPartition> listenerConsumer) {
        TopicPartition tp = new TopicPartition("foo", 1);
        TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
        SharePartitionKey spk = new SharePartitionKey("grp", tpId);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        partitionCache.put(sharePartitionKey, sp0);
        partitionCache.put(spk, sp1);
        listenerConsumer.accept(sharePartitionKey.topicIdPartition().topicPartition());
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        Assertions.assertFalse((boolean)partitionCache.containsKey(sharePartitionKey));
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)1))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
        listenerConsumer.accept(tp);
        Assertions.assertEquals((int)1, (int)partitionCache.size());
        ((SharePartition)Mockito.verify((Object)sp1, (VerificationMode)Mockito.times((int)0))).markFenced();
        ((ReplicaManager)Mockito.verify((Object)mockReplicaManager, (VerificationMode)Mockito.times((int)1))).removeListener((TopicPartition)ArgumentMatchers.any(), (PartitionListener)ArgumentMatchers.any());
    }

    private ShareFetchResponseData.PartitionData noErrorShareFetchResponse() {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0);
    }

    private ShareFetchResponseData.PartitionData errorShareFetchResponse(Short errorCode) {
        return new ShareFetchResponseData.PartitionData().setPartitionIndex(0).setErrorCode(errorCode.shortValue());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void mockUpdateAndGenerateResponseData(ShareFetchContext context, String groupId, Uuid memberId) {
        LinkedHashMap data = new LinkedHashMap();
        if (context.getClass() == ShareSessionContext.class) {
            ShareSessionContext shareSessionContext = (ShareSessionContext)context;
            if (!shareSessionContext.isSubsequent()) {
                shareSessionContext.shareFetchData().forEach(topicIdPartition -> data.put(topicIdPartition, topicIdPartition.topic() == null ? this.errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) : this.noErrorShareFetchResponse()));
            } else {
                ShareSession shareSession = shareSessionContext.session();
                synchronized (shareSession) {
                    shareSessionContext.session().partitionMap().forEach(cachedSharePartition -> {
                        TopicIdPartition topicIdPartition;
                        data.put(topicIdPartition, (topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()))).topic() == null ? this.errorShareFetchResponse(Errors.UNKNOWN_TOPIC_ID.code()) : this.noErrorShareFetchResponse());
                    });
                }
            }
        }
        context.updateAndGenerateResponseData(groupId, memberId, data);
    }

    private void assertPartitionsPresent(ShareSessionContext context, List<TopicIdPartition> partitions) {
        HashSet partitionsInContext = new HashSet();
        if (!context.isSubsequent()) {
            partitionsInContext.addAll(context.shareFetchData());
        } else {
            context.session().partitionMap().forEach(cachedSharePartition -> {
                TopicIdPartition topicIdPartition = new TopicIdPartition(cachedSharePartition.topicId(), new TopicPartition(cachedSharePartition.topic(), cachedSharePartition.partition()));
                partitionsInContext.add(topicIdPartition);
            });
        }
        HashSet<TopicIdPartition> partitionsSet = new HashSet<TopicIdPartition>(partitions);
        Assertions.assertEquals(partitionsSet, partitionsInContext);
    }

    private void assertErroneousAndValidTopicIdPartitions(ErroneousAndValidPartitionData erroneousAndValidPartitionData, List<TopicIdPartition> expectedErroneous, List<TopicIdPartition> expectedValid) {
        HashSet<TopicIdPartition> expectedErroneousSet = new HashSet<TopicIdPartition>(expectedErroneous);
        HashSet<TopicIdPartition> expectedValidSet = new HashSet<TopicIdPartition>(expectedValid);
        HashSet actualErroneousPartitions = new HashSet();
        erroneousAndValidPartitionData.erroneous().forEach((topicIdPartition, partitionData) -> actualErroneousPartitions.add(topicIdPartition));
        HashSet actualValidPartitions = new HashSet(erroneousAndValidPartitionData.validTopicIdPartitions());
        Assertions.assertEquals(expectedErroneousSet, actualErroneousPartitions);
        Assertions.assertEquals(expectedValidSet, actualValidPartitions);
    }

    private Partition mockPartition() {
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)partition.isLeader()).thenReturn((Object)true);
        Mockito.when((Object)partition.getLeaderEpoch()).thenReturn((Object)1);
        return partition;
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, TopicIdPartition topicIdPartition, Errors error) {
        this.validateShareFetchFutureException(future, List.of(topicIdPartition), error, null);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, TopicIdPartition topicIdPartition, Errors error, String message) {
        this.validateShareFetchFutureException(future, List.of(topicIdPartition), error, message);
    }

    private void validateShareFetchFutureException(CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future, List<TopicIdPartition> topicIdPartitions, Errors error, String message) {
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Map<TopicIdPartition, ShareFetchResponseData.PartitionData> result = future.join();
        Assertions.assertEquals((int)topicIdPartitions.size(), (int)result.size());
        topicIdPartitions.forEach(topicIdPartition -> {
            Assertions.assertTrue((boolean)result.containsKey(topicIdPartition));
            Assertions.assertEquals((int)topicIdPartition.partition(), (int)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).partitionIndex());
            Assertions.assertEquals((short)error.code(), (short)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).errorCode());
            Assertions.assertEquals((Object)message, (Object)((ShareFetchResponseData.PartitionData)result.get(topicIdPartition)).errorMessage());
        });
    }

    private void mockFetchOffsetForTimestamp(ReplicaManager replicaManager) {
        FileRecords.TimestampAndOffset timestampAndOffset = new FileRecords.TimestampAndOffset(-1L, 0L, Optional.empty());
        ((ReplicaManager)Mockito.doReturn((Object)new OffsetResultHolder(Optional.of(timestampAndOffset), Optional.empty())).when((Object)replicaManager)).fetchOffsetForTimestamp((TopicPartition)Mockito.any(TopicPartition.class), Mockito.anyLong(), (Option)Mockito.any(), (Optional)Mockito.any(), Mockito.anyBoolean());
    }

    private void validateBrokerTopicStatsMetrics(BrokerTopicStats brokerTopicStats, TopicMetrics expectedAllTopicMetrics, Map<String, TopicMetrics> expectedTopicMetrics) {
        if (expectedAllTopicMetrics != null) {
            Assertions.assertEquals((long)expectedAllTopicMetrics.totalShareFetchRequestCount, (long)brokerTopicStats.allTopicsStats().totalShareFetchRequestRate().count());
            Assertions.assertEquals((long)expectedAllTopicMetrics.failedShareFetchRequestCount, (long)brokerTopicStats.allTopicsStats().failedShareFetchRequestRate().count());
            Assertions.assertEquals((long)expectedAllTopicMetrics.totalShareAcknowledgementRequestCount, (long)brokerTopicStats.allTopicsStats().totalShareAcknowledgementRequestRate().count());
            Assertions.assertEquals((long)expectedAllTopicMetrics.failedShareAcknowledgementRequestCount, (long)brokerTopicStats.allTopicsStats().failedShareAcknowledgementRequestRate().count());
        }
        Assertions.assertEquals((int)expectedTopicMetrics.size(), (int)brokerTopicStats.numTopics());
        expectedTopicMetrics.forEach((topic, metrics) -> {
            BrokerTopicMetrics topicMetrics = brokerTopicStats.topicStats(topic);
            Assertions.assertEquals((long)metrics.totalShareFetchRequestCount, (long)topicMetrics.totalShareFetchRequestRate().count());
            Assertions.assertEquals((long)metrics.failedShareFetchRequestCount, (long)topicMetrics.failedShareFetchRequestRate().count());
            Assertions.assertEquals((long)metrics.totalShareAcknowledgementRequestCount, (long)topicMetrics.totalShareAcknowledgementRequestRate().count());
            Assertions.assertEquals((long)metrics.failedShareAcknowledgementRequestCount, (long)topicMetrics.failedShareAcknowledgementRequestRate().count());
        });
    }

    static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(List<TopicIdPartition> topicIdPartitions) {
        ArrayList logReadResults = new ArrayList();
        topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2(topicIdPartition, (Object)new LogReadResult(new FetchDataInfo(new LogOffsetMetadata(0L, 0L, 0), (Records)MemoryRecords.withRecords((Compression)Compression.NONE, (SimpleRecord[])new SimpleRecord[]{new SimpleRecord("test-key".getBytes(), "test-value".getBytes())})), Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), OptionalInt.empty(), Optional.empty()))));
        return CollectionConverters.asScala(logReadResults).toSeq();
    }

    static void mockReplicaManagerDelayedShareFetch(ReplicaManager replicaManager, DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
        ((ReplicaManager)Mockito.doAnswer(invocationOnMock -> {
            Object[] args = invocationOnMock.getArguments();
            DelayedShareFetchKey key = (DelayedShareFetchKey)args[0];
            delayedShareFetchPurgatory.checkAndComplete((DelayedOperationKey)key);
            return null;
        }).when((Object)replicaManager)).completeDelayedShareFetchRequest((DelayedShareFetchKey)ArgumentMatchers.any(DelayedShareFetchKey.class));
        ((ReplicaManager)Mockito.doAnswer(invocationOnMock -> {
            Object[] args = invocationOnMock.getArguments();
            DelayedShareFetch operation = (DelayedShareFetch)args[0];
            List keys = (List)args[1];
            delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)operation, keys);
            return null;
        }).when((Object)replicaManager)).addDelayedShareFetchRequest((DelayedShareFetch)ArgumentMatchers.any(), (List)ArgumentMatchers.any());
    }

    static class SharePartitionManagerBuilder {
        private final Persister persister = new NoOpStatePersister();
        private ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        private Time time = new MockTime();
        private ShareSessionCache cache = new ShareSessionCache(10);
        private SharePartitionCache partitionCache = new SharePartitionCache();
        private Timer timer = new MockTimer();
        private ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(this.time);
        private BrokerTopicStats brokerTopicStats;

        SharePartitionManagerBuilder() {
        }

        private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
            this.replicaManager = replicaManager;
            return this;
        }

        private SharePartitionManagerBuilder withTime(Time time) {
            this.time = time;
            return this;
        }

        private SharePartitionManagerBuilder withCache(ShareSessionCache cache) {
            this.cache = cache;
            return this;
        }

        SharePartitionManagerBuilder withPartitionCache(SharePartitionCache partitionCache) {
            this.partitionCache = partitionCache;
            return this;
        }

        private SharePartitionManagerBuilder withTimer(Timer timer) {
            this.timer = timer;
            return this;
        }

        private SharePartitionManagerBuilder withShareGroupMetrics(ShareGroupMetrics shareGroupMetrics) {
            this.shareGroupMetrics = shareGroupMetrics;
            return this;
        }

        private SharePartitionManagerBuilder withBrokerTopicStats(BrokerTopicStats brokerTopicStats) {
            this.brokerTopicStats = brokerTopicStats;
            return this;
        }

        public static SharePartitionManagerBuilder builder() {
            return new SharePartitionManagerBuilder();
        }

        public SharePartitionManager build() {
            return new SharePartitionManager(this.replicaManager, this.time, this.cache, this.partitionCache, 30000, this.timer, 5, 200, 6000L, this.persister, (GroupConfigManager)Mockito.mock(GroupConfigManager.class), this.shareGroupMetrics, this.brokerTopicStats);
        }
    }

    private record TopicMetrics(long totalShareFetchRequestCount, long failedShareFetchRequestCount, long totalShareAcknowledgementRequestCount, long failedShareAcknowledgementRequestCount) {
    }
}

