/*
 * Decompiled with CFR 0.152.
 */
package unit.kafka.metrics;

import java.io.File;
import java.io.Serializable;
import java.util.Optional;
import java.util.Properties;
import kafka.integration.KafkaServerTestHarness;
import kafka.log.LogManager;
import kafka.server.BrokerTopicStats$;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestInfoUtils$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.Set$;
import scala.collection.immutable.$colon$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@Timeout(value=120L)
@ScalaSignature(bytes="\u0006\u0005\u0005Ub\u0001B\b\u0011\u0001]AQ!\n\u0001\u0005\u0002\u0019B\u0011\"\u000b\u0001A\u0002\u0003\u0005\u000b\u0015\u0002\u0016\t\u000fY\u0002!\u0019!C\u0001o!1a\b\u0001Q\u0001\naBqa\u0010\u0001C\u0002\u0013\u0005q\u0007\u0003\u0004A\u0001\u0001\u0006I\u0001\u000f\u0005\b\u0003\u0002\u0011\r\u0011\"\u0001C\u0011\u0019Y\u0005\u0001)A\u0005\u0007\")A\n\u0001C\u0001\u001b\")A\u000b\u0001C!+\"9!\r\u0001b\u0001\n\u00039\u0004BB2\u0001A\u0003%\u0001\bC\u0003e\u0001\u0011\u0005S\rC\u0003p\u0001\u0011\u0005\u0001O\u0001\u000fGKR\u001c\u0007N\u0012:p[\u001a{G\u000e\\8xKJlU\r\u001e:jGN$Vm\u001d;\u000b\u0005E\u0011\u0012aB7fiJL7m\u001d\u0006\u0003'Q\tQa[1gW\u0006T\u0011!F\u0001\u0005k:LGo\u0001\u0001\u0014\u0007\u0001Ar\u0004\u0005\u0002\u001a;5\t!D\u0003\u0002\u001c9\u0005Y\u0011N\u001c;fOJ\fG/[8o\u0015\u0005\u0019\u0012B\u0001\u0010\u001b\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\bC\u0001\u0011$\u001b\u0005\t#B\u0001\u0012\u001d\u0003\u0015)H/\u001b7t\u0013\t!\u0013EA\u0004M_\u001e<\u0017N\\4\u0002\rqJg.\u001b;?)\u00059\u0003C\u0001\u0015\u0001\u001b\u0005\u0001\u0012!C0uKN$\u0018J\u001c4p!\tYC'D\u0001-\u0015\tic&A\u0002ba&T!a\f\u0019\u0002\u000f),\b/\u001b;fe*\u0011\u0011GM\u0001\u0006UVt\u0017\u000e\u001e\u0006\u0002g\u0005\u0019qN]4\n\u0005Ub#\u0001\u0003+fgRLeNZ8\u0002\u00119,XNT8eKN,\u0012\u0001\u000f\t\u0003sqj\u0011A\u000f\u0006\u0002w\u0005)1oY1mC&\u0011QH\u000f\u0002\u0004\u0013:$\u0018!\u00038v[:{G-Z:!\u0003!qW/\u001c)beR\u001c\u0018!\u00038v[B\u000b'\u000f^:!\u0003e\u0011X-];je\u0016$7*\u00194lCN+'O^3s!J,g-\u001b=\u0016\u0003\r\u0003\"\u0001R%\u000e\u0003\u0015S!AR$\u0002\t1\fgn\u001a\u0006\u0002\u0011\u0006!!.\u0019<b\u0013\tQUI\u0001\u0004TiJLgnZ\u0001\u001be\u0016\fX/\u001b:fI.\u000bgm[1TKJ4XM\u001d)sK\u001aL\u0007\u0010I\u0001\u0010_Z,'O]5eS:<\u0007K]8qgV\ta\n\u0005\u0002P%6\t\u0001K\u0003\u0002R\u000f\u0006!Q\u000f^5m\u0013\t\u0019\u0006K\u0001\u0006Qe>\u0004XM\u001d;jKN\fqbZ3oKJ\fG/Z\"p]\u001aLwm]\u000b\u00
02-B\u0019qK\u0017/\u000e\u0003aS!!\u0017\u001e\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002\\1\n\u00191+Z9\u0011\u0005u\u0003W\"\u00010\u000b\u0005}c\u0012AB:feZ,'/\u0003\u0002b=\nY1*\u00194lC\u000e{gNZ5h\u0003%qW*Z:tC\u001e,7/\u0001\u0006o\u001b\u0016\u001c8/Y4fg\u0002\nQa]3u+B$\"AZ5\u0011\u0005e:\u0017B\u00015;\u0005\u0011)f.\u001b;\t\u000b)l\u0001\u0019\u0001\u0016\u0002\u0011Q,7\u000f^%oM>D#!\u00047\u0011\u0005-j\u0017B\u00018-\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001%i\u0016\u001cHOR3uG\"4%o\\7G_2dwn^3s\u001b\u0016$(/[2t\u0005f$Xm](viR\u0011a-\u001d\u0005\u0006e:\u0001\ra]\u0001\u0007cV|'/^7\u0011\u0005Q\\hBA;z!\t1((D\u0001x\u0015\tAh#\u0001\u0004=e>|GOP\u0005\u0003uj\na\u0001\u0015:fI\u00164\u0017B\u0001&}\u0015\tQ(\b\u000b\u0004\u000f}\u00065\u0011q\u0002\t\u0004\u007f\u0006%QBAA\u0001\u0015\u0011\t\u0019!!\u0002\u0002\u0011A\u0014xN^5eKJT1!a\u0002/\u0003\u0019\u0001\u0018M]1ng&!\u00111BA\u0001\u0005-1\u0016\r\\;f'>,(oY3\u0002\u000fM$(/\u001b8hg2\"\u0011\u0011CA\u000bC\t\t\u0019\"\u0001\u0002{W\u0006\u0012\u0011qC\u0001\u0006WJ\fg\r\u001e\u0015\b\u001d\u0005m\u00111EA\u0013!\u0011\ti\"a\b\u000e\u0005\u0005\u0015\u0011\u0002BA\u0011\u0003\u000b\u0011\u0011\u0003U1sC6,G/\u001a:ju\u0016$G+Z:u\u0003\u0011q\u0017-\\3\"\u0005\u0005\u001d\u0012\u0001G>eSN\u0004H.Y=OC6,WPL9v_J,X.P>1{\":\u0001!a\u000b\u00022\u0005M\u0002cA\u0016\u0002.%\u0019\u0011q\u0006\u0017\u0003\u000fQKW.Z8vi\u0006)a/\u00197vKz\t\u0001\u0010")
public class FetchFromFollowerMetricsTest
extends KafkaServerTestHarness {
    private TestInfo _testInfo;
    private final int numNodes;
    private final int numParts;
    private final String requiredKafkaServerPrefix;
    private final int nMessages;

    /**
     * Returns the node count configured for this test.
     *
     * <p>Within this class the value is used as the number of broker configs to
     * generate and as the replication factor of the test topic.
     *
     * @return the value of the {@code numNodes} field
     */
    public int numNodes() {
        return numNodes;
    }

    /**
     * Returns the partition count that {@link #overridingProps()} writes into the
     * broker configuration under {@code KafkaConfig.NumPartitionsProp}.
     *
     * @return the value of the {@code numParts} field
     */
    public int numParts() {
        return numParts;
    }

    /**
     * Returns the JMX object-name prefix that {@link #overridingProps()} combines
     * with {@code "=ClusterId"} to form the {@code metrics.jmx.exclude} value.
     *
     * @return the value of the {@code requiredKafkaServerPrefix} field
     */
    public String requiredKafkaServerPrefix() {
        return requiredKafkaServerPrefix;
    }

    /**
     * Builds the broker property overrides applied on top of every generated
     * broker config (see {@link #generateConfigs()}).
     *
     * <p>The overrides set the default partition count, exclude the
     * {@code <prefix>=ClusterId} MBean from JMX reporting, select
     * {@code RackAwareReplicaSelector} as the replica selector and, when the
     * current test runs in KRaft mode, set the cluster-link enable property to
     * {@code "false"}.
     *
     * @return a freshly created {@link Properties} instance on every call
     */
    public Properties overridingProps() {
        final Properties overrides = new Properties();
        overrides.put(KafkaConfig$.MODULE$.NumPartitionsProp(), String.valueOf(this.numParts()));
        overrides.put("metrics.jmx.exclude", this.requiredKafkaServerPrefix() + "=ClusterId");
        overrides.put(KafkaConfig$.MODULE$.ReplicaSelectorClassProp(), "org.apache.kafka.common.replica.RackAwareReplicaSelector");
        final boolean kraftMode = TestInfoUtils$.MODULE$.isKRaft(this._testInfo);
        if (kraftMode) {
            overrides.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "false");
        }
        return overrides;
    }

    /**
     * Generates one {@link KafkaConfig} per node.
     *
     * <p>A mutable map is filled with one entry per node index (index mapped to
     * its decimal string) and handed to {@code TestUtils.createBrokerConfigs} as
     * the rack information. Each resulting property set is then combined with
     * {@link #overridingProps()} via {@code KafkaConfig.fromProps}.
     *
     * @return the broker configurations, one per node
     */
    @Override
    public Seq<KafkaConfig> generateConfigs() {
        scala.collection.mutable.Map rackInfo = (scala.collection.mutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)Nil$.MODULE$);
        final int rackEntryCount = this.numNodes();
        for (int nodeIndex = 0; nodeIndex < rackEntryCount; nodeIndex++) {
            FetchFromFollowerMetricsTest.$anonfun$generateConfigs$1(rackInfo, nodeIndex);
        }
        int brokerCount = this.numNodes();
        String zkConnect = this.zkConnectOrNull();
        None$ none = None$.MODULE$;
        // NOTE(review): the positional booleans/ints below are passed exactly as in the
        // original; their meaning is defined by TestUtils.createBrokerConfigs, whose
        // signature is not visible in this file - confirm before changing any of them.
        return (Seq)TestUtils$.MODULE$.createBrokerConfigs(
                brokerCount, zkConnect, false, true,
                (Option<SecurityProtocol>)none, (Option<File>)none, (Option<Properties>)none,
                true, false, false, false,
                (Map<Object, String>)rackInfo,
                1, false, 1, (short)1, 0, false).map((Function1 & Serializable)brokerProps -> {
            Properties overrides = this.overridingProps();
            return KafkaConfig$.MODULE$.fromProps(brokerProps, overrides, true);
        });
    }

    /**
     * Returns the number of messages produced and consumed in each round of the
     * test (passed to {@code generateAndProduceMessages} and
     * {@code consumeTopicRecords}).
     *
     * @return the value of the {@code nMessages} field
     */
    public int nMessages() {
        return nMessages;
    }

    /**
     * Records the current {@link TestInfo} and then delegates to the harness set-up.
     *
     * @param testInfo JUnit metadata for the test about to run; kept so that
     *                 {@link #overridingProps()} can check for KRaft mode
     */
    @Override
    @BeforeEach
    public void setUp(TestInfo testInfo) {
        // Stored before delegating: presumably the parent set-up builds the broker
        // configs, which read _testInfo - confirm against KafkaServerTestHarness.
        _testInfo = testInfo;
        super.setUp(testInfo);
    }

    @ParameterizedTest(name="{displayName}.quorum={0}")
    @ValueSource(strings={"zk", "kraft"})
    public void testFetchFromFollowerMetricsBytesOut(String quorum) {
        String topic = "test-fetch-from-follower-bytes-out";
        int leaderBrokerId = 0;
        int followerBrokerId = 1;
        String totalBytesOut = new StringBuilder(12).append("name=").append(BrokerTopicStats$.MODULE$.BytesOutPerSec()).append(",topic=").append(topic).toString();
        String fetchFromFollowerBytesOut = new StringBuilder(12).append("name=").append(BrokerTopicStats$.MODULE$.FetchFromFollowerBytesOutPerSec()).append(",topic=").append(topic).toString();
        Properties topicConfig = new Properties();
        topicConfig.setProperty("min.insync.replicas", "2");
        this.createTopic(topic, 1, this.numNodes(), topicConfig, this.createTopic$default$5(), this.createTopic$default$6());
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Admin admin = TestUtils$.MODULE$.createAdminClient(this.brokers(), this.listenerName(), new Properties());
        TestUtils$.MODULE$.describeTopic(admin, topic).partitions().forEach(partition -> {
            if (partition.leader().id() != leaderBrokerId) {
                java.util.Map reassignment = CollectionConverters$.MODULE$.MapHasAsJava((Map)scala.collection.Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), Optional.of(new NewPartitionReassignment(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)Predef$.MODULE$.int2Integer(leaderBrokerId), (List)new .colon.colon((Object)Predef$.MODULE$.int2Integer(followerBrokerId), (List)Nil$.MODULE$))).asJava())))}))).asJava();
                admin.alterPartitionReassignments(reassignment).all().get();
                long l = 100L;
                long waitUntilTrue_waitTimeMs = 15000L;
                long waitUntilTrue_startTime = System.currentTimeMillis();
                while (!FetchFromFollowerMetricsTest.$anonfun$testFetchFromFollowerMetricsBytesOut$2(admin)) {
                    void waitUntilTrue_pause;
                    if (System.currentTimeMillis() > waitUntilTrue_startTime + waitUntilTrue_waitTimeMs) {
                        Assertions.fail((String)FetchFromFollowerMetricsTest.$anonfun$testFetchFromFollowerMetricsBytesOut$3());
                    }
                    Thread.sleep(RichLong$.MODULE$.min$extension(Predef$.MODULE$.longWrapper(waitUntilTrue_waitTimeMs), (long)waitUntilTrue_pause));
                }
                admin.electLeaders(ElectionType.PREFERRED, CollectionConverters$.MODULE$.SetHasAsJava((Set)Set$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{new TopicPartition(topic, 0)}))).asJava()).all().get();
                return;
            }
        });
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        this.brokers().foreach((Function1 & Serializable)broker -> {
            FetchFromFollowerMetricsTest.$anonfun$testFetchFromFollowerMetricsBytesOut$4(topicPartition, broker);
            return BoxedUnit.UNIT;
        });
        Buffer<KafkaBroker> x$3 = this.brokers();
        int x$5 = this.nMessages();
        String x$6 = Integer.toString(followerBrokerId);
        String x$7 = "group";
        SecurityProtocol x$8 = SecurityProtocol.PLAINTEXT;
        None$ x$9 = None$.MODULE$;
        long x$10 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$3, topic, x$5, x$7, x$8, (Option<File>)x$9, x$10, x$6);
        long totalBytesOut0 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        long fetchFromFollowerBytesOut0 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        long consumeFromLeaderBytesOut0 = totalBytesOut0 - fetchFromFollowerBytesOut0;
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Buffer<KafkaBroker> x$11 = this.brokers();
        int x$13 = this.nMessages();
        String x$14 = Integer.toString(followerBrokerId);
        String x$15 = "group";
        SecurityProtocol x$16 = SecurityProtocol.PLAINTEXT;
        None$ x$17 = None$.MODULE$;
        long x$18 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$11, topic, x$13, x$15, x$16, (Option<File>)x$17, x$18, x$14);
        long totalBytesOut1 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        Assertions.assertTrue((totalBytesOut1 > totalBytesOut0 ? 1 : 0) != 0);
        long fetchFromFollowerBytesOut1 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        Assertions.assertTrue((fetchFromFollowerBytesOut1 > fetchFromFollowerBytesOut0 ? 1 : 0) != 0);
        Assertions.assertTrue((fetchFromFollowerBytesOut1 == TestUtils$.MODULE$.meterCount(totalBytesOut) ? 1 : 0) != 0);
        long consumeFromLeaderBytesOut1 = totalBytesOut1 - fetchFromFollowerBytesOut1;
        Assertions.assertTrue((consumeFromLeaderBytesOut1 == consumeFromLeaderBytesOut0 ? 1 : 0) != 0);
        Assertions.assertTrue((consumeFromLeaderBytesOut1 < fetchFromFollowerBytesOut1 ? 1 : 0) != 0);
        TestUtils$.MODULE$.generateAndProduceMessages(this.brokers(), topic, this.nMessages(), -1);
        Buffer<KafkaBroker> x$19 = this.brokers();
        int x$21 = this.nMessages();
        String x$22 = Integer.toString(leaderBrokerId);
        String x$23 = "group";
        SecurityProtocol x$24 = SecurityProtocol.PLAINTEXT;
        None$ x$25 = None$.MODULE$;
        long x$26 = 15000L;
        TestUtils$.MODULE$.consumeTopicRecords(x$19, topic, x$21, x$23, x$24, (Option<File>)x$25, x$26, x$22);
        long totalBytesOut2 = TestUtils$.MODULE$.meterCount(totalBytesOut);
        Assertions.assertTrue((totalBytesOut2 > totalBytesOut1 ? 1 : 0) != 0);
        long fetchFromFollowerBytesOut2 = TestUtils$.MODULE$.meterCount(fetchFromFollowerBytesOut);
        Assertions.assertTrue((fetchFromFollowerBytesOut2 == fetchFromFollowerBytesOut1 ? 1 : 0) != 0);
        Assertions.assertTrue((fetchFromFollowerBytesOut2 < totalBytesOut2 ? 1 : 0) != 0);
        Assertions.assertTrue((totalBytesOut2 - fetchFromFollowerBytesOut2 > consumeFromLeaderBytesOut1 ? 1 : 0) != 0);
        admin.close();
    }

    /**
     * Adds the entry {@code i -> Integer.toString(i)} to the given mutable map.
     *
     * @param rackInfo$1 map being populated; mutated in place
     * @param i          node index used as the key, and in string form as the value
     * @return the result of {@code $plus$eq} on {@code rackInfo$1}, cast to a mutable map
     */
    public static final /* synthetic */ scala.collection.mutable.Map $anonfun$generateConfigs$1(scala.collection.mutable.Map rackInfo$1, int i) {
        Object boxedIndex = Predef$.MODULE$.ArrowAssoc((Object)BoxesRunTime.boxToInteger((int)i));
        Object entry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(boxedIndex, (Object)Integer.toString(i));
        return (scala.collection.mutable.Map)rackInfo$1.$plus$eq(entry);
    }

    /**
     * Reports whether the cluster currently lists no partition reassignments.
     *
     * @param admin$1 Admin client used to list reassignments; the call blocks on the
     *                returned future
     * @return {@code true} when the listed reassignment map is empty
     */
    public static final /* synthetic */ boolean $anonfun$testFetchFromFollowerMetricsBytesOut$2(Admin admin$1) {
        java.util.Map inFlight = (java.util.Map)admin$1.listPartitionReassignments().reassignments().get();
        return inFlight.isEmpty();
    }

    /**
     * Supplies the failure message used when waiting for the reassignment times out.
     *
     * @return the constant failure message
     */
    public static final /* synthetic */ String $anonfun$testFetchFromFollowerMetricsBytesOut$3() {
        final String timeoutMessage = "reassignment did not complete.";
        return timeoutMessage;
    }

    /**
     * Asserts that the broker's log manager has a log for the partition and that
     * the log's size is greater than zero.
     *
     * @param topicPartition$1 partition whose log is looked up
     * @param broker           broker to inspect
     */
    public static final /* synthetic */ void $anonfun$testFetchFromFollowerMetricsBytesOut$4(TopicPartition topicPartition$1, KafkaBroker broker) {
        LogManager logManager = broker.logManager();
        Option maybeLog = logManager.getLog(topicPartition$1, logManager.getLog$default$2());
        int id = broker.config().brokerId();
        // Empty when the broker has no log for the partition, so the check below fails.
        Option maybeSize = maybeLog.map((Function1 & Serializable)partitionLog -> BoxesRunTime.boxToLong((long)partitionLog.size()));
        boolean hasRecords = maybeSize.exists((Function1)(JFunction1.mcZJ.sp & Serializable)size -> size > 0L);
        String failureMessage = "Expected broker " + id + " to have a Log for " + topicPartition$1 + " with positive size, actual: " + maybeSize;
        Assertions.assertTrue(hasRecords, failureMessage);
    }

    /**
     * Creates the test with its fixed parameters: two nodes, two partitions,
     * two messages per round, and the {@code KafkaServer} MBean name prefix.
     */
    public FetchFromFollowerMetricsTest() {
        super();
        this.numNodes = 2;
        this.numParts = 2;
        this.requiredKafkaServerPrefix = "kafka.server:type=KafkaServer,name";
        this.nMessages = 2;
    }
}

