/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import io.confluent.kafka.link.ClusterLinkConfig;
import java.io.File;
import java.io.Serializable;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkManager;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkOutboundConnectionManager;
import kafka.server.link.ClusterLinkOutboundConnectionManagerTest$;
import kafka.server.link.ConnectionMode;
import kafka.server.link.RemoteNetworkClient;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.SourceReverseConnectionManager;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import scala.;
import scala.$less$colon$less$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.Set;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichLong$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005\rUb\u0001\u0002)R\u0001aCQa\u0018\u0001\u0005\u0002\u0001Dqa\u0019\u0001C\u0002\u0013%A\r\u0003\u0004j\u0001\u0001\u0006I!\u001a\u0005\bU\u0002\u0011\r\u0011\"\u0003l\u0011\u0019A\b\u0001)A\u0005Y\"9\u0011\u0010\u0001b\u0001\n\u0013Q\bbBA\u0004\u0001\u0001\u0006Ia\u001f\u0005\n\u0003\u0013\u0001!\u0019!C\u0005\u0003\u0017A\u0001\"!\u0007\u0001A\u0003%\u0011Q\u0002\u0005\t\u00037\u0001!\u0019!C\u0005u\"9\u0011Q\u0004\u0001!\u0002\u0013Y\b\u0002CA\u0010\u0001\t\u0007I\u0011\u0002>\t\u000f\u0005\u0005\u0002\u0001)A\u0005w\"I\u00111\u0005\u0001C\u0002\u0013%\u0011Q\u0005\u0005\t\u0003g\u0001\u0001\u0015!\u0003\u0002(!I\u0011Q\u0007\u0001C\u0002\u0013%\u0011q\u0007\u0005\t\u0003\u000b\u0002\u0001\u0015!\u0003\u0002:!I\u0011q\t\u0001C\u0002\u0013%\u0011\u0011\n\u0005\t\u0003#\u0002\u0001\u0015!\u0003\u0002L!I\u00111\u000b\u0001C\u0002\u0013%\u0011Q\u000b\u0005\t\u0003G\u0002\u0001\u0015!\u0003\u0002X!I\u0011Q\r\u0001C\u0002\u0013%\u0011q\r\u0005\t\u0003_\u0002\u0001\u0015!\u0003\u0002j!I\u0011\u0011\u000f\u0001C\u0002\u0013%\u00111\u000f\u0005\t\u0003\u0003\u0003\u0001\u0015!\u0003\u0002v!I\u00111\u0011\u0001C\u0002\u0013%\u0011Q\u0011\u0005\t\u0003#\u0003\u0001\u0015!\u0003\u0002\b\"I\u00111\u0013\u0001C\u0002\u0013%\u0011Q\u0013\u0005\t\u0003C\u0003\u0001\u0015!\u0003\u0002\u0018\"I\u00111\u0015\u0001C\u0002\u0013%\u0011Q\u0015\u0005\t\u0003o\u0003\u0001\u0015!\u0003\u0002(\"I\u0011\u0011\u0018\u0001C\u0002\u0013%\u00111\u0018\u0005\t\u0003'\u0004\u0001\u0015!\u0003\u0002>\"Y\u0011Q\u001b\u0001A\u0002\u0003\u0007I\u0011BAl\u0011-\ty\u000e\u0001a\u0001\u0002\u0004%I!!9\t\u0017\u00055\b\u00011A\u0001B\u0003&\u0011\u0011\u001c\u0005\f\u0003_\u0004\u0001\u0019!a\u0001\n\u0013\t\t\u0010C\u0006\u0002z\u0002\u0001\r\u00111A\u0005\n\u0005m\bbCA\u0000\u0001\u0001\u0007\t\u0011)Q\u0005\u0003gD1B!\u0001\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003\u0004!Y!1\u0002\u0001A\u0002\u0003\u0007I\u0011\u0002B\u0007\u0011-\u0011\t\u0002\u0001a\u0001\u00
02\u0003\u0006KA!\u0002\t\u0013\tM\u0001A1A\u0005\n\tU\u0001\u0002\u0003B\u0012\u0001\u0001\u0006IAa\u0006\t\u0017\t\u0015\u0002\u00011AA\u0002\u0013%!q\u0005\u0005\f\u0005k\u0001\u0001\u0019!a\u0001\n\u0013\u00119\u0004C\u0006\u0003<\u0001\u0001\r\u0011!Q!\n\t%\u0002b\u0003B\u001f\u0001\u0001\u0007\t\u0019!C\u0005\u0005\u007fA1Ba\u0012\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0003J!Y!Q\n\u0001A\u0002\u0003\u0005\u000b\u0015\u0002B!\u0011%\u0011y\u0005\u0001a\u0001\n\u0013\u0011\t\u0006C\u0005\u0003`\u0001\u0001\r\u0011\"\u0003\u0003b!A!Q\r\u0001!B\u0013\u0011\u0019\u0006C\u0005\u0003h\u0001\u0001\r\u0011\"\u0003\u0003j!I!\u0011\u000f\u0001A\u0002\u0013%!1\u000f\u0005\t\u0005o\u0002\u0001\u0015)\u0003\u0003l!9!\u0011\u0010\u0001\u0005\u0002\tm\u0004b\u0002BJ\u0001\u0011\u0005!1\u0010\u0005\b\u0005;\u0003A\u0011\u0001B>\u0011\u001d\u0011\t\u000b\u0001C\u0001\u0005wBqA!*\u0001\t\u0003\u0011Y\bC\u0004\u0003*\u0002!\tAa\u001f\t\u000f\t5\u0006\u0001\"\u0001\u0003|!9!\u0011\u0017\u0001\u0005\u0002\tm\u0004b\u0002B[\u0001\u0011\u0005!1\u0010\u0005\b\u0005s\u0003A\u0011\u0001B>\u0011\u001d\u0011i\f\u0001C\u0005\u0005\u007fCqA!1\u0001\t\u0013\u0011\u0019\rC\u0005\u0003R\u0002\t\n\u0011\"\u0003\u0003T\"9!\u0011\u001e\u0001\u0005\n\t-\bb\u0002Bz\u0001\u0011%!Q\u001f\u0005\n\u0007\u000b\u0001\u0011\u0013!C\u0005\u0007\u000fAqaa\u0003\u0001\t\u0013\u0019i\u0001C\u0004\u0004\u0016\u0001!Iaa\u0006\b\u000f\re\u0011\u000b#\u0001\u0004\u001c\u00191\u0001+\u0015E\u0001\u0007;Aaa\u0018'\u0005\u0002\r}\u0001bBB\u0011\u0019\u0012\u0005!1\u0010\u0005\b\u0007WaE\u0011\u0001B>\u0005!\u001aE.^:uKJd\u0015N\\6PkR\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014H+Z:u\u0015\t\u00116+\u0001\u0003mS:\\'B\u0001+V\u0003\u0019\u0019XM\u001d<fe*\ta+A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001I\u0006C\u0001.^\u001b\u0005Y&\"\u0001/\u0002\u000bM\u001c\u0017\r\\1\n\u0005y[&AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002CB\u0011!\rA\u0007\u0002#\u0006a!M]8lKJ\u001cuN\u001c4jOV\tQ\r\u0005\u00
02gO6\t1+\u0003\u0002i'\nY1*\u00194lC\u000e{gNZ5h\u00035\u0011'o\\6fe\u000e{gNZ5hA\u0005Q1/\u001a:wKJLeNZ8\u0016\u00031\u0004\"!\u001c<\u000e\u00039T!a\u001c9\u0002\u0015\u0005,H\u000f[8sSj,'O\u0003\u0002Uc*\u0011aK\u001d\u0006\u0003gR\fa!\u00199bG\",'\"A;\u0002\u0007=\u0014x-\u0003\u0002x]\n!\u0012)\u001e;i_JL'0\u001a:TKJ4XM]%oM>\f1b]3sm\u0016\u0014\u0018J\u001c4pA\u0005AA.\u001b8l\u001d\u0006lW-F\u0001|!\ra\u00181A\u0007\u0002{*\u0011ap`\u0001\u0005Y\u0006twM\u0003\u0002\u0002\u0002\u0005!!.\u0019<b\u0013\r\t)! \u0002\u0007'R\u0014\u0018N\\4\u0002\u00131Lgn\u001b(b[\u0016\u0004\u0013A\u00027j].LE-\u0006\u0002\u0002\u000eA!\u0011qBA\u000b\u001b\t\t\tBC\u0002\u0002\u0014E\faaY8n[>t\u0017\u0002BA\f\u0003#\u0011A!V;jI\u00069A.\u001b8l\u0013\u0012\u0004\u0013aD:pkJ\u001cWm\u00117vgR,'/\u00133\u0002!M|WO]2f\u00072,8\u000f^3s\u0013\u0012\u0004\u0013!\u00043fgR\u001cE.^:uKJLE-\u0001\beKN$8\t\\;ti\u0016\u0014\u0018\n\u001a\u0011\u0002\u00111Lgn\u001b#bi\u0006,\"!a\n\u0011\t\u0005%\u0012qF\u0007\u0003\u0003WQ1!!\fV\u0003\tQ8.\u0003\u0003\u00022\u0005-\"aD\"mkN$XM\u001d'j].$\u0015\r^1\u0002\u00131Lgn\u001b#bi\u0006\u0004\u0013!\u00037j].\u0004&o\u001c9t+\t\tI\u0004\u0005\u0003\u0002<\u0005\u0005SBAA\u001f\u0015\r\tyd`\u0001\u0005kRLG.\u0003\u0003\u0002D\u0005u\"A\u0003)s_B,'\u000f^5fg\u0006QA.\u001b8l!J|\u0007o\u001d\u0011\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ,\"!a\u0013\u0011\u0007\t\fi%C\u0002\u0002PE\u0013!d\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006l\u0015M\\1hKJ\f\u0001#\\3uC\u0012\fG/Y'b]\u0006<WM\u001d\u0011\u0002\u0019M|7m[3u'\u0016\u0014h/\u001a:\u0016\u0005\u0005]\u0003\u0003BA-\u0003?j!!a\u0017\u000b\u0007\u0005uS+A\u0004oKR<xN]6\n\t\u0005\u0005\u00141\f\u0002\r'>\u001c7.\u001a;TKJ4XM]\u0001\u000eg>\u001c7.\u001a;TKJ4XM\u001d\u0011\u0002\u00171Lgn['b]\u0006<WM]\u000b\u0003\u0003S\u00022AYA6\u0013\r\ti'\u0015\u0002\u0013\u00072,8\u000f^3s\u0019&t7.T1oC\u001e,'/\u0001\u0007mS:\\W*\u00198bO\u0016\u0014\b%\u0001\u0003uS6,WCAA;!\u0011\t9(! 
\u000e\u0005\u0005e$bAA>+\u0006)Q\u000f^5mg&!\u0011qPA=\u0005!iunY6US6,\u0017!\u0002;j[\u0016\u0004\u0013aB7fiJL7m]\u000b\u0003\u0003\u000f\u0003B!!#\u0002\u000e6\u0011\u00111\u0012\u0006\u0005\u0003\u0007\u000b\t\"\u0003\u0003\u0002\u0010\u0006-%aB'fiJL7m]\u0001\t[\u0016$(/[2tA\u000591\r[1o]\u0016dWCAAL!\u0011\tI*!(\u000e\u0005\u0005m%\u0002BA/\u0003#IA!a(\u0002\u001c\na1*\u00194lC\u000eC\u0017M\u001c8fY\u0006A1\r[1o]\u0016d\u0007%\u0001\tnKR\fG-\u0019;b%\u0016\fX/Z:ugV\u0011\u0011q\u0015\t\u0005\u0003S\u000b\u0019,\u0004\u0002\u0002,*!\u0011QVAX\u0003\u0019\tGo\\7jG*!\u0011\u0011WA\u001f\u0003)\u0019wN\\2veJ,g\u000e^\u0005\u0005\u0003k\u000bYKA\u0007Bi>l\u0017nY%oi\u0016<WM]\u0001\u0012[\u0016$\u0018\rZ1uCJ+\u0017/^3tiN\u0004\u0013AD2m_N,Gm\u00115b]:,Gn]\u000b\u0003\u0003{\u0003b!a0\u0002J\u00065WBAAa\u0015\u0011\t\u0019-!2\u0002\u000f5,H/\u00192mK*\u0019\u0011qY.\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002L\u0006\u0005'aA*fiB!\u0011\u0011TAh\u0013\u0011\t\t.a'\u0003\u001dI+g/\u001a:tK\u000eC\u0017M\u001c8fY\u0006y1\r\\8tK\u0012\u001c\u0005.\u00198oK2\u001c\b%\u0001\u0006mS:\\7i\u001c8gS\u001e,\"!!7\u0011\u0007\t\fY.C\u0002\u0002^F\u0013\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u00039a\u0017N\\6D_:4\u0017nZ0%KF$B!a9\u0002jB\u0019!,!:\n\u0007\u0005\u001d8L\u0001\u0003V]&$\b\"CAvG\u0005\u0005\t\u0019AAm\u0003\rAH%M\u0001\fY&t7nQ8oM&<\u0007%A\u0006mS:\\W*\u001a;sS\u000e\u001cXCAAz!\r\u0011\u0017Q_\u0005\u0004\u0003o\f&AE\"mkN$XM\u001d'j].lU\r\u001e:jGN\fq\u0002\\5oW6+GO]5dg~#S-\u001d\u000b\u0005\u0003G\fi\u0010C\u0005\u0002l\u001a\n\t\u00111\u0001\u0002t\u0006aA.\u001b8l\u001b\u0016$(/[2tA\u0005Y1m\u001c8o\u001b\u0006t\u0017mZ3s+\t\u0011)\u0001E\u0002c\u0005\u000fI1A!\u0003R\u0005\u0011\u001aE.^:uKJd\u0015N\\6PkR\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014\u0018aD2p]:l\u0015M\\1hKJ|F%Z9\u0015\t\u0005\r(q\u0002\u0005\n\u0003WL\u0013\u0011!a\u0001\u0005\u000b\tAbY8o]6\u000bg.Y4fe\u0002\nq\u0002\\8dC2lunY6DY&,g\u000e^\u000b\u0003\u0005/\u0001BA!\u0007\u0
003 5\u0011!1\u0004\u0006\u0004\u0005;\t\u0018aB2mS\u0016tGo]\u0005\u0005\u0005C\u0011YB\u0001\u0006N_\u000e\\7\t\\5f]R\f\u0001\u0003\\8dC2lunY6DY&,g\u000e\u001e\u0011\u0002\u00151|7-\u00197BI6Lg.\u0006\u0002\u0003*A!!1\u0006B\u0019\u001b\t\u0011iC\u0003\u0003\u00030\tm\u0011!B1e[&t\u0017\u0002\u0002B\u001a\u0005[\u0011abQ8oM2,XM\u001c;BI6Lg.\u0001\bm_\u000e\fG.\u00113nS:|F%Z9\u0015\t\u0005\r(\u0011\b\u0005\n\u0003Wt\u0013\u0011!a\u0001\u0005S\t1\u0002\\8dC2\fE-\\5oA\u0005Y!/Z7pi\u0016\fE-\\5o+\t\u0011\t\u0005E\u0002c\u0005\u0007J1A!\u0012R\u0005M\u0011V-\\8uK:+Go^8sW\u000ec\u0017.\u001a8u\u0003=\u0011X-\\8uK\u0006#W.\u001b8`I\u0015\fH\u0003BAr\u0005\u0017B\u0011\"a;2\u0003\u0003\u0005\rA!\u0011\u0002\u0019I,Wn\u001c;f\u0003\u0012l\u0017N\u001c\u0011\u0002-I,Wn\u001c;f\u0007>tGO]8mY\u0016\u0014hj\u001c3f\u0013\u0012,\"Aa\u0015\u0011\u000bi\u0013)F!\u0017\n\u0007\t]3L\u0001\u0004PaRLwN\u001c\t\u00045\nm\u0013b\u0001B/7\n\u0019\u0011J\u001c;\u00025I,Wn\u001c;f\u0007>tGO]8mY\u0016\u0014hj\u001c3f\u0013\u0012|F%Z9\u0015\t\u0005\r(1\r\u0005\n\u0003W$\u0014\u0011!a\u0001\u0005'\nqC]3n_R,7i\u001c8ue>dG.\u001a:O_\u0012,\u0017\n\u001a\u0011\u0002#%\u001cHj\\2bY\u000e{g\u000e\u001e:pY2,'/\u0006\u0002\u0003lA\u0019!L!\u001c\n\u0007\t=4LA\u0004C_>dW-\u00198\u0002+%\u001cHj\\2bY\u000e{g\u000e\u001e:pY2,'o\u0018\u0013fcR!\u00111\u001dB;\u0011%\tYoNA\u0001\u0002\u0004\u0011Y'\u0001\njg2{7-\u00197D_:$(o\u001c7mKJ\u0004\u0013\u0001\u0003;fCJ$un\u001e8\u0015\u0005\u0005\r\bfA\u001d\u0003\u0000A!!\u0011\u0011BH\u001b\t\u0011\u0019I\u0003\u0003\u0003\u0006\n\u001d\u0015aA1qS*!!\u0011\u0012BF\u0003\u001dQW\u000f]5uKJT1A!$u\u0003\u0015QWO\\5u\u0013\u0011\u0011\tJa!\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017!\u0007;fgR\u001cuN\u001c8fGRLwN\\'pI\u0016LeNY8v]\u0012D3A\u000fBL!\u0011\u0011\tI!'\n\t\tm%1\u0011\u0002\u0005)\u0016\u001cH/\u0001\u000euKN$8i\u001c8oK\u000e$\u0018n\u001c8N_\u0012,w*\u001e;c_VtG\rK\u0002<\u0005/\u000b\u0001\u0004^3tiB+'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8oQ\ra$qS\u00
011i\u0016\u001cH\u000fU3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t'+Z7pi\u0016\u001cuN\u001c;s_2dWM\u001d(pi.swn\u001e8)\u0007u\u00129*\u0001\fuKN$hj\u001c;M_\u000e\fGnQ8oiJ|G\u000e\\3sQ\rq$qS\u0001\ri\u0016\u001cH/T3uC\u0012\fG/\u0019\u0015\u0004\u007f\t]\u0015a\u0004;fgR\u0014VmY8oM&<WO]3)\u0007\u0001\u00139*\u0001\u0012uKN$(+Z2p]\u001aLw-\u001e:f/&$\bnQ8o]\u0016\u001cG/[8o%\u0016\u001cX\r\u001e\u0015\u0004\u0003\n]\u0015!\b;fgR\u0014VmY8oM&<WO]3XSRD\u0017i]=oG\u000ecwn]3)\u0007\t\u00139*\u0001\nde\u0016\fG/\u001a\"s_.,'oQ8oM&<G#A3\u0002-M,G/\u001e9D_:tWm\u0019;j_:l\u0015M\\1hKJ$b!a9\u0003F\n=\u0007b\u0002Bd\t\u0002\u0007!\u0011Z\u0001\u000fG>tg.Z2uS>tWj\u001c3f!\r\u0011'1Z\u0005\u0004\u0005\u001b\f&AD\"p]:,7\r^5p]6{G-\u001a\u0005\n\u0005O\"\u0005\u0013!a\u0001\u0005W\n\u0001e]3ukB\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0013eK\u001a\fW\u000f\u001c;%eU\u0011!Q\u001b\u0016\u0005\u0005W\u00129n\u000b\u0002\u0003ZB!!1\u001cBs\u001b\t\u0011iN\u0003\u0003\u0003`\n\u0005\u0018!C;oG\",7m[3e\u0015\r\u0011\u0019oW\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002\u0002Bt\u0005;\u0014\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u00031\u0019X\r^+q\u0007\"\fgN\\3m)\u0019\t\u0019O!<\u0003p\"9\u00111\u0013$A\u0002\u0005]\u0005b\u0002By\r\u0002\u0007!\u0011L\u0001\nG\"\fgN\\3m\u0013\u0012\f1B]3wKJ\u001cXMT8eKR1!q\u001fB\u007f\u0007\u0003\u0001B!!'\u0003z&!!1`AN\u0005-\u0011VM^3sg\u0016tu\u000eZ3\t\u000f\t}x\t1\u0001\u0003Z\u00051an\u001c3f\u0013\u0012D\u0011ba\u0001H!\u0003\u0005\rA!\u0017\u0002\u0013I,\u0017/^3ti&#\u0017!\u0006:fm\u0016\u00148/\u001a(pI\u0016$C-\u001a4bk2$HEM\u000b\u0003\u0007\u0013QCA!\u0017\u0003X\u0006i!/Z7pi\u0016\u001cE.^:uKJ,\"aa\u0004\u0011\t\u0005=1\u0011C\u0005\u0005\u0007'\t\tBA\u0004DYV\u001cH/\u001a:\u0002K%t\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c*fcV,7\u000f^\"pk:$XC\u0001B-\u0003!\u001aE.^:uKJd\u0015N\\6PkR\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014H+Z:u!\t\u0011Gj\u0005\u0002M3R\u001111D\u0001\u000bg\u0016$X\u000
b]\"mCN\u001c\bf\u0001(\u0004&A!!\u0011QB\u0014\u0013\u0011\u0019ICa!\u0003\u0013\t+gm\u001c:f\u00032d\u0017!\u0004;fCJ$un\u001e8DY\u0006\u001c8\u000fK\u0002P\u0007_\u0001BA!!\u00042%!11\u0007BB\u0005!\te\r^3s\u00032d\u0007")
public class ClusterLinkOutboundConnectionManagerTest {
    // ---- Test fixture state -------------------------------------------------
    // Fields carrying the long kafka$server$link$...$$ prefix are exposed through
    // public accessors of the same mangled name further down in this class.

    // Broker configuration built by createBrokerConfig(), plus a mocked AuthorizerServerInfo.
    private final KafkaConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig = this.createBrokerConfig();
    private final AuthorizerServerInfo kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo = (AuthorizerServerInfo)Mockito.mock(AuthorizerServerInfo.class);
    // Link identity. linkName, sourceClusterId and destClusterId have no initializer
    // here; presumably they are assigned in a constructor outside this view — TODO confirm.
    private final String linkName;
    private final Uuid linkId = Uuid.randomUuid();
    private final String kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId;
    private final String destClusterId;
    // NOTE(review): this initializer reads linkName() and destClusterId(), so it depends on
    // those fields being assigned first; verify the real initialization order.
    private final ClusterLinkData kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData = new ClusterLinkData(this.linkName(), this.linkId(), (Option)new Some((Object)this.destClusterId()), (Option)None$.MODULE$, false);
    // Mutable link properties; tests edit these and rebuild linkConfig from them.
    private final Properties linkProps = new Properties();
    // Mockito mocks standing in for broker-side collaborators.
    private final ClusterLinkMetadataManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager = (ClusterLinkMetadataManager)Mockito.mock(ClusterLinkMetadataManager.class);
    private final SocketServer kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer = (SocketServer)Mockito.mock(SocketServer.class);
    private final ClusterLinkManager linkManager = (ClusterLinkManager)Mockito.mock(ClusterLinkManager.class);
    // Mock clock and a Metrics registry; the registry is closed in tearDown().
    private final MockTime kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time = new MockTime();
    private final Metrics metrics = new Metrics();
    // Default mocked channel passed to onReverseConnection by the tests.
    private final KafkaChannel channel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
    // Counter asserted by the tests as the number of metadata requests, and the set of
    // reverse channels recorded as closed (starts empty).
    private final AtomicInteger kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests = new AtomicInteger();
    private final scala.collection.mutable.Set<ReverseChannel> kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
    // Reassignable state, written through the *_$eq mutators below.
    private ClusterLinkConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig;
    private ClusterLinkMetrics kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics;
    // Object under test; stays null until assigned via connManager_$eq (tearDown() null-checks it).
    private ClusterLinkOutboundConnectionManager connManager;
    // MockClient driven by the mock clock above.
    private final MockClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient = new MockClient((Time)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time());
    private ConfluentAdmin kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin;
    private RemoteNetworkClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin;
    // Remote controller node id defaults to Some(20); tests set it to None or Some(25).
    private Option<Object> remoteControllerNodeId = new Some((Object)BoxesRunTime.boxToInteger((int)20));
    private boolean isLocalController = false;

    /**
     * Runs once after all tests in this class and checks, via the Scala
     * {@code TestUtils} helper, that no unexpected threads are present.
     */
    @AfterAll
    public static void tearDownClass() {
        final String context = "@AfterAll";
        TestUtils$.MODULE$.verifyNoUnexpectedThreads(context);
    }

    /**
     * Runs once before any test in this class and checks, via the Scala
     * {@code TestUtils} helper, that no unexpected threads are present.
     */
    @BeforeAll
    public static void setUpClass() {
        final String context = "@BeforeAll";
        TestUtils$.MODULE$.verifyNoUnexpectedThreads(context);
    }

    /**
     * Public, name-mangled accessor for the broker configuration fixture.
     *
     * @return the {@link KafkaConfig} held by this test instance
     */
    public KafkaConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig;
    }

    /**
     * Public, name-mangled accessor for the mocked authorizer server info.
     *
     * @return the {@link AuthorizerServerInfo} mock held by this test instance
     */
    public AuthorizerServerInfo kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo;
    }

    /**
     * @return the value of the {@code linkName} field
     */
    private String linkName() {
        return linkName;
    }

    /**
     * @return the randomly generated {@link Uuid} stored in the {@code linkId} field
     */
    private Uuid linkId() {
        return linkId;
    }

    /**
     * Public, name-mangled accessor for the source cluster id fixture.
     *
     * @return the value of the source cluster id field
     */
    public String kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId;
    }

    /**
     * @return the value of the {@code destClusterId} field
     */
    private String destClusterId() {
        return destClusterId;
    }

    /**
     * Public, name-mangled accessor for the link data fixture.
     *
     * @return the {@link ClusterLinkData} held by this test instance
     */
    public ClusterLinkData kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData;
    }

    /**
     * @return the mutable {@link Properties} instance stored in {@code linkProps}
     *         (the same object on every call, not a copy)
     */
    private Properties linkProps() {
        return linkProps;
    }

    /**
     * Public, name-mangled accessor for the mocked metadata manager.
     *
     * @return the {@link ClusterLinkMetadataManager} mock held by this test instance
     */
    public ClusterLinkMetadataManager kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager;
    }

    /**
     * Public, name-mangled accessor for the mocked socket server.
     *
     * @return the {@link SocketServer} mock held by this test instance
     */
    public SocketServer kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer;
    }

    /**
     * @return the {@link ClusterLinkManager} mock stored in {@code linkManager}
     */
    private ClusterLinkManager linkManager() {
        return linkManager;
    }

    /**
     * Public, name-mangled accessor for the mock clock.
     *
     * @return the {@link MockTime} held by this test instance
     */
    public MockTime kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time;
    }

    /**
     * @return the {@link Metrics} registry stored in {@code metrics}
     */
    private Metrics metrics() {
        return metrics;
    }

    /**
     * @return the {@link KafkaChannel} mock stored in {@code channel}
     */
    private KafkaChannel channel() {
        return channel;
    }

    /**
     * Public, name-mangled accessor for the metadata request counter.
     *
     * @return the shared {@link AtomicInteger} counter (not a snapshot)
     */
    public AtomicInteger kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests;
    }

    /**
     * Public, name-mangled accessor for the set of closed reverse channels.
     *
     * @return the mutable Scala set held by this test instance (not a copy)
     */
    public scala.collection.mutable.Set<ReverseChannel> kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels;
    }

    /**
     * Public, name-mangled accessor for the current link configuration.
     *
     * @return the {@link ClusterLinkConfig} last stored via {@code linkConfig_$eq},
     *         or {@code null} if none has been stored
     */
    public ClusterLinkConfig kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig;
    }

    /**
     * Replaces the link configuration held by this test instance.
     *
     * @param x$1 the new configuration to store
     */
    private void linkConfig_$eq(ClusterLinkConfig x$1) {
        kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig = x$1;
    }

    /**
     * Public, name-mangled accessor for the link metrics fixture.
     *
     * @return the {@link ClusterLinkMetrics} last stored via {@code linkMetrics_$eq},
     *         or {@code null} if none has been stored
     */
    public ClusterLinkMetrics kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics;
    }

    /**
     * Replaces the link metrics object held by this test instance.
     *
     * @param x$1 the new metrics object to store
     */
    private void linkMetrics_$eq(ClusterLinkMetrics x$1) {
        kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics = x$1;
    }

    /**
     * @return the connection manager under test, or {@code null} if it has not
     *         been assigned yet
     */
    private ClusterLinkOutboundConnectionManager connManager() {
        return connManager;
    }

    /**
     * Stores the connection manager under test.
     *
     * @param x$1 the manager instance to store
     */
    private void connManager_$eq(ClusterLinkOutboundConnectionManager x$1) {
        connManager = x$1;
    }

    /**
     * Public, name-mangled accessor for the local mock network client.
     *
     * @return the {@link MockClient} held by this test instance
     */
    public MockClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient;
    }

    /**
     * Public, name-mangled accessor for the local admin client.
     *
     * @return the {@link ConfluentAdmin} last stored via the matching mutator,
     *         or {@code null} if none has been stored
     */
    public ConfluentAdmin kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin;
    }

    /**
     * Public, name-mangled mutator for the local admin client.
     *
     * @param x$1 the admin client to store
     */
    public void kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(ConfluentAdmin x$1) {
        kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin = x$1;
    }

    /**
     * Public, name-mangled accessor for the remote network client.
     *
     * @return the {@link RemoteNetworkClient} last stored via the matching mutator,
     *         or {@code null} if none has been stored
     */
    public RemoteNetworkClient kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin() {
        return kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin;
    }

    /**
     * Public, name-mangled mutator for the remote network client.
     *
     * @param x$1 the remote client to store
     */
    public void kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(RemoteNetworkClient x$1) {
        kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin = x$1;
    }

    /**
     * @return the Scala {@code Option} currently stored in
     *         {@code remoteControllerNodeId} (initially {@code Some(20)})
     */
    private Option<Object> remoteControllerNodeId() {
        return remoteControllerNodeId;
    }

    /**
     * Replaces the remote controller node id option.
     *
     * @param x$1 the new option value ({@code None} or {@code Some(nodeId)})
     */
    private void remoteControllerNodeId_$eq(Option<Object> x$1) {
        remoteControllerNodeId = x$1;
    }

    /**
     * @return the current value of the {@code isLocalController} flag
     *         (initially {@code false})
     */
    private boolean isLocalController() {
        return isLocalController;
    }

    /**
     * Sets the {@code isLocalController} flag.
     *
     * @param x$1 the new flag value
     */
    private void isLocalController_$eq(boolean x$1) {
        isLocalController = x$1;
    }

    /**
     * Per-test cleanup: shuts down the connection manager (if one was created)
     * and closes the metrics registry.
     *
     * <p>The registry is closed in a {@code finally} block so that a failure in
     * {@code shutdown()} cannot leave it open and leak into later tests.
     */
    @AfterEach
    public void tearDown() {
        try {
            // connManager stays null when a test fails before the manager is set up.
            if (this.connManager() != null) {
                this.connManager().shutdown();
            }
        } finally {
            this.metrics().close();
        }
    }

    /**
     * With an inbound-mode link the manager must refuse to start
     * ({@link IllegalStateException}), must reject reverse connections
     * ({@link InvalidRequestException}), must report zero connections, and no
     * local or remote admin client may have been stored.
     */
    @Test
    public void testConnectionModeInbound() {
        final ConnectionMode mode = ConnectionMode.Inbound$.MODULE$;
        this.setupConnectionManager(mode, false);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();

        Assertions.assertThrows(IllegalStateException.class, () -> manager.startup());
        Assertions.assertThrows(InvalidRequestException.class, () -> manager.onReverseConnection(this.channel(), this.reverseNode(1, -1)));

        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(0, (int)manager.reverseConnectionCount());
        Assertions.assertNull(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNull(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin());
    }

    /**
     * With an outbound-mode link set up with the boolean flag {@code false}
     * (presumably "not local controller" — confirm against
     * setupConnectionManager), startup succeeds, a reverse connection built
     * with {@code reverseNode(1, -1)} is rejected with
     * {@link NotControllerException}, and no connection is ever counted.
     */
    @Test
    public void testConnectionModeOutbound() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, false);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> manager.onReverseConnection(this.channel(), this.reverseNode(1, -1)));

        // The rejected attempt must not have been registered.
        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(0, (int)manager.reverseConnectionCount());
    }

    /**
     * Starts an outbound manager, polls (up to 15 s, every 100 ms) until the
     * synthetic condition {@code $anonfun$testPersistentConnection$1} holds,
     * then delivers a reverse connection from the remote controller node and
     * expects exactly one persistent and one reverse connection.
     */
    @Test
    public void testPersistentConnection() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, true);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        // Inlined wait-until-true loop: fail once the deadline has passed, otherwise keep polling.
        final long pollIntervalMs = 100L;
        final long timeoutMs = 15000L;
        final long startedAtMs = System.currentTimeMillis();
        for (;;) {
            if (ClusterLinkOutboundConnectionManagerTest.$anonfun$testPersistentConnection$1(this)) {
                break;
            }
            if (System.currentTimeMillis() > startedAtMs + timeoutMs) {
                Assertions.fail("Persistent connection not initiated");
            }
            Thread.sleep(Math.min(timeoutMs, pollIntervalMs));
        }

        // Nothing is counted until the reverse connection actually arrives.
        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        final int controllerId = BoxesRunTime.unboxToInt(this.remoteControllerNodeId().get());
        manager.onReverseConnection(this.channel(), this.reverseNode(controllerId, -1));
        Assertions.assertEquals(1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(1, (int)manager.reverseConnectionCount());
    }

    /**
     * When the remote controller node id is {@code None} before setup, starting
     * the manager must not produce any initiate-reverse-connection request nor
     * any persistent connection.
     */
    @Test
    public void testPersistentConnectionRemoteControllerNotKnown() {
        // Clear the controller id before the manager is created.
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, true);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        Assertions.assertEquals(0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
    }

    /**
     * With the setup flag {@code false}, no reverse-connection request is
     * initiated, a reverse node whose second argument is {@code -1} is rejected
     * with {@link NotControllerException}, while reverse nodes created with
     * second arguments 5 and 10 are accepted and counted as reverse (not
     * persistent) connections.
     */
    @Test
    public void testNotLocalController() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, false);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        Assertions.assertEquals(0, (int)this.initiateReverseConnectionRequestCount());
        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertThrows(NotControllerException.class, () -> manager.onReverseConnection(this.channel(), this.reverseNode(1, -1)));

        // First accepted reverse connection, on the default channel.
        manager.onReverseConnection(this.channel(), this.reverseNode(1, 5));

        // Second accepted reverse connection, on a freshly mocked channel.
        final KafkaChannel secondChannel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(secondChannel, 2);
        manager.onReverseConnection(secondChannel, this.reverseNode(2, 10));

        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(2, (int)manager.reverseConnectionCount());
    }

    /**
     * Tracks how the metadata request counter moves as the controller and
     * coordinator state changes: 1 after startup, 2 after becoming controller,
     * unchanged after losing controllership and after a coordinator change
     * while the remote controller id is {@code Some(25)}, and 3 after a
     * coordinator change while the remote controller id is {@code None}.
     */
    @Test
    public void testMetadata() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, true);
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        final AtomicInteger requests = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests();

        manager.startup();
        Assertions.assertEquals(1, requests.get());

        // Gaining controllership triggers another request.
        this.isLocalController_$eq(true);
        manager.onControllerChange(true);
        Assertions.assertEquals(2, requests.get());

        // Losing controllership does not.
        this.isLocalController_$eq(false);
        manager.onControllerChange(false);
        Assertions.assertEquals(2, requests.get());

        // Coordinator change while the remote controller id is Some(25): no new request.
        this.isLocalController_$eq(true);
        this.remoteControllerNodeId_$eq((Option<Object>)new Some((Object)BoxesRunTime.boxToInteger(25)));
        final Node coordinator = new Node(25, "", 0);
        manager.onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals(2, requests.get());

        // Coordinator change while the remote controller id is None: one more request.
        this.remoteControllerNodeId_$eq((Option<Object>)None$.MODULE$);
        manager.onNewRemoteLinkCoordinator(coordinator);
        Assertions.assertEquals(3, requests.get());
    }

    /**
     * Reconfiguring only {@code metadata.max.age.ms} must install the new
     * config and replace the remote admin client while keeping the existing
     * persistent/reverse connection counts at 1.
     */
    @Test
    public void testReconfigure() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, true);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        final int controllerId = BoxesRunTime.unboxToInt(this.remoteControllerNodeId().get());
        manager.onReverseConnection(this.channel(), this.reverseNode(controllerId, -1));
        Assertions.assertEquals(1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(1, (int)manager.reverseConnectionCount());

        // Remember the admin client in place before the reconfiguration.
        final RemoteNetworkClient adminBefore = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();

        final String changedKey = "metadata.max.age.ms";
        this.linkProps().setProperty(changedKey, "1000");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        manager.reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{changedKey})));

        Assertions.assertEquals((Object)manager.currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame(adminBefore, this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());

        // Connections survive this kind of reconfiguration.
        Assertions.assertEquals(1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(1, (int)manager.reverseConnectionCount());
    }

    /**
     * Reconfiguring {@code security.protocol} must install the new config,
     * replace the remote admin client, drop both existing connections (their
     * socket channels must show up in the closed-channel set) and bump the
     * metadata request counter from 0 to 1.
     */
    @Test
    public void testReconfigureWithConnectionReset() {
        final ConnectionMode mode = ConnectionMode.Outbound$.MODULE$;
        this.setupConnectionManager(mode, true);
        final ClusterLinkOutboundConnectionManager manager = this.connManager();
        manager.startup();

        final KafkaChannel secondChannel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(secondChannel, 2);
        final int controllerId = BoxesRunTime.unboxToInt(this.remoteControllerNodeId().get());
        manager.onReverseConnection(this.channel(), this.reverseNode(controllerId, -1));
        manager.onReverseConnection(secondChannel, this.reverseNode(2, 5));

        final AtomicInteger requests = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests();
        Assertions.assertEquals(1, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(2, (int)manager.reverseConnectionCount());
        Assertions.assertEquals(0, requests.get());

        // Remember the admin client in place before the reconfiguration.
        final RemoteNetworkClient adminBefore = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();

        final String changedKey = "security.protocol";
        this.linkProps().setProperty(changedKey, SecurityProtocol.SASL_PLAINTEXT.name);
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        manager.reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{changedKey})));

        Assertions.assertEquals((Object)manager.currentConfig().originals(), (Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals());
        Assertions.assertNotNull(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
        Assertions.assertNotSame(adminBefore, this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());

        // Both connections must have been torn down.
        Assertions.assertEquals(0, (int)manager.persistentConnectionCount());
        Assertions.assertEquals(0, (int)manager.reverseConnectionCount());

        // The closed-channel set must contain exactly the two socket channels used above.
        final Object expectedClosed = Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), secondChannel.socketChannel()}));
        final Object actualClosed = ((IterableOnceOps)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().map((Function1 & Serializable)x$6 -> x$6.socketChannel())).toSet();
        Assertions.assertEquals(expectedClosed, actualClosed);

        Assertions.assertEquals(1, requests.get());
    }

    /**
     * Same reconfiguration as {@code testReconfigureWithConnectionReset}, but each active
     * reverse connection is first given a close runnable that only records the connection
     * in a local set. The test checks that the connection counts stay unchanged after
     * {@code reconfigure} until the recorded connections' close listeners are invoked by hand,
     * and that a second metadata update is requested once they are.
     */
    @Test
    public void testReconfigureWithAsyncClose() {
        this.setupConnectionManager((ConnectionMode)ConnectionMode.Outbound$.MODULE$, true);
        this.connManager().startup();

        // Register two reverse connections: one for the remote controller node, one for node 2.
        KafkaChannel secondChannel = (KafkaChannel)Mockito.mock(KafkaChannel.class);
        this.setUpChannel(secondChannel, 2);
        int controllerId = BoxesRunTime.unboxToInt((Object)this.remoteControllerNodeId().get());
        this.connManager().onReverseConnection(this.channel(), this.reverseNode(controllerId, -1));
        this.connManager().onReverseConnection(secondChannel, this.reverseNode(2, 5));
        scala.collection.mutable.Set pendingClose = (scala.collection.mutable.Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        Assertions.assertEquals(1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals(2, (int)this.connManager().reverseConnectionCount());

        // Make closing a connection merely record it, so the actual close can be triggered later.
        ConcurrentHashMap activeConnections = (ConcurrentHashMap)TestUtils.fieldValue((Object)this.connManager(), ClusterLinkOutboundConnectionManager.class, (String)"activeReverseConnections");
        activeConnections.values().forEach(conn -> conn.closeRunnable(() -> pendingClose.$plus$eq(conn)));
        Assertions.assertEquals(0, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        RemoteNetworkClient previousAdmin = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();

        // Change the security protocol and hand the new config to the connection manager.
        String changedKey = "security.protocol";
        this.linkProps().setProperty(changedKey, SecurityProtocol.SASL_PLAINTEXT.name);
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        Set changedKeys = (Set)Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new String[]{changedKey}));
        this.connManager().reconfigure(this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), changedKeys);

        // Config and remote admin are replaced, but the connections are still counted.
        Object appliedOriginals = this.connManager().currentConfig().originals();
        Object requestedOriginals = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig().originals();
        Assertions.assertEquals(appliedOriginals, requestedOriginals);
        RemoteNetworkClient replacementAdmin = this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
        Assertions.assertNotNull((Object)replacementAdmin);
        Assertions.assertNotSame((Object)previousAdmin, (Object)replacementAdmin);
        Assertions.assertEquals(1, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
        Assertions.assertEquals(1, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals(2, (int)this.connManager().reverseConnectionCount());

        // Now deliver the close notification for every connection that was asked to close.
        pendingClose.foreach((Function1 & Serializable)recorded -> {
            ClusterLinkOutboundConnectionManagerTest.$anonfun$testReconfigureWithAsyncClose$3(recorded);
            return BoxedUnit.UNIT;
        });

        // After the notifications the connections are gone and both socket channels were closed.
        Assertions.assertEquals(0, (int)this.connManager().persistentConnectionCount());
        Assertions.assertEquals(0, (int)this.connManager().reverseConnectionCount());
        Object expectedClosedSockets = Predef$.MODULE$.Set().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new SocketChannel[]{this.channel().socketChannel(), secondChannel.socketChannel()}));
        Object actualClosedSockets = ((IterableOnceOps)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().map((Function1 & Serializable)closed -> closed.socketChannel())).toSet();
        Assertions.assertEquals(expectedClosedSockets, actualClosedSockets);
        Assertions.assertEquals(2, (int)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().get());
    }

    /**
     * Builds the broker configuration for this test: the properties produced by
     * {@code TestUtils.createBrokerConfig} with the cluster-link enable property set to
     * {@code "true"}.
     *
     * @return the resulting {@link KafkaConfig}
     */
    private KafkaConfig createBrokerConfig() {
        Properties brokerProps = TestUtils$.MODULE$.createBrokerConfig(
            1,
            "localhost:1234",
            true,
            true,
            TestUtils$.MODULE$.RandomPort(),
            (Option<SecurityProtocol>)None$.MODULE$,
            (Option<File>)None$.MODULE$,
            (Option<Properties>)None$.MODULE$,
            true,
            false,
            TestUtils$.MODULE$.RandomPort(),
            false,
            TestUtils$.MODULE$.RandomPort(),
            false,
            TestUtils$.MODULE$.RandomPort(),
            (Option<String>)None$.MODULE$,
            1,
            false,
            1,
            (short)1,
            false);
        brokerProps.put(KafkaConfig$.MODULE$.ClusterLinkEnableProp(), "true");
        return KafkaConfig$.MODULE$.fromProps(brokerProps);
    }

    /**
     * Prepares the fixture for a test: populates the link properties, creates the link config
     * and link metrics, stubs the mocked collaborators and installs a
     * {@link ClusterLinkOutboundConnectionManager} subclass whose admin-client factory and
     * close hooks are replaced with test doubles that record what happened in fields of this
     * test instance.
     *
     * @param connectionMode    connection mode whose name is written into the link properties
     * @param isLocalController value the stubbed {@code isLinkCoordinator} check will report
     */
    private void setupConnectionManager(ConnectionMode connectionMode, boolean isLocalController) {
        this.isLocalController_$eq(isLocalController);
        // Link properties: source-mode link using the requested connection mode on listener "EXTERNAL".
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LinkModeProp(), ClusterLinkConfig.LinkMode.SOURCE.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.ConnectionModeProp(), connectionMode.name());
        this.linkProps().put(ClusterLinkConfig$.MODULE$.LocalListenerNameProp(), "EXTERNAL");
        this.linkProps().put("bootstrap.servers", "localhost:123");
        this.linkConfig_$eq(ClusterLinkConfig$.MODULE$.create((Map)this.linkProps(), true));
        // NOTE(review): the metrics are always created with ConnectionMode.Outbound, independent of
        // the connectionMode argument — confirm this is intentional.
        this.linkMetrics_$eq(new ClusterLinkMetrics(this.linkName(), this.linkId(), ClusterLinkConfig.LinkMode.SOURCE, (ConnectionMode)ConnectionMode.Outbound$.MODULE$, false, this.linkManager(), (Option)None$.MODULE$, this.metrics(), (Option)None$.MODULE$));
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics().startup();
        // The mocked server info exposes a single PLAINTEXT endpoint named "EXTERNAL".
        Endpoint endpoint = new Endpoint("EXTERNAL", SecurityProtocol.PLAINTEXT, "host", 123);
        Mockito.when((Object)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo().endpoints()).thenReturn(Collections.singletonList(endpoint));
        // isLinkCoordinator answers with the current isLocalController value at call time.
        Mockito.when((Object)BoxesRunTime.boxToBoolean((boolean)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager().isLinkCoordinator(ArgumentMatchers.anyString()))).thenAnswer(invocation -> BoxesRunTime.boxToBoolean((boolean)this.isLocalController()));
        this.setUpChannel(this.channel(), 123);
        // Connection manager under test, with admin creation and close hooks overridden.
        this.connManager_$eq(new ClusterLinkOutboundConnectionManager(this){
            private final /* synthetic */ ClusterLinkOutboundConnectionManagerTest $outer;

            // Creates a ConfluentAdmin backed by the test's local mock client, with admin metadata
            // pre-populated with a single-node cluster, and stores it in the test's localAdmin field.
            public ConfluentAdmin createLocalAdmin() {
                Map<String, String> configs = Collections.singletonMap("bootstrap.servers", "localhost:9092");
                AdminClientConfig config = new AdminClientConfig(configs);
                AdminMetadataManager adminMetadataManager = new AdminMetadataManager(new LogContext(), 1000L, 300000L);
                List<Node> nodes = Collections.singletonList(new Node(1, "host1", 123));
                Cluster cluster = new Cluster(this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId(), nodes, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), nodes.get(0));
                adminMetadataManager.update(cluster, this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time().milliseconds());
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(ConfluentAdminUtils.createConfluentAdmin((AdminClientConfig)config, (AdminMetadataManager)adminMetadataManager, (KafkaClient)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient(), (Time)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time()));
                return this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin();
            }

            // Creates a RemoteNetworkClient assembled entirely from mocks and stores it in the test's
            // remoteAdmin field. Metadata lookups are answered from the test's remoteCluster(), and
            // every requestUpdate() call increments the test's metadataRequests counter.
            public RemoteNetworkClient createRemoteAdmin() {
                NetworkClient networkClient = (NetworkClient)Mockito.mock(NetworkClient.class);
                SourceReverseConnectionManager reverseConnectionManager = (SourceReverseConnectionManager)Mockito.mock(SourceReverseConnectionManager.class);
                ClusterLinkMetadata metadata = (ClusterLinkMetadata)Mockito.mock(ClusterLinkMetadata.class);
                ClusterLinkMetadataThread metadataThread = (ClusterLinkMetadataThread)Mockito.mock(ClusterLinkMetadataThread.class);
                Mockito.when((Object)networkClient.reverseConnectionManager()).thenReturn((Object)reverseConnectionManager);
                Mockito.when((Object)metadataThread.clusterLinkMetadata()).thenReturn((Object)metadata);
                Mockito.when((Object)metadataThread.remoteLinkCoordinator()).thenAnswer(invocation -> Option$.MODULE$.apply((Object)$this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster().controller()));
                Mockito.when((Object)metadata.fetch()).thenAnswer(invocation -> $this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster());
                Mockito.when((Object)BoxesRunTime.boxToInteger((int)metadata.requestUpdate())).thenAnswer(invocation -> BoxesRunTime.boxToInteger((int)$anon$1.$anonfun$createRemoteAdmin$3(this, invocation)));
                // Fails if a previous remote admin is still recorded, i.e. it was not cleared by
                // closeReverseConnectionAdmin() before a new one is created.
                Assertions.assertNull((Object)this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin());
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(new RemoteNetworkClient(networkClient, metadataThread));
                return this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin();
            }

            // Closes the recorded local admin (if any) with a zero timeout and clears both the
            // localAdmin and remoteAdmin fields of the test.
            public void closeReverseConnectionAdmin() {
                if (this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin() != null) {
                    this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin().close(Duration.ZERO);
                }
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localAdmin_$eq(null);
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteAdmin_$eq(null);
            }

            // Records the channel in the test's closedChannels collection, then delegates to the
            // real implementation.
            public boolean closeReverseChannel(ReverseChannel reverseChannel) {
                this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$closedChannels().$plus$eq((Object)reverseChannel);
                return super.closeReverseChannel(reverseChannel);
            }

            // Synthetic lambda body for the requestUpdate() stub: bumps the metadata request counter.
            public static final /* synthetic */ int $anonfun$createRemoteAdmin$3($anon$1 $this, InvocationOnMock invocation) {
                return $this.$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataRequests().incrementAndGet();
            }
            // Decompiler-rendered constructor: captures the enclosing test instance and passes the
            // test's fixtures to the ClusterLinkOutboundConnectionManager constructor.
            {
                if ($outer == null) {
                    throw null;
                }
                this.$outer = $outer;
                super($outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkData(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkConfig(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId(), (Option)None$.MODULE$, $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$linkMetrics(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$metadataManager(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$socketServer(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$brokerConfig(), $outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$serverInfo(), (Time)$outer.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$time(), true);
            }
        });
    }

    /**
     * Supplies {@code false}. Judging by its name, this appears to be the compiler-generated
     * default for the second parameter ({@code isLocalController}) of
     * {@code setupConnectionManager} — confirm against the Scala source.
     */
    private boolean setupConnectionManager$default$2() {
        final boolean defaultValue = false;
        return defaultValue;
    }

    /**
     * Stubs a mocked channel so that {@code id()} returns the decimal string form of
     * {@code channelId} and {@code socketChannel()} returns a newly created mock socket channel.
     *
     * @param channel   the Mockito mock to stub
     * @param channelId numeric id reported, as a string, by the channel
     */
    private void setUpChannel(KafkaChannel channel, int channelId) {
        String reportedId = Integer.toString(channelId);
        SocketChannel mockSocket = (SocketChannel)Mockito.mock(SocketChannel.class);
        Mockito.when((Object)channel.id()).thenReturn((Object)reportedId);
        Mockito.when((Object)channel.socketChannel()).thenReturn((Object)mockSocket);
    }

    /**
     * Creates a {@code ReverseNode} for this test's link. The node id is passed for both of the
     * first two constructor arguments, the host is {@code "host" + nodeId} and the port is 1234;
     * the listener is the one for PLAINTEXT and the principal is anonymous.
     *
     * @param nodeId    id of the node, also used to derive the host name
     * @param requestId request id passed through to the {@code ReverseNode}
     * @return the new reverse node
     */
    private ReverseNode reverseNode(int nodeId, int requestId) {
        String host = "host" + nodeId;
        ListenerName plaintextListener = ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT);
        return new ReverseNode(
            nodeId,
            nodeId,
            host,
            1234,
            this.linkId(),
            requestId,
            plaintextListener,
            KafkaPrincipal.ANONYMOUS,
            Optional.empty(),
            null);
    }

    /**
     * Supplies {@code -1}. Judging by its name, this appears to be the compiler-generated
     * default for the second parameter ({@code requestId}) of {@code reverseNode} — confirm
     * against the Scala source.
     */
    private int reverseNode$default$2() {
        final int defaultValue = -1;
        return defaultValue;
    }

    /**
     * Builds the cluster view reported for the remote side of the link. It always contains node 1
     * ({@code host1:123}); when a remote controller node id is configured, a node for that id is
     * appended and also set as the cluster's controller, otherwise the controller is
     * {@code null}.
     *
     * @return a {@link Cluster} identified by the destination cluster id
     */
    public Cluster kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$remoteCluster() {
        List<Node> members = new ArrayList<Node>();
        members.add(new Node(1, "host1", 123));
        Option controllerOption = this.remoteControllerNodeId().map((Function1 & Serializable)id -> ClusterLinkOutboundConnectionManagerTest.$anonfun$remoteCluster$1(BoxesRunTime.unboxToInt((Object)id)));
        // The mapped value is always a freshly constructed Node, so a null here means "no controller id".
        Node controller = (Node)controllerOption.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl());
        if (controller != null) {
            members.add(controller);
        }
        return new Cluster(this.destClusterId(), members, Collections.emptyList(), Collections.emptySet(), Collections.emptySet(), controller);
    }

    /**
     * Counts the requests currently held by the local mock client whose API key is
     * {@code INITIATE_REVERSE_CONNECTIONS}.
     *
     * @return the number of matching requests
     */
    private int initiateReverseConnectionRequestCount() {
        Collection queuedRequests = (Collection)this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$localMockClient().requests();
        return CollectionConverters$.MODULE$.CollectionHasAsScala(queuedRequests).asScala().count((Function1 & Serializable)request -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkOutboundConnectionManagerTest.$anonfun$initiateReverseConnectionRequestCount$1(request)));
    }

    /**
     * Synthetic lambda body: reports whether the given test instance has seen at least one
     * initiate-reverse-connection request on its local mock client.
     */
    public static final /* synthetic */ boolean $anonfun$testPersistentConnection$1(ClusterLinkOutboundConnectionManagerTest $this) {
        int initiatedRequests = $this.initiateReverseConnectionRequestCount();
        return initiatedRequests > 0;
    }

    /**
     * Synthetic lambda body: supplies a fixed failure message stating that the persistent
     * connection was not initiated.
     */
    public static final /* synthetic */ String $anonfun$testPersistentConnection$2() {
        final String failureMessage = "Persistent connection not initiated";
        return failureMessage;
    }

    /**
     * Synthetic lambda body used by {@code testReconfigureWithAsyncClose}: invokes the reverse
     * channel's close listener, passing it that reverse channel's own channel.
     */
    public static final /* synthetic */ void $anonfun$testReconfigureWithAsyncClose$3(ReverseChannel c) {
        c.closeListener().accept(c.channel());
    }

    /**
     * Synthetic lambda body: creates the node for the given id with host {@code "host" + id}
     * and port 123.
     */
    public static final /* synthetic */ Node $anonfun$remoteCluster$1(int id) {
        String host = "host" + id;
        return new Node(id, host, 123);
    }

    /**
     * Synthetic lambda body: tells whether the request's API key equals
     * {@code ApiKeys.INITIATE_REVERSE_CONNECTIONS}, using a null-safe comparison.
     */
    public static final /* synthetic */ boolean $anonfun$initiateReverseConnectionRequestCount$1(ClientRequest x$8) {
        ApiKeys actualKey = x$8.apiKey();
        ApiKeys wantedKey = ApiKeys.INITIATE_REVERSE_CONNECTIONS;
        if (actualKey == null) {
            // Two nulls compare equal, mirroring the original null-safe equality.
            return wantedKey == null;
        }
        return actualKey.equals(wantedKey);
    }

    /**
     * Initialises the fixed identifiers used throughout the test: the link name and the ids of
     * the source and destination clusters.
     */
    public ClusterLinkOutboundConnectionManagerTest() {
        // Cluster ids for the two ends of the link.
        this.kafka$server$link$ClusterLinkOutboundConnectionManagerTest$$sourceClusterId = "sourceCluster";
        this.destClusterId = "destCluster";
        // Name of the link under test.
        this.linkName = "testLink";
    }
}

