/*
 * Decompiled with CFR 0.152.
 */
package kafka.controller;

import java.util.concurrent.BlockingQueue;
import kafka.api.LeaderAndIsrResponse$;
import kafka.api.RequestKeys$;
import kafka.api.RequestOrResponse;
import kafka.api.StopReplicaResponse$;
import kafka.api.UpdateMetadataResponse$;
import kafka.cluster.Broker;
import kafka.controller.ControllerContext;
import kafka.controller.KafkaController$;
import kafka.network.BlockingChannel;
import kafka.network.Receive;
import kafka.utils.ShutdownableThread;
import kafka.utils.ShutdownableThread$;
import kafka.utils.Utils$;
import org.apache.log4j.Logger;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001\u0005\u0015a\u0001B\u0001\u0003\u0001\u001d\u0011\u0011CU3rk\u0016\u001cHoU3oIRC'/Z1e\u0015\t\u0019A!\u0001\u0006d_:$(o\u001c7mKJT\u0011!B\u0001\u0006W\u000647.Y\u0002\u0001'\t\u0001\u0001\u0002\u0005\u0002\n\u00195\t!B\u0003\u0002\f\t\u0005)Q\u000f^5mg&\u0011QB\u0003\u0002\u0013'\",H\u000fZ8x]\u0006\u0014G.\u001a+ie\u0016\fG\r\u0003\u0005\u0010\u0001\t\u0015\r\u0011\"\u0001\u0011\u00031\u0019wN\u001c;s_2dWM]%e+\u0005\t\u0002C\u0001\n\u0016\u001b\u0005\u0019\"\"\u0001\u000b\u0002\u000bM\u001c\u0017\r\\1\n\u0005Y\u0019\"aA%oi\"A\u0001\u0004\u0001B\u0001B\u0003%\u0011#A\u0007d_:$(o\u001c7mKJLE\r\t\u0005\t5\u0001\u0011)\u0019!C\u00017\u0005\t2m\u001c8ue>dG.\u001a:D_:$X\r\u001f;\u0016\u0003q\u0001\"!\b\u0010\u000e\u0003\tI!a\b\u0002\u0003#\r{g\u000e\u001e:pY2,'oQ8oi\u0016DH\u000f\u0003\u0005\"\u0001\t\u0005\t\u0015!\u0003\u001d\u0003I\u0019wN\u001c;s_2dWM]\"p]R,\u0007\u0010\u001e\u0011\t\u0011\r\u0002!Q1A\u0005\u0002\u0011\n\u0001\u0002^8Ce>\\WM]\u000b\u0002KA\u0011a%K\u0007\u0002O)\u0011\u0001\u0006B\u0001\bG2,8\u000f^3s\u0013\tQsE\u0001\u0004Ce>\\WM\u001d\u0005\tY\u0001\u0011\t\u0011)A\u0005K\u0005IAo\u001c\"s_.,'\u000f\t\u0005\t]\u0001\u0011)\u0019!C\u0001_\u0005)\u0011/^3vKV\t\u0001\u0007E\u00022qij\u0011A\r\u0006\u0003gQ\n!bY8oGV\u0014(/\u001a8u\u0015\t)d'\u0001\u0003vi&d'\"A\u001c\u0002\t)\fg/Y\u0005\u0003sI\u0012QB\u00117pG.LgnZ)vKV,\u0007\u0003\u0002\n<{\rK!\u0001P\n\u0003\rQ+\b\u000f\\33!\tq\u0014)D\u0001@\u0015\t\u0001E!A\u0002ba&L!AQ \u0003#I+\u0017/^3ti>\u0013(+Z:q_:\u001cX\r\u0005\u0003\u0013\tv2\u0015BA#\u0014\u0005%1UO\\2uS>t\u0017\u0007\u0005\u0002\u0013\u000f&\u0011\u0001j\u0005\u0002\u0005+:LG\u000f\u0003\u0005K\u0001\t\u0005\t\u0015!\u00031\u0003\u0019\tX/Z;fA!AA\n\u0001BC\u0002\u0013\u0005Q*A\u0004dQ\u0006tg.\u001a7\u0016\u00039\u0003\"a\u0014*\u000e\u0003AS!!\u0015\u0003\u0002\u000f9,Go^8sW&\u00111\u000b\u0015\u0002\u0010\u00052|7m[5oO\u000eC\u0017M\u001c8fY\"AQ\u000b\u0001B\u0001B\u0003%a*\u0001\u0005dQ\u0006tg.\u001a7!\u0011\u00159\u0006\u0001\"\u0001Y\u0003\u0019a\u0014N\\5u}Q1\u0011LW.];z\u0003\"!\b\u0001\t\u000b=1\u0006\u0019A\t\t\u000bi1\u0006\u0019\u0001\u000f\t\u000b\r2\u0006\u0019A\u0013\t\u000b92\u0006\u0019\u0001\u0019\t\u000b13\u0006\u0019\u0001(\t\u000f\u0001\u0004!\u0019!C\u0005C\u0006!An\\2l+\u0005\u0011\u0007CA2g\u001b\u0005!'BA37\u0003\u0011a\u0017M\\4\n\u0005\u001d$'AB(cU\u0016\u001cG\u000f\u0003\u0004j\u0001\u0001\u0006IAY\u0001\u0006Y>\u001c7\u000e\t\u0005\bW\u0002\u0011\r\u0011\"\u0003m\u0003E\u0019H/\u0019;f\u0007\"\fgnZ3M_\u001e<WM]\u000b\u0002[B\u0011a.^\u0007\u0002_*\u0011\u0001/]\u0001\u0006Y><GG\u001b\u0006\u0003eN\fa!\u00199bG\",'\"\u0001;\u0002\u0007=\u0014x-\u0003\u0002w_\n1Aj\\4hKJDa\u0001\u001f\u0001!\u0002\u0013i\u0017AE:uCR,7\t[1oO\u0016dunZ4fe\u0002BQA\u001f\u0001\u0005Bm\fa\u0001Z8X_J\\G#\u0001$\t\u000bu\u0004A\u0011\u0002@\u0002\u001f\r|gN\\3diR{'I]8lKJ$BAR@\u0002\u0004!1\u0011\u0011\u0001?A\u0002\u0015\naA\u0019:pW\u0016\u0014\b\"\u0002'}\u0001\u0004q\u0005")
public class RequestSendThread
extends ShutdownableThread {
    private final int controllerId;
    private final ControllerContext controllerContext;
    private final Broker toBroker;
    private final BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue;
    private final BlockingChannel channel;
    private final Object lock;
    private final Logger stateChangeLogger;

    public int controllerId() {
        return this.controllerId;
    }

    public ControllerContext controllerContext() {
        return this.controllerContext;
    }

    public Broker toBroker() {
        return this.toBroker;
    }

    public BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue() {
        return this.queue;
    }

    public BlockingChannel channel() {
        return this.channel;
    }

    private Object lock() {
        return this.lock;
    }

    private Logger stateChangeLogger() {
        return this.stateChangeLogger;
    }

    @Override
    public void doWork() {
        block7: {
            Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>> queueItem = this.queue().take();
            RequestOrResponse request = (RequestOrResponse)queueItem._1();
            Function1 callback = (Function1)queueItem._2();
            Receive receive = null;
            try {
                Object object = this.lock();
                synchronized (object) {
                    short s;
                    block11: {
                        RequestOrResponse response;
                        block9: {
                            block10: {
                                block8: {
                                    BooleanRef isSendSuccessful = new BooleanRef(false);
                                    while (this.isRunning().get() && !isSendSuccessful.elem) {
                                        this.liftedTree1$1(request, isSendSuccessful);
                                    }
                                    receive = this.channel().receive();
                                    response = null;
                                    s = BoxesRunTime.unboxToShort((Object)request.requestId().get());
                                    if (RequestKeys$.MODULE$.LeaderAndIsrKey() != s) break block8;
                                    response = LeaderAndIsrResponse$.MODULE$.readFrom(receive.buffer());
                                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                    break block9;
                                }
                                if (RequestKeys$.MODULE$.StopReplicaKey() != s) break block10;
                                response = StopReplicaResponse$.MODULE$.readFrom(receive.buffer());
                                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                                break block9;
                            }
                            if (RequestKeys$.MODULE$.UpdateMetadataKey() != s) break block11;
                            response = UpdateMetadataResponse$.MODULE$.readFrom(receive.buffer());
                            BoxedUnit boxedUnit = BoxedUnit.UNIT;
                        }
                        this.stateChangeLogger().trace((Object)new StringOps(Predef$.MODULE$.augmentString("Controller %d epoch %d received response correlationId %d for a request sent to broker %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.controllerId()), BoxesRunTime.boxToInteger((int)this.controllerContext().epoch()), BoxesRunTime.boxToInteger((int)response.correlationId()), this.toBroker().toString()})));
                        Object object2 = callback == null ? BoxedUnit.UNIT : callback.apply((Object)response);
                        break block7;
                    }
                    throw new MatchError((Object)BoxesRunTime.boxToShort((short)s));
                }
            }
            catch (Throwable throwable) {
                this.warn((Function0<String>)new Serializable(this){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ RequestSendThread $outer;

                    public final String apply() {
                        return new StringOps(Predef$.MODULE$.augmentString("Controller %d fails to send a request to broker %s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.$outer.toBroker().toString()}));
                    }
                    {
                        if ($outer == null) {
                            throw new NullPointerException();
                        }
                        this.$outer = $outer;
                    }
                }, (Function0<Throwable>)new Serializable(this, throwable){
                    public static final long serialVersionUID = 0L;
                    private final Throwable e$3;

                    public final Throwable apply() {
                        return this.e$3;
                    }
                    {
                        this.e$3 = e$3;
                    }
                });
                this.channel().disconnect();
            }
        }
    }

    private void connectToBroker(Broker broker, BlockingChannel channel) {
        try {
            channel.connect();
            this.info((Function0<String>)new Serializable(this, broker){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final Broker broker$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d connected to %s for sending state change requests")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$2.toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.broker$2 = broker$2;
                }
            });
        }
        catch (Throwable throwable) {
            channel.disconnect();
            this.error((Function0<String>)new Serializable(this, broker){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final Broker broker$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d's connection to broker %s was unsuccessful")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), this.broker$2.toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.broker$2 = broker$2;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$4;

                public final Throwable apply() {
                    return this.e$4;
                }
                {
                    this.e$4 = e$4;
                }
            });
        }
    }

    private final void liftedTree1$1(RequestOrResponse request$2, BooleanRef isSendSuccessful$1) {
        try {
            this.channel().send(request$2);
            isSendSuccessful$1.elem = true;
        }
        catch (Throwable throwable) {
            this.error((Function0<String>)new Serializable(this, request$2){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RequestSendThread $outer;
                private final RequestOrResponse request$2;

                public final String apply() {
                    return new StringOps(Predef$.MODULE$.augmentString("Controller %d epoch %d failed to send %s request with correlation id %s to broker %s. Reconnecting to broker.")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)this.$outer.controllerId()), BoxesRunTime.boxToInteger((int)this.$outer.controllerContext().epoch()), RequestKeys$.MODULE$.nameForKey(BoxesRunTime.unboxToShort((Object)this.request$2.requestId().get())), BoxesRunTime.boxToInteger((int)this.request$2.correlationId()), this.$outer.toBroker().toString()}));
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.request$2 = request$2;
                }
            }, (Function0<Throwable>)new Serializable(this, throwable){
                public static final long serialVersionUID = 0L;
                private final Throwable e$2;

                public final Throwable apply() {
                    return this.e$2;
                }
                {
                    this.e$2 = e$2;
                }
            });
            this.channel().disconnect();
            this.connectToBroker(this.toBroker(), this.channel());
            isSendSuccessful$1.elem = false;
            Utils$.MODULE$.swallow((Function0<BoxedUnit>)new Serializable(this){
                public static final long serialVersionUID = 0L;

                public final void apply() {
                    this.apply$mcV$sp();
                }

                public void apply$mcV$sp() {
                    Thread.sleep(300L);
                }
            });
        }
    }

    public RequestSendThread(int controllerId, ControllerContext controllerContext, Broker toBroker, BlockingQueue<Tuple2<RequestOrResponse, Function1<RequestOrResponse, BoxedUnit>>> queue, BlockingChannel channel) {
        this.controllerId = controllerId;
        this.controllerContext = controllerContext;
        this.toBroker = toBroker;
        this.queue = queue;
        this.channel = channel;
        super(new StringOps(Predef$.MODULE$.augmentString("Controller-%d-to-broker-%d-send-thread")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)controllerId), BoxesRunTime.boxToInteger((int)toBroker.id())})), ShutdownableThread$.MODULE$.$lessinit$greater$default$2());
        this.lock = new Object();
        this.stateChangeLogger = Logger.getLogger((String)KafkaController$.MODULE$.stateChangeLogger());
        this.connectToBroker(toBroker, channel);
    }
}

