/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.BrokerToControllerQueueItem;
import kafka.server.ControllerInformation;
import kafka.server.ControllerNodeProvider;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import scala.;
import scala.$less$colon$less$;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0005\u0005Uf\u0001\u0002\u0010 \u0001\u0011B\u0001b\u000e\u0001\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\t}\u0001\u0011\t\u0019!C\u0001\u007f!Aa\t\u0001BA\u0002\u0013\u0005q\t\u0003\u0005N\u0001\t\u0005\t\u0015)\u0003A\u0011!q\u0005A!A!\u0002\u0013y\u0005\u0002\u0003,\u0001\u0005\u0003\u0005\u000b\u0011B,\t\u0011i\u0003!\u0011!Q\u0001\nmC\u0001B\u0018\u0001\u0003\u0002\u0003\u0006Ia\u0018\u0005\tE\u0002\u0011\t\u0011)A\u0005G\"A!\u000e\u0001B\u0001B\u0003%1\u000e\u0003\u0005w\u0001\t\u0005\t\u0015!\u0003x\u0011\u0015Q\b\u0001\"\u0001|\u0011\u001d\ti\u0001\u0001C\u0005\u0003\u001fA\u0011\"!\u0006\u0001\u0005\u0004%I!a\u0006\t\u0011\u0005E\u0002\u0001)A\u0005\u00033A\u0011\"a\r\u0001\u0005\u0004%I!!\u000e\t\u0011\u0005-\u0003\u0001)A\u0005\u0003oA\u0011\"!\u0014\u0001\u0001\u0004%\taH \t\u0015\u0005=\u0003\u00011A\u0005\u0002}\t\t\u0006C\u0004\u0002V\u0001\u0001\u000b\u0015\u0002!\t\u000f\u0005}\u0003\u0001\"\u0001\u0002b!9\u0011\u0011\u000e\u0001\u0005\n\u0005-\u0004bBA9\u0001\u0011\u0005\u00111\u000f\u0005\b\u0003s\u0002A\u0011AA>\u0011\u001d\t\u0019\t\u0001C!\u0003\u000bC\u0001\"!&\u0001\t\u0003y\u0012q\u0013\u0005\b\u0003S\u0003A\u0011IAV\u0011\u001d\ti\u000b\u0001C!\u0003WCA\"a,\u0001!\u0003\u0005\t\u0011!C\u0001\u0003c\u0013qD\u0011:pW\u0016\u0014Hk\\\"p]R\u0014x\u000e\u001c7feJ+\u0017/^3tiRC'/Z1e\u0015\t\u0001\u0013%\u0001\u0004tKJ4XM\u001d\u0006\u0002E\u0005)1.\u00194lC\u000e\u00011c\u0001\u0001&cA\u0011aeL\u0007\u0002O)\u0011\u0001&K\u0001\u0005kRLGN\u0003\u0002!U)\u0011!e\u000b\u0006\u0003Y5\na!\u00199bG\",'\"\u0001\u0018\u0002\u0007=\u0014x-\u0003\u00021O\t)\u0012J\u001c;fe\n\u0013xn[3s'\u0016tG\r\u00165sK\u0006$\u0007C\u0001\u001a6\u001b\u0005\u0019$B\u0001\u001b\"\u0003\u0015)H/\u001b7t\u0013\t14GA\u0004M_\u001e<\u0017N\\4\u0002)%t\u0017\u000e^5bY:+Go^8sW\u000ec\u0017.\u001a8u!\tID(D\u0001;\u0015\tY$&A\u0004dY&,g\u000e^:\n\u0005uR$aC&bM.\f7\t\\5f]R\fa$[:OKR<xN]6DY&,g\u000e\u001e$pej[7i\u001c8ue>dG.\u001a:\u0016\u0003\u0001\u0003\"!\u0011#\u000e\u0003\tS\u0011aQ\u0001\u0006g\u000e\fG.Y\u0005\u0003\u000b\n\u0013qAQ8pY\u0016\fg.\u0001\u0012jg:+Go^8sW\u000ec\u0017.\u001a8u\r>\u0014(l[\"p]R\u0014x\u000e\u001c7fe~#S-\u001d\u000b\u0003\u0011.\u0003\"!Q%\n\u0005)\u0013%\u0001B+oSRDq\u0001T\u0002\u0002\u0002\u0003\u0007\u0001)A\u0002yIE\nq$[:OKR<xN]6DY&,g\u000e\u001e$pej[7i\u001c8ue>dG.\u001a:!\u0003QqW\r^<pe.\u001cE.[3oi\u001a\u000b7\r^8ssB!\u0011\t\u0015*9\u0013\t\t&IA\u0005Gk:\u001cG/[8ocA\u00111\u000bV\u0007\u0002?%\u0011Qk\b\u0002\u0016\u0007>tGO]8mY\u0016\u0014\u0018J\u001c4pe6\fG/[8o\u0003=iW\r^1eCR\fW\u000b\u001d3bi\u0016\u0014\bCA\u001dY\u0013\tI&HA\u000bNC:,\u0018\r\\'fi\u0006$\u0017\r^1Va\u0012\fG/\u001a:\u0002-\r|g\u000e\u001e:pY2,'OT8eKB\u0013xN^5eKJ\u0004\"a\u0015/\n\u0005u{\"AF\"p]R\u0014x\u000e\u001c7fe:{G-\u001a)s_ZLG-\u001a:\u0002\r\r|gNZ5h!\t\u0019\u0006-\u0003\u0002b?\tY1*\u00194lC\u000e{gNZ5h\u0003\u0011!\u0018.\\3\u0011\u0005\u0011DW\"A3\u000b\u0005Q2'BA4+\u0003\u0019\u0019w.\\7p]&\u0011\u0011.\u001a\u0002\u0005)&lW-\u0001\u0006uQJ,\u0017\r\u001a(b[\u0016\u0004\"\u0001\\:\u000f\u00055\f\bC\u00018C\u001b\u0005y'B\u00019$\u0003\u0019a$o\\8u}%\u0011!OQ\u0001\u0007!J,G-\u001a4\n\u0005Q,(AB*ue&twM\u0003\u0002s\u0005\u0006q!/\u001a;ssRKW.Z8vi6\u001b\bCA!y\u0013\tI(I\u0001\u0003M_:<\u0017A\u0002\u001fj]&$h\b\u0006\t}{z|\u0018\u0011AA\u0002\u0003\u000b\t9!!\u0003\u0002\fA\u00111\u000b\u0001\u0005\u0006o1\u0001\r\u0001\u000f\u0005\u0006}1\u0001\r\u0001\u0011\u0005\u0006\u001d2\u0001\ra\u0014\u0005\u0006-2\u0001\ra\u0016\u0005\u000652\u0001\ra\u0017\u0005\u0006=2\u0001\ra\u0018\u0005\u0006E2\u0001\ra\u0019\u0005\u0006U2\u0001\ra\u001b\u0005\u0006m2\u0001\ra^\u0001\u0018[\u0006L(-\u001a*fg\u0016$h*\u001a;x_J\\7\t\\5f]R$2\u0001SA\t\u0011\u0019\t\u0019\"\u0004a\u0001%\u0006)2m\u001c8ue>dG.\u001a:J]\u001a|'/\\1uS>t\u0017\u0001\u0004:fcV,7\u000f^)vKV,WCAA\r!\u0019\tY\"a\n\u0002,5\u0011\u0011Q\u0004\u0006\u0005\u0003?\t\t#\u0001\u0006d_:\u001cWO\u001d:f]RT1\u0001KA\u0012\u0015\t\t)#\u0001\u0003kCZ\f\u0017\u0002BA\u0015\u0003;\u00111\u0003T5oW\u0016$'\t\\8dW&tw\rR3rk\u0016\u00042aUA\u0017\u0013\r\tyc\b\u0002\u001c\u0005J|7.\u001a:U_\u000e{g\u000e\u001e:pY2,'/U;fk\u0016LE/Z7\u0002\u001bI,\u0017/^3tiF+X-^3!\u0003A\t7\r^5wK\u000e{g\u000e\u001e:pY2,'/\u0006\u0002\u00028A1\u0011\u0011HA \u0003\u0007j!!a\u000f\u000b\t\u0005u\u0012QD\u0001\u0007CR|W.[2\n\t\u0005\u0005\u00131\b\u0002\u0010\u0003R|W.[2SK\u001a,'/\u001a8dKB!\u0011QIA$\u001b\u00051\u0017bAA%M\n!aj\u001c3f\u0003E\t7\r^5wK\u000e{g\u000e\u001e:pY2,'\u000fI\u0001\bgR\f'\u000f^3e\u0003-\u0019H/\u0019:uK\u0012|F%Z9\u0015\u0007!\u000b\u0019\u0006C\u0004M'\u0005\u0005\t\u0019\u0001!\u0002\u0011M$\u0018M\u001d;fI\u0002B3\u0001FA-!\r\t\u00151L\u0005\u0004\u0003;\u0012%\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002/\u0005\u001cG/\u001b<f\u0007>tGO]8mY\u0016\u0014\u0018\t\u001a3sKN\u001cHCAA2!\u0015\t\u0015QMA\"\u0013\r\t9G\u0011\u0002\u0007\u001fB$\u0018n\u001c8\u0002/U\u0004H-\u0019;f\u0007>tGO]8mY\u0016\u0014\u0018\t\u001a3sKN\u001cHc\u0001%\u0002n!9\u0011q\u000e\fA\u0002\u0005\r\u0013a\u00058fo\u0006\u001bG/\u001b<f\u0007>tGO]8mY\u0016\u0014\u0018aB3ocV,W/\u001a\u000b\u0004\u0011\u0006U\u0004bBA</\u0001\u0007\u00111F\u0001\be\u0016\fX/Z:u\u0003%\tX/Z;f'&TX-\u0006\u0002\u0002~A\u0019\u0011)a \n\u0007\u0005\u0005%IA\u0002J]R\f\u0001cZ3oKJ\fG/\u001a*fcV,7\u000f^:\u0015\u0005\u0005\u001d\u0005CBAE\u0003\u0017\u000by)\u0004\u0002\u0002\"%!\u0011QRA\u0011\u0005)\u0019u\u000e\u001c7fGRLwN\u001c\t\u0004M\u0005E\u0015bAAJO\tY\"+Z9vKN$\u0018I\u001c3D_6\u0004H.\u001a;j_:D\u0015M\u001c3mKJ\fa\u0002[1oI2,'+Z:q_:\u001cX\r\u0006\u0003\u0002\u001a\u0006\u0015Fc\u0001%\u0002\u001c\"9\u0011Q\u0014\u000eA\u0002\u0005}\u0015\u0001\u0003:fgB|gn]3\u0011\u0007e\n\t+C\u0002\u0002$j\u0012ab\u00117jK:$(+Z:q_:\u001cX\rC\u0004\u0002(j\u0001\r!a\u000b\u0002\u0013E,X-^3Ji\u0016l\u0017A\u00023p/>\u00148\u000eF\u0001I\u0003\u0015\u0019H/\u0019:u\u0003]\u0001(o\u001c;fGR,G\r\n8fi^|'o[\"mS\u0016tG\u000fF\u00029\u0003gCq\u0001T\u000f\u0002\u0002\u0003\u0007A\u0010")
public class BrokerToControllerRequestThread
extends InterBrokerSendThread
implements Logging {
    private boolean isNetworkClientForZkController;
    private final Function1<ControllerInformation, KafkaClient> networkClientFactory;
    private final ManualMetadataUpdater metadataUpdater;
    private final ControllerNodeProvider controllerNodeProvider;
    private final Time time;
    private final long retryTimeoutMs;
    private final LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue;
    private final AtomicReference<Node> activeController;
    private volatile boolean started;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public /* synthetic */ KafkaClient protected$networkClient(BrokerToControllerRequestThread x$1) {
        return x$1.networkClient;
    }

    public boolean isNetworkClientForZkController() {
        return this.isNetworkClientForZkController;
    }

    public void isNetworkClientForZkController_$eq(boolean x$1) {
        this.isNetworkClientForZkController = x$1;
    }

    private void maybeResetNetworkClient(ControllerInformation controllerInformation) {
        if (this.isNetworkClientForZkController() != controllerInformation.isZkController()) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(88).append("Controller changed to ").append((Object)(this.isNetworkClientForZkController() ? "kraft" : "zk")).append(" mode. ").append("Resetting network client with new controller information : ").append(controllerInformation).toString());
            KafkaClient oldClient = this.networkClient;
            oldClient.initiateClose();
            oldClient.close();
            this.isNetworkClientForZkController_$eq(controllerInformation.isZkController());
            this.updateControllerAddress((Node)controllerInformation.node().orNull((.less.colon.less)$less$colon$less$.MODULE$.refl()));
            controllerInformation.node().foreach((Function1 & Serializable)n -> {
                this.metadataUpdater.setNodes(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)n, (List)Nil$.MODULE$)).asJava());
                return BoxedUnit.UNIT;
            });
            this.networkClient = (KafkaClient)this.networkClientFactory.apply((Object)controllerInformation);
            return;
        }
    }

    private LinkedBlockingDeque<BrokerToControllerQueueItem> requestQueue() {
        return this.requestQueue;
    }

    private AtomicReference<Node> activeController() {
        return this.activeController;
    }

    public boolean started() {
        return this.started;
    }

    public void started_$eq(boolean x$1) {
        this.started = x$1;
    }

    public Option<Node> activeControllerAddress() {
        return Option$.MODULE$.apply((Object)this.activeController().get());
    }

    private void updateControllerAddress(Node newActiveController) {
        this.activeController().set(newActiveController);
    }

    public void enqueue(BrokerToControllerQueueItem request) {
        if (!this.started()) {
            throw new IllegalStateException("Cannot enqueue a request if the request thread is not running");
        }
        this.requestQueue().add(request);
        if (this.activeControllerAddress().isDefined()) {
            this.wakeup();
            return;
        }
    }

    public int queueSize() {
        return this.requestQueue().size();
    }

    public Collection<RequestAndCompletionHandler> generateRequests() {
        long currentTimeMs = this.time.milliseconds();
        Iterator<BrokerToControllerQueueItem> requestIter = this.requestQueue().iterator();
        while (requestIter.hasNext()) {
            BrokerToControllerQueueItem request = requestIter.next();
            if (currentTimeMs - request.createdTimeMs() >= this.retryTimeoutMs) {
                requestIter.remove();
                request.callback().onTimeout();
                continue;
            }
            Option<Node> controllerAddress = this.activeControllerAddress();
            if (!controllerAddress.isDefined()) continue;
            requestIter.remove();
            return Collections.singletonList(new RequestAndCompletionHandler(this.time.milliseconds(), (Node)controllerAddress.get(), request.request(), response -> this.handleResponse(request, response)));
        }
        return Collections.emptyList();
    }

    public void handleResponse(BrokerToControllerQueueItem queueItem, ClientResponse response) {
        this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(18).append("Request ").append(queueItem.request()).append(" received ").append(response).toString());
        if (response.authenticationException() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(59).append("Request ").append(queueItem.request()).append(" failed due to authentication error with controller").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> response.authenticationException());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.versionMismatch() != null) {
            this.error((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(48).append("Request ").append(queueItem.request()).append(" failed due to unsupported version error").toString(), (Function0<Throwable>)(Function0 & Serializable)() -> response.versionMismatch());
            queueItem.callback().onComplete(response);
            return;
        }
        if (response.wasDisconnected()) {
            this.updateControllerAddress(null);
            this.requestQueue().putFirst(queueItem);
            return;
        }
        if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(97).append("Request ").append(queueItem.request()).append(" received NOT_CONTROLLER exception. Disconnecting the ").append("connection to the stale controller ").append(this.activeControllerAddress().map((Function1 & Serializable)x$1 -> x$1.idString()).getOrElse((Function0 & Serializable)() -> "null")).toString());
            this.activeControllerAddress().foreach((Function1 & Serializable)controllerAddress -> {
                BrokerToControllerRequestThread.$anonfun$handleResponse$9(this, controllerAddress);
                return BoxedUnit.UNIT;
            });
            this.requestQueue().putFirst(queueItem);
            return;
        }
        queueItem.callback().onComplete(response);
    }

    public void doWork() {
        ControllerInformation controllerInformation = this.controllerNodeProvider.getControllerInfo();
        this.maybeResetNetworkClient(controllerInformation);
        if (this.activeControllerAddress().isDefined()) {
            super.pollOnce(Long.MAX_VALUE);
            return;
        }
        this.debug((Function0<String>)(Function0 & Serializable)() -> "Controller isn't cached, looking for local metadata changes");
        Option<Node> option = controllerInformation.node();
        if (option instanceof Some) {
            Node controllerNode = (Node)((Some)option).value();
            this.info((Function0<String>)(Function0 & Serializable)() -> new StringBuilder(51).append("Recorded new controller, from now on will use node ").append(controllerNode).toString());
            this.updateControllerAddress(controllerNode);
            this.metadataUpdater.setNodes(CollectionConverters$.MODULE$.SeqHasAsJava((Seq)new .colon.colon((Object)controllerNode, (List)Nil$.MODULE$)).asJava());
            return;
        }
        if (None$.MODULE$.equals(option)) {
            this.debug((Function0<String>)(Function0 & Serializable)() -> "No controller provided, retrying after backoff");
            super.pollOnce(100L);
            return;
        }
        throw new MatchError(option);
    }

    public void start() {
        super.start();
        this.started_$eq(true);
    }

    public static final /* synthetic */ void $anonfun$handleResponse$9(BrokerToControllerRequestThread $this, Node controllerAddress) {
        try {
            $this.protected$networkClient($this).disconnect(controllerAddress.idString());
        }
        catch (Throwable t) {
            $this.error((Function0<String>)(Function0 & Serializable)() -> "Had an error while disconnecting from NetworkClient.", (Function0<Throwable>)(Function0 & Serializable)() -> t);
        }
        $this.updateControllerAddress(null);
    }

    public BrokerToControllerRequestThread(KafkaClient initialNetworkClient, boolean isNetworkClientForZkController, Function1<ControllerInformation, KafkaClient> networkClientFactory, ManualMetadataUpdater metadataUpdater, ControllerNodeProvider controllerNodeProvider, KafkaConfig config, Time time, String threadName, long retryTimeoutMs) {
        this.isNetworkClientForZkController = isNetworkClientForZkController;
        this.networkClientFactory = networkClientFactory;
        this.metadataUpdater = metadataUpdater;
        this.controllerNodeProvider = controllerNodeProvider;
        this.time = time;
        this.retryTimeoutMs = retryTimeoutMs;
        super(threadName, initialNetworkClient, (int)Math.min(Integer.MAX_VALUE, Math.min((long)config.controllerSocketTimeoutMs(), retryTimeoutMs)), time, false);
        this.logIdent_$eq(this.logPrefix);
        this.requestQueue = new LinkedBlockingDeque();
        this.activeController = new AtomicReference<Object>(null);
        this.started = false;
    }
}

