/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.streaming.scheduler;

import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Milliseconds$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.TestServer;
import org.apache.spark.streaming.TestSuiteBase;
import org.apache.spark.streaming.dstream.DStream;
import org.apache.spark.streaming.dstream.ReceiverInputDStream;
import org.apache.spark.streaming.scheduler.ConstantEstimator;
import org.apache.spark.streaming.scheduler.RateTestInputDStream;
import org.apache.spark.streaming.scheduler.RateTestReceiver;
import org.apache.spark.streaming.scheduler.RateTestReceiver$;
import org.apache.spark.streaming.scheduler.rate.RateEstimator;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.Tag;
import org.scalatest.compatible.Assertion;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.enablers.Retrying$;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.Predef$;
import scala.Some;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ClassTag;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001)2A\u0001B\u0003\u0001!!)\u0011\u0004\u0001C\u00015!)Q\u0004\u0001C!=!)Q\u0005\u0001C!M\t\u0019\"+\u0019;f\u0007>tGO]8mY\u0016\u00148+^5uK*\u0011aaB\u0001\ng\u000eDW\rZ;mKJT!\u0001C\u0005\u0002\u0013M$(/Z1nS:<'B\u0001\u0006\f\u0003\u0015\u0019\b/\u0019:l\u0015\taQ\"\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002\u001d\u0005\u0019qN]4\u0004\u0001M\u0019\u0001!E\u000b\u0011\u0005I\u0019R\"A\u0005\n\u0005QI!!D*qCJ\\g)\u001e8Tk&$X\r\u0005\u0002\u0017/5\tq!\u0003\u0002\u0019\u000f\tiA+Z:u'VLG/\u001a\"bg\u0016\fa\u0001P5oSRtD#A\u000e\u0011\u0005q\u0001Q\"A\u0003\u0002\u001dU\u001cX-T1ok\u0006d7\t\\8dWV\tq\u0004\u0005\u0002!G5\t\u0011EC\u0001#\u0003\u0015\u00198-\u00197b\u0013\t!\u0013EA\u0004C_>dW-\u00198\u0002\u001b\t\fGo\u00195EkJ\fG/[8o+\u00059\u0003C\u0001\f)\u0013\tIsA\u0001\u0005EkJ\fG/[8o\u0001")
public class RateControllerSuite
extends SparkFunSuite
implements TestSuiteBase {
    private String checkpointDir;
    private final SparkConf conf;
    private final PatienceConfiguration.Timeout eventuallyTimeout;
    private volatile boolean bitmap$0;

    /**
     * Synthetic accessor that invokes the static {@code BeforeAndAfterEach.beforeEach$}
     * implementation on this instance.
     */
    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$beforeEach() {
        BeforeAndAfterEach.beforeEach$((BeforeAndAfterEach)this);
    }

    /**
     * Synthetic accessor that invokes the static {@code BeforeAndAfterEach.afterEach$}
     * implementation on this instance.
     */
    @Override
    public /* synthetic */ void org$apache$spark$streaming$TestSuiteBase$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.framework$}.
     *
     * @return the string supplied by {@code TestSuiteBase}
     */
    @Override
    public String framework() {
        return TestSuiteBase.framework$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.master$}.
     *
     * @return the string supplied by {@code TestSuiteBase}
     */
    @Override
    public String master() {
        return TestSuiteBase.master$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.numInputPartitions$}.
     *
     * @return the value supplied by {@code TestSuiteBase}
     */
    @Override
    public int numInputPartitions() {
        return TestSuiteBase.numInputPartitions$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.maxWaitTimeMillis$}.
     *
     * @return the value supplied by {@code TestSuiteBase}
     */
    @Override
    public int maxWaitTimeMillis() {
        return TestSuiteBase.maxWaitTimeMillis$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.actuallyWait$}.
     *
     * @return the flag supplied by {@code TestSuiteBase}
     */
    @Override
    public boolean actuallyWait() {
        return TestSuiteBase.actuallyWait$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.beforeFunction$}.
     */
    @Override
    public void beforeFunction() {
        TestSuiteBase.beforeFunction$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.afterFunction$}.
     */
    @Override
    public void afterFunction() {
        TestSuiteBase.afterFunction$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.beforeEach$}.
     */
    @Override
    public void beforeEach() {
        TestSuiteBase.beforeEach$(this);
    }

    /**
     * Forwards to the static trait implementation {@code TestSuiteBase.afterEach$}.
     */
    @Override
    public void afterEach() {
        TestSuiteBase.afterEach$(this);
    }

    /**
     * Forwards to {@code TestSuiteBase.withStreamingContext$} and returns its result cast to {@code R}.
     *
     * @param ssc   streaming context handed to the trait implementation
     * @param block function handed to the trait implementation
     * @return whatever the trait implementation returns
     */
    @Override
    public <R> R withStreamingContext(StreamingContext ssc, Function1<StreamingContext, R> block) {
        return (R)TestSuiteBase.withStreamingContext$(this, ssc, block);
    }

    /**
     * Forwards to {@code TestSuiteBase.withTestServer$} and returns its result cast to {@code R}.
     *
     * @param testServer server handed to the trait implementation
     * @param block      function handed to the trait implementation
     * @return whatever the trait implementation returns
     */
    @Override
    public <R> R withTestServer(TestServer testServer, Function1<TestServer, R> block) {
        return (R)TestSuiteBase.withTestServer$(this, testServer, block);
    }

    /**
     * Single-input overload; forwards every argument unchanged to {@code TestSuiteBase.setupStreams$}.
     *
     * @param input         input batches
     * @param operation     function from the input stream to the output stream
     * @param numPartitions partition count passed through to the trait implementation
     * @param evidence$4    class tag for {@code U}
     * @param evidence$5    class tag for {@code V}
     * @return the streaming context returned by the trait implementation
     */
    @Override
    public <U, V> StreamingContext setupStreams(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, int numPartitions, ClassTag<U> evidence$4, ClassTag<V> evidence$5) {
        return TestSuiteBase.setupStreams$(this, input, operation, numPartitions, evidence$4, evidence$5);
    }

    /**
     * Forwards to {@code TestSuiteBase.setupStreams$default$3$}.
     * Judging by its name, presumably the compiler-generated default for the third
     * parameter of {@code setupStreams} (to be confirmed against the Scala source).
     *
     * @return the value supplied by {@code TestSuiteBase}
     */
    @Override
    public <U, V> int setupStreams$default$3() {
        return TestSuiteBase.setupStreams$default$3$(this);
    }

    /**
     * Two-input overload; forwards every argument unchanged to {@code TestSuiteBase.setupStreams$}.
     *
     * @param input1     first input batches
     * @param input2     second input batches
     * @param operation  function combining both input streams into the output stream
     * @param evidence$6 class tag for {@code U}
     * @param evidence$7 class tag for {@code V}
     * @param evidence$8 class tag for {@code W}
     * @return the streaming context returned by the trait implementation
     */
    @Override
    public <U, V, W> StreamingContext setupStreams(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, ClassTag<U> evidence$6, ClassTag<V> evidence$7, ClassTag<W> evidence$8) {
        return TestSuiteBase.setupStreams$(this, input1, input2, operation, evidence$6, evidence$7, evidence$8);
    }

    /**
     * Forwards every argument unchanged to {@code TestSuiteBase.runStreams$}.
     *
     * @param ssc               streaming context to run
     * @param numBatches        batch count passed through to the trait implementation
     * @param numExpectedOutput expected output count passed through to the trait implementation
     * @param preStop           callback passed through to the trait implementation
     * @param evidence$9        class tag for {@code V}
     * @return the sequence returned by the trait implementation
     */
    @Override
    public <V> Seq<Seq<V>> runStreams(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$9) {
        return TestSuiteBase.runStreams$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$9);
    }

    /**
     * Forwards to {@code TestSuiteBase.runStreams$default$4$}.
     * Judging by its name, presumably the compiler-generated default for the fourth
     * parameter of {@code runStreams} (to be confirmed against the Scala source).
     *
     * @return the function supplied by {@code TestSuiteBase}
     */
    @Override
    public <V> Function0<BoxedUnit> runStreams$default$4() {
        return TestSuiteBase.runStreams$default$4$(this);
    }

    /**
     * Forwards every argument unchanged to {@code TestSuiteBase.runStreamsWithPartitions$}.
     *
     * @param ssc               streaming context to run
     * @param numBatches        batch count passed through to the trait implementation
     * @param numExpectedOutput expected output count passed through to the trait implementation
     * @param preStop           callback passed through to the trait implementation
     * @param evidence$10       class tag for {@code V}
     * @return the nested sequence returned by the trait implementation
     */
    @Override
    public <V> Seq<Seq<Seq<V>>> runStreamsWithPartitions(StreamingContext ssc, int numBatches, int numExpectedOutput, Function0<BoxedUnit> preStop, ClassTag<V> evidence$10) {
        return TestSuiteBase.runStreamsWithPartitions$(this, ssc, numBatches, numExpectedOutput, preStop, evidence$10);
    }

    /**
     * Forwards to {@code TestSuiteBase.runStreamsWithPartitions$default$4$}.
     * Judging by its name, presumably the compiler-generated default for the fourth
     * parameter of {@code runStreamsWithPartitions} (to be confirmed against the Scala source).
     *
     * @return the function supplied by {@code TestSuiteBase}
     */
    @Override
    public <V> Function0<BoxedUnit> runStreamsWithPartitions$default$4() {
        return TestSuiteBase.runStreamsWithPartitions$default$4$(this);
    }

    /**
     * Forwards every argument unchanged to {@code TestSuiteBase.verifyOutput$}.
     *
     * @param output         produced output batches
     * @param expectedOutput expected output batches
     * @param useSet         flag passed through to the trait implementation
     * @param evidence$11    class tag for {@code V}
     */
    @Override
    public <V> void verifyOutput(Seq<Seq<V>> output, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<V> evidence$11) {
        TestSuiteBase.verifyOutput$(this, output, expectedOutput, useSet, evidence$11);
    }

    /**
     * Single-input overload without a batch count; forwards every argument unchanged to
     * {@code TestSuiteBase.testOperation$}.
     *
     * @param input          input batches
     * @param operation      function from the input stream to the output stream
     * @param expectedOutput expected output batches
     * @param useSet         flag passed through to the trait implementation
     * @param evidence$12    class tag for {@code U}
     * @param evidence$13    class tag for {@code V}
     */
    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, boolean useSet, ClassTag<U> evidence$12, ClassTag<V> evidence$13) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, useSet, evidence$12, evidence$13);
    }

    /**
     * Forwards to {@code TestSuiteBase.testOperation$default$4$}.
     * Judging by its name, presumably the compiler-generated default for the fourth
     * parameter of {@code testOperation} (to be confirmed against the Scala source).
     *
     * @return the flag supplied by {@code TestSuiteBase}
     */
    @Override
    public <U, V> boolean testOperation$default$4() {
        return TestSuiteBase.testOperation$default$4$(this);
    }

    /**
     * Single-input overload with an explicit batch count; forwards every argument unchanged to
     * {@code TestSuiteBase.testOperation$}.
     *
     * @param input          input batches
     * @param operation      function from the input stream to the output stream
     * @param expectedOutput expected output batches
     * @param numBatches     batch count passed through to the trait implementation
     * @param useSet         flag passed through to the trait implementation
     * @param evidence$14    class tag for {@code U}
     * @param evidence$15    class tag for {@code V}
     */
    @Override
    public <U, V> void testOperation(Seq<Seq<U>> input, Function1<DStream<U>, DStream<V>> operation, Seq<Seq<V>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$14, ClassTag<V> evidence$15) {
        TestSuiteBase.testOperation$(this, input, operation, expectedOutput, numBatches, useSet, evidence$14, evidence$15);
    }

    /**
     * Two-input overload without a batch count; forwards every argument unchanged to
     * {@code TestSuiteBase.testOperation$}.
     *
     * @param input1         first input batches
     * @param input2         second input batches
     * @param operation      function combining both input streams into the output stream
     * @param expectedOutput expected output batches
     * @param useSet         flag passed through to the trait implementation
     * @param evidence$16    class tag for {@code U}
     * @param evidence$17    class tag for {@code V}
     * @param evidence$18    class tag for {@code W}
     */
    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, boolean useSet, ClassTag<U> evidence$16, ClassTag<V> evidence$17, ClassTag<W> evidence$18) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, useSet, evidence$16, evidence$17, evidence$18);
    }

    /**
     * Two-input overload with an explicit batch count; forwards every argument unchanged to
     * {@code TestSuiteBase.testOperation$}.
     *
     * @param input1         first input batches
     * @param input2         second input batches
     * @param operation      function combining both input streams into the output stream
     * @param expectedOutput expected output batches
     * @param numBatches     batch count passed through to the trait implementation
     * @param useSet         flag passed through to the trait implementation
     * @param evidence$19    class tag for {@code U}
     * @param evidence$20    class tag for {@code V}
     * @param evidence$21    class tag for {@code W}
     */
    @Override
    public <U, V, W> void testOperation(Seq<Seq<U>> input1, Seq<Seq<V>> input2, Function2<DStream<U>, DStream<V>, DStream<W>> operation, Seq<Seq<W>> expectedOutput, int numBatches, boolean useSet, ClassTag<U> evidence$19, ClassTag<V> evidence$20, ClassTag<W> evidence$21) {
        TestSuiteBase.testOperation$(this, input1, input2, operation, expectedOutput, numBatches, useSet, evidence$19, evidence$20, evidence$21);
    }

    /**
     * Initializing path for {@link #checkpointDir()}.
     *
     * While holding this instance's monitor, re-reads the {@code bitmap$0} flag and, if it is
     * still unset, stores the value from {@code TestSuiteBase.checkpointDir$} into the
     * {@code checkpointDir} field before setting the flag.
     *
     * @return the value of the {@code checkpointDir} field after the synchronized section
     */
    private String checkpointDir$lzycompute() {
        synchronized (this) {
            final boolean alreadyInitialized = this.bitmap$0;
            if (!alreadyInitialized) {
                // Store the value first, then publish it by flipping the flag.
                this.checkpointDir = TestSuiteBase.checkpointDir$(this);
                this.bitmap$0 = true;
            }
        }
        return this.checkpointDir;
    }

    /**
     * Returns the cached {@code checkpointDir} field when the {@code bitmap$0} flag is set;
     * otherwise delegates to {@code checkpointDir$lzycompute()} to compute and cache it.
     *
     * @return the checkpoint directory string
     */
    @Override
    public String checkpointDir() {
        return this.bitmap$0 ? this.checkpointDir : this.checkpointDir$lzycompute();
    }

    /**
     * Returns the {@code conf} field of this suite.
     *
     * @return the stored {@code SparkConf}
     */
    @Override
    public SparkConf conf() {
        return this.conf;
    }

    /**
     * Returns the {@code eventuallyTimeout} field of this suite.
     *
     * @return the stored timeout
     */
    @Override
    public PatienceConfiguration.Timeout eventuallyTimeout() {
        return this.eventuallyTimeout;
    }

    /**
     * Setter that stores the given value into the {@code conf} field.
     *
     * NOTE(review): the {@code conf} field is declared {@code final} in this class, so javac
     * would reject this assignment as written; this looks like a decompiler artifact and
     * should be confirmed before attempting to recompile.
     *
     * @param x$1 the configuration to store
     */
    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$conf_$eq(SparkConf x$1) {
        this.conf = x$1;
    }

    /**
     * Setter that stores the given value into the {@code eventuallyTimeout} field.
     *
     * NOTE(review): the {@code eventuallyTimeout} field is declared {@code final} in this
     * class, so javac would reject this assignment as written; this looks like a decompiler
     * artifact and should be confirmed before attempting to recompile.
     *
     * @param x$1 the timeout to store
     */
    @Override
    public void org$apache$spark$streaming$TestSuiteBase$_setter_$eventuallyTimeout_$eq(PatienceConfiguration.Timeout x$1) {
        this.eventuallyTimeout = x$1;
    }

    /**
     * Suite-specific override that always answers {@code false}.
     *
     * @return {@code false}
     */
    @Override
    public boolean useManualClock() {
        return false;
    }

    /**
     * Suite-specific batch duration, built by calling {@code Milliseconds.apply} with {@code 50L}.
     *
     * @return the {@code Duration} produced for the value 50
     */
    @Override
    public Duration batchDuration() {
        return Milliseconds$.MODULE$.apply(50L);
    }

    /**
     * Pushes {@code rate} into the estimator, then retries (with a 5-second timeout) an
     * assertion that the active {@code RateTestReceiver}'s default block generator rate
     * limit {@code ===} that same rate.
     *
     * @param rate        the rate to set on the estimator and to expect on the receiver
     * @param estimator$1 the estimator whose rate is updated
     */
    private final void updateRateAndVerify$1(long rate, ConstantEstimator estimator$1) {
        estimator$1.updateRate(rate);
        PatienceConfiguration.Timeout fiveSeconds = Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(5).seconds());
        Eventually$.MODULE$.eventually(fiveSeconds, (Function0 & Serializable & scala.Serializable)() -> {
            // Look up the active receiver on every retry.
            RateTestReceiver activeReceiver = (RateTestReceiver)RateTestReceiver$.MODULE$.getActive().get();
            TripleEqualsSupport.Equalizer observedLimit = this.convertToEqualizer(BoxesRunTime.boxToLong((long)activeReceiver.getDefaultBlockGeneratorRateLimit()));
            long expectedLimit = rate;
            Bool comparison = Bool$.MODULE$.binaryMacroBool((Object)observedLimit, "===", (Object)BoxesRunTime.boxToLong((long)expectedLimit), observedLimit.$eq$eq$eq((Object)BoxesRunTime.boxToLong((long)expectedLimit), Equality$.MODULE$.default()), Prettifier$.MODULE$.default());
            return Assertions$.MODULE$.assertionsHelper().macroAssert(comparison, (Object)"", Prettifier$.MODULE$.default(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 63));
        }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 62));
    }

    /**
     * Body of the "ReceiverRateController - published rates reach receivers" test.
     *
     * Registers a {@code RateTestInputDStream} whose {@code rateController()} is replaced by a
     * {@code ReceiverRateController} driven by a {@code ConstantEstimator}, starts the
     * streaming context, waits up to 5 seconds until {@code RateTestReceiver.getActive()} is
     * non-empty, and then runs {@code updateRateAndVerify$1} for the rates 100, 200 and 300.
     *
     * @param $this the suite instance on which the verification helper is invoked
     * @param ssc   the streaming context the stream is attached to and that gets started
     */
    public static final /* synthetic */ void $anonfun$new$5(RateControllerSuite $this, StreamingContext ssc) {
        ConstantEstimator estimator = new ConstantEstimator(100L);
        // Construct the stream through the one-argument (StreamingContext) constructor, the same
        // one the other test in this suite uses. The decompiled form passed the anonymous
        // class's synthetic capture arguments (null outer, ssc, estimator) instead.
        RateTestInputDStream dstream = new RateTestInputDStream(ssc){
            private final Some<ReceiverInputDStream.ReceiverRateController> rateController;

            public Some<ReceiverInputDStream.ReceiverRateController> rateController() {
                return this.rateController;
            }
            {
                // Capture the enclosing local `estimator`; the decompiled body referenced the
                // undeclared synthetic name `estimator$1`.
                this.rateController = new Some((Object)new ReceiverInputDStream.ReceiverRateController((ReceiverInputDStream)this, this.id(), (RateEstimator)estimator));
            }
        };
        dstream.register();
        ssc.start();
        // Wait for a receiver to become active before changing rates.
        Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(5).seconds()), (Function0)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> RateTestReceiver$.MODULE$.getActive().nonEmpty(), Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 55));
        // Each rate is pushed to the estimator and then checked on the receiver, in order.
        ((IterableLike)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapIntArray(new int[]{100, 200, 300}))).foreach((Function1)(JFunction1.mcVI.sp & Serializable & scala.Serializable)rate -> $this.updateRateAndVerify$1(rate, estimator));
    }

    /**
     * Runs the {@code TestSuiteBase} initializer and registers the suite's two tests.
     *
     * The first test starts a streaming context with a registered {@code RateTestInputDStream}
     * and retries, with a 10-second timeout, an assertion that {@code publishedRates()} is
     * greater than 0. The second test runs {@code $anonfun$new$5} inside
     * {@code withStreamingContext}.
     */
    public RateControllerSuite() {
        TestSuiteBase.$init$(this);
        // Test 1: a fresh StreamingContext is built from conf() and batchDuration().
        this.test("RateController - rate controller publishes updates after batches complete", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0 & Serializable & scala.Serializable)() -> (Assertion)this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable & scala.Serializable)ssc -> {
            RateTestInputDStream dstream = new RateTestInputDStream((StreamingContext)ssc);
            dstream.register();
            ssc.start();
            // Retried assertion: publishedRates() > 0.
            return (Assertion)Eventually$.MODULE$.eventually(Eventually$.MODULE$.timeout(SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds()), (Function0 & Serializable & scala.Serializable)() -> {
                int $org_scalatest_assert_macro_left = dstream.publishedRates();
                int $org_scalatest_assert_macro_right = 0;
                Bool $org_scalatest_assert_macro_expr = Bool$.MODULE$.binaryMacroBool((Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_left), ">", (Object)BoxesRunTime.boxToInteger((int)$org_scalatest_assert_macro_right), $org_scalatest_assert_macro_left > $org_scalatest_assert_macro_right, Prettifier$.MODULE$.default());
                return Assertions$.MODULE$.assertionsHelper().macroAssert($org_scalatest_assert_macro_expr, (Object)"", Prettifier$.MODULE$.default(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 39));
            }, Eventually$.MODULE$.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 38));
        }), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 32));
        // Test 2: the body lives in the static helper $anonfun$new$5.
        this.test("ReceiverRateController - published rates reach receivers", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> this.withStreamingContext(new StreamingContext(this.conf(), this.batchDuration()), (Function1 & Serializable & scala.Serializable)ssc -> {
            RateControllerSuite.$anonfun$new$5(this, ssc);
            return BoxedUnit.UNIT;
        }), new Position("RateControllerSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 44));
    }
}

