/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import kafka.controller.KafkaController;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkAdminClient;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConnectionManager;
import kafka.server.link.ClusterLinkDestConnectionManager$;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkNetworkClient;
import kafka.server.link.ConnectionMode;
import kafka.server.link.ConnectionMode$Inbound$;
import kafka.server.link.ReverseClient;
import kafka.utils.CoreUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.ClientInterceptor;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.internals.AdminMetadataManager;
import org.apache.kafka.clients.admin.internals.ConfluentAdminUtils;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.InitiateReverseConnectionsRequestData;
import org.apache.kafka.common.network.KafkaChannel;
import org.apache.kafka.common.network.ReverseChannel;
import org.apache.kafka.common.network.ReverseNode;
import org.apache.kafka.common.requests.InitiateReverseConnectionsRequest;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.event.Level;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.jdk.CollectionConverters$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005\t%v!B\u0016-\u0011\u0003\u0019d!B\u001b-\u0011\u00031\u0004\"B\u001f\u0002\t\u0003q\u0004bB \u0002\u0005\u0004%\t\u0001\u0011\u0005\u0007\u001b\u0006\u0001\u000b\u0011B!\u0007\tUb\u0003A\u0014\u0005\n3\u0016\u0011\t\u0011)A\u00055\u0002D\u0001\"Y\u0003\u0003\u0002\u0003\u0006IA\u0019\u0005\nK\u0016\u0011\t\u0011)A\u0005MFD\u0001B]\u0003\u0003\u0002\u0003\u0006Ia\u001d\u0005\u000b\u0003\u0007)!\u0011!Q\u0001\n\u0005\u0015\u0001BCA\u0006\u000b\t\u0005\t\u0015!\u0003\u0002\u000e!Q\u00111D\u0003\u0003\u0002\u0003\u0006I!!\b\t\u0015\u0005-RA!b\u0001\n\u0003\ti\u0003\u0003\u0006\u0002:\u0015\u0011\t\u0011)A\u0005\u0003_A!\"a\u000f\u0006\u0005\u0003\u0005\u000b\u0011BA\u001f\u0011)\t)%\u0002B\u0001B\u0003%\u0011q\t\u0005\u0007{\u0015!\t!a\u0016\t\u0013\u00055TA1A\u0005\n\u0005=\u0004\u0002CAC\u000b\u0001\u0006I!!\u001d\t\u0011\u0005\u001dUA1A\u0005\n\u0001Cq!!#\u0006A\u0003%\u0011\t\u0003\u0005\u0002\f\u0016\u0011\r\u0011\"\u0003A\u0011\u001d\ti)\u0002Q\u0001\n\u0005C\u0001\"a$\u0006\u0005\u0004%I\u0001\u0011\u0005\b\u0003#+\u0001\u0015!\u0003B\u0011%\t\u0019*\u0002a\u0001\n\u0013\t)\nC\u0005\u0002\u001a\u0016\u0001\r\u0011\"\u0003\u0002\u001c\"A\u0011qU\u0003!B\u0013\t9\nC\u0004\u00022\u0016!\t%a-\t\u000f\u0005EW\u0001\"\u0011\u0002T\"A\u0011Q[\u0003\u0005\u00021\n9\u000eC\u0004\u0002~\u0016!\t%a@\t\u000f\tUQ\u0001\"\u0011\u0003\u0018!9!qK\u0003\u0005\n\te\u0003b\u0002B8\u000b\u0011%!\u0011\u000f\u0005\b\u0005\u000f+A\u0011\tBE\u0011\u001d\u0011)*\u0002C)\u0003'DqAa&\u0006\t#\n\u0019\u000eC\u0004\u0003\u001a\u0016!\tAa'\t\u000f\t}U\u0001\"\u0011\u0003\"\"9!1U\u0003\u0005B\t\u0005\u0006\"\u0004BS\u000bA\u0005\u0019\u0011!A\u0005\n\t\u001d\u0016/\u0001\u0011DYV\u001cH/\u001a:MS:\\G)Z:u\u0007>tg.Z2uS>tW*\u00198bO\u0016\u0014(BA\u0017/\u0003\u0011a\u0017N\\6\u000b\u0005=\u0002\u0014AB:feZ,'OC\u00012\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0001\"\u0001N\u0001\u000e\u00031\u0012\u0001e\u00117vgR,'\u000fT5oW\u0012+7\u000f^\"p]:,7\r^5p]6\u000bg.Y4f
eN\u0011\u0011a\u000e\t\u0003qmj\u0011!\u000f\u0006\u0002u\u0005)1oY1mC&\u0011A(\u000f\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005\u0019\u0014\u0001\u0006(fqR\u0014VM^3sg\u0016\u0014V-];fgRLE-F\u0001B!\t\u00115*D\u0001D\u0015\t!U)\u0001\u0004bi>l\u0017n\u0019\u0006\u0003\r\u001e\u000b!bY8oGV\u0014(/\u001a8u\u0015\tA\u0015*\u0001\u0003vi&d'\"\u0001&\u0002\t)\fg/Y\u0005\u0003\u0019\u000e\u0013Q\"\u0011;p[&\u001c\u0017J\u001c;fO\u0016\u0014\u0018!\u0006(fqR\u0014VM^3sg\u0016\u0014V-];fgRLE\rI\n\u0004\u000b=\u0013\u0006C\u0001\u001bQ\u0013\t\tFF\u0001\u000fDYV\u001cH/\u001a:MS:\\7i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0011\u0005M3fB\u0001\u001bU\u0013\t)F&\u0001\nDYV\u001cH/\u001a:MS:\\g)Y2u_JL\u0018BA,Y\u0005U!Um\u001d;D_:tWm\u0019;j_:l\u0015M\\1hKJT!!\u0016\u0017\u0002\u00111Lgn\u001b#bi\u0006\u0004\"a\u00170\u000e\u0003qS!!\u0018\u0019\u0002\u0005i\\\u0017BA0]\u0005=\u0019E.^:uKJd\u0015N\\6ECR\f\u0017BA-Q\u00035Ig.\u001b;jC2\u001cuN\u001c4jOB\u0011AgY\u0005\u0003I2\u0012\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u0003MawnY1m\u0019><\u0017nY1m\u00072,8\u000f^3s!\t9gN\u0004\u0002iYB\u0011\u0011.O\u0007\u0002U*\u00111NM\u0001\u0007yI|w\u000e\u001e 
\n\u00055L\u0014A\u0002)sK\u0012,g-\u0003\u0002pa\n11\u000b\u001e:j]\u001eT!!\\\u001d\n\u0005\u0015\u0004\u0016!E2mS\u0016tG/\u00138uKJ\u001cW\r\u001d;peB\u0019\u0001\b\u001e<\n\u0005UL$AB(qi&|g\u000e\u0005\u0002x\u007f6\t\u0001P\u0003\u0002zu\u000691\r\\5f]R\u001c(BA\u0019|\u0015\taX0\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002}\u0006\u0019qN]4\n\u0007\u0005\u0005\u0001PA\tDY&,g\u000e^%oi\u0016\u00148-\u001a9u_J\fq!\\3ue&\u001c7\u000fE\u00025\u0003\u000fI1!!\u0003-\u0005I\u0019E.^:uKJd\u0015N\\6NKR\u0014\u0018nY:\u0002%I,Wn\u001c;f\u0003\u0012l\u0017N\u001c$bGR|'/\u001f\t\tq\u0005=!-a\u0005\u0002\u0016%\u0019\u0011\u0011C\u001d\u0003\u0013\u0019+hn\u0019;j_:\u0014\u0004C\u0001\u001b\u0006!\r!\u0014qC\u0005\u0004\u00033a#AF\"mkN$XM\u001d'j].\fE-\\5o\u00072LWM\u001c;\u0002\u001d1|7-\u00197D_:t\u0017\tZ7j]B!\u0001\b^A\u0010!\u0011\t\t#a\n\u000e\u0005\u0005\r\"bAA\u0013q\u0006)\u0011\rZ7j]&!\u0011\u0011FA\u0012\u00059\u0019uN\u001c4mk\u0016tG/\u00113nS:\f!bY8oiJ|G\u000e\\3s+\t\ty\u0003\u0005\u0003\u00022\u0005URBAA\u001a\u0015\r\tY\u0003M\u0005\u0005\u0003o\t\u0019DA\bLC\u001a\\\u0017mQ8oiJ|G\u000e\\3s\u0003-\u0019wN\u001c;s_2dWM\u001d\u0011\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0011\t\u0005}\u0012\u0011I\u0007\u0002]%\u0019\u00111\t\u0018\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u0005i&lW\r\u0005\u0003\u0002J\u0005MSBAA&\u0015\u0011\ti%a\u0014\u0002\u000bU$\u0018\u000e\\:\u000b\u0007\u0005E#0\u0001\u0004d_6lwN\\\u0005\u0005\u0003+\nYE\u0001\u0003US6,GCFA\n\u00033\nY&!\u0018\u0002`\u0005\u0005\u00141MA3\u0003O\nI'a\u001b\t\u000be\u000b\u0002\u0019\u0001.\t\u000b\u0005\f\u0002\u0019\u00012\t\u000b\u0015\f\u0002\u0019\u00014\t\u000bI\f\u0002\u0019A:\t\u000f\u0005\r\u0011\u00031\u0001\u0002\u0006!9\u00111B\tA\u0002\u00055\u0001bBA\u000e#\u0001\u0007\u0011Q\u0004\u0005\b\u0003W\t\u0002\u0019AA\u0018\u0011\u001d\tY$\u0005a\u0001\u0003{Aq!!\u0012\u0012\u0001\u0004\t9%\u0001\nd_:tWm\u0019;j_:\u0014V-];fgR\u001cXCAA9!!\t\u0019(!\u001e\u0002z\u0005}T\"A#\n\u0007\u0005]TI
A\tD_:\u001cWO\u001d:f]RD\u0015m\u001d5NCB\u00042\u0001OA>\u0013\r\ti(\u000f\u0002\u0004\u0013:$\bc\u0001\u001b\u0002\u0002&\u0019\u00111\u0011\u0017\u0003\u001bI+g/\u001a:tK\u000ec\u0017.\u001a8u\u0003M\u0019wN\u001c8fGRLwN\u001c*fcV,7\u000f^:!\u0003QqW\r\u001f;SKZ,'o]3SKF,Xm\u001d;JI\u0006)b.\u001a=u%\u00164XM]:f%\u0016\fX/Z:u\u0013\u0012\u0004\u0013!\u00069feNL7\u000f^3oi\u000e{gN\\3di&|gn]\u0001\u0017a\u0016\u00148/[:uK:$8i\u001c8oK\u000e$\u0018n\u001c8tA\u0005A\u0012m\u0019;jm\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\:\u00023\u0005\u001cG/\u001b<f%\u00164XM]:f\u0007>tg.Z2uS>t7\u000fI\u0001\u0017e\u00164XM]:f\u0007>tg.Z2uS>t\u0017\tZ7j]V\u0011\u0011q\u0013\t\u0005qQ\fy(\u0001\u000esKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o?\u0012*\u0017\u000f\u0006\u0003\u0002\u001e\u0006\r\u0006c\u0001\u001d\u0002 &\u0019\u0011\u0011U\u001d\u0003\tUs\u0017\u000e\u001e\u0005\n\u0003K[\u0012\u0011!a\u0001\u0003/\u000b1\u0001\u001f\u00132\u0003]\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\\!e[&t\u0007\u0005K\u0002\u001d\u0003W\u00032\u0001OAW\u0013\r\ty+\u000f\u0002\tm>d\u0017\r^5mK\u0006\tRM\\1cY\u0016\u001cE.^:uKJd\u0015N\\6\u0015\r\u0005u\u0015QWA`\u0011\u001d\t9,\ba\u0001\u0003s\u000bQB\\3uo>\u00148n\u00117jK:$\bc\u0001\u001b\u0002<&\u0019\u0011Q\u0018\u0017\u00031\rcWo\u001d;fe2Kgn\u001b(fi^|'o[\"mS\u0016tG\u000fC\u0004\u0002Bv\u0001\r!a1\u0002\u001f5,G/\u00193bi\u0006l\u0015M\\1hKJ\u0004B\u0001\u000f;\u0002FB!\u0011qYAg\u001b\t\tIM\u0003\u0003\u0002L\u0006\r\u0012!C5oi\u0016\u0014h.\u00197t\u0013\u0011\ty-!3\u0003)\u0005#W.\u001b8NKR\fG-\u0019;b\u001b\u0006t\u0017mZ3s\u0003!\u0019\b.\u001e;e_^tGCAAO\u0003e\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\u001c)s_ZLG-\u001a:\u0015\u0011\u0005e\u0017q^A|\u0003s\u0004B\u0001\u000f;\u0002\\B!\u0011Q\\Au\u001d\u0011\ty.!:\u000e\u0005\u0005\u0005(\u0002BAr\u0003\u001f\nqA\\3uo>\u00148.\u0003\u0003\u0002h\u0006\u0005\u0018a\u0003*fm\u0016\u00148/\u001a(pI\u0016LA!a;\u0002n\n\u00112i\u001c8oK\u000e$\u0018n\u001c8Qe>4\u0018\u000eZ3s\u0015\u0011\t9/!9\t\
u000f\u0005]v\u00041\u0001\u0002rB\u0019q/a=\n\u0007\u0005U\bPA\u0007OKR<xN]6DY&,g\u000e\u001e\u0005\b\u0003\u0003|\u0002\u0019AAb\u0011\u0019\tYp\ba\u0001M\u0006A1\r\\5f]RLE-\u0001\rqe>\u001cWm]:SKZ,'o]3D_:tWm\u0019;j_:$b!!(\u0003\u0002\t-\u0001b\u0002B\u0002A\u0001\u0007!QA\u0001\bG\"\fgN\\3m!\u0011\tyNa\u0002\n\t\t%\u0011\u0011\u001d\u0002\r\u0017\u000647.Y\"iC:tW\r\u001c\u0005\b\u0005\u001b\u0001\u0003\u0019\u0001B\b\u0003-\u0011XM^3sg\u0016tu\u000eZ3\u0011\t\u0005}'\u0011C\u0005\u0005\u0005'\t\tOA\u0006SKZ,'o]3O_\u0012,\u0017AG5oSRL\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\u001cHC\u0002B\r\u0005{\u0011i\u0005\u0005\u0004\u0003\u001c\t\u0015\"1\u0006\b\u0005\u0005;\u0011\tCD\u0002j\u0005?I\u0011AO\u0005\u0004\u0005GI\u0014a\u00029bG.\fw-Z\u0005\u0005\u0005O\u0011ICA\u0002TKFT1Aa\t:!\u0019\t\u0019H!\f\u00032%\u0019!qF#\u0003#\r{W\u000e\u001d7fi\u0006\u0014G.\u001a$viV\u0014X\r\u0005\u0003\u00034\teRB\u0001B\u001b\u0015\r\u00119$S\u0001\u0005Y\u0006tw-\u0003\u0003\u0003<\tU\"\u0001\u0002,pS\u0012DqAa\u0010\"\u0001\u0004\u0011\t%A\rj]&$\u0018.\u0019;f\u0007>tg.Z2uS>t'+Z9vKN$\b\u0003\u0002B\"\u0005\u0013j!A!\u0012\u000b\t\t\u001d\u0013qJ\u0001\te\u0016\fX/Z:ug&!!1\nB#\u0005\u0005Je.\u001b;jCR,'+\u001a<feN,7i\u001c8oK\u000e$\u0018n\u001c8t%\u0016\fX/Z:u\u0011\u001d\u0011y%\ta\u0001\u0005#\naB]3rk\u0016\u001cHoQ8oi\u0016DH\u000f\u0005\u0003\u0003D\tM\u0013\u0002\u0002B+\u0005\u000b\u0012aBU3rk\u0016\u001cHoQ8oi\u0016DH/\u0001\rsKF,Xm\u001d;SKZ,'o]3D_:tWm\u0019;j_:$\u0002\"!(\u0003\\\t}#1\r\u0005\b\u0005;\u0012\u0003\u0019AA=\u0003%\u0011X-];fgRLE\rC\u0004\u0003b\t\u0002\r!a 
\u0002\r\rd\u0017.\u001a8u\u0011\u001d\u0011)G\ta\u0001\u0005O\nAA\\8eKB!!\u0011\u000eB6\u001b\t\ty%\u0003\u0003\u0003n\u0005=#\u0001\u0002(pI\u0016\f\u0011DZ8so\u0006\u0014H\rV8SK6|G/Z\"p]R\u0014x\u000e\u001c7feR1\u0011Q\u0014B:\u0005\u0007CqA!\u001e$\u0001\u0004\u00119(A\u0006sKF,Xm\u001d;ECR\f\u0007\u0003\u0002B=\u0005\u007fj!Aa\u001f\u000b\t\tu\u0014qJ\u0001\b[\u0016\u001c8/Y4f\u0013\u0011\u0011\tIa\u001f\u0003K%s\u0017\u000e^5bi\u0016\u0014VM^3sg\u0016\u001cuN\u001c8fGRLwN\\:SKF,Xm\u001d;ECR\f\u0007b\u0002BCG\u0001\u0007!\u0011D\u0001\bMV$XO]3t\u0003IygnQ8oiJ|G\u000e\\3s\u0007\"\fgnZ3\u0015\t\u0005u%1\u0012\u0005\b\u0005\u001b#\u0003\u0019\u0001BH\u0003II7/Q2uSZ,7i\u001c8ue>dG.\u001a:\u0011\u0007a\u0012\t*C\u0002\u0003\u0014f\u0012qAQ8pY\u0016\fg.A\u000edY>\u001cXMU3wKJ\u001cXmQ8o]\u0016\u001cG/[8o\u0003\u0012l\u0017N\\\u0001\u001dGJ,\u0017\r^3SKZ,'o]3D_:tWm\u0019;j_:\fE-\\5o\u0003]\u0011XM^3sg\u0016\u001cuN\u001c8fGRLwN\\\"mS\u0016tG/\u0006\u0002\u0003\u001eB!\u0001\b^Ay\u0003e\u0001XM]:jgR,g\u000e^\"p]:,7\r^5p]\u000e{WO\u001c;\u0016\u0005\u0005e\u0014A\u0006:fm\u0016\u00148/Z\"p]:,7\r^5p]\u000e{WO\u001c;\u00023M,\b/\u001a:%Y>\u001c\u0017\r\u001c'pO&\u001c\u0017\r\\\"mkN$XM]\u000b\u0002M\u0002")
public class ClusterLinkDestConnectionManager
extends ClusterLinkConnectionManager
implements ClusterLinkFactory.DestConnectionManager {
    // Optional interceptor passed to the network client in enableClusterLink (null is passed when absent).
    private final Option<ClientInterceptor> clientInterceptor;
    // Source of the sensors recorded when a reverse connection is created or closed.
    private final ClusterLinkMetrics metrics;
    // Builds the admin client wrapped by reverseConnectionAdmin; invoked with the current config and this manager.
    private final Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory;
    // Optional local admin used to send initiate requests when this broker is not the active controller;
    // closed in shutdown().
    private final Option<ConfluentAdmin> localConnAdmin;
    // Consulted via isActive() to decide between forwarding directly and going through localConnAdmin.
    private final KafkaController controller;
    // Supplies the local broker id used in request entries and in the log prefix.
    private final KafkaConfig brokerConfig;
    // Clock used to timestamp the bootstrap of a newly added reverse channel.
    private final Time time;
    // Pending reverse connection requests, keyed by the boxed integer request id.
    private final ConcurrentHashMap<Object, ReverseClient> connectionRequests;
    // Request id generator; initialized from the companion module's NextReverseRequestId().
    private final AtomicInteger nextReverseRequestId;
    // Number of open reverse connections that arrived without a request id.
    private final AtomicInteger persistentConnections;
    // Number of open reverse connections of any kind.
    private final AtomicInteger activeReverseConnections;
    // Client wrapping the remote admin; set by createReverseConnectionAdmin and cleared by closeReverseConnectionAdmin.
    private volatile Option<ReverseClient> reverseConnectionAdmin;

    /**
     * Static forwarder to the companion module's request id counter.
     *
     * @return the {@link AtomicInteger} exposed by {@code ClusterLinkDestConnectionManager$}
     */
    public static AtomicInteger NextReverseRequestId() {
        ClusterLinkDestConnectionManager$ companion = ClusterLinkDestConnectionManager$.MODULE$;
        return companion.NextReverseRequestId();
    }

    /**
     * Synthetic accessor that exposes the superclass {@code localLogicalCluster()} so it can be
     * called from lambda bodies in this class.
     */
    private /* synthetic */ String super$localLogicalCluster() {
        String logicalCluster = super.localLogicalCluster();
        return logicalCluster;
    }

    /** @return the controller handle supplied at construction */
    public KafkaController controller() {
        return controller;
    }

    /** @return the map of pending reverse connection requests, keyed by request id */
    private ConcurrentHashMap<Object, ReverseClient> connectionRequests() {
        return connectionRequests;
    }

    /** @return the counter used to allocate reverse connection request ids */
    private AtomicInteger nextReverseRequestId() {
        return nextReverseRequestId;
    }

    /** @return the counter of open reverse connections that arrived without a request id */
    private AtomicInteger persistentConnections() {
        return persistentConnections;
    }

    /** @return the counter of all currently open reverse connections */
    private AtomicInteger activeReverseConnections() {
        return activeReverseConnections;
    }

    /** @return the current value of the volatile reverse connection admin holder */
    private Option<ReverseClient> reverseConnectionAdmin() {
        return reverseConnectionAdmin;
    }

    /**
     * Setter for the volatile reverse connection admin holder.
     *
     * @param x$1 the new value to store
     */
    private void reverseConnectionAdmin_$eq(Option<ReverseClient> x$1) {
        reverseConnectionAdmin = x$1;
    }

    /**
     * Enables destination-side cluster linking on the supplied client.
     *
     * <p>When the wrapped client is a {@link NetworkClient}, it is given the link id, the optional
     * interceptor and the optional reverse connection provider (each passed as {@code null} when
     * absent). For any other client type nothing is enabled.
     *
     * @param networkClient   wrapper around the client to enable
     * @param metadataManager optional metadata manager forwarded to the reverse connection provider
     * @throws IllegalStateException if the wrapped client is not a {@code NetworkClient} while the
     *                               link's connection mode is inbound
     */
    @Override
    public void enableClusterLink(ClusterLinkNetworkClient networkClient, Option<AdminMetadataManager> metadataManager) {
        KafkaClient underlying = networkClient.networkClient();
        if (!(underlying instanceof NetworkClient)) {
            // Reverse (inbound) connections cannot work without a NetworkClient.
            ConnectionMode mode = this.currentConfig().connectionMode();
            boolean inbound = mode != null && mode.equals(ConnectionMode$Inbound$.MODULE$);
            if (inbound) {
                throw new IllegalStateException("Reverse connections are supported only with NetworkClient");
            }
            return;
        }
        NetworkClient client = (NetworkClient)underlying;
        client.enableDestinationClusterLink(
            super.linkData().linkId(),
            (ClientInterceptor)this.clientInterceptor.orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()),
            (ReverseNode.ConnectionProvider)this.reverseConnectionProvider(client, metadataManager, networkClient.clientId()).orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
    }

    /**
     * Shuts down the superclass state, then closes the local connection admin (if one was
     * supplied) with a zero timeout.
     */
    @Override
    public void shutdown() {
        super.shutdown();
        if (this.localConnAdmin.isDefined()) {
            ConfluentAdmin localAdmin = this.localConnAdmin.get();
            localAdmin.close(Duration.ZERO);
        }
    }

    /**
     * Builds the provider through which a network client requests reverse connections.
     *
     * @param networkClient   client that will own the reverse connections
     * @param metadataManager optional metadata manager handed to the {@code ReverseClient}
     * @param clientId        client id handed to the {@code ReverseClient}
     * @return a provider that allocates a fresh request id per call and delegates to
     *         {@code requestReverseConnection}, or {@code None} unless the connection mode is inbound
     */
    public Option<ReverseNode.ConnectionProvider> reverseConnectionProvider(NetworkClient networkClient, Option<AdminMetadataManager> metadataManager, String clientId) {
        ConnectionMode mode = this.currentConfig().connectionMode();
        boolean inbound = mode != null && mode.equals(ConnectionMode$Inbound$.MODULE$);
        if (!inbound) {
            return None$.MODULE$;
        }
        // The reverse client created here has no admin client of its own.
        ReverseClient reverseClient = new ReverseClient(networkClient, metadataManager, (Option<ClusterLinkAdminClient>)None$.MODULE$, clientId);
        ReverseNode.ConnectionProvider provider = node -> this.requestReverseConnection(this.nextReverseRequestId().incrementAndGet(), reverseClient, node);
        return new Some((Object)provider);
    }

    /**
     * Attaches an incoming reverse connection to the client that is waiting for it.
     *
     * <p>A connection without a request id is treated as a persistent connection and is attached
     * to the reverse connection admin. A connection with a request id is attached to the client
     * registered under that id, and the registration is removed from the pending map.
     *
     * @param channel2    the channel that carries the reverse connection
     * @param reverseNode description of the reverse connection, including its optional request id
     * @throws NetworkException if no client is waiting for this connection
     */
    @Override
    public void processReverseConnection(KafkaChannel channel2, ReverseNode reverseNode) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(53).append("Process reverse connection in destination cluster : ").append(channel2).append(" ").append(reverseNode).toString());
        this.ensureReverseConnectionsEnabled();

        final boolean persistent = !reverseNode.requestId().isPresent();
        Option target;
        if (persistent) {
            target = this.reverseConnectionAdmin();
        } else {
            // remove() yields null for an unknown id, which Option.apply turns into None.
            target = Option$.MODULE$.apply((Object)this.connectionRequests().remove(reverseNode.requestId().get()));
        }

        if (None$.MODULE$.equals(target)) {
            throw new NetworkException("Reverse connection is no longer required");
        }
        if (!(target instanceof Some)) {
            throw new MatchError((Object)target);
        }
        ReverseClient client = (ReverseClient)((Some)target).value();

        // Runs when the channel is closed: undo the accounting performed below.
        Consumer<KafkaChannel> closeCallback = channel -> {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(38).append("Reverse channel ").append(channel).append(" has been disconnected").toString());
            // The decompiled source referenced an undefined `$this` here; the enclosing instance is `this`.
            this.metrics.reverseConnectionClosedSensor().record();
            this.activeReverseConnections().decrementAndGet();
            if (persistent && this.persistentConnections().decrementAndGet() <= 0 && this.controller().isActive()) {
                this.info((Function0<String>)(Function0 & Serializable)() -> "Persistent connection to source controller was disconnected, awaiting new connection.");
            }
        };

        this.activeReverseConnections().incrementAndGet();
        if (persistent) {
            this.persistentConnections().incrementAndGet();
        }
        this.metrics.reverseConnectionCreatedSensor().record();

        ReverseChannel reverseChannel = new ReverseChannel(channel2, reverseNode, closeCallback);
        client.networkClient().reverseAndAdd(reverseChannel);
        client.bootstrapWithReverseChannel(reverseChannel, this.time.milliseconds());
        this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(64).append("Added reverse channel ").append(reverseChannel).append(" from source to network client, requestId=").append(reverseNode.requestId()).toString());
    }

    /**
     * Forwards an initiate-reverse-connections request to the remote controller.
     *
     * <p>One future is created per request entry. Any failure raised while validating or
     * forwarding the request is logged and used to fail every future; nothing is rethrown.
     *
     * @param initiateConnectionRequest the request to forward
     * @param requestContext            context of the incoming request (not used by this method)
     * @return one future per entry of the request, in entry order
     */
    public Seq<CompletableFuture<Void>> initiateReverseConnections(InitiateReverseConnectionsRequest initiateConnectionRequest, RequestContext requestContext) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(70).append("Initiate or forward reverse connection request: localCluster=").append(this.super$localLogicalCluster()).append(" request=").append(initiateConnectionRequest).toString());
        this.ensureReverseConnectionsEnabled();
        InitiateReverseConnectionsRequestData requestData = initiateConnectionRequest.data();
        int entryCount = requestData.entries().size();
        List pending = (List)package$.MODULE$.List().fill(entryCount, (Function0 & Serializable)() -> new CompletableFuture());
        try {
            String localCluster = super.localLogicalCluster();
            String sourceCluster = requestData.sourceClusterId();
            // Null-safe equality: a link whose source is this very cluster is rejected.
            boolean sameCluster = localCluster == null ? sourceCluster == null : localCluster.equals(sourceCluster);
            if (sameCluster) {
                throw new InvalidRequestException(new StringBuilder(70).append("Cannot initiate reverse connection from destination cluster ").append(super.localLogicalCluster()).append(" to itself").toString());
            }
            this.forwardToRemoteController(requestData, (Seq<CompletableFuture<Void>>)pending);
        }
        catch (Throwable failure) {
            this.error((Function0<String>)(Function0 & Serializable)() -> "Failing reverse connection request", (Function0<Throwable>)(Function0 & Serializable)() -> failure);
            pending.foreach((Function1 & Serializable)pendingFuture -> BoxesRunTime.boxToBoolean((boolean)pendingFuture.completeExceptionally(failure)));
        }
        return pending;
    }

    /**
     * Asks the source cluster to open a reverse connection from {@code node} to this broker.
     *
     * <p>On the active controller the request is forwarded through the reverse connection admin;
     * otherwise it is sent through the local connection admin. In both cases {@code onCompletion$1}
     * runs when the initiate request finishes.
     *
     * <p>The client is registered in the pending map <em>before</em> the request is dispatched.
     * Registering afterwards (as the original code did) leaves a stale entry when the completion
     * handler runs synchronously on an already-failed future, and lets a reverse connection that
     * arrives early be rejected because its request id is not yet known. If dispatching throws,
     * the registration is rolled back so that no entry is left behind.
     *
     * @param requestId id under which the pending connection is registered
     * @param client    client that will receive the reverse connection
     * @param node      source node the connection is requested from
     * @throws NetworkException      if {@code client} shares its network client with the reverse
     *                               connection admin, i.e. it must wait for the persistent connection
     * @throws IllegalStateException if this broker is not the active controller and no local
     *                               connection admin was supplied
     */
    private void requestReverseConnection(int requestId, ReverseClient client, Node node) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(66).append("Requesting reverse connection with requestId ").append(requestId).append(" to node ").append(node).append(" for client ").append(client.clientId()).toString());
        this.ensureReverseConnectionsEnabled();
        if (this.reverseConnectionAdmin().exists((Function1 & Serializable)x$4 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkDestConnectionManager.$anonfun$requestReverseConnection$2(client, x$4)))) {
            throw new NetworkException("Waiting for persistent connection");
        }
        InitiateReverseConnectionsRequestData.EntryData entry = new InitiateReverseConnectionsRequestData.EntryData().setInitiateRequestId(requestId).setSourceBrokerId(node.id()).setTargetBrokerId(this.brokerConfig.brokerId());
        InitiateReverseConnectionsRequestData requestData = new InitiateReverseConnectionsRequestData().setClusterLinkId(this.linkId()).setForwardToBroker(true).setTimeoutMs(Predef$.MODULE$.Integer2int(this.currentConfig().reverseConnectionSetupTimeoutMs())).setSourceClusterId((String)super.linkData().clusterId().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl())).setTargetClusterId(super.localLogicalCluster()).setEntries(Collections.singletonList(entry));

        // Register first so the completion handler and an early reverse connection both see the entry.
        Integer requestKey = BoxesRunTime.boxToInteger((int)requestId);
        this.connectionRequests().put(requestKey, client);
        try {
            if (this.controller().isActive()) {
                CompletableFuture future = new CompletableFuture();
                this.forwardToRemoteController(requestData, (Seq<CompletableFuture<Void>>)((Seq)package$.MODULE$.Seq().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new CompletableFuture[]{future}))));
                future.whenComplete((x$5, e) -> this.onCompletion$1((Throwable)e, requestId, client, node));
            } else {
                ((KafkaFutureImpl)ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)((ConfluentAdmin)this.localConnAdmin.getOrElse((Function0 & Serializable)() -> {
                    throw new IllegalStateException("Connection admin not created");
                })), (InitiateReverseConnectionsRequestData)requestData, null).get(requestKey)).whenComplete((x$6, e) -> this.onCompletion$1((Throwable)e, requestId, client, node));
            }
        }
        catch (Throwable dispatchFailure) {
            // The request never left this broker; do not leave a registration behind.
            this.connectionRequests().remove(requestKey);
            throw dispatchFailure;
        }
    }

    /**
     * Sends the request through the reverse connection admin and mirrors each per-entry result
     * onto the matching caller-supplied future.
     *
     * @param requestData the request to send
     * @param futures     futures to complete, paired with {@code requestData.entries()} by position
     * @throws NetworkException       if no admin is available although this broker is the active controller
     * @throws NotControllerException if no admin is available and this broker is not the active controller
     */
    private void forwardToRemoteController(InitiateReverseConnectionsRequestData requestData, Seq<CompletableFuture<Void>> futures) {
        Option remoteAdmin = this.reverseConnectionAdmin().flatMap((Function1 & Serializable)reverseClient -> reverseClient.adminClient().map((Function1 & Serializable)linkAdmin -> linkAdmin.admin()));
        ConfluentAdmin admin = (ConfluentAdmin)remoteAdmin.getOrElse((Function0 & Serializable)() -> {
            if (!this.controller().isActive()) {
                throw new NotControllerException("Request cannot be forwarded to remote controller since this broker is not the controller.");
            }
            throw new NetworkException("Request cannot be forwarded to remote controller at this time.");
        });
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(66).append("Forward initiate reverse connection request to remote controller: ").append(requestData).toString());
        Map requestFutures = ConfluentAdminUtils.initiateReverseConnections((ConfluentAdmin)admin, (InitiateReverseConnectionsRequestData)requestData, null);
        IterableOnceOps entriesWithFutures = (IterableOnceOps)CollectionConverters$.MODULE$.ListHasAsScala(requestData.entries()).asScala().zip(futures);
        entriesWithFutures.foreach((Function1 & Serializable)pair -> {
            if (pair == null) {
                throw new MatchError(null);
            }
            InitiateReverseConnectionsRequestData.EntryData entry = (InitiateReverseConnectionsRequestData.EntryData)pair._1();
            CompletableFuture future = (CompletableFuture)pair._2();
            // Look up the admin-side future by request id and relay its outcome.
            return ((KafkaFutureImpl)requestFutures.get(BoxesRunTime.boxToInteger((int)entry.initiateRequestId()))).whenComplete((result, error) -> {
                if (error == null) {
                    this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(58).append("Completed InitiateReverseConnectionsRequest for requestId=").append(entry.initiateRequestId()).toString());
                    future.complete(result);
                    return;
                }
                this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(57).append("Initiate reverse connection request failed for requestId=").append(entry.initiateRequestId()).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> error);
                future.completeExceptionally((Throwable)error);
            });
        });
    }

    /**
     * Reacts to this broker gaining or losing the active controller role, under the state
     * change lock.
     *
     * <p>Becoming controller without an admin triggers {@code resetReverseConnectionAdmin()};
     * losing the role while an admin exists closes it. Every other combination is a no-op.
     *
     * @param isActiveController whether this broker is now the active controller
     */
    @Override
    public void onControllerChange(boolean isActiveController) {
        synchronized (this.stateChangeLock()) {
            boolean hasAdmin = !this.reverseConnectionAdmin().isEmpty();
            if (isActiveController && !hasAdmin) {
                this.resetReverseConnectionAdmin();
            } else if (!isActiveController && hasAdmin) {
                this.closeReverseConnectionAdmin();
            }
        }
    }

    /**
     * Closes the admin client held by the reverse connection admin, if any, and clears the holder.
     * The close goes through {@code CoreUtils.swallow} at WARN level.
     */
    @Override
    public void closeReverseConnectionAdmin() {
        Option currentAdminClient = this.reverseConnectionAdmin().flatMap((Function1 & Serializable)reverseClient -> reverseClient.adminClient());
        currentAdminClient.foreach((Function1 & Serializable)admin -> {
            CoreUtils$.MODULE$.swallow((Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> admin.close(), this, Level.WARN);
            return BoxedUnit.UNIT;
        });
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)None$.MODULE$);
    }

    /**
     * Creates the admin client used to initiate connection reversal requests and stores it,
     * wrapped in a {@code ReverseClient}. Does nothing beyond logging unless this broker is the
     * active controller.
     */
    @Override
    public void createReverseConnectionAdmin() {
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Recreate admin client used to initiate connection reversal requests");
        if (!this.controller().isActive()) {
            return;
        }
        ClusterLinkAdminClient admin = (ClusterLinkAdminClient)this.remoteAdminFactory.apply((Object)this.currentConfig(), (Object)this);
        ReverseClient reverseClient = new ReverseClient(
            admin.networkClient(),
            (Option<AdminMetadataManager>)new Some((Object)admin.metadataManager()),
            (Option<ClusterLinkAdminClient>)new Some((Object)admin),
            admin.clientId());
        this.reverseConnectionAdmin_$eq((Option<ReverseClient>)new Some((Object)reverseClient));
    }

    /** @return the network client of the reverse connection admin, or {@code None} when no admin is set */
    public Option<NetworkClient> reverseConnectionClient() {
        Option<ReverseClient> currentAdmin = this.reverseConnectionAdmin();
        return currentAdmin.map((Function1 & Serializable)reverseClient -> reverseClient.networkClient());
    }

    /** @return the persistent connection counter on the active controller, {@code 0} on any other broker */
    @Override
    public int persistentConnectionCount() {
        return this.controller().isActive() ? this.persistentConnections().get() : 0;
    }

    /** @return the number of reverse connections currently open */
    @Override
    public int reverseConnectionCount() {
        AtomicInteger openConnections = this.activeReverseConnections();
        return openConnections.get();
    }

    /**
     * Synthetic predicate used by {@code requestReverseConnection}.
     *
     * @return {@code true} when both reverse clients wrap the very same network client instance
     */
    public static final /* synthetic */ boolean $anonfun$requestReverseConnection$2(ReverseClient client$1, ReverseClient x$4) {
        NetworkClient adminNetworkClient = x$4.networkClient();
        NetworkClient requestingNetworkClient = client$1.networkClient();
        // Identity comparison is intentional: the question is whether it is the same client object.
        return adminNetworkClient == requestingNetworkClient;
    }

    /**
     * Completion handler for an initiate-reverse-connection request.
     *
     * <p>On success only a debug line is logged. On failure the pending registration is removed
     * and the client's network client is told that the reverse connection to the node failed.
     *
     * @param e           failure of the request, or {@code null} on success
     * @param requestId$1 id of the request that completed
     * @param client$1    client that was waiting for the connection
     * @param node$2      node the connection was requested from
     */
    private final void onCompletion$1(Throwable e, int requestId$1, ReverseClient client$1, Node node$2) {
        if (e == null) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Reverse connection has been created for requestId=").append(requestId$1).toString());
            return;
        }
        this.warn((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(50).append("Failed to create reverse connection for requestId=").append(requestId$1).toString(), (Function0<Throwable>)(Function0 & Serializable)() -> e);
        this.connectionRequests().remove(BoxesRunTime.boxToInteger((int)requestId$1));
        client$1.networkClient().processReverseConnectionFailure(node$2);
    }

    /**
     * Creates the destination-side connection manager for one cluster link.
     *
     * @param linkData            link metadata, passed to the superclass
     * @param initialConfig       initial link configuration, passed to the superclass
     * @param localLogicalCluster id of the local logical cluster, passed to the superclass
     * @param clientInterceptor   optional interceptor handed to network clients in enableClusterLink
     * @param metrics             link metrics, also passed to the superclass
     * @param remoteAdminFactory  factory for the admin client used by createReverseConnectionAdmin
     * @param localConnAdmin      optional local admin used when this broker is not the active controller
     * @param controller          controller handle consulted via {@code isActive()}
     * @param brokerConfig        broker configuration; supplies the local broker id
     * @param time                clock used when bootstrapping reverse channels
     */
    public ClusterLinkDestConnectionManager(ClusterLinkData linkData, ClusterLinkConfig initialConfig, String localLogicalCluster, Option<ClientInterceptor> clientInterceptor, ClusterLinkMetrics metrics, Function2<ClusterLinkConfig, ClusterLinkDestConnectionManager, ClusterLinkAdminClient> remoteAdminFactory, Option<ConfluentAdmin> localConnAdmin, KafkaController controller, KafkaConfig brokerConfig, Time time) {
        // NOTE(review): the decompiler emitted these assignments ahead of super(...); the order is
        // kept as-is because moving them could change what the superclass constructor observes.
        this.clientInterceptor = clientInterceptor;
        this.metrics = metrics;
        this.remoteAdminFactory = remoteAdminFactory;
        this.localConnAdmin = localConnAdmin;
        this.controller = controller;
        this.brokerConfig = brokerConfig;
        this.time = time;
        super(linkData, initialConfig, localLogicalCluster, metrics);
        this.connectionRequests = new ConcurrentHashMap();
        // The request id counter comes from the companion module rather than being created here.
        this.nextReverseRequestId = ClusterLinkDestConnectionManager$.MODULE$.NextReverseRequestId();
        this.persistentConnections = new AtomicInteger();
        this.activeReverseConnections = new AtomicInteger();
        this.reverseConnectionAdmin = None$.MODULE$;
        // Log prefix of the form "[ClusterLinkDestConnectionManager-<linkName>-broker-<brokerId>] ".
        this.logIdent_$eq(new StringBuilder(44).append("[ClusterLinkDestConnectionManager-").append(super.linkData().linkName()).append("-broker-").append(brokerConfig.brokerId()).append("] ").toString());
    }
}

