/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FetchMetadata;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;

@Timeout(value=120L)
public class FetchSessionHandlerTest {
    // Shared log context handed to the FetchSessionHandler instances constructed in these tests.
    private static final LogContext LOG_CONTEXT = new LogContext("[FetchSessionHandler]=");

    /**
     * Collects the given partitions into a sorted set.
     *
     * @param arr the partitions to include
     * @return a {@link TreeSet} ordered by each partition's {@code toString()} value
     */
    private static Set<TopicPartition> toSet(TopicPartition ... arr) {
        Comparator<TopicPartition> byText = Comparator.comparing(TopicPartition::toString);
        Set<TopicPartition> sorted = new TreeSet<>(byText);
        Collections.addAll(sorted, arr);
        return sorted;
    }

    /**
     * Checks {@code FetchSessionHandler.findMissing} against several pairs of partition sets.
     * Every assertion expects the members of the first argument that are absent from the second.
     */
    @Test
    public void testFindMissing() {
        TopicPartition foo0 = new TopicPartition("foo", 0);
        TopicPartition foo1 = new TopicPartition("foo", 1);
        TopicPartition bar0 = new TopicPartition("bar", 0);
        TopicPartition bar1 = new TopicPartition("bar", 1);
        TopicPartition baz0 = new TopicPartition("baz", 0);
        TopicPartition baz1 = new TopicPartition("baz", 1);

        // Identical single-element sets: nothing is missing.
        Assertions.assertEquals(toSet(), FetchSessionHandler.findMissing(toSet(foo0), toSet(foo0)));

        // Disjoint single-element sets: the searched-for partition is missing.
        Assertions.assertEquals(toSet(foo0), FetchSessionHandler.findMissing(toSet(foo0), toSet(foo1)));

        // Nothing from the first set appears in the second.
        Assertions.assertEquals(
            toSet(foo0, foo1),
            FetchSessionHandler.findMissing(toSet(foo0, foo1), toSet(baz0)));

        // Partial overlap: only bar0 is found.
        Assertions.assertEquals(
            toSet(bar1, foo0, foo1),
            FetchSessionHandler.findMissing(toSet(foo0, foo1, bar0, bar1), toSet(bar0, baz0, baz1)));

        // The first set is fully contained in the second.
        Assertions.assertEquals(
            toSet(),
            FetchSessionHandler.findMissing(
                toSet(foo0, foo1, bar0, bar1, baz1),
                toSet(foo0, foo1, bar0, bar1, baz0, baz1)));
    }

    /**
     * Builds an insertion-ordered request map from the given entries, keyed by each entry's
     * {@code part} and holding its {@code data}. A later entry with the same key overwrites the
     * earlier value.
     */
    private static LinkedHashMap<TopicPartition, FetchRequest.PartitionData> reqMap(ReqEntry ... entries) {
        LinkedHashMap<TopicPartition, FetchRequest.PartitionData> ordered = new LinkedHashMap<>();
        for (int idx = 0; idx < entries.length; idx++) {
            ReqEntry current = entries[idx];
            ordered.put(current.part, current.data);
        }
        return ordered;
    }

    /**
     * Asserts that two maps contain equal entries in the same iteration order.
     * <p>
     * Entries are compared pairwise; the failure message reports the 1-based position of the
     * first entry that is absent, different, or unexpected.
     *
     * @param expected the map whose entries and order are required
     * @param actual   the map under test
     */
    private static void assertMapEquals(Map<TopicPartition, FetchRequest.PartitionData> expected, Map<TopicPartition, FetchRequest.PartitionData> actual) {
        Iterator<Map.Entry<TopicPartition, FetchRequest.PartitionData>> remaining = actual.entrySet().iterator();
        int position = 1;
        for (Map.Entry<TopicPartition, FetchRequest.PartitionData> wanted : expected.entrySet()) {
            // The actual map ran out before the expected one did.
            if (!remaining.hasNext()) {
                Assertions.fail("Element " + position + " not found.");
            }
            Map.Entry<TopicPartition, FetchRequest.PartitionData> found = remaining.next();
            Assertions.assertEquals(wanted.getKey(), found.getKey(), "Element " + position + " had a different TopicPartition than expected.");
            Assertions.assertEquals(wanted.getValue(), found.getValue(), "Element " + position + " had different PartitionData than expected.");
            position++;
        }
        // Anything left over in the actual map is surplus.
        if (remaining.hasNext()) {
            Assertions.fail("Unexpected element " + position + " found.");
        }
    }

    /**
     * Asserts, via {@code assertMapEquals}, that each of the given maps matches {@code expected}.
     * The maps are checked in argument order, so the first mismatch aborts the check.
     */
    @SafeVarargs
    private static void assertMapsEqual(Map<TopicPartition, FetchRequest.PartitionData> expected, Map<TopicPartition, FetchRequest.PartitionData> ... actuals) {
        for (int idx = 0; idx < actuals.length; idx++) {
            assertMapEquals(expected, actuals[idx]);
        }
    }

    /**
     * Asserts that both lists contain the same partitions, ignoring order and multiplicity:
     * every expected element must occur in {@code actual} and vice versa.
     */
    private static void assertListEquals(List<TopicIdPartition> expected, List<TopicIdPartition> actual) {
        for (TopicIdPartition wanted : expected) {
            if (!actual.contains(wanted)) {
                Assertions.fail("Failed to find expected partition " + wanted);
            }
        }
        for (TopicIdPartition found : actual) {
            if (!expected.contains(found)) {
                Assertions.fail("Found unexpected partition " + found);
            }
        }
    }

    /**
     * Builds an insertion-ordered response map from the given entries, keyed by each entry's
     * {@code part} and holding its {@code data}. A later entry with the same key overwrites the
     * earlier value.
     */
    private static LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> respMap(RespEntry ... entries) {
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> ordered = new LinkedHashMap<>();
        for (int idx = 0; idx < entries.length; idx++) {
            RespEntry current = entries[idx];
            ordered.put(current.part, current.data);
        }
        return ordered;
    }

    /**
     * Exercises a handler whose responses never establish a fetch session.
     * <p>
     * For each tested request version (12 and the latest FETCH version) a first request with two
     * partitions is built and answered with a response created by
     * {@code FetchResponse.of(Errors.NONE, 0, 0, ...)}. The follow-up request must still carry
     * session ID 0 and epoch 0, and must send exactly the partitions added to the second builder.
     */
    @Test
    public void testSessionless() {
        // Parameterized maps: with raw HashMaps, getOrDefault returns Object and cannot be assigned to Uuid.
        HashMap<String, Uuid> topicIds = new HashMap<>();
        HashMap<Uuid, String> topicNames = new HashMap<>();
        List<Short> versions = Arrays.asList((short) 12, ApiKeys.FETCH.latestVersion());
        for (short version : versions) {
            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            FetchSessionHandler.Builder builder = handler.newBuilder();
            this.addTopicId(topicIds, topicNames, "foo", version);
            // Falls back to ZERO_UUID when no ID is registered for "foo".
            Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
            builder.add(new TopicPartition("foo", 0), new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder.add(new TopicPartition("foo", 1), new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
            FetchSessionHandler.FetchRequestData data = builder.build();
            assertMapsEqual(
                reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200), new ReqEntry("foo", fooId, 1, 10L, 110L, 210)),
                data.toSend(), data.sessionPartitions());
            Assertions.assertEquals(0, data.metadata().sessionId());
            Assertions.assertEquals(0, data.metadata().epoch());

            FetchResponse resp = FetchResponse.of(
                Errors.NONE, 0, 0,
                respMap(new RespEntry("foo", 0, fooId, 0L, 0L), new RespEntry("foo", 1, fooId, 0L, 0L)),
                List.of());
            handler.handleResponse(resp, version);

            // The second request lists only foo-0; it must be sent in full with session ID and epoch still 0.
            FetchSessionHandler.Builder builder2 = handler.newBuilder();
            builder2.add(new TopicPartition("foo", 0), new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData data2 = builder2.build();
            Assertions.assertEquals(0, data2.metadata().sessionId());
            Assertions.assertEquals(0, data2.metadata().epoch());
            assertMapsEqual(
                reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200)),
                data2.toSend(), data2.sessionPartitions());
        }
    }

    /**
     * Exercises incremental fetch requests after a session has been established.
     * <p>
     * For each tested request version (12 and the latest FETCH version):
     * <ol>
     *   <li>a first request with foo-0 and foo-1 is answered by a response carrying 123 in the
     *       position that later requests report as their session ID;</li>
     *   <li>a second request re-adds foo-0 unchanged, foo-1 with different partition data, and a
     *       new bar-0; it must not be a full request, and only bar-0 and foo-1 are sent;</li>
     *   <li>after a response with {@code INVALID_FETCH_SESSION_EPOCH}, the next request must be
     *       full, at epoch 0, with the same session ID as the second request, and must send
     *       every partition.</li>
     * </ol>
     */
    @Test
    public void testIncrementals() {
        // Parameterized maps: with raw HashMaps, getOrDefault returns Object and cannot be assigned to Uuid.
        HashMap<String, Uuid> topicIds = new HashMap<>();
        HashMap<Uuid, String> topicNames = new HashMap<>();
        List<Short> versions = Arrays.asList((short) 12, ApiKeys.FETCH.latestVersion());
        for (short version : versions) {
            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            FetchSessionHandler.Builder builder = handler.newBuilder();
            this.addTopicId(topicIds, topicNames, "foo", version);
            Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
            TopicPartition foo0 = new TopicPartition("foo", 0);
            TopicPartition foo1 = new TopicPartition("foo", 1);
            builder.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
            FetchSessionHandler.FetchRequestData data = builder.build();
            assertMapsEqual(
                reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200), new ReqEntry("foo", fooId, 1, 10L, 110L, 210)),
                data.toSend(), data.sessionPartitions());
            Assertions.assertEquals(0, data.metadata().sessionId());
            Assertions.assertEquals(0, data.metadata().epoch());

            FetchResponse resp = FetchResponse.of(
                Errors.NONE, 0, 123,
                respMap(new RespEntry("foo", 0, fooId, 10L, 20L), new RespEntry("foo", 1, fooId, 10L, 20L)),
                List.of());
            handler.handleResponse(resp, version);

            // Second request: foo-0 unchanged, foo-1 altered (110 -> 120), bar-0 newly added.
            FetchSessionHandler.Builder builder2 = handler.newBuilder();
            this.addTopicId(topicIds, topicNames, "bar", version);
            Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID);
            TopicPartition bar0 = new TopicPartition("bar", 0);
            builder2.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder2.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 120L, 210, Optional.empty()));
            builder2.add(bar0, new FetchRequest.PartitionData(barId, 20L, 200L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData data2 = builder2.build();
            Assertions.assertFalse(data2.metadata().isFull());
            assertMapEquals(
                reqMap(
                    new ReqEntry("foo", fooId, 0, 0L, 100L, 200),
                    new ReqEntry("foo", fooId, 1, 10L, 120L, 210),
                    new ReqEntry("bar", barId, 0, 20L, 200L, 200)),
                data2.sessionPartitions());
            // Only the new and the changed partition are sent, with bar-0 first.
            assertMapEquals(
                reqMap(
                    new ReqEntry("bar", barId, 0, 20L, 200L, 200),
                    new ReqEntry("foo", fooId, 1, 10L, 120L, 210)),
                data2.toSend());

            FetchResponse resp2 = FetchResponse.of(
                Errors.NONE, 0, 123,
                respMap(new RespEntry("foo", 1, fooId, 20L, 20L)),
                List.of());
            handler.handleResponse(resp2, version);

            // An epoch error must make the next request a full one at epoch 0.
            FetchResponse resp3 = FetchResponse.of(Errors.INVALID_FETCH_SESSION_EPOCH, 0, 0, respMap(), List.of());
            handler.handleResponse(resp3, version);

            FetchSessionHandler.Builder builder4 = handler.newBuilder();
            builder4.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder4.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 120L, 210, Optional.empty()));
            builder4.add(bar0, new FetchRequest.PartitionData(barId, 20L, 200L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData data4 = builder4.build();
            Assertions.assertTrue(data4.metadata().isFull());
            Assertions.assertEquals(data2.metadata().sessionId(), data4.metadata().sessionId());
            Assertions.assertEquals(0, data4.metadata().epoch());
            assertMapsEqual(
                reqMap(
                    new ReqEntry("foo", fooId, 0, 0L, 100L, 200),
                    new ReqEntry("foo", fooId, 1, 10L, 120L, 210),
                    new ReqEntry("bar", barId, 0, 20L, 200L, 200)),
                data4.sessionPartitions(), data4.toSend());
        }
    }

    /**
     * Verifies that invoking {@code build()} a second time on the same builder throws.
     */
    @Test
    public void testDoubleBuild() {
        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
        FetchSessionHandler.Builder builder = handler.newBuilder();
        TopicPartition foo0 = new TopicPartition("foo", 0);
        FetchRequest.PartitionData partitionData =
            new FetchRequest.PartitionData(Uuid.randomUuid(), 0L, 100L, 200, Optional.empty());
        builder.add(foo0, partitionData);
        // The first build is legitimate; only the repeated call is expected to fail.
        builder.build();
        Assertions.assertThrows(Throwable.class, builder::build, "Expected calling build twice to fail.");
    }

    /**
     * Exercises removal of partitions from an established fetch session.
     * <p>
     * For each tested request version (12 and the latest FETCH version):
     * <ol>
     *   <li>a full request with foo-0, foo-1 and bar-0 is answered by a response carrying 123,
     *       which the next request reports as its session ID;</li>
     *   <li>a second request listing only foo-1 (unchanged) must be incremental at epoch 1,
     *       send nothing, and list foo-0 and bar-0 as partitions to forget;</li>
     *   <li>after a response with {@code FETCH_SESSION_ID_NOT_FOUND}, the next request must be
     *       full again with session ID 0 and epoch 0.</li>
     * </ol>
     */
    @Test
    public void testIncrementalPartitionRemoval() {
        // Parameterized maps: with raw HashMaps, getOrDefault returns Object and cannot be assigned to Uuid.
        HashMap<String, Uuid> topicIds = new HashMap<>();
        HashMap<Uuid, String> topicNames = new HashMap<>();
        List<Short> versions = Arrays.asList((short) 12, ApiKeys.FETCH.latestVersion());
        for (short version : versions) {
            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            FetchSessionHandler.Builder builder = handler.newBuilder();
            this.addTopicId(topicIds, topicNames, "foo", version);
            this.addTopicId(topicIds, topicNames, "bar", version);
            Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
            Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID);
            TopicPartition foo0 = new TopicPartition("foo", 0);
            TopicPartition foo1 = new TopicPartition("foo", 1);
            TopicPartition bar0 = new TopicPartition("bar", 0);
            builder.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
            builder.add(bar0, new FetchRequest.PartitionData(barId, 20L, 120L, 220, Optional.empty()));
            FetchSessionHandler.FetchRequestData data = builder.build();
            assertMapsEqual(
                reqMap(
                    new ReqEntry("foo", fooId, 0, 0L, 100L, 200),
                    new ReqEntry("foo", fooId, 1, 10L, 110L, 210),
                    new ReqEntry("bar", barId, 0, 20L, 120L, 220)),
                data.toSend(), data.sessionPartitions());
            Assertions.assertTrue(data.metadata().isFull());

            FetchResponse resp = FetchResponse.of(
                Errors.NONE, 0, 123,
                respMap(
                    new RespEntry("foo", 0, fooId, 10L, 20L),
                    new RespEntry("foo", 1, fooId, 10L, 20L),
                    new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
            handler.handleResponse(resp, version);

            // Second request keeps only foo-1, so foo-0 and bar-0 must be forgotten.
            FetchSessionHandler.Builder builder2 = handler.newBuilder();
            builder2.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
            FetchSessionHandler.FetchRequestData data2 = builder2.build();
            Assertions.assertFalse(data2.metadata().isFull());
            Assertions.assertEquals(123, data2.metadata().sessionId());
            Assertions.assertEquals(1, data2.metadata().epoch());
            assertMapEquals(reqMap(new ReqEntry("foo", fooId, 1, 10L, 110L, 210)), data2.sessionPartitions());
            // foo-1 is unchanged, so nothing needs to be sent.
            assertMapEquals(reqMap(), data2.toSend());
            ArrayList<TopicIdPartition> expectedToForget2 = new ArrayList<>();
            expectedToForget2.add(new TopicIdPartition(fooId, foo0));
            expectedToForget2.add(new TopicIdPartition(barId, bar0));
            assertListEquals(expectedToForget2, data2.toForget());

            // A session-not-found error must make the next request a full one with no session.
            FetchResponse resp2 = FetchResponse.of(Errors.FETCH_SESSION_ID_NOT_FOUND, 0, 0, respMap(), List.of());
            handler.handleResponse(resp2, version);

            FetchSessionHandler.Builder builder3 = handler.newBuilder();
            builder3.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData data3 = builder3.build();
            Assertions.assertTrue(data3.metadata().isFull());
            Assertions.assertEquals(0, data3.metadata().sessionId());
            Assertions.assertEquals(0, data3.metadata().epoch());
            assertMapsEqual(
                reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200)),
                data3.sessionPartitions(), data3.toSend());
        }
    }

    /**
     * Starts a session with a partition that has no topic ID ({@code ZERO_UUID}) and then adds a
     * partition carrying a real topic ID.
     * <p>
     * Two variants run: partition 0 replaces the existing foo-0 entry, partition 1 adds foo-1
     * beside it. In both, the second request must stay in session 123 at epoch 1; topic IDs are
     * expected to be usable only in the partition-0 variant.
     */
    @Test
    public void testTopicIdUsageGrantedOnIdUpgrade() {
        for (int partition : Arrays.asList(0, 1)) {
            boolean updated = partition == 0;
            String testType = updated ? "updating a partition" : "adding a new partition";

            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            FetchSessionHandler.Builder firstBuilder = handler.newBuilder();
            firstBuilder.add(
                new TopicPartition("foo", 0),
                new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 100L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
            assertMapsEqual(
                reqMap(new ReqEntry("foo", Uuid.ZERO_UUID, 0, 0L, 100L, 200)),
                firstRequest.toSend(), firstRequest.sessionPartitions());
            Assertions.assertTrue(firstRequest.metadata().isFull());
            Assertions.assertFalse(firstRequest.canUseTopicIds());

            FetchResponse firstResponse = FetchResponse.of(
                Errors.NONE, 0, 123,
                respMap(new RespEntry("foo", 0, Uuid.ZERO_UUID, 10L, 20L)),
                List.of());
            handler.handleResponse(firstResponse, (short) 12);

            // Now add a partition for the same topic that carries a real topic ID.
            Uuid topicId = Uuid.randomUuid();
            FetchSessionHandler.Builder secondBuilder = handler.newBuilder();
            secondBuilder.add(
                new TopicPartition("foo", partition),
                new FetchRequest.PartitionData(topicId, 10L, 110L, 210, Optional.empty()));
            FetchSessionHandler.FetchRequestData secondRequest = secondBuilder.build();
            Assertions.assertEquals(123, secondRequest.metadata().sessionId(), "Did not use same session when " + testType);
            Assertions.assertEquals(1, secondRequest.metadata().epoch(), "Did not have correct epoch when " + testType);
            Assertions.assertEquals(updated, secondRequest.canUseTopicIds());
        }
    }

    /**
     * Starts a session with a partition that has a real topic ID and then adds a partition
     * without one ({@code ZERO_UUID}).
     * <p>
     * Two variants run: partition 0 replaces the existing foo-0 entry, partition 1 adds foo-1
     * beside it. In both, the second request must stay in session 123 at epoch 1 and must no
     * longer be able to use topic IDs.
     */
    @Test
    public void testIdUsageRevokedOnIdDowngrade() {
        for (int partition : Arrays.asList(0, 1)) {
            String testType;
            if (partition == 0) {
                testType = "updating a partition";
            } else {
                testType = "adding a new partition";
            }
            Uuid fooId = Uuid.randomUuid();

            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            FetchSessionHandler.Builder firstBuilder = handler.newBuilder();
            firstBuilder.add(
                new TopicPartition("foo", 0),
                new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
            assertMapsEqual(
                reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200)),
                firstRequest.toSend(), firstRequest.sessionPartitions());
            Assertions.assertTrue(firstRequest.metadata().isFull());
            Assertions.assertTrue(firstRequest.canUseTopicIds());

            FetchResponse firstResponse = FetchResponse.of(
                Errors.NONE, 0, 123,
                respMap(new RespEntry("foo", 0, fooId, 10L, 20L)),
                List.of());
            handler.handleResponse(firstResponse, ApiKeys.FETCH.latestVersion());

            // Now add a partition for the same topic without a topic ID.
            FetchSessionHandler.Builder secondBuilder = handler.newBuilder();
            secondBuilder.add(
                new TopicPartition("foo", partition),
                new FetchRequest.PartitionData(Uuid.ZERO_UUID, 10L, 110L, 210, Optional.empty()));
            FetchSessionHandler.FetchRequestData secondRequest = secondBuilder.build();
            Assertions.assertEquals(123, secondRequest.metadata().sessionId(), "Did not use same session when " + testType);
            Assertions.assertEquals(1, secondRequest.metadata().epoch(), "Did not have correct epoch when " + testType);
            Assertions.assertFalse(secondRequest.canUseTopicIds());
        }
    }

    /**
     * Supplies every combination of two boolean flags; consumed by {@code testTopicIdReplaced}
     * as (startsWithTopicIds, endsWithTopicIds).
     */
    private static Stream<Arguments> idUsageCombinations() {
        return Stream.of(
            Arguments.of(true, true),
            Arguments.of(true, false),
            Arguments.of(false, true),
            Arguments.of(false, false));
    }

    /**
     * Re-adds the same partition with a different topic ID in the second request and checks the
     * outcome for every combination of "had an ID before" and "has an ID now".
     * <ul>
     *   <li>ID before and after: the old ID/partition pair appears in {@code toReplace()}, the
     *       partition is sent again, and the session's topic names map the new ID.</li>
     *   <li>ID only before or only after: nothing is replaced, the partition is sent again, and
     *       the session's topic names hold the new ID only when the request ends with one.</li>
     *   <li>No ID at all: nothing is replaced or sent, and no topic names are recorded.</li>
     * </ul>
     * In every case the second request stays in session 123 at epoch 1, and
     * {@code canUseTopicIds()} equals {@code endsWithTopicIds}.
     */
    @ParameterizedTest
    @MethodSource(value={"idUsageCombinations"})
    public void testTopicIdReplaced(boolean startsWithTopicIds, boolean endsWithTopicIds) {
        TopicPartition tp = new TopicPartition("foo", 0);
        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);

        FetchSessionHandler.Builder firstBuilder = handler.newBuilder();
        Uuid topicId1;
        if (startsWithTopicIds) {
            topicId1 = Uuid.randomUuid();
        } else {
            topicId1 = Uuid.ZERO_UUID;
        }
        firstBuilder.add(tp, new FetchRequest.PartitionData(topicId1, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
        assertMapsEqual(
            reqMap(new ReqEntry("foo", topicId1, 0, 0L, 100L, 200)),
            firstRequest.toSend(), firstRequest.sessionPartitions());
        Assertions.assertTrue(firstRequest.metadata().isFull());
        Assertions.assertEquals(startsWithTopicIds, firstRequest.canUseTopicIds());

        FetchResponse firstResponse = FetchResponse.of(
            Errors.NONE, 0, 123,
            respMap(new RespEntry("foo", 0, topicId1, 10L, 20L)),
            List.of());
        short version = startsWithTopicIds ? ApiKeys.FETCH.latestVersion() : (short) 12;
        handler.handleResponse(firstResponse, version);

        // Re-add the same partition, possibly under a different topic ID.
        FetchSessionHandler.Builder secondBuilder = handler.newBuilder();
        Uuid topicId2;
        if (endsWithTopicIds) {
            topicId2 = Uuid.randomUuid();
        } else {
            topicId2 = Uuid.ZERO_UUID;
        }
        secondBuilder.add(tp, new FetchRequest.PartitionData(topicId2, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData secondRequest = secondBuilder.build();

        if (!startsWithTopicIds && !endsWithTopicIds) {
            // Never had an ID: the partition data is unchanged, so nothing is replaced or sent.
            Assertions.assertEquals(Collections.emptyList(), secondRequest.toReplace());
            Assertions.assertEquals(Collections.emptyMap(), secondRequest.toSend());
            assertMapsEqual(
                reqMap(new ReqEntry("foo", topicId2, 0, 0L, 100L, 200)),
                secondRequest.sessionPartitions());
            Assertions.assertEquals(Collections.emptyMap(), handler.sessionTopicNames());
        } else if (startsWithTopicIds && endsWithTopicIds) {
            // One real ID swapped for another: the old pair is reported as replaced.
            Assertions.assertEquals(Collections.singletonList(new TopicIdPartition(topicId1, tp)), secondRequest.toReplace());
            assertMapsEqual(
                reqMap(new ReqEntry("foo", topicId2, 0, 0L, 100L, 200)),
                secondRequest.toSend(), secondRequest.sessionPartitions());
            Assertions.assertEquals(Collections.singletonMap(topicId2, tp.topic()), handler.sessionTopicNames());
        } else {
            // Exactly one side has an ID: the partition is re-sent but not reported as replaced.
            Assertions.assertEquals(Collections.emptyList(), secondRequest.toReplace());
            assertMapsEqual(
                reqMap(new ReqEntry("foo", topicId2, 0, 0L, 100L, 200)),
                secondRequest.toSend(), secondRequest.sessionPartitions());
            if (endsWithTopicIds) {
                Assertions.assertEquals(Collections.singletonMap(topicId2, tp.topic()), handler.sessionTopicNames());
            } else {
                Assertions.assertEquals(Collections.emptyMap(), handler.sessionTopicNames());
            }
        }

        Assertions.assertEquals(123, secondRequest.metadata().sessionId(), "Did not use same session");
        Assertions.assertEquals(1, secondRequest.metadata().epoch(), "Did not have correct epoch");
        Assertions.assertEquals(endsWithTopicIds, secondRequest.canUseTopicIds());
    }

    /**
     * Builds a second request that mixes a partition with a topic ID and one without.
     * <p>
     * Depending on {@code startsWithTopicIds}, either "foo" (added first) or "bar" (added in the
     * second request) carries a real ID while the other uses {@code ZERO_UUID}. In both cases the
     * second request must stay in session 123 at epoch 1 and must not be able to use topic IDs.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testSessionEpochWhenMixedUsageOfTopicIDs(boolean startsWithTopicIds) {
        Uuid fooId;
        Uuid barId;
        short responseVersion;
        if (startsWithTopicIds) {
            fooId = Uuid.randomUuid();
            barId = Uuid.ZERO_UUID;
            responseVersion = ApiKeys.FETCH.latestVersion();
        } else {
            fooId = Uuid.ZERO_UUID;
            barId = Uuid.randomUuid();
            responseVersion = (short) 12;
        }
        TopicPartition tp0 = new TopicPartition("foo", 0);
        TopicPartition tp1 = new TopicPartition("bar", 1);
        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);

        FetchSessionHandler.Builder firstBuilder = handler.newBuilder();
        firstBuilder.add(tp0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
        assertMapsEqual(
            reqMap(new ReqEntry("foo", fooId, 0, 0L, 100L, 200)),
            firstRequest.toSend(), firstRequest.sessionPartitions());
        Assertions.assertTrue(firstRequest.metadata().isFull());
        Assertions.assertEquals(startsWithTopicIds, firstRequest.canUseTopicIds());

        FetchResponse firstResponse = FetchResponse.of(
            Errors.NONE, 0, 123,
            respMap(new RespEntry("foo", 0, fooId, 10L, 20L)),
            List.of());
        handler.handleResponse(firstResponse, responseVersion);

        // The second request pairs the original partition with one of the opposite ID kind.
        FetchSessionHandler.Builder secondBuilder = handler.newBuilder();
        secondBuilder.add(tp0, new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
        secondBuilder.add(tp1, new FetchRequest.PartitionData(barId, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData secondRequest = secondBuilder.build();
        Assertions.assertEquals(123, secondRequest.metadata().sessionId(), "Did not use same session");
        Assertions.assertEquals(1, secondRequest.metadata().epoch(), "Did not have final epoch");
        Assertions.assertFalse(secondRequest.canUseTopicIds());
    }

    /**
     * Builds a second request with no partitions at all, so the session's only partition must be
     * forgotten.
     * <p>
     * The second request must stay in session 123 at epoch 1, list foo-0 in {@code toForget()},
     * and report the same {@code canUseTopicIds()} value as the first request.
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testIdUsageWithAllForgottenPartitions(boolean useTopicIds) {
        TopicPartition foo0 = new TopicPartition("foo", 0);
        Uuid topicId;
        short responseVersion;
        if (useTopicIds) {
            topicId = Uuid.randomUuid();
            responseVersion = ApiKeys.FETCH.latestVersion();
        } else {
            topicId = Uuid.ZERO_UUID;
            responseVersion = (short) 12;
        }
        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);

        FetchSessionHandler.Builder firstBuilder = handler.newBuilder();
        firstBuilder.add(foo0, new FetchRequest.PartitionData(topicId, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
        assertMapsEqual(
            reqMap(new ReqEntry("foo", topicId, 0, 0L, 100L, 200)),
            firstRequest.toSend(), firstRequest.sessionPartitions());
        Assertions.assertTrue(firstRequest.metadata().isFull());
        Assertions.assertEquals(useTopicIds, firstRequest.canUseTopicIds());

        FetchResponse firstResponse = FetchResponse.of(
            Errors.NONE, 0, 123,
            respMap(new RespEntry("foo", 0, topicId, 10L, 20L)),
            List.of());
        handler.handleResponse(firstResponse, responseVersion);

        // An empty builder drops every partition currently in the session.
        FetchSessionHandler.FetchRequestData secondRequest = handler.newBuilder().build();
        Assertions.assertEquals(Collections.singletonList(new TopicIdPartition(topicId, foo0)), secondRequest.toForget());
        Assertions.assertEquals(123, secondRequest.metadata().sessionId(), "Did not use same session when useTopicIds was " + useTopicIds);
        Assertions.assertEquals(1, secondRequest.metadata().epoch(), "Did not have correct epoch when useTopicIds was " + useTopicIds);
        Assertions.assertEquals(useTopicIds, secondRequest.canUseTopicIds());
    }

    /**
     * Verifies that a topic can be re-added under a new topic ID once it has been removed from
     * the session.
     * <p>
     * Steps: (1) a full request adds foo-0 with a topic ID; (2) an empty second request removes
     * it, leaving nothing to send and no session partitions; (3) a third request adds foo-0 again
     * under a freshly generated ID. The third request must stay in session 123 at epoch 2 and
     * must still be able to use topic IDs.
     */
    @Test
    public void testOkToAddNewIdAfterTopicRemovedFromSession() {
        Uuid topicId = Uuid.randomUuid();
        FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
        FetchSessionHandler.Builder builder = handler.newBuilder();
        builder.add(new TopicPartition("foo", 0), new FetchRequest.PartitionData(topicId, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData data = builder.build();
        assertMapsEqual(
            reqMap(new ReqEntry("foo", topicId, 0, 0L, 100L, 200)),
            data.toSend(), data.sessionPartitions());
        Assertions.assertTrue(data.metadata().isFull());
        Assertions.assertTrue(data.canUseTopicIds());

        FetchResponse resp = FetchResponse.of(
            Errors.NONE, 0, 123,
            respMap(new RespEntry("foo", 0, topicId, 10L, 20L)),
            List.of());
        handler.handleResponse(resp, ApiKeys.FETCH.latestVersion());

        // Empty second request: the topic leaves the session.
        FetchSessionHandler.Builder builder2 = handler.newBuilder();
        FetchSessionHandler.FetchRequestData data2 = builder2.build();
        assertMapsEqual(reqMap(), data2.toSend(), data2.sessionPartitions());

        // respMap() supplies a properly typed empty map in place of a raw LinkedHashMap.
        FetchResponse resp2 = FetchResponse.of(Errors.NONE, 0, 123, respMap(), List.of());
        handler.handleResponse(resp2, ApiKeys.FETCH.latestVersion());

        // Third request: the same partition under a different, freshly generated topic ID.
        FetchSessionHandler.Builder builder3 = handler.newBuilder();
        builder3.add(new TopicPartition("foo", 0), new FetchRequest.PartitionData(Uuid.randomUuid(), 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData data3 = builder3.build();
        Assertions.assertEquals(123, data3.metadata().sessionId(), "Did not use same session");
        Assertions.assertEquals(2, data3.metadata().epoch(), "Did not have the correct session epoch");
        // Check the third request; the original re-asserted the first request's flag here.
        Assertions.assertTrue(data3.canUseTopicIds());
    }

    /**
     * Exercises {@code verifyFullFetchResponsePartitions} for each tested request version
     * (12 and the latest FETCH version).
     * <ul>
     *   <li>Before any request is built, a response with three partitions must be reported as
     *       containing extra partitions and no omitted ones.</li>
     *   <li>After a request with those same three partitions is built, an identical response must
     *       produce no issue ({@code null}).</li>
     *   <li>A response lacking bar-0 must be reported as omitting partitions, with no extra
     *       ones.</li>
     * </ul>
     */
    @Test
    public void testVerifyFullFetchResponsePartitions() {
        // Parameterized maps: with raw HashMaps, getOrDefault returns Object and cannot be assigned to Uuid.
        HashMap<String, Uuid> topicIds = new HashMap<>();
        HashMap<Uuid, String> topicNames = new HashMap<>();
        List<Short> versions = Arrays.asList((short) 12, ApiKeys.FETCH.latestVersion());
        for (short version : versions) {
            FetchSessionHandler handler = new FetchSessionHandler(LOG_CONTEXT, 1);
            this.addTopicId(topicIds, topicNames, "foo", version);
            this.addTopicId(topicIds, topicNames, "bar", version);
            Uuid fooId = topicIds.getOrDefault("foo", Uuid.ZERO_UUID);
            Uuid barId = topicIds.getOrDefault("bar", Uuid.ZERO_UUID);
            TopicPartition foo0 = new TopicPartition("foo", 0);
            TopicPartition foo1 = new TopicPartition("foo", 1);
            TopicPartition bar0 = new TopicPartition("bar", 0);

            // No request has been built yet, so every partition in the response is unexpected.
            FetchResponse resp1 = FetchResponse.of(
                Errors.NONE, 0, 0,
                respMap(
                    new RespEntry("foo", 0, fooId, 10L, 20L),
                    new RespEntry("foo", 1, fooId, 10L, 20L),
                    new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
            String issue = handler.verifyFullFetchResponsePartitions(
                resp1.responseData(topicNames, version).keySet(), resp1.topicIds(), version);
            Assertions.assertTrue(issue.contains("extraPartitions="));
            Assertions.assertFalse(issue.contains("omittedPartitions="));

            FetchSessionHandler.Builder builder = handler.newBuilder();
            builder.add(foo0, new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
            builder.add(foo1, new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
            builder.add(bar0, new FetchRequest.PartitionData(barId, 20L, 120L, 220, Optional.empty()));
            builder.build();

            // The response now matches the built request's partitions exactly.
            FetchResponse resp2 = FetchResponse.of(
                Errors.NONE, 0, 0,
                respMap(
                    new RespEntry("foo", 0, fooId, 10L, 20L),
                    new RespEntry("foo", 1, fooId, 10L, 20L),
                    new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
            String issue2 = handler.verifyFullFetchResponsePartitions(
                resp2.responseData(topicNames, version).keySet(), resp2.topicIds(), version);
            Assertions.assertNull(issue2);

            // bar-0 is absent from the response.
            FetchResponse resp3 = FetchResponse.of(
                Errors.NONE, 0, 0,
                respMap(
                    new RespEntry("foo", 0, fooId, 10L, 20L),
                    new RespEntry("foo", 1, fooId, 10L, 20L)),
                List.of());
            String issue3 = handler.verifyFullFetchResponsePartitions(
                resp3.responseData(topicNames, version).keySet(), resp3.topicIds(), version);
            Assertions.assertFalse(issue3.contains("extraPartitions="));
            Assertions.assertTrue(issue3.contains("omittedPartitions="));
        }
    }

    /**
     * Checks {@code verifyFullFetchResponsePartitions} at the latest FETCH version when the
     * response contains a partition ("extra2", 1) that is never added to the session.
     *
     * <p>Before and after the session is built with foo-0 and bar-0, a response that includes
     * the extra2 partition must be reported with {@code extraPartitions=} and without
     * {@code omittedPartitions=}. A response with only foo-0 and bar-0 must yield {@code null}.
     */
    @Test
    public void testVerifyFullFetchResponsePartitionsWithTopicIds() {
        Map<String, Uuid> idsByName = new HashMap<>();
        Map<Uuid, String> namesById = new HashMap<>();
        FetchSessionHandler sessionHandler = new FetchSessionHandler(LOG_CONTEXT, 1);
        final short latest = ApiKeys.FETCH.latestVersion();
        for (String topic : List.of("foo", "bar", "extra2")) {
            addTopicId(idsByName, namesById, topic, latest);
        }
        Uuid fooId = idsByName.get("foo");
        Uuid barId = idsByName.get("bar");
        Uuid extraId = idsByName.get("extra2");

        // Session is still empty: everything in the response counts as extra.
        FetchResponse beforeBuild = FetchResponse.of(Errors.NONE, 0, 0,
                respMap(
                        new RespEntry("foo", 0, fooId, 10L, 20L),
                        new RespEntry("extra2", 1, extraId, 10L, 20L),
                        new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
        String firstIssue = sessionHandler.verifyFullFetchResponsePartitions(
                beforeBuild.responseData(namesById, latest).keySet(), beforeBuild.topicIds(), latest);
        Assertions.assertTrue(firstIssue.contains("extraPartitions="));
        Assertions.assertFalse(firstIssue.contains("omittedPartitions="));

        // Register foo-0 and bar-0 in the session; extra2-1 is deliberately left out.
        FetchSessionHandler.Builder requestBuilder = sessionHandler.newBuilder();
        requestBuilder.add(new TopicPartition("foo", 0),
                new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
        requestBuilder.add(new TopicPartition("bar", 0),
                new FetchRequest.PartitionData(barId, 20L, 120L, 220, Optional.empty()));
        requestBuilder.build();

        // Same response contents as before: extra2-1 is still unexpected.
        FetchResponse withExtra = FetchResponse.of(Errors.NONE, 0, 0,
                respMap(
                        new RespEntry("foo", 0, fooId, 10L, 20L),
                        new RespEntry("extra2", 1, extraId, 10L, 20L),
                        new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
        String secondIssue = sessionHandler.verifyFullFetchResponsePartitions(
                withExtra.responseData(namesById, latest).keySet(), withExtra.topicIds(), latest);
        Assertions.assertTrue(secondIssue.contains("extraPartitions="));
        Assertions.assertFalse(secondIssue.contains("omittedPartitions="));

        // Response matches the session exactly: no issue is reported.
        FetchResponse exactMatch = FetchResponse.of(Errors.NONE, 0, 0,
                respMap(
                        new RespEntry("foo", 0, fooId, 10L, 20L),
                        new RespEntry("bar", 0, barId, 10L, 20L)),
                List.of());
        String thirdIssue = sessionHandler.verifyFullFetchResponsePartitions(
                exactMatch.responseData(namesById, latest).keySet(), exactMatch.topicIds(), latest);
        Assertions.assertNull(thirdIssue);
    }

    /**
     * Verifies the fetch metadata produced after a response with a top-level error.
     *
     * <p>Sequence: a first request (session id 0, epoch 0) is answered with session id 123;
     * a second request adding the "unknown" topic is not full, uses session id 123 and epoch
     * {@code FetchMetadata.nextEpoch(0)}; a response with top-level {@code UNKNOWN_TOPIC_ID}
     * makes {@code handleResponse} return {@code false}; the following request still carries
     * session id 123 but its epoch is back to 0.
     */
    @Test
    public void testTopLevelErrorResetsMetadata() {
        Map<String, Uuid> idsByName = new HashMap<>();
        Map<Uuid, String> namesById = new HashMap<>();
        FetchSessionHandler sessionHandler = new FetchSessionHandler(LOG_CONTEXT, 1);
        final short latest = ApiKeys.FETCH.latestVersion();

        // First request: two partitions of "foo".
        FetchSessionHandler.Builder firstBuilder = sessionHandler.newBuilder();
        addTopicId(idsByName, namesById, "foo", latest);
        Uuid fooId = idsByName.getOrDefault("foo", Uuid.ZERO_UUID);
        firstBuilder.add(new TopicPartition("foo", 0),
                new FetchRequest.PartitionData(fooId, 0L, 100L, 200, Optional.empty()));
        firstBuilder.add(new TopicPartition("foo", 1),
                new FetchRequest.PartitionData(fooId, 10L, 110L, 210, Optional.empty()));
        FetchSessionHandler.FetchRequestData firstRequest = firstBuilder.build();
        Assertions.assertEquals(0, firstRequest.metadata().sessionId());
        Assertions.assertEquals(0, firstRequest.metadata().epoch());

        // Successful response that carries session id 123.
        Uuid fooIdInResponse = idsByName.get("foo");
        FetchResponse okResponse = FetchResponse.of(Errors.NONE, 0, 123,
                respMap(
                        new RespEntry("foo", 0, fooIdInResponse, 10L, 20L),
                        new RespEntry("foo", 1, fooIdInResponse, 10L, 20L)),
                List.of());
        sessionHandler.handleResponse(okResponse, latest);

        // Second request adds a partition of the "unknown" topic.
        FetchSessionHandler.Builder secondBuilder = sessionHandler.newBuilder();
        addTopicId(idsByName, namesById, "unknown", latest);
        Uuid unknownId = idsByName.getOrDefault("unknown", Uuid.ZERO_UUID);
        secondBuilder.add(new TopicPartition("unknown", 0),
                new FetchRequest.PartitionData(unknownId, 0L, 100L, 200, Optional.empty()));
        FetchSessionHandler.FetchRequestData secondRequest = secondBuilder.build();
        Assertions.assertFalse(secondRequest.metadata().isFull());
        Assertions.assertEquals(123, secondRequest.metadata().sessionId());
        Assertions.assertEquals(FetchMetadata.nextEpoch(0), secondRequest.metadata().epoch());

        // Response with a top-level UNKNOWN_TOPIC_ID error; handleResponse must report failure.
        FetchResponse errorResponse = FetchResponse.of(Errors.UNKNOWN_TOPIC_ID, 0, 123,
                respMap(new RespEntry("unknown", 0, Uuid.randomUuid(), Errors.UNKNOWN_TOPIC_ID)),
                List.of());
        boolean handled = sessionHandler.handleResponse(errorResponse, latest);
        Assertions.assertFalse(handled);

        // Next request: same session id, epoch reset to 0.
        FetchSessionHandler.Builder thirdBuilder = sessionHandler.newBuilder();
        FetchSessionHandler.FetchRequestData thirdRequest = thirdBuilder.build();
        Assertions.assertEquals(123, thirdRequest.metadata().sessionId());
        Assertions.assertEquals(0, thirdRequest.metadata().epoch());
    }

    /**
     * Registers a newly generated random topic ID for {@code name} in both lookup maps, but
     * only when {@code version} is at least 13; for lower versions neither map is modified.
     *
     * @param topicIds   map from topic name to topic ID, updated in place
     * @param topicNames map from topic ID to topic name, updated in place
     * @param name       topic name to register
     * @param version    request version that decides whether an ID is registered
     *                   (13 is presumably the first FETCH version with topic IDs; confirm)
     */
    private void addTopicId(Map<String, Uuid> topicIds, Map<Uuid, String> topicNames, String name, short version) {
        if (version < 13) {
            return;
        }
        Uuid generated = Uuid.randomUuid();
        topicIds.put(name, generated);
        topicNames.put(generated, name);
    }

    /**
     * Immutable pair of a topic partition and the fetch request data built for it.
     */
    private static final class ReqEntry {
        /** Partition this entry refers to. */
        final TopicPartition part;
        /** Request data for {@link #part}; created with an empty {@code Optional} as its last argument. */
        final FetchRequest.PartitionData data;

        ReqEntry(String topic, Uuid topicId, int partition, long fetchOffset, long logStartOffset, int maxBytes) {
            TopicPartition target = new TopicPartition(topic, partition);
            FetchRequest.PartitionData request =
                    new FetchRequest.PartitionData(topicId, fetchOffset, logStartOffset, maxBytes, Optional.empty());
            this.part = target;
            this.data = request;
        }
    }

    /**
     * Immutable pair of a topic-id partition and the fetch response data reported for it.
     */
    private static final class RespEntry {
        /** Partition (with topic ID) this entry refers to. */
        final TopicIdPartition part;
        /** Response payload for {@link #part}. */
        final FetchResponseData.PartitionData data;

        /**
         * Creates an entry whose payload carries the given high watermark and last stable
         * offset, with the log start offset set to 0.
         */
        RespEntry(String topic, int partition, Uuid topicId, long highWatermark, long lastStableOffset) {
            FetchResponseData.PartitionData payload = new FetchResponseData.PartitionData()
                    .setPartitionIndex(partition)
                    .setHighWatermark(highWatermark)
                    .setLastStableOffset(lastStableOffset)
                    .setLogStartOffset(0L);
            this.part = keyFor(topic, partition, topicId);
            this.data = payload;
        }

        /**
         * Creates an entry whose payload carries the given error code and a high watermark of -1.
         */
        RespEntry(String topic, int partition, Uuid topicId, Errors error) {
            FetchResponseData.PartitionData payload = new FetchResponseData.PartitionData()
                    .setPartitionIndex(partition)
                    .setErrorCode(error.code())
                    .setHighWatermark(-1L);
            this.part = keyFor(topic, partition, topicId);
            this.data = payload;
        }

        /** Builds the partition key shared by both constructors. */
        private static TopicIdPartition keyFor(String topic, int partition, Uuid topicId) {
            return new TopicIdPartition(topicId, new TopicPartition(topic, partition));
        }
    }
}

