/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.util;

import java.io.OutputStream;
import java.util.concurrent.TimeUnit;
import org.apache.spark.internal.Logging;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.Serializable;
import scala.collection.mutable.StringBuilder;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

@ScalaSignature(bytes="\u0006\u0001Y4Q!\u0001\u0002\u0001\t1\u0011qCU1uK2KW.\u001b;fI>+H\u000f];u'R\u0014X-Y7\u000b\u0005\r!\u0011\u0001B;uS2T!!\u0002\u0004\u0002\u0013M$(/Z1nS:<'BA\u0004\t\u0003\u0015\u0019\b/\u0019:l\u0015\tI!\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u0017\u0005\u0019qN]4\u0014\u0007\u0001iQ\u0003\u0005\u0002\u000f'5\tqB\u0003\u0002\u0011#\u0005\u0011\u0011n\u001c\u0006\u0002%\u0005!!.\u0019<b\u0013\t!rB\u0001\u0007PkR\u0004X\u000f^*ue\u0016\fW\u000e\u0005\u0002\u001735\tqC\u0003\u0002\u0019\r\u0005A\u0011N\u001c;fe:\fG.\u0003\u0002\u001b/\t9Aj\\4hS:<\u0007\u0002\u0003\u000f\u0001\u0005\u0003\u0005\u000b\u0011B\u0007\u0002\u0007=,Ho\u0001\u0001\t\u0011}\u0001!\u0011!Q\u0001\n\u0001\n!\u0003Z3tSJ,GMQ=uKN\u0004VM]*fGB\u0011\u0011\u0005J\u0007\u0002E)\t1%A\u0003tG\u0006d\u0017-\u0003\u0002&E\t\u0019\u0011J\u001c;\t\u000b\u001d\u0002A\u0011\u0001\u0015\u0002\rqJg.\u001b;?)\rI3\u0006\f\t\u0003U\u0001i\u0011A\u0001\u0005\u00069\u0019\u0002\r!\u0004\u0005\u0006?\u0019\u0002\r\u0001\t\u0005\b]\u0001\u0011\r\u0011\"\u00030\u00035\u0019\u0016LT\"`\u0013:#VI\u0015,B\u0019V\t\u0001\u0007\u0005\u0002\"c%\u0011!G\t\u0002\u0005\u0019>tw\r\u0003\u00045\u0001\u0001\u0006I\u0001M\u0001\u000f'fs5iX%O)\u0016\u0013f+\u0011'!\u0011\u001d1\u0004A1A\u0005\n]\n!b\u0011%V\u001d.{6+\u0013.F+\u0005\u0001\u0003BB\u001d\u0001A\u0003%\u0001%A\u0006D\u0011Vs5jX*J5\u0016\u0003\u0003bB\u001e\u0001\u0001\u0004%IaL\u0001\rY\u0006\u001cHoU=oGRKW.\u001a\u0005\b{\u0001\u0001\r\u0011\"\u0003?\u0003Aa\u0017m\u001d;Ts:\u001cG+[7f?\u0012*\u0017\u000f\u0006\u0002@\u0005B\u0011\u0011\u0005Q\u0005\u0003\u0003\n\u0012A!\u00168ji\"91\tPA\u0001\u0002\u0004\u0001\u0014a\u0001=%c!1Q\t\u0001Q!\nA\nQ\u0002\\1tiNKhn\u0019+j[\u0016\u0004\u0003bB$\u0001\u0001\u0004%IaL\u0001\u0016Ef$Xm],sSR$XM\\*j]\u000e,7+\u001f8d\u0011\u001dI\u0005\u00011A\u0005\n)\u000b\u0011DY=uKN<&/\u001b;uK:\u001c\u0016N\\2f'ft7m\u0018\u0013fcR\u0011qh\u0013\u0005\b\u0007\"\u000b\t\u00111\u00011\u0011\u0019i\u0005\u0001)Q\u0005a\u00051\"-\u001f;fg^\u0013\u0018\u000e\u001e;f]NKgnY3Ts:\u001c\u0007\u0005C\u0003P\u0001\u0011\u0005\u0003+A\u0003xe&$X\r\u0006\u0002@#\")!K\u0014a\u0001A\u0005\t!\rC\u0003P\u0001\u0011\u0005C\u000b\u0006\u0002@+\")ak\u0015a\u0001/\u0006)!-\u001f;fgB\u0019\u0011\u0005\u0017.\n\u0005e\u0013#!B!se\u0006L\bCA\u0011\\\u0013\ta&E\u0001\u0003CsR,\u0007\"B(\u0001\t\u000brF\u0003B `A\nDQAV/A\u0002]CQ!Y/A\u0002\u0001\naa\u001c4gg\u0016$\b\"B2^\u0001\u0004\u0001\u0013A\u00027f]\u001e$\b\u000e\u000b\u0002^KB\u0011a-[\u0007\u0002O*\u0011\u0001NI\u0001\u000bC:tw\u000e^1uS>t\u0017B\u00016h\u0005\u001d!\u0018-\u001b7sK\u000eDQ\u0001\u001c\u0001\u0005B5\fQA\u001a7vg\"$\u0012a\u0010\u0005\u0006_\u0002!\t%\\\u0001\u0006G2|7/\u001a\u0005\u0006c\u0002!IA]\u0001\fo\u0006LG\u000fV8Xe&$X\r\u0006\u0002@g\")A\u000f\u001da\u0001A\u0005Aa.^7CsR,7\u000f\u000b\u0002qK\u0002")
public class RateLimitedOutputStream
extends OutputStream
implements Logging {
    private final OutputStream out;
    public final int org$apache$spark$streaming$util$RateLimitedOutputStream$$desiredBytesPerSec;
    private final long SYNC_INTERVAL;
    private final int CHUNK_SIZE;
    private long lastSyncTime;
    private long bytesWrittenSinceSync;
    private transient Logger org$apache$spark$internal$Logging$$log_;

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger x$1) {
        this.org$apache$spark$internal$Logging$$log_ = x$1;
    }

    public String logName() {
        return Logging.class.logName((Logging)this);
    }

    public Logger log() {
        return Logging.class.log((Logging)this);
    }

    public void logInfo(Function0<String> msg) {
        Logging.class.logInfo((Logging)this, msg);
    }

    public void logDebug(Function0<String> msg) {
        Logging.class.logDebug((Logging)this, msg);
    }

    public void logTrace(Function0<String> msg) {
        Logging.class.logTrace((Logging)this, msg);
    }

    public void logWarning(Function0<String> msg) {
        Logging.class.logWarning((Logging)this, msg);
    }

    public void logError(Function0<String> msg) {
        Logging.class.logError((Logging)this, msg);
    }

    public void logInfo(Function0<String> msg, Throwable throwable) {
        Logging.class.logInfo((Logging)this, msg, (Throwable)throwable);
    }

    public void logDebug(Function0<String> msg, Throwable throwable) {
        Logging.class.logDebug((Logging)this, msg, (Throwable)throwable);
    }

    public void logTrace(Function0<String> msg, Throwable throwable) {
        Logging.class.logTrace((Logging)this, msg, (Throwable)throwable);
    }

    public void logWarning(Function0<String> msg, Throwable throwable) {
        Logging.class.logWarning((Logging)this, msg, (Throwable)throwable);
    }

    public void logError(Function0<String> msg, Throwable throwable) {
        Logging.class.logError((Logging)this, msg, (Throwable)throwable);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled((Logging)this);
    }

    public void initializeLogIfNecessary(boolean isInterpreter) {
        Logging.class.initializeLogIfNecessary((Logging)this, (boolean)isInterpreter);
    }

    private long SYNC_INTERVAL() {
        return this.SYNC_INTERVAL;
    }

    private int CHUNK_SIZE() {
        return this.CHUNK_SIZE;
    }

    private long lastSyncTime() {
        return this.lastSyncTime;
    }

    private void lastSyncTime_$eq(long x$1) {
        this.lastSyncTime = x$1;
    }

    private long bytesWrittenSinceSync() {
        return this.bytesWrittenSinceSync;
    }

    private void bytesWrittenSinceSync_$eq(long x$1) {
        this.bytesWrittenSinceSync = x$1;
    }

    @Override
    public void write(int b) {
        this.waitToWrite(1);
        this.out.write(b);
    }

    @Override
    public void write(byte[] bytes) {
        this.write(bytes, 0, bytes.length);
    }

    @Override
    public final void write(byte[] bytes, int offset, int length) {
        int writeSize;
        while ((writeSize = package$.MODULE$.min(length - offset, this.CHUNK_SIZE())) > 0) {
            this.waitToWrite(writeSize);
            this.out.write(bytes, offset, writeSize);
            offset += writeSize;
        }
    }

    @Override
    public void flush() {
        this.out.flush();
    }

    @Override
    public void close() {
        this.out.close();
    }

    /*
     * WARNING - void declaration
     */
    private void waitToWrite(int numBytes) {
        while (true) {
            void var6_4;
            void var4_3;
            void elapsedTimeInMillis;
            long now = System.nanoTime();
            long elapsedNanosecs = package$.MODULE$.max(now - this.lastSyncTime(), 1L);
            double rate = (double)this.bytesWrittenSinceSync() * (double)1000000000 / (double)elapsedNanosecs;
            if (rate < (double)this.org$apache$spark$streaming$util$RateLimitedOutputStream$$desiredBytesPerSec) {
                BoxedUnit boxedUnit;
                this.bytesWrittenSinceSync_$eq(this.bytesWrittenSinceSync() + (long)numBytes);
                if (now > this.lastSyncTime() + this.SYNC_INTERVAL()) {
                    this.lastSyncTime_$eq(now);
                    this.bytesWrittenSinceSync_$eq(numBytes);
                    boxedUnit = BoxedUnit.UNIT;
                } else {
                    boxedUnit = BoxedUnit.UNIT;
                }
                return;
            }
            long targetTimeInMillis = this.bytesWrittenSinceSync() * 1000L / (long)this.org$apache$spark$streaming$util$RateLimitedOutputStream$$desiredBytesPerSec;
            long sleepTimeInMillis = targetTimeInMillis - (elapsedTimeInMillis = var4_3 / 1000000L);
            if (sleepTimeInMillis <= 0L) continue;
            this.logTrace((Function0<String>)new Serializable(this, (double)var6_4, sleepTimeInMillis){
                public static final long serialVersionUID = 0L;
                private final /* synthetic */ RateLimitedOutputStream $outer;
                private final double rate$1;
                private final long sleepTimeInMillis$1;

                public final String apply() {
                    return new StringBuilder().append((Object)"Natural rate is ").append((Object)BoxesRunTime.boxToDouble((double)this.rate$1)).append((Object)" per second but desired rate is ").append((Object)BoxesRunTime.boxToInteger((int)this.$outer.org$apache$spark$streaming$util$RateLimitedOutputStream$$desiredBytesPerSec)).append((Object)", sleeping for ").append((Object)BoxesRunTime.boxToLong((long)this.sleepTimeInMillis$1)).append((Object)" ms to compensate.").toString();
                }
                {
                    if ($outer == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = $outer;
                    this.rate$1 = rate$1;
                    this.sleepTimeInMillis$1 = sleepTimeInMillis$1;
                }
            });
            Thread.sleep(sleepTimeInMillis);
        }
    }

    public RateLimitedOutputStream(OutputStream out, int desiredBytesPerSec) {
        this.out = out;
        this.org$apache$spark$streaming$util$RateLimitedOutputStream$$desiredBytesPerSec = desiredBytesPerSec;
        Logging.class.$init$((Logging)this);
        Predef$.MODULE$.require(desiredBytesPerSec > 0);
        this.SYNC_INTERVAL = TimeUnit.NANOSECONDS.convert(10L, TimeUnit.SECONDS);
        this.CHUNK_SIZE = 8192;
        this.lastSyncTime = System.nanoTime();
        this.bytesWrittenSinceSync = 0L;
    }
}

