/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.io.Serializable;
import java.net.Socket;
import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.server.BaseFetchRequestTest;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005\u0005Mb\u0001B\n\u0015\u0001eAQA\b\u0001\u0005\u0002}Aq!\t\u0001C\u0002\u0013\u0005!\u0005\u0003\u0004*\u0001\u0001\u0006Ia\t\u0005\bU\u0001\u0011\r\u0011\"\u0001#\u0011\u0019Y\u0003\u0001)A\u0005G!9A\u0006\u0001b\u0001\n\u0003i\u0003B\u0002\u001c\u0001A\u0003%a\u0006C\u00048\u0001\t\u0007I\u0011\u0001\u0012\t\ra\u0002\u0001\u0015!\u0003$\u0011\u001dI\u0004A1A\u0005\u0002\tBaA\u000f\u0001!\u0002\u0013\u0019\u0003\"B\u001e\u0001\t\u0003a\u0004\"B\"\u0001\t\u0003\"\u0005\"\u0002(\u0001\t\u0003y\u0005bBA\u0003\u0001\u0011\u0005\u0011q\u0001\u0005\b\u0003#\u0001A\u0011AA\n\u0011\u001d\ti\u0002\u0001C\u0001\u0003?Aa!!\r\u0001\t\u0013\u0011#\u0001\t$fi\u000eDgI]8n\r>dGn\\<fe&sG/Z4sCRLwN\u001c+fgRT!!\u0006\f\u0002\rM,'O^3s\u0015\u00059\u0012!B6bM.\f7\u0001A\n\u0003\u0001i\u0001\"a\u0007\u000f\u000e\u0003QI!!\b\u000b\u0003)\t\u000b7/\u001a$fi\u000eD'+Z9vKN$H+Z:u\u0003\u0019a\u0014N\\5u}Q\t\u0001\u0005\u0005\u0002\u001c\u0001\u0005Aa.^7O_\u0012,7/F\u0001$!\t!s%D\u0001&\u0015\u00051\u0013!B:dC2\f\u0017B\u0001\u0015&\u0005\rIe\u000e^\u0001\n]Vlgj\u001c3fg\u0002\n\u0001B\\;n!\u0006\u0014Ho]\u0001\n]Vl\u0007+\u0019:ug\u0002\nQ\u0001^8qS\u000e,\u0012A\f\t\u0003_Qj\u0011\u0001\r\u0006\u0003cI\nA\u0001\\1oO*\t1'\u0001\u0003kCZ\f\u0017BA\u001b1\u0005\u0019\u0019FO]5oO\u00061Ao\u001c9jG\u0002\na\u0002\\3bI\u0016\u0014(I]8lKJLE-A\bmK\u0006$WM\u001d\"s_.,'/\u00133!\u0003A1w\u000e\u001c7po\u0016\u0014(I]8lKJLE-A\tg_2dwn^3s\u0005J|7.\u001a:JI\u0002\nqb\u001c<feJLG-\u001b8h!J|\u0007o]\u000b\u0002{A\u0011a(Q\u0007\u0002\u007f)\u0011\u0001IM\u0001\u0005kRLG.\u0003\u0002C\u007f\tQ\u0001K]8qKJ$\u0018.Z:\u0002\u001f\u001d,g.\u001a:bi\u0016\u001cuN\u001c4jON,\u0012!\u0012\t\u0004\r&[U\"A$\u000b\u0005!+\u0013AC2pY2,7\r^5p]&\u0011!j\u0012\u0002\u0004'\u0016\f\bCA\u000eM\u0013\tiECA\u0006LC\u001a\\\u0017mQ8oM&<\u0017a\f;fgR4u\u000e\u001c7po\u0016\u00148i\\7qY\u0016$X\rR3mCf,GMR3uG\",7o\u00148SKBd\u0017nY1uS>tGC\u0001)T!\t!\u0013+\u0003\u0002SK\t!QK\\5u\u0011\u0015!f\u00021\u0001V\u000359'o\\;q!J|Go\\2pYB\u0011a+\u0018\b\u0003/n\u0003\"\u0001W\u0013\u000e\u0003eS!A\u0017\r\u0002\rq\u0012xn\u001c;?\u0013\taV%\u0001\u0004Qe\u0016$WMZ\u0005\u0003kyS!\u0001X\u0013)\t9\u0001G.\u001c\t\u0003C*l\u0011A\u0019\u0006\u0003G\u0012\fa\u0001]1sC6\u001c(BA3g\u0003\u001dQW\u000f]5uKJT!a\u001a5\u0002\u000b),h.\u001b;\u000b\u0003%\f1a\u001c:h\u0013\tY'MA\tQCJ\fW.\u001a;fe&TX\r\u001a+fgR\fAA\\1nK\u0006\na.A\u0010|I&\u001c\b\u000f\\1z\u001d\u0006lW- \u0018he>,\b\u000f\u0015:pi>\u001cw\u000e\\\u001f|auDCA\u00049woB\u0011\u0011\u000f^\u0007\u0002e*\u00111OY\u0001\taJ|g/\u001b3fe&\u0011QO\u001d\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0002q\u0006\n\u00110A\u0011hKR$Vm\u001d;He>,\b\u000f\u0015:pi>\u001cw\u000e\u001c)be\u0006lW\r^3sg\u0006cG\u000eK\u0003\u000fwZ\f\u0019\u0001\u0005\u0002}\u007f6\tQP\u0003\u0002\u007fI\u0006\u0019\u0011\r]5\n\u0007\u0005\u0005QPA\u0004US6,w.\u001e;\u001f\u0003=\t\u0011\b^3ti\u001a+Go\u00195Ge>lG*Z1eKJ<\u0006.\u001b7f!J,g-\u001a:sK\u0012\u0014V-\u00193SKBd\u0017nY1JgVs\u0017M^1jY\u0006\u0014G.\u001a\u000b\u0004!\u0006%\u0001\"\u0002+\u0010\u0001\u0004)\u0006\u0006B\baY6DSa\u00049w\u0003\u001fa\u0013\u0001_\u0001\u001ei\u0016\u001cHOR3uG\"4%o\\7G_2dwn^3s/&$\bNU8mYR\u0019\u0001+!\u0006\t\u000bQ\u0003\u0002\u0019A+)\tA\u0001G.\u001c\u0015\u0006!A4\u00181\u0004\u0017\u0002q\u0006QB/Z:u%\u0006\u001c7.Q<be\u0016\u0014\u0016M\\4f\u0003N\u001c\u0018n\u001a8peR\u0019\u0001+!\t\t\u000bQ\u000b\u0002\u0019A+)\u0007E\t)\u0003E\u0002}\u0003OI1!!\u000b~\u0005!!\u0015n]1cY\u0016$\u0007\u0006B\taY6DS!\u00059w\u0003_a\u0013\u0001_\u0001\u0014O\u0016$\bK]3gKJ\u0014X\r\u001a*fa2L7-\u0019")
public class FetchFromFollowerIntegrationTest
extends BaseFetchRequestTest {
    private final int numNodes;
    private final int numParts;
    private final String topic;
    private final int leaderBrokerId;
    private final int followerBrokerId;

    public int numNodes() {
        return this.numNodes;
    }

    public int numParts() {
        return this.numParts;
    }

    public String topic() {
        return this.topic;
    }

    public int leaderBrokerId() {
        return this.leaderBrokerId;
    }

    public int followerBrokerId() {
        return this.followerBrokerId;
    }

    public Properties overridingProps() {
        Properties props = new Properties();
        props.put("num.partitions", Integer.toString(this.numParts()));
        props.put("offsets.topic.replication.factor", Integer.toString(this.numNodes()));
        return props;
    }

    @Override
    public Seq<KafkaConfig> generateConfigs() {
        int x$12 = this.numNodes();
        boolean x$4 = true;
        None$ x$5 = None$.MODULE$;
        None$ x$6 = None$.MODULE$;
        None$ x$7 = None$.MODULE$;
        boolean x$8 = true;
        boolean x$9 = false;
        boolean x$10 = false;
        boolean x$11 = false;
        Map x$122 = (Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
        int x$13 = 1;
        boolean x$14 = false;
        int x$15 = 1;
        short x$16 = 1;
        int x$17 = 0;
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(x$12, false, x$4, (Option<SecurityProtocol>)x$5, (Option<File>)x$6, (Option<Properties>)x$7, x$8, x$9, x$10, x$11, (Map<Object, String>)x$122, x$13, x$14, x$15, x$16, x$17, true).map((Function1 & Serializable)x$1 -> {
            Properties fromProps_overrides = this.overridingProps();
            return KafkaConfig$.MODULE$.fromProps(x$1, fromProps_overrides, true);
        });
    }

    @ParameterizedTest(name="{displayName}.groupProtocol={0}")
    @MethodSource(value={"getTestGroupProtocolParametersAll"})
    @Timeout(value=15L)
    public void testFollowerCompleteDelayedFetchesOnReplication(String groupProtocol) {
        Admin admin = this.createAdminClient(this.createAdminClient$default$1(), this.createAdminClient$default$2());
        String x$22 = this.topic();
        Buffer<KafkaBroker> x$3 = this.brokers();
        Seq<ControllerServer> x$4 = this.controllerServers();
        scala.collection.immutable.Map x$5 = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.leaderBrokerId(), this.followerBrokerId()})))}));
        int x$6 = 1;
        int x$7 = 1;
        Properties x$8 = new Properties();
        scala.collection.immutable.Map<Object, Object> partitionLeaders = TestUtils$.MODULE$.createTopicWithAdmin(admin, x$22, x$3, x$4, x$6, x$7, (Map<Object, Seq<Object>>)x$5, x$8);
        TestUtils$.MODULE$.waitUntilLeaderIsKnown(this.brokers(), new TopicPartition(this.topic(), 0), 15000L);
        Assertions.assertTrue((boolean)partitionLeaders.values().forall((Function1)(JFunction1.mcZI.sp & Serializable)x$2 -> x$2 == this.leaderBrokerId()));
        short version = ApiKeys.FETCH.latestVersion();
        TopicPartition topicPartition = new TopicPartition(this.topic(), 0);
        scala.collection.immutable.Map offsetMap = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)BoxesRunTime.boxToLong((long)0L))}));
        FetchRequest fetchRequest = this.createConsumerFetchRequest(1000, 1000, (Seq<TopicPartition>)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$), (scala.collection.immutable.Map<TopicPartition, Object>)offsetMap, version, 20000, 1, this.createConsumerFetchRequest$default$8());
        try (Socket socket = this.connect(this.brokerSocketServer(this.followerBrokerId()), this.connect$default$2());){
            this.send((AbstractRequest)fetchRequest, socket, this.send$default$3(), this.send$default$4());
            TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), this.topic(), 1, -1);
            FetchResponse response = (FetchResponse)this.receive(socket, ApiKeys.FETCH, version, ClassTag$.MODULE$.apply(FetchResponse.class));
            Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
            Assertions.assertEquals(java.util.Map.of(Errors.NONE, BoxesRunTime.boxToInteger((int)2)), (Object)response.errorCounts());
        }
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.groupProtocol={0}")
    @MethodSource(value={"getTestGroupProtocolParametersAll"})
    public void testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable(String groupProtocol) {
        Admin admin = this.createAdminClient(this.createAdminClient$default$1(), this.createAdminClient$default$2());
        String x$2 = this.topic();
        Buffer<KafkaBroker> x$3 = this.brokers();
        Seq<ControllerServer> x$4 = this.controllerServers();
        scala.collection.immutable.Map x$5 = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.leaderBrokerId(), this.followerBrokerId()})))}));
        int x$6 = 1;
        int x$7 = 1;
        Properties x$8 = new Properties();
        TestUtils$.MODULE$.createTopicWithAdmin(admin, x$2, x$3, x$4, x$6, x$7, (Map<Object, Seq<Object>>)x$5, x$8);
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), this.topic(), 10, -1);
        Assertions.assertEquals((int)1, (int)this.getPreferredReplica());
        ((KafkaBroker)this.brokers().apply(this.followerBrokerId())).shutdown();
        TopicPartition topicPartition = new TopicPartition(this.topic(), 0);
        long l = 100L;
        long waitUntilTrue_waitTimeMs = 15000L;
        long waitUntilTrue_startTime = System.currentTimeMillis();
        while (!FetchFromFollowerIntegrationTest.$anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(this, topicPartition)) {
            void waitUntilTrue_pause;
            if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                Assertions.fail((String)"follower is still reachable.");
            }
            Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
        }
        Assertions.assertEquals((int)-1, (int)this.getPreferredReplica());
    }

    /*
     * WARNING - void declaration
     */
    @ParameterizedTest(name="{displayName}.groupProtocol={0}")
    @MethodSource(value={"getTestGroupProtocolParametersAll"})
    public void testFetchFromFollowerWithRoll(String groupProtocol) {
        Admin admin = this.createAdminClient(this.createAdminClient$default$1(), this.createAdminClient$default$2());
        String x$2 = this.topic();
        Buffer<KafkaBroker> x$3 = this.brokers();
        Seq<ControllerServer> x$4 = this.controllerServers();
        scala.collection.immutable.Map x$5 = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)0)), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{this.leaderBrokerId(), this.followerBrokerId()})))}));
        int x$6 = 1;
        int x$7 = 1;
        Properties x$8 = new Properties();
        TestUtils$.MODULE$.createTopicWithAdmin(admin, x$2, x$3, x$4, x$6, x$7, (Map<Object, Seq<Object>>)x$5, x$8);
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", this.bootstrapServers(this.bootstrapServers$default$1()));
        consumerProps.put("group.id", "test-group");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("client.rack", Integer.toString(this.followerBrokerId()));
        consumerProps.put("group.protocol", groupProtocol);
        try (KafkaConsumer consumer = new KafkaConsumer(consumerProps, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());){
            consumer.subscribe(java.util.List.of(this.topic()));
            long l = 100L;
            long waitUntilTrue_waitTimeMs = 15000L;
            long waitUntilTrue_startTime = System.currentTimeMillis();
            while (!FetchFromFollowerIntegrationTest.$anonfun$testFetchFromFollowerWithRoll$1(this)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                    Assertions.fail((String)"Preferred replica is not set");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
            }
            TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), this.topic(), 1, -1);
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 1, 15000L);
            ((KafkaBroker)this.brokers().apply(this.followerBrokerId())).shutdown();
            TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), this.topic(), 1, -1);
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 1, 15000L);
            ((KafkaBroker)this.brokers().apply(this.followerBrokerId())).startup();
            long l2 = 100L;
            long waitUntilTrue_waitTimeMs2 = 15000L;
            long waitUntilTrue_startTime2 = System.currentTimeMillis();
            while (!FetchFromFollowerIntegrationTest.$anonfun$testFetchFromFollowerWithRoll$3(this)) {
                void waitUntilTrue_pause;
                if (System.currentTimeMillis() > waitUntilTrue_startTime2 + waitUntilTrue_waitTimeMs2) {
                    Assertions.fail((String)"Preferred replica is not set");
                }
                Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs2), (long)waitUntilTrue_pause));
            }
            TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), this.topic(), 1, -1);
            TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, 1, 15000L);
        }
    }

    @Disabled
    @ParameterizedTest(name="{displayName}.groupProtocol={0}")
    @MethodSource(value={"getTestGroupProtocolParametersAll"})
    public void testRackAwareRangeAssignor(String groupProtocol) {
        List partitionList = this.brokers().indices().toList();
        String topicWithAllPartitionsOnAllRacks = "topicWithAllPartitionsOnAllRacks";
        this.createTopic(topicWithAllPartitionsOnAllRacks, this.brokers().size(), this.brokers().size(), this.createTopic$default$4(), this.createTopic$default$5(), this.createTopic$default$6());
        String topicWithSingleRackPartitions = "topicWithSingleRackPartitions";
        this.createTopicWithAssignment(topicWithSingleRackPartitions, (Map<Object, Seq<Object>>)partitionList.map((Function1 & Serializable)i -> FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$1(this, BoxesRunTime.unboxToInt((Object)i))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl()), this.createTopicWithAssignment$default$3());
        this.consumerConfig().setProperty("partition.assignment.strategy", RangeAssignor.class.getName());
        Buffer consumers = (Buffer)this.brokers().map((Function1 & Serializable)server -> {
            this.consumerConfig().setProperty("auto.offset.reset", "earliest");
            this.consumerConfig().setProperty("client.rack", (String)server.config().rack().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
            this.consumerConfig().setProperty("group.instance.id", "instance-" + server.config().brokerId());
            this.consumerConfig().setProperty("metadata.max.age.ms", "1000");
            return this.createConsumer(this.createConsumer$default$1(), this.createConsumer$default$2(), this.createConsumer$default$3(), this.createConsumer$default$4());
        });
        KafkaProducer producer = this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), this.createProducer$default$3());
        ExecutorService executor = Executors.newFixedThreadPool(consumers.size());
        try {
            consumers.foreach((Function1 & Serializable)x$3 -> {
                x$3.subscribe(java.util.Set.of(topicWithSingleRackPartitions));
                return BoxedUnit.UNIT;
            });
            FetchFromFollowerIntegrationTest.verifyAssignments$1(partitionList.reverse(), (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topicWithSingleRackPartitions}), consumers, executor, producer);
            consumers.foreach((Function1 & Serializable)x$4 -> {
                x$4.subscribe(java.util.Set.of(topicWithAllPartitionsOnAllRacks));
                return BoxedUnit.UNIT;
            });
            FetchFromFollowerIntegrationTest.verifyAssignments$1(partitionList, (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topicWithAllPartitionsOnAllRacks}), consumers, executor, producer);
            consumers.foreach((Function1 & Serializable)x$5 -> {
                x$5.subscribe(java.util.Set.of(topicWithSingleRackPartitions, topicWithAllPartitionsOnAllRacks));
                return BoxedUnit.UNIT;
            });
            FetchFromFollowerIntegrationTest.verifyAssignments$1(partitionList.reverse(), (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions}), consumers, executor, producer);
            Admin admin = this.createAdminClient(this.createAdminClient$default$1(), this.createAdminClient$default$2());
            HashMap reassignments = new HashMap();
            partitionList.foreach((Function1 & Serializable)p -> FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$18(reassignments, topicWithSingleRackPartitions, BoxesRunTime.unboxToInt((Object)p)));
            admin.alterPartitionReassignments(reassignments).all().get(30L, TimeUnit.SECONDS);
            FetchFromFollowerIntegrationTest.verifyAssignments$1(partitionList, (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{topicWithAllPartitionsOnAllRacks, topicWithSingleRackPartitions}), consumers, executor, producer);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private int getPreferredReplica() {
        TopicPartition topicPartition = new TopicPartition(this.topic(), 0);
        scala.collection.immutable.Map offsetMap = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicPartition), (Object)BoxesRunTime.boxToLong((long)0L))}));
        FetchRequest request = this.createConsumerFetchRequest(1000, 1000, (Seq<TopicPartition>)new .colon.colon((Object)topicPartition, (List)Nil$.MODULE$), (scala.collection.immutable.Map<TopicPartition, Object>)offsetMap, ApiKeys.FETCH.latestVersion(), 500, 1, Integer.toString(this.followerBrokerId()));
        FetchResponse response = (FetchResponse)this.connectAndReceive((AbstractRequest)request, ((KafkaBroker)this.brokers().apply(this.leaderBrokerId())).socketServer(), this.connectAndReceive$default$3(), ClassTag$.MODULE$.apply(FetchResponse.class));
        Assertions.assertEquals((Object)Errors.NONE, (Object)response.error());
        Assertions.assertEquals(java.util.Map.of(Errors.NONE, BoxesRunTime.boxToInteger((int)2)), (Object)response.errorCounts());
        Assertions.assertEquals((int)1, (int)response.data().responses().size());
        FetchResponseData.FetchableTopicResponse topicResponse = (FetchResponseData.FetchableTopicResponse)response.data().responses().get(0);
        Assertions.assertEquals((int)1, (int)topicResponse.partitions().size());
        return ((FetchResponseData.PartitionData)topicResponse.partitions().get(0)).preferredReadReplica();
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$1(FetchFromFollowerIntegrationTest $this, TopicPartition topicPartition$1) {
        return !((KafkaBroker)$this.brokers().apply($this.leaderBrokerId())).metadataCache().getPartitionReplicaEndpoints(topicPartition$1, $this.listenerName()).containsKey(BoxesRunTime.boxToInteger((int)$this.followerBrokerId()));
    }

    public static final /* synthetic */ String $anonfun$testFetchFromLeaderWhilePreferredReadReplicaIsUnavailable$2() {
        return "follower is still reachable.";
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$1(FetchFromFollowerIntegrationTest $this) {
        return $this.getPreferredReplica() == 1;
    }

    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$2() {
        return "Preferred replica is not set";
    }

    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerWithRoll$3(FetchFromFollowerIntegrationTest $this) {
        return $this.getPreferredReplica() == 1;
    }

    public static final /* synthetic */ String $anonfun$testFetchFromFollowerWithRoll$4() {
        return "Preferred replica is not set";
    }

    public static final /* synthetic */ Tuple2 $anonfun$testRackAwareRangeAssignor$1(FetchFromFollowerIntegrationTest $this, int i) {
        return new Tuple2((Object)BoxesRunTime.boxToInteger((int)i), (Object)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{$this.brokers().size() - i - 1})));
    }

    public static final /* synthetic */ scala.collection.immutable.Set $anonfun$testRackAwareRangeAssignor$3(scala.collection.immutable.Seq topics$1, int p) {
        return ((IterableOnceOps)topics$1.map((Function1 & Serializable)topic -> new TopicPartition(topic, p))).toSet();
    }

    public static final /* synthetic */ boolean $anonfun$testRackAwareRangeAssignor$7(Consumer consumer$1, scala.collection.immutable.Set expectedAssignment$1) {
        java.util.Set set = consumer$1.assignment();
        java.util.Set set2 = CollectionConverters$.MODULE$.SetHasAsJava((Set)expectedAssignment$1).asJava();
        return !(set != null ? !((Object)set).equals(set2) : set2 != null);
    }

    public static final /* synthetic */ String $anonfun$testRackAwareRangeAssignor$8(scala.collection.immutable.Set expectedAssignment$1, Consumer consumer$1) {
        return "Timed out while awaiting expected assignment " + expectedAssignment$1 + ". The current assignment is " + consumer$1.assignment();
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$9(Future future) {
        Assertions.assertEquals((int)0, (int)BoxesRunTime.unboxToInt(future.get(30L, TimeUnit.SECONDS)));
    }

    public static final /* synthetic */ void $anonfun$testRackAwareRangeAssignor$13(List assignments$1, Tuple2 x0$3) {
        if (x0$3 != null) {
            Future future = (Future)x0$3._1();
            int i = x0$3._2$mcI$sp();
            Seq records = (Seq)future.get(30L, TimeUnit.SECONDS);
            Assertions.assertEquals((Object)assignments$1.apply(i), (Object)((IterableOnceOps)records.map((Function1 & Serializable)r -> new TopicPartition(r.topic(), r.partition()))).toSet());
            return;
        }
        throw new MatchError(null);
    }

    private static final void verifyAssignments$1(List partitionOrder, scala.collection.immutable.Seq topics, Buffer consumers$1, ExecutorService executor$1, KafkaProducer producer$1) {
        List assignments = partitionOrder.map((Function1 & Serializable)p -> FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$3(topics, BoxesRunTime.unboxToInt((Object)p)));
        ((Buffer)((IterableOps)consumers$1.zipWithIndex()).map((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                Consumer consumer = (Consumer)x0$1._1();
                int i = x0$1._2$mcI$sp();
                return executor$1.submit(() -> {
                    scala.collection.immutable.Set expectedAssignment = (scala.collection.immutable.Set)assignments.apply(i);
                    long pollUntilTrue_waitTimeMs = 30000L;
                    long pollUntilTrue_waitUntilTrue_pause = 0L;
                    long pollUntilTrue_waitUntilTrue_startTime = System.currentTimeMillis();
                    while (true) {
                        consumer.poll(Duration.ofMillis(100L));
                        if (FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$7(consumer, expectedAssignment)) break;
                        if (System.currentTimeMillis() > pollUntilTrue_waitUntilTrue_startTime + pollUntilTrue_waitTimeMs) {
                            Assertions.fail((String)FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$8(expectedAssignment, consumer));
                        }
                        Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(pollUntilTrue_waitTimeMs), pollUntilTrue_waitUntilTrue_pause));
                    }
                }, BoxesRunTime.boxToInteger((int)0));
            }
            throw new MatchError(null);
        })).foreach((Function1 & Serializable)future -> {
            FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(future);
            return BoxedUnit.UNIT;
        });
        ((List)assignments.flatten(Predef$.MODULE$.$conforms())).foreach((Function1 & Serializable)tp -> producer$1.send(new ProducerRecord(tp.topic(), Predef$.MODULE$.int2Integer(tp.partition()), (Object)("key-" + tp).getBytes(), (Object)("value-" + tp).getBytes())));
        ((IterableOnceOps)((Buffer)((IterableOps)consumers$1.zipWithIndex()).map((Function1 & Serializable)x0$2 -> {
            if (x0$2 != null) {
                Consumer consumer = (Consumer)x0$2._1();
                int i = x0$2._2$mcI$sp();
                return executor$1.submit(() -> TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, ((IterableOnceOps)assignments.apply(i)).size(), 30000L));
            }
            throw new MatchError(null);
        })).zipWithIndex()).foreach((Function1 & Serializable)x0$3 -> {
            FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$13(assignments, x0$3);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ Optional $anonfun$testRackAwareRangeAssignor$18(HashMap reassignments$1, String topicWithSingleRackPartitions$1, int p) {
        NewPartitionReassignment newAssignment = new NewPartitionReassignment(java.util.List.of(Predef$.MODULE$.int2Integer(p)));
        return reassignments$1.put(new TopicPartition(topicWithSingleRackPartitions$1, p), Optional.of(newAssignment));
    }

    public FetchFromFollowerIntegrationTest() {
        this.numNodes = 2;
        this.numParts = 1;
        this.topic = "test-fetch-from-follower";
        this.leaderBrokerId = 0;
        this.followerBrokerId = 1;
    }
}

