/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.share;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import kafka.cluster.Partition;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.share.DelayedShareFetch;
import kafka.server.share.PendingRemoteFetches;
import kafka.server.share.ShareFetchUtils;
import kafka.server.share.SharePartition;
import kafka.server.share.SharePartitionManagerTest;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.share.SharePartitionKey;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
import org.apache.kafka.server.share.fetch.PartitionMaxBytesStrategy;
import org.apache.kafka.server.share.fetch.ShareFetch;
import org.apache.kafka.server.share.fetch.ShareFetchTestUtils;
import org.apache.kafka.server.share.metrics.ShareGroupMetrics;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.Option;
import scala.Tuple2;
import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;

public class DelayedShareFetchTest {
    // Shared numeric settings for the tests. In the visible (decompiled) test bodies these are not
    // referenced by name; the literals 5000L, 500 and 100 appear inline instead — presumably the
    // compiler inlined the constants (TODO confirm against the original sources).
    private static final int MAX_WAIT_MS = 5000;
    private static final int BATCH_SIZE = 500;
    private static final int MAX_FETCH_RECORDS = 100;
    // Default fetch parameters reused by the tests that do not build their own FetchParams.
    // NOTE(review): the 4th argument (1 here, 2 in the "MinBytesNotSatisfied" tests) is presumably
    // minBytes and 0x100000 presumably maxBytes — confirm against the FetchParams constructor.
    private static final FetchParams FETCH_PARAMS = new FetchParams(-1, -1L, 5000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true);
    // FetchDataInfo carrying empty records and a mocked RemoteStorageFetchInfo.
    private static final FetchDataInfo REMOTE_FETCH_INFO = new FetchDataInfo(new LogOffsetMetadata(0L, 0L, 0), (Records)MemoryRecords.EMPTY, false, Optional.empty(), Optional.of((RemoteStorageFetchInfo)Mockito.mock(RemoteStorageFetchInfo.class)));
    // Single BrokerTopicStats instance handed to every ShareFetch created by the tests.
    private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats();
    // Despite the name this is not a Mockito mock: setUp() assigns a real SystemTimerReaper that
    // wraps a SystemTimer, and tearDown() closes it.
    private Timer mockTimer;

    /**
     * Per-test fixture: clears the Yammer metrics through {@code TestUtils} and creates a new
     * reaper-wrapped system timer that is stored in {@code mockTimer}.
     */
    @BeforeEach
    public void setUp() {
        TestUtils.clearYammerMetrics();
        Timer delegate = new SystemTimer("DelayedShareFetchTestTimer");
        mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper", delegate);
    }

    /**
     * Closes the timer created in {@code setUp()}.
     *
     * @throws Exception if closing the timer fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        // setUp() may have failed before the timer was assigned; avoid a secondary
        // NullPointerException that would obscure the original failure.
        if (mockTimer != null) {
            mockTimer.close();
        }
    }

    /**
     * Checks that {@code tryComplete()} returns {@code false} and leaves the operation incomplete
     * when both share partitions report that they cannot acquire records. The test also expects
     * that no partition locks are released, no share-group metrics exist for the group and the
     * expired-request meter stays at zero.
     */
    @Test
    public void testDelayedShareFetchTryCompleteReturnsFalseDueToNonAcquirablePartitions() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        // Neither partition can acquire records.
        Mockito.when(sp0.canAcquireRecords()).thenReturn(false);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);

        Time metricsTime = new MockTime();
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(metricsTime);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withShareGroupMetrics(shareGroupMetrics)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(delayedShareFetch.tryComplete());
        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(0)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // The operation lock must be free after tryComplete(); release it in finally so a failing
        // assertion below does not leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertNull(shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId));
            Assertions.assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
            Assertions.assertEquals(0L, delayedShareFetch.expiredRequestMeter().count());
        } finally {
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that {@code tryComplete()} returns {@code false} when only {@code sp0} is acquirable
     * and its first {@code fetchOffsetMetadata} lookup yields an empty Optional (a populated value
     * is returned on the following lookup). The test expects the partition locks to be released
     * once, the acquire-time metric to be recorded with the 10 ms difference of the mocked clock
     * (100 then 110), no fetch-ratio metric, and exactly one exception-handler invocation.
     */
    @Test
    public void testTryCompleteWhenMinBytesNotSatisfiedOnFirstFetch() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        // Same as the shared FETCH_PARAMS except for the 4th argument (2 instead of 1) and the
        // shorter constructor overload.
        FetchParams fetchParams = new FetchParams(-1, -1L, 5000L, 2, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        ShareFetch shareFetch = new ShareFetch(fetchParams, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);
        ShareFetchResponseData.AcquiredRecords acquiredRecords = new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1);
        Mockito.when(sp0.acquire((String) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData) ArgumentMatchers.any(), (FetchIsolation) ArgumentMatchers.any()))
            .thenReturn(ShareFetchTestUtils.createShareAcquiredRecords(acquiredRecords));

        // First lookup of the fetch offset metadata is empty, later lookups return a value.
        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong()))
            .thenReturn(Optional.empty())
            .thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1L, 1L, 1);
        mockTopicIdPartitionFetchBytes(replicaManager, tp0, hwmOffsetMetadata);

        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp0)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler();
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));

        // Mocked clock: two consecutive readings, 10 ms apart.
        Time time = Mockito.mock(Time.class);
        Mockito.when(time.hiResClockMs()).thenReturn(100L).thenReturn(110L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withExceptionHandler(exceptionHandler)
            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
            .withShareGroupMetrics(shareGroupMetrics)
            .withTime(time)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertFalse(delayedShareFetch.tryComplete());
        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // Release the operation lock in finally so a failing metric assertion cannot leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
            Assertions.assertEquals(10.0, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
            Assertions.assertNull(shareGroupMetrics.topicPartitionsFetchRatio(groupId));
        } finally {
            delayedShareFetch.lock().unlock();
        }
        Mockito.verify(exceptionHandler, Mockito.times(1)).accept((SharePartitionKey) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any());
    }

    /**
     * Checks that {@code tryComplete()} returns {@code false} when {@code fetchOffsetMetadata}
     * for the only acquirable partition is already populated and the mocked high-watermark
     * metadata reports a {@code positionDiff} of 1. The test expects the partition locks to be
     * released once, the operation lock to be free afterwards, and the exception handler to be
     * invoked exactly once.
     */
    @Test
    public void testTryCompleteWhenMinBytesNotSatisfiedOnSubsequentFetch() {
        String group = "grp";
        Uuid topic = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition firstPartition = new TopicIdPartition(topic, new TopicPartition("foo", 0));
        TopicIdPartition secondPartition = new TopicIdPartition(topic, new TopicPartition("foo", 1));

        SharePartition firstShare = Mockito.mock(SharePartition.class);
        SharePartition secondShare = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> partitionMap = new LinkedHashMap<>();
        partitionMap.put(firstPartition, firstShare);
        partitionMap.put(secondPartition, secondShare);

        FetchParams params = new FetchParams(-1, -1L, 5000L, 2, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        ShareFetch fetch = new ShareFetch(params, group, Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(firstPartition, secondPartition), 500, 100, BROKER_TOPIC_STATS);

        // Only the first partition is acquirable.
        Mockito.when(firstShare.canAcquireRecords()).thenReturn(true);
        Mockito.when(secondShare.canAcquireRecords()).thenReturn(false);
        ShareFetchResponseData.AcquiredRecords acquired = new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1);
        Mockito.when(firstShare.acquire((String) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData) ArgumentMatchers.any(), (FetchIsolation) ArgumentMatchers.any()))
            .thenReturn(ShareFetchTestUtils.createShareAcquiredRecords(acquired));

        // Mocked high-watermark metadata: positionDiff always reports 1.
        LogOffsetMetadata highWatermark = Mockito.mock(LogOffsetMetadata.class);
        Mockito.when(highWatermark.positionDiff((LogOffsetMetadata) ArgumentMatchers.any())).thenReturn(1);
        LogOffsetMetadata knownFetchOffset = Mockito.mock(LogOffsetMetadata.class);
        Mockito.when(firstShare.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(knownFetchOffset));
        mockTopicIdPartitionFetchBytes(replicaManager, firstPartition, highWatermark);

        BiConsumer<SharePartitionKey, Throwable> errorHandler = mockExceptionHandler();
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch operation = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(fetch)
            .withSharePartitions(partitionMap)
            .withReplicaManager(replicaManager)
            .withExceptionHandler(errorHandler)
            .withFetchId(fetchId)
            .build());
        Mockito.when(firstShare.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(secondShare.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(operation.isCompleted());
        Assertions.assertFalse(operation.tryComplete());
        Assertions.assertFalse(operation.isCompleted());
        Mockito.verify(operation, Mockito.times(1)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // The operation lock must be obtainable, i.e. not left held by tryComplete().
        Assertions.assertTrue(operation.lock().tryLock());
        operation.lock().unlock();
        Mockito.verify(errorHandler, Mockito.times(1)).accept((SharePartitionKey) ArgumentMatchers.any(), (Throwable) ArgumentMatchers.any());
    }

    /**
     * Checks that {@code tryComplete()} returns {@code true} and marks the operation completed
     * when {@code sp0} is acquirable and the replica manager is set up through
     * {@code mockTopicIdPartitionToReturnDataEqualToMinBytes}. The test expects one lock release,
     * an acquire-time metric of 20 ms (mocked clock 120 then 140) and a fetch-ratio metric of 50.
     */
    @Test
    public void testDelayedShareFetchTryCompleteReturnsTrue() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);
        ShareFetchResponseData.AcquiredRecords acquiredRecords = new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1);
        Mockito.when(sp0.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation) ArgumentMatchers.any()))
            .thenReturn(ShareFetchTestUtils.createShareAcquiredRecords(acquiredRecords));
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp0)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));

        // Mocked clock: two consecutive readings, 20 ms apart.
        Time time = Mockito.mock(Time.class);
        Mockito.when(time.hiResClockMs()).thenReturn(120L).thenReturn(140L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
            .withShareGroupMetrics(shareGroupMetrics)
            .withTime(time)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertTrue(delayedShareFetch.tryComplete());
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // Release the operation lock in finally so a failing metric assertion cannot leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
            Assertions.assertEquals(20.0, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
            Assertions.assertEquals(50.0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
        } finally {
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that {@code forceComplete()} completes the share-fetch future with an empty map when
     * neither partition can acquire records. The test expects that the replica manager is never
     * read, no partition locks are released, the acquire-time metric is 50 ms (mocked clock 90
     * then 140) and the fetch-ratio metric is recorded once with value 0.
     */
    @Test
    public void testEmptyFutureReturnedByDelayedShareFetchOnComplete() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        // Raw type kept from the decompiled source: the element type of the future is not
        // visible in this test.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), future, List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.canAcquireRecords()).thenReturn(false);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);

        // Mocked clock: two consecutive readings, 50 ms apart.
        Time time = Mockito.mock(Time.class);
        Mockito.when(time.hiResClockMs()).thenReturn(90L).thenReturn(140L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withShareGroupMetrics(shareGroupMetrics)
            .withTime(time)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        delayedShareFetch.forceComplete();

        Assertions.assertEquals(0, ((Map<?, ?>) future.join()).size());
        Mockito.verify(replicaManager, Mockito.times(0)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(0)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // Release the operation lock in finally so a failing metric assertion cannot leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
            Assertions.assertEquals(50.0, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
            Assertions.assertEquals(0.0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
        } finally {
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that {@code forceComplete()} reads from the replica manager exactly once when
     * {@code sp0} is acquirable and {@code sp1} is not. The test expects {@code nextFetchOffset()}
     * to be called once on {@code sp0} and never on {@code sp1}, both the delayed operation and
     * the share fetch to be completed, one lock release, an acquire-time metric of 130 ms
     * (mocked clock 10 then 140) and a fetch-ratio metric of 50.
     */
    @Test
    public void testReplicaManagerFetchShouldHappenOnComplete() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture<>(), List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);
        ShareFetchResponseData.AcquiredRecords acquiredRecords = new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short) 1);
        Mockito.when(sp0.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation) ArgumentMatchers.any()))
            .thenReturn(ShareFetchTestUtils.createShareAcquiredRecords(acquiredRecords));
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp0)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp0));

        // Mocked clock: two consecutive readings, 130 ms apart.
        Time time = Mockito.mock(Time.class);
        Mockito.when(time.hiResClockMs()).thenReturn(10L).thenReturn(140L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy)
            .withShareGroupMetrics(shareGroupMetrics)
            .withTime(time)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        delayedShareFetch.forceComplete();

        Mockito.verify(replicaManager, Mockito.times(1)).readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Mockito.verify(sp0, Mockito.times(1)).nextFetchOffset();
        Mockito.verify(sp1, Mockito.times(0)).nextFetchOffset();
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Assertions.assertTrue(shareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // Release the operation lock in finally so a failing metric assertion cannot leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
            Assertions.assertEquals(130.0, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
            Assertions.assertEquals(50.0, shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
        } finally {
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that calling {@code forceComplete()} a second time on an already completed operation
     * does no further work: {@code acquirablePartitions} is still verified as called exactly once
     * after the second call, no partition locks are released, and both share-group metrics have
     * been recorded exactly once.
     */
    @Test
    public void testToCompleteAnAlreadyCompletedFuture() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);

        // Raw type kept from the decompiled source: the element type of the future is not
        // visible in this test.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), future, List.of(tp0), 500, 100, BROKER_TOPIC_STATS);
        Mockito.when(sp0.canAcquireRecords()).thenReturn(false);

        Time metricsTime = new MockTime();
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(metricsTime);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withShareGroupMetrics(shareGroupMetrics)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Assertions.assertFalse(delayedShareFetch.isCompleted());

        // First forceComplete(): completes the future with an empty map.
        delayedShareFetch.forceComplete();
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).acquirablePartitions(sharePartitions);
        Assertions.assertEquals(0, ((Map<?, ?>) future.join()).size());
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();

        // Second forceComplete(): the invocation count of acquirablePartitions must not grow.
        delayedShareFetch.forceComplete();
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).acquirablePartitions(sharePartitions);
        Mockito.verify(delayedShareFetch, Mockito.times(0)).releasePartitionLocks((Set) ArgumentMatchers.any());

        // Release the operation lock in finally so a failing metric assertion cannot leave it held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        try {
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
            Assertions.assertEquals(1L, shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
        } finally {
            delayedShareFetch.lock().unlock();
        }
    }

    @Test
    public void testForceCompleteTriggersDelayedActionsQueue() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
        List<TopicIdPartition> topicIdPartitions1 = List.of(tp0, tp1);
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions1 = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions1.put(tp0, sp0);
        sharePartitions1.put(tp1, sp1);
        sharePartitions1.put(tp2, sp2);
        ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), topicIdPartitions1, 500, 100, BROKER_TOPIC_STATS);
        DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory("TestShareFetch", this.mockTimer, replicaManager.localBrokerId(), 1000, false, true);
        SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch(replicaManager, (DelayedOperationPurgatory<DelayedShareFetch>)delayedShareFetchPurgatory);
        ArrayList delayedShareFetchWatchKeys = new ArrayList();
        topicIdPartitions1.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition())));
        Uuid fetchId1 = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch1 = DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch1).withReplicaManager(replicaManager).withSharePartitions(sharePartitions1).withFetchId(fetchId1).build();
        Mockito.when((Object)sp0.maybeAcquireFetchLock(fetchId1)).thenReturn((Object)false);
        Mockito.when((Object)sp1.maybeAcquireFetchLock(fetchId1)).thenReturn((Object)false);
        Mockito.when((Object)sp2.maybeAcquireFetchLock(fetchId1)).thenReturn((Object)false);
        delayedShareFetchPurgatory.tryCompleteElseWatch((DelayedOperation)delayedShareFetch1, delayedShareFetchWatchKeys);
        Assertions.assertEquals((int)2, (int)delayedShareFetchPurgatory.watched());
        Assertions.assertFalse((boolean)shareFetch1.isCompleted());
        Assertions.assertTrue((boolean)delayedShareFetch1.lock().tryLock());
        delayedShareFetch1.lock().unlock();
        ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp1))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = this.mockPartitionMaxBytes(Set.of(tp1));
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions2 = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions2.put(tp0, sp0);
        sharePartitions2.put(tp1, sp1);
        sharePartitions2.put(tp2, sp2);
        Uuid fetchId2 = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch2 = (DelayedShareFetch)Mockito.spy((Object)DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch2).withReplicaManager(replicaManager).withSharePartitions(sharePartitions2).withPartitionMaxBytesStrategy(partitionMaxBytesStrategy).withFetchId(fetchId2).build());
        Mockito.when((Object)sp1.maybeAcquireFetchLock(fetchId2)).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareFetchTestUtils.createShareAcquiredRecords((ShareFetchResponseData.AcquiredRecords)new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short)1)));
        delayedShareFetch2.forceComplete();
        Assertions.assertTrue((boolean)delayedShareFetch2.isCompleted());
        Assertions.assertTrue((boolean)shareFetch2.isCompleted());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Assertions.assertFalse((boolean)delayedShareFetch1.isCompleted());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).addToActionQueue((Runnable)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)0))).tryCompleteActions();
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch2, (VerificationMode)Mockito.times((int)1))).releasePartitionLocks((Set)ArgumentMatchers.any());
        Assertions.assertTrue((boolean)delayedShareFetch2.lock().tryLock());
        delayedShareFetch2.lock().unlock();
    }

    /**
     * Exercises {@code DelayedShareFetch.combineLogReadResponse} in two situations:
     * <ol>
     *   <li>the already-fetched response covers only {@code tp0} while the requested
     *       partition data covers {@code tp0} and {@code tp1};</li>
     *   <li>the already-fetched response covers both partitions.</li>
     * </ol>
     * In both cases the combined response must have exactly the requested key set and must
     * carry over the entries that were already present in the supplied response.
     */
    @Test
    public void testCombineLogReadResponse() {
        String groupId = "grp";
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(new FetchParams(-1, -1L, 5000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), future, List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = this.mockPartitionMaxBytes(Set.of(tp1));
        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withReplicaManager(replicaManager).withSharePartitions(sharePartitions).withPartitionMaxBytesStrategy(partitionMaxBytesStrategy).build();

        // Both partitions are requested, each starting at offset 0.
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<TopicIdPartition, Long>();
        topicPartitionData.put(tp0, 0L);
        topicPartitionData.put(tp1, 0L);

        // Case 1: only tp0 has an existing read result (a mock whose records report 2 bytes).
        LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<TopicIdPartition, LogReadResult>();
        LogReadResult logReadResult = (LogReadResult)Mockito.mock(LogReadResult.class);
        Records records = (Records)Mockito.mock(Records.class);
        Mockito.when((Object)records.sizeInBytes()).thenReturn((Object)2);
        FetchDataInfo fetchDataInfo = new FetchDataInfo((LogOffsetMetadata)Mockito.mock(LogOffsetMetadata.class), records);
        Mockito.when((Object)logReadResult.info()).thenReturn((Object)fetchDataInfo);
        logReadResponse.put(tp0, logReadResult);
        // Any log read issued through the replica manager yields a result for tp1 only.
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp1))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        LinkedHashMap combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
        Assertions.assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
        // Expected value first, actual second, so that a failure message reads correctly.
        Assertions.assertEquals(logReadResponse.get(tp0), combinedLogReadResponse.get(tp0));

        // Case 2: both partitions already have read results; they must be passed through as-is.
        logReadResponse = new LinkedHashMap<>();
        logReadResponse.put(tp0, (LogReadResult)Mockito.mock(LogReadResult.class));
        logReadResponse.put(tp1, (LogReadResult)Mockito.mock(LogReadResult.class));
        combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
        Assertions.assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());
        Assertions.assertEquals(logReadResponse.get(tp0), combinedLogReadResponse.get(tp0));
        Assertions.assertEquals(logReadResponse.get(tp1), combinedLogReadResponse.get(tp1));
    }

    /**
     * Covers the path where {@code Partition.fetchOffsetSnapshot} throws while a delayed share
     * fetch is being evaluated in {@code tryComplete}.
     * <p>
     * Expectations checked here:
     * <ul>
     *   <li>{@code tryComplete} returns false and leaves the operation incomplete, but the share
     *       fetch reports an error in all partitions and the exception handler is invoked once;</li>
     *   <li>the log is read once, partition locks are released once and the fetch lock of
     *       {@code sp0} is released for this fetch id;</li>
     *   <li>a subsequent {@code forceComplete} (with the fetch lock no longer acquirable)
     *       completes the operation without any additional log read, lock release or
     *       exception-handler call;</li>
     *   <li>the share group metrics record two acquire-time samples and one fetch-ratio sample.</li>
     * </ul>
     */
    @Test
    public void testExceptionInMinBytesCalculation() {
        String groupId = "grp";
        Uuid topicId = Uuid.randomUuid();
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp0, sp0);
        ShareFetch shareFetch = new ShareFetch(new FetchParams(-1, -1L, 5000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.of(tp0), 500, 100, BROKER_TOPIC_STATS);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp0.acquire((String)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareFetchTestUtils.createShareAcquiredRecords((ShareFetchResponseData.AcquiredRecords)new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short)1)));
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(tp0))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        // The partition lookup succeeds, but asking it for the offset snapshot blows up.
        Partition partition = (Partition)Mockito.mock(Partition.class);
        Mockito.when((Object)replicaManager.getPartitionOrException(tp0.topicPartition())).thenReturn((Object)partition);
        Mockito.when((Object)partition.fetchOffsetSnapshot((Optional)ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenThrow(new Throwable[]{new RuntimeException("Exception thrown")});
        PartitionMaxBytesStrategy partitionMaxBytesStrategy = this.mockPartitionMaxBytes(Set.of(tp0));
        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = DelayedShareFetchTest.mockExceptionHandler();
        // Successive clock reads return 100, 110 and 170; the metric assertions below expect
        // acquire-time samples of 10 and 60 (sum 70), which matches the gaps between these reads.
        Time time = (Time)Mockito.mock(Time.class);
        Mockito.when((Object)time.hiResClockMs()).thenReturn((Object)100L).thenReturn((Object)110L).thenReturn((Object)170L);
        ShareGroupMetrics shareGroupMetrics = new ShareGroupMetrics(time);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = (DelayedShareFetch)Mockito.spy((Object)DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withSharePartitions(sharePartitions).withReplicaManager(replicaManager).withExceptionHandler(exceptionHandler).withPartitionMaxBytesStrategy(partitionMaxBytesStrategy).withShareGroupMetrics(shareGroupMetrics).withTime(time).withFetchId(fetchId).build());
        Mockito.when((Object)sp0.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);

        // First attempt: the exception is routed to the handler and the locks are given back.
        Assertions.assertFalse((boolean)delayedShareFetch.tryComplete());
        Assertions.assertFalse((boolean)delayedShareFetch.isCompleted());
        Assertions.assertTrue((boolean)shareFetch.errorInAllPartitions());
        ((BiConsumer)Mockito.verify(exceptionHandler, (VerificationMode)Mockito.times((int)1))).accept((SharePartitionKey)ArgumentMatchers.any(), (Throwable)ArgumentMatchers.any());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch, (VerificationMode)Mockito.times((int)1))).releasePartitionLocks((Set)ArgumentMatchers.any());
        ((SharePartition)Mockito.verify((Object)sp0, (VerificationMode)Mockito.times((int)1))).releaseFetchLock(fetchId);

        // Second attempt: the fetch lock can no longer be acquired, yet forceComplete finishes
        // the operation. The cumulative invocation counts must not have increased.
        Mockito.when((Object)sp0.maybeAcquireFetchLock(fetchId)).thenReturn((Object)false);
        Assertions.assertTrue((boolean)delayedShareFetch.forceComplete());
        Assertions.assertTrue((boolean)delayedShareFetch.isCompleted());
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch, (VerificationMode)Mockito.times((int)1))).releasePartitionLocks((Set)ArgumentMatchers.any());

        // The operation's own lock must be free again. Release it immediately so that a failing
        // metric assertion below cannot leave it held.
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();

        Assertions.assertEquals((long)2L, (long)shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).count());
        Assertions.assertEquals((double)70.0, (double)shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).sum());
        Assertions.assertEquals((double)10.0, (double)shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).min());
        Assertions.assertEquals((double)60.0, (double)shareGroupMetrics.topicPartitionsAcquireTimeMs(groupId).max());
        Assertions.assertEquals((long)1L, (long)shareGroupMetrics.topicPartitionsFetchRatio(groupId).count());
        Assertions.assertEquals((double)0.0, (double)shareGroupMetrics.topicPartitionsFetchRatio(groupId).sum());
        ((BiConsumer)Mockito.verify(exceptionHandler, (VerificationMode)Mockito.times((int)1))).accept((SharePartitionKey)ArgumentMatchers.any(), (Throwable)ArgumentMatchers.any());
    }

    /**
     * When {@code onComplete} throws from within {@code tryComplete}, the attempt must report
     * false and the partition locks, including the share partition's fetch lock, must be released.
     */
    @Test
    public void testTryCompleteLocksReleasedOnCompleteException() {
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        // A single share partition that is acquirable and has fetch offset metadata available.
        SharePartition sharePartition = Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
        Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        LinkedHashMap<TopicIdPartition, SharePartition> partitionsToFetch = new LinkedHashMap<>();
        partitionsToFetch.put(topicIdPartition, sharePartition);

        ShareFetch fetchRequest = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), new CompletableFuture(), List.of(topicIdPartition), 500, 100, BROKER_TOPIC_STATS);

        // Replica manager answers every log read with a result for the partition under test.
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(topicIdPartition)))
            .when(replicaManager)
            .readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, topicIdPartition, 1);

        PartitionMaxBytesStrategy maxBytesStrategy = this.mockPartitionMaxBytes(Set.of(topicIdPartition));
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch operation = DelayedShareFetchBuilder.builder()
            .withShareFetchData(fetchRequest)
            .withSharePartitions(partitionsToFetch)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(maxBytesStrategy)
            .withFetchId(fetchId)
            .build();
        DelayedShareFetch spiedOperation = Mockito.spy(operation);
        Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Assertions.assertFalse(spiedOperation.isCompleted());

        // Make completion itself fail.
        Mockito.doThrow(new RuntimeException()).when(spiedOperation).onComplete();

        Assertions.assertFalse(spiedOperation.tryComplete());
        Mockito.verify(spiedOperation, Mockito.times(1)).releasePartitionLocks((Set)ArgumentMatchers.any());
        Mockito.verify(sharePartition, Mockito.times(1)).releaseFetchLock(fetchId);
    }

    /**
     * If {@code forceComplete} reports false during {@code tryComplete}, the attempt returns false,
     * the share partition's fetch lock is released and the operation's own lock is left free.
     */
    @Test
    public void testLocksReleasedForCompletedFetch() {
        String groupId = "grp";
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        SharePartition sharePartition = Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
        LinkedHashMap<TopicIdPartition, SharePartition> partitionsToFetch = new LinkedHashMap<>();
        partitionsToFetch.put(topicIdPartition, sharePartition);

        // Every log read through the replica manager returns a result for the partition under test.
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(List.of(topicIdPartition)))
            .when(replicaManager)
            .readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, topicIdPartition, 1);

        ShareFetch fetchRequest = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.of(topicIdPartition), 500, 100, BROKER_TOPIC_STATS);
        PartitionMaxBytesStrategy maxBytesStrategy = this.mockPartitionMaxBytes(Set.of(topicIdPartition));
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch realOperation = DelayedShareFetchBuilder.builder()
            .withShareFetchData(fetchRequest)
            .withSharePartitions(partitionsToFetch)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(maxBytesStrategy)
            .withFetchId(fetchId)
            .build();
        DelayedShareFetch spiedOperation = Mockito.spy(realOperation);
        Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        // Stub forceComplete on the spy so that it always reports false.
        Mockito.doReturn(false).when(spiedOperation).forceComplete();

        Assertions.assertFalse(spiedOperation.tryComplete());
        Mockito.verify(sharePartition, Mockito.times(1)).releaseFetchLock(fetchId);

        // The lock is probed on the wrapped instance, not on the spy.
        Assertions.assertTrue(realOperation.lock().tryLock());
        realOperation.lock().unlock();
    }

    /**
     * If {@code canAcquireRecords} throws while {@code tryComplete} is running, the attempt
     * returns false, the fetch lock taken on the share partition is released, and the
     * operation's own lock is left free.
     */
    @Test
    public void testLocksReleasedAcquireException() {
        String groupId = "grp";
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        // The share partition fails as soon as it is asked whether records can be acquired.
        SharePartition failingSharePartition = Mockito.mock(SharePartition.class);
        Mockito.when(failingSharePartition.canAcquireRecords()).thenThrow(new RuntimeException("Acquire exception"));
        LinkedHashMap<TopicIdPartition, SharePartition> partitionsToFetch = new LinkedHashMap<>();
        partitionsToFetch.put(topicIdPartition, failingSharePartition);

        ShareFetch fetchRequest = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.of(topicIdPartition), 500, 100, BROKER_TOPIC_STATS);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch operation = DelayedShareFetchBuilder.builder()
            .withShareFetchData(fetchRequest)
            .withSharePartitions(partitionsToFetch)
            .withFetchId(fetchId)
            .build();
        Mockito.when(failingSharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(operation.tryComplete());
        Mockito.verify(failingSharePartition, Mockito.times(1)).releaseFetchLock(fetchId);

        Assertions.assertTrue(operation.lock().tryLock());
        operation.lock().unlock();
    }

    /**
     * When the partition max-bytes strategy throws, {@code tryComplete} still completes the
     * operation: the response future finishes normally and carries, for the single requested
     * partition, the exception's message as the partition error message.
     */
    @Test
    public void testTryCompleteWhenPartitionMaxBytesStrategyThrowsException() {
        String groupId = "grp";
        TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        SharePartition sharePartition = Mockito.mock(SharePartition.class);
        Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
        LinkedHashMap<TopicIdPartition, SharePartition> partitionsToFetch = new LinkedHashMap<>();
        partitionsToFetch.put(topicIdPartition, sharePartition);

        CompletableFuture responseFuture = new CompletableFuture();
        FetchParams fetchParams = new FetchParams(-1, -1L, 5000L, 2, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        ShareFetch fetchRequest = new ShareFetch(fetchParams, groupId, Uuid.randomUuid().toString(), responseFuture, List.of(topicIdPartition), 500, 100, BROKER_TOPIC_STATS);

        // Strategy that rejects every max-bytes computation.
        PartitionMaxBytesStrategy throwingStrategy = Mockito.mock(PartitionMaxBytesStrategy.class);
        Mockito.when(throwingStrategy.maxBytes(ArgumentMatchers.anyInt(), (Set)ArgumentMatchers.any(), ArgumentMatchers.anyInt())).thenThrow(new IllegalArgumentException("Exception thrown"));

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch operation = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(fetchRequest)
            .withSharePartitions(partitionsToFetch)
            .withExceptionHandler(DelayedShareFetchTest.mockExceptionHandler())
            .withPartitionMaxBytesStrategy(throwingStrategy)
            .withFetchId(fetchId)
            .build());
        Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        Assertions.assertFalse(operation.isCompleted());
        Assertions.assertTrue(operation.tryComplete());
        Assertions.assertTrue(operation.isCompleted());
        Mockito.verify(operation, Mockito.times(2)).releasePartitionLocks((Set)ArgumentMatchers.any());

        // The operation's own lock must not be left held.
        Assertions.assertTrue(operation.lock().tryLock());
        operation.lock().unlock();

        // The future completes normally; the failure is reported inside the partition data.
        Assertions.assertTrue(responseFuture.isDone());
        Assertions.assertFalse(responseFuture.isCompletedExceptionally());
        Map<?, ?> responseByPartition = (Map<?, ?>)responseFuture.join();
        Assertions.assertEquals(1, responseByPartition.size());
        Assertions.assertTrue(responseByPartition.containsKey(topicIdPartition));
        ShareFetchResponseData.PartitionData partitionResponse = (ShareFetchResponseData.PartitionData)responseByPartition.get(topicIdPartition);
        Assertions.assertEquals("Exception thrown", partitionResponse.errorMessage());
    }

    /**
     * With the UNIFORM max-bytes strategy and all five share partitions acquirable, a single
     * log read must be issued in which every partition is requested from offset 0 with the same
     * per-partition max bytes (209715).
     */
    @Test
    public void testPartitionMaxBytesFromUniformStrategyWhenAllPartitionsAreAcquirable() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        String groupId = "grp";
        int partitionCount = 5;

        // Partitions 0..4 of topic "foo", each with its own random topic id, and one mocked
        // share partition per topic partition.
        List<TopicIdPartition> topicIdPartitions = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            topicIdPartitions.add(new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", partition)));
        }
        List<SharePartition> mockedSharePartitions = new ArrayList<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            mockedSharePartitions.add(Mockito.mock(SharePartition.class));
        }
        for (SharePartition mockedSharePartition : mockedSharePartitions) {
            Mockito.when(mockedSharePartition.canAcquireRecords()).thenReturn(true);
        }
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        for (int partition = 0; partition < partitionCount; partition++) {
            sharePartitions.put(topicIdPartitions.get(partition), mockedSharePartitions.get(partition));
        }

        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.copyOf(topicIdPartitions), 500, 100, BROKER_TOPIC_STATS);

        // Every share partition hands out the same acquired-records range (offsets 0..3).
        for (SharePartition mockedSharePartition : mockedSharePartitions) {
            Mockito.when(mockedSharePartition.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation)ArgumentMatchers.any()))
                .thenReturn(ShareFetchTestUtils.createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short)1)));
        }

        // Log reads return results for all partitions.
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(sharePartitions.keySet().stream().toList()))
            .when(replicaManager)
            .readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        for (SharePartition mockedSharePartition : mockedSharePartitions) {
            Mockito.when(mockedSharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        }
        for (TopicIdPartition topicIdPartition : topicIdPartitions) {
            DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, topicIdPartition, 1);
        }

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
            .withFetchId(fetchId)
            .build());
        for (SharePartition mockedSharePartition : mockedSharePartitions) {
            Mockito.when(mockedSharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        }

        Assertions.assertTrue(delayedShareFetch.tryComplete());
        Assertions.assertTrue(delayedShareFetch.isCompleted());

        // Presumably the FETCH_PARAMS max bytes split evenly across the five partitions;
        // TODO confirm against the FETCH_PARAMS definition.
        int expectedPartitionMaxBytes = 209715;
        List<Tuple2> expectedReadPartitionInfo = new ArrayList<>();
        for (TopicIdPartition topicIdPartition : sharePartitions.keySet()) {
            FetchRequest.PartitionData expectedPartitionData = new FetchRequest.PartitionData(topicIdPartition.topicId(), 0L, 0L, expectedPartitionMaxBytes, Optional.empty());
            expectedReadPartitionInfo.add(new Tuple2(topicIdPartition, (Object)expectedPartitionData));
        }
        Mockito.verify(replicaManager, Mockito.times(1)).readFromLog(shareFetch.fetchParams(), (Seq)CollectionConverters.asScala(expectedReadPartitionInfo), QuotaFactory.UNBOUNDED_QUOTA, true);
    }

    @Test
    public void testPartitionMaxBytesFromUniformStrategyWhenFewPartitionsAreAcquirable() {
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        String groupId = "grp";
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        TopicIdPartition tp3 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 3));
        TopicIdPartition tp4 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 4));
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp3 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp4 = (SharePartition)Mockito.mock(SharePartition.class);
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp3.canAcquireRecords()).thenReturn((Object)false);
        Mockito.when((Object)sp4.canAcquireRecords()).thenReturn((Object)false);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        sharePartitions.put(tp3, sp3);
        sharePartitions.put(tp4, sp4);
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), new CompletableFuture(), List.of(tp0, tp1, tp2, tp3, tp4), 500, 100, BROKER_TOPIC_STATS);
        Mockito.when((Object)sp0.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareFetchTestUtils.createShareAcquiredRecords((ShareFetchResponseData.AcquiredRecords)new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short)1)));
        Mockito.when((Object)sp1.acquire(ArgumentMatchers.anyString(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData)ArgumentMatchers.any(FetchPartitionData.class), (FetchIsolation)ArgumentMatchers.any())).thenReturn((Object)ShareFetchTestUtils.createShareAcquiredRecords((ShareFetchResponseData.AcquiredRecords)new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0L).setLastOffset(3L).setDeliveryCount((short)1)));
        LinkedHashSet<TopicIdPartition> acquirableTopicPartitions = new LinkedHashSet<TopicIdPartition>();
        acquirableTopicPartitions.add(tp0);
        acquirableTopicPartitions.add(tp1);
        ((ReplicaManager)Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(acquirableTopicPartitions.stream().toList())).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Mockito.when((Object)sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(0L, 1L, 0)));
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp0, 1);
        DelayedShareFetchTest.mockTopicIdPartitionToReturnDataEqualToMinBytes(replicaManager, tp1, 1);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = (DelayedShareFetch)Mockito.spy((Object)DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withSharePartitions(sharePartitions).withReplicaManager(replicaManager).withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type((PartitionMaxBytesStrategy.StrategyType)PartitionMaxBytesStrategy.StrategyType.UNIFORM)).withFetchId(fetchId).build());
        Mockito.when((Object)sp0.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Mockito.when((Object)sp1.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Mockito.when((Object)sp2.maybeAcquireFetchLock(fetchId)).thenReturn((Object)false);
        Mockito.when((Object)sp3.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Mockito.when((Object)sp4.maybeAcquireFetchLock(fetchId)).thenReturn((Object)false);
        Assertions.assertTrue((boolean)delayedShareFetch.tryComplete());
        Assertions.assertTrue((boolean)delayedShareFetch.isCompleted());
        int expectedPartitionMaxBytes = 524288;
        LinkedHashMap expectedReadPartitionInfo = new LinkedHashMap();
        acquirableTopicPartitions.forEach(topicIdPartition -> expectedReadPartitionInfo.put(topicIdPartition, new FetchRequest.PartitionData(topicIdPartition.topicId(), 0L, 0L, expectedPartitionMaxBytes, Optional.empty())));
        ((ReplicaManager)Mockito.verify((Object)replicaManager, (VerificationMode)Mockito.times((int)1))).readFromLog(shareFetch.fetchParams(), (Seq)CollectionConverters.asScala(acquirableTopicPartitions.stream().map(topicIdPartition -> new Tuple2(topicIdPartition, (Object)((FetchRequest.PartitionData)expectedReadPartitionInfo.get(topicIdPartition)))).collect(Collectors.toList())), QuotaFactory.UNBOUNDED_QUOTA, true);
    }

    /**
     * Exercises {@code combineLogReadResponse} under the UNIFORM partition-max-bytes strategy when only
     * tp0 of the three requested partitions already has a log read result.
     *
     * <p>Expects the combined response to be keyed by all three partitions, and exactly one
     * {@code readFromLog} call for tp1 and tp2, each carrying a max-bytes value of 349525
     * (0x100000 / 3 using integer division).
     */
    @Test
    public void testPartitionMaxBytesFromUniformStrategyInCombineLogReadResponse() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        List<TopicIdPartition> requestedPartitions = List.of(tp0, tp1, tp2);

        // Each requested partition gets its own mocked share partition and a 0L entry in topicPartitionData.
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>();
        for (TopicIdPartition requested : requestedPartitions) {
            sharePartitions.put(requested, Mockito.mock(SharePartition.class));
            topicPartitionData.put(requested, 0L);
        }

        FetchParams fetchParams = new FetchParams(-1, -1L, 5000L, 1, 0x100000, FetchIsolation.HIGH_WATERMARK, Optional.empty());
        ShareFetch shareFetch = new ShareFetch(
            fetchParams,
            "grp",
            Uuid.randomUuid().toString(),
            new CompletableFuture<>(),
            requestedPartitions,
            500,
            100,
            BROKER_TOPIC_STATS);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
            .build();

        // tp0 already has a (mocked) read result whose records report a size of 2 bytes.
        Records existingRecords = Mockito.mock(Records.class);
        Mockito.when(existingRecords.sizeInBytes()).thenReturn(2);
        LogReadResult existingReadResult = Mockito.mock(LogReadResult.class);
        FetchDataInfo existingFetchData = new FetchDataInfo(Mockito.mock(LogOffsetMetadata.class), existingRecords);
        Mockito.when(existingReadResult.info()).thenReturn(existingFetchData);
        LinkedHashMap<TopicIdPartition, LogReadResult> logReadResponse = new LinkedHashMap<>();
        logReadResponse.put(tp0, existingReadResult);

        // Any readFromLog call is answered with results for tp1 and tp2, in that order.
        LinkedHashSet<TopicIdPartition> fetchableTopicPartitions = new LinkedHashSet<>(List.of(tp1, tp2));
        Mockito.doAnswer(invocation -> SharePartitionManagerTest.buildLogReadResult(fetchableTopicPartitions.stream().toList()))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        LinkedHashMap combinedLogReadResponse = delayedShareFetch.combineLogReadResponse(topicPartitionData, logReadResponse);
        Assertions.assertEquals(topicPartitionData.keySet(), combinedLogReadResponse.keySet());

        // The single expected read covers tp1 and tp2 only, each with the uniform per-partition limit.
        int expectedPartitionMaxBytes = 349525;
        List<Tuple2<TopicIdPartition, FetchRequest.PartitionData>> expectedReads = fetchableTopicPartitions.stream()
            .map(tp -> new Tuple2<>(tp, new FetchRequest.PartitionData(tp.topicId(), 0L, 0L, expectedPartitionMaxBytes, Optional.empty())))
            .collect(Collectors.toList());
        Mockito.verify(replicaManager, Mockito.times(1))
            .readFromLog(shareFetch.fetchParams(), (Seq) CollectionConverters.asScala(expectedReads), QuotaFactory.UNBOUNDED_QUOTA, true);
    }

    /**
     * Invokes {@code run()} on a delayed share fetch with no partitions and checks that the share
     * fetch ends up completed and the expired-request meter reads 1.
     */
    @Test
    public void testOnCompleteExecutionOnTimeout() {
        ShareFetch shareFetch = new ShareFetch(
            FETCH_PARAMS,
            "grp",
            Uuid.randomUuid().toString(),
            new CompletableFuture<>(),
            List.of(),
            500,
            100,
            BROKER_TOPIC_STATS);
        DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .build();

        // Neither the delayed operation nor the share fetch is completed before run() is invoked.
        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertFalse(shareFetch.isCompleted());

        delayedShareFetch.run();

        Assertions.assertTrue(shareFetch.isCompleted());
        long expiredRequests = delayedShareFetch.expiredRequestMeter().count();
        Assertions.assertEquals(1L, expiredRequests);
    }

    /**
     * Checks that {@code tryComplete} reports false and the request stays pending after a remote
     * fetch has been started but has not finished.
     *
     * <p>{@code readFromLog} is answered with {@code buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))}
     * (presumably tp1 read locally and tp2 flagged for remote storage; confirm against the helper),
     * and {@code asyncRead} returns a plain mocked {@code Future}. The test then expects a non-null
     * {@code pendingRemoteFetches}, one {@code releasePartitionLocks} call for tp0 and tp1, and the
     * operation's own lock to be obtainable.
     */
    @Test
    public void testRemoteStorageFetchTryCompleteReturnsFalse() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        SharePartition sp2 = Mockito.mock(SharePartition.class);

        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        // All three share partitions report that records can be acquired.
        for (SharePartition sharePartition : sharePartitions.values()) {
            Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
        }

        ShareFetch shareFetch = new ShareFetch(
            FETCH_PARAMS,
            "grp",
            Uuid.randomUuid().toString(),
            new CompletableFuture<>(),
            List.of(tp0, tp1, tp2),
            500,
            100,
            BROKER_TOPIC_STATS);

        // Only sp0 has cached offset metadata; sp1 and sp2 report none.
        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10L, 1L, 0)));
        Mockito.when(sp1.nextFetchOffset()).thenReturn(20L);
        Mockito.when(sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        Mockito.when(sp2.nextFetchOffset()).thenReturn(30L);
        Mockito.when(sp2.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());

        Mockito.doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        // The remote read hands back a bare mocked Future; its callback argument is never invoked here.
        Future pendingRemoteTask = Mockito.mock(Future.class);
        RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
        Mockito.when((Object) remoteLogManager.asyncRead((RemoteStorageFetchInfo) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any()))
            .thenReturn(pendingRemoteTask);
        Mockito.when((Object) replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(this.mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
            .withFetchId(fetchId)
            .build());

        // Every share partition grants its fetch lock to this fetch id.
        for (SharePartition sharePartition : sharePartitions.values()) {
            Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        }

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertFalse(delayedShareFetch.tryComplete());
        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0, tp1));

        // The operation's own lock must not be left held after tryComplete.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    /**
     * Covers the case where {@code asyncRead} succeeds once and then throws a
     * {@code RejectedExecutionException} on its next invocation.
     *
     * <p>The test expects {@code tryComplete} to return true, the share fetch future to complete
     * normally (not exceptionally) with entries for tp1 and tp2, the first remote task to be
     * cancelled, the exception handler to be invoked twice, and the partition locks to be released
     * once for {tp0} and once for {tp1, tp2}.
     */
    @Test
    public void testRemoteStorageFetchTryCompleteThrowsException() {
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        SharePartition sp0 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp1 = (SharePartition)Mockito.mock(SharePartition.class);
        SharePartition sp2 = (SharePartition)Mockito.mock(SharePartition.class);
        // All three share partitions report that records can be acquired.
        Mockito.when((Object)sp0.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp1.canAcquireRecords()).thenReturn((Object)true);
        Mockito.when((Object)sp2.canAcquireRecords()).thenReturn((Object)true);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<TopicIdPartition, SharePartition>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        // Keep a handle on the response future so its outcome can be inspected after completion.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), future, List.of(tp0, tp1, tp2), 500, 100, BROKER_TOPIC_STATS);
        Mockito.when((Object)sp0.nextFetchOffset()).thenReturn((Object)10L);
        Mockito.when((Object)sp1.nextFetchOffset()).thenReturn((Object)20L);
        Mockito.when((Object)sp2.nextFetchOffset()).thenReturn((Object)25L);
        // None of the share partitions has cached offset metadata.
        Mockito.when((Object)sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        Mockito.when((Object)sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        Mockito.when((Object)sp2.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        // Every readFromLog call is answered by the helper with {tp0} and {tp1, tp2}
        // (presumably local vs. remote partitions, judging by the helper's name - confirm).
        ((ReplicaManager)Mockito.doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1, tp2))).when((Object)replicaManager)).readFromLog((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());
        Future remoteFetchTask = (Future)Mockito.mock(Future.class);
        // Calling cancel(false) stubs isCancelled() to return true from then on; cancel itself returns false.
        ((Future)Mockito.doAnswer(invocation -> {
            Mockito.when((Object)remoteFetchTask.isCancelled()).thenReturn((Object)true);
            return false;
        }).when((Object)remoteFetchTask)).cancel(false);
        RemoteLogManager remoteLogManager = (RemoteLogManager)Mockito.mock(RemoteLogManager.class);
        // First asyncRead call returns remoteFetchTask; the following call throws RejectedExecutionException.
        Mockito.when((Object)remoteLogManager.asyncRead((RemoteStorageFetchInfo)ArgumentMatchers.any(), (Consumer)ArgumentMatchers.any())).thenReturn((Object)remoteFetchTask).thenThrow(new Throwable[]{new RejectedExecutionException("Exception thrown")});
        Mockito.when((Object)replicaManager.remoteLogManager()).thenReturn((Object)Option.apply((Object)remoteLogManager));
        BiConsumer<SharePartitionKey, Throwable> exceptionHandler = DelayedShareFetchTest.mockExceptionHandler();
        Uuid fetchId = Uuid.randomUuid();
        // Spy on the operation so releasePartitionLocks and onComplete invocations can be verified.
        DelayedShareFetch delayedShareFetch = (DelayedShareFetch)Mockito.spy((Object)DelayedShareFetchBuilder.builder().withShareFetchData(shareFetch).withSharePartitions(sharePartitions).withReplicaManager(replicaManager).withExceptionHandler(exceptionHandler).withPartitionMaxBytesStrategy(this.mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))).withFetchId(fetchId).build());
        // Every share partition grants its fetch lock to this fetch id.
        Mockito.when((Object)sp0.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Mockito.when((Object)sp1.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Mockito.when((Object)sp2.maybeAcquireFetchLock(fetchId)).thenReturn((Object)true);
        Assertions.assertFalse((boolean)delayedShareFetch.isCompleted());
        Assertions.assertTrue((boolean)delayedShareFetch.tryComplete());
        Assertions.assertTrue((boolean)delayedShareFetch.isCompleted());
        Assertions.assertTrue((boolean)shareFetch.isCompleted());
        // isCancelled() is only true if cancel(false) was invoked on the task (see the answer above).
        Assertions.assertTrue((boolean)remoteFetchTask.isCancelled());
        // The response future completes normally and is keyed by tp1 and tp2 only.
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        Assertions.assertEquals(Set.of(tp1, tp2), ((Map)future.join()).keySet());
        // The exception handler is expected to have been invoked exactly twice.
        ((BiConsumer)Mockito.verify(exceptionHandler, (VerificationMode)Mockito.times((int)2))).accept((SharePartitionKey)ArgumentMatchers.any(), (Throwable)ArgumentMatchers.any());
        // Locks are released in two separate calls: one for {tp0}, one for {tp1, tp2}.
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch, (VerificationMode)Mockito.times((int)1))).releasePartitionLocks(Set.of(tp0));
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch, (VerificationMode)Mockito.times((int)1))).releasePartitionLocks(Set.of(tp1, tp2));
        ((DelayedShareFetch)Mockito.verify((Object)delayedShareFetch, (VerificationMode)Mockito.times((int)1))).onComplete();
        // The operation's own lock must not be left held.
        Assertions.assertTrue((boolean)delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    /**
     * Verifies that a pending remote fetch is cancelled and the request completes when the partition
     * being fetched remotely can no longer be resolved on this broker.
     *
     * <p>Setup: the first {@code readFromLog} answer is
     * {@code buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))} and the second is
     * {@code buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of())};
     * {@code getPartitionOrException} for tp2 throws a {@code KafkaStorageException}; any timer task
     * passed to {@code addShareFetchTimerRequest} is run immediately.
     *
     * <p>The remote task's {@code cancel(false)} is stubbed only through {@code doAnswer}, so
     * {@code isCancelled()} turns true solely when the code under test really calls
     * {@code cancel(false)}.
     */
    @Test
    public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));
        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        SharePartition sp2 = Mockito.mock(SharePartition.class);
        // All three share partitions report that records can be acquired.
        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp2.canAcquireRecords()).thenReturn(true);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);
        // Keep a handle on the response future so its outcome can be inspected after completion.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), future, List.of(tp0, tp1, tp2), 500, 100, BROKER_TOPIC_STATS);
        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        Mockito.when(sp1.nextFetchOffset()).thenReturn(20L);
        Mockito.when(sp2.nextFetchOffset()).thenReturn(30L);
        // Only sp0 has cached offset metadata; sp1 and sp2 report none.
        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10L, 1L, 0)));
        Mockito.when(sp1.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        Mockito.when(sp2.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // processFetchResponse is replaced by a canned response keyed by tp0 and tp1.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
            partitionDataMap.put(tp0, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            partitionDataMap.put(tp1, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse((ShareFetch) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (LinkedHashMap) ArgumentMatchers.any(), (ReplicaManager) ArgumentMatchers.any(), (BiConsumer) ArgumentMatchers.any())).thenReturn(partitionDataMap);

            // Two consecutive readFromLog answers: the first includes tp2 in the helper's second set,
            // the second covers tp0 and tp1 only.
            Mockito.doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)))
                .doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()))
                .when(replicaManager)
                .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

            // cancel(false) flips isCancelled() to true as a side effect and reports success. It must
            // not additionally be stubbed via Mockito.when(remoteFetchTask.cancel(false)): that form
            // invokes cancel(false) on the mock during setup, which runs this answer and makes the
            // isCancelled() assertion below pass regardless of what the code under test does.
            Future remoteFetchTask = Mockito.mock(Future.class);
            Mockito.doAnswer(invocation -> {
                Mockito.when(remoteFetchTask.isCancelled()).thenReturn(true);
                return true;
            }).when(remoteFetchTask).cancel(false);

            RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
            Mockito.when((Object) remoteLogManager.asyncRead((RemoteStorageFetchInfo) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any())).thenReturn(remoteFetchTask);
            Mockito.when((Object) replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));
            // Resolving tp2 on this broker fails with a KafkaStorageException.
            Mockito.when((Object) replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(new Throwable[]{Mockito.mock(KafkaStorageException.class)});
            // Timer tasks handed to the replica manager are executed synchronously.
            Mockito.doAnswer(invocationOnMock -> {
                TimerTask timerTask = (TimerTask) invocationOnMock.getArgument(0);
                timerTask.run();
                return null;
            }).when(replicaManager).addShareFetchTimerRequest((TimerTask) ArgumentMatchers.any());

            Uuid fetchId = Uuid.randomUuid();
            // Spy on the operation so releasePartitionLocks invocations can be verified.
            DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
                .withShareFetchData(shareFetch)
                .withSharePartitions(sharePartitions)
                .withReplicaManager(replicaManager)
                .withPartitionMaxBytesStrategy(this.mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
                .withFetchId(fetchId)
                .build());
            // Every share partition grants its fetch lock to this fetch id.
            Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
            Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(true);
            Mockito.when(sp2.maybeAcquireFetchLock(fetchId)).thenReturn(true);

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            Assertions.assertTrue(delayedShareFetch.tryComplete());
            Assertions.assertTrue(delayedShareFetch.isCompleted());

            // Exactly one remote fetch was registered and its task must have been cancelled.
            Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
            List remoteFetches = delayedShareFetch.pendingRemoteFetches().remoteFetches();
            Assertions.assertEquals(1, remoteFetches.size());
            Assertions.assertTrue(((PendingRemoteFetches.RemoteFetch) remoteFetches.get(0)).remoteFetchTask().isCancelled());

            // Locks for all three partitions are released in a single call.
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
            Assertions.assertTrue(shareFetch.isCompleted());
            Assertions.assertEquals(Set.of(tp0, tp1), ((Map) future.join()).keySet());

            // The operation's own lock must not be left held.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Covers a remote read whose callback is invoked immediately with a result that carries a
     * {@code TimeoutException}.
     *
     * <p>tp0 is the only partition whose share partition is acquirable and lockable. The test expects
     * {@code tryComplete} to return true, the response to be keyed by tp0 only with error code
     * {@code REQUEST_TIMED_OUT}, and the lock of tp0 to be released once.
     */
    @Test
    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));

        // sp0 can acquire records; sp1 cannot.
        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp1.canAcquireRecords()).thenReturn(false);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        // Keep a handle on the response future so its outcome can be inspected after completion.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), future, List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());

        Mockito.doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        // asyncRead feeds the failed result to its callback right away and returns an already-completed future.
        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(
            Optional.empty(),
            Optional.of(new TimeoutException("Error occurred while creating remote fetch result")));
        RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
        Mockito.doAnswer(invocationOnMock -> {
            Consumer resultCallback = (Consumer) invocationOnMock.getArgument(1);
            resultCallback.accept(remoteFetchResult);
            return CompletableFuture.completedFuture(remoteFetchResult);
        }).when(remoteLogManager).asyncRead((RemoteStorageFetchInfo) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        Mockito.when((Object) replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(this.mockPartitionMaxBytes(Set.of(tp0, tp1)))
            .withFetchId(fetchId)
            .build());

        // Only sp0 grants its fetch lock; acquiring on sp0 yields offsets 0..3 with delivery count 1.
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        Mockito.when(sp1.maybeAcquireFetchLock(fetchId)).thenReturn(false);
        ShareFetchResponseData.AcquiredRecords acquiredRange = new ShareFetchResponseData.AcquiredRecords()
            .setFirstOffset(0L)
            .setLastOffset(3L)
            .setDeliveryCount((short) 1);
        Mockito.when((Object) sp0.acquire((String) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyLong(), (FetchPartitionData) ArgumentMatchers.any(), (FetchIsolation) ArgumentMatchers.any()))
            .thenReturn((Object) ShareFetchTestUtils.createShareAcquiredRecords(acquiredRange));

        Assertions.assertFalse(delayedShareFetch.isCompleted());
        Assertions.assertTrue(delayedShareFetch.tryComplete());
        Assertions.assertTrue(delayedShareFetch.isCompleted());
        Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
        Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0));

        // The response holds tp0 only, flagged with the timeout error code.
        Assertions.assertTrue(shareFetch.isCompleted());
        Assertions.assertEquals(Set.of(tp0), ((Map) future.join()).keySet());
        Assertions.assertEquals((short) Errors.REQUEST_TIMED_OUT.code(), (short) ((ShareFetchResponseData.PartitionData) ((Map) future.join()).get(tp0)).errorCode());

        // The operation's own lock must not be left held.
        Assertions.assertTrue(delayedShareFetch.lock().tryLock());
        delayedShareFetch.lock().unlock();
    }

    /**
     * Covers a remote read for a single partition whose callback is invoked immediately with a
     * successful result built from {@code REMOTE_FETCH_INFO}.
     *
     * <p>{@code ShareFetchUtils.processFetchResponse} is statically mocked to return a canned entry
     * for tp0. The test expects {@code tryComplete} to return true, the response to be keyed by tp0
     * with error code {@code NONE}, and the lock of tp0 to be released once.
     */
    @Test
    public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);

        // Keep a handle on the response future so its outcome can be inspected after completion.
        CompletableFuture future = new CompletableFuture();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), future, List.of(tp0), 500, 100, BROKER_TOPIC_STATS);

        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        Mockito.when(sp0.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());

        Mockito.doAnswer(invocation -> this.buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0)))
            .when(replicaManager)
            .readFromLog((FetchParams) ArgumentMatchers.any(), (Seq) ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        // asyncRead feeds the successful result to its callback right away and returns an already-completed future.
        RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty());
        RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
        Mockito.doAnswer(invocationOnMock -> {
            Consumer resultCallback = (Consumer) invocationOnMock.getArgument(1);
            resultCallback.accept(remoteFetchResult);
            return CompletableFuture.completedFuture(remoteFetchResult);
        }).when(remoteLogManager).asyncRead((RemoteStorageFetchInfo) ArgumentMatchers.any(), (Consumer) ArgumentMatchers.any());
        Mockito.when((Object) replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(this.mockPartitionMaxBytes(Set.of(tp0)))
            .withFetchId(fetchId)
            .build());
        Mockito.when(sp0.maybeAcquireFetchLock(fetchId)).thenReturn(true);

        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // processFetchResponse is replaced by a canned response containing a mocked entry for tp0.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<>();
            partitionDataMap.put(tp0, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse((ShareFetch) ArgumentMatchers.any(), (List) ArgumentMatchers.any(), (LinkedHashMap) ArgumentMatchers.any(), (ReplicaManager) ArgumentMatchers.any(), (BiConsumer) ArgumentMatchers.any())).thenReturn(partitionDataMap);

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            Assertions.assertTrue(delayedShareFetch.tryComplete());
            Assertions.assertTrue(delayedShareFetch.isCompleted());
            Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0));

            // The response holds tp0 only, without an error code.
            Assertions.assertTrue(shareFetch.isCompleted());
            Assertions.assertEquals(Set.of(tp0), ((Map) future.join()).keySet());
            Assertions.assertEquals((short) Errors.NONE.code(), (short) ((ShareFetchResponseData.PartitionData) ((Map) future.join()).get(tp0)).errorCode());

            // The operation's own lock must not be left held.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that a share fetch over three partitions completes through a single {@code tryComplete()} call
     * when the stubbed log read mixes local and remote results.
     *
     * <p>The replica manager is stubbed with two consecutive answers: the first reports tp0 and tp1 as local
     * reads and tp2 as a read built from {@code REMOTE_FETCH_INFO}; the second reports only tp0 and tp1 as
     * local reads. The mocked remote log manager invokes the read callback inline and returns an already
     * completed future.
     */
    @Test
    public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));
        TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        SharePartition sp2 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);
        sharePartitions.put(tp2, sp2);

        // Stubs that are identical for every share partition.
        for (SharePartition sharePartition : sharePartitions.values()) {
            Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
            Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        }
        // Each partition reports a distinct next fetch offset.
        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        Mockito.when(sp1.nextFetchOffset()).thenReturn(20L);
        Mockito.when(sp2.nextFetchOffset()).thenReturn(30L);

        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> responseFuture = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), responseFuture,
            List.of(tp0, tp1, tp2), 500, 100, BROKER_TOPIC_STATS);

        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // Response processing is mocked out: it always yields one mocked PartitionData per partition.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResponse = new LinkedHashMap<>();
            for (TopicIdPartition topicIdPartition : List.of(tp0, tp1, tp2)) {
                processedResponse.put(topicIdPartition, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            }
            mockedShareFetchUtils
                .when(() -> ShareFetchUtils.processFetchResponse(ArgumentMatchers.any(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(processedResponse);

            // First read: tp0/tp1 local, tp2 remote. Second read: tp0/tp1 local, nothing remote.
            Mockito.doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2)))
                .doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()))
                .when(replicaManager)
                .readFromLog(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

            // The remote read finishes synchronously: the callback is invoked before asyncRead returns.
            RemoteLogReadResult remoteReadResult = new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty());
            RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
            Mockito.doAnswer(invocationOnMock -> {
                Consumer<RemoteLogReadResult> readCallback = invocationOnMock.getArgument(1);
                readCallback.accept(remoteReadResult);
                return CompletableFuture.completedFuture(remoteReadResult);
            }).when(remoteLogManager).asyncRead(ArgumentMatchers.any(), ArgumentMatchers.any());
            Mockito.when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

            Uuid fetchId = Uuid.randomUuid();
            DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
                .withShareFetchData(shareFetch)
                .withReplicaManager(replicaManager)
                .withSharePartitions(sharePartitions)
                .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2)))
                .withFetchId(fetchId)
                .build());
            for (SharePartition sharePartition : sharePartitions.values()) {
                Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
            }

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            Assertions.assertTrue(delayedShareFetch.tryComplete());
            Assertions.assertTrue(delayedShareFetch.isCompleted());
            Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
            Assertions.assertTrue(shareFetch.isCompleted());

            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = responseFuture.join();
            Assertions.assertEquals(Set.of(tp0, tp1, tp2), response.keySet());
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2));
            Assertions.assertEquals(Errors.NONE.code(), response.get(tp0).errorCode());
            Assertions.assertEquals(Errors.NONE.code(), response.get(tp1).errorCode());
            Assertions.assertEquals(Errors.NONE.code(), response.get(tp2).errorCode());

            // The operation's lock must be acquirable once the test's calls have returned.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks that a share fetch completes through a single {@code tryComplete()} call when the stubbed log
     * read reports every requested partition (tp0 and tp1) as a read built from {@code REMOTE_FETCH_INFO}.
     *
     * <p>The mocked remote log manager invokes the read callback inline and returns an already completed
     * future, so no remote read is left outstanding when {@code tryComplete()} returns.
     */
    @Test
    public void testRemoteStorageFetchHappensForAllTopicPartitions() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));
        TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        SharePartition sp1 = Mockito.mock(SharePartition.class);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);
        sharePartitions.put(tp1, sp1);

        // Both share partitions are stubbed identically.
        for (SharePartition sharePartition : sharePartitions.values()) {
            Mockito.when(sharePartition.canAcquireRecords()).thenReturn(true);
            Mockito.when(sharePartition.nextFetchOffset()).thenReturn(10L);
            Mockito.when(sharePartition.fetchOffsetMetadata(ArgumentMatchers.anyLong())).thenReturn(Optional.empty());
        }

        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> responseFuture = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), responseFuture,
            List.of(tp0, tp1), 500, 100, BROKER_TOPIC_STATS);

        // Every partition is reported as a remote read; there are no local reads.
        LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new LinkedHashSet<>(List.of(tp0, tp1));
        Mockito.doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions))
            .when(replicaManager)
            .readFromLog(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any(ReplicaQuota.class), ArgumentMatchers.anyBoolean());

        // The remote read finishes synchronously: the callback is invoked before asyncRead returns.
        RemoteLogReadResult remoteReadResult = new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty());
        RemoteLogManager remoteLogManager = Mockito.mock(RemoteLogManager.class);
        Mockito.doAnswer(invocationOnMock -> {
            Consumer<RemoteLogReadResult> readCallback = invocationOnMock.getArgument(1);
            readCallback.accept(remoteReadResult);
            return CompletableFuture.completedFuture(remoteReadResult);
        }).when(remoteLogManager).asyncRead(ArgumentMatchers.any(), ArgumentMatchers.any());
        Mockito.when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager));

        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withSharePartitions(sharePartitions)
            .withReplicaManager(replicaManager)
            .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1)))
            .withFetchId(fetchId)
            .build());
        for (SharePartition sharePartition : sharePartitions.values()) {
            Mockito.when(sharePartition.maybeAcquireFetchLock(fetchId)).thenReturn(true);
        }

        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // Response processing is mocked out: it always yields one mocked PartitionData per partition.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResponse = new LinkedHashMap<>();
            for (TopicIdPartition topicIdPartition : List.of(tp0, tp1)) {
                processedResponse.put(topicIdPartition, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            }
            mockedShareFetchUtils
                .when(() -> ShareFetchUtils.processFetchResponse(ArgumentMatchers.any(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(processedResponse);

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            Assertions.assertTrue(delayedShareFetch.tryComplete());
            Assertions.assertTrue(delayedShareFetch.isCompleted());
            Assertions.assertNotNull(delayedShareFetch.pendingRemoteFetches());
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0, tp1));
            Assertions.assertTrue(shareFetch.isCompleted());

            Map<TopicIdPartition, ShareFetchResponseData.PartitionData> response = responseFuture.join();
            Assertions.assertEquals(Set.of(tp0, tp1), response.keySet());
            Assertions.assertEquals(Errors.NONE.code(), response.get(tp0).errorCode());
            Assertions.assertEquals(Errors.NONE.code(), response.get(tp1).errorCode());

            // The operation's lock must be acquirable once the test's calls have returned.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks {@code forceComplete()} on an operation that was built with pre-existing pending remote fetches
     * which report {@code isDone() == false}.
     *
     * <p>The mocked {@link PendingRemoteFetches} invokes the callback handed to
     * {@code invokeCallbackOnCompletion} immediately, so completion is driven by that callback. The test then
     * expects the share fetch to be completed, the partition lock for tp0 to be released exactly once, and
     * {@code outsidePurgatoryCallbackLock()} to be {@code true}.
     */
    @Test
    public void testRemoteStorageFetchCompletionPostRegisteringCallbackByPendingFetchesCompletion() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);

        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> responseFuture = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), responseFuture,
            List.of(tp0), 500, 100, BROKER_TOPIC_STATS);

        PendingRemoteFetches pendingRemoteFetches = Mockito.mock(PendingRemoteFetches.class);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
            .withPendingRemoteFetches(pendingRemoteFetches)
            .withFetchId(fetchId)
            .build());

        // Seed the operation with tp0 recorded as already acquired at offset 10.
        LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new LinkedHashMap<>();
        partitionsAcquired.put(tp0, 10L);
        delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);

        // A single remote fetch for tp0 whose result future is already completed.
        PendingRemoteFetches.RemoteFetch remoteFetch = Mockito.mock(PendingRemoteFetches.RemoteFetch.class);
        Mockito.when(remoteFetch.topicIdPartition()).thenReturn(tp0);
        Mockito.when(remoteFetch.remoteFetchResult()).thenReturn(
            CompletableFuture.completedFuture(new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty())));
        Mockito.when(remoteFetch.logReadResult()).thenReturn(
            new LogReadResult(REMOTE_FETCH_INFO, Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), OptionalInt.empty(), Optional.empty()));
        Mockito.when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
        Mockito.when(pendingRemoteFetches.isDone()).thenReturn(false);

        // Registering the completion callback fires it right away with no error.
        Mockito.doAnswer(invocationOnMock -> {
            BiConsumer<Void, Throwable> completionCallback = invocationOnMock.getArgument(0);
            completionCallback.accept(Mockito.mock(Void.class), null);
            return null;
        }).when(pendingRemoteFetches).invokeCallbackOnCompletion(ArgumentMatchers.any());

        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // Response processing is mocked out: it always yields a mocked PartitionData for tp0.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResponse = new LinkedHashMap<>();
            processedResponse.put(tp0, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            mockedShareFetchUtils
                .when(() -> ShareFetchUtils.processFetchResponse(ArgumentMatchers.any(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(processedResponse);

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            delayedShareFetch.forceComplete();
            Assertions.assertTrue(delayedShareFetch.isCompleted());
            Assertions.assertTrue(shareFetch.isCompleted());
            Assertions.assertEquals(Set.of(tp0), responseFuture.join().keySet());
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0));
            Assertions.assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());

            // The operation's lock must be acquirable once the test's calls have returned.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Checks {@code forceComplete()} on an operation that was built with pre-existing pending remote fetches
     * which report {@code isDone() == false}, where completion is driven by the timer task.
     *
     * <p>Here the mocked {@link PendingRemoteFetches} never invokes the callback handed to
     * {@code invokeCallbackOnCompletion}; instead the mocked replica manager runs the {@code TimerTask} passed
     * to {@code addShareFetchTimerRequest} immediately. The test then expects the share fetch to be completed,
     * the partition lock for tp0 to be released exactly once, and {@code outsidePurgatoryCallbackLock()} to be
     * {@code true}.
     */
    @Test
    public void testRemoteStorageFetchCompletionPostRegisteringCallbackByTimerTaskCompletion() {
        ReplicaManager replicaManager = Mockito.mock(ReplicaManager.class);
        TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0));

        SharePartition sp0 = Mockito.mock(SharePartition.class);
        Mockito.when(sp0.canAcquireRecords()).thenReturn(true);
        Mockito.when(sp0.nextFetchOffset()).thenReturn(10L);
        LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>();
        sharePartitions.put(tp0, sp0);

        CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> responseFuture = new CompletableFuture<>();
        ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), responseFuture,
            List.of(tp0), 500, 100, BROKER_TOPIC_STATS);

        PendingRemoteFetches pendingRemoteFetches = Mockito.mock(PendingRemoteFetches.class);
        Uuid fetchId = Uuid.randomUuid();
        DelayedShareFetch delayedShareFetch = Mockito.spy(DelayedShareFetchBuilder.builder()
            .withShareFetchData(shareFetch)
            .withReplicaManager(replicaManager)
            .withSharePartitions(sharePartitions)
            .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM))
            .withPendingRemoteFetches(pendingRemoteFetches)
            .withFetchId(fetchId)
            .build());

        // Seed the operation with tp0 recorded as already acquired at offset 10.
        LinkedHashMap<TopicIdPartition, Long> partitionsAcquired = new LinkedHashMap<>();
        partitionsAcquired.put(tp0, 10L);
        delayedShareFetch.updatePartitionsAcquired(partitionsAcquired);

        // A single remote fetch for tp0 whose result future is already completed.
        PendingRemoteFetches.RemoteFetch remoteFetch = Mockito.mock(PendingRemoteFetches.RemoteFetch.class);
        Mockito.when(remoteFetch.topicIdPartition()).thenReturn(tp0);
        Mockito.when(remoteFetch.remoteFetchResult()).thenReturn(
            CompletableFuture.completedFuture(new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty())));
        Mockito.when(remoteFetch.logReadResult()).thenReturn(
            new LogReadResult(REMOTE_FETCH_INFO, Optional.empty(), -1L, -1L, -1L, -1L, -1L, OptionalLong.empty(), OptionalInt.empty(), Optional.empty()));
        Mockito.when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch));
        Mockito.when(pendingRemoteFetches.isDone()).thenReturn(false);

        // The completion callback is registered but never fired by the pending fetches ...
        Mockito.doAnswer(invocationOnMock -> null).when(pendingRemoteFetches).invokeCallbackOnCompletion(ArgumentMatchers.any());
        // ... while any timer task handed to the replica manager is executed on the spot.
        Mockito.doAnswer(invocationOnMock -> {
            TimerTask scheduledTask = invocationOnMock.getArgument(0);
            scheduledTask.run();
            return null;
        }).when(replicaManager).addShareFetchTimerRequest(ArgumentMatchers.any());

        try (MockedStatic<ShareFetchUtils> mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) {
            // Response processing is mocked out: it always yields a mocked PartitionData for tp0.
            LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> processedResponse = new LinkedHashMap<>();
            processedResponse.put(tp0, Mockito.mock(ShareFetchResponseData.PartitionData.class));
            mockedShareFetchUtils
                .when(() -> ShareFetchUtils.processFetchResponse(ArgumentMatchers.any(), ArgumentMatchers.any(),
                    ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenReturn(processedResponse);

            Assertions.assertFalse(delayedShareFetch.isCompleted());
            delayedShareFetch.forceComplete();
            Assertions.assertTrue(delayedShareFetch.isCompleted());
            Assertions.assertTrue(shareFetch.isCompleted());
            Assertions.assertEquals(Set.of(tp0), responseFuture.join().keySet());
            Mockito.verify(delayedShareFetch, Mockito.times(1)).releasePartitionLocks(Set.of(tp0));
            Assertions.assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock());

            // The operation's lock must be acquirable once the test's calls have returned.
            Assertions.assertTrue(delayedShareFetch.lock().tryLock());
            delayedShareFetch.lock().unlock();
        }
    }

    /**
     * Stubs {@code replicaManager} so that looking up the given partition returns a mocked {@link Partition}
     * that reports itself as leader with leader epoch 1 and serves a fixed offset snapshot.
     *
     * <p>The snapshot's third constructor argument (the hwm offset metadata) is
     * {@code new LogOffsetMetadata(1L, 1L, minBytes)}; the two other metadata arguments are plain mocks.
     *
     * @param replicaManager   mocked replica manager to stub
     * @param topicIdPartition partition whose {@code topicPartition()} is used for the lookup stub
     * @param minBytes         value passed as the third argument of the hwm {@code LogOffsetMetadata}
     */
    static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) {
        Partition leaderPartition = Mockito.mock(Partition.class);
        Mockito.when(leaderPartition.isLeader()).thenReturn(true);
        Mockito.when(leaderPartition.getLeaderEpoch()).thenReturn(1);

        LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1L, 1L, minBytes);
        LogOffsetSnapshot offsetSnapshot = new LogOffsetSnapshot(
            1L,
            Mockito.mock(LogOffsetMetadata.class),
            hwmOffsetMetadata,
            Mockito.mock(LogOffsetMetadata.class));
        Mockito.when(leaderPartition.fetchOffsetSnapshot(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
            .thenReturn(offsetSnapshot);

        Mockito.when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition()))
            .thenReturn(leaderPartition);
    }

    /**
     * Stubs {@code replicaManager} so that looking up the given partition returns a mocked {@link Partition}
     * whose {@code fetchOffsetSnapshot} always yields a snapshot carrying {@code hwmOffsetMetadata} as its
     * third constructor argument; the two other metadata arguments are plain mocks.
     *
     * @param replicaManager    mocked replica manager to stub
     * @param topicIdPartition  partition whose {@code topicPartition()} is used for the lookup stub
     * @param hwmOffsetMetadata offset metadata to embed in the returned snapshot
     */
    private void mockTopicIdPartitionFetchBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, LogOffsetMetadata hwmOffsetMetadata) {
        Partition mockedPartition = Mockito.mock(Partition.class);
        LogOffsetSnapshot offsetSnapshot = new LogOffsetSnapshot(
            1L,
            Mockito.mock(LogOffsetMetadata.class),
            hwmOffsetMetadata,
            Mockito.mock(LogOffsetMetadata.class));
        Mockito.when(mockedPartition.fetchOffsetSnapshot(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean()))
            .thenReturn(offsetSnapshot);
        Mockito.when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition()))
            .thenReturn(mockedPartition);
    }

    /**
     * Creates a mocked {@link PartitionMaxBytesStrategy} whose {@code maxBytes} call, for any arguments,
     * returns a map assigning 1 byte to each of the given partitions.
     *
     * @param partitions partitions to include in the returned max-bytes map
     * @return the stubbed strategy mock
     */
    private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set<TopicIdPartition> partitions) {
        PartitionMaxBytesStrategy strategy = Mockito.mock(PartitionMaxBytesStrategy.class);
        LinkedHashMap<TopicIdPartition, Integer> bytesPerPartition = new LinkedHashMap<>();
        for (TopicIdPartition topicIdPartition : partitions) {
            bytesPerPartition.put(topicIdPartition, 1);
        }
        Mockito.when(strategy.maxBytes(ArgumentMatchers.anyInt(), ArgumentMatchers.any(), ArgumentMatchers.anyInt()))
            .thenReturn(bytesPerPartition);
        return strategy;
    }

    /**
     * Builds the Scala sequence of (partition, read result) pairs used to stub a log read.
     *
     * <p>Entries for the local partitions come first and carry an empty {@code FetchDataInfo}
     * ({@code MemoryRecords.EMPTY} at offset metadata {@code (0, 0, 0)}); entries for the remote partitions
     * follow and carry {@code REMOTE_FETCH_INFO}. All other {@link LogReadResult} fields are set to
     * {@code -1} or empty optionals.
     *
     * @param localLogReadTopicIdPartitions partitions to report with an empty local read
     * @param remoteReadTopicIdPartitions   partitions to report with {@code REMOTE_FETCH_INFO}
     * @return the combined results, local entries before remote entries
     */
    private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchResult(Set<TopicIdPartition> localLogReadTopicIdPartitions, Set<TopicIdPartition> remoteReadTopicIdPartitions) {
        List<Tuple2<TopicIdPartition, LogReadResult>> readResults = new ArrayList<>();
        for (TopicIdPartition localPartition : localLogReadTopicIdPartitions) {
            FetchDataInfo emptyLocalData = new FetchDataInfo(new LogOffsetMetadata(0L, 0L, 0), MemoryRecords.EMPTY);
            LogReadResult localResult = new LogReadResult(emptyLocalData, Optional.empty(), -1L, -1L, -1L, -1L, -1L,
                OptionalLong.empty(), OptionalInt.empty(), Optional.empty());
            readResults.add(new Tuple2<>(localPartition, localResult));
        }
        for (TopicIdPartition remotePartition : remoteReadTopicIdPartitions) {
            LogReadResult remoteResult = new LogReadResult(REMOTE_FETCH_INFO, Optional.empty(), -1L, -1L, -1L, -1L, -1L,
                OptionalLong.empty(), OptionalInt.empty(), Optional.empty());
            readResults.add(new Tuple2<>(remotePartition, remoteResult));
        }
        return CollectionConverters.asScala(readResults).toSeq();
    }

    /**
     * Creates a Mockito mock of the exception-handler callback type.
     *
     * @return a mocked {@code BiConsumer<SharePartitionKey, Throwable>}
     */
    @SuppressWarnings("unchecked") // Mockito.mock(BiConsumer.class) can only produce the raw type.
    private static BiConsumer<SharePartitionKey, Throwable> mockExceptionHandler() {
        BiConsumer<SharePartitionKey, Throwable> handler = Mockito.mock(BiConsumer.class);
        return handler;
    }

    /**
     * Fluent test builder for {@link DelayedShareFetch}.
     *
     * <p>Every collaborator starts out as a Mockito mock (or, for time, a {@code MockTime}; for the fetch id,
     * a random {@link Uuid}; for pending remote fetches, {@link Optional#empty()}), so a test only needs to
     * override the pieces it cares about before calling {@link #build()}.
     */
    static class DelayedShareFetchBuilder {
        private ShareFetch shareFetch;
        private ReplicaManager replicaManager;
        private BiConsumer<SharePartitionKey, Throwable> exceptionHandler;
        private LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions;
        private PartitionMaxBytesStrategy partitionMaxBytesStrategy;
        private Time time;
        private Optional<PendingRemoteFetches> pendingRemoteFetches;
        private ShareGroupMetrics shareGroupMetrics;
        private Uuid fetchId;

        /** Populates every field with its default; assignments follow field declaration order. */
        @SuppressWarnings("unchecked") // Mockito.mock(LinkedHashMap.class) can only produce the raw type.
        DelayedShareFetchBuilder() {
            this.shareFetch = Mockito.mock(ShareFetch.class);
            this.replicaManager = Mockito.mock(ReplicaManager.class);
            this.exceptionHandler = DelayedShareFetchTest.mockExceptionHandler();
            this.sharePartitions = Mockito.mock(LinkedHashMap.class);
            this.partitionMaxBytesStrategy = Mockito.mock(PartitionMaxBytesStrategy.class);
            this.time = new MockTime();
            this.pendingRemoteFetches = Optional.empty();
            this.shareGroupMetrics = Mockito.mock(ShareGroupMetrics.class);
            this.fetchId = Uuid.randomUuid();
        }

        /** Replaces the default mocked {@link ShareFetch}. */
        DelayedShareFetchBuilder withShareFetchData(ShareFetch value) {
            this.shareFetch = value;
            return this;
        }

        /** Replaces the default mocked {@link ReplicaManager}. */
        DelayedShareFetchBuilder withReplicaManager(ReplicaManager value) {
            this.replicaManager = value;
            return this;
        }

        /** Replaces the default mocked exception handler. */
        DelayedShareFetchBuilder withExceptionHandler(BiConsumer<SharePartitionKey, Throwable> value) {
            this.exceptionHandler = value;
            return this;
        }

        /** Replaces the default mocked share-partition map. */
        DelayedShareFetchBuilder withSharePartitions(LinkedHashMap<TopicIdPartition, SharePartition> value) {
            this.sharePartitions = value;
            return this;
        }

        /** Replaces the default mocked {@link PartitionMaxBytesStrategy}. */
        DelayedShareFetchBuilder withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy value) {
            this.partitionMaxBytesStrategy = value;
            return this;
        }

        /** Replaces the default mocked {@code ShareGroupMetrics}. */
        private DelayedShareFetchBuilder withShareGroupMetrics(ShareGroupMetrics value) {
            this.shareGroupMetrics = value;
            return this;
        }

        /** Replaces the default {@code MockTime}. */
        private DelayedShareFetchBuilder withTime(Time value) {
            this.time = value;
            return this;
        }

        /** Supplies pending remote fetches; the value is wrapped with {@link Optional#of(Object)}. */
        private DelayedShareFetchBuilder withPendingRemoteFetches(PendingRemoteFetches value) {
            this.pendingRemoteFetches = Optional.of(value);
            return this;
        }

        /** Replaces the randomly generated fetch id. */
        private DelayedShareFetchBuilder withFetchId(Uuid value) {
            this.fetchId = value;
            return this;
        }

        /** Entry point for the fluent API; returns a builder holding only defaults. */
        public static DelayedShareFetchBuilder builder() {
            return new DelayedShareFetchBuilder();
        }

        /**
         * Creates the {@link DelayedShareFetch} from the current builder state.
         *
         * <p>NOTE(review): the trailing {@code 6000L} constructor argument is fixed here; it looks like a
         * wait/timeout in milliseconds, but confirm against the {@code DelayedShareFetch} constructor.
         */
        public DelayedShareFetch build() {
            return new DelayedShareFetch(
                this.shareFetch,
                this.replicaManager,
                this.exceptionHandler,
                this.sharePartitions,
                this.partitionMaxBytesStrategy,
                this.shareGroupMetrics,
                this.time,
                this.pendingRemoteFetches,
                this.fetchId,
                6000L);
        }
    }
}

