/*
 * Decompiled with CFR 0.152.
 */
package kafka;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.TestLinearWriteSpeed;
import kafka.log.LogConfig;
import kafka.log.LogConfig$;
import kafka.message.CompressionCodec;
import kafka.message.CompressionCodec$;
import kafka.message.NoCompressionCodec$;
import kafka.utils.CommandLineUtils$;
import kafka.utils.Exit$;
import kafka.utils.KafkaScheduler;
import kafka.utils.KafkaScheduler$;
import kafka.utils.Scheduler;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.IndexedSeq;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.StringOps;
import scala.math.package$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

public final class TestLinearWriteSpeed$ {
    public static TestLinearWriteSpeed$ MODULE$;

    static {
        new TestLinearWriteSpeed$();
    }

    public void main(String[] args) {
        OptionParser parser = new OptionParser(false);
        ArgumentAcceptingOptionSpec dirOpt = parser.accepts("dir", "The directory to write to.").withRequiredArg().describedAs("path").ofType(String.class).defaultsTo((Object)System.getProperty("java.io.tmpdir"), (Object[])new String[0]);
        ArgumentAcceptingOptionSpec bytesOpt = parser.accepts("bytes", "REQUIRED: The total number of bytes to write.").withRequiredArg().describedAs("num_bytes").ofType(Long.class);
        ArgumentAcceptingOptionSpec sizeOpt = parser.accepts("size", "REQUIRED: The size of each write.").withRequiredArg().describedAs("num_bytes").ofType(Integer.class);
        ArgumentAcceptingOptionSpec messageSizeOpt = parser.accepts("message-size", "REQUIRED: The size of each message in the message set.").withRequiredArg().describedAs("num_bytes").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1024), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec filesOpt = parser.accepts("files", "REQUIRED: The number of logs or files.").withRequiredArg().describedAs("num_files").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(1), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec reportingIntervalOpt = parser.accepts("reporting-interval", "The number of ms between updates.").withRequiredArg().describedAs("ms").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(1000L), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec maxThroughputOpt = parser.accepts("max-throughput-mb", "The maximum throughput.").withRequiredArg().describedAs("mb").ofType(Integer.class).defaultsTo((Object)Predef$.MODULE$.int2Integer(Integer.MAX_VALUE), (Object[])new Integer[0]);
        ArgumentAcceptingOptionSpec flushIntervalOpt = parser.accepts("flush-interval", "The number of messages between flushes").withRequiredArg().describedAs("message_count").ofType(Long.class).defaultsTo((Object)Predef$.MODULE$.long2Long(Long.MAX_VALUE), (Object[])new Long[0]);
        ArgumentAcceptingOptionSpec compressionCodecOpt = parser.accepts("compression", "The compression codec to use").withRequiredArg().describedAs("codec").ofType(String.class).defaultsTo((Object)NoCompressionCodec$.MODULE$.name(), (Object[])new String[0]);
        OptionSpecBuilder mmapOpt = parser.accepts("mmap", "Do writes to memory-mapped files.");
        OptionSpecBuilder channelOpt = parser.accepts("channel", "Do writes to file channels.");
        OptionSpecBuilder logOpt = parser.accepts("log", "Do writes to kafka logs.");
        OptionSet options = parser.parse(args);
        CommandLineUtils$.MODULE$.checkRequiredArgs(parser, options, (Seq)Predef$.MODULE$.wrapRefArray((Object[])new OptionSpec[]{bytesOpt, sizeOpt, filesOpt}));
        LongRef bytesToWrite = LongRef.create((long)((Long)options.valueOf((OptionSpec)bytesOpt)));
        int bufferSize = (Integer)options.valueOf((OptionSpec)sizeOpt);
        int numFiles = (Integer)options.valueOf((OptionSpec)filesOpt);
        long reportingInterval = (Long)options.valueOf((OptionSpec)reportingIntervalOpt);
        String dir = (String)options.valueOf((OptionSpec)dirOpt);
        long maxThroughputBytes = (long)((Integer)options.valueOf((OptionSpec)maxThroughputOpt)).intValue() * 1024L * 1024L;
        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
        int messageSize = (Integer)options.valueOf((OptionSpec)messageSizeOpt);
        long flushInterval = (Long)options.valueOf((OptionSpec)flushIntervalOpt);
        CompressionCodec compressionCodec = CompressionCodec$.MODULE$.getCompressionCodec((String)options.valueOf((OptionSpec)compressionCodecOpt));
        Random rand = new Random();
        rand.nextBytes(buffer.array());
        int numMessages = bufferSize / (messageSize + 12);
        long createTime = System.currentTimeMillis();
        CompressionType compressionType = CompressionType.forId((int)compressionCodec.codec());
        IndexedSeq records = (IndexedSeq)RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numMessages).map((Function1 & Serializable & scala.Serializable)x$1 -> TestLinearWriteSpeed$.$anonfun$main$1(createTime, messageSize, BoxesRunTime.unboxToInt((Object)x$1)), IndexedSeq$.MODULE$.canBuildFrom());
        MemoryRecords messageSet = MemoryRecords.withRecords((CompressionType)compressionType, (SimpleRecord[])((SimpleRecord[])records.toArray(ClassTag$.MODULE$.apply(SimpleRecord.class))));
        TestLinearWriteSpeed.Writable[] writables = new TestLinearWriteSpeed.Writable[numFiles];
        KafkaScheduler scheduler = new KafkaScheduler(1, KafkaScheduler$.MODULE$.$lessinit$greater$default$2(), KafkaScheduler$.MODULE$.$lessinit$greater$default$3());
        scheduler.startup();
        RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), numFiles).foreach$mVc$sp((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)i -> {
            if (options.has((OptionSpec)mmapOpt)) {
                writables$1[i] = new TestLinearWriteSpeed.MmapWritable(new File(dir, new StringBuilder(15).append("kafka-test-").append(i).append(".dat").toString()), bytesToWrite$1.elem / (long)numFiles, buffer);
                return;
            }
            if (options.has((OptionSpec)channelOpt)) {
                writables$1[i] = new TestLinearWriteSpeed.ChannelWritable(new File(dir, new StringBuilder(15).append("kafka-test-").append(i).append(".dat").toString()), buffer);
                return;
            }
            if (options.has((OptionSpec)logOpt)) {
                int segmentSize = rand.nextInt(512) * 1024 * 1024 + 0x4000000;
                Properties logProperties = new Properties();
                logProperties.put(LogConfig$.MODULE$.SegmentBytesProp(), Predef$.MODULE$.int2Integer(segmentSize));
                logProperties.put(LogConfig$.MODULE$.FlushMessagesProp(), Predef$.MODULE$.long2Long(flushInterval));
                writables$1[i] = new TestLinearWriteSpeed.LogWritable(new File(dir, new StringBuilder(11).append("kafka-test-").append(i).toString()), new LogConfig((Map)logProperties, LogConfig$.MODULE$.$lessinit$greater$default$2()), (Scheduler)scheduler, messageSet);
                return;
            }
            System.err.println("Must specify what to write to with one of --log, --channel, or --mmap");
            throw Exit$.MODULE$.exit(1, Exit$.MODULE$.exit$default$2());
        });
        bytesToWrite.elem = bytesToWrite.elem / (long)numFiles * (long)numFiles;
        Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("%10s\t%10s\t%10s")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{"mb_sec", "avg_latency", "max_latency"})));
        long beginTest = System.nanoTime();
        long maxLatency = 0L;
        long totalLatency = 0L;
        long count = 0L;
        long written = 0L;
        long totalWritten = 0L;
        long lastReport = beginTest;
        while (totalWritten + (long)bufferSize < bytesToWrite.elem) {
            long now;
            long lastReportMs;
            long sleepMs;
            long start = System.nanoTime();
            int writeSize = writables[RichInt$.MODULE$.abs$extension(Predef$.MODULE$.intWrapper((int)(count % (long)numFiles)))].write();
            long ellapsed = System.nanoTime() - start;
            maxLatency = package$.MODULE$.max(ellapsed, maxLatency);
            totalLatency += ellapsed;
            written += (long)writeSize;
            ++count;
            totalWritten += (long)writeSize;
            if ((double)(start - lastReport) / 1000000.0 > (double)reportingInterval) {
                double ellapsedSecs = (double)(start - lastReport) / 1.0E9;
                double mb = (double)written / 1048576.0;
                Predef$.MODULE$.println((Object)new StringOps(Predef$.MODULE$.augmentString("%10.3f\t%10.3f\t%10.3f")).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToDouble((double)(mb / ellapsedSecs)), BoxesRunTime.boxToDouble((double)((double)totalLatency / (double)count / 1000000.0)), BoxesRunTime.boxToDouble((double)((double)maxLatency / 1000000.0))})));
                lastReport = start;
                written = 0L;
                maxLatency = 0L;
                totalLatency = 0L;
                continue;
            }
            if (!((double)written > (double)maxThroughputBytes * ((double)reportingInterval / 1000.0)) || (sleepMs = (lastReportMs = lastReport / 1000000L) + reportingInterval - (now = System.nanoTime() / 1000000L)) <= 0L) continue;
            Thread.sleep(sleepMs);
        }
        double elapsedSecs = (double)(System.nanoTime() - beginTest) / 1.0E9;
        Predef$.MODULE$.println((Object)new StringBuilder(11).append(Double.toString((double)bytesToWrite.elem / (1048576.0 * elapsedSecs))).append(" MB per sec").toString());
        scheduler.shutdown();
    }

    public static final /* synthetic */ SimpleRecord $anonfun$main$1(long createTime$1, int messageSize$1, int x$1) {
        return new SimpleRecord(createTime$1, null, new byte[messageSize$1]);
    }

    private TestLinearWriteSpeed$() {
        MODULE$ = this;
    }
}

