/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.File;
import java.io.Serializable;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.function.Supplier;
import kafka.cluster.Partition;
import kafka.log.LogManager;
import kafka.log.UnifiedLog;
import kafka.server.AbstractFetcherThread;
import kafka.server.FailedPartitions;
import kafka.server.Fetching$;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.LeaderEndPoint;
import kafka.server.LocalLeaderEndPoint;
import kafka.server.PartitionFetchState;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaAlterLogDirsThread;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.ReplicaState;
import kafka.server.ReplicationQuotaManager;
import kafka.server.Truncating$;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.common.DirectoryEventHandler;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.network.BrokerEndPoint;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchParams;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.verification.VerificationMode;
import scala.Function1;
import scala.Int$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map$;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.math.Integral;
import scala.math.Numeric;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005\t]c\u0001B\u0013'\u0001-BQA\r\u0001\u0005\u0002MBqA\u000e\u0001C\u0002\u0013%q\u0007\u0003\u0004D\u0001\u0001\u0006I\u0001\u000f\u0005\b\t\u0002\u0011\r\u0011\"\u00038\u0011\u0019)\u0005\u0001)A\u0005q!9a\t\u0001b\u0001\n\u00139\u0005BB&\u0001A\u0003%\u0001\nC\u0004M\u0001\t\u0007I\u0011B'\t\ry\u0003\u0001\u0015!\u0003O\u0011\u001dy\u0006A1A\u0005\n\u0001Da\u0001\u001a\u0001!\u0002\u0013\t\u0007bB3\u0001\u0005\u0004%IA\u001a\u0005\u0007U\u0002\u0001\u000b\u0011B4\t\u000f-\u0004!\u0019!C\u0005Y\"11\u000f\u0001Q\u0001\n5DQ\u0001\u001e\u0001\u0005\nUD\u0011\"a\u0002\u0001#\u0003%I!!\u0003\t\u000f\u0005}\u0001\u0001\"\u0001\u0002\"!9\u0011q\b\u0001\u0005\u0002\u0005\u0005\u0002bBA\"\u0001\u0011\u0005\u0011\u0011\u0005\u0005\b\u0003\u000f\u0002A\u0011BA%\u0011\u001d\ty\b\u0001C\u0001\u0003CAq!a!\u0001\t\u0003\t\t\u0003C\u0004\u0002\b\u0002!\t!!\t\t\u000f\u0005-\u0005\u0001\"\u0003\u0002\u000e\"9\u0011Q\u001b\u0001\u0005\u0002\u0005\u0005\u0002bBAm\u0001\u0011\u0005\u0011\u0011\u0005\u0005\b\u0003;\u0004A\u0011AA\u0011\u0011\u001d\t\t\u000f\u0001C\u0001\u0003CAq!!:\u0001\t\u0003\t\t\u0003C\u0004\u0002j\u0002!\t!!\t\t\u000f\u00055\b\u0001\"\u0001\u0002\"!9\u0011\u0011\u001f\u0001\u0005\u0002\u0005\u0005\u0002bBA{\u0001\u0011\u0005\u0011\u0011\u0005\u0005\b\u0003s\u0004A\u0011AA~\u0011\u001d\u0011)\u0003\u0001C\u0001\u0005O\u0011QDU3qY&\u001c\u0017-\u00117uKJdun\u001a#jeN$\u0006N]3bIR+7\u000f\u001e\u0006\u0003O!\naa]3sm\u0016\u0014(\"A\u0015\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001\u0001\f\t\u0003[Aj\u0011A\f\u0006\u0002_\u0005)1oY1mC&\u0011\u0011G\f\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005!\u0004CA\u001b\u0001\u001b\u00051\u0013\u0001\u0002;2aB*\u0012\u0001\u000f\t\u0003s\u0005k\u0011A\u000f\u0006\u0003wq\naaY8n[>t'BA\u0015>\u0015\tqt(\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0001\u0006\u0019qN]4\n\u0005\tS$A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\\\u0001\u0006iF\u0002\b\u0007I\u0001\u0005iF\u0002\u0018'A\u0003ucA\f\u0004%A\u0004u_BL7-\u00133\u0016\u0003!\u0003\"!O%\n\u0005)S$\u0001B+vS\u0012\f\u0001\u0002^8qS\u000eLE\rI\u0001\u000bi>\u0004\u0018n\u0019(b[\u0016\u001cX#\u0001(\u0011\t=#\u0006JV\u0007\u0002!*\u0011\u0011KU\u0001\nS6lW\u000f^1cY\u0016T!a\u0015\u0018\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0002V!\n\u0019Q*\u00199\u0011\u0005]cV\"\u0001-\u000b\u0005eS\u0016\u0001\u00027b]\u001eT\u0011aW\u0001\u0005U\u00064\u0018-\u0003\u0002^1\n11\u000b\u001e:j]\u001e\f1\u0002^8qS\u000et\u0015-\\3tA\u00051A/\u001b32aB*\u0012!\u0019\t\u0003s\tL!a\u0019\u001e\u0003!Q{\u0007/[2JIB\u000b'\u000f^5uS>t\u0017a\u0002;jIF\u0002\b\u0007I\u0001\u0011M\u0006LG.\u001a3QCJ$\u0018\u000e^5p]N,\u0012a\u001a\t\u0003k!L!!\u001b\u0014\u0003!\u0019\u000b\u0017\u000e\\3e!\u0006\u0014H/\u001b;j_:\u001c\u0018!\u00054bS2,G\rU1si&$\u0018n\u001c8tA\u0005iQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016,\u0012!\u001c\t\u0003]Fl\u0011a\u001c\u0006\u0003a\u001a\n\u0001\"\\3uC\u0012\fG/Y\u0005\u0003e>\u0014!c\u0013*bMRlU\r^1eCR\f7)Y2iK\u0006qQ.\u001a;bI\u0006$\u0018mQ1dQ\u0016\u0004\u0013!E5oSRL\u0017\r\u001c$fi\u000eD7\u000b^1uKR\u0019a/\u001f@\u0011\u0005U:\u0018B\u0001='\u0005EIe.\u001b;jC24U\r^2i'R\fG/\u001a\u0005\u0006uB\u0001\ra_\u0001\fM\u0016$8\r[(gMN,G\u000f\u0005\u0002.y&\u0011QP\f\u0002\u0005\u0019>tw\r\u0003\u0005\u0000!A\u0005\t\u0019AA\u0001\u0003-aW-\u00193fe\u0016\u0003xn\u00195\u0011\u00075\n\u0019!C\u0002\u0002\u00069\u00121!\u00138u\u0003mIg.\u001b;jC24U\r^2i'R\fG/\u001a\u0013eK\u001a\fW\u000f\u001c;%eU\u0011\u00111\u0002\u0016\u0005\u0003\u0003\tia\u000b\u0002\u0002\u0010A!\u0011\u0011CA\u000e\u001b\t\t\u0019B\u0003\u0003\u0002\u0016\u0005]\u0011!C;oG\",7m[3e\u0015\r\tIBL\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA\u000f\u0003'\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00031\u001a\bn\\;mI:{G/\u00113e!\u0006\u0014H/\u001b;j_:LeMR;ukJ,Gj\\4Jg:{G\u000fR3gS:,G\r\u0006\u0002\u0002$A\u0019Q&!\n\n\u0007\u0005\u001dbF\u0001\u0003V]&$\bf\u0001\n\u0002,A!\u0011QFA\u001e\u001b\t\tyC\u0003\u0003\u00022\u0005M\u0012aA1qS*!\u0011QGA\u001c\u0003\u001dQW\u000f]5uKJT1!!\u000f@\u0003\u0015QWO\\5u\u0013\u0011\ti$a\f\u0003\tQ+7\u000f^\u0001-g\"|W\u000f\u001c3Va\u0012\fG/\u001a'fC\u0012,'/\u00129pG\"\fe\r^3s\r\u0016t7-\u001a3Fa>\u001c\u0007.\u0012:s_JD3aEA\u0016\u0003\u0019\u001a\bn\\;mIJ+\u0007\u000f\\1dK\u000e+(O]3oi2{w\rR5s/\",gnQ1vO\"$X\u000b\u001d\u0015\u0004)\u0005-\u0012aF;qI\u0006$XMU3bgNLwM\\7f]R\u001cF/\u0019;f)!\tY%!\u0015\u0002\\\u0005}\u0003#B\u0017\u0002N\u0005\r\u0012bAA(]\t1q\n\u001d;j_:Dq!a\u0015\u0016\u0001\u0004\t)&\u0001\u0004uQJ,\u0017\r\u001a\t\u0004k\u0005]\u0013bAA-M\tI\"+\u001a9mS\u000e\f\u0017\t\u001c;fe2{w\rR5sgRC'/Z1e\u0011\u001d\ti&\u0006a\u0001\u0003\u0003\t1\u0002]1si&$\u0018n\u001c8JI\"9\u0011\u0011M\u000bA\u0002\u0005\r\u0014\u0001\u00038foN#\u0018\r^3\u0011\t\u0005\u0015\u0014\u0011\u0010\b\u0005\u0003O\n)H\u0004\u0003\u0002j\u0005Md\u0002BA6\u0003cj!!!\u001c\u000b\u0007\u0005=$&\u0001\u0004=e>|GOP\u0005\u0002S%\u0011q\u0005K\u0005\u0004\u0003o2\u0013!\u0007*fa2L7-Y!mi\u0016\u0014Hj\\4ESJ\u001cH\u000b\u001b:fC\u0012LA!a\u001f\u0002~\t\t\"+Z1tg&<g.\\3oiN#\u0018\r^3\u000b\u0007\u0005]d%\u0001)tQ>,H\u000e\u001a*fa2\f7-Z\"veJ,g\u000e\u001e'pO\u0012K'o\u00165f]\u000e\u000bWo\u001a5u+B<\u0016\u000e\u001e5BMR,'/Q:tS\u001etW.\u001a8u%\u0016\fX/Z:u\u0011\u0006\u001c()Z3o\u0007>l\u0007\u000f\\3uK\u0012D3AFA\u0016\u0003\u0001\u001b\bn\\;mIJ+g/\u001a:u\u0003:L8k\u00195fIVdW\rZ!tg&<g.\\3oiJ+\u0017/^3ti&3\u0017i]:jO:lWM\u001c;Jg\u000e\u000bgnY3mY\u0016$\u0007fA\f\u0002,\u0005i4\u000f[8vY\u0012\u0014VM^3siJ+\u0017m]:jO:lWM\u001c;t\r>\u0014\u0018J\\2p[BdW\r^3GkR,(/\u001a*fa2L7-\u0019)s_6|G/[8og\"\u001a\u0001$a\u000b\u0002/5|7m\u001b$fi\u000eDgI]8n\u0007V\u0014(/\u001a8u\u0019><G\u0003DA\u0012\u0003\u001f\u000b\u0019*a+\u00026\u0006}\u0006BBAI3\u0001\u0007\u0011-\u0001\tu_BL7-\u00133QCJ$\u0018\u000e^5p]\"9\u0011QS\rA\u0002\u0005]\u0015a\u0003:fcV,7\u000f\u001e#bi\u0006\u0004B!!'\u0002&:!\u00111TAQ\u001b\t\tiJC\u0002\u0002 j\n\u0001B]3rk\u0016\u001cHo]\u0005\u0005\u0003G\u000bi*\u0001\u0007GKR\u001c\u0007NU3rk\u0016\u001cH/\u0003\u0003\u0002(\u0006%&!\u0004)beRLG/[8o\t\u0006$\u0018M\u0003\u0003\u0002$\u0006u\u0005bBAW3\u0001\u0007\u0011qV\u0001\u0007G>tg-[4\u0011\u0007U\n\t,C\u0002\u00024\u001a\u00121bS1gW\u0006\u001cuN\u001c4jO\"9\u0011qW\rA\u0002\u0005e\u0016A\u0004:fa2L7-Y'b]\u0006<WM\u001d\t\u0004k\u0005m\u0016bAA_M\tq!+\u001a9mS\u000e\fW*\u00198bO\u0016\u0014\bbBAa3\u0001\u0007\u00111Y\u0001\re\u0016\u001c\bo\u001c8tK\u0012\u000bG/\u0019\t\u0005\u0003\u000b\f\t.\u0004\u0002\u0002H*!\u0011\u0011ZAf\u0003\rawn\u001a\u0006\u0005\u0003\u001b\fy-A\u0004ti>\u0014\u0018mZ3\u000b\u0005\u001db\u0014\u0002BAj\u0003\u000f\u0014!CR3uG\"\u0004\u0016M\u001d;ji&|g\u000eR1uC\u0006\u0011\u0013n]:vKN,\u0005o\\2i%\u0016\fX/Z:u\rJ|W\u000eT8dC2\u0014V\r\u001d7jG\u0006D3AGA\u0016\u0003u2W\r^2i\u000bB|7\r[:Ge>lG*Z1eKJ\u001c\u0006n\\;mI\"\u000bg\u000e\u001a7f\u000bb\u001cW\r\u001d;j_:4%o\\7HKRdunY1m%\u0016\u0004H.[2bQ\rY\u00121F\u0001\u001eg\"|W\u000f\u001c3UeVt7-\u0019;f)>\u0014V\r\u001d7jG\u0006|eMZ:fi\"\u001aA$a\u000b\u0002[MDw.\u001e7e)J,hnY1uKR{WI\u001c3PM\u001a\u001cX\r^(g\u0019\u0006\u0014x-Z:u\u0007>lWn\u001c8Fa>\u001c\u0007\u000eK\u0002\u001e\u0003W\t\u0011i\u001d5pk2$GK];oG\u0006$X\rV8J]&$\u0018.\u00197GKR\u001c\u0007n\u00144gg\u0016$\u0018J\u001a*fa2L7-\u0019*fiV\u0014hn]+oI\u00164\u0017N\\3e\u001f\u001a47/\u001a;)\u0007y\tY#A\u0016tQ>,H\u000e\u001a)pY2Le\u000eZ3gS:LG/\u001a7z\u0013\u001a\u0014V\r\u001d7jG\u0006tu\u000e^!wC&d\u0017M\u00197fQ\ry\u00121F\u0001'g\"|W\u000f\u001c3GKR\u001c\u0007\u000eT3bI\u0016\u0014X\t]8dQ>sg)\u001b:ti\u001a+Go\u00195P]2L\bf\u0001\u0011\u0002,\u0005a2\u000f[8vY\u00124U\r^2i\u001f:,'+\u001a9mS\u000e\f\u0017\t^!US6,\u0007fA\u0011\u0002,\u0005i3\u000f[8vY\u00124U\r^2i\u001d>tG)\u001a7bs\u0016$\u0017I\u001c3O_:$&/\u001e8dCRLgn\u001a*fa2L7-Y:)\u0007\t\nY#\u0001\u0003tiV\u0014G\u0003DA\u0012\u0003{\u0014YAa\u0004\u0003\u0014\t\r\u0002bBA\u0000G\u0001\u0007!\u0011A\u0001\bY><G+\r91!\u0011\u0011\u0019Aa\u0002\u000e\u0005\t\u0015!bAAeQ%!!\u0011\u0002B\u0003\u0005))f.\u001b4jK\u0012dun\u001a\u0005\b\u0005\u001b\u0019\u0003\u0019\u0001B\u0001\u0003\u001dawn\u001a+2aFBqA!\u0005$\u0001\u0004\u0011\t!A\u0005gkR,(/\u001a'pO\"9!QC\u0012A\u0002\t]\u0011!\u00039beRLG/[8o!\u0011\u0011IBa\b\u000e\u0005\tm!b\u0001B\u000fQ\u000591\r\\;ti\u0016\u0014\u0018\u0002\u0002B\u0011\u00057\u0011\u0011\u0002U1si&$\u0018n\u001c8\t\u000f\u0005]6\u00051\u0001\u0002:\u0006)2\u000f^;c/&$\bNR3uG\"lUm]:bO\u0016\u001cHCDA\u0012\u0005S\u0011YC!\f\u00030\tE\"1\u0007\u0005\b\u0003\u007f$\u0003\u0019\u0001B\u0001\u0011\u001d\u0011i\u0001\na\u0001\u0005\u0003AqA!\u0005%\u0001\u0004\u0011\t\u0001C\u0004\u0003\u0016\u0011\u0002\rAa\u0006\t\u000f\u0005]F\u00051\u0001\u0002:\"9!Q\u0007\u0013A\u0002\t]\u0012\u0001\u0005:fgB|gn]3DC2d'-Y2l!\u0019\u0011IDa\u0010\u0003D5\u0011!1\b\u0006\u0004\u0005{y\u0014aB7pG.LGo\\\u0005\u0005\u0005\u0003\u0012YD\u0001\bBe\u001e,X.\u001a8u\u0007\u0006\u0004Ho\u001c:\u0011\u000f5\u0012)E!\u0013\u0002$%\u0019!q\t\u0018\u0003\u0013\u0019+hn\u0019;j_:\f\u0004C\u0002B&\u0005\u001b\u0012\t&D\u0001S\u0013\r\u0011yE\u0015\u0002\u0004'\u0016\f\bCB\u0017\u0003T\u0005\f\u0019-C\u0002\u0003V9\u0012a\u0001V;qY\u0016\u0014\u0004")
public class ReplicaAlterLogDirsThreadTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final Uuid topicId = Uuid.randomUuid();
    private final scala.collection.immutable.Map<Uuid, String> topicNames = (scala.collection.immutable.Map)Map$.MODULE$.apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.topicId()), (Object)"topic1")}));
    private final TopicIdPartition tid1p0 = new TopicIdPartition(this.topicId(), this.t1p0());
    private final FailedPartitions failedPartitions = new FailedPartitions();
    private final KRaftMetadataCache metadataCache;

    private TopicPartition t1p0() {
        return this.t1p0;
    }

    private TopicPartition t1p1() {
        return this.t1p1;
    }

    private Uuid topicId() {
        return this.topicId;
    }

    private scala.collection.immutable.Map<Uuid, String> topicNames() {
        return this.topicNames;
    }

    private TopicIdPartition tid1p0() {
        return this.tid1p0;
    }

    private FailedPartitions failedPartitions() {
        return this.failedPartitions;
    }

    private KRaftMetadataCache metadataCache() {
        return this.metadataCache;
    }

    private InitialFetchState initialFetchState(long fetchOffset, int leaderEpoch) {
        Some x$1 = new Some((Object)this.topicId());
        BrokerEndPoint x$2 = new BrokerEndPoint(0, "localhost", 9092);
        return new InitialFetchState((Option)x$1, x$2, leaderEpoch, fetchOffset);
    }

    private int initialFetchState$default$2() {
        return 1;
    }

    @Test
    public void shouldNotAddPartitionIfFutureLogIsNotDefined() {
        int brokerId = 1;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats(), Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        Set addedPartitions = thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1))})));
        Assertions.assertEquals((Object)Predef$.MODULE$.Set().empty(), (Object)addedPartitions);
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
        Assertions.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
    }

    @Test
    public void shouldUpdateLeaderEpochAfterFencedEpochError() {
        int brokerId = 1;
        int partitionId = 0;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        int leaderEpoch = 5;
        int logEndOffset = 0;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)logEndOffset));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"gOZOXHnkR9eiA1W9ZuLk8A")));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        FetchRequest.PartitionData fencedRequestData = new FetchRequest.PartitionData(this.topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch - 1)));
        FetchPartitionData fencedResponseData = new FetchPartitionData(Errors.FENCED_LEADER_EPOCH, -1L, -1L, (Records)MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        this.mockFetchFromCurrentLog(this.tid1p0(), fencedRequestData, config, replicaManager, fencedResponseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-log-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats(), Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch - 1))})));
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        thread.doWork();
        Assertions.assertTrue((boolean)this.failedPartitions().contains(this.t1p0()));
        Assertions.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        Assertions.assertEquals((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)), (Object)thread.fetchState(this.t1p0()).map((Function1 & Serializable)x$1 -> BoxesRunTime.boxToInteger((int)x$1.currentLeaderEpoch())));
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        FetchRequest.PartitionData requestData = new FetchRequest.PartitionData(this.topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        this.mockFetchFromCurrentLog(this.tid1p0(), requestData, config, replicaManager, responseData);
        thread.doWork();
        Assertions.assertFalse((boolean)this.failedPartitions().contains(this.t1p0()));
        Assertions.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
    }

    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUp() {
        int brokerId = 1;
        int partitionId = 0;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        int leaderEpoch = 5;
        int logEndOffset = 0;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)logEndOffset));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"PGLOjDjKQaCOXFOtxymIig")));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        FetchRequest.PartitionData requestData = new FetchRequest.PartitionData(this.topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        this.mockFetchFromCurrentLog(this.tid1p0(), requestData, config, replicaManager, responseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats(), Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        thread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
    }

    private Option<BoxedUnit> updateReassignmentState(ReplicaAlterLogDirsThread thread, int partitionId, ReplicaAlterLogDirsThread.ReassignmentState newState) {
        return this.topicNames().get((Object)this.topicId()).map((Function1 & Serializable)topicName -> {
            thread.updateReassignmentState(new TopicPartition(topicName, partitionId), newState);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void shouldReplaceCurrentLogDirWhenCaughtUpWithAfterAssignmentRequestHasBeenCompleted() {
        int brokerId = 1;
        int partitionId = 0;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler)Mockito.mock(DirectoryEventHandler.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        int leaderEpoch = 5;
        int logEndOffset = 0;
        Uuid currentDirectoryId = Uuid.fromString((String)"EzI9SqkFQKW1iFc1ZwP9SQ");
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)partition.topicId()).thenReturn((Object)new Some((Object)this.topicId()));
        Mockito.when((Object)partition.futureReplicaDirectoryId()).thenReturn((Object)new Some((Object)Uuid.randomUuid()));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)logEndOffset));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.runCallbackIfFutureReplicaCaughtUp((Function1)ArgumentMatchers.any()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)currentDirectoryId));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        FetchRequest.PartitionData requestData = new FetchRequest.PartitionData(this.topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        this.mockFetchFromCurrentLog(this.tid1p0(), requestData, config, replicaManager, responseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats(), Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), directoryEventHandler);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        thread.doWork();
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        this.updateReassignmentState(thread, partitionId, (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Queued$.MODULE$);
        thread.doWork();
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        this.updateReassignmentState(thread, partitionId, (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Accepted$.MODULE$);
        thread.doWork();
        Assertions.assertEquals((Object)None$.MODULE$, (Object)thread.fetchState(this.t1p0()));
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
        Mockito.verifyNoInteractions((Object[])new Object[]{directoryEventHandler});
    }

    @Test
    public void shouldRevertAnyScheduledAssignmentRequestIfAssignmentIsCancelled() {
        int brokerId = 1;
        int partitionId = 0;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(brokerId, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler)Mockito.mock(DirectoryEventHandler.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        int leaderEpoch = 5;
        int logEndOffset = 0;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)partition.topicId()).thenReturn((Object)new Some((Object)this.topicId()));
        Mockito.when((Object)partition.futureReplicaDirectoryId()).thenReturn((Object)new Some((Object)Uuid.randomUuid()));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.randomUuid()));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)quotaManager.isQuotaExceeded())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)false));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)logEndOffset));
        Mockito.when((Object)partition.futureLocalLogOrException()).thenReturn((Object)futureLog);
        ((Partition)Mockito.doNothing().when((Object)partition)).truncateTo(0L, true);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.maybeReplaceCurrentWithFutureReplica())).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)partition.runCallbackIfFutureReplicaCaughtUp((Function1)ArgumentMatchers.any()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)0L));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        FetchRequest.PartitionData requestData = new FetchRequest.PartitionData(this.topicId(), 0L, 0L, Predef$.MODULE$.Integer2int(config.replicaFetchMaxBytes()), Optional.of(Predef$.MODULE$.int2Integer(leaderEpoch)));
        FetchPartitionData responseData = new FetchPartitionData(Errors.NONE, 0L, 0L, (Records)MemoryRecords.EMPTY, Optional.empty(), OptionalLong.empty(), Optional.empty(), OptionalInt.empty(), false);
        this.mockFetchFromCurrentLog(this.tid1p0(), requestData, config, replicaManager, responseData);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, new BrokerTopicStats(), Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), directoryEventHandler);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        thread.doWork();
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isDefined());
        Assertions.assertEquals((int)1, (int)thread.partitionCount());
        this.updateReassignmentState(thread, partitionId, (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Queued$.MODULE$);
        thread.removePartitions((Set)Predef$.MODULE$.Set().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new TopicPartition[]{this.t1p0()})));
        Assertions.assertTrue((boolean)thread.fetchState(this.t1p0()).isEmpty());
        Assertions.assertEquals((int)0, (int)thread.partitionCount());
        ArgumentCaptor topicIdPartitionCaptureT1p0 = ArgumentCaptor.forClass(org.apache.kafka.server.common.TopicIdPartition.class);
        ArgumentCaptor logIdCaptureT1p0 = ArgumentCaptor.forClass(Uuid.class);
        ((DirectoryEventHandler)Mockito.verify((Object)directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition)topicIdPartitionCaptureT1p0.capture(), (Uuid)logIdCaptureT1p0.capture(), (String)ArgumentMatchers.eq((Object)"Reverting reassignment for canceled future replica"), (Runnable)ArgumentMatchers.any());
        Assertions.assertEquals((Object)new org.apache.kafka.server.common.TopicIdPartition(this.topicId(), this.t1p0().partition()), (Object)topicIdPartitionCaptureT1p0.getValue());
        Assertions.assertEquals((Object)partition.logDirectoryId().get(), (Object)logIdCaptureT1p0.getValue());
    }

    @Test
    public void shouldRevertReassignmentsForIncompleteFutureReplicaPromotions() {
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        DirectoryEventHandler directoryEventHandler = (DirectoryEventHandler)Mockito.mock(DirectoryEventHandler.class);
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, (BrokerTopicStats)Mockito.mock(BrokerTopicStats.class), 0, directoryEventHandler);
        Seq tp2 = (Seq)((IterableOps)Seq$.MODULE$.range((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)4), (Integral)Numeric.IntIsIntegral$.MODULE$)).map((Function1 & Serializable)x$2 -> ReplicaAlterLogDirsThreadTest.$anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$1(BoxesRunTime.unboxToInt((Object)x$2)));
        Seq tips = (Seq)((IterableOps)Seq$.MODULE$.range((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)4), (Integral)Numeric.IntIsIntegral$.MODULE$)).map((Function1 & Serializable)x$3 -> ReplicaAlterLogDirsThreadTest.$anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$2(this, BoxesRunTime.unboxToInt((Object)x$3)));
        Seq dirIds = (Seq)((IterableOps)Seq$.MODULE$.range((Object)BoxesRunTime.boxToInteger((int)0), (Object)BoxesRunTime.boxToInteger((int)4), (Integral)Numeric.IntIsIntegral$.MODULE$)).map((Function1 & Serializable)i -> ReplicaAlterLogDirsThreadTest.$anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$3(BoxesRunTime.unboxToInt((Object)i)));
        tp2.foreach((Function1 & Serializable)tp -> thread.promotionStates().put(tp, new ReplicaAlterLogDirsThread.PromotionState((ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.None$.MODULE$, (Option)new Some((Object)this.topicId()), (Option)new Some(dirIds.apply(tp.partition())))));
        thread.updateReassignmentState((TopicPartition)tp2.apply(0), (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.None$.MODULE$);
        thread.updateReassignmentState((TopicPartition)tp2.apply(1), (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Queued$.MODULE$);
        thread.updateReassignmentState((TopicPartition)tp2.apply(2), (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Accepted$.MODULE$);
        thread.updateReassignmentState((TopicPartition)tp2.apply(3), (ReplicaAlterLogDirsThread.ReassignmentState)ReplicaAlterLogDirsThread.ReassignmentState$.Effective$.MODULE$);
        thread.removePartitions((Set)tp2.toSet());
        ((DirectoryEventHandler)Mockito.verify((Object)directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition)ArgumentMatchers.eq((Object)tips.apply(1)), (Uuid)ArgumentMatchers.eq((Object)dirIds.apply(1)), (String)ArgumentMatchers.eq((Object)"Reverting reassignment for canceled future replica"), (Runnable)ArgumentMatchers.any());
        ((DirectoryEventHandler)Mockito.verify((Object)directoryEventHandler)).handleAssignment((org.apache.kafka.server.common.TopicIdPartition)ArgumentMatchers.eq((Object)tips.apply(2)), (Uuid)ArgumentMatchers.eq((Object)dirIds.apply(2)), (String)ArgumentMatchers.eq((Object)"Reverting reassignment for canceled future replica"), (Runnable)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{directoryEventHandler});
    }

    private void mockFetchFromCurrentLog(TopicIdPartition topicIdPartition, FetchRequest.PartitionData requestData, KafkaConfig config, ReplicaManager replicaManager, FetchPartitionData responseData) {
        ArgumentCaptor callbackCaptor = ArgumentCaptor.forClass(Function1.class);
        FetchParams expectedFetchParams = new FetchParams(ApiKeys.FETCH.latestVersion(), -3, -1L, 0L, 0, Predef$.MODULE$.Integer2int(config.replicaFetchResponseMaxBytes()), FetchIsolation.LOG_END, Optional.empty());
        replicaManager.fetchMessages((FetchParams)ArgumentMatchers.eq((Object)expectedFetchParams), (Seq)ArgumentMatchers.eq((Object)new .colon.colon((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topicIdPartition), (Object)requestData), (List)Nil$.MODULE$)), (ReplicaQuota)ArgumentMatchers.eq((Object)QuotaFactory.UNBOUNDED_QUOTA), (Function1)callbackCaptor.capture());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$4 -> {
            ReplicaAlterLogDirsThreadTest.$anonfun$mockFetchFromCurrentLog$1(callbackCaptor, topicIdPartition, responseData, x$4);
            return BoxedUnit.UNIT;
        });
    }

    @Test
    public void issuesEpochRequestFromLocalReplica() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partitionT1p0 = (Partition)Mockito.mock(Partition.class);
        Partition partitionT1p1 = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int partitionT1p0Id = 0;
        int partitionT1p1Id = 1;
        int leaderEpochT1p0 = 2;
        int leaderEpochT1p1 = 5;
        int leoT1p0 = 13;
        int leoT1p1 = 232;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partitionT1p0.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionT1p0Id));
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partitionT1p0.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionT1p1Id));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partitionT1p0);
        Mockito.when((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionT1p0Id).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpochT1p0).setEndOffset((long)leoT1p0));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenReturn((Object)partitionT1p1);
        Mockito.when((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionT1p1Id).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpochT1p1).setEndOffset((long)leoT1p1));
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, null);
        Map result = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, null, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP).leader().fetchEpochEndOffsets((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p0().partition()).setLeaderEpoch(leaderEpochT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p1().partition()).setLeaderEpoch(leaderEpochT1p1))})));
        Assertions.assertEquals((Object)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(this.t1p0().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpochT1p0).setEndOffset((long)leoT1p0)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(this.t1p1().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpochT1p1).setEndOffset((long)leoT1p1))}))), (Object)result, (String)"results from leader epoch request should have offset from local replica");
    }

    @Test
    public void fetchEpochsFromLeaderShouldHandleExceptionFromGetLocalReplica() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        Partition partitionT1p0 = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int partitionId = 0;
        int leaderEpoch = 2;
        int leo = 13;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partitionT1p0.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partitionT1p0);
        Mockito.when((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)leo));
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenThrow(new Throwable[]{new KafkaStorageException()});
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, null);
        Map result = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, null, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP).leader().fetchEpochEndOffsets((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p0().partition()).setLeaderEpoch(leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(this.t1p1().partition()).setLeaderEpoch(leaderEpoch))})));
        Assertions.assertEquals((Object)((scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(this.t1p0().partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)leo)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(this.t1p1().partition()).setErrorCode(Errors.KAFKA_STORAGE_ERROR.code()))}))), (Object)result);
    }

    @Test
    public void shouldTruncateToReplicaOffset() {
        ArgumentCaptor truncateCaptureT1p0 = ArgumentCaptor.forClass(Long.TYPE);
        ArgumentCaptor truncateCaptureT1p1 = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog logT1p0 = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog logT1p1 = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLogT1p0 = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLogT1p1 = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partitionT1p0 = (Partition)Mockito.mock(Partition.class);
        Partition partitionT1p1 = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ArgumentCaptor responseCallback = ArgumentCaptor.forClass(Function1.class);
        int partitionT1p0Id = 0;
        int partitionT1p1Id = 1;
        int leaderEpoch = 2;
        int futureReplicaLEO = 191;
        int replicaT1p0LEO = 190;
        int replicaT1p1LEO = 192;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partitionT1p0.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionT1p0Id));
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partitionT1p1.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionT1p1Id));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partitionT1p0);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenReturn((Object)partitionT1p1);
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLogT1p0);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p1())).thenReturn((Object)futureLogT1p1);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p1()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLogT1p0.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(futureReplicaLEO)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLogT1p1.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(futureReplicaLEO)));
        Mockito.when((Object)futureLogT1p0.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)futureLogT1p0.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch)));
        Mockito.when((Object)partitionT1p0.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionT1p0Id).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)replicaT1p0LEO));
        Mockito.when((Object)futureLogT1p1.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)futureLogT1p1.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch)));
        Mockito.when((Object)partitionT1p1.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionT1p1Id).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)replicaT1p1LEO));
        Mockito.when((Object)partitionT1p0.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"Jsg8ufNCQYONNquPt7VYpA")));
        Mockito.when((Object)partitionT1p1.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"D2Yf6FtNROGVKoIZadSFIg")));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        this.stubWithFetchMessages(logT1p0, logT1p1, futureLogT1p0, partitionT1p0, replicaManager, (ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partitionT1p0)).truncateTo(BoxesRunTime.unboxToLong((Object)truncateCaptureT1p0.capture()), ArgumentMatchers.anyBoolean());
        ((Partition)Mockito.verify((Object)partitionT1p1)).truncateTo(BoxesRunTime.unboxToLong((Object)truncateCaptureT1p1.capture()), ArgumentMatchers.anyBoolean());
        Assertions.assertEquals((long)replicaT1p0LEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p0.getValue()));
        Assertions.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncateCaptureT1p1.getValue()));
    }

    @Test
    public void shouldTruncateToEndOffsetOfLargestCommonEpoch() {
        ArgumentCaptor truncateToCapture = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ArgumentCaptor responseCallback = ArgumentCaptor.forClass(Function1.class);
        int partitionId = 0;
        int leaderEpoch = 5;
        int futureReplicaLEO = 195;
        int replicaLEO = 200;
        int replicaEpochEndOffset = 190;
        int futureReplicaEpochEndOffset = 191;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(futureReplicaLEO)));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch))).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)(leaderEpoch - 2))));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch - 1).setEndOffset((long)replicaLEO));
        Mockito.when((Object)futureLog.endOffsetForEpoch(leaderEpoch - 1)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch - 2)));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch - 2, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch - 2).setEndOffset((long)replicaEpochEndOffset));
        Mockito.when((Object)futureLog.endOffsetForEpoch(leaderEpoch - 2)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaEpochEndOffset, leaderEpoch - 2)));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"n6WOe2zPScqZLIreCWN6Ug")));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        this.stubWithFetchMessages(log, null, futureLog, partition, replicaManager, (ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1))})));
        thread.doWork();
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.times((int)2))).truncateTo(BoxesRunTime.unboxToLong((Object)truncateToCapture.capture()), ArgumentMatchers.eq((boolean)true));
        Assertions.assertTrue((boolean)CollectionConverters$.MODULE$.ListHasAsScala(truncateToCapture.getAllValues()).asScala().contains((Object)BoxesRunTime.boxToInteger((int)replicaEpochEndOffset)), (String)("Expected offset " + replicaEpochEndOffset + " in captured truncation offsets " + truncateToCapture.getAllValues()));
    }

    @Test
    public void shouldTruncateToInitialFetchOffsetIfReplicaReturnsUndefinedOffset() {
        ArgumentCaptor truncated = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ArgumentCaptor responseCallback = ArgumentCaptor.forClass(Function1.class);
        int initialFetchOffset = 100;
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"b2e1ihvGQiu6A504oKoddQ")));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)None$.MODULE$);
        this.stubWithFetchMessages(log, null, futureLog, partition, replicaManager, (ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(initialFetchOffset, 1))})));
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition)).truncateTo(BoxesRunTime.unboxToLong((Object)truncated.capture()), ArgumentMatchers.eq((boolean)true));
        Assertions.assertEquals((long)initialFetchOffset, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()), (String)"Expected future replica to truncate to initial fetch offset if replica returns UNDEFINED_EPOCH_OFFSET");
    }

    @Test
    public void shouldPollIndefinitelyIfReplicaNotAvailable() {
        ArgumentCaptor truncated = ArgumentCaptor.forClass(Long.TYPE);
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ArgumentCaptor responseCallback = ArgumentCaptor.forClass(Function1.class);
        int partitionId = 0;
        int futureReplicaLeaderEpoch = 1;
        int futureReplicaLEO = 290;
        int replicaLEO = 300;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"wO7bUpvcSZC0QKEK6P6AiA")));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(futureReplicaLEO)));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)futureReplicaLeaderEpoch)));
        Mockito.when((Object)futureLog.endOffsetForEpoch(futureReplicaLeaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, futureReplicaLeaderEpoch)));
        Mockito.when((Object)replicaManager.localLog(this.t1p0())).thenReturn((Object)new Some((Object)log));
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), futureReplicaLeaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.REPLICA_NOT_AVAILABLE.code())).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(futureReplicaLeaderEpoch).setEndOffset((long)replicaLEO));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        replicaManager.fetchMessages((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(), (Function1)responseCallback.capture());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$5 -> {
            ReplicaAlterLogDirsThreadTest.$anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(responseCallback, x$5);
            return BoxedUnit.UNIT;
        });
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 2).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$6 -> thread.doWork());
        ((Partition)Mockito.verify((Object)partition, (VerificationMode)Mockito.never())).truncateTo(BoxesRunTime.unboxToLong((Object)truncated.capture()), ArgumentMatchers.eq((boolean)true));
        Assertions.assertEquals((int)0, (int)truncated.getAllValues().size());
        thread.doWork();
        ((Partition)Mockito.verify((Object)partition)).truncateTo(BoxesRunTime.unboxToLong((Object)truncated.capture()), ArgumentMatchers.eq((boolean)true));
        Assertions.assertEquals((long)futureReplicaLEO, (long)BoxesRunTime.unboxToLong((Object)truncated.getValue()));
    }

    @Test
    public void shouldFetchLeaderEpochOnFirstFetchOnly() {
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        ArgumentCaptor responseCallback = ArgumentCaptor.forClass(Function1.class);
        int partitionId = 0;
        int leaderEpoch = 5;
        int futureReplicaLEO = 190;
        int replicaLEO = 213;
        Mockito.when((Object)BoxesRunTime.boxToInteger((int)partition.partitionId())).thenReturn((Object)BoxesRunTime.boxToInteger((int)partitionId));
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"dybMM9CpRP2s6HSslW4NHg")));
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)partition.lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false)).thenReturn((Object)new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(partitionId).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset((long)replicaLEO));
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)futureLog.latestEpoch()).thenReturn((Object)new Some((Object)BoxesRunTime.boxToInteger((int)leaderEpoch)));
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logEndOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(futureReplicaLEO)));
        Mockito.when((Object)futureLog.endOffsetForEpoch(leaderEpoch)).thenReturn((Object)new Some((Object)new OffsetAndEpoch((long)futureReplicaLEO, leaderEpoch)));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        this.stubWithFetchMessages(log, null, futureLog, partition, replicaManager, (ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>>)responseCallback);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, 1))})));
        RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(0), 3).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable)x$7 -> thread.doWork());
        ((Partition)Mockito.verify((Object)partition)).lastOffsetForLeaderEpoch(Optional.of(Predef$.MODULE$.int2Integer(1)), leaderEpoch, false);
        ((Partition)Mockito.verify((Object)partition)).truncateTo((long)futureReplicaLEO, true);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void shouldFetchOneReplicaAtATime() {
        void var14_14;
        void var13_13;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenReturn((Object)partition);
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"Y0qUL19gSmKAXmohmrUM4g")));
        this.stub(log, null, futureLog, partition, replicaManager);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions = thread.leader().buildFetch((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId()), 150L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId()), 160L, (Option)None$.MODULE$, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$))})));
        if (resultWithPartitions == null) {
            throw new MatchError(null);
        }
        Option fetchRequestOpt = (Option)resultWithPartitions.result();
        Set partitionsWithError = resultWithPartitions.partitionsWithError();
        Assertions.assertTrue((boolean)var13_13.isDefined());
        FetchRequest.Builder fetchRequest = ((AbstractFetcherThread.ReplicaFetch)var13_13.get()).fetchRequest();
        Assertions.assertFalse((boolean)fetchRequest.fetchData().isEmpty());
        Assertions.assertFalse((boolean)var14_14.nonEmpty());
        FetchRequest request = (FetchRequest)fetchRequest.build();
        Assertions.assertEquals((int)0, (int)request.minBytes());
        scala.collection.immutable.Seq fetchInfos = CollectionConverters$.MODULE$.MapHasAsScala(request.fetchData(CollectionConverters$.MODULE$.MapHasAsJava(this.topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals((int)1, (int)fetchInfos.length());
        Assertions.assertEquals((Object)this.t1p0(), (Object)((TopicIdPartition)((Tuple2)fetchInfos.head())._1()).topicPartition(), (String)"Expected fetch request for first partition");
        Assertions.assertEquals((long)150L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos.head())._2()).fetchOffset);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void shouldFetchNonDelayedAndNonTruncatingReplicas() {
        void var25_40;
        void var24_39;
        void var20_35;
        void var19_34;
        void apply_lastFetchedEpoch;
        void apply_state;
        void apply_lag;
        void apply_offset;
        void var15_25;
        void var14_24;
        void apply_lastFetchedEpoch2;
        void apply_state2;
        void apply_lag2;
        void apply_offset2;
        void apply_lastFetchedEpoch3;
        void apply_state3;
        void apply_lag3;
        void apply_offset3;
        KafkaConfig config = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(1, true, true, TestUtils$.MODULE$.RandomPort(), (Option<SecurityProtocol>)None$.MODULE$, (Option<File>)None$.MODULE$, (Option<Properties>)None$.MODULE$, true, false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), false, TestUtils$.MODULE$.RandomPort(), (Option<String>)None$.MODULE$, 1, false, 1, (short)1, false));
        ReplicationQuotaManager quotaManager = (ReplicationQuotaManager)Mockito.mock(ReplicationQuotaManager.class);
        LogManager logManager = (LogManager)Mockito.mock(LogManager.class);
        UnifiedLog log = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        UnifiedLog futureLog = (UnifiedLog)Mockito.mock(UnifiedLog.class);
        Partition partition = (Partition)Mockito.mock(Partition.class);
        ReplicaManager replicaManager = (ReplicaManager)Mockito.mock(ReplicaManager.class);
        int startOffset = 123;
        Mockito.when((Object)BoxesRunTime.boxToLong((long)futureLog.logStartOffset())).thenReturn((Object)BoxesRunTime.boxToLong((long)Int$.MODULE$.int2long(startOffset)));
        Mockito.when((Object)replicaManager.logManager()).thenReturn((Object)logManager);
        Mockito.when((Object)replicaManager.metadataCache()).thenReturn((Object)this.metadataCache());
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p0())).thenReturn((Object)partition);
        Mockito.when((Object)replicaManager.getPartitionOrException(this.t1p1())).thenReturn((Object)partition);
        Mockito.when((Object)partition.logDirectoryId()).thenReturn((Object)new Some((Object)Uuid.fromString((String)"rtrdy3nsQwO1OQUEUYGxRQ")));
        this.stub(log, null, futureLog, partition, replicaManager);
        BrokerEndPoint endPoint = new BrokerEndPoint(0, "localhost", 1000);
        int leaderEpoch = 1;
        LocalLeaderEndPoint leader = new LocalLeaderEndPoint(endPoint, config, replicaManager, (ReplicaQuota)quotaManager);
        ReplicaAlterLogDirsThread thread = new ReplicaAlterLogDirsThread("alter-logs-dirs-thread-test1", (LeaderEndPoint)leader, this.failedPartitions(), replicaManager, quotaManager, null, Predef$.MODULE$.Integer2int(config.replicaFetchBackoffMs()), DirectoryEventHandler.NOOP);
        thread.addPartitions((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)this.initialFetchState(0L, leaderEpoch)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)this.initialFetchState(0L, leaderEpoch))})));
        Object[] objectArray = new Tuple2[2];
        None$ none$ = None$.MODULE$;
        Fetching$ fetching$ = Fetching$.MODULE$;
        None$ none$2 = None$.MODULE$;
        long l = 150L;
        Some apply_topicId = new Some((Object)this.topicId());
        Object var26_17 = null;
        none$2 = null;
        fetching$ = null;
        none$ = null;
        objectArray[0] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState((Option)apply_topicId, (long)apply_offset3, (Option)apply_lag3, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)apply_state3, (Option)apply_lastFetchedEpoch3));
        None$ none$3 = None$.MODULE$;
        Truncating$ truncating$ = Truncating$.MODULE$;
        None$ none$4 = None$.MODULE$;
        long l2 = 160L;
        Some apply_topicId2 = new Some((Object)this.topicId());
        Object var32_22 = null;
        none$4 = null;
        truncating$ = null;
        none$3 = null;
        objectArray[1] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)apply_topicId2, (long)apply_offset2, (Option)apply_lag2, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)apply_state2, (Option)apply_lastFetchedEpoch2));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions = thread.leader().buildFetch((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray(objectArray)));
        if (resultWithPartitions == null) {
            throw new MatchError(null);
        }
        Option fetchRequestOpt = (Option)resultWithPartitions.result();
        Set partitionsWithError = resultWithPartitions.partitionsWithError();
        Assertions.assertTrue((boolean)var14_24.isDefined());
        AbstractFetcherThread.ReplicaFetch fetchRequest = (AbstractFetcherThread.ReplicaFetch)var14_24.get();
        Assertions.assertFalse((boolean)fetchRequest.partitionData().isEmpty());
        Assertions.assertFalse((boolean)var15_25.nonEmpty());
        scala.collection.immutable.Seq fetchInfos = CollectionConverters$.MODULE$.MapHasAsScala(((FetchRequest)fetchRequest.fetchRequest().build()).fetchData(CollectionConverters$.MODULE$.MapHasAsJava(this.topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals((int)1, (int)fetchInfos.length());
        Assertions.assertEquals((Object)this.t1p0(), (Object)((TopicIdPartition)((Tuple2)fetchInfos.head())._1()).topicPartition(), (String)"Expected fetch request for non-truncating partition");
        Assertions.assertEquals((long)150L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos.head())._2()).fetchOffset);
        Object[] objectArray2 = new Tuple2[2];
        None$ none$5 = None$.MODULE$;
        Fetching$ fetching$2 = Fetching$.MODULE$;
        None$ none$6 = None$.MODULE$;
        long l3 = 140L;
        Some apply_topicId3 = new Some((Object)this.topicId());
        Object var38_32 = null;
        none$6 = null;
        fetching$2 = null;
        none$5 = null;
        objectArray2[0] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState((Option)apply_topicId3, (long)apply_offset, (Option)apply_lag, leaderEpoch, (Option)None$.MODULE$, (ReplicaState)apply_state, (Option)apply_lastFetchedEpoch));
        objectArray2[1] = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId()), 160L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)BoxesRunTime.boxToLong((long)5000L)), (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$));
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions2 = thread.leader().buildFetch((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray(objectArray2)));
        if (resultWithPartitions2 == null) {
            throw new MatchError(null);
        }
        Option fetchRequest2Opt = (Option)resultWithPartitions2.result();
        Set partitionsWithError2 = resultWithPartitions2.partitionsWithError();
        Assertions.assertTrue((boolean)var19_34.isDefined());
        AbstractFetcherThread.ReplicaFetch fetchRequest2 = (AbstractFetcherThread.ReplicaFetch)var19_34.get();
        Assertions.assertFalse((boolean)fetchRequest2.partitionData().isEmpty());
        Assertions.assertFalse((boolean)var20_35.nonEmpty());
        scala.collection.immutable.Seq fetchInfos2 = CollectionConverters$.MODULE$.MapHasAsScala(((FetchRequest)fetchRequest2.fetchRequest().build()).fetchData(CollectionConverters$.MODULE$.MapHasAsJava(this.topicNames()).asJava())).asScala().toSeq();
        Assertions.assertEquals((int)1, (int)fetchInfos2.length());
        Assertions.assertEquals((Object)this.t1p0(), (Object)((TopicIdPartition)((Tuple2)fetchInfos2.head())._1()).topicPartition(), (String)"Expected fetch request for non-delayed partition");
        Assertions.assertEquals((long)140L, (long)((FetchRequest.PartitionData)((Tuple2)fetchInfos2.head())._2()).fetchOffset);
        AbstractFetcherThread.ResultWithPartitions resultWithPartitions3 = thread.leader().buildFetch((Map)Predef$.MODULE$.Map().apply((scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Tuple2[]{Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p0()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId()), 140L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)BoxesRunTime.boxToLong((long)5000L)), (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$)), Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)this.t1p1()), (Object)new PartitionFetchState((Option)new Some((Object)this.topicId()), 160L, (Option)None$.MODULE$, leaderEpoch, (Option)new Some((Object)BoxesRunTime.boxToLong((long)5000L)), (ReplicaState)Fetching$.MODULE$, (Option)None$.MODULE$))})));
        if (resultWithPartitions3 == null) {
            throw new MatchError(null);
        }
        Option fetchRequest3Opt = (Option)resultWithPartitions3.result();
        Set partitionsWithError3 = resultWithPartitions3.partitionsWithError();
        Assertions.assertTrue((boolean)var24_39.isEmpty(), (String)"Expected no fetch requests since all partitions are delayed");
        Assertions.assertFalse((boolean)var25_40.nonEmpty());
    }

    public void stub(UnifiedLog logT1p0, UnifiedLog logT1p1, UnifiedLog futureLog, Partition partition, ReplicaManager replicaManager) {
        Mockito.when((Object)replicaManager.localLog(this.t1p0())).thenReturn((Object)new Some((Object)logT1p0));
        Mockito.when((Object)replicaManager.localLogOrException(this.t1p0())).thenReturn((Object)logT1p0);
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p0())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p0()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p0())).thenReturn((Object)new Some((Object)partition));
        Mockito.when((Object)replicaManager.localLog(this.t1p1())).thenReturn((Object)new Some((Object)logT1p1));
        Mockito.when((Object)replicaManager.localLogOrException(this.t1p1())).thenReturn((Object)logT1p1);
        Mockito.when((Object)replicaManager.futureLocalLogOrException(this.t1p1())).thenReturn((Object)futureLog);
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)replicaManager.futureLogExists(this.t1p1()))).thenReturn((Object)BoxesRunTime.boxToBoolean((boolean)true));
        Mockito.when((Object)replicaManager.onlinePartition(this.t1p1())).thenReturn((Object)new Some((Object)partition));
    }

    public void stubWithFetchMessages(UnifiedLog logT1p0, UnifiedLog logT1p1, UnifiedLog futureLog, Partition partition, ReplicaManager replicaManager, ArgumentCaptor<Function1<Seq<Tuple2<TopicIdPartition, FetchPartitionData>>, BoxedUnit>> responseCallback) {
        this.stub(logT1p0, logT1p1, futureLog, partition, replicaManager);
        replicaManager.fetchMessages((FetchParams)ArgumentMatchers.any(), (Seq)ArgumentMatchers.any(), (ReplicaQuota)ArgumentMatchers.any(), (Function1)responseCallback.capture());
        Mockito.when((Object)BoxedUnit.UNIT).thenAnswer(x$12 -> {
            ReplicaAlterLogDirsThreadTest.$anonfun$stubWithFetchMessages$1(responseCallback, x$12);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ TopicPartition $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$1(int x$2) {
        return new TopicPartition("t", x$2);
    }

    public static final /* synthetic */ org.apache.kafka.server.common.TopicIdPartition $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$2(ReplicaAlterLogDirsThreadTest $this, int x$3) {
        return new org.apache.kafka.server.common.TopicIdPartition($this.topicId(), x$3);
    }

    public static final /* synthetic */ Uuid $anonfun$shouldRevertReassignmentsForIncompleteFutureReplicaPromotions$3(int i) {
        return Uuid.fromString((String)("TESTBROKER0000DIR" + i + "AAAA"));
    }

    public static final /* synthetic */ void $anonfun$mockFetchFromCurrentLog$1(ArgumentCaptor callbackCaptor$1, TopicIdPartition topicIdPartition$1, FetchPartitionData responseData$1, InvocationOnMock x$4) {
        ((Function1)callbackCaptor$1.getValue()).apply((Object)new .colon.colon((Object)new Tuple2((Object)topicIdPartition$1, (Object)responseData$1), (List)Nil$.MODULE$));
    }

    public static final /* synthetic */ void $anonfun$shouldPollIndefinitelyIfReplicaNotAvailable$1(ArgumentCaptor responseCallback$1, InvocationOnMock x$5) {
        ((Function1)responseCallback$1.getValue()).apply((Object)Seq$.MODULE$.empty());
    }

    public static final /* synthetic */ void $anonfun$stubWithFetchMessages$1(ArgumentCaptor responseCallback$2, InvocationOnMock x$12) {
        ((Function1)responseCallback$2.getValue()).apply((Object)Seq$.MODULE$.empty());
    }

    public ReplicaAlterLogDirsThreadTest() {
        Supplier<KRaftVersion> kRaftMetadataCache_kraftVersionSupplier = () -> KRaftVersion.LATEST_PRODUCTION;
        int kRaftMetadataCache_brokerId = 1;
        this.metadataCache = new KRaftMetadataCache(kRaftMetadataCache_brokerId, kRaftMetadataCache_kraftVersionSupplier);
    }
}

