/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.ShareCompletedFetch;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsAggregator;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.ShareInFlightBatch;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.UUIDDeserializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link ShareCompletedFetch#fetchRecords}.
 *
 * <p>Each test builds a {@link ShareFetchResponseData.PartitionData} from in-memory record
 * batches plus a list of acquired offset ranges, wraps it in a {@link ShareCompletedFetch},
 * and checks which records, acknowledgements and exceptions come back from successive
 * {@code fetchRecords} calls.
 */
public class ShareCompletedFetchTest {
    private static final String TOPIC_NAME = "test";
    private static final TopicIdPartition TIP = new TopicIdPartition(Uuid.randomUuid(), 0, TOPIC_NAME);
    private static final long PRODUCER_ID = 1000L;
    private static final short PRODUCER_EPOCH = 0;
    // Record-format magic handed to MemoryRecords.builder.
    // NOTE(review): presumably equal to RecordBatch.CURRENT_MAGIC_VALUE - confirm.
    private static final byte RECORD_MAGIC = 2;
    // Partition leader epoch handed to MemoryRecords.builder.
    // NOTE(review): presumably equal to RecordBatch.NO_PARTITION_LEADER_EPOCH - confirm.
    private static final int PARTITION_LEADER_EPOCH = -1;
    // Delivery count set on every acquired range built by acquiredRecords(long, int).
    private static final short FIRST_DELIVERY = 1;

    /**
     * Two batches of 10 records (offsets 10-29), all acquired: two fetches of up to 10 records
     * each return 10 records, and a third fetch returns nothing.
     */
    @Test
    public void testSimple() {
        long startingOffset = 10L;
        int numRecordsPerBatch = 10;
        int numRecords = 20;
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecordsPerBatch, 2))
                .setAcquiredRecords(acquiredRecords(startingOffset, numRecords));
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
            List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
            Assertions.assertEquals(10, records.size());
            assertFirstDelivery(10L, records.get(0));
            Assertions.assertEquals(0, batch.getAcknowledgements().size());

            batch = completedFetch.fetchRecords(deserializers, 10, true);
            records = batch.getInFlightRecords();
            Assertions.assertEquals(10, records.size());
            assertFirstDelivery(20L, records.get(0));
            Assertions.assertEquals(0, batch.getAcknowledgements().size());

            assertExhausted(completedFetch, deserializers);
        }
    }

    /**
     * A single batch of 11 acquired records is returned in full by one fetch even though the
     * requested maximum is 10; the following fetch returns nothing.
     */
    @Test
    public void testSoftMaxPollRecordLimit() {
        long startingOffset = 10L;
        int numRecords = 11;
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords))
                .setAcquiredRecords(acquiredRecords(startingOffset, numRecords));
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
            List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
            Assertions.assertEquals(11, records.size());
            assertFirstDelivery(10L, records.get(0));
            Assertions.assertEquals(0, batch.getAcknowledgements().size());

            assertExhausted(completedFetch, deserializers);
        }
    }

    /**
     * The batch holds offsets 10-519 but only 510-519 are acquired: the fetch returns exactly
     * those 10 records, starting at offset 510.
     */
    @Test
    public void testUnaligned() {
        long startingOffset = 10L;
        int numRecords = 10;
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords + 500))
                .setAcquiredRecords(acquiredRecords(startingOffset + 500L, numRecords));
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
            List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
            Assertions.assertEquals(10, records.size());
            assertFirstDelivery(510L, records.get(0));
            Assertions.assertEquals(0, batch.getAcknowledgements().size());

            assertExhausted(completedFetch, deserializers);
        }
    }

    /**
     * Ten transactional records followed by a COMMIT marker, with offsets 0-9 acquired: all ten
     * records are returned and no acknowledgements are generated.
     */
    @Test
    public void testCommittedTransactionRecordsIncluded() {
        int numRecords = 10;
        Records rawRecords = newTransactionalRecords(numRecords);
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(rawRecords)
                .setAcquiredRecords(acquiredRecords(0L, numRecords));
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
            Assertions.assertEquals(10, batch.getInFlightRecords().size());
            Assertions.assertEquals(0, batch.getAcknowledgements().size());
        }
    }

    /** A negative record limit yields an empty batch with no acknowledgements. */
    @Test
    public void testNegativeFetchCount() {
        long startingOffset = 0L;
        int numRecords = 10;
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords))
                .setAcquiredRecords(acquiredRecords(0L, 10));

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, -10, true);
            Assertions.assertEquals(0, batch.getInFlightRecords().size());
            Assertions.assertEquals(0, batch.getAcknowledgements().size());
        }
    }

    /** Partition data with no records set yields an empty batch with no acknowledgements. */
    @Test
    public void testNoRecordsInFetch() {
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setPartitionIndex(0);
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 10, true);
            Assertions.assertEquals(0, batch.getInFlightRecords().size());
            Assertions.assertEquals(0, batch.getAcknowledgements().size());
        }
    }

    /**
     * Four records are read with UUID deserializers; offsets 1 and 2 carry bytes that were not
     * produced by the UUID serializer (in the key and in the value respectively).
     *
     * <p>Expected sequence of fetches: record 0 alone; a {@link RecordDeserializationException}
     * with origin KEY for offset 1; one with origin VALUE for offset 2; then record 3 alone.
     * Each failing fetch returns no records and a single RELEASE acknowledgement for the
     * failing offset.
     */
    @Test
    public void testCorruptedMessage() {
        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
                     ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, 0L);
             UUIDSerializer serializer = new UUIDSerializer()) {
            // Offset 0: value is a serialized UUID.
            builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
            // Offset 1: plain-text key and value.
            builder.append(0L, "key".getBytes(), "value".getBytes());
            // Offset 2: serialized-UUID key, plain-text value, one header.
            RecordHeaders headers = new RecordHeaders();
            headers.add("hkey", "hvalue".getBytes());
            builder.append(10L, serializer.serialize("key", UUID.randomUUID()), "otherValue".getBytes(), headers.toArray());
            // Offset 3: value is a serialized UUID.
            builder.append(new SimpleRecord(serializer.serialize(TOPIC_NAME, UUID.randomUUID())));
            MemoryRecords records = builder.build();

            ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                    .setPartitionIndex(0)
                    .setRecords(records)
                    .setAcquiredRecords(acquiredRecords(0L, 4));

            try (Deserializers<UUID, UUID> deserializers = newUuidDeserializers()) {
                ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

                // First fetch: only the record at offset 0.
                ShareInFlightBatch<UUID, UUID> batch = completedFetch.fetchRecords(deserializers, 10, false);
                Assertions.assertNull(batch.getException());
                List<ConsumerRecord<UUID, UUID>> fetchedRecords = batch.getInFlightRecords();
                Assertions.assertEquals(1, fetchedRecords.size());
                Assertions.assertEquals(0L, fetchedRecords.get(0).offset());
                Acknowledgements acknowledgements = batch.getAcknowledgements();
                Assertions.assertEquals(0, acknowledgements.size());

                // Second fetch: key of offset 1 fails to deserialize.
                batch = completedFetch.fetchRecords(deserializers, 10, false);
                Assertions.assertEquals(RecordDeserializationException.class, batch.getException().getClass());
                RecordDeserializationException thrown = (RecordDeserializationException) batch.getException();
                Assertions.assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.KEY, thrown.origin());
                Assertions.assertEquals(1L, thrown.offset());
                Assertions.assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
                Assertions.assertEquals(0, thrown.topicPartition().partition());
                Assertions.assertEquals(0L, thrown.timestamp());
                Assertions.assertArrayEquals("key".getBytes(), Utils.toNullableArray(thrown.keyBuffer()));
                Assertions.assertArrayEquals("value".getBytes(), Utils.toNullableArray(thrown.valueBuffer()));
                Assertions.assertEquals(0, thrown.headers().toArray().length);
                fetchedRecords = batch.getInFlightRecords();
                Assertions.assertEquals(0, fetchedRecords.size());
                acknowledgements = batch.getAcknowledgements();
                Assertions.assertEquals(1, acknowledgements.size());
                Assertions.assertEquals(AcknowledgeType.RELEASE, acknowledgements.get(1L));

                // Third fetch: value of offset 2 fails to deserialize.
                batch = completedFetch.fetchRecords(deserializers, 10, false);
                Assertions.assertEquals(RecordDeserializationException.class, batch.getException().getClass());
                thrown = (RecordDeserializationException) batch.getException();
                Assertions.assertEquals(RecordDeserializationException.DeserializationExceptionOrigin.VALUE, thrown.origin());
                Assertions.assertEquals(2L, thrown.offset());
                Assertions.assertEquals(TOPIC_NAME, thrown.topicPartition().topic());
                Assertions.assertEquals(0, thrown.topicPartition().partition());
                Assertions.assertEquals(10L, thrown.timestamp());
                Assertions.assertNotNull(thrown.keyBuffer());
                Assertions.assertArrayEquals("otherValue".getBytes(), Utils.toNullableArray(thrown.valueBuffer()));
                fetchedRecords = batch.getInFlightRecords();
                Assertions.assertEquals(0, fetchedRecords.size());
                acknowledgements = batch.getAcknowledgements();
                Assertions.assertEquals(1, acknowledgements.size());
                Assertions.assertEquals(AcknowledgeType.RELEASE, acknowledgements.get(2L));

                // Fourth fetch: only the record at offset 3.
                batch = completedFetch.fetchRecords(deserializers, 10, false);
                Assertions.assertNull(batch.getException());
                fetchedRecords = batch.getInFlightRecords();
                Assertions.assertEquals(1, fetchedRecords.size());
                Assertions.assertEquals(3L, fetchedRecords.get(0).offset());
                acknowledgements = batch.getAcknowledgements();
                Assertions.assertEquals(0, acknowledgements.size());
            }
        }
    }

    /**
     * Offsets 0-9 are present but only the ranges 0-2 and 6-8 are acquired: a single fetch
     * returns those six records in offset order, and the next fetch returns nothing.
     */
    @Test
    public void testAcquiredRecords() {
        long startingOffset = 0L;
        int numRecords = 10;
        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>(acquiredRecords(0L, 3));
        acquiredRecords.addAll(acquiredRecords(6L, 3));
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords))
                .setAcquiredRecords(acquiredRecords);
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            List<ConsumerRecord<String, String>> records =
                    completedFetch.fetchRecords(deserializers, 10, true).getInFlightRecords();
            Assertions.assertEquals(6, records.size());
            assertFirstDelivery(0L, records.get(0));
            assertFirstDelivery(6L, records.get(3));

            records = completedFetch.fetchRecords(deserializers, 10, true).getInFlightRecords();
            Assertions.assertEquals(0, records.size());
        }
    }

    /**
     * Offsets 0-9 are present but only the odd offsets 1, 3, 5, 7, 9 are acquired, each as its
     * own single-offset range: a single fetch returns those five records.
     */
    @Test
    public void testAcquireOddRecords() {
        long startingOffset = 0L;
        int numRecords = 10;
        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
        for (long offset = 1L; offset <= 9L; offset += 2L) {
            acquiredRecords.add(acquiredRecords(offset, 1).get(0));
        }
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
                .setRecords(newRecords(startingOffset, numRecords))
                .setAcquiredRecords(acquiredRecords);
        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);

        try (Deserializers<String, String> deserializers = newStringDeserializers()) {
            List<ConsumerRecord<String, String>> records =
                    completedFetch.fetchRecords(deserializers, 10, true).getInFlightRecords();
            Assertions.assertEquals(5, records.size());
            assertFirstDelivery(1L, records.get(0));
            assertFirstDelivery(3L, records.get(1));

            records = completedFetch.fetchRecords(deserializers, 10, true).getInFlightRecords();
            Assertions.assertEquals(0, records.size());
        }
    }

    /** Asserts that {@code record} sits at {@code expectedOffset} and has a delivery count of 1. */
    private static void assertFirstDelivery(long expectedOffset, ConsumerRecord<?, ?> record) {
        Assertions.assertEquals(expectedOffset, record.offset());
        Assertions.assertEquals(Optional.of(FIRST_DELIVERY), record.deliveryCount());
    }

    /**
     * Fetches once more (up to 10 records, CRC checking enabled) and asserts that the batch
     * contains neither records nor acknowledgements.
     */
    private static <K, V> void assertExhausted(ShareCompletedFetch completedFetch, Deserializers<K, V> deserializers) {
        ShareInFlightBatch<K, V> batch = completedFetch.fetchRecords(deserializers, 10, true);
        Assertions.assertEquals(0, batch.getInFlightRecords().size());
        Assertions.assertEquals(0, batch.getAcknowledgements().size());
    }

    /**
     * Builds a {@link ShareCompletedFetch} for {@link #TIP} over the given partition data, wired
     * to a fresh metrics aggregator and the latest SHARE_FETCH request version.
     */
    private ShareCompletedFetch newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
        LogContext logContext = new LogContext();
        ShareFetchMetricsManager metricsManager =
                new ShareFetchMetricsManager(new Metrics(), new ShareFetchMetricsRegistry());
        HashSet<TopicPartition> partitions = new HashSet<>();
        partitions.add(TIP.topicPartition());
        ShareFetchMetricsAggregator metricsAggregator = new ShareFetchMetricsAggregator(metricsManager, partitions);
        return new ShareCompletedFetch(
                logContext,
                BufferSupplier.create(),
                TIP,
                partitionData,
                metricsAggregator,
                ApiKeys.SHARE_FETCH.latestVersion());
    }

    /** Returns deserializers that decode both key and value as {@link UUID}. */
    private static Deserializers<UUID, UUID> newUuidDeserializers() {
        return new Deserializers<>(new UUIDDeserializer(), new UUIDDeserializer());
    }

    /** Returns deserializers that decode both key and value as {@link String}. */
    private static Deserializers<String, String> newStringDeserializers() {
        return new Deserializers<>(new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Builds one uncompressed batch of {@code count} identical records (timestamp 0, key
     * {@code "key"}, value {@code "value-"}) starting at {@code baseOffset}.
     */
    private Records newRecords(long baseOffset, int count) {
        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset)) {
            for (int i = 0; i < count; i++) {
                builder.append(0L, "key".getBytes(), "value-".getBytes());
            }
            return builder.build();
        }
    }

    /**
     * Builds {@code batchCount} consecutive transactional batches of {@code numRecordsPerBatch}
     * records each, written back to back into one buffer; the first batch starts at
     * {@code baseOffset}.
     */
    private Records newRecords(long baseOffset, int numRecordsPerBatch, int batchCount) {
        MockTime time = new MockTime();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        for (int batchIndex = 0; batchIndex < batchCount; batchIndex++) {
            long batchBaseOffset = baseOffset + (long) batchIndex * (long) numRecordsPerBatch;
            try (MemoryRecordsBuilder builder = MemoryRecords.builder(
                    buffer,
                    RECORD_MAGIC,
                    Compression.NONE,
                    TimestampType.CREATE_TIME,
                    batchBaseOffset,
                    time.milliseconds(),
                    PRODUCER_ID,
                    PRODUCER_EPOCH,
                    0,
                    true,
                    PARTITION_LEADER_EPOCH)) {
                for (int i = 0; i < numRecordsPerBatch; i++) {
                    builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
                }
                builder.build();
            }
        }
        // Switch the shared buffer from writing to reading before exposing it as records.
        buffer.flip();
        return MemoryRecords.readableRecords(buffer);
    }

    /**
     * Returns a single-element list holding one acquired range of {@code count} offsets that
     * starts at {@code firstOffset} and has a delivery count of 1.
     */
    public static List<ShareFetchResponseData.AcquiredRecords> acquiredRecords(long firstOffset, int count) {
        ShareFetchResponseData.AcquiredRecords acquiredRecords = new ShareFetchResponseData.AcquiredRecords()
                .setFirstOffset(firstOffset)
                .setLastOffset(firstOffset + (long) count - 1L)
                .setDeliveryCount(FIRST_DELIVERY);
        return Collections.singletonList(acquiredRecords);
    }

    /**
     * Builds one transactional batch of {@code numRecords} records at offsets
     * {@code 0..numRecords-1}, followed by a COMMIT marker at offset {@code numRecords}.
     */
    private Records newTransactionalRecords(int numRecords) {
        MockTime time = new MockTime();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        try (MemoryRecordsBuilder builder = MemoryRecords.builder(
                buffer,
                RECORD_MAGIC,
                Compression.NONE,
                TimestampType.CREATE_TIME,
                0L,
                time.milliseconds(),
                PRODUCER_ID,
                PRODUCER_EPOCH,
                0,
                true,
                PARTITION_LEADER_EPOCH)) {
            for (int i = 0; i < numRecords; i++) {
                builder.append(new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
            }
            builder.build();
        }
        writeTransactionMarker(buffer, numRecords, time);
        // Switch the buffer from writing to reading before exposing it as records.
        buffer.flip();
        return MemoryRecords.readableRecords(buffer);
    }

    /** Appends a COMMIT end-transaction marker at {@code offset} to {@code buffer}. */
    private void writeTransactionMarker(ByteBuffer buffer, int offset, Time time) {
        MemoryRecords.writeEndTransactionalMarker(
                buffer,
                offset,
                time.milliseconds(),
                0,
                PRODUCER_ID,
                PRODUCER_EPOCH,
                new EndTransactionMarker(ControlRecordType.COMMIT, 0));
    }
}

