/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import kafka.catalog.ZKMetadataCollector;
import kafka.catalog.metadata.ClusterLinkInfo;
import kafka.controller.KafkaController;
import kafka.server.BrokerToControllerChannelManager;
import kafka.server.ConfigType$;
import kafka.server.KafkaConfig;
import kafka.server.MetadataCache;
import kafka.server.ReplicaManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkControllerRequestCompletionHandler;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkTopicState;
import kafka.server.metadata.ZkMetadataCache;
import kafka.zk.AdminZkClient;
import kafka.zk.ClusterLinkData;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterMirrorOp;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AlterMirrorsRequestData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AlterMirrorsRequest;
import org.apache.kafka.common.requests.AlterMirrorsResponse;
import org.apache.kafka.common.requests.CreateClusterLinksResponse;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.Map;
import scala.collection.Map$;
import scala.collection.Seq;
import scala.collection.immutable.Set;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0005\r\rb\u0001B\u00193\u0001eB\u0001B\u0010\u0001\u0003\u0002\u0003\u0006Ia\u0010\u0005\t\u0007\u0002\u0011\t\u0011)A\u0005\t\"Iq\t\u0001B\u0001B\u0003%\u0001J\u0014\u0005\n\u001f\u0002\u0011\t\u0011)A\u0005!NC\u0001\u0002\u0016\u0001\u0003\u0006\u0004%I!\u0016\u0005\t9\u0002\u0011\t\u0011)A\u0005-\"AQ\f\u0001BC\u0002\u0013%a\f\u0003\u0005f\u0001\t\u0005\t\u0015!\u0003`\u0011!1\u0007A!A!\u0002\u00139\u0007\u0002\u0003>\u0001\u0005\u0003\u0005\u000b\u0011B>\t\u0011y\u0004!\u0011!Q\u0001\n}D!\"!\u0004\u0001\u0005\u0003\u0005\u000b\u0011BA\b\u0011\u001d\t\t\u0003\u0001C\u0001\u0003GA\u0011\"a\u000f\u0001\u0005\u0004%I!!\u0010\t\u0011\u0005\u0015\u0003\u0001)A\u0005\u0003\u007fAq!a\u0012\u0001\t\u0003\nI\u0005C\u0004\u0002R\u0001!\t%a\u0015\t\u000f\u0005=\u0004\u0001\"\u0011\u0002r!9\u00111\u0013\u0001\u0005B\u0005U\u0005bBAT\u0001\u0011\u0005\u0013\u0011\u0016\u0005\b\u0003\u000f\u0004A\u0011IAe\u0011\u001d\t9\r\u0001C!\u0003\u001bDq!a2\u0001\t\u0003\nI\u000eC\u0004\u0002\\\u0002!\t%!8\t\u000f\u0005m\u0007\u0001\"\u0011\u0002b\"9\u00111\u001c\u0001\u0005B\u0005e\u0007bBAs\u0001\u0011\u0005\u0013q\u001d\u0005\b\u0003S\u0004A\u0011IAv\u0011\u001d\ty\u000f\u0001C!\u0003cDq!a?\u0001\t\u0003\ni\u0010C\u0004\u0003\u0002\u0001!\tEa\u0001\t\u000f\t\u0005\u0001\u0001\"\u0011\u0003\u0010!9!1\u0003\u0001\u0005B\tU\u0001b\u0002B\u0015\u0001\u0011\u0005#1\u0006\u0005\b\u0005k\u0001A\u0011\tB\u001c\u0011\u001d\u0011Y\u0004\u0001C!\u0005{AqA!\u0012\u0001\t\u0003\u00129\u0005C\u0004\u0003V\u0001!\tEa\u0016\t\u000f\t}\u0003\u0001\"\u0011\u0003b!9!Q\r\u0001\u0005B\t\u001d\u0004\"\u0003B>\u0001E\u0005I\u0011\u0001B?\u0011\u001d\u0011\u0019\n\u0001C!\u0005+CqAa0\u0001\t\u0003\u0012\t\rC\u0004\u0003F\u0002!\tEa2\t\u000f\tm\u0007\u0001\"\u0011\u0003^\"9!1\u001c\u0001\u0005\u0002\t}\bbBB\u0002\u0001\u0011%1Q\u0001\u0005\u000e\u00073\u0001\u0001\u0013aA\u0001\u0002\u0013%11\u0004(\u0003O\rcWo\u001d;fe2Kgn['fi\u0006$\u0017\r^1NC:\fw-\u001a:XSRD'l[*vaB|'\u000f\u001e\u0006\u0003gQ\nA\u0001\\5oW*\u0011QGN\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003]\nQa[1gW\u0006\u001c\u0001a\u0005\u0002\u0001uA\u00111\bP\u0007\u0002e%\u0011QH\r\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\rEJ|7.\u001a:D_:4\u0017n\u001a\t\u0003\u0001\u0006k\u0011\u0001N\u0005\u0003\u0005R\u00121bS1gW\u0006\u001cuN\u001c4jO\u0006I1o\u00195fIVdWM\u001d\t\u0003w\u0015K!A\u0012\u001a\u0003)\rcWo\u001d;fe2Kgn[*dQ\u0016$W\u000f\\3s\u00035iW\r^1eCR\f7)Y2iKB\u0011\u0011\nT\u0007\u0002\u0015*\u00111\nN\u0001\t[\u0016$\u0018\rZ1uC&\u0011QJ\u0013\u0002\u00105.lU\r^1eCR\f7)Y2iK&\u0011q\tP\u0001\u0019G>tGO]8mY\u0016\u00148\t[1o]\u0016dW*\u00198bO\u0016\u0014\bC\u0001!R\u0013\t\u0011FG\u0001\u0011Ce>\\WM\u001d+p\u0007>tGO]8mY\u0016\u00148\t[1o]\u0016dW*\u00198bO\u0016\u0014\u0018BA(=\u0003=Y\u0017MZ6b\u0007>tGO]8mY\u0016\u0014X#\u0001,\u0011\u0005]SV\"\u0001-\u000b\u0005e3\u0014AC2p]R\u0014x\u000e\u001c7fe&\u00111\f\u0017\u0002\u0010\u0017\u000647.Y\"p]R\u0014x\u000e\u001c7fe\u0006\u00012.\u00194lC\u000e{g\u000e\u001e:pY2,'\u000fI\u0001\tu.\u001cE.[3oiV\tq\f\u0005\u0002aG6\t\u0011M\u0003\u0002cm\u0005\u0011!p[\u0005\u0003I\u0006\u0014QbS1gW\u0006T6n\u00117jK:$\u0018!\u0003>l\u00072LWM\u001c;!\u0003EawnY1m\u0003\u0012l\u0017N\u001c$bGR|'/\u001f\t\u0004Q.lW\"A5\u000b\u0003)\fQa]2bY\u0006L!\u0001\\5\u0003\u0013\u0019+hn\u0019;j_:\u0004\u0004C\u00018y\u001b\u0005y'B\u00019r\u0003\u0015\tG-\\5o\u0015\t\u00118/A\u0004dY&,g\u000e^:\u000b\u0005]\"(BA;w\u0003\u0019\t\u0007/Y2iK*\tq/A\u0002pe\u001eL!!_8\u0003\u000b\u0005#W.\u001b8\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feB\u0011\u0001\t`\u0005\u0003{R\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'/\u0001\u0006tKJ4XM]%oM>\u0004B!!\u0001\u0002\n5\u0011\u00111\u0001\u0006\u0005\u0003\u000b\t9!\u0001\u0006bkRDwN]5{KJT!!N:\n\t\u0005-\u00111\u0001\u0002\u0015\u0003V$\bn\u001c:ju\u0016\u00148+\u001a:wKJLeNZ8\u0002'i\\W*\u001a;bI\u0006$\u0018mQ8mY\u0016\u001cGo\u001c:\u0011\u000b!\f\t\"!\u0006\n\u0007\u0005M\u0011N\u0001\u0004PaRLwN\u001c\t\u0005\u0003/\ti\"\u0004\u0002\u0002\u001a)\u0019\u00111\u0004\u001c\u0002\u000f\r\fG/\u00197pO&!\u0011qDA\r\u0005MQ6*T3uC\u0012\fG/Y\"pY2,7\r^8s\u0003\u0019a\u0014N\\5u}Q1\u0012QEA\u0014\u0003S\tY#!\f\u00020\u0005E\u00121GA\u001b\u0003o\tI\u0004\u0005\u0002<\u0001!)a(\u0004a\u0001\u007f!)1)\u0004a\u0001\t\")q)\u0004a\u0001\u0011\")q*\u0004a\u0001!\")A+\u0004a\u0001-\")Q,\u0004a\u0001?\")a-\u0004a\u0001O\")!0\u0004a\u0001w\")a0\u0004a\u0001\u007f\"9\u0011QB\u0007A\u0002\u0005=\u0011!D1e[&t'l[\"mS\u0016tG/\u0006\u0002\u0002@A\u0019\u0001-!\u0011\n\u0007\u0005\r\u0013MA\u0007BI6LgNW6DY&,g\u000e^\u0001\u000fC\u0012l\u0017N\u001c.l\u00072LWM\u001c;!\u0003aI7\u000fT5oW\u000e{wN\u001d3j]\u0006$xN]#oC\ndW\rZ\u000b\u0003\u0003\u0017\u00022\u0001[A'\u0013\r\ty%\u001b\u0002\b\u0005>|G.Z1o\u0003EI7\u000fT5oW\u000e{wN\u001d3j]\u0006$xN\u001d\u000b\u0005\u0003\u0017\n)\u0006C\u0004\u0002XE\u0001\r!!\u0017\u0002\u00111Lgn\u001b(b[\u0016\u0004B!a\u0017\u0002j9!\u0011QLA3!\r\ty&[\u0007\u0003\u0003CR1!a\u00199\u0003\u0019a$o\\8u}%\u0019\u0011qM5\u0002\rA\u0013X\rZ3g\u0013\u0011\tY'!\u001c\u0003\rM#(/\u001b8h\u0015\r\t9'[\u0001\u0010Y&t7nQ8pe\u0012Lg.\u0019;peR1\u00111OAA\u0003\u0007\u0003R\u0001[A\t\u0003k\u0002B!a\u001e\u0002~5\u0011\u0011\u0011\u0010\u0006\u0004\u0003w\u001a\u0018AB2p[6|g.\u0003\u0003\u0002\u0000\u0005e$\u0001\u0002(pI\u0016Dq!a\u0016\u0013\u0001\u0004\tI\u0006C\u0004\u0002\u0006J\u0001\r!a\"\u0002\u00191L7\u000f^3oKJt\u0015-\\3\u0011\t\u0005%\u0015qR\u0007\u0003\u0003\u0017SA!!$\u0002z\u00059a.\u001a;x_J\\\u0017\u0002BAI\u0003\u0017\u0013A\u0002T5ti\u0016tWM\u001d(b[\u0016\f1\"\\5se>\u0014Hk\u001c9jGR!\u0011qSAR!\u0015A\u0017\u0011CAM!\u0011\tY*a(\u000e\u0005\u0005u%BA&t\u0013\u0011\t\t+!(\u0003\u00175K'O]8s)>\u0004\u0018n\u0019\u0005\b\u0003K\u001b\u0002\u0019AA-\u0003\u0015!x\u000e]5d\u0003\tj\u0017N\u001d:peR{\u0007/[2Ti\u0006$Xm\u001d$s_6lU\r^1eCR\f7\u000b^8sKR!\u00111VA_!!\ti+a-\u0002Z\u0005]VBAAX\u0015\r\t\t,[\u0001\u000bG>dG.Z2uS>t\u0017\u0002BA[\u0003_\u00131!T1q!\rY\u0014\u0011X\u0005\u0004\u0003w\u0013$!F\"mkN$XM\u001d'j].$v\u000e]5d'R\fG/\u001a\u0005\b\u0003\u007f#\u0002\u0019AAa\u0003\u0019!x\u000e]5dgB1\u00111LAb\u00033JA!!2\u0002n\t\u00191+\u001a;\u0002M5L'O]8s)>\u0004\u0018nY*uCR,7O\u0012:p[\u000e{g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\u0006\u0003\u0002,\u0006-\u0007bBA`+\u0001\u0007\u0011\u0011\u0019\u000b\u0005\u0003W\u000by\rC\u0004\u0002RZ\u0001\r!a5\u0002\r1Lgn[%e!\u0011\t9(!6\n\t\u0005]\u0017\u0011\u0010\u0002\u0005+VLG\r\u0006\u0002\u0002,\u0006\u0011S.\u001b:s_J$v\u000e]5d'R\fG/Z:Ge>lW*\u001a;bI\u0006$\u0018mQ1dQ\u0016$B!a+\u0002`\"9\u0011\u0011\u001b\rA\u0002\u0005MG\u0003BAV\u0003GDq!a0\u001a\u0001\u0004\t\t-A\u000bhKR\fE\u000e\u001c+pa&\u001c7/\u00138DYV\u001cH/\u001a:\u0015\u0005\u0005\u0005\u0017AG5t)>\u0004\u0018nY)vKV,G-\u00169G_J$U\r\\3uS>tG\u0003BA&\u0003[Dq!!*\u001d\u0001\u0004\tI&A\ff]N,(/Z\"mkN$XM\u001d'j].,\u00050[:ugR!\u00111_A}!\rA\u0017Q_\u0005\u0004\u0003oL'\u0001B+oSRDq!!5\u001e\u0001\u0004\t\u0019.A\tdYV\u001cH/\u001a:MS:\\W\t_5tiN$B!a\u0013\u0002\u0000\"9\u0011\u0011\u001b\u0010A\u0002\u0005M\u0017AE4fi\u000ecWo\u001d;fe2Kgn\u001b#bi\u0006$BA!\u0002\u0003\u000eA)\u0001.!\u0005\u0003\bA\u0019\u0001M!\u0003\n\u0007\t-\u0011MA\bDYV\u001cH/\u001a:MS:\\G)\u0019;b\u0011\u001d\t\tn\ba\u0001\u0003'$BA!\u0002\u0003\u0012!9\u0011q\u000b\u0011A\u0002\u0005e\u0013!G4fi\u000ecWo\u001d;fe2Kgn[\"p]\u001aLw\r\u0015:paN$BAa\u0006\u0003(A!!\u0011\u0004B\u0012\u001b\t\u0011YB\u0003\u0003\u0003\u001e\t}\u0011\u0001B;uS2T!A!\t\u0002\t)\fg/Y\u0005\u0005\u0005K\u0011YB\u0001\u0006Qe>\u0004XM\u001d;jKNDq!!5\"\u0001\u0004\t\u0019.\u0001\u000bhKR\u001cE.^:uKJd\u0015N\\6D_:4\u0017n\u001a\u000b\u0005\u0005[\u0011\u0019\u0004E\u0002<\u0005_I1A!\r3\u0005E\u0019E.^:uKJd\u0015N\\6D_:4\u0017n\u001a\u0005\b\u0003#\u0014\u0003\u0019AAj\u000399W\r\u001e+pa&\u001c7i\u001c8gS\u001e$BAa\u0006\u0003:!9\u0011QU\u0012A\u0002\u0005e\u0013aG1mi\u0016\u00148\t\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4[\u001d>$W\r\u0006\u0004\u0002t\n}\"\u0011\t\u0005\b\u0003#$\u0003\u0019AAj\u0011\u001d\u0011\u0019\u0005\na\u0001\u0005/\tQ\u0001\u001d:paN\f\u0001%\\1zE\u0016\u0014V-\u001a8def\u0004Ho\u00117vgR,'\u000fT5oW\u000e{gNZ5hgR!\u00111\u001fB%\u0011\u001d\u0011Y%\na\u0001\u0005\u001b\n\u0011cY8oM&<GK]1og\u001a|'/\\3s!\u001dA'q\nB\f\u0005'J1A!\u0015j\u0005%1UO\\2uS>t\u0017\u0007E\u0003i\u0003#\u00119\"\u0001\fde\u0016\fG/Z\"mkN$XM\u001d'j].Tfj\u001c3f)\u0019\t\u0019P!\u0017\u0003^!9!1\f\u0014A\u0002\t\u001d\u0011aD2mkN$XM\u001d'j].$\u0015\r^1\t\u000f\t\rc\u00051\u0001\u0003\u0018\u0005\u00192/\u001a;DYV\u001cH/\u001a:MS:\\'LT8eKR!\u00111\u001fB2\u0011\u001d\u0011Yf\na\u0001\u0005\u000f\tQ#\u00197uKJl\u0015N\u001d:peR{\u0007/[2Ti\u0006$X\r\u0006\u0006\u0002t\n%$1\u000eB:\u0005oBq!!*)\u0001\u0004\tI\u0006C\u0004\u0003n!\u0002\rAa\u001c\u0002\u0019M$\u0018\r^3Va\u0012\fG/\u001a:\u0011\u000f!\u0014y%a.\u0003rA)\u0001.!\u0005\u00028\"9!Q\u000f\u0015A\u0002\u0005-\u0013\u0001\u0004<bY&$\u0017\r^3P]2L\b\"\u0003B=QA\u0005\t\u0019AA&\u0003=1\u0017-\u001b7JM:{G/T5se>\u0014\u0018aH1mi\u0016\u0014X*\u001b:s_J$v\u000e]5d'R\fG/\u001a\u0013eK\u001a\fW\u000f\u001c;%iU\u0011!q\u0010\u0016\u0005\u0003\u0017\u0012\ti\u000b\u0002\u0003\u0004B!!Q\u0011BH\u001b\t\u00119I\u0003\u0003\u0003\n\n-\u0015!C;oG\",7m[3e\u0015\r\u0011i)[\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002\u0002BI\u0005\u000f\u0013\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003I\u0019'/Z1uK\u000ecWo\u001d;fe2Kgn[:\u0015\t\t]%q\u0016\t\u0007\u00053\u0013yJa)\u000e\u0005\tm%\u0002\u0002BO\u00057\t!bY8oGV\u0014(/\u001a8u\u0013\u0011\u0011\tKa'\u0003#\r{W\u000e\u001d7fi\u0006\u0014G.\u001a$viV\u0014X\r\u0005\u0003\u0003&\n-VB\u0001BT\u0015\u0011\u0011I+!\u001f\u0002\u0011I,\u0017/^3tiNLAA!,\u0003(\nQ2I]3bi\u0016\u001cE.^:uKJd\u0015N\\6t%\u0016\u001c\bo\u001c8tK\"9!\u0011\u0017\u0016A\u0002\tM\u0016!H2sK\u0006$Xm\u00117vgR,'\u000fT5oWN\u0014V-];fgR$\u0015\r^1\u0011\t\tU&1X\u0007\u0003\u0005oSAA!/\u0002z\u00059Q.Z:tC\u001e,\u0017\u0002\u0002B_\u0005o\u0013Qd\u0011:fCR,7\t\\;ti\u0016\u0014H*\u001b8lgJ+\u0017/^3ti\u0012\u000bG/Y\u0001\u0017I\u0016dW\r^3DYV\u001cH/\u001a:MS:\\'LT8eKR!\u00111\u001fBb\u0011\u001d\t\tn\u000ba\u0001\u0003'\fqcZ3u\u00032dG*\u001b8lg\u001a\u0013x.\\'fi\u0006$\u0017\r^1\u0015\u0005\t%\u0007C\u0002Bf\u0005+\u00149A\u0004\u0003\u0003N\nEg\u0002BA0\u0005\u001fL\u0011A[\u0005\u0004\u0005'L\u0017a\u00029bG.\fw-Z\u0005\u0005\u0005/\u0014INA\u0002TKFT1Aa5j\u0003=\u0019Ho\u001c9NSJ\u0014xN\u001d+pa&\u001cGC\u0002Bp\u0005c\u0014\u0019\u0010\u0005\u0004\u0002x\t\u0005(Q]\u0005\u0005\u0005G\fIHA\u0006LC\u001a\\\u0017MR;ukJ,\u0007\u0003\u0002Bt\u0005[l!A!;\u000b\t\t-(qD\u0001\u0005Y\u0006tw-\u0003\u0003\u0003p\n%(\u0001\u0002,pS\u0012Dq!!*.\u0001\u0004\tI\u0006C\u0004\u0003v6\u0002\rAa>\u0002\u001b1|w-\u00128e\u001f\u001a47/\u001a;t!\u0019\u0011YM!6\u0003zB\u0019\u0001Na?\n\u0007\tu\u0018N\u0001\u0003M_:<G\u0003\u0002Bp\u0007\u0003Aq!!*/\u0001\u0004\tI&\u0001\nhKR\u001cE.^:uKJd\u0015N\\6J]\u001a|GCBB\u0004\u0007#\u0019)\u0002\u0005\u0003\u0004\n\r5QBAB\u0006\u0015\rY\u0015\u0011D\u0005\u0005\u0007\u001f\u0019YAA\bDYV\u001cH/\u001a:MS:\\\u0017J\u001c4p\u0011\u001d\u0019\u0019b\fa\u0001\u0005\u000f\t1a\u00197e\u0011\u001d\u00199b\fa\u0001\u0005[\taaY8oM&<\u0017aE:va\u0016\u0014H%\\3uC\u0012\fG/Y\"bG\",WCAB\u000f!\r\u00015qD\u0005\u0004\u0007C!$!D'fi\u0006$\u0017\r^1DC\u000eDW\r")
public class ClusterLinkMetadataManagerWithZkSupport
extends ClusterLinkMetadataManager {
    private final KafkaConfig brokerConfig;
    private final KafkaController kafkaController;
    private final KafkaZkClient zkClient;
    private final ReplicaManager replicaManager;
    private final Option<ZKMetadataCollector> zkMetadataCollector;
    private final AdminZkClient adminZkClient;

    private /* synthetic */ MetadataCache super$metadataCache() {
        return super.metadataCache();
    }

    private KafkaController kafkaController() {
        return this.kafkaController;
    }

    private KafkaZkClient zkClient() {
        return this.zkClient;
    }

    private AdminZkClient adminZkClient() {
        return this.adminZkClient;
    }

    @Override
    public boolean isLinkCoordinatorEnabled() {
        return ((ZkMetadataCache)super.metadataCache()).linkCoordinatorEnabled();
    }

    @Override
    public boolean isLinkCoordinator(String linkName) {
        if (this.isLinkCoordinatorEnabled()) {
            return super.isLinkCoordinator(linkName);
        }
        return this.kafkaController().isActive();
    }

    @Override
    public Option<Node> linkCoordinator(String linkName, ListenerName listenerName) {
        if (!Predef$.MODULE$.Boolean2boolean(this.brokerConfig.clusterLinkMetadataTopicEnable())) {
            int controllerId = ((ZkMetadataCache)super.metadataCache()).getZkControllerId(false);
            if (controllerId == Node.noNode().id()) {
                return None$.MODULE$;
            }
            return ((ZkMetadataCache)super.metadataCache()).getAliveBrokerNode(controllerId, listenerName);
        }
        return super.linkCoordinator(linkName, listenerName);
    }

    @Override
    public Option<MirrorTopic> mirrorTopic(String topic) {
        throw new IllegalStateException("TopicImage not supported with Zookeeper");
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromMetadataStore(Set<String> topics) {
        return this.zkClient().getClusterLinkForTopics(topics);
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromControllerContext(Set<String> topics) {
        if (this.kafkaController().isActive()) {
            return ((IterableOnceOps)this.kafkaController().controllerContext().linkedTopics().filter((Function1 & Serializable)keyVal -> BoxesRunTime.boxToBoolean((boolean)topics.contains(keyVal._1())))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        }
        return (Map)Map$.MODULE$.empty();
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromControllerContext(Uuid linkId) {
        if (this.kafkaController().isActive()) {
            return ((IterableOnceOps)this.kafkaController().controllerContext().linkedTopics().filter((Function1 & Serializable)cl -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataManagerWithZkSupport.$anonfun$mirrorTopicStatesFromControllerContext$2(linkId, cl)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
        }
        return (Map)Map$.MODULE$.empty();
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromControllerContext() {
        if (this.kafkaController().isActive()) {
            return this.kafkaController().controllerContext().linkedTopics();
        }
        return (Map)Map$.MODULE$.empty();
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromMetadataCache(Uuid linkId) {
        if (!this.isLinkCoordinatorEnabled()) {
            return this.mirrorTopicStatesFromControllerContext(linkId);
        }
        return (Map)((ZkMetadataCache)super.metadataCache()).mirrorTopicStates().filter((Function1 & Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataManagerWithZkSupport.$anonfun$mirrorTopicStatesFromMetadataCache$1(linkId, x$1)));
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromMetadataCache(Set<String> topics) {
        if (!this.isLinkCoordinatorEnabled()) {
            return this.mirrorTopicStatesFromControllerContext(topics);
        }
        return ((IterableOnceOps)((IterableOps)topics.map((Function1 & Serializable)topic -> new Tuple2(topic, ((ZkMetadataCache)this.super$metadataCache()).mirrorTopicStates().get(topic).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl())))).filterNot((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataManagerWithZkSupport.$anonfun$mirrorTopicStatesFromMetadataCache$3(x$2)))).toMap((.less.colon.less)$less$colon$less$.MODULE$.refl());
    }

    @Override
    public Map<String, ClusterLinkTopicState> mirrorTopicStatesFromMetadataCache() {
        if (!this.isLinkCoordinatorEnabled()) {
            return this.mirrorTopicStatesFromControllerContext();
        }
        return ((ZkMetadataCache)super.metadataCache()).mirrorTopicStates();
    }

    @Override
    public Set<String> getAllTopicsInCluster() {
        return this.zkClient().getAllTopicsInCluster(this.zkClient().getAllTopicsInCluster$default$1());
    }

    @Override
    public boolean isTopicQueuedUpForDeletion(String topic) {
        return this.kafkaController().controllerContext().isTopicQueuedUpForDeletion(topic);
    }

    @Override
    public void ensureClusterLinkExists(Uuid linkId) {
        this.adminZkClient().ensureClusterLinkExists(linkId);
    }

    @Override
    public boolean clusterLinkExists(Uuid linkId) {
        return this.zkClient().clusterLinkExists(linkId);
    }

    @Override
    public Option<ClusterLinkData> getClusterLinkData(Uuid linkId) {
        return this.adminZkClient().getClusterLink(linkId);
    }

    @Override
    public Option<ClusterLinkData> getClusterLinkData(String linkName) {
        return this.adminZkClient().getAllClusterLinks().find((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataManagerWithZkSupport.$anonfun$getClusterLinkData$1(linkName, x$3)));
    }

    @Override
    public Properties getClusterLinkConfigProps(Uuid linkId) {
        return this.adminZkClient().fetchClusterLinkConfig(linkId);
    }

    @Override
    public ClusterLinkConfig getClusterLinkConfig(Uuid linkId) {
        return ClusterLinkConfig$.MODULE$.create(this.getClusterLinkConfigProps(linkId), false);
    }

    @Override
    public Properties getTopicConfig(String topic) {
        Option<LogConfig> logConfigOpt = this.replicaManager.getLogConfig(new TopicPartition(topic, 0));
        if (logConfigOpt.isDefined()) {
            LogConfig logConfig = (LogConfig)logConfigOpt.get();
            scala.collection.mutable.Map props = (scala.collection.mutable.Map)CollectionConverters$.MODULE$.MapHasAsScala(logConfig.originals()).asScala().filter((Function1 & Serializable)x0$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkMetadataManagerWithZkSupport.$anonfun$getTopicConfig$1(logConfig, x0$1)));
            Properties properties = new Properties();
            properties.putAll((java.util.Map<?, ?>)CollectionConverters$.MODULE$.MutableMapHasAsJava(props).asJava());
            return properties;
        }
        return this.adminZkClient().fetchEntityConfig(ConfigType$.MODULE$.Topic(), topic);
    }

    @Override
    public void alterClusterLinkConfigZNode(Uuid linkId, Properties props) {
        this.adminZkClient().changeClusterLinkConfig(linkId, props);
        this.zkMetadataCollector.foreach((Function1 & Serializable)metadataCollector -> {
            ClusterLinkMetadataManagerWithZkSupport.$anonfun$alterClusterLinkConfigZNode$1(this, linkId, props, metadataCollector);
            return BoxedUnit.UNIT;
        });
    }

    @Override
    public void maybeReencryptClusterLinkConfigs(Function1<Properties, Option<Properties>> configTransformer) {
        this.adminZkClient().maybeReencryptClusterLinkConfigs(configTransformer);
    }

    @Override
    public void createClusterLinkZNode(ClusterLinkData clusterLinkData, Properties props) {
        this.adminZkClient().createClusterLink(clusterLinkData, props);
        this.zkMetadataCollector.foreach((Function1 & Serializable)metadataCollector -> {
            ClusterLinkMetadataManagerWithZkSupport.$anonfun$createClusterLinkZNode$1(this, props, clusterLinkData, metadataCollector);
            return BoxedUnit.UNIT;
        });
    }

    @Override
    public void setClusterLinkZNode(ClusterLinkData clusterLinkData) {
        this.adminZkClient().setClusterLink(clusterLinkData);
    }

    @Override
    public void alterMirrorTopicState(String topic, Function1<ClusterLinkTopicState, Option<ClusterLinkTopicState>> stateUpdater, boolean validateOnly, boolean failIfNotMirror) {
        this.zkClient().alterMirrorTopicState(topic, stateUpdater, validateOnly, failIfNotMirror, this.zkClient().alterMirrorTopicState$default$5()).foreach((Function1 & Serializable)mirror -> {
            ClusterLinkMetadataManagerWithZkSupport.$anonfun$alterMirrorTopicState$1(this, topic, mirror);
            return BoxedUnit.UNIT;
        });
    }

    @Override
    public boolean alterMirrorTopicState$default$4() {
        return true;
    }

    @Override
    public CompletableFuture<CreateClusterLinksResponse> createClusterLinks(CreateClusterLinksRequestData createClusterLinksRequestData) {
        throw new IllegalStateException("Not supported in Zk mode");
    }

    @Override
    public void deleteClusterLinkZNode(Uuid linkId) {
        String linkName = this.getClusterLinkData(linkId).isDefined() ? ((ClusterLinkData)this.getClusterLinkData(linkId).get()).linkName() : "";
        this.adminZkClient().deleteClusterLink(linkId);
        this.zkMetadataCollector.foreach((Function1 & Serializable)metadataCollector -> {
            metadataCollector.onClusterLinkDelete(linkName);
            return BoxedUnit.UNIT;
        });
    }

    @Override
    public scala.collection.immutable.Seq<ClusterLinkData> getAllLinksFromMetadata() {
        return this.adminZkClient().getAllClusterLinks().toSeq();
    }

    @Override
    public KafkaFuture<Void> stopMirrorTopic(String topic, scala.collection.immutable.Seq<Object> logEndOffsets) {
        if (this.isLinkCoordinatorEnabled()) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(73).append("Sending a request to set the mirror topic ").append(topic).append(" state to STOPPED to controller").toString());
            return this.stopMirrorTopic(topic);
        }
        this.alterMirrorTopicState(topic, (Function1<ClusterLinkTopicState, Option<ClusterLinkTopicState>>)(Function1 & Serializable)currentState -> ClusterLinkMetadataManagerWithZkSupport.toPendingStopped$1(currentState, logEndOffsets, topic), false, this.alterMirrorTopicState$default$4());
        return KafkaFuture.completedFuture(null);
    }

    public KafkaFuture<Void> stopMirrorTopic(String topic) {
        AlterMirrorsRequest.Builder builder = new AlterMirrorsRequest.Builder(Collections.singletonList(new AlterMirrorsRequestData.MirrorOperation().setTopic(topic).setOperationCode(AlterMirrorOp.STOP.id())), false, this.requestTimeoutMs());
        KafkaFutureImpl result = new KafkaFutureImpl();
        ClusterLinkControllerRequestCompletionHandler requestHandler = new ClusterLinkControllerRequestCompletionHandler(null, topic, result){
            private final String topic$3;
            private final KafkaFutureImpl result$1;

            public void handleResponse(AbstractResponse response) {
                if (this.logger().underlying().isDebugEnabled()) {
                    this.logger().underlying().debug(this.msgWithLogIdent("Received response for alterMirrorsRequest"));
                }
                AlterMirrorsResponse message = (AlterMirrorsResponse)response;
                LinkedHashMap<String, KafkaFutureImpl> resultMap = new LinkedHashMap<String, KafkaFutureImpl>();
                resultMap.put(this.topic$3, this.result$1);
                message.complete(resultMap);
            }

            public void onFailure(Throwable ex) {
                if (this.logger().underlying().isDebugEnabled()) {
                    this.logger().underlying().debug(this.msgWithLogIdent("alterMirrors request failed with exception"), ex);
                }
                this.result$1.completeExceptionally(ex);
            }

            public static final /* synthetic */ String $anonfun$handleResponse$1() {
                return "Received response for alterMirrorsRequest";
            }

            public static final /* synthetic */ String $anonfun$onFailure$1() {
                return "alterMirrors request failed with exception";
            }

            public static final /* synthetic */ Throwable $anonfun$onFailure$2(Throwable ex$1) {
                return ex$1;
            }
            {
                this.topic$3 = topic$3;
                this.result$1 = result$1;
            }
        };
        super.controllerChannelManager().sendRequest((AbstractRequest.Builder<? extends AbstractRequest>)builder, requestHandler);
        return result;
    }

    private ClusterLinkInfo getClusterLinkInfo(ClusterLinkData cld, ClusterLinkConfig config) {
        return new ClusterLinkInfo(cld.linkName(), cld.linkId(), config.linkMode(), config.connectionMode(), (String)cld.clusterId().getOrElse((Function0 & Serializable)() -> ""), (String)this.zkClient().getClusterId().getOrElse((Function0 & Serializable)() -> ""));
    }

    public static final /* synthetic */ boolean $anonfun$mirrorTopicStatesFromControllerContext$2(Uuid linkId$1, Tuple2 cl) {
        return ((ClusterLinkTopicState)cl._2()).linkId().equals((Object)linkId$1);
    }

    public static final /* synthetic */ boolean $anonfun$mirrorTopicStatesFromMetadataCache$1(Uuid linkId$2, Tuple2 x$1) {
        Uuid uuid = ((ClusterLinkTopicState)x$1._2()).linkId();
        return !(uuid != null ? !uuid.equals(linkId$2) : linkId$2 != null);
    }

    public static final /* synthetic */ boolean $anonfun$mirrorTopicStatesFromMetadataCache$3(Tuple2 x$2) {
        return x$2._2() == null;
    }

    public static final /* synthetic */ boolean $anonfun$getClusterLinkData$1(String linkName$1, ClusterLinkData x$3) {
        String string = x$3.linkName();
        return !(string != null ? !string.equals(linkName$1) : linkName$1 != null);
    }

    public static final /* synthetic */ boolean $anonfun$getTopicConfig$1(LogConfig logConfig$1, Tuple2 x0$1) {
        if (x0$1 != null) {
            String k = (String)x0$1._1();
            return logConfig$1.overriddenConfigs.contains(k);
        }
        throw new MatchError(null);
    }

    public static final /* synthetic */ void $anonfun$alterClusterLinkConfigZNode$1(ClusterLinkMetadataManagerWithZkSupport $this, Uuid linkId$3, Properties props$1, ZKMetadataCollector metadataCollector) {
        String linkName = $this.getClusterLinkData(linkId$3).isDefined() ? ((ClusterLinkData)$this.getClusterLinkData(linkId$3).get()).linkName() : "";
        ClusterLinkConfig config = ClusterLinkConfig$.MODULE$.create(props$1, false);
        metadataCollector.onClusterLinkConfigChange(linkName, config);
    }

    public static final /* synthetic */ void $anonfun$createClusterLinkZNode$1(ClusterLinkMetadataManagerWithZkSupport $this, Properties props$2, ClusterLinkData clusterLinkData$1, ZKMetadataCollector metadataCollector) {
        ClusterLinkConfig config = ClusterLinkConfig$.MODULE$.create(props$2, false);
        ClusterLinkInfo clusterLinkInfo = $this.getClusterLinkInfo(clusterLinkData$1, config);
        metadataCollector.onClusterLinkCreate(clusterLinkInfo);
    }

    public static final /* synthetic */ void $anonfun$alterMirrorTopicState$1(ClusterLinkMetadataManagerWithZkSupport $this, String topic$1, ClusterLinkTopicState mirror) {
        if (mirror != null) {
            $this.zkMetadataCollector.foreach((Function1 & Serializable)x$4 -> {
                x$4.onMirrorTopicStateChange(topic$1, mirror.state().name());
                return BoxedUnit.UNIT;
            });
            return;
        }
        $this.zkMetadataCollector.foreach((Function1 & Serializable)x$5 -> {
            x$5.onMirrorTopicStateChange(topic$1, null);
            return BoxedUnit.UNIT;
        });
    }

    private static final Option toPendingStopped$1(ClusterLinkTopicState currentState, scala.collection.immutable.Seq logEndOffsets$1, String topic$2) {
        if (currentState instanceof ClusterLinkTopicState.PendingStoppedMirror) {
            ClusterLinkTopicState.PendingStoppedMirror pendingStoppedMirror = (ClusterLinkTopicState.PendingStoppedMirror)currentState;
            String linkName = pendingStoppedMirror.linkName();
            Uuid linkId = pendingStoppedMirror.linkId();
            Uuid sourceTopicId = pendingStoppedMirror.sourceTopicId();
            return new Some((Object)new ClusterLinkTopicState.StoppedMirror(linkName, linkId, sourceTopicId, (Seq<Object>)logEndOffsets$1, Time.SYSTEM.milliseconds()));
        }
        if (currentState instanceof ClusterLinkTopicState.Mirror ? true : (currentState instanceof ClusterLinkTopicState.PausedMirror ? true : (currentState instanceof ClusterLinkTopicState.StoppedMirror ? true : currentState instanceof ClusterLinkTopicState.FailedMirror))) {
            throw new InvalidRequestException(new StringBuilder(57).append("Topic ").append(topic$2).append(" is not promoted/failed over. Cannot stop the topic").toString());
        }
        throw new IllegalStateException(new StringBuilder(37).append("Unknown mirror state for topic ").append(topic$2).append(" found").toString());
    }

    public ClusterLinkMetadataManagerWithZkSupport(KafkaConfig brokerConfig, ClusterLinkScheduler scheduler, ZkMetadataCache metadataCache, BrokerToControllerChannelManager controllerChannelManager, KafkaController kafkaController, KafkaZkClient zkClient, Function0<Admin> localAdminFactory, ReplicaManager replicaManager, AuthorizerServerInfo serverInfo, Option<ZKMetadataCollector> zkMetadataCollector) {
        this.brokerConfig = brokerConfig;
        this.kafkaController = kafkaController;
        this.zkClient = zkClient;
        this.replicaManager = replicaManager;
        this.zkMetadataCollector = zkMetadataCollector;
        super(brokerConfig, scheduler, metadataCache, controllerChannelManager, localAdminFactory, serverInfo);
        this.adminZkClient = new AdminZkClient(zkClient);
    }
}

