/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.io.DataInputStream;
import java.io.File;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import kafka.cluster.Broker;
import kafka.controller.ControllerChannelManager;
import kafka.controller.ControllerChannelManager$;
import kafka.controller.ControllerContext;
import kafka.controller.StateChangeLogger;
import kafka.log.LogManager$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.server.KafkaServer$;
import kafka.server.NotRunning$;
import kafka.utils.CoreUtils$;
import kafka.utils.TestUtils$;
import kafka.zk.ZooKeeperTestHarness;
import kafka.zookeeper.ZooKeeperClientTimeoutException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractControlRequest;
import org.apache.kafka.common.requests.LeaderAndIsrRequest;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Array$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.Iterable$;
import scala.collection.JavaConverters$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.Set;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.generic.CanBuildFrom;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.ObjectRef;

@ScalaSignature(bytes="\u0006\u0001\u0005Eb\u0001B\f\u0019\u0001uAQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001A\u0002\u0013\u0005\u0011\u0006C\u0004.\u0001\u0001\u0007I\u0011\u0001\u0018\t\r]\u0002\u0001\u0015)\u0003+\u0011\u001dA\u0004A1A\u0005\u0002eBaA\u0011\u0001!\u0002\u0013Q\u0004bB\"\u0001\u0005\u0004%\t!\u000f\u0005\u0007\t\u0002\u0001\u000b\u0011\u0002\u001e\t\u000f\u0015\u0003!\u0019!C\u0001\r\"1q\n\u0001Q\u0001\n\u001dCq\u0001\u0015\u0001C\u0002\u0013\u0005a\t\u0003\u0004R\u0001\u0001\u0006Ia\u0012\u0005\u0006%\u0002!\te\u0015\u0005\u0006;\u0002!\ta\u0015\u0005\u0006E\u0002!\ta\u0015\u0005\u0006I\u0002!\ta\u0015\u0005\u0006M\u0002!\ta\u0015\u0005\u0006Q\u0002!I!\u001b\u0005\t\u0003'\u0001\u0001\u0015\"\u0003\u0002\u0016!1\u0011q\u0005\u0001\u0005\u0002MCa!!\u000b\u0001\t\u0003\u0019\u0006BBA\u0017\u0001\u0011\u00051K\u0001\nTKJ4XM]*ikR$wn\u001e8UKN$(BA\r\u001b\u0003\u0019\u0019XM\u001d<fe*\t1$A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001q\u0002CA\u0010#\u001b\u0005\u0001#BA\u0011\u001b\u0003\tQ8.\u0003\u0002$A\t!\"l\\8LK\u0016\u0004XM\u001d+fgRD\u0015M\u001d8fgN\fa\u0001P5oSRtD#\u0001\u0014\u0011\u0005\u001d\u0002Q\"\u0001\r\u0002\r\r|gNZ5h+\u0005Q\u0003CA\u0014,\u0013\ta\u0003DA\u0006LC\u001a\\\u0017mQ8oM&<\u0017AC2p]\u001aLwm\u0018\u0013fcR\u0011q&\u000e\t\u0003aMj\u0011!\r\u0006\u0002e\u0005)1oY1mC&\u0011A'\r\u0002\u0005+:LG\u000fC\u00047\u0007\u0005\u0005\t\u0019\u0001\u0016\u0002\u0007a$\u0013'A\u0004d_:4\u0017n\u001a\u0011\u0002\t!|7\u000f^\u000b\u0002uA\u00111\bQ\u0007\u0002y)\u0011QHP\u0001\u0005Y\u0006twMC\u0001@\u0003\u0011Q\u0017M^1\n\u0005\u0005c$AB*ue&tw-A\u0003i_N$\b%A\u0003u_BL7-\u0001\u0004u_BL7\rI\u0001\u0006g\u0016tG/M\u000b\u0002\u000fB\u0019\u0001*\u0014\u001e\u000e\u0003%S!AS&\u0002\u0013%lW.\u001e;bE2,'B\u0001'2\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0003\u001d&\u0013A\u0001T5ti\u000611/\u001a8uc\u0001\nQa]3oiJ\naa]3oiJ\u0002\u0013!B:fiV\u0003H#A\u0018)\u00055)\u0006C\u0001,\\\u001b\u00059&B\u0001-Z\u0003\u0015QW
O\\5u\u0015\u0005Q\u0016aA8sO&\u0011Al\u0016\u0002\u0007\u0005\u00164wN]3\u0002#Q,7\u000f^\"mK\u0006t7\u000b[;uI><h\u000e\u000b\u0002\u000f?B\u0011a\u000bY\u0005\u0003C^\u0013A\u0001V3ti\u00069C/Z:u\u00072,\u0017M\\*ikR$wn\u001e8XSRDG)\u001a7fi\u0016$v\u000e]5d\u000b:\f'\r\\3eQ\tyq,A\u0012uKN$8\t\\3b]NCW\u000f\u001e3po:\fe\r^3s\r\u0006LG.\u001a3Ti\u0006\u0014H/\u001e9)\u0005Ay\u0016a\r;fgR\u001cE.Z1o'\",H\u000fZ8x]\u00063G/\u001a:GC&dW\rZ*uCJ$X\u000f\u001d#vKR{7i\u001c:skB$Hj\\4tQ\t\tr,A\u0013wKJLg-_\"mK\u0006t7\u000b[;uI><h.\u00114uKJ4\u0015-\u001b7fIN#\u0018M\u001d;vaV\u0011!N\u001e\u000b\u0004W\u0006EACA\u0018m\u0011\u0015i'\u0003q\u0001o\u0003E)\u0007pY3qi&|gn\u00117bgN$\u0016m\u001a\t\u0004_J$X\"\u00019\u000b\u0005E\f\u0014a\u0002:fM2,7\r^\u0005\u0003gB\u0014\u0001b\u00117bgN$\u0016m\u001a\t\u0003kZd\u0001\u0001B\u0003x%\t\u0007\u0001PA\u0001F#\tIH\u0010\u0005\u00021u&\u001110\r\u0002\b\u001d>$\b.\u001b8h!\ri\u00181\u0002\b\u0004}\u0006\u001dabA@\u0002\u00065\u0011\u0011\u0011\u0001\u0006\u0004\u0003\u0007a\u0012A\u0002\u001fs_>$h(C\u00013\u0013\r\tI!M\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\ti!a\u0004\u0003\u0013\u0015C8-\u001a9uS>t'bAA\u0005c!)\u0001F\u0005a\u0001U\u00051\u0012n\u001d(p]\u0012\u000bW-\\8o\u0017\u000647.\u0019+ie\u0016\fG\r\u0006\u0003\u0002\u0018\u0005u\u0001c\u0001\u0019\u0002\u001a%\u0019\u00111D\u0019\u0003\u000f\t{w\u000e\\3b]\"9\u0011qD\nA\u0002\u0005\u0005\u0012!\u0001;\u0011\u0007m\n\u0019#C\u0002\u0002&q\u0012a\u0001\u00165sK\u0006$\u0017\u0001\b<fe&4\u0017PT8o\t\u0006,Wn\u001c8UQJ,\u0017\rZ:Ti\u0006$Xo]\u0001\u0018i\u0016\u001cHoQ8og\u0016\u001cW\u000f^5wKNCW\u000f\u001e3po:D#!F0\u0002AQ,7\u000f^\"p]R\u0014x\u000e\u001c7feNCW\u000f\u001e3po:$UO]5oON+g\u000e\u001a\u0015\u0003-}\u0003")
public class ServerShutdownTest
extends ZooKeeperTestHarness {
    private KafkaConfig config = null;
    private final String host;
    private final String topic;
    private final List<String> sent1 = new .colon.colon((Object)"hello", (List)new .colon.colon((Object)"there", (List)Nil$.MODULE$));
    private final List<String> sent2 = new .colon.colon((Object)"more", (List)new .colon.colon((Object)"messages", (List)Nil$.MODULE$));

    /**
     * Returns the broker configuration currently held by this test instance.
     *
     * @return the stored configuration; {@code null} until it has been assigned
     */
    public KafkaConfig config() {
        return config;
    }

    /**
     * Scala-style setter that replaces the stored broker configuration.
     *
     * @param x$1 the configuration to store
     */
    public void config_$eq(KafkaConfig x$1) {
        config = x$1;
    }

    /**
     * Returns the host name constant set by the constructor.
     *
     * @return the host name
     */
    public String host() {
        return host;
    }

    /**
     * Returns the topic name constant set by the constructor.
     *
     * @return the topic name
     */
    public String topic() {
        return topic;
    }

    /**
     * Returns the first batch of record values.
     *
     * @return the immutable Scala list held in the {@code sent1} field
     */
    public List<String> sent1() {
        return sent1;
    }

    /**
     * Returns the second batch of record values.
     *
     * @return the immutable Scala list held in the {@code sent2} field
     */
    public List<String> sent2() {
        return sent2;
    }

    /**
     * Runs the harness set-up, then builds the configuration for broker 0 from the
     * test utilities' default broker properties and stores it via {@code config_$eq}.
     */
    @Override
    @Before
    public void setUp() {
        super.setUp();
        TestUtils$ utils = TestUtils$.MODULE$;
        // Every argument after the ZooKeeper connect string is the Scala default for that position.
        Properties brokerProps = utils.createBrokerConfig(
            0,
            this.zkConnect(),
            utils.createBrokerConfig$default$3(),
            utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(),
            utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(),
            utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(),
            utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(),
            utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(),
            utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(),
            utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(),
            utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(),
            utils.createBrokerConfig$default$20());
        KafkaConfig parsed = KafkaConfig$.MODULE$.fromProps(brokerProps);
        this.config_$eq(parsed);
    }

    /*
     * WARNING - void declaration
     */
    @Test
    public void testCleanShutdown() {
        Object object;
        void map_bf;
        Object object2;
        void map_bf2;
        KafkaConfig x$1 = this.config();
        Option x$22 = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time x$3 = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq x$42 = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer server = new KafkaServer(x$1, x$3, x$22, x$42);
        server.startup();
        ObjectRef producer = ObjectRef.create((Object)ServerShutdownTest.createProducer$1(server));
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), 1, 1, (Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), TestUtils$.MODULE$.createTopic$default$6());
        List<String> list = this.sent1();
        CanBuildFrom canBuildFrom = List$.MODULE$.canBuildFrom();
        Function1 & Serializable & scala.Serializable intersect = (Function1 & Serializable & scala.Serializable)value -> ((KafkaProducer)producer$1.elem).send(new ProducerRecord(this.topic(), (Object)Predef$.MODULE$.int2Integer(0), value));
        if (list == null) {
            throw null;
        }
        List<String> map_this = list;
        if (map_bf2 == List$.MODULE$.ReusableCBF()) {
            if (map_this == Nil$.MODULE$) {
                object2 = Nil$.MODULE$;
            } else {
                .colon.colon map_h;
                String string = (String)map_this.head();
                .colon.colon map_t = map_h = new .colon.colon((Object)ServerShutdownTest.$anonfun$testCleanShutdown$1(this, producer, string), (List)Nil$.MODULE$);
                for (List map_rest = (List)map_this.tail(); map_rest != Nil$.MODULE$; map_rest = (List)map_rest.tail()) {
                    string = (String)map_rest.head();
                    .colon.colon map_nx = new .colon.colon((Object)ServerShutdownTest.$anonfun$testCleanShutdown$1(this, producer, string), (List)Nil$.MODULE$);
                    map_t.tl_$eq((List)map_nx);
                    map_t = map_nx;
                }
                object2 = map_h;
            }
        } else {
            void map_f;
            object2 = TraversableLike.map$(map_this, (Function1)map_f, (CanBuildFrom)map_bf2);
        }
        List list2 = (List)object2;
        if (list2 == null) {
            throw null;
        }
        List foreach_these = list2;
        while (!foreach_these.isEmpty()) {
            ServerShutdownTest.$anonfun$testCleanShutdown$2((Future)foreach_these.head());
            foreach_these = (List)foreach_these.tail();
        }
        server.shutdown();
        this.config().logDirs().foreach((Function1 & Serializable & scala.Serializable)logDir -> {
            ServerShutdownTest.$anonfun$testCleanShutdown$3(logDir);
            return BoxedUnit.UNIT;
        });
        ((KafkaProducer)producer.elem).close();
        server = new KafkaServer(this.config(), KafkaServer$.MODULE$.$lessinit$greater$default$2(), KafkaServer$.MODULE$.$lessinit$greater$default$3(), KafkaServer$.MODULE$.$lessinit$greater$default$4());
        server.startup();
        TestUtils$.MODULE$.waitUntilMetadataIsPropagated((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), this.topic(), 0, TestUtils$.MODULE$.waitUntilMetadataIsPropagated$default$4());
        producer.elem = ServerShutdownTest.createProducer$1(server);
        KafkaConsumer consumer = ServerShutdownTest.createConsumer$1(server);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{this.topic()}))).asJava());
        Seq consumerRecords = TestUtils$.MODULE$.consumeRecords(consumer, this.sent1().size(), TestUtils$.MODULE$.consumeRecords$default$3());
        Assert.assertEquals(this.sent1(), (Object)consumerRecords.map((Function1 & Serializable & scala.Serializable)x$2 -> (String)x$2.value(), Seq$.MODULE$.canBuildFrom()));
        List<String> list3 = this.sent2();
        CanBuildFrom canBuildFrom2 = List$.MODULE$.canBuildFrom();
        Function1 & Serializable & scala.Serializable intersect2 = (Function1 & Serializable & scala.Serializable)value -> ((KafkaProducer)producer$1.elem).send(new ProducerRecord(this.topic(), (Object)Predef$.MODULE$.int2Integer(0), value));
        if (list3 == null) {
            throw null;
        }
        List<String> map_this2 = list3;
        if (map_bf == List$.MODULE$.ReusableCBF()) {
            if (map_this2 == Nil$.MODULE$) {
                object = Nil$.MODULE$;
            } else {
                .colon.colon map_h;
                String string = (String)map_this2.head();
                .colon.colon map_t = map_h = new .colon.colon((Object)ServerShutdownTest.$anonfun$testCleanShutdown$5(this, producer, string), (List)Nil$.MODULE$);
                for (List map_rest = (List)map_this2.tail(); map_rest != Nil$.MODULE$; map_rest = (List)map_rest.tail()) {
                    string = (String)map_rest.head();
                    .colon.colon map_nx = new .colon.colon((Object)ServerShutdownTest.$anonfun$testCleanShutdown$5(this, producer, string), (List)Nil$.MODULE$);
                    map_t.tl_$eq((List)map_nx);
                    map_t = map_nx;
                }
                object = map_h;
            }
        } else {
            void map_f;
            object = TraversableLike.map$(map_this2, (Function1)map_f, (CanBuildFrom)map_bf);
        }
        List list4 = (List)object;
        if (list4 == null) {
            throw null;
        }
        List foreach_these2 = list4;
        while (!foreach_these2.isEmpty()) {
            ServerShutdownTest.$anonfun$testCleanShutdown$6((Future)foreach_these2.head());
            foreach_these2 = (List)foreach_these2.tail();
        }
        Seq consumerRecords2 = TestUtils$.MODULE$.consumeRecords(consumer, this.sent2().size(), TestUtils$.MODULE$.consumeRecords$default$3());
        Assert.assertEquals(this.sent2(), (Object)consumerRecords2.map((Function1 & Serializable & scala.Serializable)x$4 -> (String)x$4.value(), Seq$.MODULE$.canBuildFrom()));
        consumer.close();
        ((KafkaProducer)producer.elem).close();
        server.shutdown();
        CoreUtils$.MODULE$.delete(server.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Starts and cleanly stops a broker whose configuration sets
     * {@code delete.topic.enable} to {@code true}, then deletes its log directories and
     * checks that no non-daemon thread named after this class is still alive.
     */
    @Test
    public void testCleanShutdownWithDeleteTopicEnabled() {
        TestUtils$ utils = TestUtils$.MODULE$;
        Properties brokerProps = utils.createBrokerConfig(
            0,
            this.zkConnect(),
            utils.createBrokerConfig$default$3(),
            utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(),
            utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(),
            utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(),
            utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(),
            utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(),
            utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(),
            utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(),
            utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(),
            utils.createBrokerConfig$default$20());
        brokerProps.setProperty("delete.topic.enable", "true");
        KafkaConfig deleteEnabledConfig = KafkaConfig$.MODULE$.fromProps(brokerProps);

        // The class name is passed as the third constructor argument; the other two are Scala defaults.
        Option ownClassName = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time defaultTime = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq defaultFourthArg = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer broker = new KafkaServer(deleteEnabledConfig, defaultTime, ownClassName, defaultFourthArg);

        broker.startup();
        broker.shutdown();
        broker.awaitShutdown();

        CoreUtils$.MODULE$.delete(broker.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Builds a broker configuration with a 50 ms ZooKeeper connection timeout and an
     * unresolvable ZooKeeper address, then delegates to
     * {@code verifyCleanShutdownAfterFailedStartup}, expecting a
     * {@link ZooKeeperClientTimeoutException} from startup.
     */
    @Test
    public void testCleanShutdownAfterFailedStartup() {
        TestUtils$ utils = TestUtils$.MODULE$;
        Properties brokerProps = utils.createBrokerConfig(
            0,
            this.zkConnect(),
            utils.createBrokerConfig$default$3(),
            utils.createBrokerConfig$default$4(),
            utils.createBrokerConfig$default$5(),
            utils.createBrokerConfig$default$6(),
            utils.createBrokerConfig$default$7(),
            utils.createBrokerConfig$default$8(),
            utils.createBrokerConfig$default$9(),
            utils.createBrokerConfig$default$10(),
            utils.createBrokerConfig$default$11(),
            utils.createBrokerConfig$default$12(),
            utils.createBrokerConfig$default$13(),
            utils.createBrokerConfig$default$14(),
            utils.createBrokerConfig$default$15(),
            utils.createBrokerConfig$default$16(),
            utils.createBrokerConfig$default$17(),
            utils.createBrokerConfig$default$18(),
            utils.createBrokerConfig$default$19(),
            utils.createBrokerConfig$default$20());
        KafkaConfig$ configKeys = KafkaConfig$.MODULE$;
        brokerProps.setProperty(configKeys.ZkConnectionTimeoutMsProp(), "50");
        brokerProps.setProperty(configKeys.ZkConnectProp(), "some.invalid.hostname.foo.bar.local:65535");
        KafkaConfig unreachableZkConfig = configKeys.fromProps(brokerProps);
        ClassTag expectedFailure = ClassTag$.MODULE$.apply(ZooKeeperClientTimeoutException.class);
        this.verifyCleanShutdownAfterFailedStartup(unreachableZkConfig, expectedFailure);
    }

    /**
     * Starts a broker, creates the test topic, stops the broker, appends random bytes to
     * every file of the topic's partition-0 directory in each log directory, and then
     * delegates to {@code verifyCleanShutdownAfterFailedStartup}, expecting a
     * {@link KafkaStorageException} from startup.
     */
    @Test
    public void testCleanShutdownAfterFailedStartupDueToCorruptLogs() {
        KafkaServer$ serverDefaults = KafkaServer$.MODULE$;
        KafkaServer broker = new KafkaServer(this.config(), serverDefaults.$lessinit$greater$default$2(), serverDefaults.$lessinit$greater$default$3(), serverDefaults.$lessinit$greater$default$4());
        broker.startup();
        TestUtils$.MODULE$.createTopic(this.zkClient(), this.topic(), 1, 1, (Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{broker}))), TestUtils$.MODULE$.createTopic$default$6());
        broker.shutdown();
        broker.awaitShutdown();

        // Corrupt the on-disk partition files while the broker is down.
        this.config().logDirs().foreach((Function1 & Serializable & scala.Serializable)logDir -> {
            ServerShutdownTest.$anonfun$testCleanShutdownAfterFailedStartupDueToCorruptLogs$1(this, logDir);
            return BoxedUnit.UNIT;
        });

        ClassTag expectedFailure = ClassTag$.MODULE$.apply(KafkaStorageException.class);
        this.verifyCleanShutdownAfterFailedStartup(this.config(), expectedFailure);
    }

    /**
     * Starts a broker that is expected to fail during startup and checks that it ends up
     * in the not-running state without leaving non-daemon threads behind.
     *
     * <p>The test fails if startup succeeds, or if the thrown exception is not an instance
     * of the class carried by {@code exceptionClassTag}. Whatever happens, the broker is
     * shut down if it is not already in the not-running state, and then awaited.
     *
     * @param config broker configuration expected to make startup fail
     * @param exceptionClassTag class tag of the exception type startup must throw
     * @param <E> the expected exception type
     */
    private <E extends Exception> void verifyCleanShutdownAfterFailedStartup(KafkaConfig config, ClassTag<E> exceptionClassTag) {
        Option ownClassName = Option$.MODULE$.apply((Object)this.getClass().getName());
        Time defaultTime = KafkaServer$.MODULE$.$lessinit$greater$default$2();
        Seq defaultFourthArg = KafkaServer$.MODULE$.$lessinit$greater$default$4();
        KafkaServer broker = new KafkaServer(config, defaultTime, ownClassName, defaultFourthArg);
        try {
            broker.startup();
            // Assert.fail throws an Error, so the Exception handler below does not swallow it.
            Assert.fail((String)"Expected KafkaServer setup to fail and throw exception");
        }
        catch (Exception startupFailure) {
            String wrongTypeMessage = "Unexpected exception " + startupFailure;
            Assert.assertTrue((String)wrongTypeMessage, (boolean)exceptionClassTag.runtimeClass().isInstance(startupFailure));
            Assert.assertEquals((long)NotRunning$.MODULE$.state(), (long)broker.brokerState().currentState());
        }
        finally {
            boolean notYetStopped = broker.brokerState().currentState() != NotRunning$.MODULE$.state();
            if (notYetStopped) {
                broker.shutdown();
            }
            broker.awaitShutdown();
        }
        CoreUtils$.MODULE$.delete(broker.config().logDirs());
        this.verifyNonDaemonThreadsStatus();
    }

    /**
     * Tells whether a thread is a live, non-daemon thread whose name starts with this
     * test class's fully qualified name.
     *
     * @param t the thread to inspect
     * @return {@code true} only if all three conditions hold
     */
    private boolean isNonDaemonKafkaThread(Thread t) {
        if (t.isDaemon() || !t.isAlive()) {
            return false;
        }
        String ownClassName = this.getClass().getName();
        return t.getName().startsWith(ownClassName);
    }

    public void verifyNonDaemonThreadsStatus() {
        Function1 & Serializable & scala.Serializable count_p = (Function1 & Serializable & scala.Serializable)t -> BoxesRunTime.boxToBoolean((boolean)this.isNonDaemonKafkaThread(t));
        ArrayOps.ofRef count_this = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[])TraversableLike.map$((TraversableLike)new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(Thread.getAllStackTraces().keySet().toArray())), (Function1 & Serializable & scala.Serializable)x$5 -> (Thread)x$5, (CanBuildFrom)Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(Thread.class)))));
        IntRef count_cnt = IntRef.create((int)0);
        count_this.foreach(arg_0 -> TraversableOnce.$anonfun$count$1$adapted((Function1)count_p, (IntRef)count_cnt, arg_0));
        Assert.assertEquals((long)0L, (long)count_cnt.elem);
    }

    /**
     * Starts a broker, shuts it down and awaits the shutdown, then calls
     * {@code shutdown()} a second time; the test passes if no call throws.
     */
    @Test
    public void testConsecutiveShutdown() {
        KafkaServer$ serverDefaults = KafkaServer$.MODULE$;
        KafkaServer broker = new KafkaServer(this.config(), serverDefaults.$lessinit$greater$default$2(), serverDefaults.$lessinit$greater$default$3(), serverDefaults.$lessinit$greater$default$4());
        broker.startup();
        broker.shutdown();
        broker.awaitShutdown();
        // Second shutdown on an already stopped broker.
        broker.shutdown();
    }

    /**
     * Checks that a {@link ControllerChannelManager} can be shut down within ten seconds
     * after a request has been sent to a peer that never answers.
     *
     * <p>A local {@link ServerSocket} stands in for broker 1. A background task accepts one
     * connection and reads a single byte from it. The test registers that socket's port as
     * broker 1, starts the channel manager and sends a LeaderAndIsr request to broker 1.
     * It waits up to ten seconds for the byte to arrive, then runs the manager's shutdown
     * on the executor and waits up to ten seconds for it to return. The socket, manager,
     * executor and metrics are always released in the {@code finally} block.
     */
    @Test
    public void testControllerShutdownDuringSend() {
        SecurityProtocol securityProtocol = SecurityProtocol.PLAINTEXT;
        ListenerName listenerName = ListenerName.forSecurityProtocol((SecurityProtocol)securityProtocol);
        int controllerId = 2;
        Metrics metrics = new Metrics();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Holders rather than plain locals: they are assigned inside the try block, read in
        // the finally block, and captured by the lambdas below.
        ObjectRef serverSocket = ObjectRef.create(null);
        ObjectRef controllerChannelManager = ObjectRef.create(null);
        try {
            serverSocket.elem = new ServerSocket(0);
            // A value-returning lambda is submitted as a Callable, so an I/O failure in
            // accept/read is reported through receiveFuture.get(...).
            // NOTE(review): the accepted socket is never closed here; presumably intentional
            // so the request stays unanswered while shutdown runs - confirm.
            Future<?> receiveFuture = executor.submit(() -> {
                Socket socket = ((ServerSocket)serverSocket.elem).accept();
                new DataInputStream(socket.getInputStream()).readByte();
                return null;
            });
            scala.collection.immutable.Map brokerAndEpochs = (scala.collection.immutable.Map)Predef$.MODULE$.Map().apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple2[]{new Tuple2((Object)new Broker(1, "localhost", ((ServerSocket)serverSocket.elem).getLocalPort(), listenerName, securityProtocol), (Object)BoxesRunTime.boxToLong((long)0L))}));
            KafkaConfig controllerConfig = KafkaConfig$.MODULE$.fromProps(TestUtils$.MODULE$.createBrokerConfig(controllerId, this.zkConnect(), TestUtils$.MODULE$.createBrokerConfig$default$3(), TestUtils$.MODULE$.createBrokerConfig$default$4(), TestUtils$.MODULE$.createBrokerConfig$default$5(), TestUtils$.MODULE$.createBrokerConfig$default$6(), TestUtils$.MODULE$.createBrokerConfig$default$7(), TestUtils$.MODULE$.createBrokerConfig$default$8(), TestUtils$.MODULE$.createBrokerConfig$default$9(), TestUtils$.MODULE$.createBrokerConfig$default$10(), TestUtils$.MODULE$.createBrokerConfig$default$11(), TestUtils$.MODULE$.createBrokerConfig$default$12(), TestUtils$.MODULE$.createBrokerConfig$default$13(), TestUtils$.MODULE$.createBrokerConfig$default$14(), TestUtils$.MODULE$.createBrokerConfig$default$15(), TestUtils$.MODULE$.createBrokerConfig$default$16(), TestUtils$.MODULE$.createBrokerConfig$default$17(), TestUtils$.MODULE$.createBrokerConfig$default$18(), TestUtils$.MODULE$.createBrokerConfig$default$19(), TestUtils$.MODULE$.createBrokerConfig$default$20()));
            ControllerContext controllerContext = new ControllerContext();
            controllerContext.setLiveBrokerAndEpochs((Map)brokerAndEpochs);
            controllerChannelManager.elem = new ControllerChannelManager(controllerContext, controllerConfig, Time.SYSTEM, metrics, new StateChangeLogger(controllerId, true, (Option)None$.MODULE$), ControllerChannelManager$.MODULE$.$lessinit$greater$default$6());
            ((ControllerChannelManager)controllerChannelManager.elem).startup();
            LeaderAndIsrRequest.Builder requestBuilder = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(), controllerId, 1, 0L, (java.util.List)JavaConverters$.MODULE$.seqAsJavaListConverter((Seq)Seq$.MODULE$.empty()).asJava(), (Collection)JavaConverters$.MODULE$.setAsJavaSetConverter((Set)((TraversableOnce)brokerAndEpochs.keys().map((Function1 & Serializable & scala.Serializable)x$6 -> x$6.node(listenerName), Iterable$.MODULE$.canBuildFrom())).toSet()).asJava());
            ControllerChannelManager qual$1 = (ControllerChannelManager)controllerChannelManager.elem;
            int x$1 = 1;
            Function1 x$3 = qual$1.sendRequest$default$3();
            qual$1.sendRequest(x$1, (AbstractControlRequest.Builder)requestBuilder, x$3);
            // Wait until the fake broker has read the first byte of the request.
            receiveFuture.get(10L, TimeUnit.SECONDS);
            // Shut down on the executor so a hang surfaces as a timeout on get(...).
            executor.submit(() -> {
                ((ControllerChannelManager)controllerChannelManager.elem).shutdown();
            }).get(10L, TimeUnit.SECONDS);
        }
        finally {
            if ((ServerSocket)serverSocket.elem != null) {
                ((ServerSocket)serverSocket.elem).close();
            }
            if ((ControllerChannelManager)controllerChannelManager.elem != null) {
                ((ControllerChannelManager)controllerChannelManager.elem).shutdown();
            }
            executor.shutdownNow();
            metrics.close();
        }
    }

    /**
     * Creates a producer connected to the given broker, using an {@link IntegerSerializer}
     * and a {@link StringSerializer} and the test utilities' defaults for every other
     * argument.
     *
     * @param server the running broker to connect to
     * @return the producer returned by the test utilities
     */
    private static final KafkaProducer createProducer$1(KafkaServer server) {
        TestUtils$ utils = TestUtils$.MODULE$;
        String brokerList = utils.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), utils.getBrokerListStrFromServers$default$2());
        IntegerSerializer integerSerializer = new IntegerSerializer();
        StringSerializer stringSerializer = new StringSerializer();
        // Remaining arguments are the Scala defaults, fetched in positional order.
        int default2 = utils.createProducer$default$2();
        long default3 = utils.createProducer$default$3();
        long default4 = utils.createProducer$default$4();
        int default5 = utils.createProducer$default$5();
        int default6 = utils.createProducer$default$6();
        int default7 = utils.createProducer$default$7();
        int default8 = utils.createProducer$default$8();
        String default9 = utils.createProducer$default$9();
        int default10 = utils.createProducer$default$10();
        SecurityProtocol default11 = utils.createProducer$default$11();
        Option<File> default12 = utils.createProducer$default$12();
        Option<Properties> default13 = utils.createProducer$default$13();
        boolean default16 = utils.createProducer$default$16();
        return utils.createProducer(
            brokerList,
            default2,
            default3,
            default4,
            default5,
            default6,
            default7,
            default8,
            default9,
            default10,
            default11,
            default12,
            default13,
            integerSerializer,
            stringSerializer,
            default16);
    }

    /**
     * Creates a consumer connected to the given broker over PLAINTEXT, using an
     * {@link IntegerDeserializer} and a {@link StringDeserializer} and the test utilities'
     * defaults for every other argument.
     *
     * @param server the running broker to connect to
     * @return the consumer returned by the test utilities
     */
    private static final KafkaConsumer createConsumer$1(KafkaServer server) {
        TestUtils$ utils = TestUtils$.MODULE$;
        String brokerList = utils.getBrokerListStrFromServers((Seq<KafkaServer>)((Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new KafkaServer[]{server}))), utils.getBrokerListStrFromServers$default$2());
        SecurityProtocol plaintext = SecurityProtocol.PLAINTEXT;
        IntegerDeserializer integerDeserializer = new IntegerDeserializer();
        StringDeserializer stringDeserializer = new StringDeserializer();
        // Remaining arguments are the Scala defaults, fetched in positional order.
        String default2 = utils.createConsumer$default$2();
        String default3 = utils.createConsumer$default$3();
        boolean default4 = utils.createConsumer$default$4();
        boolean default5 = utils.createConsumer$default$5();
        int default6 = utils.createConsumer$default$6();
        Option<File> default8 = utils.createConsumer$default$8();
        Option<Properties> default9 = utils.createConsumer$default$9();
        return utils.createConsumer(
            brokerList,
            default2,
            default3,
            default4,
            default5,
            default6,
            plaintext,
            default8,
            default9,
            integerDeserializer,
            stringDeserializer);
    }

    /**
     * Blocks until the given future completes and returns its result as record metadata.
     *
     * @param x$1 the future to wait on
     * @return the future's result, cast to {@link RecordMetadata}
     */
    public static final /* synthetic */ RecordMetadata $anonfun$testCleanShutdown$2(Future x$1) {
        Object result = x$1.get();
        return (RecordMetadata)result;
    }

    /**
     * Asserts that the recovery-point checkpoint file exists in the given log directory
     * and is not empty.
     *
     * @param logDir path of the log directory to check
     */
    public static final /* synthetic */ void $anonfun$testCleanShutdown$3(String logDir) {
        String checkpointName = LogManager$.MODULE$.RecoveryPointCheckpointFile();
        File checkpoint = new File(logDir, checkpointName);
        Assert.assertTrue((boolean)checkpoint.exists());
        boolean hasContent = checkpoint.length() > 0L;
        Assert.assertTrue(hasContent);
    }

    /**
     * Blocks until the given future completes and returns its result as record metadata.
     *
     * @param x$3 the future to wait on
     * @return the future's result, cast to {@link RecordMetadata}
     */
    public static final /* synthetic */ RecordMetadata $anonfun$testCleanShutdown$6(Future x$3) {
        Object result = x$3.get();
        return (RecordMetadata)result;
    }

    /**
     * Appends between 1 and 1024 bytes of random data to every file in the test topic's
     * partition-0 directory ({@code <topic>-0}) under the given log directory.
     *
     * <p>Fails with an assertion error naming the directory if it cannot be listed,
     * instead of the bare {@code NullPointerException} a null listing would otherwise cause.
     *
     * @param $this the test instance supplying the topic name
     * @param dirName path of the log directory that contains the partition directory
     */
    public static final /* synthetic */ void $anonfun$testCleanShutdownAfterFailedStartupDueToCorruptLogs$1(ServerShutdownTest $this, String dirName) {
        File partitionDir = new File(dirName, $this.topic() + "-0");
        // File.listFiles() returns null when the path is not a directory or cannot be read.
        File[] partitionFiles = partitionDir.listFiles();
        Assert.assertNotNull((String)("Cannot list partition directory " + partitionDir), (Object)partitionFiles);
        for (File partitionFile : partitionFiles) {
            TestUtils$.MODULE$.appendNonsenseToFile(partitionFile, TestUtils$.MODULE$.random().nextInt(1024) + 1);
        }
    }

    /**
     * Creates the test instance and fixes the topic name to {@code "test"} and the host
     * name to {@code "localhost"}.
     */
    public ServerShutdownTest() {
        this.topic = "test";
        this.host = "localhost";
    }

    /**
     * Appends between 1 and 1024 bytes of random data to the given file and returns the
     * Scala unit value, so the method can serve as an {@code Object}-returning function body.
     *
     * @param f the file to append to
     * @return {@link BoxedUnit#UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testCleanShutdownAfterFailedStartupDueToCorruptLogs$2$adapted(File f) {
        TestUtils$ utils = TestUtils$.MODULE$;
        int byteCount = utils.random().nextInt(1024) + 1;
        utils.appendNonsenseToFile(f, byteCount);
        return BoxedUnit.UNIT;
    }
}

