/*
 * Decompiled with CFR 0.152.
 */
package kafka.test.api;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import kafka.api.BaseConsumerTest;
import kafka.server.BrokerServer;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigsOptions;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Timeout(value=1200L)
@Tag(value="integration")
public class ShareConsumerTest {
    // In-process test cluster; built and started in createCluster, closed in destroyCluster.
    private KafkaClusterTestKit cluster;
    // Partition 0 of "topic"; the topic is created in createCluster.
    private final TopicPartition tp = new TopicPartition("topic", 0);
    // Partition 0 of "topic2"; the topic is created in createCluster.
    private final TopicPartition tp2 = new TopicPartition("topic2", 0);
    // Partition 0 of "warmup" — presumably used by warmup(); confirm against that helper.
    private final TopicPartition warmupTp = new TopicPartition("warmup", 0);
    // Fully-qualified persister class names; the same literals appear in the tests' @ValueSource lists.
    private static final String DEFAULT_STATE_PERSISTER = "org.apache.kafka.server.share.persister.DefaultStatePersister";
    // Used by createCluster when the test display name carries no ".persister=" suffix.
    private static final String NO_OP_PERSISTER = "org.apache.kafka.server.share.persister.NoOpShareStatePersister";
    // Admin client assigned at the end of createCluster and closed in destroyCluster.
    private Admin adminClient;

    /**
     * Builds, formats and starts a one-broker / one-controller cluster with share groups
     * enabled, then creates the "topic" and "topic2" topics, the admin client, and runs
     * {@code warmup()}.
     *
     * <p>The share-state persister class is taken from the text after the first {@code '='}
     * of the test display name when that name contains {@code ".persister="}; otherwise
     * {@link #NO_OP_PERSISTER} is used.
     *
     * @param testInfo JUnit-supplied information about the test about to run
     * @throws Exception if the cluster cannot be built, formatted or started
     */
    @BeforeEach
    public void createCluster(TestInfo testInfo) throws Exception {
        String displayName = testInfo.getDisplayName();
        String persister = displayName.contains(".persister=")
                ? displayName.split("=")[1]
                : NO_OP_PERSISTER;

        TestKitNodes nodes = new TestKitNodes.Builder()
                .setNumBrokerNodes(1)
                .setNumControllerNodes(1)
                .build();

        this.cluster = new KafkaClusterTestKit.Builder(nodes)
                .setConfigProp("auto.create.topics.enable", "false")
                .setConfigProp("group.coordinator.rebalance.protocols", "classic,consumer,share")
                .setConfigProp("group.share.enable", "true")
                .setConfigProp("group.share.partition.max.record.locks", "10000")
                .setConfigProp("group.share.persister.class.name", persister)
                .setConfigProp("group.share.record.lock.duration.ms", "15000")
                .setConfigProp("offsets.topic.replication.factor", "1")
                .setConfigProp("share.coordinator.state.topic.min.isr", "1")
                .setConfigProp("share.coordinator.state.topic.num.partitions", "3")
                .setConfigProp("share.coordinator.state.topic.replication.factor", "1")
                .setConfigProp("transaction.state.log.min.isr", "1")
                .setConfigProp("transaction.state.log.replication.factor", "1")
                .setConfigProp("unstable.api.versions.enable", "true")
                .build();

        this.cluster.format();
        this.cluster.startup();
        this.cluster.waitForActiveController();
        this.cluster.waitForReadyBrokers();

        this.createTopic("topic");
        this.createTopic("topic2");
        this.adminClient = this.createAdminClient();
        this.warmup();
    }

    /**
     * Releases the admin client and the cluster created by {@code createCluster}.
     *
     * <p>Either field may still be {@code null} if setup failed part-way, so both are
     * null-checked. The cluster is closed in a {@code finally} block so that a failure
     * while closing the admin client cannot leave the cluster running.
     *
     * @throws Exception if closing the admin client or the cluster fails
     */
    @AfterEach
    public void destroyCluster() throws Exception {
        try {
            if (this.adminClient != null) {
                this.adminClient.close();
            }
        } finally {
            if (this.cluster != null) {
                this.cluster.close();
            }
        }
    }

    /**
     * A share consumer that has never subscribed reports an empty subscription and
     * rejects {@code poll} with {@link IllegalStateException}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testPollNoSubscribeFails(String persister) {
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Assertions.assertEquals(Collections.emptySet(), consumer.subscription());
            Assertions.assertThrows(
                    IllegalStateException.class,
                    () -> consumer.poll(Duration.ofMillis(500L)));
        }
    }

    /**
     * Subscribing to a topic with nothing produced to it yields an empty poll result,
     * and the consumer reports exactly the subscription it was given.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscribeAndPollNoRecords(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> topics = Collections.singleton(this.tp.topic());
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());

            ConsumerRecords polled = consumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * After subscribe, poll and unsubscribe, the consumer reports an empty subscription;
     * the poll made while subscribed returned no records.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscribePollUnsubscribe(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> topics = Collections.singleton(this.tp.topic());
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());

            ConsumerRecords polled = consumer.poll(Duration.ofMillis(500L));

            consumer.unsubscribe();
            Assertions.assertEquals(Collections.emptySet(), consumer.subscription());
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * Re-subscribing to the same topic set after a poll keeps the subscription intact,
     * and both polls return no records.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscribePollSubscribe(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> topics = Collections.singleton(this.tp.topic());

            // First subscribe/poll round.
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());
            ConsumerRecords firstPoll = consumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, firstPoll.count());

            // Second round with the identical subscription.
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());
            ConsumerRecords secondPoll = consumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, secondPoll.count());
        }
    }

    /**
     * Once the consumer has unsubscribed, a further {@code poll} throws
     * {@link IllegalStateException}; the poll made while subscribed returned no records.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscribeUnsubscribePollFails(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> topics = Collections.singleton(this.tp.topic());
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());

            ConsumerRecords polled = consumer.poll(Duration.ofMillis(500L));

            consumer.unsubscribe();
            Assertions.assertEquals(Collections.emptySet(), consumer.subscription());
            Assertions.assertThrows(
                    IllegalStateException.class,
                    () -> consumer.poll(Duration.ofMillis(500L)));
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * Subscribing to an empty topic set clears the subscription, after which
     * {@code poll} throws {@link IllegalStateException}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscribeSubscribeEmptyPollFails(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            Set<String> topics = Collections.singleton(this.tp.topic());
            consumer.subscribe(topics);
            Assertions.assertEquals(topics, consumer.subscription());

            ConsumerRecords polled = consumer.poll(Duration.ofMillis(500L));

            consumer.subscribe(Collections.emptySet());
            Assertions.assertEquals(Collections.emptySet(), consumer.subscription());
            Assertions.assertThrows(
                    IllegalStateException.class,
                    () -> consumer.poll(Duration.ofMillis(500L)));
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * Produces one record to {@code tp}, subscribes, and expects a single poll of up to
     * five seconds to return exactly that one record.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscriptionAndPoll(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());
        }
    }

    /**
     * Sends the same record three times, polling after each send, and expects every
     * poll to return exactly one record.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testSubscriptionAndPollMultiple(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());

            // Round 1: the only send that is explicitly flushed before polling.
            producer.send(message);
            producer.flush();
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // Round 2.
            producer.send(message);
            polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // Round 3.
            producer.send(message);
            polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());
        }
    }

    /**
     * Consumes one record from {@code tp}, switches the subscription to {@code tp2}, and
     * keeps polling until the exception map handed to the acknowledgement commit callback
     * has an entry for both partitions; both entries must be {@code null}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     * @throws ExecutionException   if the awaited send to {@code tp2} fails
     * @throws InterruptedException if interrupted while waiting
     */
    @Flaky("KAFKA-18033")
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testAcknowledgementSentOnSubscriptionChange(String persister) throws ExecutionException, InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            HashMap<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            HashMap<TopicPartition, Exception> ackErrors = new HashMap<>();

            ProducerRecord firstTopicMessage = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(firstTopicMessage);
            ProducerRecord secondTopicMessage = new ProducerRecord(
                    this.tp2.topic(), Integer.valueOf(this.tp2.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(secondTopicMessage).get();
            producer.flush();

            consumer.setAcknowledgementCommitCallback(
                    new TestableAcknowledgementCommitCallback(ackedOffsets, ackErrors));
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "Failed to consume records for share consumer");

            // Move the subscription to the second topic and poll until both partitions show up.
            consumer.subscribe(Collections.singletonList(this.tp2.topic()));
            TestUtils.waitForCondition(
                    () -> {
                        consumer.poll(Duration.ofMillis(500L));
                        return ackErrors.containsKey(this.tp) && ackErrors.containsKey(this.tp2);
                    },
                    15000L,
                    100L,
                    () -> "Failed to consume records from the updated subscription");

            Assertions.assertNull(ackErrors.get(this.tp));
            Assertions.assertNull(ackErrors.get(this.tp2));
        }
    }

    /**
     * Consumes one record, then keeps polling until the exception map handed to the
     * acknowledgement commit callback has an entry for {@code tp}; that entry must be
     * {@code null}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     * @throws Exception if either wait times out or is interrupted
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testAcknowledgementCommitCallbackSuccessfulAcknowledgement(String persister) throws Exception {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            HashMap<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            HashMap<TopicPartition, Exception> ackErrors = new HashMap<>();

            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.setAcknowledgementCommitCallback(
                    new TestableAcknowledgementCommitCallback(ackedOffsets, ackErrors));
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "Failed to consume records for share consumer");
            TestUtils.waitForCondition(
                    () -> {
                        consumer.poll(Duration.ofMillis(500L));
                        return ackErrors.containsKey(this.tp);
                    },
                    15000L,
                    100L,
                    () -> "Failed to receive call to callback");

            Assertions.assertNull(ackErrors.get(this.tp));
        }
    }

    /**
     * Consumes one record, polls once more, closes the consumer explicitly, and then
     * expects the exception map handed to the acknowledgement commit callback to hold
     * a {@code null} entry for {@code tp}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testAcknowledgementCommitCallbackOnClose(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            HashMap<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            HashMap<TopicPartition, Exception> ackErrors = new HashMap<>();

            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.setAcknowledgementCommitCallback(
                    new TestableAcknowledgementCommitCallback(ackedOffsets, ackErrors));
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            consumer.poll(Duration.ofMillis(1000L));
            // Explicit close ahead of the try-with-resources close; the assertions below run after it.
            consumer.close();

            Assertions.assertTrue(ackErrors.containsKey(this.tp));
            Assertions.assertNull(ackErrors.get(this.tp));
        }
    }

    /**
     * Consumes one record, sleeps for 20 seconds, and then polls until the exception map
     * handed to the acknowledgement commit callback holds an
     * {@link InvalidRecordStateException} for {@code tp}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     * @throws Exception if the wait times out or the thread is interrupted
     */
    @Flaky("KAFKA-18033")
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testAcknowledgementCommitCallbackInvalidRecordStateException(String persister) throws Exception {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            HashMap<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            HashMap<TopicPartition, Exception> ackErrors = new HashMap<>();

            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.setAcknowledgementCommitCallback(
                    new TestableAcknowledgementCommitCallback(ackedOffsets, ackErrors));
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // 20 s is longer than the 15000 ms group.share.record.lock.duration.ms set in
            // createCluster — presumably so the record's lock expires before the next poll.
            Thread.sleep(20000L);

            TestUtils.waitForCondition(
                    () -> {
                        consumer.poll(Duration.ofMillis(500L));
                        // A missing key yields null, and null is never an instance of anything.
                        return ackErrors.get(this.tp) instanceof InvalidRecordStateException;
                    },
                    15000L,
                    100L,
                    () -> "Failed to be notified by InvalidRecordStateException");
        }
    }

    /**
     * Produces one record carrying the header {@code headerKey=headerValue} and verifies
     * that the consumed record exposes that header with the same value.
     *
     * <p>The header is required to be present: skipping records without it would let the
     * test pass even if headers were dropped entirely.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testHeaders(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer((Serializer)new ByteArraySerializer(), (Serializer)new ByteArraySerializer());
             KafkaShareConsumer shareConsumer = this.createShareConsumer((Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), "group1")) {
            int numRecords = 1;
            ProducerRecord record = new ProducerRecord(this.tp.topic(), Integer.valueOf(this.tp.partition()), null, (Object)"key".getBytes(), (Object)"value".getBytes());
            record.headers().add("headerKey", "headerValue".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            List records = this.consumeRecords(shareConsumer, numRecords);
            Assertions.assertEquals(numRecords, records.size());
            // Iterate as Object and cast: the list is handled as a raw type here.
            for (Object element : records) {
                ConsumerRecord consumerRecord = (ConsumerRecord) element;
                Header header = consumerRecord.headers().lastHeader("headerKey");
                Assertions.assertNotNull(header, "Consumed record is missing the 'headerKey' header");
                Assertions.assertEquals("headerValue", new String(header.value()));
            }
        }
    }

    /**
     * Round-trips a single record through the supplied serializer/deserializer pair and
     * checks that exactly one record is consumed.
     *
     * @param serializer   used as the producer's value serializer
     * @param deserializer used as the share consumer's key deserializer
     */
    private void testHeadersSerializeDeserialize(Serializer<byte[]> serializer, Deserializer<byte[]> deserializer) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(new ByteArraySerializer(), serializer);
             KafkaShareConsumer consumer = this.createShareConsumer(
                     deserializer, new ByteArrayDeserializer(), "group1")) {
            final int expectedCount = 1;
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            List consumed = this.consumeRecords(consumer, expectedCount);
            Assertions.assertEquals(expectedCount, consumed.size());
        }
    }

    /**
     * Runs the serialize/deserialize round trip with {@code BaseConsumerTest}'s
     * {@code SerializerImpl} and {@code DeserializerImpl}.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testHeadersSerializerDeserializer(String persister) {
        Serializer<byte[]> serializer = new BaseConsumerTest.SerializerImpl();
        Deserializer<byte[]> deserializer = new BaseConsumerTest.DeserializerImpl();
        this.testHeadersSerializeDeserialize(serializer, deserializer);
    }

    /**
     * Produces 10000 timestamped records and consumes them with {@code max.poll.records}
     * set to 2, then checks topic, partition, timestamp type, timestamp, key, value and
     * serialized sizes of every consumed record in order.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testMaxPollRecords(String persister) {
        final int totalRecords = 10000;
        final int pollLimit = 2;
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer consumer = this.createShareConsumer(
                new ByteArrayDeserializer(),
                new ByteArrayDeserializer(),
                "group1",
                Collections.singletonMap("max.poll.records", String.valueOf(pollLimit)))) {
            long baseTimestamp = System.currentTimeMillis();
            this.produceMessagesWithTimestamp(totalRecords, baseTimestamp);
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            List consumed = this.consumeRecords(consumer, totalRecords);

            long position = 0L;
            // Iterate as Object and cast: the list is handled as a raw type here.
            for (Object element : consumed) {
                ConsumerRecord current = (ConsumerRecord) element;
                String expectedKey = "key " + position;
                String expectedValue = "value " + position;

                Assertions.assertEquals(this.tp.topic(), current.topic());
                Assertions.assertEquals(this.tp.partition(), current.partition());
                Assertions.assertEquals(TimestampType.CREATE_TIME, current.timestampType());
                Assertions.assertEquals(baseTimestamp + position, current.timestamp());
                Assertions.assertEquals(expectedKey, new String((byte[]) current.key()));
                Assertions.assertEquals(expectedValue, new String((byte[]) current.value()));
                Assertions.assertEquals(expectedKey.length(), current.serializedKeySize());
                Assertions.assertEquals(expectedValue.length(), current.serializedValueSize());
                position++;
            }
        }
    }

    /**
     * Interleaves transactional sends (one committed, one aborted) with non-transactional
     * sends to {@code tp}, then expects a single poll to return four records whose offsets
     * match the four sends in order, followed by an empty poll.
     *
     * <p>The final send's offset is asserted not to be 3, i.e. the four data records do
     * not occupy contiguous offsets starting at 0 — presumably because transaction
     * markers take up offsets in between.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     * @throws Exception if any awaited send fails or is interrupted
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testControlRecordsSkipped(String persister) throws Exception {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer txnProducer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer(), "T1");
             KafkaProducer plainProducer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());

            // Committed transaction with a non-transactional send in the middle.
            txnProducer.initTransactions();
            txnProducer.beginTransaction();
            RecordMetadata committedTxnSend = (RecordMetadata) txnProducer.send(message).get();
            RecordMetadata firstPlainSend = (RecordMetadata) plainProducer.send(message).get();
            txnProducer.commitTransaction();

            // Aborted transaction followed by one more non-transactional send.
            txnProducer.beginTransaction();
            RecordMetadata abortedTxnSend = (RecordMetadata) txnProducer.send(message).get();
            txnProducer.abortTransaction();
            RecordMetadata secondPlainSend = (RecordMetadata) plainProducer.send(message).get();

            txnProducer.close();
            plainProducer.close();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(4, polled.count());

            List partitionRecords = polled.records(this.tp);
            Assertions.assertEquals(committedTxnSend.offset(), ((ConsumerRecord) partitionRecords.get(0)).offset());
            Assertions.assertEquals(firstPlainSend.offset(), ((ConsumerRecord) partitionRecords.get(1)).offset());
            Assertions.assertEquals(abortedTxnSend.offset(), ((ConsumerRecord) partitionRecords.get(2)).offset());
            Assertions.assertEquals(secondPlainSend.offset(), ((ConsumerRecord) partitionRecords.get(3)).offset());
            Assertions.assertNotEquals(3L, secondPlainSend.offset());

            polled = consumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * Consumes one record, acknowledges it explicitly, sends another record, and expects
     * the next poll to return exactly one record.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testExplicitAcknowledgeSuccess(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // Elements arrive as Object because the records container is a raw type here.
            polled.forEach(delivered -> consumer.acknowledge((ConsumerRecord) delivered));

            producer.send(message);
            polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());
        }
    }

    /**
     * Consumes one record, acknowledges it explicitly, sends another record, and calls
     * {@code commitSync}; the commit result must have one entry and the following poll
     * must return exactly one record.
     *
     * @param persister persister class name; not read here — it reaches {@code createCluster}
     *                  through the parameterized display name
     */
    @ParameterizedTest(name = "{displayName}.persister={0}")
    @ValueSource(strings = {
        "org.apache.kafka.server.share.persister.NoOpShareStatePersister",
        "org.apache.kafka.server.share.persister.DefaultStatePersister"
    })
    public void testExplicitAcknowledgeCommitSuccess(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer producer = this.createProducer(
                     new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer consumer = this.createShareConsumer(
                     new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord message = new ProducerRecord(
                    this.tp.topic(), Integer.valueOf(this.tp.partition()), null,
                    "key".getBytes(), "value".getBytes());
            producer.send(message);
            producer.flush();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            // Elements arrive as Object because the records container is a raw type here.
            polled.forEach(delivered -> consumer.acknowledge((ConsumerRecord) delivered));

            producer.send(message);
            Map commitResult = consumer.commitSync();
            Assertions.assertEquals(1, commitResult.size());

            polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());
        }
    }

    /**
     * Exercises {@code commitAsync()} after explicitly acknowledging part of a fetched batch.
     *
     * <p>Three records are produced and consumer 1 is expected to fetch all of them in one poll.
     * It acknowledges the records at offsets 0 and 1 and calls {@code commitAsync()}. The test
     * then waits until consumer 2 (same group) polls exactly one record at offset 2, asserts that
     * the exception map handed to consumer 1's {@code TestableAcknowledgementCommitCallback} has
     * no entry for the partition yet, and finally keeps polling consumer 1 until that map gains an
     * entry, whose value must be {@code null}.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws InterruptedException declared for the {@code TestUtils.waitForCondition} calls
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgementCommitAsync(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.send(record2);
            producer.send(record3);
            producer.flush();

            shareConsumer1.subscribe(Collections.singleton(this.tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(this.tp.topic()));

            Map<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            Map<TopicPartition, Exception> ackExceptions = new HashMap<>();
            shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(ackedOffsets, ackExceptions));

            ConsumerRecords<byte[], byte[]> fetched = shareConsumer1.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(3, fetched.count());
            Iterator<ConsumerRecord<byte[], byte[]>> fetchedIterator = fetched.iterator();
            ConsumerRecord<byte[], byte[]> firstRecord = fetchedIterator.next();
            ConsumerRecord<byte[], byte[]> secondRecord = fetchedIterator.next();
            Assertions.assertEquals(0L, firstRecord.offset());
            Assertions.assertEquals(1L, secondRecord.offset());

            // Only the first two records are acknowledged; the third is left unacknowledged.
            shareConsumer1.acknowledge(firstRecord);
            shareConsumer1.acknowledge(secondRecord);
            shareConsumer1.commitAsync();

            // Wait for the other group member to receive exactly the record at offset 2.
            TestUtils.waitForCondition(() -> {
                ConsumerRecords<byte[], byte[]> polledByOther = shareConsumer2.poll(Duration.ofMillis(1000L));
                return polledByOther.count() == 1 && polledByOther.iterator().next().offset() == 2L;
            }, 30000L, 100L, () -> "Didn't receive timed out record");

            Assertions.assertFalse(ackExceptions.containsKey(this.tp));

            // Keep polling consumer 1 until the exception map has an entry for the partition.
            TestUtils.waitForCondition(() -> {
                shareConsumer1.poll(Duration.ofMillis(1000L));
                return ackExceptions.containsKey(this.tp);
            }, 30000L, 100L, () -> "Didn't receive call to callback");

            Assertions.assertNull(ackExceptions.get(this.tp));
        }
    }

    /**
     * Calls {@code commitAsync()} after an empty poll, then produces one record, polls it,
     * acknowledges it explicitly and calls {@code commitAsync()} again.
     *
     * <p>The test passes once the exception map handed to the
     * {@code TestableAcknowledgementCommitCallback} contains an entry for the partition while the
     * consumer keeps polling; the stored value itself is not inspected here.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws InterruptedException declared for the {@code TestUtils.waitForCondition} call
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testImplicitModeNotTriggeredByPollWhenNoAcksToSend(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            Map<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            Map<TopicPartition, Exception> ackExceptions = new HashMap<>();
            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(ackedOffsets, ackExceptions));

            // Nothing has been produced yet, so the first poll is expected to be empty.
            ConsumerRecords<byte[], byte[]> emptyPoll = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(0, emptyPoll.count());
            shareConsumer.commitAsync();

            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.flush();

            ConsumerRecords<byte[], byte[]> fetched = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, fetched.count());
            // The explicit acknowledge happens after the earlier empty poll and commitAsync().
            shareConsumer.acknowledge(fetched.iterator().next());
            shareConsumer.commitAsync();

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(500L));
                return ackExceptions.containsKey(this.tp);
            }, 30000L, 100L, () -> "Didn't receive call to callback");
        }
    }

    /**
     * Acknowledges only part of a three-record batch, calls {@code commitAsync()}, and checks
     * what the same consumer receives on its next polls.
     *
     * <p>After acknowledging offsets 0 and 1, the next two polls are each expected to return a
     * single record at offset 2. That record is then acknowledged, the consumer is polled once
     * more and closed explicitly, and the exception map handed to the
     * {@code TestableAcknowledgementCommitCallback} must contain the partition mapped to
     * {@code null}.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgementCommitAsyncPartialBatch(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.send(record2);
            producer.send(record3);
            producer.flush();

            shareConsumer1.subscribe(Collections.singleton(this.tp.topic()));

            Map<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            Map<TopicPartition, Exception> ackExceptions = new HashMap<>();
            shareConsumer1.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(ackedOffsets, ackExceptions));

            ConsumerRecords<byte[], byte[]> firstBatch = shareConsumer1.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(3, firstBatch.count());
            Iterator<ConsumerRecord<byte[], byte[]>> firstBatchIterator = firstBatch.iterator();
            ConsumerRecord<byte[], byte[]> recordAtZero = firstBatchIterator.next();
            ConsumerRecord<byte[], byte[]> recordAtOne = firstBatchIterator.next();
            Assertions.assertEquals(0L, recordAtZero.offset());
            Assertions.assertEquals(1L, recordAtOne.offset());

            // Acknowledge two of the three records and commit asynchronously.
            shareConsumer1.acknowledge(recordAtZero);
            shareConsumer1.acknowledge(recordAtOne);
            shareConsumer1.commitAsync();

            // The unacknowledged record at offset 2 is expected on the next poll ...
            ConsumerRecords<byte[], byte[]> secondBatch = shareConsumer1.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, secondBatch.count());
            ConsumerRecord<byte[], byte[]> pendingRecord = secondBatch.iterator().next();
            Assertions.assertEquals(2L, pendingRecord.offset());

            // ... and again on the poll after that, since it was still not acknowledged.
            ConsumerRecords<byte[], byte[]> thirdBatch = shareConsumer1.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, thirdBatch.count());
            ConsumerRecord<byte[], byte[]> redeliveredRecord = thirdBatch.iterator().next();
            Assertions.assertEquals(2L, redeliveredRecord.offset());

            shareConsumer1.acknowledge(redeliveredRecord);
            shareConsumer1.poll(Duration.ofMillis(500L));
            // Explicit close ahead of the try-with-resources close, before the callback state is checked.
            shareConsumer1.close();

            Assertions.assertTrue(ackExceptions.containsKey(this.tp));
            Assertions.assertNull(ackExceptions.get(this.tp));
        }
    }

    /**
     * Acknowledges a polled record with {@code AcknowledgeType.RELEASE}, expects the next poll to
     * return one record again, acknowledges that one with {@code AcknowledgeType.ACCEPT}, and
     * expects the poll after that to be empty.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgeReleasePollAccept(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        // Type arguments are spelled out: on raw ConsumerRecords the forEach lambda
        // parameter is Object, which acknowledge(ConsumerRecord, AcknowledgeType) does not accept.
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());
            records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));

            // After the RELEASE acknowledgement one record is expected on the next poll.
            records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());
            records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));

            // After the ACCEPT acknowledgement nothing further is expected.
            records = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Acknowledges the same polled record twice without polling in between, first with
     * {@code AcknowledgeType.RELEASE} and then with {@code AcknowledgeType.ACCEPT}, and expects
     * the following poll to return no records.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgeReleaseAccept(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        // Type arguments are spelled out: on raw ConsumerRecords the forEach lambda
        // parameter is Object, which acknowledge(ConsumerRecord, AcknowledgeType) does not accept.
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());

            // Both acknowledgements are applied to the same batch before the next poll.
            records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
            records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.ACCEPT));

            records = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, records.count());
        }
    }

    /**
     * Polls one record, acknowledges it with {@code AcknowledgeType.RELEASE}, and then lets the
     * try-with-resources block close the consumer. There is no assertion after the
     * acknowledgement; the test only requires that closing does not throw.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgeReleaseClose(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        // Type arguments are spelled out: on raw ConsumerRecords the forEach lambda
        // parameter is Object, which acknowledge(ConsumerRecord, AcknowledgeType) does not accept.
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> records = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, records.count());
            records.forEach(consumedRecord -> shareConsumer.acknowledge(consumedRecord, AcknowledgeType.RELEASE));
        }
    }

    /**
     * Acknowledges a polled record, performs another poll that is expected to be empty, and then
     * asserts that acknowledging the earlier record again throws {@link IllegalStateException}.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testExplicitAcknowledgeThrowsNotInBatch(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> firstPoll = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, firstPoll.count());
            ConsumerRecord<byte[], byte[]> consumedRecord = firstPoll.records(this.tp).get(0);
            shareConsumer.acknowledge(consumedRecord);

            ConsumerRecords<byte[], byte[]> secondPoll = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, secondPoll.count());

            // The record came from the earlier poll, not the most recent one.
            Assertions.assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord));
        }
    }

    /**
     * Polls one record without acknowledging it, polls again (expected empty), and then asserts
     * that an explicit {@code acknowledge} of the earlier record throws
     * {@link IllegalStateException}.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testImplicitAcknowledgeFailsExplicit(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> firstPoll = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, firstPoll.count());
            ConsumerRecord<byte[], byte[]> consumedRecord = firstPoll.records(this.tp).get(0);

            // No acknowledge call is made between the two polls.
            ConsumerRecords<byte[], byte[]> secondPoll = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, secondPoll.count());

            Assertions.assertThrows(IllegalStateException.class, () -> shareConsumer.acknowledge(consumedRecord));
        }
    }

    /**
     * Polls one record without acknowledging it and calls {@code commitSync()} twice.
     *
     * <p>The first commit result is expected to contain one entry and the second none; the poll
     * that follows is expected to return no records.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testImplicitAcknowledgeCommitSync(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            ConsumerRecords<byte[], byte[]> fetched = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, fetched.count());

            Map<TopicIdPartition, ?> firstCommit = shareConsumer.commitSync();
            Assertions.assertEquals(1, firstCommit.size());

            // A second commit with nothing new pending is expected to report no partitions.
            Map<TopicIdPartition, ?> secondCommit = shareConsumer.commitSync();
            Assertions.assertEquals(0, secondCommit.size());

            ConsumerRecords<byte[], byte[]> afterCommit = shareConsumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, afterCommit.count());
        }
    }

    /**
     * Polls three records without acknowledging them, calls {@code commitAsync()}, and waits for
     * the acknowledgement commit callback.
     *
     * <p>Right after {@code commitAsync()} the exception map handed to the
     * {@code TestableAcknowledgementCommitCallback} must have no entry for the partition. The test
     * then keeps polling until an entry appears, and that entry must be {@code null}.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws InterruptedException declared for the {@code TestUtils.waitForCondition} call
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testImplicitAcknowledgementCommitAsync(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            producer.send(record1);
            producer.send(record2);
            producer.send(record3);
            producer.flush();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            Map<TopicPartition, Set<Long>> ackedOffsets = new HashMap<>();
            Map<TopicPartition, Exception> ackExceptions = new HashMap<>();
            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallback(ackedOffsets, ackExceptions));

            ConsumerRecords<byte[], byte[]> fetched = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(3, fetched.count());

            shareConsumer.commitAsync();
            // No entry is expected in the exception map before the consumer polls again.
            Assertions.assertFalse(ackExceptions.containsKey(this.tp));

            TestUtils.waitForCondition(() -> {
                shareConsumer.poll(Duration.ofMillis(1000L));
                return ackExceptions.containsKey(this.tp);
            }, 15000L, 100L, () -> "Acknowledgement commit callback did not receive the response yet");

            Assertions.assertNull(ackExceptions.get(this.tp));
        }
    }

    /**
     * Creates a share consumer with {@code max.partition.fetch.bytes} set to 10000, produces a
     * small record followed by a record whose value is a 10000-byte array, and expects the first
     * poll to return exactly one record.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws Exception if waiting on either producer send future fails
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testFetchRecordLargerThanMaxPartitionFetchBytes(String persister) throws Exception {
        int maxPartitionFetchBytes = 10000;
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1", Collections.singletonMap("max.partition.fetch.bytes", String.valueOf(maxPartitionFetchBytes)))) {
            ProducerRecord<byte[], byte[]> smallRecord = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            // The value alone is as large as the configured max.partition.fetch.bytes.
            ProducerRecord<byte[], byte[]> bigRecord = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), new byte[maxPartitionFetchBytes]);

            // Block on each send so both records are written before the consumer subscribes.
            producer.send(smallRecord).get();
            producer.send(bigRecord).get();

            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ConsumerRecords<byte[], byte[]> fetched = shareConsumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, fetched.count());
        }
    }

    /**
     * Runs two share consumers in different groups ({@code group1} and {@code group2}) against
     * the same topic and checks the record counts each one accumulates.
     *
     * <p>Phase 1: three records are produced and both consumers are polled until each has counted
     * three. Phase 2: two more records are produced and only consumer 1 is polled until its reset
     * counter reaches two. Phase 3: three more records are produced, both counters are reset, and
     * both consumers are polled until consumer 1 has counted three and consumer 2 has counted five.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws InterruptedException declared for the {@code TestUtils.waitForCondition} calls
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testMultipleConsumersWithDifferentGroupIds(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        this.alterShareAutoOffsetReset("group2", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            shareConsumer1.subscribe(Collections.singleton(this.tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(this.tp.topic()));

            // Phase 1: three records, both consumers polled.
            producer.send(record);
            producer.send(record);
            producer.send(record);
            producer.flush();

            AtomicInteger shareConsumer1Records = new AtomicInteger();
            AtomicInteger shareConsumer2Records = new AtomicInteger();
            TestUtils.waitForCondition(() -> {
                int seenByConsumer1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count());
                int seenByConsumer2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000L)).count());
                return seenByConsumer1 == 3 && seenByConsumer2 == 3;
            }, 15000L, 100L, () -> "Failed to consume records for both consumers");

            // Phase 2: two more records, only consumer 1 is polled.
            producer.send(record);
            producer.send(record);
            shareConsumer1Records.set(0);
            TestUtils.waitForCondition(
                () -> shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count()) == 2,
                15000L,
                100L,
                () -> "Failed to consume records for share consumer 1");

            // Phase 3: three more records; consumer 2 also has the two phase-2 records outstanding.
            producer.send(record);
            producer.send(record);
            producer.send(record);
            shareConsumer1Records.set(0);
            shareConsumer2Records.set(0);
            TestUtils.waitForCondition(() -> {
                int seenByConsumer1 = shareConsumer1Records.addAndGet(shareConsumer1.poll(Duration.ofMillis(2000L)).count());
                int seenByConsumer2 = shareConsumer2Records.addAndGet(shareConsumer2.poll(Duration.ofMillis(2000L)).count());
                return seenByConsumer1 == 3 && seenByConsumer2 == 5;
            }, 15000L, 100L, () -> "Failed to consume records for both consumers for the last batch");
        }
    }

    /**
     * Produces 2000 records and polls two consumers of the same group alternately on one thread.
     *
     * <p>Polling runs for at most ten rounds and stops early once a round returns no records from
     * either consumer. The combined count from both consumers must equal the number produced.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testMultipleConsumersInGroupSequentialConsumption(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer1 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> shareConsumer2 = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            shareConsumer1.subscribe(Collections.singleton(this.tp.topic()));
            shareConsumer2.subscribe(Collections.singleton(this.tp.topic()));

            int totalMessages = 2000;
            for (int sent = 0; sent < totalMessages; sent++) {
                producer.send(record);
            }
            producer.flush();

            int consumer1MessageCount = 0;
            int consumer2MessageCount = 0;
            int maxRetries = 10;
            for (int round = 0; round < maxRetries; round++) {
                ConsumerRecords<byte[], byte[]> fromConsumer1 = shareConsumer1.poll(Duration.ofMillis(2000L));
                consumer1MessageCount += fromConsumer1.count();
                ConsumerRecords<byte[], byte[]> fromConsumer2 = shareConsumer2.poll(Duration.ofMillis(2000L));
                consumer2MessageCount += fromConsumer2.count();
                // Stop as soon as a full round yields nothing from either consumer.
                if (fromConsumer1.count() + fromConsumer2.count() == 0) {
                    break;
                }
            }

            Assertions.assertEquals(totalMessages, consumer1MessageCount + consumer2MessageCount);
        }
    }

    /**
     * Runs four producer tasks (5000 messages each) and four consumer tasks in group
     * {@code group1} concurrently via {@link CompletableFuture}.
     *
     * <p>Both sets of futures are awaited for up to 60 seconds each, and the sum of the values
     * returned by the consumer tasks must equal the total number of messages produced.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws InterruptedException if the test thread is interrupted while awaiting the futures
     * @throws ExecutionException if any producer or consumer task completes exceptionally
     * @throws TimeoutException if either set of tasks does not finish within 60 seconds
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testMultipleConsumersInGroupConcurrentConsumption(String persister) throws InterruptedException, ExecutionException, TimeoutException {
        AtomicInteger totalMessagesConsumed = new AtomicInteger(0);
        int consumerCount = 4;
        int producerCount = 4;
        int messagesPerProducer = 5000;
        String groupId = "group1";
        this.alterShareAutoOffsetReset(groupId, "earliest");

        List<CompletableFuture<Void>> producerFutures = new ArrayList<>();
        for (int producerIndex = 0; producerIndex < producerCount; producerIndex++) {
            producerFutures.add(CompletableFuture.runAsync(() -> this.produceMessages(messagesPerProducer)));
        }

        int maxBytes = 100000;
        List<CompletableFuture<Integer>> consumerFutures = new ArrayList<>();
        for (int consumerIndex = 0; consumerIndex < consumerCount; consumerIndex++) {
            // Consumer numbers passed to consumeMessages start at 1.
            int consumerNumber = consumerIndex + 1;
            consumerFutures.add(CompletableFuture.supplyAsync(
                () -> this.consumeMessages(totalMessagesConsumed, producerCount * messagesPerProducer, groupId, consumerNumber, 30, true, maxBytes)));
        }

        CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);
        CompletableFuture.allOf(consumerFutures.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);

        int totalResult = consumerFutures.stream().mapToInt(CompletableFuture::join).sum();
        Assertions.assertEquals(producerCount * messagesPerProducer, totalResult);
    }

    /**
     * Produces 8000 messages (four producer tasks of 2000 each) and then consumes them
     * concurrently with two consumer tasks in each of three groups.
     *
     * <p>The producer tasks must finish within 15 seconds. All six consumer tasks are then awaited
     * for up to 120 seconds. For every group the summed return values of its consumer tasks must
     * equal the number of messages sent, and the summed return values of the producer tasks must
     * equal that number as well.
     *
     * @param persister value supplied by {@code @ValueSource}; not read in this method body
     *                  (presumably consumed by the test setup -- TODO confirm)
     * @throws ExecutionException if any consumer task completes exceptionally
     * @throws InterruptedException if the test thread is interrupted while awaiting the futures
     * @throws TimeoutException if the consumer tasks do not finish within 120 seconds
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testMultipleConsumersInMultipleGroupsConcurrentConsumption(String persister) throws ExecutionException, InterruptedException, TimeoutException {
        AtomicInteger totalMessagesConsumedGroup1 = new AtomicInteger(0);
        AtomicInteger totalMessagesConsumedGroup2 = new AtomicInteger(0);
        AtomicInteger totalMessagesConsumedGroup3 = new AtomicInteger(0);
        int producerCount = 4;
        int messagesPerProducer = 2000;
        int totalMessagesSent = producerCount * messagesPerProducer;
        String groupId1 = "group1";
        String groupId2 = "group2";
        String groupId3 = "group3";
        this.alterShareAutoOffsetReset(groupId1, "earliest");
        this.alterShareAutoOffsetReset(groupId2, "earliest");
        this.alterShareAutoOffsetReset(groupId3, "earliest");

        // All producing completes before any consumer task is started.
        List<CompletableFuture<Integer>> producerFutures = new ArrayList<>();
        for (int producerIndex = 0; producerIndex < producerCount; producerIndex++) {
            producerFutures.add(CompletableFuture.supplyAsync(() -> this.produceMessages(messagesPerProducer)));
        }
        Assertions.assertDoesNotThrow(() -> CompletableFuture.allOf(producerFutures.toArray(CompletableFuture[]::new)).get(15L, TimeUnit.SECONDS), "Exception awaiting produceMessages");
        int actualMessageSent = producerFutures.stream().mapToInt(CompletableFuture::join).sum();

        List<CompletableFuture<Integer>> consumeMessagesFutures1 = new ArrayList<>();
        List<CompletableFuture<Integer>> consumeMessagesFutures2 = new ArrayList<>();
        List<CompletableFuture<Integer>> consumeMessagesFutures3 = new ArrayList<>();
        int maxBytes = 100000;
        for (int consumerIndex = 0; consumerIndex < 2; consumerIndex++) {
            // Consumer numbers passed to consumeMessages start at 1.
            int consumerNumber = consumerIndex + 1;
            consumeMessagesFutures1.add(CompletableFuture.supplyAsync(
                () -> this.consumeMessages(totalMessagesConsumedGroup1, totalMessagesSent, groupId1, consumerNumber, 100, true, maxBytes)));
            consumeMessagesFutures2.add(CompletableFuture.supplyAsync(
                () -> this.consumeMessages(totalMessagesConsumedGroup2, totalMessagesSent, groupId2, consumerNumber, 100, true, maxBytes)));
            consumeMessagesFutures3.add(CompletableFuture.supplyAsync(
                () -> this.consumeMessages(totalMessagesConsumedGroup3, totalMessagesSent, groupId3, consumerNumber, 100, true, maxBytes)));
        }

        CompletableFuture.allOf(Stream.of(consumeMessagesFutures1.stream(), consumeMessagesFutures2.stream(), consumeMessagesFutures3.stream()).flatMap(Function.identity()).toArray(CompletableFuture[]::new)).get(120L, TimeUnit.SECONDS);

        int totalResult1 = consumeMessagesFutures1.stream().mapToInt(CompletableFuture::join).sum();
        int totalResult2 = consumeMessagesFutures2.stream().mapToInt(CompletableFuture::join).sum();
        int totalResult3 = consumeMessagesFutures3.stream().mapToInt(CompletableFuture::join).sum();
        Assertions.assertEquals(totalMessagesSent, totalResult1);
        Assertions.assertEquals(totalMessagesSent, totalResult2);
        Assertions.assertEquals(totalMessagesSent, totalResult3);
        Assertions.assertEquals(totalMessagesSent, actualMessageSent);
    }

    /**
     * Two share consumers of group "group1" read the same 1500 records one after the other.
     *
     * <p>The first consumer polls three times and the three batches together must cover every record.
     * Only the first two batches are credited to it before it is closed; the second consumer then polls
     * (at most ten times) until the credited total reaches 1500, which the final assertion requires.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testConsumerCloseInGroupSequential(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> firstConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> secondConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            firstConsumer.subscribe(Collections.singleton(this.tp.topic()));
            secondConsumer.subscribe(Collections.singleton(this.tp.topic()));

            final int totalMessages = 1500;
            int sent = 0;
            while (sent < totalMessages) {
                producer.send(record);
                sent++;
            }
            producer.close();

            int firstPollCount = firstConsumer.poll(Duration.ofMillis(5000L)).count();
            int secondPollCount = firstConsumer.poll(Duration.ofMillis(5000L)).count();
            int thirdPollCount = firstConsumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(totalMessages, firstPollCount + secondPollCount + thirdPollCount);

            // The third batch is deliberately left out of the first consumer's total.
            // NOTE(review): presumably those records are never acknowledged before close() and are
            // therefore redelivered to the second consumer - confirm against share-group semantics.
            int firstConsumerTotal = firstPollCount + secondPollCount;
            firstConsumer.close();

            final int maxAttempts = 10;
            int secondConsumerTotal = 0;
            int attempts = 0;
            while (firstConsumerTotal + secondConsumerTotal < totalMessages && attempts < maxAttempts) {
                secondConsumerTotal += secondConsumer.poll(Duration.ofMillis(5000L)).count();
                attempts++;
            }
            secondConsumer.close();
            Assertions.assertEquals(totalMessages, firstConsumerTotal + secondConsumerTotal);
        }
    }

    /**
     * Runs four asynchronous producers (5000 records each) alongside the consumers of group "group1".
     *
     * <p>A first consumer (number 0) is limited to a single poll, does not commit, and is awaited for at
     * most 15 seconds. Four further consumers (at most 40 polls each, committing when done) are then
     * started; the sum of what they report must equal the 20000 records produced.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testMultipleConsumersInGroupFailureConcurrentConsumption(String persister) throws InterruptedException, ExecutionException, TimeoutException {
        final AtomicInteger sharedConsumedCounter = new AtomicInteger(0);
        final int consumerCount = 4;
        final int producerCount = 4;
        final int messagesPerProducer = 5000;
        final int expectedTotal = producerCount * messagesPerProducer;
        final String groupId = "group1";
        this.alterShareAutoOffsetReset(groupId, "earliest");

        List<CompletableFuture<Void>> producerTasks = new ArrayList<>();
        for (int producerIndex = 0; producerIndex < producerCount; producerIndex++) {
            producerTasks.add(CompletableFuture.runAsync(() -> this.produceMessages(messagesPerProducer)));
        }

        final int maxBytes = 1000000;
        // This consumer uses its own private counter, so whatever it reads is not counted towards the total.
        CompletableFuture<Integer> singlePollConsumer = CompletableFuture.supplyAsync(
                () -> this.consumeMessages(new AtomicInteger(0), expectedTotal, groupId, 0, 1, false));
        Assertions.assertDoesNotThrow(() -> singlePollConsumer.get(15L, TimeUnit.SECONDS), "Exception awaiting consumeMessages");

        List<CompletableFuture<Integer>> consumerTasks = new ArrayList<>();
        for (int consumerNumber = 1; consumerNumber <= consumerCount; consumerNumber++) {
            final int id = consumerNumber;
            consumerTasks.add(CompletableFuture.supplyAsync(
                    () -> this.consumeMessages(sharedConsumedCounter, expectedTotal, groupId, id, 40, true, maxBytes)));
        }

        CompletableFuture.allOf(producerTasks.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);
        CompletableFuture.allOf(consumerTasks.toArray(CompletableFuture[]::new)).get(60L, TimeUnit.SECONDS);

        int totalSuccessResult = consumerTasks.stream().mapToInt(CompletableFuture::join).sum();
        Assertions.assertEquals(expectedTotal, totalSuccessResult);
    }

    /**
     * Checks that a record which was polled and then left untouched for 20 seconds is delivered again.
     *
     * <p>Sequence: record 1 is produced and polled, and a follow-up poll returns nothing; record 2 is
     * produced and polled; after sleeping 20 seconds the next poll must return record 2 again, and the
     * poll after that must be empty.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     * @throws InterruptedException if the sleep is interrupted
     */
    @Flaky(value="KAFKA-18025")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testAcquisitionLockTimeoutOnConsumer(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> firstRecord = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key_1".getBytes(), "value_1".getBytes());
            ProducerRecord<byte[], byte[]> secondRecord = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key_2".getBytes(), "value_2".getBytes());
            shareConsumer.subscribe(Set.of(this.tp.topic()));

            // Round 1: the first record arrives once and is not seen again on the next poll.
            producer.send(firstRecord);
            producer.flush();
            ConsumerRecords<byte[], byte[]> polled = shareConsumer.poll(Duration.ofMillis(5000L));
            ConsumerRecord<byte[], byte[]> head = polled.records(this.tp).get(0);
            Assertions.assertEquals("key_1", new String(head.key()));
            Assertions.assertEquals("value_1", new String(head.value()));
            Assertions.assertEquals(1, polled.count());
            polled = shareConsumer.poll(Duration.ofMillis(1000L));
            Assertions.assertEquals(0, polled.count());

            // Round 2: the second record arrives.
            producer.send(secondRecord);
            producer.flush();
            polled = shareConsumer.poll(Duration.ofMillis(5000L));
            head = polled.records(this.tp).get(0);
            Assertions.assertEquals("key_2", new String(head.key()));
            Assertions.assertEquals("value_2", new String(head.value()));
            Assertions.assertEquals(1, polled.count());

            // NOTE(review): 20s is presumably longer than the configured acquisition lock duration - confirm.
            Thread.sleep(20000L);

            // Round 3: the second record is expected once more, then nothing.
            polled = shareConsumer.poll(Duration.ofMillis(5000L));
            head = polled.records(this.tp).get(0);
            Assertions.assertEquals("key_2", new String(head.key()));
            Assertions.assertEquals("value_2", new String(head.value()));
            Assertions.assertEquals(1, polled.count());
            polled = shareConsumer.poll(Duration.ofMillis(1000L));
            Assertions.assertEquals(0, polled.count());
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackWithShareConsumer} that holds the consumer
     * itself, produces one record and polls twice.
     *
     * <p>This method contains no assertions of its own.
     * NOTE(review): the actual checks are presumably made inside the callback class - confirm there.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testAcknowledgementCommitCallbackCallsShareConsumerDisallowed(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, key, value));
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWithShareConsumer(shareConsumer));
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            // Two polls: a long one followed by a short one.
            shareConsumer.poll(Duration.ofMillis(5000L));
            shareConsumer.poll(Duration.ofMillis(500L));
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackWakeup} on the consumer and expects a later
     * {@code poll} to throw {@link WakeupException}.
     *
     * <p>The test first waits (up to 15 seconds) for a poll that returns exactly one record, polls once
     * more, and then keeps polling until a {@code WakeupException} is observed or 15 seconds elapse.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     * @throws InterruptedException if waiting for a condition is interrupted
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testAcknowledgementCommitCallbackCallsShareConsumerWakeup(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, key, value));
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackWakeup(shareConsumer));
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            TestUtils.waitForCondition(
                    () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "Failed to consume records for share consumer");
            shareConsumer.poll(Duration.ofMillis(2000L));

            // Keep polling until one of the polls surfaces the wakeup.
            AtomicBoolean wakeupObserved = new AtomicBoolean(false);
            TestUtils.waitForCondition(
                    () -> {
                        try {
                            shareConsumer.poll(Duration.ofMillis(500L));
                        } catch (WakeupException expected) {
                            wakeupObserved.set(true);
                        }
                        return wakeupObserved.get();
                    },
                    15000L,
                    100L,
                    () -> "Failed to receive expected exception");
        }
    }

    /**
     * Registers a {@code TestableAcknowledgementCommitCallbackThrows} on the consumer and expects a later
     * {@code poll} to throw {@link OutOfOrderSequenceException}.
     *
     * <p>The test waits (up to 15 seconds) for a poll that returns exactly one record and then keeps
     * polling until the exception is observed or 15 seconds elapse.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     * @throws InterruptedException if waiting for a condition is interrupted
     */
    @Flaky(value="KAFKA-18033")
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testAcknowledgementCommitCallbackThrowsException(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, key, value));
            producer.flush();

            shareConsumer.setAcknowledgementCommitCallback(new TestableAcknowledgementCommitCallbackThrows());
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));

            TestUtils.waitForCondition(
                    () -> shareConsumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "Failed to consume records for share consumer");

            // Keep polling until one of the polls surfaces the expected exception.
            AtomicBoolean failureObserved = new AtomicBoolean(false);
            TestUtils.waitForCondition(
                    () -> {
                        try {
                            shareConsumer.poll(Duration.ofMillis(500L));
                        } catch (OutOfOrderSequenceException expected) {
                            failureObserved.set(true);
                        }
                        return failureObserved.get();
                    },
                    15000L,
                    100L,
                    () -> "Failed to receive expected exception");
        }
    }

    /**
     * Checks that {@code poll} throws {@link InterruptException} when the calling thread's interrupt
     * flag is set, and that polling works again once the flag has been cleared.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testPollThrowsInterruptExceptionIfInterrupted(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            consumer.subscribe(Collections.singleton(this.tp.topic()));

            try {
                // Set the interrupt flag on the test thread itself before polling.
                Thread.currentThread().interrupt();
                Assertions.assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
            } finally {
                // Always clear the flag so it cannot leak into the rest of the test.
                Thread.interrupted();
            }

            Assertions.assertDoesNotThrow(() -> consumer.poll(Duration.ZERO), "Failed to consume records");
        }
    }

    /**
     * Subscribes to the topic name {@code "topic abc"} and expects the following {@code poll} (up to
     * 10 seconds) to throw {@link InvalidTopicException}.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testSubscribeOnInvalidTopicThrowsInvalidTopicException(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            consumer.subscribe(Collections.singleton("topic abc"));
            Duration pollTimeout = Duration.ofMillis(10000L);
            Assertions.assertThrows(InvalidTopicException.class, () -> consumer.poll(pollTimeout));
        }
    }

    /**
     * Calls {@code wakeup()} before polling: the first {@code poll} must throw {@link WakeupException},
     * and the next {@code poll} must still return the single record that was produced beforehand.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testWakeupWithFetchedRecordsAvailable(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            byte[] key = "key".getBytes();
            byte[] value = "value".getBytes();
            producer.send(new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, key, value));
            producer.flush();

            consumer.subscribe(Collections.singleton(this.tp.topic()));
            consumer.wakeup();
            Assertions.assertThrows(WakeupException.class, () -> consumer.poll(Duration.ZERO));

            // The wakeup has been consumed by the failed poll; this one must deliver the record.
            int delivered = consumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, delivered);
        }
    }

    /**
     * Subscribes to topic {@code "foo"} before it is created, then creates it and produces to it.
     *
     * <p>The first record is awaited for up to 15 seconds; two further produce/poll rounds must each
     * return exactly one record.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     * @throws InterruptedException if waiting for the first record is interrupted
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testSubscriptionFollowedByTopicCreation(String persister) throws InterruptedException {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            final String topic = "foo";
            // Subscribe first; the topic does not exist until the next line.
            consumer.subscribe(Collections.singleton(topic));
            this.createTopic(topic);

            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, 0, null, "key".getBytes(), "value".getBytes());
            producer.send(record);
            producer.flush();
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "Failed to consume records for share consumer, metadata sync failed");

            producer.send(record);
            ConsumerRecords<byte[], byte[]> polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());

            producer.send(record);
            polled = consumer.poll(Duration.ofMillis(5000L));
            Assertions.assertEquals(1, polled.count());
        }
    }

    /**
     * Subscribes one consumer to topics {@code "bar"} and {@code "baz"}, consumes one record from each,
     * deletes {@code "bar"} and checks that {@code "baz"} can still be consumed.
     *
     * <p>Right after the deletion a short poll must return nothing; two further records sent to
     * {@code "baz"} must then each be received within 15 seconds.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     * @throws InterruptedException if a wait or a send is interrupted
     * @throws ExecutionException if a send fails
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testSubscriptionAndPollFollowedByTopicDeletion(String persister) throws InterruptedException, ExecutionException {
        final String topic1 = "bar";
        final String topic2 = "baz";
        this.createTopic(topic1);
        this.createTopic(topic2);
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1")) {
            ProducerRecord<byte[], byte[]> toTopic1 = new ProducerRecord<>(topic1, 0, null, "key".getBytes(), "value".getBytes());
            ProducerRecord<byte[], byte[]> toTopic2 = new ProducerRecord<>(topic2, 0, null, "key".getBytes(), "value".getBytes());
            consumer.subscribe(Arrays.asList(topic1, topic2));

            // One record from each topic while both exist.
            producer.send(toTopic1).get();
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "incorrect number of records");
            producer.send(toTopic2).get();
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "incorrect number of records");

            // Remove the first topic; nothing new has been produced, so the poll must be empty.
            this.deleteTopic(topic1);
            ConsumerRecords<byte[], byte[]> afterDeletion = consumer.poll(Duration.ofMillis(500L));
            Assertions.assertEquals(0, afterDeletion.count());

            // The surviving topic must keep delivering.
            producer.send(toTopic2).get();
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "incorrect number of records");
            producer.send(toTopic2).get();
            TestUtils.waitForCondition(
                    () -> consumer.poll(Duration.ofMillis(2000L)).count() == 1,
                    15000L,
                    100L,
                    () -> "incorrect number of records");
        }
    }

    /**
     * Checks how many records a share consumer receives after records have been deleted from the
     * beginning of the partition.
     *
     * <p>Steps: produce 10 records and delete those before offset 5, expecting 5 to be consumed; produce
     * 5 more and delete those before offset 14, expecting 1; delete those before offset 15, expecting 0.
     *
     * <p>Each deletion is awaited before consuming. {@code deleteRecords} is asynchronous, so without
     * waiting the consumer could run before the deletion has taken effect, and a failed deletion would
     * go unnoticed.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testLsoMovementByRecordsDeletion(String persister) {
        String groupId = "group1";
        this.alterShareAutoOffsetReset(groupId, "earliest");
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), 0, null, "key".getBytes(), "value".getBytes());

            // Offsets 0-9.
            for (int i = 0; i < 10; ++i) {
                Assertions.assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
            }
            Assertions.assertDoesNotThrow(
                    () -> this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(5L))).all().get(),
                    "Failed to delete records");
            int messageCount = this.consumeMessages(new AtomicInteger(0), 5, groupId, 1, 10, true);
            Assertions.assertEquals(5, messageCount);

            // Offsets 10-14.
            for (int i = 0; i < 5; ++i) {
                Assertions.assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
            }
            Assertions.assertDoesNotThrow(
                    () -> this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(14L))).all().get(),
                    "Failed to delete records");
            int consumeMessagesCount = this.consumeMessages(new AtomicInteger(0), 1, groupId, 1, 10, true);
            Assertions.assertEquals(1, consumeMessagesCount);

            // Everything produced so far is deleted; a target of 0 makes the helper use all 5 polls.
            Assertions.assertDoesNotThrow(
                    () -> this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(15L))).all().get(),
                    "Failed to delete records");
            messageCount = this.consumeMessages(new AtomicInteger(0), 0, groupId, 1, 5, true);
            Assertions.assertEquals(0, messageCount);
        }
    }

    /**
     * Exercises a share group whose auto-offset-reset setting is left untouched by this test.
     *
     * <p>A record produced before the consumer's first poll is expected not to be delivered (count 0);
     * a record produced after that poll is expected to be delivered (count 1).
     * NOTE(review): this is consistent with a default of "latest" - confirm against the group config docs.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testShareAutoOffsetResetDefaultValue(String persister) {
        try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());

            // Produced before the first poll: must not be seen.
            producer.send(record);
            producer.flush();
            int firstPollCount = consumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(0, firstPollCount);

            // Produced after the first poll: must be seen.
            producer.send(record);
            producer.flush();
            int secondPollCount = consumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, secondPollCount);
        }
    }

    /**
     * With group "group1" set to auto-offset-reset "earliest", a record produced before the consumer's
     * first poll must be delivered, and so must a record produced afterwards (one record per poll).
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testShareAutoOffsetResetEarliest(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            consumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());

            // Produced before the first poll: must be seen.
            producer.send(record);
            producer.flush();
            int firstPollCount = consumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, firstPollCount);

            // Produced after the first poll: must be seen as well.
            producer.send(record);
            producer.flush();
            int secondPollCount = consumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, secondPollCount);
        }
    }

    /**
     * With group "group1" set to auto-offset-reset "earliest", produces 10 records, deletes those before
     * offset 5 and expects a fresh consumer of the group to receive exactly 5 records.
     *
     * <p>The deletion is awaited before consuming. {@code deleteRecords} is asynchronous, so without
     * waiting the consumer could run before the deletion has taken effect, and a failed deletion would
     * go unnoticed.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testShareAutoOffsetResetEarliestAfterLsoMovement(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        try (KafkaShareConsumer<byte[], byte[]> shareConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            // This consumer is subscribed but never polled here; consumption happens through the
            // separate consumer that consumeMessages creates for the same group.
            shareConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            for (int i = 0; i < 10; ++i) {
                Assertions.assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
            }
            Assertions.assertDoesNotThrow(
                    () -> this.adminClient.deleteRecords(Collections.singletonMap(this.tp, RecordsToDelete.beforeOffset(5L))).all().get(),
                    "Failed to delete records");
            int consumedMessageCount = this.consumeMessages(new AtomicInteger(0), 5, "group1", 1, 10, true);
            Assertions.assertEquals(5, consumedMessageCount);
        }
    }

    /**
     * Runs two share groups against the same topic with different auto-offset-reset settings:
     * "group1" uses "earliest" and "group2" uses "latest".
     *
     * <p>For a record produced before either consumer has polled, the "earliest" consumer must receive
     * it and the "latest" consumer must not. For a record produced afterwards, both must receive it.
     *
     * @param persister share-state persister class name supplied by the parameterized source
     */
    @ParameterizedTest(name="{displayName}.persister={0}")
    @ValueSource(strings={"org.apache.kafka.server.share.persister.NoOpShareStatePersister", "org.apache.kafka.server.share.persister.DefaultStatePersister"})
    public void testShareAutoOffsetResetMultipleGroupsWithDifferentValue(String persister) {
        this.alterShareAutoOffsetReset("group1", "earliest");
        this.alterShareAutoOffsetReset("group2", "latest");
        try (KafkaShareConsumer<byte[], byte[]> earliestConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group1");
             KafkaShareConsumer<byte[], byte[]> latestConsumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), "group2");
             KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            earliestConsumer.subscribe(Collections.singleton(this.tp.topic()));
            latestConsumer.subscribe(Collections.singleton(this.tp.topic()));
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());

            // Record produced before any poll.
            producer.send(record);
            producer.flush();
            int earliestCount = earliestConsumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, earliestCount);
            int latestCount = latestConsumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(0, latestCount);

            // Record produced after both consumers have polled once.
            producer.send(record);
            earliestCount = earliestConsumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, earliestCount);
            latestCount = latestConsumer.poll(Duration.ofMillis(5000L)).count();
            Assertions.assertEquals(1, latestCount);
        }
    }

    /**
     * Sends the same key/value record {@code messageCount} times to the test partition using a
     * short-lived producer, flushing before the producer is closed.
     *
     * @param messageCount number of records to send
     * @return {@code messageCount}, unchanged
     */
    private int produceMessages(int messageCount) {
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), null, "key".getBytes(), "value".getBytes());
            for (int sent = 0; sent < messageCount; sent++) {
                producer.send(record);
            }
            producer.flush();
        }
        return messageCount;
    }

    /**
     * Sends {@code messageCount} records to the test partition, each with an explicit timestamp.
     *
     * <p>Record {@code i} (zero-based) carries timestamp {@code startingTimestamp + i}, key
     * {@code "key i"} and value {@code "value i"}. The producer is flushed before it is closed.
     *
     * @param messageCount number of records to send
     * @param startingTimestamp timestamp assigned to the first record
     */
    private void produceMessagesWithTimestamp(int messageCount, long startingTimestamp) {
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer(new ByteArraySerializer(), new ByteArraySerializer())) {
            IntStream.range(0, messageCount).forEach(index -> {
                byte[] key = ("key " + index).getBytes();
                byte[] value = ("value " + index).getBytes();
                ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(this.tp.topic(), this.tp.partition(), startingTimestamp + index, key, value);
                producer.send(record);
            });
            producer.flush();
        }
    }

    /**
     * Creates a share consumer for {@code groupId}, subscribes it to the test topic and delegates to the
     * consumer-based {@code consumeMessages} overload. The consumer is closed before returning.
     *
     * @param totalMessagesConsumed counter that is advanced by every record consumed
     * @param totalMessages target passed through to the polling loop
     * @param groupId share group the consumer joins
     * @param consumerNumber identifier used in the failure message
     * @param maxPolls maximum number of polls
     * @param commit whether to commit after polling
     * @return number of records consumed by this consumer
     */
    private int consumeMessages(AtomicInteger totalMessagesConsumed, int totalMessages, String groupId, int consumerNumber, int maxPolls, boolean commit) {
        String failureMessage = "Consumer " + consumerNumber + " failed with exception";
        return Assertions.assertDoesNotThrow(() -> {
            try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId)) {
                consumer.subscribe(Collections.singleton(this.tp.topic()));
                return this.consumeMessages(consumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
            }
        }, failureMessage);
    }

    /**
     * Same as the six-argument overload, but the consumer is created with
     * {@code max.partition.fetch.bytes} set to {@code maxFetchBytes}.
     *
     * @param totalMessagesConsumed counter that is advanced by every record consumed
     * @param totalMessages target passed through to the polling loop
     * @param groupId share group the consumer joins
     * @param consumerNumber identifier used in the failure message
     * @param maxPolls maximum number of polls
     * @param commit whether to commit after polling
     * @param maxFetchBytes value for the consumer's {@code max.partition.fetch.bytes} setting
     * @return number of records consumed by this consumer
     */
    private int consumeMessages(AtomicInteger totalMessagesConsumed, int totalMessages, String groupId, int consumerNumber, int maxPolls, boolean commit, int maxFetchBytes) {
        String failureMessage = "Consumer " + consumerNumber + " failed with exception";
        return Assertions.assertDoesNotThrow(() -> {
            try (KafkaShareConsumer<byte[], byte[]> consumer = this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), groupId, Map.of("max.partition.fetch.bytes", maxFetchBytes))) {
                consumer.subscribe(Collections.singleton(this.tp.topic()));
                return this.consumeMessages(consumer, totalMessagesConsumed, totalMessages, consumerNumber, maxPolls, commit);
            }
        }, failureMessage);
    }

    /**
     * Polls {@code consumer} repeatedly (2 seconds per poll) and counts the records received.
     *
     * <p>When {@code totalMessages} is positive, polling stops as soon as {@code totalMessagesConsumed}
     * reaches it or {@code maxPolls} polls have been made. When it is zero or negative, exactly
     * {@code maxPolls} polls are made. Every poll adds its record count both to the return value and to
     * {@code totalMessagesConsumed}.
     *
     * @param consumer an already subscribed share consumer; it is not closed here
     * @param totalMessagesConsumed counter advanced by every record consumed; callers may share it
     *        between several consumers
     * @param totalMessages target for {@code totalMessagesConsumed}, or a non-positive value for no target
     * @param consumerNumber identifier used in the failure message
     * @param maxPolls maximum number of polls
     * @param commit whether to call {@code commitSync} (10 second timeout) after polling
     * @return number of records received by this consumer
     */
    private int consumeMessages(KafkaShareConsumer<byte[], byte[]> consumer, AtomicInteger totalMessagesConsumed, int totalMessages, int consumerNumber, int maxPolls, boolean commit) {
        return Assertions.assertDoesNotThrow(() -> {
            int messagesConsumed = 0;
            // The poll counter must be declared here so that both the targeted and untargeted modes share it.
            int retries = 0;
            while (retries < maxPolls && (totalMessages <= 0 || totalMessagesConsumed.get() < totalMessages)) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(2000L));
                messagesConsumed += records.count();
                totalMessagesConsumed.addAndGet(records.count());
                ++retries;
            }
            if (commit) {
                consumer.commitSync(Duration.ofMillis(10000L));
            }
            return messagesConsumed;
        }, "Consumer " + consumerNumber + " failed with exception");
    }

    /**
     * Polls {@code consumer} (100 ms poll timeout) until at least {@code numRecords} records have
     * been collected, failing the test if more than 60 seconds have elapsed after any poll.
     *
     * @param consumer   share consumer to poll
     * @param numRecords minimum number of records to collect
     * @return all records collected, in the order they were iterated
     */
    private <K, V> List<ConsumerRecord<K, V>> consumeRecords(KafkaShareConsumer<K, V> consumer, int numRecords) {
        List<ConsumerRecord<K, V>> collected = new ArrayList<>();
        final long startedAtMs = System.currentTimeMillis();
        while (collected.size() < numRecords) {
            ConsumerRecords<K, V> batch = consumer.poll(Duration.ofMillis(100L));
            for (ConsumerRecord<K, V> record : batch) {
                collected.add(record);
            }
            long elapsedMs = System.currentTimeMillis() - startedAtMs;
            Assertions.assertFalse(elapsedMs > 60000L, "Timed out before consuming expected records.");
        }
        return collected;
    }

    /**
     * Creates {@code topicName} with one partition and a replication factor of one, using a
     * short-lived admin client built from the cluster's client properties. Blocks until the
     * creation future completes; any exception fails the test through {@code assertDoesNotThrow}.
     *
     * @param topicName name of the topic to create
     */
    private void createTopic(String topicName) {
        Properties props = this.cluster.clientProperties();
        Assertions.assertDoesNotThrow(() -> {
            try (Admin admin = Admin.create(props)) {
                // The replication factor parameter is a short; an int literal is not accepted here.
                NewTopic topic = new NewTopic(topicName, 1, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }, "Failed to create topic");
    }

    /**
     * Deletes {@code topicName} using a short-lived admin client built from the cluster's client
     * properties. Blocks until the deletion future completes; any exception fails the test through
     * {@code assertDoesNotThrow}.
     *
     * @param topicName name of the topic to delete
     */
    private void deleteTopic(String topicName) {
        Properties adminProps = this.cluster.clientProperties();
        Assertions.assertDoesNotThrow(
            () -> {
                try (Admin client = Admin.create(adminProps)) {
                    client.deleteTopics(Collections.singleton(topicName))
                        .all()
                        .get();
                }
            },
            "Failed to delete topic");
    }

    /**
     * Builds an admin client from the cluster's client properties. The caller owns the returned
     * client; it is not closed here.
     *
     * @return a new {@code Admin} instance
     */
    private Admin createAdminClient() {
        return Admin.create(this.cluster.clientProperties());
    }

    /**
     * Builds a producer from the cluster's client properties and the given serializers.
     *
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @return a new producer; the caller is responsible for closing it
     */
    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        Properties producerProps = this.cluster.clientProperties();
        return new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
    }

    /**
     * Builds a producer from the cluster's client properties with {@code transactional.id} set to
     * {@code transactionalId}.
     *
     * @param keySerializer   serializer for record keys
     * @param valueSerializer serializer for record values
     * @param transactionalId value stored under the {@code transactional.id} property
     * @return a new producer; the caller is responsible for closing it
     */
    private <K, V> KafkaProducer<K, V> createProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer, String transactionalId) {
        Properties producerProps = this.cluster.clientProperties();
        producerProps.put("transactional.id", transactionalId);
        return new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
    }

    /**
     * Builds a share consumer from the cluster's client properties with {@code group.id} set to
     * {@code groupId}.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param groupId           value stored under the {@code group.id} property
     * @return a new share consumer; the caller is responsible for closing it
     */
    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String groupId) {
        Properties consumerProps = this.cluster.clientProperties();
        consumerProps.put("group.id", groupId);
        return new KafkaShareConsumer<>(consumerProps, keyDeserializer, valueDeserializer);
    }

    /**
     * Builds a share consumer from the cluster's client properties, sets {@code group.id} to
     * {@code groupId}, and then applies {@code additionalProperties} on top (so an entry there can
     * override any earlier value, including {@code group.id}).
     *
     * @param keyDeserializer      deserializer for record keys
     * @param valueDeserializer    deserializer for record values
     * @param groupId              value stored under the {@code group.id} property
     * @param additionalProperties extra configuration entries copied into the properties last
     * @return a new share consumer; the caller is responsible for closing it
     */
    private <K, V> KafkaShareConsumer<K, V> createShareConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, String groupId, Map<?, ?> additionalProperties) {
        Properties consumerProps = this.cluster.clientProperties();
        consumerProps.put("group.id", groupId);
        consumerProps.putAll(additionalProperties);
        return new KafkaShareConsumer<>(consumerProps, keyDeserializer, valueDeserializer);
    }

    /**
     * Warm-up routine: creates the warm-up topic, waits (up to 15 s) until the first broker's
     * metadata cache reports at least one alive node for the {@code EXTERNAL} listener, sets
     * {@code share.auto.offset.reset} to {@code earliest} for group {@code warmupgroup1}, then
     * produces one record and waits (up to 30 s) until a share consumer poll returns exactly one
     * record.
     *
     * @throws InterruptedException declared for the {@code TestUtils.waitForCondition} calls
     */
    private void warmup() throws InterruptedException {
        String warmupTopic = this.warmupTp.topic();
        String warmupGroup = "warmupgroup1";

        this.createTopic(warmupTopic);
        TestUtils.waitForCondition(
            () -> !((BrokerServer) this.cluster.brokers().get(0))
                .metadataCache()
                .getAliveBrokerNodes(new ListenerName("EXTERNAL"))
                .isEmpty(),
            15000L,
            100L,
            () -> "cache not up yet");

        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(
            warmupTopic, Integer.valueOf(this.warmupTp.partition()), null, "key".getBytes(), "value".getBytes());
        Set<String> subscription = Collections.singleton(warmupTopic);
        this.alterShareAutoOffsetReset(warmupGroup, "earliest");

        try (KafkaProducer<byte[], byte[]> producer =
                 this.createProducer(new ByteArraySerializer(), new ByteArraySerializer());
             KafkaShareConsumer<byte[], byte[]> shareConsumer =
                 this.createShareConsumer(new ByteArrayDeserializer(), new ByteArrayDeserializer(), warmupGroup)) {
            producer.send(record);
            producer.flush();
            shareConsumer.subscribe(subscription);
            TestUtils.waitForCondition(
                () -> shareConsumer.poll(Duration.ofMillis(5000L)).count() == 1,
                30000L,
                200L,
                () -> "warmup record not received");
        }
    }

    /**
     * Sets the {@code share.auto.offset.reset} group configuration of {@code groupId} to
     * {@code newValue} through the test's admin client, waiting up to 60 seconds for the change to
     * complete. Any exception fails the test through {@code assertDoesNotThrow}.
     *
     * @param groupId  name of the group whose configuration is altered
     * @param newValue new value for {@code share.auto.offset.reset}
     */
    private void alterShareAutoOffsetReset(String groupId, String newValue) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry entry = new ConfigEntry("share.auto.offset.reset", newValue);
        List<AlterConfigOp> ops = List.of(new AlterConfigOp(entry, AlterConfigOp.OpType.SET));
        AlterConfigsOptions alterOptions = new AlterConfigsOptions();
        Assertions.assertDoesNotThrow(() -> {
            // incrementalAlterConfigs takes a map whose values are Collection<AlterConfigOp>; a
            // Map<..., List<AlterConfigOp>> variable is not assignable to it, so the map is built
            // in place and its value type is inferred from the parameter.
            this.adminClient.incrementalAlterConfigs(Map.of(configResource, ops), alterOptions)
                .all()
                .get(60L, TimeUnit.SECONDS);
        }, "Failed to alter configs");
    }

    /**
     * Acknowledgement commit callback that records what it is told into two caller-supplied maps:
     * the acknowledged offsets per partition (accumulated across invocations) and the first
     * exception value seen per partition.
     */
    private static class TestableAcknowledgementCommitCallback implements AcknowledgementCommitCallback {
        private final Map<TopicPartition, Set<Long>> partitionOffsetsMap;
        private final Map<TopicPartition, Exception> partitionExceptionMap;

        /**
         * @param partitionOffsetsMap   map that receives the union of offsets per partition
         * @param partitionExceptionMap map that receives the exception of the first callback per partition
         */
        public TestableAcknowledgementCommitCallback(Map<TopicPartition, Set<Long>> partitionOffsetsMap, Map<TopicPartition, Exception> partitionExceptionMap) {
            this.partitionOffsetsMap = partitionOffsetsMap;
            this.partitionExceptionMap = partitionExceptionMap;
        }

        /**
         * Merges the reported offsets into the offsets map and stores {@code exception} for every
         * partition that has no entry in the exception map yet.
         */
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
            for (Map.Entry<TopicIdPartition, Set<Long>> reported : offsetsMap.entrySet()) {
                TopicPartition topicPartition = reported.getKey().topicPartition();
                this.partitionOffsetsMap.merge(topicPartition, reported.getValue(), (known, incoming) -> {
                    // Build a fresh set so neither input set is mutated.
                    Set<Long> union = new HashSet<>(known);
                    union.addAll(incoming);
                    return union;
                });
                // Key-presence check on purpose: once a key exists (even with a null value) it is kept.
                if (!this.partitionExceptionMap.containsKey(topicPartition)) {
                    this.partitionExceptionMap.put(topicPartition, exception);
                }
            }
        }
    }

    /**
     * Acknowledgement commit callback that asserts that calling {@code close}, {@code subscribe}
     * and {@code poll} on the wrapped share consumer from inside the callback each throws
     * {@link IllegalStateException}.
     */
    private class TestableAcknowledgementCommitCallbackWithShareConsumer<K, V> implements AcknowledgementCommitCallback {
        private final KafkaShareConsumer<K, V> shareConsumer;

        /**
         * @param shareConsumer consumer whose methods are invoked from within the callback
         */
        TestableAcknowledgementCommitCallbackWithShareConsumer(KafkaShareConsumer<K, V> shareConsumer) {
            this.shareConsumer = shareConsumer;
        }

        /**
         * Runs the three assertions in order: close, subscribe (to the topic of the enclosing
         * test's {@code tp}), poll.
         */
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
            Class<IllegalStateException> expected = IllegalStateException.class;
            Duration pollTimeout = Duration.ofMillis(5000L);
            Assertions.assertThrows(expected, () -> this.shareConsumer.close());
            Assertions.assertThrows(expected, () -> this.shareConsumer.subscribe(Collections.singleton(ShareConsumerTest.this.tp.topic())));
            Assertions.assertThrows(expected, () -> this.shareConsumer.poll(pollTimeout));
        }
    }

    /**
     * Acknowledgement commit callback that calls {@code wakeup()} on the wrapped share consumer
     * every time it is invoked.
     */
    private static class TestableAcknowledgementCommitCallbackWakeup<K, V> implements AcknowledgementCommitCallback {
        private final KafkaShareConsumer<K, V> consumerToWake;

        /**
         * @param shareConsumer consumer that is woken up from within the callback
         */
        TestableAcknowledgementCommitCallbackWakeup(KafkaShareConsumer<K, V> shareConsumer) {
            this.consumerToWake = shareConsumer;
        }

        /** Ignores both arguments and wakes the wrapped consumer. */
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
            this.consumerToWake.wakeup();
        }
    }

    /**
     * Acknowledgement commit callback that always throws an {@code OutOfOrderSequenceException}
     * when invoked. It relies on the implicit no-argument constructor.
     */
    private static class TestableAcknowledgementCommitCallbackThrows<K, V> implements AcknowledgementCommitCallback {

        /** Ignores both arguments and throws unconditionally. */
        public void onComplete(Map<TopicIdPartition, Set<Long>> offsetsMap, Exception exception) {
            String message = "Exception thrown in TestableAcknowledgementCommitCallbackThrows.onComplete";
            throw new OutOfOrderSequenceException(message);
        }
    }
}

