/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.typesafe.scalalogging.Logger;
import java.nio.file.FileStore;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicLong;
import kafka.server.Constants$;
import kafka.server.Defaults$;
import kafka.server.DiskThrottleListenerManager;
import kafka.server.DiskUsageBasedThrottleListener;
import kafka.server.DiskUsageBasedThrottler;
import kafka.server.DiskUsageBasedThrottler$;
import kafka.server.DiskUsageBasedThrottlingConfig;
import kafka.server.DiskUsageBasedThrottlingConfig$;
import kafka.server.QuotaType;
import kafka.server.ReplicationQuotaManager;
import kafka.server.ReplicationQuotaManagerConfig;
import kafka.utils.Logging;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.Function0;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;

@ScalaSignature(bytes="\u0006\u0005]4A\u0001F\u000b\u00015!)\u0011\u0005\u0001C\u0001E!9Q\u0005\u0001b\u0001\n\u00131\u0003B\u0002\u001b\u0001A\u0003%q\u0005C\u00046\u0001\t\u0007I\u0011\u0002\u001c\t\rq\u0002\u0001\u0015!\u00038\u0011\u0015i\u0004\u0001\"\u0001?\u0011\u0015i\u0005\u0001\"\u0001?\u0011\u0015\u0011\u0006\u0001\"\u0001?\u0011\u0015!\u0006\u0001\"\u0001?\u0011\u00151\u0006\u0001\"\u0001?\u0011\u0015A\u0006\u0001\"\u0001Z\u0011\u0015q\u0006\u0001\"\u0001?\u0011\u0015\u0001\u0007\u0001\"\u0001?\u0011\u0015\u0011\u0007\u0001\"\u0001?\u0011\u0015!\u0007\u0001\"\u0001?\u0011\u00151\u0007\u0001\"\u0001?\u0011\u0015A\u0007\u0001\"\u0001?\u0011\u0015Q\u0007\u0001\"\u0001?\u0011\u0015a\u0007\u0001\"\u0003n\u0005m\u0011V\r\u001d7jG\u0006$\u0018n\u001c8Rk>$\u0018-T1oC\u001e,'\u000fV3ti*\u0011acF\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003a\tQa[1gW\u0006\u001c\u0001a\u0005\u0002\u00017A\u0011AdH\u0007\u0002;)\ta$A\u0003tG\u0006d\u0017-\u0003\u0002!;\t1\u0011I\\=SK\u001a\fa\u0001P5oSRtD#A\u0012\u0011\u0005\u0011\u0002Q\"A\u000b\u0002\u00115|7m\u001b+j[\u0016,\u0012a\n\t\u0003QIj\u0011!\u000b\u0006\u0003U-\nQ!\u001e;jYNT!\u0001L\u0017\u0002\r\r|W.\\8o\u0015\tAbF\u0003\u00020a\u00051\u0011\r]1dQ\u0016T\u0011!M\u0001\u0004_J<\u0017BA\u001a*\u0005!iunY6US6,\u0017!C7pG.$\u0016.\\3!\u0003\u001diW\r\u001e:jGN,\u0012a\u000e\t\u0003qij\u0011!\u000f\u0006\u0003k-J!aO\u001d\u0003\u000f5+GO]5dg\u0006AQ.\u001a;sS\u000e\u001c\b%\u0001\u0005uK\u0006\u0014Hi\\<o)\u0005y\u0004C\u0001\u000fA\u0013\t\tUD\u0001\u0003V]&$\bF\u0001\u0004D!\t!5*D\u0001F\u0015\t1u)A\u0002ba&T!\u0001S%\u0002\u000f),\b/\u001b;fe*\u0011!\nM\u0001\u0006UVt\u0017\u000e^\u0005\u0003\u0019\u0016\u0013\u0011\"\u00114uKJ,\u0015m\u00195\u00029MDw.\u001e7e'\u0016$8i\u001c8gS\u001e,(/\u001a3Rk>$\u0018MU1uK\"\u0012qa\u0014\t\u0003\tBK!!U#\u0003\tQ+7\u000f^\u00012g\"|W\u000f\u001c3UQJ|G\u000f\u001e7f\u00032d'+\u001a9mS\u000e\f7o\u00165f]\n\u0013xn[3s\u0019\u00164X\r\\\"p]\u001aLwmU3uQ\tAq*A\u0011tQ>,H\u000e\u001a+ie>$H\u000f\\3P]2LH)\u001a4j]\u0016$'+\u001a9mS\u000e\f7\u000f\u000b\u0002\n\u001f\u0006)4\u000f[8vY\u0012,\u0005pY3fIF+x\u000e^1UQ\u0016t'+\u001a;ve:\u0014\u0015mY6CK2|wOQ8v]\u0012\f5\u000fV5nKB\u000b7o]3tQ\tQq*\u0001\u0003sCR,GC\u0001.^!\ta2,\u0003\u0002];\t1Ai\\;cY\u0016DQ!N\u0006A\u0002]\nae\u001d5pk2$7+\u001e9q_J$x+\u001b7eG\u0006\u0014H\r\u00165s_R$H.\u001a3SKBd\u0017nY1tQ\taq*\u0001\u001ftQ>,H\u000eZ*vaB|'\u000f\u001e(p]\u0016$\u0006N]8ui2,GMU3qY&\u001c\u0017m]!oI>3XM\u001d:jI\u0016\u0014%o\\6feRC'o\u001c;uY\u0016\u001c\bFA\u0007P\u0003\u0019\u001b\bn\\;mIN+\b\u000f]8si>3XM\u001d:jI\u0016\u0014%o\\6feRC'o\u001c;uY\u0016<\u0006.\u001a8T_6,'+\u001a9mS\u000e\f7/\u0012=qY&\u001c\u0017\u000e\u001e7z)\"\u0014x\u000e\u001e;mK\u0012D#AD(\u0002IMDw.\u001e7e'V\u0004\bo\u001c:u\u0005J|7.\u001a:UQJ|G\u000f\u001e7fIJ+\u0007\u000f\\5dCND#aD(\u0002EMDw.\u001e7e%\u0016\u001cX\r\u001e\"s_.,'\u000f\u00165s_R$H.\u001a3SKBd\u0017nY1tQ\t\u0001r*\u0001\u0016uKN$(I]8lKJL5\u000f\u00165s_R$H.\u001a3P]2{w\u000fR5tW\u00063\u0018-\u001b7bE&d\u0017\u000e^=)\u0005Ey\u0015\u0001\u0010;fgR\u0014%o\\6fe&\u001bH\u000b\u001b:piRdW\rZ(o\u0019><H)[:l\u0003Z\f\u0017\u000e\\1cS2LG/_,ji\"\u001cE.^:uKJd\u0015N\\6j]\u001eD#AE(\u0002\u0007Q\u0004\u0018\u0007\u0006\u0002oeB\u0011q\u000e]\u0007\u0002W%\u0011\u0011o\u000b\u0002\u000f)>\u0004\u0018n\u0019)beRLG/[8o\u0011\u0015\u00198\u00031\u0001u\u0003\tIG\r\u0005\u0002\u001dk&\u0011a/\b\u0002\u0004\u0013:$\b")
public class ReplicationQuotaManagerTest {
    private final MockTime kafka$server$ReplicationQuotaManagerTest$$mockTime = new MockTime();
    private final Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());

    public MockTime kafka$server$ReplicationQuotaManagerTest$$mockTime() {
        return this.kafka$server$ReplicationQuotaManagerTest$$mockTime;
    }

    private Metrics metrics() {
        return this.metrics;
    }

    @AfterEach
    public void tearDown() {
        this.metrics().close();
    }

    @Test
    public void shouldSetConfiguredQuotaRate() {
        ReplicationQuotaManager quotaManager = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(111L, Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        Assertions.assertEquals((long)111L, (long)quotaManager.upperBound());
    }

    @Test
    public void shouldThrottleAllReplicasWhenBrokerLevelConfigSet() {
        long x$2 = Defaults$.MODULE$.QuotaBytesPerSecond();
        int x$3 = Defaults$.MODULE$.DefaultNumQuotaSamples();
        int x$4 = Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds();
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(x$2, x$3, x$4, true), this.metrics(), (QuotaType)QuotaType.Fetch$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        int tp1_id = 1;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id)));
        int tp1_id2 = 2;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id2)));
        int tp1_id3 = 3;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id3)));
        int tp1_id4 = 4;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id4)));
        int tp1_id5 = 400;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id5)));
        quota.markThrottled("topic1", (Seq)Nil$.MODULE$);
        int tp1_id6 = 1;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id6)));
        int tp1_id7 = 2;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id7)));
        int tp1_id8 = 3;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id8)));
        int tp1_id9 = 4;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id9)));
        int tp1_id10 = 400;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id10)));
    }

    @Test
    public void shouldThrottleOnlyDefinedReplicas() {
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(Defaults$.MODULE$.QuotaBytesPerSecond(), Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false), this.metrics(), (QuotaType)QuotaType.Fetch$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.markThrottled("topic1", (Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{1, 2, 3})));
        int tp1_id = 1;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id)));
        int tp1_id2 = 2;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id2)));
        int tp1_id3 = 3;
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id3)));
        int tp1_id4 = 4;
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("topic1", tp1_id4)));
    }

    @Test
    public void shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses() {
        long x$3 = Defaults$.MODULE$.QuotaBytesPerSecond();
        boolean x$4 = false;
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(x$3, 10, 1, x$4), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.updateQuota(new Quota(100.0, true));
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(1000L);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(500L);
        quota.record(1L);
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        quota.record(149L);
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        quota.record(1L);
        Assertions.assertEquals((double)100.66666666666667, (double)this.rate(this.metrics()), (double)0.0);
        Assertions.assertTrue((boolean)quota.isQuotaExceeded());
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(500L);
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        Assertions.assertEquals((double)75.5, (double)this.rate(this.metrics()), (double)0.1);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(500L);
        quota.record(99L);
        Assertions.assertEquals((double)100.0, (double)this.rate(this.metrics()), (double)0.0);
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        quota.record(1L);
        Assertions.assertTrue((boolean)quota.isQuotaExceeded());
        Assertions.assertEquals((double)100.4, (double)this.rate(this.metrics()), (double)0.0);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(2000L);
        Assertions.assertFalse((boolean)quota.isQuotaExceeded());
        Assertions.assertEquals((double)55.77777777777778, (double)this.rate(this.metrics()), (double)0.0);
    }

    public double rate(Metrics metrics) {
        MetricName metricName = metrics.metricName("byte-rate", "LeaderReplication", new StringBuilder(23).append("Tracking byte-rate for ").append(QuotaType.LeaderReplication$.MODULE$).toString());
        return BoxesRunTime.unboxToDouble((Object)((KafkaMetric)CollectionConverters$.MODULE$.MapHasAsScala(metrics.metrics()).asScala().apply((Object)metricName)).metricValue());
    }

    @Test
    public void shouldSupportWildcardThrottledReplicas() {
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(Defaults$.MODULE$.QuotaBytesPerSecond(), Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.markThrottled("MyTopic", Constants$.MODULE$.AllReplicas());
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)));
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)));
    }

    @Test
    public void shouldSupportNoneThrottledReplicasAndOverrideBrokerThrottles() {
        long x$2 = Defaults$.MODULE$.QuotaBytesPerSecond();
        int x$3 = Defaults$.MODULE$.DefaultNumQuotaSamples();
        int x$4 = Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds();
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(x$2, x$3, x$4, true), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.markThrottled("MyTopic", Constants$.MODULE$.NoReplicas());
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)), (String)"Topics that are explicitly unthrottled should not be throttled");
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)));
    }

    @Test
    public void shouldSupportOverrideBrokerThrottleWhenSomeReplicasExplicitlyThrottled() {
        long x$2 = Defaults$.MODULE$.QuotaBytesPerSecond();
        int x$3 = Defaults$.MODULE$.DefaultNumQuotaSamples();
        int x$4 = Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds();
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(x$2, x$3, x$4, true), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        TopicPartition tp0 = new TopicPartition("MyTopic", 0);
        TopicPartition tp1 = new TopicPartition("MyTopic", 1);
        quota.markThrottled("MyTopic", (Seq)package$.MODULE$.Seq().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapIntArray(new int[]{tp0.partition()})));
        Assertions.assertTrue((boolean)quota.isThrottled(tp0), (String)"Replicas that are explicitly throttled should be throttled");
        Assertions.assertFalse((boolean)quota.isThrottled(tp1), (String)"Replicas that are not explicitly throttled (while others in the same topic are explicitly throttled) should not be throttled");
    }

    @Test
    public void shouldSupportBrokerThrottledReplicas() {
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(Defaults$.MODULE$.QuotaBytesPerSecond(), Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.markBrokerThrottled();
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)), (String)"Should have set broker replication throttle");
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)), (String)"Should have set broker replication throttle");
        quota.removeBrokerThrottle(false);
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)), (String)"Should have reset broker replication throttle");
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)), (String)"Should have reset broker replication throttle");
    }

    @Test
    public void shouldResetBrokerThrottledReplicas() {
        long x$2 = Defaults$.MODULE$.QuotaBytesPerSecond();
        int x$3 = Defaults$.MODULE$.DefaultNumQuotaSamples();
        int x$4 = Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds();
        ReplicationQuotaManager quota = new ReplicationQuotaManager(new ReplicationQuotaManagerConfig(x$2, x$3, x$4, true), this.metrics(), (QuotaType)QuotaType.LeaderReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        quota.removeBrokerThrottle(false);
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)), (String)"Should have overridden broker replication throttle");
        Assertions.assertFalse((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)), (String)"Should have overridden broker replication throttle");
        quota.removeBrokerThrottle(true);
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyTopic", 0)), (String)"Should have reset broker replication throttle");
        Assertions.assertTrue((boolean)quota.isThrottled(new TopicPartition("MyOtherTopic", 0)), (String)"Should have reset broker replication throttle");
    }

    @Test
    public void testBrokerIsThrottledOnLowDiskAvailability() {
        long diskThreshold = DiskUsageBasedThrottlingConfig$.MODULE$.MinDiskThresholdBytes();
        long throttledLimit = DiskUsageBasedThrottlingConfig$.MODULE$.MinThroughputBytesPerSec();
        AtomicLong freeDiskRemaining = new AtomicLong(0L);
        DiskUsageBasedThrottlingConfig throttlingConfig = DiskUsageBasedThrottlingConfig$.MODULE$.apply(diskThreshold, throttledLimit, (Seq)new .colon.colon((Object)System.getProperty("java.io.tmpdir"), (List)Nil$.MODULE$), true, 500L, 1.5, false);
        DiskUsageBasedThrottler throttler = new DiskUsageBasedThrottler(this, throttlingConfig, freeDiskRemaining){
            private volatile DiskUsageBasedThrottlingConfig kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig;
            private AtomicLong kafka$server$DiskUsageBasedThrottler$$lastCheckedTime;
            private KafkaMetricsGroup kafka$server$DiskUsageBasedThrottler$$metricsGroup;
            private volatile Seq<FileStore> kafka$server$DiskUsageBasedThrottler$$fileStores;
            private volatile long producerThrottleRate;
            private volatile long kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate;
            private volatile long followerThrottleRate;
            private DiskThrottleListenerManager listenerManager;
            private Logger logger;
            private String logIdent;
            private volatile boolean bitmap$0;
            private final /* synthetic */ ReplicationQuotaManagerTest $outer;
            private final DiskUsageBasedThrottlingConfig throttlingConfig$1;
            private final AtomicLong freeDiskRemaining$1;

            public Seq<FileStore> getFileStores() {
                return DiskUsageBasedThrottler.getFileStores$((DiskUsageBasedThrottler)this);
            }

            public void createDiskThrottlerMetrics() {
                DiskUsageBasedThrottler.createDiskThrottlerMetrics$((DiskUsageBasedThrottler)this);
            }

            public void removeDiskThrottlerMetrics() {
                DiskUsageBasedThrottler.removeDiskThrottlerMetrics$((DiskUsageBasedThrottler)this);
            }

            public void updateDiskThrottlingConfig(DiskUsageBasedThrottlingConfig newConfig) {
                DiskUsageBasedThrottler.updateDiskThrottlingConfig$((DiskUsageBasedThrottler)this, (DiskUsageBasedThrottlingConfig)newConfig);
            }

            public boolean diskThrottlingEnabledInConfig(DiskUsageBasedThrottlingConfig config) {
                return DiskUsageBasedThrottler.diskThrottlingEnabledInConfig$((DiskUsageBasedThrottler)this, (DiskUsageBasedThrottlingConfig)config);
            }

            public long minDiskTotalBytes() {
                return DiskUsageBasedThrottler.minDiskTotalBytes$((DiskUsageBasedThrottler)this);
            }

            public void checkAndUpdateQuotaOnDiskUsage(long timeMs) {
                DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)this, (long)timeMs);
            }

            public void initThrottler() {
                DiskUsageBasedThrottler.initThrottler$((DiskUsageBasedThrottler)this);
            }

            public DiskUsageBasedThrottlingConfig getCurrentDiskThrottlingConfig() {
                return DiskUsageBasedThrottler.getCurrentDiskThrottlingConfig$((DiskUsageBasedThrottler)this);
            }

            public String loggerName() {
                return Logging.loggerName$((Logging)this);
            }

            public String msgWithLogIdent(String msg) {
                return Logging.msgWithLogIdent$((Logging)this, (String)msg);
            }

            public void trace(Function0<String> msg) {
                Logging.trace$((Logging)this, msg);
            }

            public void trace(Function0<String> msg, Function0<Throwable> e) {
                Logging.trace$((Logging)this, msg, e);
            }

            public boolean isDebugEnabled() {
                return Logging.isDebugEnabled$((Logging)this);
            }

            public boolean isTraceEnabled() {
                return Logging.isTraceEnabled$((Logging)this);
            }

            public void debug(Function0<String> msg) {
                Logging.debug$((Logging)this, msg);
            }

            public void debug(Function0<String> msg, Function0<Throwable> e) {
                Logging.debug$((Logging)this, msg, e);
            }

            public void info(Function0<String> msg) {
                Logging.info$((Logging)this, msg);
            }

            public void info(Function0<String> msg, Function0<Throwable> e) {
                Logging.info$((Logging)this, msg, e);
            }

            public void warn(Function0<String> msg) {
                Logging.warn$((Logging)this, msg);
            }

            public void warn(Function0<String> msg, Function0<Throwable> e) {
                Logging.warn$((Logging)this, msg, e);
            }

            public void error(Function0<String> msg) {
                Logging.error$((Logging)this, msg);
            }

            public void error(Function0<String> msg, Function0<Throwable> e) {
                Logging.error$((Logging)this, msg, e);
            }

            public void fatal(Function0<String> msg) {
                Logging.fatal$((Logging)this, msg);
            }

            public void fatal(Function0<String> msg, Function0<Throwable> e) {
                Logging.fatal$((Logging)this, msg, e);
            }

            public DiskUsageBasedThrottlingConfig kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig() {
                return this.kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig;
            }

            public void kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig_$eq(DiskUsageBasedThrottlingConfig x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig = x$1;
            }

            public AtomicLong kafka$server$DiskUsageBasedThrottler$$lastCheckedTime() {
                return this.kafka$server$DiskUsageBasedThrottler$$lastCheckedTime;
            }

            public KafkaMetricsGroup kafka$server$DiskUsageBasedThrottler$$metricsGroup() {
                return this.kafka$server$DiskUsageBasedThrottler$$metricsGroup;
            }

            public Seq<FileStore> kafka$server$DiskUsageBasedThrottler$$fileStores() {
                return this.kafka$server$DiskUsageBasedThrottler$$fileStores;
            }

            public void kafka$server$DiskUsageBasedThrottler$$fileStores_$eq(Seq<FileStore> x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$fileStores = x$1;
            }

            public long producerThrottleRate() {
                return this.producerThrottleRate;
            }

            public void producerThrottleRate_$eq(long x$1) {
                this.producerThrottleRate = x$1;
            }

            public long kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate() {
                return this.kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate;
            }

            public void kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate_$eq(long x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate = x$1;
            }

            public long followerThrottleRate() {
                return this.followerThrottleRate;
            }

            public void followerThrottleRate_$eq(long x$1) {
                this.followerThrottleRate = x$1;
            }

            public DiskThrottleListenerManager listenerManager() {
                return this.listenerManager;
            }

            public final void kafka$server$DiskUsageBasedThrottler$_setter_$kafka$server$DiskUsageBasedThrottler$$lastCheckedTime_$eq(AtomicLong x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$lastCheckedTime = x$1;
            }

            public final void kafka$server$DiskUsageBasedThrottler$_setter_$kafka$server$DiskUsageBasedThrottler$$metricsGroup_$eq(KafkaMetricsGroup x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$metricsGroup = x$1;
            }

            public void kafka$server$DiskUsageBasedThrottler$_setter_$listenerManager_$eq(DiskThrottleListenerManager x$1) {
                this.listenerManager = x$1;
            }

            private Logger logger$lzycompute() {
                synchronized (this) {
                    if (!this.bitmap$0) {
                        this.logger = Logging.logger$((Logging)this);
                        this.bitmap$0 = true;
                    }
                }
                return this.logger;
            }

            public Logger logger() {
                if (!this.bitmap$0) {
                    return this.logger$lzycompute();
                }
                return this.logger;
            }

            public String logIdent() {
                return this.logIdent;
            }

            public void logIdent_$eq(String x$1) {
                this.logIdent = x$1;
            }

            public DiskUsageBasedThrottlingConfig diskThrottlingConfig() {
                return this.throttlingConfig$1;
            }

            public long minDiskUsableBytes() {
                return this.freeDiskRemaining$1.get();
            }

            public Time time() {
                return this.$outer.kafka$server$ReplicationQuotaManagerTest$$mockTime();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.throttlingConfig$1 = throttlingConfig$1;
                this.freeDiskRemaining$1 = freeDiskRemaining$1;
                DiskUsageBasedThrottler.$init$((DiskUsageBasedThrottler)this);
                Statics.releaseFence();
            }
        };
        ReplicationQuotaManagerConfig config = new ReplicationQuotaManagerConfig(Defaults$.MODULE$.QuotaBytesPerSecond(), Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false);
        ReplicationQuotaManager quotaManager = new ReplicationQuotaManager(config, this.metrics(), (QuotaType)QuotaType.FollowerReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        DiskThrottleListenerManager.registerListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)quotaManager);
        freeDiskRemaining.set(diskThreshold);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs);
        int tp1_id = 0;
        Assertions.assertFalse((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id)), (String)"Throttling shouldn't happen with high disk availability");
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)quotaManager.upperBound());
        freeDiskRemaining.set(diskThreshold - 1L);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs2 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs2);
        int tp1_id2 = 0;
        Assertions.assertTrue((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id2)), (String)"Throttling should happen with low disk availability");
        Assertions.assertEquals((long)(2L * throttledLimit), (long)quotaManager.upperBound());
        freeDiskRemaining.set(diskThreshold + 1L);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs3 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs3);
        int tp1_id3 = 0;
        Assertions.assertTrue((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id3)), (String)"Throttling should still continue");
        Assertions.assertEquals((long)(2L * throttledLimit), (long)quotaManager.upperBound());
        freeDiskRemaining.set((long)((double)diskThreshold * 1.5));
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs4 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs4);
        int tp1_id4 = 0;
        Assertions.assertFalse((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id4)), (String)"Throttling should have stopped with high disk availability");
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)quotaManager.upperBound());
        DiskThrottleListenerManager.deRegisterListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)quotaManager);
    }

    @Test
    public void testBrokerIsThrottledOnLowDiskAvailabilityWithClusterLinking() {
        long diskThreshold = DiskUsageBasedThrottlingConfig$.MODULE$.MinDiskThresholdBytes();
        long throttledLimit = DiskUsageBasedThrottlingConfig$.MODULE$.MinThroughputBytesPerSec();
        AtomicLong freeDiskRemaining = new AtomicLong(0L);
        DiskUsageBasedThrottlingConfig throttlingConfig = DiskUsageBasedThrottlingConfig$.MODULE$.apply(diskThreshold, throttledLimit, (Seq)new .colon.colon((Object)System.getProperty("java.io.tmpdir"), (List)Nil$.MODULE$), true, 500L, 1.5, true);
        DiskUsageBasedThrottler throttler = new DiskUsageBasedThrottler(this, throttlingConfig, freeDiskRemaining){
            private volatile DiskUsageBasedThrottlingConfig kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig;
            private AtomicLong kafka$server$DiskUsageBasedThrottler$$lastCheckedTime;
            private KafkaMetricsGroup kafka$server$DiskUsageBasedThrottler$$metricsGroup;
            private volatile Seq<FileStore> kafka$server$DiskUsageBasedThrottler$$fileStores;
            private volatile long producerThrottleRate;
            private volatile long kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate;
            private volatile long followerThrottleRate;
            private DiskThrottleListenerManager listenerManager;
            private Logger logger;
            private String logIdent;
            private volatile boolean bitmap$0;
            private final /* synthetic */ ReplicationQuotaManagerTest $outer;
            private final DiskUsageBasedThrottlingConfig throttlingConfig$2;
            private final AtomicLong freeDiskRemaining$2;

            public Seq<FileStore> getFileStores() {
                return DiskUsageBasedThrottler.getFileStores$((DiskUsageBasedThrottler)this);
            }

            public void createDiskThrottlerMetrics() {
                DiskUsageBasedThrottler.createDiskThrottlerMetrics$((DiskUsageBasedThrottler)this);
            }

            public void removeDiskThrottlerMetrics() {
                DiskUsageBasedThrottler.removeDiskThrottlerMetrics$((DiskUsageBasedThrottler)this);
            }

            public void updateDiskThrottlingConfig(DiskUsageBasedThrottlingConfig newConfig) {
                DiskUsageBasedThrottler.updateDiskThrottlingConfig$((DiskUsageBasedThrottler)this, (DiskUsageBasedThrottlingConfig)newConfig);
            }

            public boolean diskThrottlingEnabledInConfig(DiskUsageBasedThrottlingConfig config) {
                return DiskUsageBasedThrottler.diskThrottlingEnabledInConfig$((DiskUsageBasedThrottler)this, (DiskUsageBasedThrottlingConfig)config);
            }

            public long minDiskTotalBytes() {
                return DiskUsageBasedThrottler.minDiskTotalBytes$((DiskUsageBasedThrottler)this);
            }

            public void checkAndUpdateQuotaOnDiskUsage(long timeMs) {
                DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)this, (long)timeMs);
            }

            public void initThrottler() {
                DiskUsageBasedThrottler.initThrottler$((DiskUsageBasedThrottler)this);
            }

            public DiskUsageBasedThrottlingConfig getCurrentDiskThrottlingConfig() {
                return DiskUsageBasedThrottler.getCurrentDiskThrottlingConfig$((DiskUsageBasedThrottler)this);
            }

            public String loggerName() {
                return Logging.loggerName$((Logging)this);
            }

            public String msgWithLogIdent(String msg) {
                return Logging.msgWithLogIdent$((Logging)this, (String)msg);
            }

            public void trace(Function0<String> msg) {
                Logging.trace$((Logging)this, msg);
            }

            public void trace(Function0<String> msg, Function0<Throwable> e) {
                Logging.trace$((Logging)this, msg, e);
            }

            public boolean isDebugEnabled() {
                return Logging.isDebugEnabled$((Logging)this);
            }

            public boolean isTraceEnabled() {
                return Logging.isTraceEnabled$((Logging)this);
            }

            public void debug(Function0<String> msg) {
                Logging.debug$((Logging)this, msg);
            }

            public void debug(Function0<String> msg, Function0<Throwable> e) {
                Logging.debug$((Logging)this, msg, e);
            }

            public void info(Function0<String> msg) {
                Logging.info$((Logging)this, msg);
            }

            public void info(Function0<String> msg, Function0<Throwable> e) {
                Logging.info$((Logging)this, msg, e);
            }

            public void warn(Function0<String> msg) {
                Logging.warn$((Logging)this, msg);
            }

            public void warn(Function0<String> msg, Function0<Throwable> e) {
                Logging.warn$((Logging)this, msg, e);
            }

            public void error(Function0<String> msg) {
                Logging.error$((Logging)this, msg);
            }

            public void error(Function0<String> msg, Function0<Throwable> e) {
                Logging.error$((Logging)this, msg, e);
            }

            public void fatal(Function0<String> msg) {
                Logging.fatal$((Logging)this, msg);
            }

            public void fatal(Function0<String> msg, Function0<Throwable> e) {
                Logging.fatal$((Logging)this, msg, e);
            }

            public DiskUsageBasedThrottlingConfig kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig() {
                return this.kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig;
            }

            public void kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig_$eq(DiskUsageBasedThrottlingConfig x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$dynamicDiskThrottlingConfig = x$1;
            }

            public AtomicLong kafka$server$DiskUsageBasedThrottler$$lastCheckedTime() {
                return this.kafka$server$DiskUsageBasedThrottler$$lastCheckedTime;
            }

            public KafkaMetricsGroup kafka$server$DiskUsageBasedThrottler$$metricsGroup() {
                return this.kafka$server$DiskUsageBasedThrottler$$metricsGroup;
            }

            public Seq<FileStore> kafka$server$DiskUsageBasedThrottler$$fileStores() {
                return this.kafka$server$DiskUsageBasedThrottler$$fileStores;
            }

            public void kafka$server$DiskUsageBasedThrottler$$fileStores_$eq(Seq<FileStore> x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$fileStores = x$1;
            }

            public long producerThrottleRate() {
                return this.producerThrottleRate;
            }

            public void producerThrottleRate_$eq(long x$1) {
                this.producerThrottleRate = x$1;
            }

            public long kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate() {
                return this.kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate;
            }

            public void kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate_$eq(long x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$clusterLinkThrottleRate = x$1;
            }

            public long followerThrottleRate() {
                return this.followerThrottleRate;
            }

            public void followerThrottleRate_$eq(long x$1) {
                this.followerThrottleRate = x$1;
            }

            public DiskThrottleListenerManager listenerManager() {
                return this.listenerManager;
            }

            public final void kafka$server$DiskUsageBasedThrottler$_setter_$kafka$server$DiskUsageBasedThrottler$$lastCheckedTime_$eq(AtomicLong x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$lastCheckedTime = x$1;
            }

            public final void kafka$server$DiskUsageBasedThrottler$_setter_$kafka$server$DiskUsageBasedThrottler$$metricsGroup_$eq(KafkaMetricsGroup x$1) {
                this.kafka$server$DiskUsageBasedThrottler$$metricsGroup = x$1;
            }

            public void kafka$server$DiskUsageBasedThrottler$_setter_$listenerManager_$eq(DiskThrottleListenerManager x$1) {
                this.listenerManager = x$1;
            }

            private Logger logger$lzycompute() {
                synchronized (this) {
                    if (!this.bitmap$0) {
                        this.logger = Logging.logger$((Logging)this);
                        this.bitmap$0 = true;
                    }
                }
                return this.logger;
            }

            public Logger logger() {
                if (!this.bitmap$0) {
                    return this.logger$lzycompute();
                }
                return this.logger;
            }

            public String logIdent() {
                return this.logIdent;
            }

            public void logIdent_$eq(String x$1) {
                this.logIdent = x$1;
            }

            public DiskUsageBasedThrottlingConfig diskThrottlingConfig() {
                return this.throttlingConfig$2;
            }

            public long minDiskUsableBytes() {
                return this.freeDiskRemaining$2.get();
            }

            public Time time() {
                return this.$outer.kafka$server$ReplicationQuotaManagerTest$$mockTime();
            }
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                this.throttlingConfig$2 = throttlingConfig$2;
                this.freeDiskRemaining$2 = freeDiskRemaining$2;
                DiskUsageBasedThrottler.$init$((DiskUsageBasedThrottler)this);
                Statics.releaseFence();
            }
        };
        ReplicationQuotaManagerConfig config = new ReplicationQuotaManagerConfig(Defaults$.MODULE$.QuotaBytesPerSecond(), Defaults$.MODULE$.DefaultNumQuotaSamples(), Defaults$.MODULE$.DefaultQuotaWindowSizeSeconds(), false);
        ReplicationQuotaManager quotaManager = new ReplicationQuotaManager(config, this.metrics(), (QuotaType)QuotaType.FollowerReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        ReplicationQuotaManager linkQuotaManager = new ReplicationQuotaManager(config, this.metrics(), (QuotaType)QuotaType.ClusterLinkReplication$.MODULE$, (Time)this.kafka$server$ReplicationQuotaManagerTest$$mockTime());
        DiskThrottleListenerManager.registerListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)quotaManager);
        DiskThrottleListenerManager.registerListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)linkQuotaManager);
        freeDiskRemaining.set(diskThreshold);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs);
        int tp1_id = 0;
        Assertions.assertFalse((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id)), (String)"Throttling shouldn't happen with high disk availability");
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)quotaManager.upperBound());
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)linkQuotaManager.upperBound());
        freeDiskRemaining.set(diskThreshold - 1L);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs2 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs2);
        int tp1_id2 = 0;
        Assertions.assertTrue((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id2)), (String)"Throttling should happen with low disk availability");
        Assertions.assertEquals((long)(4L * throttledLimit), (long)quotaManager.upperBound());
        Assertions.assertEquals((long)throttledLimit, (long)linkQuotaManager.upperBound());
        freeDiskRemaining.set(diskThreshold + 1L);
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs3 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs3);
        int tp1_id3 = 0;
        Assertions.assertTrue((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id3)), (String)"Throttling should still continue");
        Assertions.assertEquals((long)(4L * throttledLimit), (long)quotaManager.upperBound());
        Assertions.assertEquals((long)throttledLimit, (long)linkQuotaManager.upperBound());
        freeDiskRemaining.set((long)((double)diskThreshold * 1.5));
        this.kafka$server$ReplicationQuotaManagerTest$$mockTime().sleep(501L);
        long checkAndUpdateQuotaOnDiskUsage_timeMs4 = this.kafka$server$ReplicationQuotaManagerTest$$mockTime().milliseconds();
        DiskUsageBasedThrottler.checkAndUpdateQuotaOnDiskUsage$((DiskUsageBasedThrottler)throttler, (long)checkAndUpdateQuotaOnDiskUsage_timeMs4);
        int tp1_id4 = 0;
        Assertions.assertFalse((boolean)quotaManager.isThrottled(new TopicPartition("topic1", tp1_id4)), (String)"Throttling should have stopped with high disk availability");
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)quotaManager.upperBound());
        Assertions.assertEquals((long)config.quotaBytesPerSecond(), (long)linkQuotaManager.upperBound());
        DiskThrottleListenerManager.deRegisterListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)quotaManager);
        DiskThrottleListenerManager.deRegisterListener$((DiskThrottleListenerManager)DiskUsageBasedThrottler$.MODULE$, (DiskUsageBasedThrottleListener)linkQuotaManager);
    }

    private TopicPartition tp1(int id) {
        return new TopicPartition("topic1", id);
    }
}

