/*
 * Decompiled with CFR 0.152.
 */
package kafka.api;

import java.io.File;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import kafka.api.FixedPortTestUtils$;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.ShutdownableThread;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.Assert;
import org.junit.Test;
import scala.Function0;
import scala.Function1;
import scala.Function3;
import scala.Option;
import scala.Predef$;
import scala.collection.GenTraversableOnce;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Range;
import scala.collection.immutable.Set;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.BufferLike;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ListBuffer;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;

@ScalaSignature(bytes="\u0006\u0001\u0005ee\u0001B\u000e\u001d\u0001\u0005BQ\u0001\u000b\u0001\u0005\u0002%Bq\u0001\f\u0001C\u0002\u0013%Q\u0006\u0003\u00045\u0001\u0001\u0006IA\f\u0005\bk\u0001\u0011\r\u0011\"\u0003.\u0011\u00191\u0004\u0001)A\u0005]!9q\u0007\u0001b\u0001\n\u0013i\u0003B\u0002\u001d\u0001A\u0003%a\u0006C\u0004:\u0001\t\u0007I\u0011A\u0017\t\ri\u0002\u0001\u0015!\u0003/\u0011\u001dY\u0004A1A\u0005\nqBa!\u0012\u0001!\u0002\u0013i\u0004b\u0002$\u0001\u0005\u0004%I\u0001\u0010\u0005\u0007\u000f\u0002\u0001\u000b\u0011B\u001f\t\u000f!\u0003!\u0019!C\u0001\u0013\"1\u0001\u000b\u0001Q\u0001\n)CQ!\u0015\u0001\u0005BICQa\u0018\u0001\u0005\u0002\u0001DQ!\u001c\u0001\u0005\u0002\u0001DQa\u001c\u0001\u0005\nADq!a\f\u0001\t\u0013\t\t\u0004C\u0005\u0002X\u0001\t\n\u0011\"\u0003\u0002Z!9\u0011q\u000e\u0001\u0005\n\u0005EdABA@\u0001\u0011\t\t\t\u0003\u0004)/\u0011\u0005\u0011q\u0012\u0005\u0007\u0003+;B\u0011\t1\t\r\u0005]u\u0003\"\u0011a\u0005Y!&/\u00198tC\u000e$\u0018n\u001c8t\u0005>,hnY3UKN$(BA\u000f\u001f\u0003\r\t\u0007/\u001b\u0006\u0002?\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001#!\t\u0019c%D\u0001%\u0015\t)c$A\u0006j]R,wM]1uS>t\u0017BA\u0014%\u0005YY\u0015MZ6b'\u0016\u0014h/\u001a:UKN$\b*\u0019:oKN\u001c\u0018A\u0002\u001fj]&$h\bF\u0001+!\tY\u0003!D\u0001\u001d\u0003I\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3\u0016\u00039\u0002\"a\f\u001a\u000e\u0003AR\u0011!M\u0001\u0006g\u000e\fG.Y\u0005\u0003gA\u00121!\u00138u\u0003M\u0001(o\u001c3vG\u0016\u0014()\u001e4gKJ\u001c\u0016N_3!\u0003U\u0019XM\u001d<fe6+7o]1hK6\u000b\u0007PQ=uKN\fac]3sm\u0016\u0014X*Z:tC\u001e,W*\u0019=CsR,7\u000fI\u0001\u000e]Vl\u0007+\u0019:uSRLwN\\:\u0002\u001d9,X\u000eU1si&$\u0018n\u001c8tA\u0005Qa.^7TKJ4XM]:\u0002\u00179,XnU3sm\u0016\u00148\u000fI\u0001\f_V$\b/\u001e;U_BL7-F\u0001>!\tq4)D\u0001@\u0015\t\u0001\u0015)\u0001\u0003mC:<'\"\u0001\"\u0002\t)\fg/Y\u0005\u0003\t~\u0012aa\u0015;sS:<\u0017\u0001D8viB,H\u000fV8qS\u000e\u0004\u0013AC5oaV$Hk\u001c9jG\u0006Y\u0011N\u001c9viR{\u0007/[2!\u0003=yg/\u001a:sS\u0012Lgn\u001a)s_B\u001cX#\u0001&\u0011\u0005-sU\"\u0001'\u000b\u00055\u000b\u0015\u0001B;uS2L!a\u0014'\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\tpm\u0016\u0014(/\u001b3j]\u001e\u0004&o\u001c9tA\u0005yq-\u001a8fe\u0006$XmQ8oM&<7/F\u0001T!\r!v+W\u0007\u0002+*\u0011a\u000bM\u0001\u000bG>dG.Z2uS>t\u0017B\u0001-V\u0005\r\u0019V-\u001d\t\u00035vk\u0011a\u0017\u0006\u00039z\taa]3sm\u0016\u0014\u0018B\u00010\\\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\u001fQ,7\u000f^,ji\"<%o\\;q\u0013\u0012$\u0012!\u0019\t\u0003_\tL!a\u0019\u0019\u0003\tUs\u0017\u000e\u001e\u0015\u0003#\u0015\u0004\"AZ6\u000e\u0003\u001dT!\u0001[5\u0002\u000b),h.\u001b;\u000b\u0003)\f1a\u001c:h\u0013\tawM\u0001\u0003UKN$\u0018!\u0006;fgR<\u0016\u000e\u001e5He>,\b/T3uC\u0012\fG/\u0019\u0015\u0003%\u0015\f\u0011\u0003^3ti\n\u0013xn[3s\r\u0006LG.\u001e:f)\t\t\u0017\u000fC\u0003s'\u0001\u00071/\u0001\u0004d_6l\u0017\u000e\u001e\t\t_Q4\u0018qBA\u0012C&\u0011Q\u000f\r\u0002\n\rVt7\r^5p]N\u0002ba^@\u0002\u0004\u0005\rQ\"\u0001=\u000b\u0005eT\u0018\u0001\u00039s_\u0012,8-\u001a:\u000b\u0005md\u0018aB2mS\u0016tGo\u001d\u0006\u0003?uT!A`5\u0002\r\u0005\u0004\u0018m\u00195f\u0013\r\t\t\u0001\u001f\u0002\u000e\u0017\u000647.\u0019)s_\u0012,8-\u001a:\u0011\u000b=\n)!!\u0003\n\u0007\u0005\u001d\u0001GA\u0003BeJ\f\u0017\u0010E\u00020\u0003\u0017I1!!\u00041\u0005\u0011\u0011\u0015\u0010^3\u0011\t\u0005E\u0011q\u0004\b\u0005\u0003'\tY\u0002E\u0002\u0002\u0016Aj!!a\u0006\u000b\u0007\u0005e\u0001%\u0001\u0004=e>|GOP\u0005\u0004\u0003;\u0001\u0014A\u0002)sK\u0012,g-C\u0002E\u0003CQ1!!\b1!!\t)#a\u000b\u0002\u0004\u0005\rQBAA\u0014\u0015\r\tIC_\u0001\tG>t7/^7fe&!\u0011QFA\u0014\u00055Y\u0015MZ6b\u0007>t7/^7fe\u0006\u00113M]3bi\u0016\u001cuN\\:v[\u0016\u0014\u0018I\u001c3Tk\n\u001c8M]5cKR{Gk\u001c9jGN$\u0002\"a\t\u00024\u0005]\u0012Q\n\u0005\b\u0003k!\u0002\u0019AA\b\u0003\u001d9'o\\;q\u0013\u0012Dq!!\u000f\u0015\u0001\u0004\tY$\u0001\u0004u_BL7m\u001d\t\u0007\u0003{\t9%a\u0004\u000f\t\u0005}\u00121\t\b\u0005\u0003+\t\t%C\u00012\u0013\r\t)\u0005M\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\tI%a\u0013\u0003\t1K7\u000f\u001e\u0006\u0004\u0003\u000b\u0002\u0004\"CA()A\u0005\t\u0019AA)\u00035\u0011X-\u00193D_6l\u0017\u000e\u001e;fIB\u0019q&a\u0015\n\u0007\u0005U\u0003GA\u0004C_>dW-\u00198\u0002Y\r\u0014X-\u0019;f\u0007>t7/^7fe\u0006sGmU;cg\u000e\u0014\u0018NY3U_R{\u0007/[2tI\u0011,g-Y;mi\u0012\u001aTCAA.U\u0011\t\t&!\u0018,\u0005\u0005}\u0003\u0003BA1\u0003Wj!!a\u0019\u000b\t\u0005\u0015\u0014qM\u0001\nk:\u001c\u0007.Z2lK\u0012T1!!\u001b1\u0003)\tgN\\8uCRLwN\\\u0005\u0005\u0003[\n\u0019GA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\fAb\u0019:fCR,Gk\u001c9jGN$\"!a\u001d\u0011\r\u0005U\u00141\u0010\u0018/\u001b\t\t9HC\u0002\u0002zU\u000b\u0011\"[7nkR\f'\r\\3\n\t\u0005u\u0014q\u000f\u0002\u0004\u001b\u0006\u0004(a\u0004\"pk:\u001cWmU2iK\u0012,H.\u001a:\u0014\u0007]\t\u0019\t\u0005\u0003\u0002\u0006\u0006-UBAAD\u0015\r\tIIH\u0001\u0006kRLGn]\u0005\u0005\u0003\u001b\u000b9I\u0001\nTQV$Hm\\<oC\ndW\r\u00165sK\u0006$GCAAI!\r\t\u0019jF\u0007\u0002\u0001\u00051Am\\,pe.\f\u0001b\u001d5vi\u0012|wO\u001c")
public class TransactionsBounceTest
extends KafkaServerTestHarness {
    private final int producerBufferSize;
    private final int serverMessageMaxBytes = this.producerBufferSize() / 2;
    private final int kafka$api$TransactionsBounceTest$$numPartitions;
    private final int numServers;
    private final String kafka$api$TransactionsBounceTest$$outputTopic;
    private final String inputTopic;
    private final Properties overridingProps = new Properties();

    private int producerBufferSize() {
        return this.producerBufferSize;
    }

    private int serverMessageMaxBytes() {
        return this.serverMessageMaxBytes;
    }

    public int kafka$api$TransactionsBounceTest$$numPartitions() {
        return this.kafka$api$TransactionsBounceTest$$numPartitions;
    }

    public int numServers() {
        return this.numServers;
    }

    public String kafka$api$TransactionsBounceTest$$outputTopic() {
        return this.kafka$api$TransactionsBounceTest$$outputTopic;
    }

    private String inputTopic() {
        return this.inputTopic;
    }

    public Properties overridingProps() {
        return this.overridingProps;
    }

    @Override
    public Seq<KafkaConfig> generateConfigs() {
        return (Seq)FixedPortTestUtils$.MODULE$.createBrokerConfigs(this.numServers(), this.zkConnect(), true, FixedPortTestUtils$.MODULE$.createBrokerConfigs$default$4()).map((Function1 & Serializable & scala.Serializable)x$1 -> KafkaConfig$.MODULE$.fromProps(x$1, this.overridingProps()), Seq$.MODULE$.canBuildFrom());
    }

    @Test
    public void testWithGroupId() {
        String testBrokerFailure_consumerGroup = "myGroup";
        int testBrokerFailure_numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), testBrokerFailure_numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> testBrokerFailure_consumer = this.createConsumerAndSubscribeToTopics(testBrokerFailure_consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribeToTopics$default$3());
        KafkaProducer<byte[], byte[]> testBrokerFailure_producer = TestUtils$.MODULE$.createTransactionalProducer("test-txn", (Seq<KafkaServer>)this.servers(), 512, TestUtils$.MODULE$.createTransactionalProducer$default$4(), TestUtils$.MODULE$.createTransactionalProducer$default$5(), TestUtils$.MODULE$.createTransactionalProducer$default$6());
        testBrokerFailure_producer.initTransactions();
        BounceScheduler testBrokerFailure_scheduler = new BounceScheduler();
        testBrokerFailure_scheduler.start();
        IntRef testBrokerFailure_numMessagesProcessed = IntRef.create((int)0);
        IntRef testBrokerFailure_iteration = IntRef.create((int)0);
        try {
            while (testBrokerFailure_numMessagesProcessed.elem < testBrokerFailure_numInputRecords) {
                int testBrokerFailure_toRead = Math.min(200, testBrokerFailure_numInputRecords - testBrokerFailure_numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> testBrokerFailure_records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_consumer, testBrokerFailure_toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                testBrokerFailure_producer.beginTransaction();
                boolean testBrokerFailure_shouldAbort = testBrokerFailure_iteration.elem % 3 == 0;
                testBrokerFailure_records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                testBrokerFailure_producer.sendOffsetsToTransaction((Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)testBrokerFailure_consumer)).asJava(), testBrokerFailure_consumerGroup);
                if (testBrokerFailure_shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(testBrokerFailure_consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.commitTransaction();
                    testBrokerFailure_numMessagesProcessed.elem += testBrokerFailure_records.size();
                }
                ++testBrokerFailure_iteration.elem;
            }
        }
        finally {
            testBrokerFailure_producer.close();
            testBrokerFailure_consumer.close();
        }
        testBrokerFailure_scheduler.shutdown();
        KafkaConsumer<byte[], byte[]> testBrokerFailure_verifyingConsumer = this.createConsumerAndSubscribeToTopics("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap testBrokerFailure_recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_verifyingConsumer, testBrokerFailure_numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer testBrokerFailure_outputRecords = new ListBuffer();
        testBrokerFailure_recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, x0$1);
            return BoxedUnit.UNIT;
        });
        Set testBrokerFailure_recordSet = testBrokerFailure_outputRecords.toSet();
        Assert.assertEquals((long)testBrokerFailure_numInputRecords, (long)testBrokerFailure_recordSet.size());
        int n = 0;
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Set testBrokerFailure_expectedValues = RichInt$.MODULE$.until$extension0(n, testBrokerFailure_numInputRecords).toSet();
        Assert.assertEquals((String)new StringBuilder(18).append("Missing messages: ").append(testBrokerFailure_expectedValues.$minus$minus((GenTraversableOnce)testBrokerFailure_recordSet)).toString(), (Object)testBrokerFailure_expectedValues, (Object)testBrokerFailure_recordSet);
        testBrokerFailure_verifyingConsumer.close();
    }

    @Test
    public void testWithGroupMetadata() {
        String testBrokerFailure_consumerGroup = "myGroup";
        int testBrokerFailure_numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), testBrokerFailure_numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> testBrokerFailure_consumer = this.createConsumerAndSubscribeToTopics(testBrokerFailure_consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribeToTopics$default$3());
        KafkaProducer<byte[], byte[]> testBrokerFailure_producer = TestUtils$.MODULE$.createTransactionalProducer("test-txn", (Seq<KafkaServer>)this.servers(), 512, TestUtils$.MODULE$.createTransactionalProducer$default$4(), TestUtils$.MODULE$.createTransactionalProducer$default$5(), TestUtils$.MODULE$.createTransactionalProducer$default$6());
        testBrokerFailure_producer.initTransactions();
        BounceScheduler testBrokerFailure_scheduler = new BounceScheduler();
        testBrokerFailure_scheduler.start();
        IntRef testBrokerFailure_numMessagesProcessed = IntRef.create((int)0);
        IntRef testBrokerFailure_iteration = IntRef.create((int)0);
        try {
            while (testBrokerFailure_numMessagesProcessed.elem < testBrokerFailure_numInputRecords) {
                int testBrokerFailure_toRead = Math.min(200, testBrokerFailure_numInputRecords - testBrokerFailure_numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> testBrokerFailure_records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_consumer, testBrokerFailure_toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                testBrokerFailure_producer.beginTransaction();
                boolean testBrokerFailure_shouldAbort = testBrokerFailure_iteration.elem % 3 == 0;
                testBrokerFailure_records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                testBrokerFailure_producer.sendOffsetsToTransaction((Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)testBrokerFailure_consumer)).asJava(), testBrokerFailure_consumer.groupMetadata());
                if (testBrokerFailure_shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(testBrokerFailure_consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    testBrokerFailure_producer.commitTransaction();
                    testBrokerFailure_numMessagesProcessed.elem += testBrokerFailure_records.size();
                }
                ++testBrokerFailure_iteration.elem;
            }
        }
        finally {
            testBrokerFailure_producer.close();
            testBrokerFailure_consumer.close();
        }
        testBrokerFailure_scheduler.shutdown();
        KafkaConsumer<byte[], byte[]> testBrokerFailure_verifyingConsumer = this.createConsumerAndSubscribeToTopics("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap testBrokerFailure_recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_verifyingConsumer, testBrokerFailure_numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer testBrokerFailure_outputRecords = new ListBuffer();
        testBrokerFailure_recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, x0$1);
            return BoxedUnit.UNIT;
        });
        Set testBrokerFailure_recordSet = testBrokerFailure_outputRecords.toSet();
        Assert.assertEquals((long)testBrokerFailure_numInputRecords, (long)testBrokerFailure_recordSet.size());
        int n = 0;
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Set testBrokerFailure_expectedValues = RichInt$.MODULE$.until$extension0(n, testBrokerFailure_numInputRecords).toSet();
        Assert.assertEquals((String)new StringBuilder(18).append("Missing messages: ").append(testBrokerFailure_expectedValues.$minus$minus((GenTraversableOnce)testBrokerFailure_recordSet)).toString(), (Object)testBrokerFailure_expectedValues, (Object)testBrokerFailure_recordSet);
        testBrokerFailure_verifyingConsumer.close();
    }

    private void testBrokerFailure(Function3<KafkaProducer<byte[], byte[]>, String, KafkaConsumer<byte[], byte[]>, BoxedUnit> commit) {
        String consumerGroup = "myGroup";
        int numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), numInputRecords, (Seq<KafkaServer>)this.servers());
        KafkaConsumer<byte[], byte[]> consumer = this.createConsumerAndSubscribeToTopics(consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), this.createConsumerAndSubscribeToTopics$default$3());
        KafkaProducer<byte[], byte[]> producer = TestUtils$.MODULE$.createTransactionalProducer("test-txn", (Seq<KafkaServer>)this.servers(), 512, TestUtils$.MODULE$.createTransactionalProducer$default$4(), TestUtils$.MODULE$.createTransactionalProducer$default$5(), TestUtils$.MODULE$.createTransactionalProducer$default$6());
        producer.initTransactions();
        BounceScheduler scheduler = new BounceScheduler();
        scheduler.start();
        IntRef numMessagesProcessed = IntRef.create((int)0);
        IntRef iteration = IntRef.create((int)0);
        try {
            while (numMessagesProcessed.elem < numInputRecords) {
                int toRead = Math.min(200, numInputRecords - numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append(iteration$1.elem).append(": About to read ").append(testBrokerFailure_toRead).append(" messages, processed ").append(numMessagesProcessed$1.elem).append(" so far..").toString());
                Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, toRead, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3());
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Received ").append(testBrokerFailure_records.size()).append(" messages, sending them transactionally to ").append(this.kafka$api$TransactionsBounceTest$$outputTopic()).toString());
                producer.beginTransaction();
                boolean shouldAbort = iteration.elem % 3 == 0;
                records.foreach((Function1 & Serializable & scala.Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(35).append("Sent ").append(testBrokerFailure_records.size()).append(" messages. Committing offsets.").toString());
                commit.apply(producer, (Object)consumerGroup, consumer);
                if (shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(53).append("Committed offsets. Aborting transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Committed offsets. committing transaction of ").append(testBrokerFailure_records.size()).append(" messages.").toString());
                    producer.commitTransaction();
                    numMessagesProcessed.elem += records.size();
                }
                ++iteration.elem;
            }
        }
        finally {
            producer.close();
            consumer.close();
        }
        scheduler.shutdown();
        KafkaConsumer<byte[], byte[]> verifyingConsumer = this.createConsumerAndSubscribeToTopics("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, TestUtils$.MODULE$.pollUntilAtLeastNumRecords$default$3()).foreach((Function1 & Serializable & scala.Serializable)record -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$7(testBrokerFailure_recordsByPartition, record);
            return BoxedUnit.UNIT;
        });
        ListBuffer outputRecords = new ListBuffer();
        recordsByPartition.values().foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            TransactionsBounceTest.$anonfun$testBrokerFailure$9(testBrokerFailure_outputRecords, x0$1);
            return BoxedUnit.UNIT;
        });
        Set recordSet = outputRecords.toSet();
        Assert.assertEquals((long)numInputRecords, (long)recordSet.size());
        int n = 0;
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        Set expectedValues = RichInt$.MODULE$.until$extension0(n, numInputRecords).toSet();
        Assert.assertEquals((String)new StringBuilder(18).append("Missing messages: ").append(expectedValues.$minus$minus((GenTraversableOnce)recordSet)).toString(), (Object)expectedValues, (Object)recordSet);
        verifyingConsumer.close();
    }

    private KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeToTopics(String groupId, List<String> topics, boolean readCommitted) {
        String x$1 = TestUtils$.MODULE$.getBrokerListStrFromServers((Seq<KafkaServer>)this.servers(), TestUtils$.MODULE$.getBrokerListStrFromServers$default$2());
        boolean x$4 = false;
        String x$5 = TestUtils$.MODULE$.createConsumer$default$3();
        int x$6 = TestUtils$.MODULE$.createConsumer$default$6();
        SecurityProtocol x$7 = TestUtils$.MODULE$.createConsumer$default$7();
        Option<File> x$8 = TestUtils$.MODULE$.createConsumer$default$8();
        Option<Properties> x$9 = TestUtils$.MODULE$.createConsumer$default$9();
        ByteArrayDeserializer x$10 = TestUtils$.MODULE$.createConsumer$default$10();
        ByteArrayDeserializer x$11 = TestUtils$.MODULE$.createConsumer$default$11();
        KafkaConsumer consumer = TestUtils$.MODULE$.createConsumer(x$1, groupId, x$5, x$4, readCommitted, x$6, x$7, x$8, x$9, x$10, x$11);
        consumer.subscribe((Collection)JavaConverters$.MODULE$.seqAsJavaListConverter(topics).asJava());
        return consumer;
    }

    private boolean createConsumerAndSubscribeToTopics$default$3() {
        return false;
    }

    private scala.collection.immutable.Map<Object, Object> createTopics() {
        Properties topicConfig = new Properties();
        topicConfig.put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        this.createTopic(this.inputTopic(), this.kafka$api$TransactionsBounceTest$$numPartitions(), 3, topicConfig);
        return this.createTopic(this.kafka$api$TransactionsBounceTest$$outputTopic(), this.kafka$api$TransactionsBounceTest$$numPartitions(), 3, topicConfig);
    }

    public static final /* synthetic */ void $anonfun$testBrokerFailure$7(HashMap recordsByPartition$1, ConsumerRecord record) {
        String string = TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record);
        if (Predef$.MODULE$ == null) {
            throw null;
        }
        int value = new StringOps(string).toInt();
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        ((BufferLike)recordsByPartition$1.getOrElseUpdate((Object)topicPartition, (Function0 & Serializable & scala.Serializable)() -> new ListBuffer())).append((Seq)Predef$.MODULE$.wrapIntArray(new int[]{value}));
    }

    public static final /* synthetic */ void $anonfun$testBrokerFailure$9(ListBuffer outputRecords$1, ListBuffer x0$1) {
        Assert.assertEquals((String)"Out of order messages detected", (Object)x0$1, (Object)x0$1.sorted((Ordering)Ordering.Int$.MODULE$));
        outputRecords$1.appendAll((TraversableOnce)x0$1);
    }

    public TransactionsBounceTest() {
        this.producerBufferSize = 65536;
        this.kafka$api$TransactionsBounceTest$$numPartitions = 3;
        this.numServers = 4;
        this.kafka$api$TransactionsBounceTest$$outputTopic = "output-topic";
        this.inputTopic = "input-topic";
        this.overridingProps().put(KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.MessageMaxBytesProp(), ((Object)BoxesRunTime.boxToInteger((int)this.serverMessageMaxBytes())).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.ControlledShutdownEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)true)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.UncleanLeaderElectionEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.AutoLeaderRebalanceEnableProp(), ((Object)BoxesRunTime.boxToBoolean((boolean)false)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), ((Object)BoxesRunTime.boxToInteger((int)3)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.MinInSyncReplicasProp(), ((Object)BoxesRunTime.boxToInteger((int)2)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicPartitionsProp(), ((Object)BoxesRunTime.boxToInteger((int)1)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.TransactionsTopicReplicationFactorProp(), ((Object)BoxesRunTime.boxToInteger((int)3)).toString());
        this.overridingProps().put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), "10");
        this.overridingProps().put(KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), "0");
    }

    public static final /* synthetic */ Object $anonfun$testWithGroupId$1$adapted(KafkaProducer producer, String groupId, KafkaConsumer consumer) {
        producer.sendOffsetsToTransaction((Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)consumer)).asJava(), groupId);
        return BoxedUnit.UNIT;
    }

    public static final /* synthetic */ Object $anonfun$testWithGroupMetadata$1$adapted(KafkaProducer producer, String x$2, KafkaConsumer consumer) {
        producer.sendOffsetsToTransaction((Map)JavaConverters$.MODULE$.mapAsJavaMapConverter(TestUtils$.MODULE$.consumerPositions((KafkaConsumer<byte[], byte[]>)consumer)).asJava(), consumer.groupMetadata());
        return BoxedUnit.UNIT;
    }

    public class BounceScheduler
    extends ShutdownableThread {
        public void doWork() {
            this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().servers().foreach((Function1 & Serializable & scala.Serializable)server -> {
                BounceScheduler.$anonfun$doWork$1(this, server);
                return BoxedUnit.UNIT;
            });
            int n = 0;
            if (Predef$.MODULE$ == null) {
                throw null;
            }
            Range range = RichInt$.MODULE$.until$extension0(n, this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$numPartitions());
            if (range == null) {
                throw null;
            }
            Range foreach_this = range;
            if (!foreach_this.isEmpty()) {
                int foreach_i = foreach_this.start();
                while (true) {
                    TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged(this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().zkClient(), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$outputTopic(), foreach_i, TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$4(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$5(), TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChanged$default$6());
                    if (foreach_i == foreach_this.scala$collection$immutable$Range$$lastElement()) break;
                    foreach_i += foreach_this.step();
                }
            }
        }

        public void shutdown() {
            super.shutdown();
        }

        public /* synthetic */ TransactionsBounceTest kafka$api$TransactionsBounceTest$BounceScheduler$$$outer() {
            return TransactionsBounceTest.this;
        }

        public static final /* synthetic */ void $anonfun$doWork$1(BounceScheduler $this, KafkaServer server) {
            $this.trace((Function0 & Serializable & scala.Serializable)() -> {
                String string = "Shutting down server : %s";
                if (Predef$.MODULE$ == null) {
                    throw null;
                }
                return new StringOps(string).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())}));
            });
            server.shutdown();
            server.awaitShutdown();
            Thread.sleep(500L);
            $this.trace((Function0 & Serializable & scala.Serializable)() -> {
                String string = "Server %s shut down. Starting it up again.";
                if (Predef$.MODULE$ == null) {
                    throw null;
                }
                return new StringOps(string).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())}));
            });
            server.startup();
            $this.trace((Function0 & Serializable & scala.Serializable)() -> {
                String string = "Restarted server: %s";
                if (Predef$.MODULE$ == null) {
                    throw null;
                }
                return new StringOps(string).format((Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())}));
            });
            Thread.sleep(500L);
        }

        public BounceScheduler() {
            if (TransactionsBounceTest.this == null) {
                throw null;
            }
            super("daemon-broker-bouncer", false);
        }
    }
}

