/*
 * Decompiled with CFR 0.152.
 */
package kafka.common;

import com.typesafe.scalalogging.Logger;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import kafka.common.InterBrokerSendThread$;
import kafka.common.RequestAndCompletionHandler;
import kafka.common.UnsentRequests;
import kafka.utils.Logging;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.ShutdownableThread;
import scala.Function0;
import scala.Function1;
import scala.collection.Iterable;
import scala.collection.IterableLike;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.LongRef;

@ScalaSignature(bytes="\u0006\u0001\u0005}d!\u0002\u000e\u001c\u0003\u0003\u0001\u0003\u0002\u0003\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u001b\t\u0011\t\u0003!\u00111A\u0005\u0002\rC\u0001B\u0013\u0001\u0003\u0002\u0004%\ta\u0013\u0005\t%\u0002\u0011\t\u0011)Q\u0005\t\"Aq\u000b\u0001B\u0001B\u0003%\u0001\f\u0003\u0005\\\u0001\t\u0005\t\u0015!\u0003]\u0011!\u0011\u0007A!A!\u0002\u0013\u0019\u0007\"\u00024\u0001\t\u00039\u0007bB8\u0001\u0005\u0004%I\u0001\u001d\u0005\u0007i\u0002\u0001\u000b\u0011B9\t\u000bU\u0004a\u0011\u0001<\t\u000f\u0005\u001d\u0001\u0001\"\u0001\u0002\n!9\u00111\u0002\u0001\u0005B\u00055\u0001bBA\b\u0001\u0011%\u0011Q\u0002\u0005\b\u0003#\u0001A\u0011CA\n\u0011\u001d\ty\u0002\u0001C!\u0003\u001bAq!!\t\u0001\t\u0013\t\u0019\u0003C\u0004\u0002,\u0001!I!!\f\t\u000f\u0005E\u0002\u0001\"\u0003\u00024!9\u0011q\u0007\u0001\u0005\u0002\u0005e\u0002bBA,\u0001\u0011\u0005\u0011QB\u0004\n\u00033Z\u0012\u0011!E\u0001\u000372\u0001BG\u000e\u0002\u0002#\u0005\u0011Q\f\u0005\u0007M^!\t!!\u001a\t\u0013\u0005\u001dt#%A\u0005\u0002\u0005%$!F%oi\u0016\u0014(I]8lKJ\u001cVM\u001c3UQJ,\u0017\r\u001a\u0006\u00039u\taaY8n[>t'\"\u0001\u0010\u0002\u000b-\fgm[1\u0004\u0001M\u0019\u0001!\t\u0018\u0011\u0005\tbS\"A\u0012\u000b\u0005\u0011*\u0013\u0001B;uS2T!AJ\u0014\u0002\rM,'O^3s\u0015\tq\u0002F\u0003\u0002*U\u00051\u0011\r]1dQ\u0016T\u0011aK\u0001\u0004_J<\u0017BA\u0017$\u0005I\u0019\u0006.\u001e;e_^t\u0017M\u00197f)\"\u0014X-\u00193\u0011\u0005=\u0012T\"\u0001\u0019\u000b\u0005Ej\u0012!B;uS2\u001c\u0018BA\u001a1\u0005\u001daunZ4j]\u001e\fAA\\1nKB\u0011ag\u0010\b\u0003ou\u0002\"\u0001O\u001e\u000e\u0003eR!AO\u0010\u0002\rq\u0012xn\u001c;?\u0015\u0005a\u0014!B:dC2\f\u0017B\u0001 <\u0003\u0019\u0001&/\u001a3fM&\u0011\u0001)\u0011\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005yZ\u0014!\u00048fi^|'o[\"mS\u0016tG/F\u0001E!\t)\u0005*D\u0001G\u0015\t9u%A\u0004dY&,g\u000e^:\n\u0005%3%aC&bM.\f7\t\\5f]R\f\u0011C\\3uo>\u00148n\u00117jK:$x\fJ3r)\ta\u0005\u000b\u0005\u0002N\u001d6\t1(\u0003\u0002Pw\t!QK\\5u\u0011\u001d\t6!!AA\u0002\u0011\u000b1\u0001\u001f\u00132\u00039qW\r^<pe.\u001cE.[3oi\u0002B#\u0001\u0002+\u0011\u00055+\u0016B\u0001,<\u0005!1x\u000e\\1uS2,\u0017\u0001\u0005:fcV,7\u000f\u001e+j[\u0016|W\u000f^'t!\ti\u0015,\u0003\u0002[w\t\u0019\u0011J\u001c;\u0002\tQLW.\u001a\t\u0003;\u0002l\u0011A\u0018\u0006\u0003c}S!\u0001H\u0014\n\u0005\u0005t&\u0001\u0002+j[\u0016\fq\"[:J]R,'O];qi&\u0014G.\u001a\t\u0003\u001b\u0012L!!Z\u001e\u0003\u000f\t{w\u000e\\3b]\u00061A(\u001b8jiz\"b\u0001\u001b6lY6t\u0007CA5\u0001\u001b\u0005Y\u0002\"\u0002\u001b\t\u0001\u0004)\u0004\"\u0002\"\t\u0001\u0004!\u0005\"B,\t\u0001\u0004A\u0006\"B.\t\u0001\u0004a\u0006b\u00022\t!\u0003\u0005\raY\u0001\u000fk:\u001cXM\u001c;SKF,Xm\u001d;t+\u0005\t\bCA5s\u0013\t\u00198D\u0001\bV]N,g\u000e\u001e*fcV,7\u000f^:\u0002\u001fUt7/\u001a8u%\u0016\fX/Z:ug\u0002\n\u0001cZ3oKJ\fG/\u001a*fcV,7\u000f^:\u0015\u0003]\u0004B\u0001_?\u0002\u00029\u0011\u0011p\u001f\b\u0003qiL\u0011\u0001P\u0005\u0003yn\nq\u0001]1dW\u0006<W-\u0003\u0002\u007f\u007f\nA\u0011\n^3sC\ndWM\u0003\u0002}wA\u0019\u0011.a\u0001\n\u0007\u0005\u00151DA\u000eSKF,Xm\u001d;B]\u0012\u001cu.\u001c9mKRLwN\u001c%b]\u0012dWM]\u0001\u0012Q\u0006\u001cXK\\:f]R\u0014V-];fgR\u001cX#A2\u0002\u0011MDW\u000f\u001e3po:$\u0012\u0001T\u0001\u0017IJ\f\u0017N\\$f]\u0016\u0014\u0018\r^3e%\u0016\fX/Z:ug\u0006A\u0001o\u001c7m\u001f:\u001cW\rF\u0002M\u0003+Aq!a\u0006\u0010\u0001\u0004\tI\"\u0001\u0007nCb$\u0016.\\3pkRl5\u000fE\u0002N\u00037I1!!\b<\u0005\u0011auN\\4\u0002\r\u0011|wk\u001c:l\u00031\u0019XM\u001c3SKF,Xm\u001d;t)\u0019\tI\"!\n\u0002*!9\u0011qE\tA\u0002\u0005e\u0011a\u00018po\"9\u0011qC\tA\u0002\u0005e\u0011\u0001E2iK\u000e\\G)[:d_:tWm\u0019;t)\ra\u0015q\u0006\u0005\b\u0003O\u0011\u0002\u0019AA\r\u0003M1\u0017-\u001b7FqBL'/\u001a3SKF,Xm\u001d;t)\ra\u0015Q\u0007\u0005\b\u0003O\u0019\u0002\u0019AA\r\u0003Y\u0019w.\u001c9mKR,w+\u001b;i\t&\u001c8m\u001c8oK\u000e$Hc\u0002'\u0002<\u0005\u0015\u0013q\t\u0005\b\u0003{!\u0002\u0019AA \u0003\u001d\u0011X-];fgR\u00042!RA!\u0013\r\t\u0019E\u0012\u0002\u000e\u00072LWM\u001c;SKF,Xm\u001d;\t\u000f\u0005\u001dB\u00031\u0001\u0002\u001a!9\u0011\u0011\n\u000bA\u0002\u0005-\u0013aF1vi\",g\u000e^5dCRLwN\\#yG\u0016\u0004H/[8o!\u0011\ti%a\u0015\u000e\u0005\u0005=#bAA)?\u00061QM\u001d:peNLA!!\u0016\u0002P\t9\u0012)\u001e;iK:$\u0018nY1uS>tW\t_2faRLwN\\\u0001\u0007o\u0006\\W-\u001e9\u0002+%sG/\u001a:Ce>\\WM]*f]\u0012$\u0006N]3bIB\u0011\u0011nF\n\u0004/\u0005}\u0003cA'\u0002b%\u0019\u00111M\u001e\u0003\r\u0005s\u0017PU3g)\t\tY&A\u000e%Y\u0016\u001c8/\u001b8ji\u0012:'/Z1uKJ$C-\u001a4bk2$H%N\u000b\u0003\u0003WR3aYA7W\t\ty\u0007\u0005\u0003\u0002r\u0005mTBAA:\u0015\u0011\t)(a\u001e\u0002\u0013Ut7\r[3dW\u0016$'bAA=w\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u0005u\u00141\u000f\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
public abstract class InterBrokerSendThread
extends ShutdownableThread
implements Logging {
    private volatile KafkaClient networkClient;
    private final int requestTimeoutMs;
    private final Time time;
    private final UnsentRequests unsentRequests;
    private Logger logger;
    private String logIdent;
    private volatile boolean bitmap$0;

    public static boolean $lessinit$greater$default$5() {
        return InterBrokerSendThread$.MODULE$.$lessinit$greater$default$5();
    }

    @Override
    public String loggerName() {
        return Logging.loggerName$(this);
    }

    @Override
    public String msgWithLogIdent(String msg) {
        return Logging.msgWithLogIdent$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg) {
        Logging.trace$(this, msg);
    }

    @Override
    public void trace(Function0<String> msg, Function0<Throwable> e) {
        Logging.trace$(this, msg, e);
    }

    @Override
    public boolean isDebugEnabled() {
        return Logging.isDebugEnabled$(this);
    }

    @Override
    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    @Override
    public void debug(Function0<String> msg) {
        Logging.debug$(this, msg);
    }

    @Override
    public void debug(Function0<String> msg, Function0<Throwable> e) {
        Logging.debug$(this, msg, e);
    }

    @Override
    public void info(Function0<String> msg) {
        Logging.info$(this, msg);
    }

    @Override
    public void info(Function0<String> msg, Function0<Throwable> e) {
        Logging.info$(this, msg, e);
    }

    @Override
    public void warn(Function0<String> msg) {
        Logging.warn$(this, msg);
    }

    @Override
    public void warn(Function0<String> msg, Function0<Throwable> e) {
        Logging.warn$(this, msg, e);
    }

    @Override
    public void error(Function0<String> msg) {
        Logging.error$(this, msg);
    }

    @Override
    public void error(Function0<String> msg, Function0<Throwable> e) {
        Logging.error$(this, msg, e);
    }

    @Override
    public void fatal(Function0<String> msg) {
        Logging.fatal$(this, msg);
    }

    @Override
    public void fatal(Function0<String> msg, Function0<Throwable> e) {
        Logging.fatal$(this, msg, e);
    }

    private Logger logger$lzycompute() {
        synchronized (this) {
            if (!this.bitmap$0) {
                this.logger = Logging.logger$(this);
                this.bitmap$0 = true;
            }
        }
        return this.logger;
    }

    @Override
    public Logger logger() {
        if (!this.bitmap$0) {
            return this.logger$lzycompute();
        }
        return this.logger;
    }

    @Override
    public String logIdent() {
        return this.logIdent;
    }

    @Override
    public void logIdent_$eq(String x$1) {
        this.logIdent = x$1;
    }

    public KafkaClient networkClient() {
        return this.networkClient;
    }

    public void networkClient_$eq(KafkaClient x$1) {
        this.networkClient = x$1;
    }

    private UnsentRequests unsentRequests() {
        return this.unsentRequests;
    }

    public abstract Iterable<RequestAndCompletionHandler> generateRequests();

    public boolean hasUnsentRequests() {
        return this.unsentRequests().iterator().hasNext();
    }

    public void shutdown() {
        this.initiateShutdown();
        this.networkClient().initiateClose();
        this.awaitShutdown();
        this.networkClient().close();
    }

    private void drainGeneratedRequests() {
        this.generateRequests().foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$drainGeneratedRequests$1(this, request);
            return BoxedUnit.UNIT;
        });
    }

    public void pollOnce(long maxTimeoutMs) {
        try {
            this.drainGeneratedRequests();
            long now = this.time.milliseconds();
            long timeout = this.sendRequests(now, maxTimeoutMs);
            this.networkClient().poll(timeout, now);
            now = this.time.milliseconds();
            this.checkDisconnects(now);
            this.failExpiredRequests(now);
            this.unsentRequests().clean();
            return;
        }
        catch (Throwable throwable) {
            if (throwable instanceof DisconnectException && !this.networkClient().active()) {
                return;
            }
            if (throwable instanceof FatalExitError) {
                throw (FatalExitError)throwable;
            }
            if (throwable != null) {
                this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "unhandled exception caught in InterBrokerSendThread", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> throwable);
                throw new FatalExitError();
            }
            throw null;
        }
    }

    public void doWork() {
        this.pollOnce(Long.MAX_VALUE);
    }

    private long sendRequests(long now, long maxTimeoutMs) {
        LongRef pollTimeout = LongRef.create((long)maxTimeoutMs);
        ((IterableLike)CollectionConverters$.MODULE$.asScalaSetConverter(this.unsentRequests().nodes()).asScala()).foreach((Function1 & Serializable & scala.Serializable)node -> {
            InterBrokerSendThread.$anonfun$sendRequests$1(this, now, pollTimeout, node);
            return BoxedUnit.UNIT;
        });
        return pollTimeout.elem;
    }

    /*
     * WARNING - void declaration
     */
    private void checkDisconnects(long now) {
        Iterator<Map.Entry<Node, ArrayDeque<ClientRequest>>> iterator = this.unsentRequests().iterator();
        while (iterator.hasNext()) {
            void var6_5;
            void var5_4;
            Map.Entry<Node, ArrayDeque<ClientRequest>> entry = iterator.next();
            ArrayDeque<ClientRequest> arrayDeque = entry.getValue();
            Node node = entry.getKey();
            ArrayDeque<ClientRequest> requests = arrayDeque;
            void node2 = var5_4;
            void requests2 = var6_5;
            if (requests2.isEmpty() || !this.networkClient().connectionFailed((Node)node2)) continue;
            iterator.remove();
            ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)requests2).asScala()).foreach(arg_0 -> InterBrokerSendThread.$anonfun$checkDisconnects$1$adapted(this, (Node)node2, now, arg_0));
        }
    }

    private void failExpiredRequests(long now) {
        Collection<ClientRequest> timedOutRequests = this.unsentRequests().removeAllTimedOut(now);
        ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(timedOutRequests).asScala()).foreach((Function1 & Serializable & scala.Serializable)request -> {
            InterBrokerSendThread.$anonfun$failExpiredRequests$1(this, now, request);
            return BoxedUnit.UNIT;
        });
    }

    public void completeWithDisconnect(ClientRequest request, long now, AuthenticationException authenticationException) {
        RequestCompletionHandler handler = request.callback();
        handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().latestAllowedVersion()), handler, request.destination(), now, now, true, null, authenticationException, null));
    }

    public void wakeup() {
        this.networkClient().wakeup();
    }

    public static final /* synthetic */ void $anonfun$drainGeneratedRequests$1(InterBrokerSendThread $this, RequestAndCompletionHandler request) {
        $this.unsentRequests().put(request.destination(), $this.networkClient().newClientRequest(request.destination().idString(), request.request(), request.creationTimeMs(), true, $this.requestTimeoutMs, request.handler()));
    }

    public static final /* synthetic */ void $anonfun$sendRequests$1(InterBrokerSendThread $this, long now$1, LongRef pollTimeout$1, Node node) {
        Iterator<ClientRequest> requestIterator = $this.unsentRequests().requestIterator(node);
        while (requestIterator.hasNext()) {
            ClientRequest request = requestIterator.next();
            if ($this.networkClient().ready(node, now$1)) {
                $this.networkClient().send(request, now$1);
                requestIterator.remove();
                continue;
            }
            pollTimeout$1.elem = Math.min(pollTimeout$1.elem, $this.networkClient().connectionDelay(node, now$1));
        }
    }

    public static final /* synthetic */ void $anonfun$checkDisconnects$1(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        AuthenticationException authenticationException = $this.networkClient().authenticationException(node$1);
        if (authenticationException != null) {
            $this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(66).append("Failed to send the following request due to authentication error: ").append(request).toString());
        }
        $this.completeWithDisconnect(request, now$2, authenticationException);
    }

    public static final /* synthetic */ void $anonfun$failExpiredRequests$1(InterBrokerSendThread $this, long now$3, ClientRequest request) {
        $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(48).append("Failed to send the following request after ").append(request.requestTimeoutMs()).append(" ms: ").append(request).toString());
        $this.completeWithDisconnect(request, now$3, null);
    }

    public InterBrokerSendThread(String name, KafkaClient networkClient, int requestTimeoutMs, Time time, boolean isInterruptible) {
        this.networkClient = networkClient;
        this.requestTimeoutMs = requestTimeoutMs;
        this.time = time;
        super(name, isInterruptible);
        Logging.$init$(this);
        this.logIdent_$eq(this.logPrefix);
        this.unsentRequests = new UnsentRequests();
    }

    public static final /* synthetic */ Object $anonfun$checkDisconnects$1$adapted(InterBrokerSendThread $this, Node node$1, long now$2, ClientRequest request) {
        InterBrokerSendThread.$anonfun$checkDisconnects$1($this, node$1, now$2, request);
        return BoxedUnit.UNIT;
    }
}

