/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkInboundConnectionManager$;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ConnectionMode$Outbound$;
import kafka.server.link.ReverseClient;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\tuw!B\u001a5\u0011\u0003Yd!B\u001f5\u0011\u0003q\u0004\"B#\u0002\t\u00031\u0005bB$\u0002\u0005\u0004%\t\u0001\u0013\u0005\u0007+\u0006\u0001\u000b\u0011B%\u0007\tu\"\u0004A\u0016\u0005\nC\u0016\u0011\t\u0011)A\u0005E\"D\u0001\"[\u0003\u0003\u0002\u0003\u0006IA\u001b\u0005\n[\u0016\u0011\t\u0011)A\u0005]fD\u0001B_\u0003\u0003\u0002\u0003\u0006Ia\u001f\u0005\u000b\u0003')!\u0011!Q\u0001\n\u0005U\u0001BCA\u000e\u000b\t\u0005\t\u0015!\u0003\u0002\u001e!Q\u00111F\u0003\u0003\u0002\u0003\u0006I!!\f\t\u0015\u0005}RA!A!\u0002\u0013\t\t\u0005\u0003\u0006\u0002H\u0015\u0011\t\u0011)A\u0005\u0003\u0013B!\"!\u0015\u0006\u0005\u0003\u0005\u000b\u0011BA*\u0011\u0019)U\u0001\"\u0001\u0002d!I\u0011\u0011P\u0003C\u0002\u0013%\u00111\u0010\u0005\t\u0003#+\u0001\u0015!\u0003\u0002~!A\u00111S\u0003C\u0002\u0013%\u0001\nC\u0004\u0002\u0016\u0016\u0001\u000b\u0011B%\t\u0011\u0005]UA1A\u0005\n!Cq!!'\u0006A\u0003%\u0011\n\u0003\u0005\u0002\u001c\u0016\u0011\r\u0011\"\u0003I\u0011\u001d\ti*\u0002Q\u0001\n%C\u0011\"a(\u0006\u0001\u0004%I!!)\t\u0013\u0005\u0015V\u00011A\u0005\n\u0005\u001d\u0006\u0002CAZ\u000b\u0001\u0006K!a)\t\u0013\u0005uV\u00011A\u0005\n\u0005}\u0006\"CAb\u000b\u0001\u0007I\u0011BAc\u0011!\tI-\u0002Q!\n\u0005\u0005\u0007\"CAg\u000b\t\u0007I\u0011BAh\u0011!\ti.\u0002Q\u0001\n\u0005E\u0007bBAp\u000b\u0011\u0005\u0013\u0011\u001d\u0005\b\u0003G,A\u0011IAs\u0011!\u0011\t!\u0002C\u0001i\t\r\u0001b\u0002B\u0015\u000b\u0011\u0005#1\u0006\u0005\b\u0005\u0003*A\u0011\tB\"\u0011\u001d\u0011i(\u0002C\u0005\u0005\u007fBqA!'\u0006\t\u0013\u0011Y\nC\u0004\u00032\u0016!\tEa-\t\u000f\t}V\u0001\"\u0011\u0002b\"9!\u0011Y\u0003\u0005\n\u0005\u0005\bb\u0002Bb\u000b\u0011E\u0013\u0011\u001d\u0005\b\u0005\u000b,A\u0011KAq\u0011\u001d\u00119-\u0002C\u0005\u0003CDqA!3\u0006\t\u0003\u0011Y\rC\u0004\u0003P\u0016!\tE!5\t\u000f\tMW\u0001\"\u0011\u0003R\"i!Q[\u0003\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0003XfDQB!7\u0006!\u0003\r\t\u0011!C\u0005\u00057D\u0017aI\"m
kN$XM\u001d'j].LeNY8v]\u0012\u001cuN\u001c8fGRLwN\\'b]\u0006<WM\u001d\u0006\u0003kY\nA\u0001\\5oW*\u0011q\u0007O\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003e\nQa[1gW\u0006\u001c\u0001\u0001\u0005\u0002=\u00035\tAGA\u0012DYV\u001cH/\u001a:MS:\\\u0017J\u001c2pk:$7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0014\u0005\u0005y\u0004C\u0001!D\u001b\u0005\t%\"\u0001\"\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0011\u000b%AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002w\u0005!b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012,\u0012!\u0013\t\u0003\u0015Nk\u0011a\u0013\u0006\u0003\u00196\u000ba!\u0019;p[&\u001c'B\u0001(P\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003!F\u000bA!\u001e;jY*\t!+\u0001\u0003kCZ\f\u0017B\u0001+L\u00055\tEo\\7jG&sG/Z4fe\u0006)b*\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u00043cA\u0003X5B\u0011A\bW\u0005\u00033R\u0012Ad\u00117vgR,'\u000fT5oW\u000e{gN\\3di&|g.T1oC\u001e,'\u000f\u0005\u0002\\=:\u0011A\bX\u0005\u0003;R\n!c\u00117vgR,'\u000fT5oW\u001a\u000b7\r^8ss&\u0011q\f\u0019\u0002\u0019\u0013:\u0014w.\u001e8e\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(BA/5\u0003!a\u0017N\\6ECR\f\u0007CA2g\u001b\u0005!'BA39\u0003\tQ8.\u0003\u0002hI\ny1\t\\;ti\u0016\u0014H*\u001b8l\t\u0006$\u0018-\u0003\u0002b1\u0006i\u0011N\\5uS\u0006d7i\u001c8gS\u001e\u0004\"\u0001P6\n\u00051$$!E\"mkN$XM\u001d'j].\u001cuN\u001c4jO\u0006\u0019Bn\\2bY2{w-[2bY\u000ecWo\u001d;feB\u0011qN\u001e\b\u0003aR\u0004\"!]!\u000e\u0003IT!a\u001d\u001e\u0002\rq\u0012xn\u001c;?\u0013\t)\u0018)\u0001\u0004Qe\u0016$WMZ\u0005\u0003ob\u0014aa\u0015;sS:<'BA;B\u0013\ti\u0007,A\tdY&,g\u000e^%oi\u0016\u00148-\u001a9u_J\u00042\u0001\u0011?\u007f\u0013\ti\u0018I\u0001\u0004PaRLwN\u001c\t\u0004\u007f\u0006=QBAA\u0001\u0015\u0011\t\u0019!!\u0002\u0002\u000f\rd\u0017.\u001a8ug*\u0019\u0011(a\u0002\u000b\t\u0005%\u00111B\u0001\u0007CB\f7\r[3\u000b\u0005\u00055\u0011aA8sO&!\u0011\u0011CA\u0001\u0005E\u0019E.[3oi&sG/\u001a:dKB$xN]\u0001\b[\u0016$(/[2t!\ra\u0014qC\u0005\u0004\u00033!$AE\"mkN$XM\u001d'j].lU\r\u001e:j
GN\f!C]3n_R,\u0017\tZ7j]\u001a\u000b7\r^8ssBA\u0001)a\bk\u0003G\t)#C\u0002\u0002\"\u0005\u0013\u0011BR;oGRLwN\u001c\u001a\u0011\u0005q*\u0001c\u0001\u001f\u0002(%\u0019\u0011\u0011\u0006\u001b\u0003-\rcWo\u001d;fe2Kgn[!e[&t7\t\\5f]R\fQ\u0003\\8dC2\u001cuN\u001c8BI6LgNR1di>\u0014\u0018\u0010\u0005\u0004A\u0003_q\u00171G\u0005\u0004\u0003c\t%!\u0003$v]\u000e$\u0018n\u001c82!\u0011\t)$a\u000f\u000e\u0005\u0005]\"\u0002BA\u001d\u0003\u0003\tQ!\u00193nS:LA!!\u0010\u00028\tq1i\u001c8gYV,g\u000e^!e[&t\u0017aD7fi\u0006$\u0017\r^1NC:\fw-\u001a:\u0011\u0007q\n\u0019%C\u0002\u0002FQ\u0012!d\u00117vgR,'\u000fT5oW6+G/\u00193bi\u0006l\u0015M\\1hKJ\fAB\u0019:pW\u0016\u00148i\u001c8gS\u001e\u0004B!a\u0013\u0002N5\ta'C\u0002\u0002PY\u00121bS1gW\u0006\u001cuN\u001c4jO\u0006!A/[7f!\u0011\t)&a\u0018\u000e\u0005\u0005]#\u0002BA-\u00037\nQ!\u001e;jYNTA!!\u0018\u0002\u0006\u000511m\\7n_:LA!!\u0019\u0002X\t!A+[7f)Y\t\u0019#!\u001a\u0002h\u0005%\u00141NA7\u0003_\n\t(a\u001d\u0002v\u0005]\u0004\"B1\u0011\u0001\u0004\u0011\u0007\"B5\u0011\u0001\u0004Q\u0007\"B7\u0011\u0001\u0004q\u0007\"\u0002>\u0011\u0001\u0004Y\bbBA\n!\u0001\u0007\u0011Q\u0003\u0005\b\u00037\u0001\u0002\u0019AA\u000f\u0011\u001d\tY\u0003\u0005a\u0001\u0003[Aq!a\u0010\u0011\u0001\u0004\t\t\u0005C\u0004\u0002HA\u0001\r!!\u0013\t\u000f\u0005E\u0003\u00031\u0001\u0002T\u0005\u00112m\u001c8oK\u000e$\u0018n\u001c8SKF,Xm\u001d;t+\t\ti\b\u0005\u0005\u0002\u0000\u0005\u0005\u0015QQAF\u001b\u0005i\u0015bAAB\u001b\n\t2i\u001c8dkJ\u0014XM\u001c;ICNDW*\u00199\u0011\u0007\u0001\u000b9)C\u0002\u0002\n\u0006\u00131!\u00138u!\ra\u0014QR\u0005\u0004\u0003\u001f#$!\u0004*fm\u0016\u00148/Z\"mS\u0016tG/A\nd_:tWm\u0019;j_:\u0014V-];fgR\u001c\b%\u0001\u000boKb$(+\u001a<feN,'+Z9vKN$\u0018\nZ\u0001\u0016]\u0016DHOU3wKJ\u001cXMU3rk\u0016\u001cH/\u00133!\u0003U\u0001XM]:jgR,g\u000e^\"p]:,7\r^5p]N\fa\u0003]3sg&\u001cH/\u001a8u\u0007>tg.Z2uS>t7\u000fI\u0001\u0019C\u000e$\u0018N^3SKZ,'o]3D_:tWm\u0019;j_:\u001c\u0018!G1di&4XMU3wKJ\u001cXmQ8o]\u0016\u001c
G/[8og\u0002\naC]3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u000b\u0003\u0003G\u0003B\u0001\u0011?\u0002\f\u0006Q\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8BI6Lgn\u0018\u0013fcR!\u0011\u0011VAX!\r\u0001\u00151V\u0005\u0004\u0003[\u000b%\u0001B+oSRD\u0011\"!-\u001b\u0003\u0003\u0005\r!a)\u0002\u0007a$\u0013'A\fsKZ,'o]3D_:tWm\u0019;j_:\fE-\\5oA!\u001a1$a.\u0011\u0007\u0001\u000bI,C\u0002\u0002<\u0006\u0013\u0001B^8mCRLG.Z\u0001\u000fY>\u001c\u0017\r\\\"p]:\fE-\\5o+\t\t\t\r\u0005\u0003Ay\u0006M\u0012A\u00057pG\u0006d7i\u001c8o\u0003\u0012l\u0017N\\0%KF$B!!+\u0002H\"I\u0011\u0011W\u000f\u0002\u0002\u0003\u0007\u0011\u0011Y\u0001\u0010Y>\u001c\u0017\r\\\"p]:\fE-\\5oA!\u001aa$a.\u0002=A,'o]5ti\u0016tGoQ8o]\u0016\u001cG/[8o'>,(oY3M_\u000e\\WCAAi!\u0011\t\u0019.!7\u000e\u0005\u0005U'bAAl#\u0006!A.\u00198h\u0013\u0011\tY.!6\u0003\r=\u0013'.Z2u\u0003}\u0001XM]:jgR,g\u000e^\"p]:,7\r^5p]N{WO]2f\u0019>\u001c7\u000eI\u0001\bgR\f'\u000f^;q)\t\tI+A\tf]\u0006\u0014G.Z\"mkN$XM\u001d'j].$b!!+\u0002h\u0006E\bbBAuE\u0001\u0007\u00111^\u0001\u000e]\u0016$xo\u001c:l\u00072LWM\u001c;\u0011\u0007q\ni/C\u0002\u0002pR\u0012\u0001d\u00117vgR,'\u000fT5oW:+Go^8sW\u000ec\u0017.\u001a8u\u0011\u001d\tyD\ta\u0001\u0003g\u0004B\u0001\u0011?\u0002vB!\u0011q_A\u007f\u001b\t\tIP\u0003\u0003\u0002|\u0006]\u0012!C5oi\u0016\u0014h.\u00197t\u0013\u0011\ty0!?\u0003)\u0005#W.\u001b8NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u0003e\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c)s_ZLG-\u001a:\u0015\u0011\t\u0015!1\u0004B\u0012\u0005K\u0001B\u0001\u0011?\u0003\bA!!\u0011\u0002B\u000b\u001d\u0011\u0011YA!\u0005\u000e\u0005\t5!\u0002\u0002B\b\u00037\nqA\\3uo>\u00148.\u0003\u0003\u0003\u0014\t5\u0011a\u0003*fm\u0016\u00148/\u001a(pI\u0016LAAa\u0006\u0003\u001a\t\u00112i\u001c8oK\u000e$\u0018n\u001c8Qe>4\u0018\u000eZ3s\u0015\u0011\u0011\u0019B!\u0004\t\u000f\u0005%8\u00051\u0001\u0003\u001eA\u0019qPa\b\n\t\t\u0005\u0012\u0011\u0001\u0002\u000e\u001d\u0016$xo\u001c:l\u00072LWM\u001c;\t\u000f\u0005}2\u00051\
u0001\u0002t\"1!qE\u0012A\u00029\f\u0001b\u00197jK:$\u0018\nZ\u0001\u0019aJ|7-Z:t%\u00164XM]:f\u0007>tg.Z2uS>tGCBAU\u0005[\u00119\u0004C\u0004\u00030\u0011\u0002\rA!\r\u0002\u000f\rD\u0017M\u001c8fYB!!1\u0002B\u001a\u0013\u0011\u0011)D!\u0004\u0003\u0019-\u000bgm[1DQ\u0006tg.\u001a7\t\u000f\teB\u00051\u0001\u0003<\u0005Y!/\u001a<feN,gj\u001c3f!\u0011\u0011YA!\u0010\n\t\t}\"Q\u0002\u0002\f%\u00164XM]:f\u001d>$W-\u0001\u000ej]&$\u0018.\u0019;f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000f\u0006\u0004\u0003F\t\r$1\u000f\t\u0007\u0005\u000f\u0012\tFa\u0016\u000f\t\t%#Q\n\b\u0004c\n-\u0013\"\u0001\"\n\u0007\t=\u0013)A\u0004qC\u000e\\\u0017mZ3\n\t\tM#Q\u000b\u0002\u0004'\u0016\f(b\u0001B(\u0003B1\u0011q\u0010B-\u0005;J1Aa\u0017N\u0005E\u0019u.\u001c9mKR\f'\r\\3GkR,(/\u001a\t\u0005\u0003'\u0014y&\u0003\u0003\u0003b\u0005U'\u0001\u0002,pS\u0012DqA!\u001a&\u0001\u0004\u00119'A\rj]&$\u0018.\u0019;f\u0007>tg.Z2uS>t'+Z9vKN$\b\u0003\u0002B5\u0005_j!Aa\u001b\u000b\t\t5\u00141L\u0001\te\u0016\fX/Z:ug&!!\u0011\u000fB6\u0005\u0005Je.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t%\u0016\fX/Z:u\u0011\u001d\u0011)(\na\u0001\u0005o\naB]3rk\u0016\u001cHoQ8oi\u0016DH\u000f\u0005\u0003\u0003j\te\u0014\u0002\u0002B>\u0005W\u0012aBU3rk\u0016\u001cHoQ8oi\u0016DH/\u0001\rsKF,Xm\u001d;SKZ,'o]3D_:tWm\u0019;j_:$\"\"!+\u0003\u0002\n\u0015%\u0011\u0012BK\u0011\u001d\u0011\u0019I\na\u0001\u0003\u000b\u000b\u0011B]3rk\u0016\u001cH/\u00133\t\u000f\t\u001de\u00051\u0001\u0002\f\u000611\r\\5f]RDqAa#'\u0001\u0004\u0011i)\u0001\u0006t_V\u00148-\u001a(pI\u0016\u0004BAa$\u0003\u00126\u0011\u00111L\u0005\u0005\u0005'\u000bYF\u0001\u0003O_\u0012,\u0007b\u0002BLM\u0001\u0007\u0011QQ\u0001\rI\u0016\u001cHO\u0011:pW\u0016\u0014\u0018\nZ\u0001\u001bM>\u0014x/\u0019:e)>\u0014V-\\8uK\u000e{wN\u001d3j]\u0006$xN\u001d\u000b\u0007\u0003S\u0013iJ!,\t\u000f\t}u\u00051\u0001\u0003\"\u0006Y!/Z9vKN$H)\u0019;b!\u0011\u0011\u0019K!+\u000e\u0005\t\u0015&\u0002\u0002BT\u00037\nq!\\3tg\u0006<W-\u0003\u0003\u0003,\n\u0015&!J%oS
RL\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\u001c(+Z9vKN$H)\u0019;b\u0011\u001d\u0011yk\na\u0001\u0005\u000b\nqAZ;ukJ,7/\u0001\np]\u000e{g\u000e\u001e:pY2,'o\u00115b]\u001e,G\u0003BAU\u0005kCqAa.)\u0001\u0004\u0011I,\u0001\njg\u0006\u001bG/\u001b<f\u0007>tGO]8mY\u0016\u0014\bc\u0001!\u0003<&\u0019!QX!\u0003\u000f\t{w\u000e\\3b]\u0006\u0019sN\u001c'j].lU\r^1eCR\f\u0007+\u0019:uSRLwN\u001c'fC\u0012,'o\u00115b]\u001e,\u0017!H7bs\n,\u0007K]8dKN\u001c8i\\8sI&t\u0017\r^8s\u0007\"\fgnZ3\u00027\rdwn]3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003q\u0019'/Z1uKJ+g/\u001a:tK\u000e{gN\\3di&|g.\u00113nS:\fa#\\1zE\u0016\u001c%/Z1uKJ+Wn\u001c;f\u0003\u0012l\u0017N\\\u0001\u0018e\u00164XM]:f\u0007>tg.Z2uS>t7\t\\5f]R,\"A!4\u0011\t\u0001c(QD\u0001\u001aa\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8D_VtG/\u0006\u0002\u0002\u0006\u00061\"/\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8D_VtG/A\rtkB,'\u000f\n7pG\u0006dGj\\4jG\u0006d7\t\\;ti\u0016\u0014X#\u00018\u0002\u001dM,\b/\u001a:%Y&t7\u000eR1uCV\t!\r")
/*
 * Connection manager for a cluster link whose connections are established in the
 * reverse ("inbound") direction.
 *
 * NOTE: this file is CFR decompiler output of a Scala class. Constructs such as
 * "(.less.colon.less)", "new .colon.colon(...)", "$this" and the field assignments that
 * precede super(...) in the constructor are decompiler artifacts, not hand-written Java.
 *
 * State kept by this class (all visible below):
 *  - connectionRequests: pending reverse-connection requests, keyed by request id, mapped
 *    to the ReverseClient that asked for the connection.
 *  - persistentConnections / activeReverseConnections: counters updated when a reverse
 *    channel is added and when its close callback runs.
 *  - reverseConnectionAdmin: optional ReverseClient wrapping the admin client created by
 *    remoteAdminFactory.
 *  - localConnAdmin: optional ConfluentAdmin created by localConnAdminFactory.
 *
 * Helpers that are called but not defined here (ensureReverseConnectionsEnabled,
 * isLinkCoordinator, linkCoordinatorId, stateChangeLock, resetReverseConnectionAdmin,
 * isActive, currentConfig, linkId, debug/info/warn/error) are presumably inherited from
 * ClusterLinkConnectionManager - TODO confirm.
 */
public class ClusterLinkInboundConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.InboundConnectionManager {
    // Configuration the manager was constructed with; only consulted by startup().
    private final ClusterLinkConfig initialConfig;
    // Optional interceptor handed to NetworkClient.enableClusterLinkRequests (null when absent).
    private final Option<ClientInterceptor> clientInterceptor;
    // Sensors recorded when reverse connections are created and closed.
    private final ClusterLinkMetrics metrics;
    // Builds the admin client that is wrapped into reverseConnectionAdmin.
    private final Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    // Builds localConnAdmin; invoked with the link name.
    private final Function1<String, ConfluentAdmin> localConnAdminFactory;
    // Source of this broker's id (brokerConfig.brokerId()).
    private final KafkaConfig brokerConfig;
    // Clock used when bootstrapping a client with a new reverse channel.
    private final Time time;
    // Pending reverse-connection requests: request id -> requesting client.
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    // Request id generator; initialised from the companion object's NextReverseRequestId().
    private final AtomicInteger nextReverseRequestId;
    // Number of open reverse connections that arrived without a request id.
    private final AtomicInteger persistentConnections;
    // Number of open reverse connections of any kind.
    private final AtomicInteger activeReverseConnections;
    // Set by maybeCreateRemoteAdmin(), cleared by closeReverseConnectionAdmin().
    private volatile Option<ReverseClient> reverseConnectionAdmin;
    // Set by createReverseConnectionAdmin(), cleared by closeReverseConnectionAdmin() when not active.
    private volatile Option<ConfluentAdmin> localConnAdmin;
    // Monitor held while persistentConnections and a client's persistentConnectionSource are updated together.
    private final Object persistentConnectionSourceLock;

    /**
     * Static forwarder to the Scala companion object's {@code NextReverseRequestId()}.
     *
     * @return the AtomicInteger returned by the companion object
     */
    public static AtomicInteger NextReverseRequestId() {
        return ClusterLinkInboundConnectionManager$.MODULE$.NextReverseRequestId();
    }

    // Synthetic accessor so lambdas can reach the superclass localLogicalCluster().
    private /* synthetic */ String super$localLogicalCluster() {
        return super.localLogicalCluster();
    }

    // Synthetic accessor so lambdas can reach the superclass linkData().
    private /* synthetic */ ClusterLinkData super$linkData() {
        return super.linkData();
    }

    // Accessor for the pending-request map.
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return this.connectionRequests;
    }

    // Accessor for the request id generator.
    private AtomicInteger nextReverseRequestId() {
        return this.nextReverseRequestId;
    }

    // Accessor for the persistent connection counter.
    private AtomicInteger persistentConnections() {
        return this.persistentConnections;
    }

    // Accessor for the active reverse connection counter.
    private AtomicInteger activeReverseConnections() {
        return this.activeReverseConnections;
    }

    // Volatile read of the reverse connection admin.
    private Option<ReverseClient> reverseConnectionAdmin() {
        return this.reverseConnectionAdmin;
    }

    // Volatile write of the reverse connection admin (Scala setter naming).
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> x$1) {
        this.reverseConnectionAdmin = x$1;
    }

    // Volatile read of the local connection admin.
    private Option<ConfluentAdmin> localConnAdmin() {
        return this.localConnAdmin;
    }

    // Volatile write of the local connection admin (Scala setter naming).
    private void localConnAdmin_$eq(Option<ConfluentAdmin> x$1) {
        this.localConnAdmin = x$1;
    }

    // Accessor for the monitor object used in processReverseConnection.
    private Object persistentConnectionSourceLock() {
        return this.persistentConnectionSourceLock;
    }

    /**
     * Starts the manager after checking the initial connection mode.
     *
     * @throws IllegalStateException if the initial config's connection mode equals Outbound
     */
    @Override
    public void startup() {
        ConnectionMode connectionMode = this.initialConfig.connectionMode();
        ConnectionMode$Outbound$ connectionMode$Outbound$ = ConnectionMode$Outbound$.MODULE$;
        if (connectionMode != null && connectionMode.equals(connectionMode$Outbound$)) {
            throw new IllegalStateException("Inbound connection manager created in outbound connection mode");
        }
        super.startup();
    }

    /**
     * Enables cluster link requests on the given client's underlying network client.
     *
     * When the underlying KafkaClient is a NetworkClient, it is given the link id, the
     * client interceptor (or null) and the reverse connection provider (or null).
     * Any other KafkaClient implementation is rejected if the current mode is Inbound.
     *
     * @param networkClient   wrapper exposing the KafkaClient and its client id
     * @param metadataManager optional admin metadata manager passed on to the provider
     * @throws IllegalStateException if the client is not a NetworkClient and the current
     *                               connection mode equals Inbound
     */
    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        KafkaClient kafkaClient = networkClient.networkClient();
        if (kafkaClient instanceof NetworkClient) {
            NetworkClient networkClient2 = (NetworkClient)kafkaClient;
            networkClient2.enableClusterLinkRequests(super.linkData().linkId(), (ClientInterceptor)this.clientInterceptor.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()), (ReverseNode.ConnectionProvider)this.reverseConnectionProvider(networkClient2, metadataManager, networkClient.clientId()).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
            return;
        }
        ConnectionMode connectionMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        // A null mode is treated as "not inbound": nothing to enable, nothing to reject.
        if (connectionMode == null) {
            return;
        }
        if (connectionMode.equals(connectionMode$Inbound$)) {
            throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
        }
    }

    /**
     * Builds the provider a network client uses to ask for a reverse connection to a node.
     *
     * @param networkClient   client the reverse channels will be added to
     * @param metadataManager optional admin metadata manager stored in the ReverseClient
     * @param clientId        client id stored in the ReverseClient
     * @return Some(provider) when the current connection mode equals Inbound, otherwise None;
     *         the provider calls requestReverseConnection with a freshly incremented request
     *         id and this broker's id
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> metadataManager, String clientId) {
        ConnectionMode connectionMode = this.currentConfig().connectionMode();
        ConnectionMode$Inbound$ connectionMode$Inbound$ = ConnectionMode$Inbound$.MODULE$;
        if (connectionMode != null && connectionMode.equals(connectionMode$Inbound$)) {
            // This ReverseClient carries no admin client (third argument is None).
            None$ x$4 = None$.MODULE$;
            ReverseClient reverseClient = new ReverseClient(networkClient, metadataManager, (Option<ClusterLinkAdminClient>)x$4, clientId);
            ReverseNode.ConnectionProvider provider = node -> this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node, $this.brokerConfig.brokerId());
            return new Some((Object)provider);
        }
        return None$.MODULE$;
    }

    /**
     * Hands an incoming reversed channel to the client that is waiting for it.
     *
     * A reverse node without a request id is treated as a persistent connection and is
     * attached to the reverse connection admin (created on demand). A reverse node with a
     * request id is matched against, and removed from, the pending-request map.
     *
     * @param channel2    the accepted channel to reverse
     * @param reverseNode description of the remote side (request id, remote broker id)
     * @throws NetworkException if no client is waiting for this connection
     */
    @Override
    public void processReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        boolean bl;
        Option option;
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(channel2).append(" ").append(reverseNode).toString());
        this.ensureReverseConnectionsEnabled();
        if (!reverseNode.requestId().isPresent()) {
            // No request id: persistent connection, served by the reverse connection admin.
            this.maybeCreateRemoteAdmin();
            option = this.reverseConnectionAdmin();
            bl = true;
        } else {
            // Request id present: claim (and remove) the pending request, if it still exists.
            option = Option$.MODULE$.apply((Object)this.connectionRequests().remove(reverseNode.requestId().get()));
            bl = false;
        }
        // bl2 == true marks the persistent-connection case for the code below.
        boolean bl2 = bl;
        Option option2 = option;
        if (option2 instanceof Some) {
            ReverseClient client = (ReverseClient)((Some)option2).value();
            // Passed to the ReverseChannel below; it undoes the bookkeeping performed here.
            Consumer<KafkaChannel> closeCallback = channel -> {
                this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
                $this.metrics.reverseConnectionClosedSensor().record();
                $this.metrics.deprecatedReverseConnectionClosedSensor().record();
                this.activeReverseConnections().decrementAndGet();
                if (bl2) {
                    Object object = this.persistentConnectionSourceLock();
                    synchronized (object) {
                        // Last persistent connection gone: forget which broker it came from.
                        if (this.persistentConnections().decrementAndGet() <= 0) {
                            client.persistentConnectionSource_$eq((Option<Integer>)None$.MODULE$);
                            if (this.isLinkCoordinator()) {
                                this.info((Function0<String>)(Function0 & Serializable)() -> "Persistent connection to source link coordinator was disconnected, awaiting new connection.");
                            }
                        }
                        return;
                    }
                }
            };
            this.activeReverseConnections().incrementAndGet();
            if (bl2) {
                Object object = this.persistentConnectionSourceLock();
                synchronized (object) {
                    // Count the persistent connection and remember the remote broker id as its source.
                    this.persistentConnections().incrementAndGet();
                    client.persistentConnectionSource_$eq((Option<Integer>)new Some((Object)Predef$.MODULE$.int2Integer(reverseNode.remoteBrokerId())));
                }
            }
            this.metrics.reverseConnectionCreatedSensor().record();
            this.metrics.deprecatedReverseConnectionCreatedSensor().record();
            ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, closeCallback);
            client.networkClient().reverseAndAdd(reverseChannel);
            client.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
            return;
        }
        if (None$.MODULE$.equals(option2)) {
            throw new NetworkException("Reverse connection is no longer required");
        }
        // Scala pattern-match fallthrough; an Option is always either Some or None.
        throw new MatchError((Object)option2);
    }

    /**
     * Handles an InitiateReverseConnections request by forwarding it to the remote coordinator.
     *
     * One future is created per request entry. Failures are not thrown to the caller:
     * any Throwable is logged and used to complete every returned future exceptionally.
     *
     * @param initiateConnectionRequest the request whose data is forwarded
     * @param requestContext            not used by the code visible here
     * @return one CompletableFuture per entry of the request, in entry order
     */
    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(70).append("Initiate or forward reverse connection request: localCluster=").append(this.super$localLogicalCluster()).append(" request=").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData connData = initiateConnectionRequest.data();
        List futures = (List)package$.MODULE$.List().fill(connData.entries().size(), (Function0 & Serializable)() -> new CompletableFuture());
        try {
            String string = super.localLogicalCluster();
            String string2 = connData.sourceClusterId();
            // Null-safe equality: reject a request whose source cluster id is this cluster.
            if (!(string != null ? !string.equals(string2) : string2 != null)) {
                throw new InvalidRequestException(new StringBuilder(70).append("Cannot initiate reverse connection from destination cluster ").append(super.localLogicalCluster()).append(" to itself").toString());
            }
            this.forwardToRemoteCoordinator(connData, (Seq<CompletableFuture<Void>>)futures);
        }
        catch (Throwable e) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable)() -> e);
            futures.foreach((Function1 & Serializable)x$2 -> BoxesRunTime.boxToBoolean((boolean)x$2.completeExceptionally(e)));
        }
        return futures;
    }

    /**
     * Asks for a reverse connection from {@code sourceNode} on behalf of {@code client}.
     *
     * The client is registered in the pending-request map before the request is sent and
     * is removed again if sending throws. The request goes to the local link coordinator
     * through localConnAdmin when this broker is not the link coordinator and reports no
     * persistent connections; otherwise it goes through forwardToRemoteCoordinator.
     *
     * @param requestId    id under which the pending request is tracked
     * @param client       client that will receive the reverse channel
     * @param sourceNode   node the connection should come from
     * @param destBrokerId target broker id placed in the request entry
     * @throws NetworkException                 if {@code client} shares its network client
     *                                          with the reverse connection admin
     * @throws CoordinatorNotAvailableException if no link coordinator id is available
     * @throws IllegalStateException            if localConnAdmin has not been created
     */
    private void requestReverseConnection(int requestId, ReverseClient client, Node sourceNode, int destBrokerId) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(90).append("Requesting reverse connection for dest broker ").append(destBrokerId).append(" with requestId ").append(requestId).append(" to source node ").append(sourceNode).append(" for client ").append(client.clientId()).toString());
        this.ensureReverseConnectionsEnabled();
        // The reverse connection admin's own client cannot request connections here.
        if (this.reverseConnectionAdmin().exists((Function1 & Serializable)x$3 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkInboundConnectionManager.$anonfun$requestReverseConnection$2(client, x$3)))) {
            throw new NetworkException(new StringBuilder(79).append("Waiting for persistent connection to request reverse connection for request id ").append(requestId).toString());
        }
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(requestId).setSourceBrokerId(sourceNode.id()).setTargetBrokerId(destBrokerId);
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(this.linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String)super.linkData().clusterId().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(entry));
        // Register before sending so processReverseConnection can find the client by request id.
        this.connectionRequests().put(BoxesRunTime.boxToInteger((int)requestId), client);
        try {
            if (!this.isLinkCoordinator() && this.persistentConnectionCount() <= 0) {
                // Send the request to the local link coordinator broker.
                int linkCoordinatorId = BoxesRunTime.unboxToInt((Object)this.linkCoordinatorId().getOrElse((Function0 & Serializable)() -> {
                    throw new CoordinatorNotAvailableException(new StringBuilder(43).append("Cluster link coordinator not available for ").append(this.super$linkData().linkName()).toString());
                }));
                ((KafkaFutureImpl)ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)((ConfluentAdmin)this.localConnAdmin().getOrElse((Function0 & Serializable)() -> {
                    throw new IllegalStateException("Connection admin not created");
                })), (InitiateReverseConnectionsRequestData)requestData, (Integer)Predef$.MODULE$.int2Integer(linkCoordinatorId)).get(BoxesRunTime.boxToInteger((int)requestId))).whenComplete((x$5, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode));
                return;
            }
            // Otherwise forward over the reverse connection admin to the remote coordinator.
            CompletableFuture future = new CompletableFuture();
            this.forwardToRemoteCoordinator(requestData, (Seq<CompletableFuture<Void>>)new .colon.colon(future, (List)Nil$.MODULE$));
            future.whenComplete((x$4, e) -> this.onCompletion$1((Throwable)e, requestId, client, sourceNode));
        }
        catch (Throwable e2) {
            // Sending failed synchronously: drop the pending entry and propagate.
            this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId));
            throw e2;
        }
    }

    /**
     * Sends the request to the remote link coordinator and mirrors each per-entry result
     * onto the matching caller-supplied future.
     *
     * @param requestData request to forward; its entries are paired with {@code futures} by position
     * @param futures     futures completed (normally or exceptionally) with each entry's outcome
     * @throws NetworkException       if no admin client is available while this broker is the
     *                                link coordinator, or if no persistent connection source is known
     * @throws NotControllerException if no admin client is available and this broker is not
     *                                the link coordinator
     */
    private void forwardToRemoteCoordinator(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        boolean isLinkCoordinator = this.isLinkCoordinator();
        ConfluentAdmin admin = (ConfluentAdmin)this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$6 -> x$6.adminClient().map((Function1 & Serializable)x$7 -> x$7.admin())).getOrElse((Function0 & Serializable)() -> {
            if (isLinkCoordinator) {
                throw new NetworkException("Request cannot be forwarded to remote link coordinator at this time.");
            }
            throw new NotControllerException("Request cannot be forwarded to remote link coordinator since this broker is not the local link coordinator.");
        });
        // The destination is the broker id recorded when the persistent connection was accepted.
        Integer remoteCoordinator = (Integer)this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$8 -> x$8.persistentConnectionSource()).getOrElse((Function0 & Serializable)() -> {
            throw new NetworkException("Request cannot be forwarded to remote link coordinator because persistent connection is not yet available");
        });
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(91).append("Forward initiate reverse connection request to remote link coordinator: ").append(requestData).append(" remoteCoordinator=").append(remoteCoordinator).toString());
        Map requestFutures = ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)admin, (InitiateReverseConnectionsRequestData)requestData, (Integer)remoteCoordinator);
        ((IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().zip(futures)).foreach((Function1 & Serializable)x0$1 -> {
            if (x0$1 != null) {
                InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)x0$1._1();
                CompletableFuture future = (CompletableFuture)x0$1._2();
                // Look up the admin-side future by the entry's request id and relay its result.
                return ((KafkaFutureImpl)requestFutures.get(BoxesRunTime.boxToInteger((int)entry.initiateRequestId()))).whenComplete((x0$2, x1$1) -> {
                    if (x1$1 != null) {
                        this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> x1$1);
                        future.completeExceptionally((Throwable)x1$1);
                        return;
                    }
                    this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                    future.complete(x0$2);
                });
            }
            // Scala tuple pattern-match fallthrough for a null pair.
            throw new MatchError(null);
        });
    }

    /**
     * Re-evaluates the reverse connection admin after a controller change.
     *
     * @param isActiveController ignored; the coordinator state is re-read instead
     */
    @Override
    public void onControllerChange(boolean isActiveController) {
        this.maybeProcessCoordinatorChange();
    }

    /** Re-evaluates the reverse connection admin after a link metadata partition leader change. */
    @Override
    public void onLinkMetadataPartitionLeaderChange() {
        this.maybeProcessCoordinatorChange();
    }

    /**
     * Aligns the reverse connection admin with the current coordinator role while holding
     * stateChangeLock(): resets it when this broker is the coordinator and none exists,
     * closes it when one exists and this broker is no longer the coordinator.
     */
    private void maybeProcessCoordinatorChange() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            boolean isCoordinator = this.isLinkCoordinator();
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(69).append("Process controller or metadata partition leader change isCoordinator=").append(isCoordinator).toString());
            if (this.reverseConnectionAdmin().isEmpty()) {
                if (isCoordinator) {
                    // resetReverseConnectionAdmin() is not defined in this class - presumably inherited.
                    this.resetReverseConnectionAdmin();
                }
            } else if (!isCoordinator) {
                this.closeReverseConnectionAdmin();
            }
            return;
        }
    }

    /**
     * Closes the reverse connection admin's admin client (if any) and clears the reference.
     * When isActive() is false, the local connection admin is also closed with a zero
     * timeout and cleared.
     */
    @Override
    public void closeReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Closing reverse connection admin");
        this.reverseConnectionAdmin().flatMap((Function1 & Serializable)x$9 -> x$9.adminClient()).foreach((Function1 & Serializable)admin -> {
            // Close via CoreUtils.swallow with Level.WARN - presumably logs instead of throwing; verify.
            CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> admin.close(), this, Level.WARN);
            return BoxedUnit.UNIT;
        });
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)None$.MODULE$);
        if (!this.isActive()) {
            this.localConnAdmin().foreach((Function1 & Serializable)x$10 -> {
                x$10.close(Duration.ZERO);
                return BoxedUnit.UNIT;
            });
            this.localConnAdmin_$eq((Option<ConfluentAdmin>)None$.MODULE$);
            return;
        }
    }

    /**
     * Ensures the admin clients used for connection reversal exist: creates the local
     * connection admin if absent and, when this broker is the link coordinator, the
     * remote admin as well.
     */
    @Override
    public void createReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Recreate admin client used to initiate connection reversal requests");
        if (this.localConnAdmin().isEmpty()) {
            this.localConnAdmin_$eq((Option<ConfluentAdmin>)new Some(this.localConnAdminFactory.apply((Object)super.linkData().linkName())));
        }
        if (this.isLinkCoordinator()) {
            this.maybeCreateRemoteAdmin();
            return;
        }
    }

    /**
     * Creates the reverse connection admin if it does not exist yet, holding
     * stateChangeLock() so that only one instance is built.
     */
    private void maybeCreateRemoteAdmin() {
        Object object = this.stateChangeLock();
        synchronized (object) {
            if (this.reverseConnectionAdmin().isEmpty()) {
                ClusterLinkAdminClient admin = (ClusterLinkAdminClient)this.remoteAdminFactory.apply((Object)this.currentConfig(), (Object)this);
                // Wrap the admin's network client, metadata manager and the admin itself.
                this.reverseConnectionAdmin_$eq((Option<ReverseClient>)new Some((Object)new ReverseClient(admin.networkClient(), (Option<AdminMetadataManager>)new Some((Object)admin.metadataManager()), (Option<ClusterLinkAdminClient>)new Some((Object)admin), admin.clientId())));
            }
            return;
        }
    }

    /**
     * @return the network client of the reverse connection admin, or None when no admin exists
     */
    public Option<NetworkClient> reverseConnectionClient() {
        return this.reverseConnectionAdmin().map((Function1 & Serializable)x$11 -> x$11.networkClient());
    }

    /**
     * @return the persistent connection counter when this broker is the link coordinator,
     *         otherwise 0
     */
    @Override
    public int persistentConnectionCount() {
        if (this.isLinkCoordinator()) {
            return this.persistentConnections().get();
        }
        return 0;
    }

    /**
     * @return the current value of the active reverse connection counter
     */
    @Override
    public int reverseConnectionCount() {
        return this.activeReverseConnections().get();
    }

    // Synthetic lambda body used by requestReverseConnection: true when both ReverseClients
    // wrap the very same NetworkClient instance (reference comparison).
    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient client$2, ReverseClient x$3) {
        return x$3.networkClient() == client$2.networkClient();
    }

    // Completion handler shared by both send paths of requestReverseConnection. On failure it
    // logs, drops the pending request and reports the failure for the source node to the
    // client's network client; on success it only logs.
    private final void onCompletion$1(Throwable e, int requestId$1, ReverseClient client$2, Node sourceNode$1) {
        if (e != null) {
            this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(requestId$1).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
            this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId$1));
            client$2.networkClient().processReverseConnectionFailure(sourceNode$1);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(requestId$1).toString());
    }

    /**
     * Creates the manager with empty state: no pending requests, zeroed counters and no
     * admin clients.
     *
     * NOTE: the field assignments ahead of super(...) reflect the decompiled bytecode
     * order, not source-level Java.
     *
     * @param linkData              link metadata passed to the superclass
     * @param initialConfig         initial link configuration
     * @param localLogicalCluster   local cluster identifier passed to the superclass
     * @param clientInterceptor     optional interceptor for cluster link requests
     * @param metrics               link metrics
     * @param remoteAdminFactory    factory for the remote admin client
     * @param localConnAdminFactory factory for the local connection admin, keyed by link name
     * @param metadataManager       metadata manager passed to the superclass
     * @param brokerConfig          broker configuration (source of the broker id)
     * @param time                  clock
     */
    public ClusterLinkInboundConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, Function2<ClusterLinkConfig, ClusterLinkInboundConnectionManager, ClusterLinkAdminClient> remoteAdminFactory, Function1<String, ConfluentAdmin> localConnAdminFactory, ClusterLinkMetadataManager metadataManager, KafkaConfig brokerConfig, Time time) {
        this.initialConfig = initialConfig;
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.remoteAdminFactory = remoteAdminFactory;
        this.localConnAdminFactory = localConnAdminFactory;
        this.brokerConfig = brokerConfig;
        this.time = time;
        super(linkData, initialConfig, localLogicalCluster, metadataManager, metrics, brokerConfig);
        this.connectionRequests = new ConcurrentHashMap();
        // The id counter comes from the companion object rather than being created per instance.
        this.nextReverseRequestId = ClusterLinkInboundConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        this.localConnAdmin = None$.MODULE$;
        this.persistentConnectionSourceLock = new Object();
        // Log prefix includes the link name and this broker's id.
        this.logIdent_$eq(new StringBuilder(47).append("[ClusterLinkInboundConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
    }
}

