/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.schemaregistry.RestApp;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.utils.ZkUtils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import scala.Option;
import scala.Option$;
import scala.collection.JavaConversions;
import scala.collection.Seq;

@Category(value={IntegrationTest.class})
public abstract class ClusterTestHarness {
    public static final int DEFAULT_NUM_BROKERS = 1;
    public static final String KAFKASTORE_TOPIC = "_schemas";
    protected static final Option<Properties> EMPTY_SASL_PROPERTIES = Option$.MODULE$.empty();
    private int numBrokers;
    private boolean setupRestApp;
    protected String compatibilityType;
    protected EmbeddedZookeeper zookeeper;
    protected String zkConnect;
    protected ZkUtils zkUtils;
    protected int zkConnectionTimeout = 30000;
    protected int zkSessionTimeout = 6000;
    protected List<KafkaConfig> configs = null;
    protected List<KafkaServer> servers = null;
    protected String brokerList = null;
    protected String bootstrapServers = null;
    protected Integer schemaRegistryPort;
    protected RestApp restApp = null;

    public static int[] choosePorts(int count) {
        try {
            int i;
            ServerSocket[] sockets = new ServerSocket[count];
            int[] ports = new int[count];
            for (i = 0; i < count; ++i) {
                sockets[i] = new ServerSocket(0, 0, InetAddress.getByName("0.0.0.0"));
                ports[i] = sockets[i].getLocalPort();
            }
            for (i = 0; i < count; ++i) {
                sockets[i].close();
            }
            return ports;
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static int choosePort() {
        return ClusterTestHarness.choosePorts(1)[0];
    }

    public ClusterTestHarness() {
        this(1);
    }

    public ClusterTestHarness(int numBrokers) {
        this(numBrokers, false);
    }

    public ClusterTestHarness(int numBrokers, boolean setupRestApp) {
        this(numBrokers, setupRestApp, AvroCompatibilityLevel.NONE.name);
    }

    public ClusterTestHarness(int numBrokers, boolean setupRestApp, String compatibilityType) {
        this.numBrokers = numBrokers;
        this.setupRestApp = setupRestApp;
        this.compatibilityType = compatibilityType;
    }

    protected boolean setZkAcls() {
        return this.getSecurityProtocol() == SecurityProtocol.SASL_PLAINTEXT || this.getSecurityProtocol() == SecurityProtocol.SASL_SSL;
    }

    @Before
    public void setUp() throws Exception {
        this.zookeeper = new EmbeddedZookeeper();
        this.zkConnect = String.format("localhost:%d", this.zookeeper.port());
        this.zkUtils = new ZkUtils(this.zkConnect, this.zkSessionTimeout, this.zkConnectionTimeout, this.setZkAcls());
        this.configs = new Vector<KafkaConfig>();
        this.servers = new Vector<KafkaServer>();
        for (int i = 0; i < this.numBrokers; ++i) {
            KafkaConfig config = this.getKafkaConfig(i);
            this.configs.add(config);
            KafkaServer server = TestUtils.createServer((KafkaConfig)config, (Time)Time.SYSTEM);
            this.servers.add(server);
        }
        this.brokerList = TestUtils.getBrokerListStrFromServers((Seq)JavaConversions.asScalaBuffer(this.servers), (SecurityProtocol)this.getSecurityProtocol());
        Object[] serverUrls = new String[this.servers.size()];
        ListenerName listenerType = ListenerName.forSecurityProtocol((SecurityProtocol)this.getSecurityProtocol());
        for (int i = 0; i < this.servers.size(); ++i) {
            serverUrls[i] = this.getSecurityProtocol() + "://" + Utils.formatAddress((String)((EndPoint)this.servers.get(i).config().advertisedListeners().head()).host(), (Integer)this.servers.get(i).boundPort(listenerType));
        }
        this.bootstrapServers = Utils.join((Object[])serverUrls, (String)",");
        if (this.setupRestApp) {
            if (this.schemaRegistryPort == null) {
                this.schemaRegistryPort = ClusterTestHarness.choosePort();
            }
            Properties schemaRegistryProps = this.getSchemaRegistryProperties();
            schemaRegistryProps.put("listeners", this.getSchemaRegistryProtocol() + "://0.0.0.0:" + this.schemaRegistryPort);
            schemaRegistryProps.put("mode.mutability", (Object)true);
            this.setupRestApp(schemaRegistryProps);
        }
    }

    protected void setupRestApp(Properties schemaRegistryProps) throws Exception {
        this.restApp = new RestApp(this.schemaRegistryPort, this.zkConnect, null, KAFKASTORE_TOPIC, this.compatibilityType, true, schemaRegistryProps);
        this.restApp.start();
    }

    protected Properties getSchemaRegistryProperties() {
        return new Properties();
    }

    protected void injectProperties(Properties props) {
        props.setProperty("auto.create.topics.enable", "true");
        props.setProperty("num.partitions", "1");
    }

    protected KafkaConfig getKafkaConfig(int brokerId) {
        Option noFile = Option.apply(null);
        Option noInterBrokerSecurityProtocol = Option.apply(null);
        Properties props = TestUtils.createBrokerConfig((int)brokerId, (String)this.zkConnect, (boolean)false, (boolean)false, (int)TestUtils.RandomPort(), (Option)noInterBrokerSecurityProtocol, (Option)noFile, EMPTY_SASL_PROPERTIES, (boolean)true, (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (Option)Option.empty(), (int)1, (boolean)false, (int)1, (short)1);
        this.injectProperties(props);
        return KafkaConfig.fromProps((Properties)props);
    }

    protected SecurityProtocol getSecurityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    protected String getSchemaRegistryProtocol() {
        return "http";
    }

    @After
    public void tearDown() throws Exception {
        if (this.restApp != null) {
            this.restApp.stop();
        }
        if (this.servers != null) {
            for (KafkaServer server : this.servers) {
                server.shutdown();
            }
            for (KafkaServer server : this.servers) {
                CoreUtils.delete((Seq)server.config().logDirs());
            }
        }
        if (this.zkUtils != null) {
            this.zkUtils.close();
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }
}

