/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import kafka.server.NodeToControllerQueueItem;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0005\u0005Uf\u0001\u0002\u0010 \u0001\u0011B\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\t}\u0001\u0011\t\u0019!C\u0001\u007f!Aa\t\u0001BA\u0002\u0013\u0005q\t\u0003\u0005N\u0001\t\u0005\t\u0015)\u0003A\u0011!q\u0005A!A!\u0002\u0013y\u0005\u0002\u0003,\u0001\u0005\u0003\u0005\u000b\u0011B,\t\u0011i\u0003!\u0011!Q\u0001\nmC\u0001B\u0018\u0001\u0003\u0002\u0003\u0006Ia\u0018\u0005\tE\u0002\u0011\t\u0011)A\u0005G\"A!\u000e\u0001B\u0001B\u0003%1\u000e\u0003\u0005w\u0001\t\u0005\t\u0015!\u0003x\u0011\u0015Q\b\u0001\"\u0001|\u0011\u001d\ti\u0001\u0001C\u0005\u0003\u001fA\u0011\"!\u0006\u0001\u0005\u0004%I!a\u0006\t\u0011\u0005E\u0002\u0001)A\u0005\u00033A\u0011\"a\r\u0001\u0005\u0004%I!!\u000e\t\u0011\u0005-\u0003\u0001)A\u0005\u0003oA\u0011\"!\u0014\u0001\u0001\u0004%\taH \t\u0015\u0005=\u0003\u00011A\u0005\u0002}\t\t\u0006C\u0004\u0002V\u0001\u0001\u000b\u0015\u0002!\t\u000f\u0005}\u0003\u0001\"\u0001\u0002b!9\u0011\u0011\u000e\u0001\u0005\n\u0005-\u0004bBA9\u0001\u0011\u0005\u00111\u000f\u0005\b\u0003s\u0002A\u0011AA>\u0011\u001d\t\u0019\t\u0001C!\u0003\u000bC\u0001\"!&\u0001\t\u0003y\u0012q\u0013\u0005\b\u0003S\u0003A\u0011IAV\u0011\u001d\ti\u000b\u0001C!\u0003WCA\"a,\u0001!\u0003\u0005\t\u0011!C\u0001\u0003c\u0013QDT8eKR{7i\u001c8ue>dG.\u001a:SKF,Xm\u001d;UQJ,\u0017\r\u001a\u0006\u0003A\u0005\naa]3sm\u0016\u0014(\"\u0001\u0012\u0002\u000b-\fgm[1\u0004\u0001M\u0019\u0001!J\u0019\u0011\u0005\u0019zS\"A\u0014\u000b\u0005!J\u0013\u0001B;uS2T!\u0001\t\u0016\u000b\u0005\tZ#B\u0001\u0017.\u0003\u0019\t\u0007/Y2iK*\ta&A\u0002pe\u001eL!\u0001M\u0014\u0003+%sG/\u001a:Ce>\\WM]*f]\u0012$\u0006N]3bIB\u0011!'N\u0007\u0002g)\u0011A'I\u0001\u0006kRLGn]\u0005\u0003mM\u0012q\u0001T8hO&tw-\u0001\u000bj]&$\u0018.\u00197OKR<xN]6DY&,g\u000e\u001e\t\u0003sqj\u0011A\u000f\u0006\u0003w)\nqa\u00197jK:$8/\u0003\u0002>u\tY1*\u00194lC\u000ec\u0017.\u001a8u\u0003yI7OT3uo>\u00148n\u00117jK:$hi\u001c:[W\u000e{g\u000e\u001e:pY2,'/F\u0001A!\t\tE)D\u0001C\u0015\u0005\u0019\u0015!B:dC2\f\u0017BA#C\u0005\u001d\u0011un\u001c7fC:\f!%[:OKR<xN]6DY&,g\u000e\u001e$pej[7i\u001c8ue>dG.\u001a:`I\u0015\fHC\u0001%L!\t\t\u0015*\u0003\u0002K\u0005\n!QK\\5u\u0011\u001da5!!AA\u0002\u0001\u000b1\u0001\u001f\u00132\u0003}I7OT3uo>\u00148n\u00117jK:$hi\u001c:[W\u000e{g\u000e\u001e:pY2,'\u000fI\u0001\u0015]\u0016$xo\u001c:l\u00072LWM\u001c;GC\u000e$xN]=\u0011\t\u0005\u0003&\u000bO\u0005\u0003#\n\u0013\u0011BR;oGRLwN\\\u0019\u0011\u0005M#V\"A\u0010\n\u0005U{\"!F\"p]R\u0014x\u000e\u001c7fe&sgm\u001c:nCRLwN\\\u0001\u0010[\u0016$\u0018\rZ1uCV\u0003H-\u0019;feB\u0011\u0011\bW\u0005\u00033j\u0012Q#T1ok\u0006dW*\u001a;bI\u0006$\u0018-\u00169eCR,'/\u0001\fd_:$(o\u001c7mKJtu\u000eZ3Qe>4\u0018\u000eZ3s!\t\u0019F,\u0003\u0002^?\t12i\u001c8ue>dG.\u001a:O_\u0012,\u0007K]8wS\u0012,'/\u0001\u0004d_:4\u0017n\u001a\t\u0003'\u0002L!!Y\u0010\u0003\u0017-\u000bgm[1D_:4\u0017nZ\u0001\u0005i&lW\r\u0005\u0002eQ6\tQM\u0003\u00025M*\u0011qMK\u0001\u0007G>lWn\u001c8\n\u0005%,'\u0001\u0002+j[\u0016\f!\u0002\u001e5sK\u0006$g*Y7f!\ta7O\u0004\u0002ncB\u0011aNQ\u0007\u0002_*\u0011\u0001oI\u0001\u0007yI|w\u000e\u001e \n\u0005I\u0014\u0015A\u0002)sK\u0012,g-\u0003\u0002uk\n11\u000b\u001e:j]\u001eT!A\u001d\"\u0002\u001dI,GO]=US6,w.\u001e;NgB\u0011\u0011\t_\u0005\u0003s\n\u0013A\u0001T8oO\u00061A(\u001b8jiz\"\u0002\u0003`?\u007f\u007f\u0006\u0005\u00111AA\u0003\u0003\u000f\tI!a\u0003\u0011\u0005M\u0003\u0001\"B\u001c\r\u0001\u0004A\u0004\"\u0002 \r\u0001\u0004\u0001\u0005\"\u0002(\r\u0001\u0004y\u0005\"\u0002,\r\u0001\u00049\u0006\"\u0002.\r\u0001\u0004Y\u0006\"\u00020\r\u0001\u0004y\u0006\"\u00022\r\u0001\u0004\u0019\u0007\"\u00026\r\u0001\u0004Y\u0007\"\u0002<\r\u0001\u00049\u0018aF7bs\n,'+Z:fi:+Go^8sW\u000ec\u0017.\u001a8u)\rA\u0015\u0011\u0003\u0005\u0007\u0003'i\u0001\u0019\u0001*\u0002+\r|g\u000e\u001e:pY2,'/\u00138g_Jl\u0017\r^5p]\u0006a!/Z9vKN$\u0018+^3vKV\u0011\u0011\u0011\u0004\t\u0007\u00037\t9#a\u000b\u000e\u0005\u0005u!\u0002BA\u0010\u0003C\t!bY8oGV\u0014(/\u001a8u\u0015\rA\u00131\u0005\u0006\u0003\u0003K\tAA[1wC&!\u0011\u0011FA\u000f\u0005Ma\u0015N\\6fI\ncwnY6j]\u001e$U-];f!\r\u0019\u0016QF\u0005\u0004\u0003_y\"!\u0007(pI\u0016$vnQ8oiJ|G\u000e\\3s#V,W/Z%uK6\fQB]3rk\u0016\u001cH/U;fk\u0016\u0004\u0013\u0001E1di&4XmQ8oiJ|G\u000e\\3s+\t\t9\u0004\u0005\u0004\u0002:\u0005}\u00121I\u0007\u0003\u0003wQA!!\u0010\u0002\u001e\u00051\u0011\r^8nS\u000eLA!!\u0011\u0002<\ty\u0011\t^8nS\u000e\u0014VMZ3sK:\u001cW\r\u0005\u0003\u0002F\u0005\u001dS\"\u00014\n\u0007\u0005%cM\u0001\u0003O_\u0012,\u0017!E1di&4XmQ8oiJ|G\u000e\\3sA\u000591\u000f^1si\u0016$\u0017aC:uCJ$X\rZ0%KF$2\u0001SA*\u0011\u001da5#!AA\u0002\u0001\u000b\u0001b\u001d;beR,G\r\t\u0015\u0004)\u0005e\u0003cA!\u0002\\%\u0019\u0011Q\f\"\u0003\u0011Y|G.\u0019;jY\u0016\fq#Y2uSZ,7i\u001c8ue>dG.\u001a:BI\u0012\u0014Xm]:\u0015\u0005\u0005\r\u0004#B!\u0002f\u0005\r\u0013bAA4\u0005\n1q\n\u001d;j_:\fq#\u001e9eCR,7i\u001c8ue>dG.\u001a:BI\u0012\u0014Xm]:\u0015\u0007!\u000bi\u0007C\u0004\u0002pY\u0001\r!a\u0011\u0002'9,w/Q2uSZ,7i\u001c8ue>dG.\u001a:\u0002\u000f\u0015t\u0017/^3vKR\u0019\u0001*!\u001e\t\u000f\u0005]t\u00031\u0001\u0002,\u00059!/Z9vKN$\u0018!C9vKV,7+\u001b>f+\t\ti\bE\u0002B\u0003\u007fJ1!!!C\u0005\rIe\u000e^\u0001\u0011O\u0016tWM]1uKJ+\u0017/^3tiN$\"!a\"\u0011\r\u0005%\u00151RAH\u001b\t\t\t#\u0003\u0003\u0002\u000e\u0006\u0005\"AC\"pY2,7\r^5p]B\u0019a%!%\n\u0007\u0005MuEA\u000eSKF,Xm\u001d;B]\u0012\u001cu.\u001c9mKRLwN\u001c%b]\u0012dWM]\u0001\u000fQ\u0006tG\r\\3SKN\u0004xN\\:f)\u0011\tI*!*\u0015\u0007!\u000bY\nC\u0004\u0002\u001ej\u0001\r!a(\u0002\u0011I,7\u000f]8og\u0016\u00042!OAQ\u0013\r\t\u0019K\u000f\u0002\u000f\u00072LWM\u001c;SKN\u0004xN\\:f\u0011\u001d\t9K\u0007a\u0001\u0003W\t\u0011\"];fk\u0016LE/Z7\u0002\r\u0011|wk\u001c:l)\u0005A\u0015!B:uCJ$\u0018a\u00069s_R,7\r^3eI9,Go^8sW\u000ec\u0017.\u001a8u)\rA\u00141\u0017\u0005\b\u0019v\t\t\u00111\u0001}\u0001")
public class NodeToControllerRequestThread
extends InterBrokerSendThread
implements Logging {
    private boolean isNetworkClientForZkController;
    private final Function1<ControllerInformation, KafkaClient> networkClientFactory;
    private final ManualMetadataUpdater metadataUpdater;
    private final ControllerNodeProvider controllerNodeProvider;
    private final Time time;
    private final long retryTimeoutMs;
    private final LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue;
    private final AtomicReference<Node> activeController;
    private volatile boolean started;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public /* synthetic */ KafkaClient protected$networkClient(NodeToControllerRequestThread x$1) {
        return x$1.networkClient;
    }

    public boolean isNetworkClientForZkController() {
        return this.isNetworkClientForZkController;
    }

    public void isNetworkClientForZkController_$eq(boolean x$1) {
        this.isNetworkClientForZkController = x$1;
    }

    private void maybeResetNetworkClient(ControllerInformation controllerInformation) {
        if (this.isNetworkClientForZkController() != controllerInformation.isZkController()) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(88).append("Controller changed to ").append((Object)(this.isNetworkClientForZkController() ? "kraft" : "zk")).append(" mode. ").append("Resetting network client with new controller information : ").append(controllerInformation).toString());
            KafkaClient oldClient = this.networkClient;
            oldClient.initiateClose();
            oldClient.close();
            this.isNetworkClientForZkController_$eq(controllerInformation.isZkController());
            this.updateControllerAddress((Node)controllerInformation.node().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
            controllerInformation.node().foreach((Function1 & Serializable)n -> {
                this.metadataUpdater.setNodes(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)n, (List)Nil$.MODULE$)).asJava());
                return BoxedUnit.UNIT;
            });
            this.networkClient = (KafkaClient)this.networkClientFactory.apply((Object)controllerInformation);
            return;
        }
    }

    private LinkedBlockingDeque<NodeToControllerQueueItem> requestQueue() {
        return this.requestQueue;
    }

    private AtomicReference<Node> activeController() {
        return this.activeController;
    }

    public boolean started() {
        return this.started;
    }

    public void started_$eq(boolean x$1) {
        this.started = x$1;
    }

    public Option<Node> activeControllerAddress() {
        return Option$.MODULE$.apply((Object)this.activeController().get());
    }

    private void updateControllerAddress(Node newActiveController) {
        this.activeController().set(newActiveController);
    }

    public void enqueue(NodeToControllerQueueItem request) {
        if (!this.started()) {
            throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
        }
        this.requestQueue().add(request);
        if (this.activeControllerAddress().isDefined()) {
            this.wakeup();
            return;
        }
    }

    public int queueSize() {
        return this.requestQueue().size();
    }

    public Collection<RequestAndCompletionHandler> generateRequests() {
        long currentTimeMs = this.time.milliseconds();
        Iterator<NodeToControllerQueueItem> requestIter = this.requestQueue().iterator();
        while (requestIter.hasNext()) {
            NodeToControllerQueueItem request = requestIter.next();
            if (currentTimeMs - request.createdTimeMs() >= this.retryTimeoutMs) {
                requestIter.remove();
                request.callback().onTimeout();
                continue;
            }
            Option<Node> controllerAddress = this.activeControllerAddress();
            if (!controllerAddress.isDefined()) continue;
            requestIter.remove();
            return Collections.singletonList(new RequestAndCompletionHandler(this.time.milliseconds(), (Node)controllerAddress.get(), request.request(), response -> this.handleResponse(request, response)));
        }
        return Collections.emptyList();
    }

    public void handleResponse(NodeToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(18).append("Request ").append(queueItem.request()).append(" received ").append(response).toString());
        if (response.authenticationException() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(59).append("Request ").append(queueItem.request()).append(" failed due to authentication error with controller").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> response.authenticationException());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.versionMismatch() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(48).append("Request ").append(queueItem.request()).append(" failed due to unsupported version error").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.wasDisconnected()) {
            this.updateControllerAddress(null);
            this.requestQueue().putFirst(queueItem);
            return;
        }
        if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(97).append("Request ").append(queueItem.request()).append(" received NOT_CONTROLLER exception. Disconnecting the ").append("connection to the stale controller ").append(this.activeControllerAddress().map((Function1 & Serializable)x$1 -> x$1.idString()).getOrElse((Function0 & Serializable)() -> "null")).toString());
            this.activeControllerAddress().foreach((Function1 & Serializable)controllerAddress -> {
                NodeToControllerRequestThread.$anonfun$handleResponse$9(this, controllerAddress);
                return BoxedUnit.UNIT;
            });
            this.requestQueue().putFirst(queueItem);
            return;
        }
        queueItem.callback().onComplete(response);
    }

    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        this.maybeResetNetworkClient(controllerInformation);
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(51).append("Recorded new controller, from now on will use node ").append(controllerNode).toString());
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    public void start() {
        super.start();
        this.started_$eq(true);
    }

    public static final /* synthetic */ void $anonfun$handleResponse$9(NodeToControllerRequestThread $this, Node controllerAddress) {
        try {
            $this.protected$networkClient($this).disconnect(controllerAddress.idString());
        }
        catch (Throwable t) {
            $this.error((Function0<String>)(Function0 & Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable)() -> t);
        }
        $this.updateControllerAddress(null);
    }

    public NodeToControllerRequestThread(KafkaClient initialNetworkClient, boolean isNetworkClientForZkController, Function1<ControllerInformation, KafkaClient> networkClientFactory, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        this.isNetworkClientForZkController = isNetworkClientForZkController;
        this.networkClientFactory = networkClientFactory;
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.logIdent_$eq(this.logPrefix);
        this.requestQueue = new LinkedBlockingDeque();
        this.activeController = new AtomicReference<Object>(null);
        this.started = false;
    }
}

