/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.sql.streaming.continuous;

import java.io.Serializable;
import org.apache.spark.SparkEnv$;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.LocalSparkSession;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.execution.streaming.continuous.CommitPartitionEpoch;
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution;
import org.apache.spark.sql.execution.streaming.continuous.EpochCoordinatorRef$;
import org.apache.spark.sql.execution.streaming.continuous.GetCurrentEpoch$;
import org.apache.spark.sql.execution.streaming.continuous.ReportPartitionOffset;
import org.apache.spark.sql.execution.streaming.continuous.SetReaderPartitions;
import org.apache.spark.sql.execution.streaming.continuous.SetWriterPartitions;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.test.TestSparkSession;
import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.scalactic.source.Position;
import org.scalatest.Args;
import org.scalatest.BeforeAndAfterEach;
import org.scalatest.FunSuiteLike;
import org.scalatest.Status;
import org.scalatest.Tag;
import org.scalatest.mockito.MockitoSugar;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.java8.JFunction0;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0001\u0005Uc\u0001B\f\u0019\u0001\u0015BQA\u000f\u0001\u0005\u0002mB\u0011B\u0010\u0001A\u0002\u0003\u0007I\u0011B \t\u0013\u0019\u0003\u0001\u0019!a\u0001\n\u00139\u0005\"\u0003)\u0001\u0001\u0004\u0005\t\u0015)\u0003A\u0011%\t\u0006\u00011AA\u0002\u0013%!\u000bC\u0005^\u0001\u0001\u0007\t\u0019!C\u0005=\"I\u0001\r\u0001a\u0001\u0002\u0003\u0006Ka\u0015\u0005\nC\u0002\u0001\r\u00111A\u0005\n\tD\u0011b\u001b\u0001A\u0002\u0003\u0007I\u0011\u00027\t\u00139\u0004\u0001\u0019!A!B\u0013\u0019\u0007\"C8\u0001\u0001\u0004\u0005\r\u0011\"\u0003q\u0011%1\b\u00011AA\u0002\u0013%q\u000fC\u0005z\u0001\u0001\u0007\t\u0011)Q\u0005c\")!\u0010\u0001C!w\")A\u0010\u0001C\u0005{\"9\u0011q\u0001\u0001\u0005\n\u0005%\u0001bBA\u0007\u0001\u0011%\u0011q\u0002\u0005\b\u0003?\u0001A\u0011BA\u0011\u0011\u0019\t9\u0003\u0001C\u0005w\"9\u0011\u0011\u0006\u0001\u0005\n\u0005-\u0002bBA\u0018\u0001\u0011%\u0011\u0011\u0007\u0005\b\u0003k\u0001A\u0011BA\u001c\u0005U)\u0005o\\2i\u0007>|'\u000fZ5oCR|'oU;ji\u0016T!!\u0007\u000e\u0002\u0015\r|g\u000e^5ok>,8O\u0003\u0002\u001c9\u0005I1\u000f\u001e:fC6Lgn\u001a\u0006\u0003;y\t1a]9m\u0015\ty\u0002%A\u0003ta\u0006\u00148N\u0003\u0002\"E\u00051\u0011\r]1dQ\u0016T\u0011aI\u0001\u0004_J<7\u0001A\n\u0006\u0001\u0019RcF\u000e\t\u0003O!j\u0011AH\u0005\u0003Sy\u0011Qb\u00159be.4UO\\*vSR,\u0007CA\u0016-\u001b\u0005a\u0012BA\u0017\u001d\u0005EaunY1m'B\f'o[*fgNLwN\u001c\t\u0003_Qj\u0011\u0001\r\u0006\u0003cI\nq!\\8dW&$xN\u0003\u00024E\u0005I1oY1mCR,7\u000f^\u0005\u0003kA\u0012A\"T8dW&$xnU;hCJ\u0004\"a\u000e\u001d\u000e\u0003IJ!!\u000f\u001a\u0003%\t+gm\u001c:f\u0003:$\u0017I\u001a;fe\u0016\u000b7\r[\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0003q\u0002\"!\u0010\u0001\u000e\u0003a\t\u0001#\u001a9pG\"\u001cun\u001c:eS:\fGo\u001c:\u0016\u0003\u0001\u0003\"!\u0011#\u000e\u0003\tS!a\u0011\u0010\u0002\u0007I\u00048-\u0003\u0002F\u0005\nq!\u000b]2F]\u0012\u0004x.\u001b8u%\u00164\u0017\u0001F3q_\u000eD7i\\8sI&t\u0017\r^8s?\u0012*\u0017\u000f\u0006\u0002I\u001dB\u0011\u0011\nT\u0007\u0002\u0015*\t1*A\u0003tG\u0006d\u0017-\u0003\u0002N\u0015\n!QK\\5u\u0011\u001dy5!!AA\u0002\u0001\u000b1\u0001\u001f\u00132\u0003E)\u0007o\\2i\u0007>|'\u000fZ5oCR|'\u000fI\u0001\u0007oJLG/\u001a:\u0016\u0003M\u0003\"\u0001V.\u000e\u0003US!a\u0007,\u000b\u0005E;&B\u0001-Z\u0003\t1(G\u0003\u0002[9\u000591o\\;sG\u0016\u001c\u0018B\u0001/V\u00051\u0019FO]3b[^\u0013\u0018\u000e^3s\u0003)9(/\u001b;fe~#S-\u001d\u000b\u0003\u0011~Cqa\u0014\u0004\u0002\u0002\u0003\u00071+A\u0004xe&$XM\u001d\u0011\u0002\u000bE,XM]=\u0016\u0003\r\u0004\"\u0001Z5\u000e\u0003\u0015T!!\u00074\u000b\u0005m9'B\u00015\u001d\u0003%)\u00070Z2vi&|g.\u0003\u0002kK\n\u00192i\u001c8uS:,x.^:Fq\u0016\u001cW\u000f^5p]\u0006I\u0011/^3ss~#S-\u001d\u000b\u0003\u00116DqaT\u0005\u0002\u0002\u0003\u00071-\u0001\u0004rk\u0016\u0014\u0018\u0010I\u0001\u000e_J$WM\u001d,fe&4\u0017.\u001a:\u0016\u0003E\u0004\"A\u001d;\u000e\u0003MT!!\r\u0012\n\u0005U\u001c(aB%o\u001fJ$WM]\u0001\u0012_J$WM\u001d,fe&4\u0017.\u001a:`I\u0015\fHC\u0001%y\u0011\u001dyE\"!AA\u0002E\fab\u001c:eKJ4VM]5gS\u0016\u0014\b%\u0001\u0006cK\u001a|'/Z#bG\"$\u0012\u0001S\u0001\u0014g\u0016$xK]5uKJ\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0003\u0011zDaa`\bA\u0002\u0005\u0005\u0011!\u00048v[B\u000b'\u000f^5uS>t7\u000fE\u0002J\u0003\u0007I1!!\u0002K\u0005\rIe\u000e^\u0001\u0014g\u0016$(+Z1eKJ\u0004\u0016M\u001d;ji&|gn\u001d\u000b\u0004\u0011\u0006-\u0001BB@\u0011\u0001\u0004\t\t!\u0001\u000bd_6l\u0017\u000e\u001e)beRLG/[8o\u000bB|7\r\u001b\u000b\u0006\u0011\u0006E\u0011Q\u0003\u0005\b\u0003'\t\u0002\u0019AA\u0001\u0003-\u0001\u0018M\u001d;ji&|g.\u00133\t\u000f\u0005]\u0011\u00031\u0001\u0002\u001a\u0005)Q\r]8dQB\u0019\u0011*a\u0007\n\u0007\u0005u!J\u0001\u0003M_:<\u0017!\u0006:fa>\u0014H\u000fU1si&$\u0018n\u001c8PM\u001a\u001cX\r\u001e\u000b\u0006\u0011\u0006\r\u0012Q\u0005\u0005\b\u0003'\u0011\u0002\u0019AA\u0001\u0011\u001d\t9B\u0005a\u0001\u00033\t1#\\1lKNKhn\u00195s_:|Wo]\"bY2\fAB^3sS\u001aL8i\\7nSR$2\u0001SA\u0017\u0011\u001d\t9\u0002\u0006a\u0001\u00033\t\u0011C^3sS\u001aLhj\\\"p[6LGOR8s)\rA\u00151\u0007\u0005\b\u0003/)\u0002\u0019AA\r\u0003Y1XM]5gs\u000e{W.\\5ug&swJ\u001d3fe>3Gc\u0001%\u0002:!9\u00111\b\fA\u0002\u0005u\u0012AB3q_\u000eD7\u000f\u0005\u0004\u0002@\u0005=\u0013\u0011\u0004\b\u0005\u0003\u0003\nYE\u0004\u0003\u0002D\u0005%SBAA#\u0015\r\t9\u0005J\u0001\u0007yI|w\u000e\u001e \n\u0003-K1!!\u0014K\u0003\u001d\u0001\u0018mY6bO\u0016LA!!\u0015\u0002T\t\u00191+Z9\u000b\u0007\u00055#\n")
public class EpochCoordinatorSuite
extends SparkFunSuite
implements LocalSparkSession,
MockitoSugar {
    private RpcEndpointRef epochCoordinator;
    private StreamWriter writer;
    private ContinuousExecution query;
    private InOrder orderVerifier;
    private transient SparkSession spark;

    public <T> T mock(ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, classTag);
    }

    public <T> T mock(Answer<?> defaultAnswer, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, defaultAnswer, classTag);
    }

    public <T> T mock(MockSettings mockSettings, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (MockSettings)mockSettings, classTag);
    }

    public <T> T mock(String name, ClassTag<T> classTag) {
        return (T)MockitoSugar.mock$((MockitoSugar)this, (String)name, classTag);
    }

    @Override
    public /* synthetic */ void org$apache$spark$sql$LocalSparkSession$$super$beforeAll() {
        super.beforeAll();
    }

    @Override
    public /* synthetic */ void org$apache$spark$sql$LocalSparkSession$$super$afterEach() {
        BeforeAndAfterEach.afterEach$((BeforeAndAfterEach)this);
    }

    @Override
    public void beforeAll() {
        LocalSparkSession.beforeAll$(this);
    }

    @Override
    public void afterEach() {
        LocalSparkSession.afterEach$(this);
    }

    public /* synthetic */ Status org$scalatest$BeforeAndAfterEach$$super$runTest(String testName, Args args) {
        return FunSuiteLike.runTest$((FunSuiteLike)this, (String)testName, (Args)args);
    }

    public Status runTest(String testName, Args args) {
        return BeforeAndAfterEach.runTest$((BeforeAndAfterEach)this, (String)testName, (Args)args);
    }

    @Override
    public SparkSession spark() {
        return this.spark;
    }

    @Override
    public void spark_$eq(SparkSession x$1) {
        this.spark = x$1;
    }

    private RpcEndpointRef epochCoordinator() {
        return this.epochCoordinator;
    }

    private void epochCoordinator_$eq(RpcEndpointRef x$1) {
        this.epochCoordinator = x$1;
    }

    private StreamWriter writer() {
        return this.writer;
    }

    private void writer_$eq(StreamWriter x$1) {
        this.writer = x$1;
    }

    private ContinuousExecution query() {
        return this.query;
    }

    private void query_$eq(ContinuousExecution x$1) {
        this.query = x$1;
    }

    private InOrder orderVerifier() {
        return this.orderVerifier;
    }

    private void orderVerifier_$eq(InOrder x$1) {
        this.orderVerifier = x$1;
    }

    public void beforeEach() {
        ContinuousReader reader = (ContinuousReader)this.mock(ClassTag$.MODULE$.apply(ContinuousReader.class));
        this.writer_$eq((StreamWriter)this.mock(ClassTag$.MODULE$.apply(StreamWriter.class)));
        this.query_$eq((ContinuousExecution)this.mock(ClassTag$.MODULE$.apply(ContinuousExecution.class)));
        this.orderVerifier_$eq(Mockito.inOrder((Object[])new Object[]{this.writer(), this.query()}));
        this.spark_$eq(new TestSparkSession());
        this.epochCoordinator_$eq(EpochCoordinatorRef$.MODULE$.create(this.writer(), reader, this.query(), "test", 1L, this.spark(), SparkEnv$.MODULE$.get()));
    }

    private void setWriterPartitions(int numPartitions) {
        this.epochCoordinator().askSync((Object)new SetWriterPartitions(numPartitions), ClassTag$.MODULE$.Unit());
    }

    private void setReaderPartitions(int numPartitions) {
        this.epochCoordinator().askSync((Object)new SetReaderPartitions(numPartitions), ClassTag$.MODULE$.Unit());
    }

    private void commitPartitionEpoch(int partitionId, long epoch) {
        WriterCommitMessage dummyMessage = (WriterCommitMessage)this.mock(ClassTag$.MODULE$.apply(WriterCommitMessage.class));
        this.epochCoordinator().send((Object)new CommitPartitionEpoch(partitionId, epoch, dummyMessage));
    }

    private void reportPartitionOffset(int partitionId, long epoch) {
        PartitionOffset dummyOffset = (PartitionOffset)this.mock(ClassTag$.MODULE$.apply(PartitionOffset.class));
        this.epochCoordinator().send((Object)new ReportPartitionOffset(partitionId, epoch, dummyOffset));
    }

    private void makeSynchronousCall() {
        this.epochCoordinator().askSync((Object)GetCurrentEpoch$.MODULE$, ClassTag$.MODULE$.Long());
    }

    private void verifyCommit(long epoch) {
        ((StreamWriter)this.orderVerifier().verify((Object)this.writer())).commit(Matchers.eq((long)epoch), (WriterCommitMessage[])Matchers.any());
        ((ContinuousExecution)this.orderVerifier().verify((Object)this.query())).commit(epoch);
    }

    private void verifyNoCommitFor(long epoch) {
        ((StreamWriter)Mockito.verify((Object)this.writer(), (VerificationMode)Mockito.never())).commit(Matchers.eq((long)epoch), (WriterCommitMessage[])Matchers.any());
        ((ContinuousExecution)Mockito.verify((Object)this.query(), (VerificationMode)Mockito.never())).commit(epoch);
    }

    private void verifyCommitsInOrderOf(Seq<Object> epochs) {
        epochs.foreach((Function1)(JFunction1.mcVJ.sp & Serializable & scala.Serializable)epoch -> this.verifyCommit(epoch));
    }

    public EpochCoordinatorSuite() {
        BeforeAndAfterEach.$init$((BeforeAndAfterEach)this);
        LocalSparkSession.$init$(this);
        MockitoSugar.$init$((MockitoSugar)this);
        this.test("single epoch", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.commitPartitionEpoch(2, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyCommit(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 59));
        this.test("single epoch, all but one writer partition has committed", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyNoCommitFor(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));
        this.test("single epoch, all but one reader partition has reported an offset", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(3);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.commitPartitionEpoch(2, 1L);
            this.reportPartitionOffset(0, 1L);
            this.makeSynchronousCall();
            this.verifyNoCommitFor(1L);
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
        this.test("consequent epochs, messages for epoch (k + 1) arrive after messages for epoch k", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(2);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.reportPartitionOffset(1, 1L);
            this.commitPartitionEpoch(0, 2L);
            this.commitPartitionEpoch(1, 2L);
            this.reportPartitionOffset(0, 2L);
            this.reportPartitionOffset(1, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 104));
        this.test("consequent epochs, a message for epoch k arrives after messages for epoch (k + 1)", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(2);
            this.setReaderPartitions(2);
            this.commitPartitionEpoch(0, 1L);
            this.commitPartitionEpoch(1, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 2L);
            this.commitPartitionEpoch(1, 2L);
            this.reportPartitionOffset(0, 2L);
            this.reportPartitionOffset(1, 2L);
            this.reportPartitionOffset(1, 1L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 123));
        this.test("several epochs, messages arrive in order 1 -> 3 -> 4 -> 2", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(1);
            this.setReaderPartitions(1);
            this.commitPartitionEpoch(0, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 3L);
            this.reportPartitionOffset(0, 3L);
            this.commitPartitionEpoch(0, 4L);
            this.reportPartitionOffset(0, 4L);
            this.commitPartitionEpoch(0, 2L);
            this.reportPartitionOffset(0, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L, 3L, 4L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 144));
        this.test("several epochs, messages arrive in order 1 -> 3 -> 5 -> 4 -> 2", (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tag[0]), (Function0)(JFunction0.mcV.sp & Serializable & scala.Serializable)() -> {
            this.setWriterPartitions(1);
            this.setReaderPartitions(1);
            this.commitPartitionEpoch(0, 1L);
            this.reportPartitionOffset(0, 1L);
            this.commitPartitionEpoch(0, 3L);
            this.reportPartitionOffset(0, 3L);
            this.commitPartitionEpoch(0, 5L);
            this.reportPartitionOffset(0, 5L);
            this.commitPartitionEpoch(0, 4L);
            this.reportPartitionOffset(0, 4L);
            this.commitPartitionEpoch(0, 2L);
            this.reportPartitionOffset(0, 2L);
            this.makeSynchronousCall();
            this.verifyCommitsInOrderOf((Seq<Object>)List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapLongArray(new long[]{1L, 2L, 3L, 4L, 5L})));
        }, new Position("EpochCoordinatorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 165));
    }
}

