/*
 * Decompiled with CFR 0.152.
 */
package kafka.api;

import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import kafka.api.FixedPortTestUtils$;
import kafka.api.IntegrationTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.apache.kafka.server.util.ShutdownableThread;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Function0;
import scala.Function1;
import scala.Function3;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.collection.IterableOnce;
import scala.collection.Seq;
import scala.collection.StringOps$;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.ListBuffer;
import scala.jdk.CollectionConverters$;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.RichInt$;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u0005\u0005\rg\u0001\u0002\u000f\u001e\u0001\tBQa\n\u0001\u0005\u0002!BqA\u000b\u0001C\u0002\u0013%1\u0006\u0003\u00043\u0001\u0001\u0006I\u0001\f\u0005\bg\u0001\u0011\r\u0011\"\u0003,\u0011\u0019!\u0004\u0001)A\u0005Y!9Q\u0007\u0001b\u0001\n\u0013Y\u0003B\u0002\u001c\u0001A\u0003%A\u0006C\u00048\u0001\t\u0007I\u0011B\u0016\t\ra\u0002\u0001\u0015!\u0003-\u0011\u001dI\u0004A1A\u0005\niBaa\u0011\u0001!\u0002\u0013Y\u0004b\u0002#\u0001\u0005\u0004%IA\u000f\u0005\u0007\u000b\u0002\u0001\u000b\u0011B\u001e\t\u000f\u0019\u0003!\u0019!C\u0001\u000f\"1a\n\u0001Q\u0001\n!CQa\u0014\u0001\u0005BACQa\u0018\u0001\u0005R-BQ\u0001\u0019\u0001\u0005\u0002\u0005Dq!!\u0007\u0001\t\u0013\tY\u0002C\u0004\u0002V\u0001!I!a\u0016\t\u000f\u0005u\u0003\u0001\"\u0003\u0002`!I\u0011Q\u0011\u0001\u0012\u0002\u0013%\u0011q\u0011\u0005\b\u0003;\u0003A\u0011BAP\r\u0019\t9\u000b\u0001\u0003\u0002*\"1q\u0005\u0007C\u0001\u0003oCq!!0\u0019\t\u0003\ny\fC\u0004\u0002Bb!\t%a0\u0003-Q\u0013\u0018M\\:bGRLwN\\:C_Vt7-\u001a+fgRT!AH\u0010\u0002\u0007\u0005\u0004\u0018NC\u0001!\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u0012\u0011\u0005\u0011*S\"A\u000f\n\u0005\u0019j\"AF%oi\u0016<'/\u0019;j_:$Vm\u001d;ICJtWm]:\u0002\rqJg.\u001b;?)\u0005I\u0003C\u0001\u0013\u0001\u0003Q\u0019wN\\:v[\u0016\u0014VmY8sIRKW.Z8viV\tA\u0006\u0005\u0002.a5\taFC\u00010\u0003\u0015\u00198-\u00197b\u0013\t\tdFA\u0002J]R\fQcY8ogVlWMU3d_J$G+[7f_V$\b%\u0001\nqe>$WoY3s\u0005V4g-\u001a:TSj,\u0017a\u00059s_\u0012,8-\u001a:Ck\u001a4WM]*ju\u0016\u0004\u0013!F:feZ,'/T3tg\u0006<W-T1y\u0005f$Xm]\u0001\u0017g\u0016\u0014h/\u001a:NKN\u001c\u0018mZ3NCb\u0014\u0015\u0010^3tA\u0005ia.^7QCJ$\u0018\u000e^5p]N\faB\\;n!\u0006\u0014H/\u001b;j_:\u001c\b%A\u0006pkR\u0004X\u000f\u001e+pa&\u001cW#A\u001e\u0011\u0005q\nU\"A\u001f\u000b\u0005yz\u0014\u0001\u00027b]\u001eT\u0011\u0001Q\u0001\u0005U\u00064\u0018-\u0003\u0002C{\t11\u000b\u001e:j]\u001e\fAb\\;uaV$Hk\u001c9jG\u0002\n!\"\u001b8qkR$v\u000e]5d\u0003-Ig\u000e];u)>\u
0004\u0018n\u0019\u0011\u0002\u001f=4XM\u001d:jI&tw\r\u0015:paN,\u0012\u0001\u0013\t\u0003\u00132k\u0011A\u0013\u0006\u0003\u0017~\nA!\u001e;jY&\u0011QJ\u0013\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\u0018\u0001E8wKJ\u0014\u0018\u000eZ5oOB\u0013x\u000e]:!\u0003=9WM\\3sCR,7i\u001c8gS\u001e\u001cX#A)\u0011\u0007I;\u0016,D\u0001T\u0015\t!V+A\u0005j[6,H/\u00192mK*\u0011aKL\u0001\u000bG>dG.Z2uS>t\u0017B\u0001-T\u0005\r\u0019V-\u001d\t\u00035vk\u0011a\u0017\u0006\u00039~\taa]3sm\u0016\u0014\u0018B\u00010\\\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\u0017\t\u0014xn[3s\u0007>,h\u000e^\u0001\u0016i\u0016\u001cHoV5uQ\u001e\u0013x.\u001e9NKR\fG-\u0019;b)\t\u0011W\r\u0005\u0002.G&\u0011AM\f\u0002\u0005+:LG\u000fC\u0003g%\u0001\u0007q-A\u0007he>,\b\u000f\u0015:pi>\u001cw\u000e\u001c\t\u0003Q>t!![7\u0011\u0005)tS\"A6\u000b\u00051\f\u0013A\u0002\u001fs_>$h(\u0003\u0002o]\u00051\u0001K]3eK\u001aL!A\u00119\u000b\u00059t\u0003\u0006\u0002\ns}~\u0004\"a\u001d?\u000e\u0003QT!!\u001e<\u0002\rA\f'/Y7t\u0015\t9\b0A\u0004kkBLG/\u001a:\u000b\u0005eT\u0018!\u00026v]&$(\"A>\u0002\u0007=\u0014x-\u0003\u0002~i\n\t\u0002+\u0019:b[\u0016$XM]5{K\u0012$Vm\u001d;\u0002\t9\fW.Z\u0011\u0003\u0003\u0003\tqd\u001f3jgBd\u0017-\u001f(b[\u0016lhf\u001a:pkB\u0004&o\u001c;pG>dWh\u001f\u0019~Q\u001d\u0011\u0012QAA\t\u0003'\u0001B!a\u0002\u0002\u000e5\u0011\u0011\u0011\u0002\u0006\u0004\u0003\u0017!\u0018\u0001\u00039s_ZLG-\u001a:\n\t\u0005=\u0011\u0011\u0002\u0002\r\u001b\u0016$\bn\u001c3T_V\u00148-Z\u0001\u0006m\u0006dW/\u001a\u0017\u0003\u0003+\t#!a\u0006\u0002C\u001d,G\u000fV3ti\u001e\u0013x.\u001e9Qe>$xnY8m!\u0006\u0014\u0018-\\3uKJ\u001c\u0018\t\u001c7\u0002#Q,7\u000f\u001e\"s_.,'OR1jYV\u0014X\rF\u0002c\u0003;Aq!a\b\u0014\u0001\u0004\t\t#\u0001\u0004d_6l\u0017\u000e\u001e\t\n[\u0005\r\u0012qE4\u0002J\tL1!!\n/\u0005%1UO\\2uS>t7\u0007\u0005\u0005\u0002*\u0005e\u0012QHA\u001f\u001b\t\tYC\u0003\u0003\u0002.\u0005=\u0012\u0001\u00039s_\u0012,8-\u001a:\u000b\t\u0005E\u00121G\u0001\bG2LWM\u001c;t\u0015\r\u0001\u0013
Q\u0007\u0006\u0004\u0003oQ\u0018AB1qC\u000eDW-\u0003\u0003\u0002<\u0005-\"!D&bM.\f\u0007K]8ek\u000e,'\u000fE\u0003.\u0003\u007f\t\u0019%C\u0002\u0002B9\u0012Q!\u0011:sCf\u00042!LA#\u0013\r\t9E\f\u0002\u0005\u0005f$X\r\u0005\u0005\u0002L\u0005E\u0013QHA\u001f\u001b\t\tiE\u0003\u0003\u0002P\u0005=\u0012\u0001C2p]N,X.\u001a:\n\t\u0005M\u0013Q\n\u0002\t\u0007>t7/^7fe\u0006Y2M]3bi\u0016$&/\u00198tC\u000e$\u0018n\u001c8bYB\u0013x\u000eZ;dKJ$B!a\n\u0002Z!1\u00111\f\u000bA\u0002\u001d\fq\u0002\u001e:b]N\f7\r^5p]\u0006d\u0017\nZ\u0001\u001bGJ,\u0017\r^3D_:\u001cX/\\3s\u0003:$7+\u001e2tGJL'-\u001a\u000b\t\u0003\u0013\n\t'!\u001a\u0002|!1\u00111M\u000bA\u0002\u001d\fqa\u001a:pkBLE\rC\u0004\u0002hU\u0001\r!!\u001b\u0002\rQ|\u0007/[2t!\u0015\tY'!\u001eh\u001d\u0011\ti'!\u001d\u000f\u0007)\fy'C\u00010\u0013\r\t\u0019HL\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t9(!\u001f\u0003\t1K7\u000f\u001e\u0006\u0004\u0003gr\u0003\"CA?+A\u0005\t\u0019AA@\u00035\u0011X-\u00193D_6l\u0017\u000e\u001e;fIB\u0019Q&!!\n\u0007\u0005\reFA\u0004C_>dW-\u00198\u0002I\r\u0014X-\u0019;f\u0007>t7/^7fe\u0006sGmU;cg\u000e\u0014\u0018NY3%I\u00164\u0017-\u001e7uIM*\"!!#+\t\u0005}\u00141R\u0016\u0003\u0003\u001b\u0003B!a$\u0002\u001a6\u0011\u0011\u0011\u0013\u0006\u0005\u0003'\u000b)*A\u0005v]\u000eDWmY6fI*\u0019\u0011q\u0013\u0018\u0002\u0015\u0005tgn\u001c;bi&|g.\u0003\u0003\u0002\u001c\u0006E%!E;oG\",7m[3e-\u0006\u0014\u0018.\u00198dK\u0006a1M]3bi\u0016$v\u000e]5dgR\u0011\u0011\u0011\u0015\t\u0006%\u0006\rF\u0006L\u0005\u0004\u0003K\u001b&aA'ba\ny!i\\;oG\u0016\u001c6\r[3ek2,'oE\u0002\u0019\u0003W\u0003B!!,\u000246\u0011\u0011q\u0016\u0006\u0004\u0017\u0006E&b\u0001/\u00024%!\u0011QWAX\u0005I\u0019\u0006.\u001e;e_^t\u0017M\u00197f)\"\u0014X-\u00193\u0015\u0005\u0005e\u0006cAA^15\t\u0001!\u0001\u0004e_^{'o\u001b\u000b\u0002E\u0006A1\u000f[;uI><h\u000e")
public class TransactionsBounceTest
extends IntegrationTestHarness {
    /**
     * Producer buffer size as a compile-time constant. It carries the same value the constructor
     * assigns to {@code producerBufferSize} and exists so that values derived from it can be
     * computed in a field initializer.
     */
    private static final int PRODUCER_BUFFER_SIZE = 65536;

    /** Assigned in the constructor (30000); passed as the last argument of {@code pollUntilAtLeastNumRecords}. */
    private final int consumeRecordTimeout;
    /** Assigned in the constructor (65536). */
    private final int producerBufferSize;
    /*
     * Instance field initializers run before the constructor body, so reading the
     * producerBufferSize field (or its accessor) here would observe the default value 0 and
     * yield 0 instead of half the buffer size. Derive the value from the constant instead.
     */
    private final int serverMessageMaxBytes = PRODUCER_BUFFER_SIZE / 2;
    /** Assigned in the constructor (3); partition count used for both topics created by this test. */
    private final int kafka$api$TransactionsBounceTest$$numPartitions;
    /** Assigned in the constructor ("output-topic"); topic the transactional producer writes to. */
    private final String kafka$api$TransactionsBounceTest$$outputTopic;
    /** Assigned in the constructor ("input-topic"); topic seeded with numbered records. */
    private final String inputTopic;
    /** Broker property overrides; populated by the constructor and consumed by {@code generateConfigs()}. */
    private final Properties overridingProps = new Properties();

    /**
     * Accessor for the value this test passes as the last argument of
     * {@code TestUtils.pollUntilAtLeastNumRecords} (presumably a timeout in milliseconds -- confirm).
     *
     * @return the configured consume-record timeout
     */
    private int consumeRecordTimeout() {
        return consumeRecordTimeout;
    }

    /**
     * Accessor for the producer buffer size field.
     *
     * @return the current value of the {@code producerBufferSize} field
     */
    private int producerBufferSize() {
        return producerBufferSize;
    }

    /**
     * Accessor for the value the constructor publishes as the {@code message.max.bytes}
     * broker override.
     *
     * @return the current value of the {@code serverMessageMaxBytes} field
     */
    private int serverMessageMaxBytes() {
        return serverMessageMaxBytes;
    }

    /**
     * Name-mangled accessor for the partition count; public so that the nested
     * {@code BounceScheduler} can reach it through its outer-instance accessor.
     *
     * @return the number of partitions used when creating the test topics
     */
    public int kafka$api$TransactionsBounceTest$$numPartitions() {
        return kafka$api$TransactionsBounceTest$$numPartitions;
    }

    /**
     * Name-mangled accessor for the output topic name; public so that the nested
     * {@code BounceScheduler} can reach it through its outer-instance accessor.
     *
     * @return the name of the topic the transactional producer writes to
     */
    public String kafka$api$TransactionsBounceTest$$outputTopic() {
        return kafka$api$TransactionsBounceTest$$outputTopic;
    }

    /**
     * Accessor for the name of the topic that is seeded with numbered records and read by the
     * test consumer.
     *
     * @return the input topic name
     */
    private String inputTopic() {
        return inputTopic;
    }

    /**
     * Accessor for the broker property overrides. The returned object is the live, mutable
     * instance held by this test (no copy is made).
     *
     * @return the overrides applied to every broker config in {@code generateConfigs()}
     */
    public Properties overridingProps() {
        return overridingProps;
    }

    /**
     * Builds one {@code KafkaConfig} per broker.
     *
     * <p>The base properties come from {@code FixedPortTestUtils.createBrokerConfigs} for
     * {@code brokerCount()} brokers; each is combined with {@code overridingProps()} through
     * {@code KafkaConfig.fromProps}.
     *
     * @return the broker configurations, in the order produced by {@code createBrokerConfigs}
     */
    public scala.collection.immutable.Seq<KafkaConfig> generateConfigs() {
        return (scala.collection.immutable.Seq)FixedPortTestUtils$.MODULE$
                .createBrokerConfigs(this.brokerCount(), true, false)
                .map((Function1 & Serializable)brokerProps -> KafkaConfig$.MODULE$.fromProps(brokerProps, this.overridingProps(), true));
    }

    /**
     * Number of brokers this test runs with. The value is passed to
     * {@code FixedPortTestUtils.createBrokerConfigs} by {@code generateConfigs()}.
     *
     * @return always 4
     */
    @Override
    public int brokerCount() {
        return 4;
    }

    @ParameterizedTest(name="{displayName}.groupProtocol={0}")
    @MethodSource(value={"getTestGroupProtocolParametersAll"})
    public void testWithGroupMetadata(String groupProtocol) {
        String testBrokerFailure_consumerGroup = "myGroup";
        int testBrokerFailure_numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), testBrokerFailure_numInputRecords, this.brokers());
        Consumer<byte[], byte[]> testBrokerFailure_consumer = this.createConsumerAndSubscribe(testBrokerFailure_consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), false);
        KafkaProducer<byte[], byte[]> testBrokerFailure_producer = this.createTransactionalProducer("test-txn");
        testBrokerFailure_producer.initTransactions();
        BounceScheduler testBrokerFailure_scheduler = new BounceScheduler();
        testBrokerFailure_scheduler.start();
        try {
            IntRef testBrokerFailure_numMessagesProcessed = IntRef.create((int)0);
            IntRef testBrokerFailure_iteration = IntRef.create((int)0);
            while (testBrokerFailure_numMessagesProcessed.elem < testBrokerFailure_numInputRecords) {
                int testBrokerFailure_toRead = Math.min(200, testBrokerFailure_numInputRecords - testBrokerFailure_numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable)() -> iteration$1.elem + ": About to read " + testBrokerFailure_toRead + " messages, processed " + numMessagesProcessed$1.elem + " so far..");
                Seq<ConsumerRecord<byte[], byte[]>> testBrokerFailure_records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_consumer, testBrokerFailure_toRead, this.consumeRecordTimeout());
                this.trace((Function0<String>)(Function0 & Serializable)() -> "Received " + testBrokerFailure_records.size() + " messages, sending them transactionally to " + this.kafka$api$TransactionsBounceTest$$outputTopic());
                testBrokerFailure_producer.beginTransaction();
                boolean testBrokerFailure_shouldAbort = testBrokerFailure_iteration.elem % 3 == 0;
                testBrokerFailure_records.foreach((Function1 & Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable)() -> "Sent " + testBrokerFailure_records.size() + " messages. Committing offsets.");
                testBrokerFailure_producer.sendOffsetsToTransaction(CollectionConverters$.MODULE$.MapHasAsJava(TestUtils$.MODULE$.consumerPositions((Consumer<byte[], byte[]>)testBrokerFailure_consumer)).asJava(), testBrokerFailure_consumer.groupMetadata());
                if (testBrokerFailure_shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable)() -> "Committed offsets. Aborting transaction of " + testBrokerFailure_records.size() + " messages.");
                    testBrokerFailure_producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(testBrokerFailure_consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable)() -> "Committed offsets. committing transaction of " + testBrokerFailure_records.size() + " messages.");
                    testBrokerFailure_producer.commitTransaction();
                    testBrokerFailure_numMessagesProcessed.elem += testBrokerFailure_records.size();
                }
                ++testBrokerFailure_iteration.elem;
            }
        }
        finally {
            testBrokerFailure_scheduler.shutdown();
        }
        Consumer<byte[], byte[]> testBrokerFailure_verifyingConsumer = this.createConsumerAndSubscribe("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap testBrokerFailure_recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(testBrokerFailure_verifyingConsumer, testBrokerFailure_numInputRecords, this.consumeRecordTimeout()).foreach((Function1 & Serializable)record -> {
            int value = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record)));
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            return (ListBuffer)((Buffer)testBrokerFailure_recordsByPartition.getOrElseUpdate((Object)topicPartition, (Function0 & Serializable)() -> new ListBuffer())).append((Object)BoxesRunTime.boxToInteger((int)value));
        });
        ListBuffer testBrokerFailure_outputRecords = new ListBuffer();
        testBrokerFailure_recordsByPartition.values().foreach((Function1 & Serializable)partitionValues -> {
            Assertions.assertEquals((Object)partitionValues, (Object)partitionValues.sorted((Ordering)Ordering.Int$.MODULE$), (String)"Out of order messages detected");
            return (ListBuffer)testBrokerFailure_outputRecords.appendAll((IterableOnce)partitionValues);
        });
        Set testBrokerFailure_recordSet = testBrokerFailure_outputRecords.toSet();
        Assertions.assertEquals((int)testBrokerFailure_numInputRecords, (int)testBrokerFailure_recordSet.size());
        Set testBrokerFailure_expectedValues = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), testBrokerFailure_numInputRecords).toSet();
        Assertions.assertEquals((Object)testBrokerFailure_expectedValues, (Object)testBrokerFailure_recordSet, (String)("Missing messages: " + testBrokerFailure_expectedValues.$minus$minus((IterableOnce)testBrokerFailure_recordSet)));
    }

    private void testBrokerFailure(Function3<KafkaProducer<byte[], byte[]>, String, Consumer<byte[], byte[]>, BoxedUnit> commit) {
        String consumerGroup = "myGroup";
        int numInputRecords = 10000;
        this.createTopics();
        TestUtils$.MODULE$.seedTopicWithNumberedRecords(this.inputTopic(), numInputRecords, this.brokers());
        Consumer<byte[], byte[]> consumer = this.createConsumerAndSubscribe(consumerGroup, (List<String>)new .colon.colon((Object)this.inputTopic(), (List)Nil$.MODULE$), false);
        KafkaProducer<byte[], byte[]> producer = this.createTransactionalProducer("test-txn");
        producer.initTransactions();
        BounceScheduler scheduler = new BounceScheduler();
        scheduler.start();
        try {
            IntRef numMessagesProcessed = IntRef.create((int)0);
            IntRef iteration = IntRef.create((int)0);
            while (numMessagesProcessed.elem < numInputRecords) {
                int toRead = Math.min(200, numInputRecords - numMessagesProcessed.elem);
                this.trace((Function0<String>)(Function0 & Serializable)() -> iteration$1.elem + ": About to read " + testBrokerFailure_toRead + " messages, processed " + numMessagesProcessed$1.elem + " so far..");
                Seq<ConsumerRecord<byte[], byte[]>> records = TestUtils$.MODULE$.pollUntilAtLeastNumRecords(consumer, toRead, this.consumeRecordTimeout());
                this.trace((Function0<String>)(Function0 & Serializable)() -> "Received " + testBrokerFailure_records.size() + " messages, sending them transactionally to " + this.kafka$api$TransactionsBounceTest$$outputTopic());
                producer.beginTransaction();
                boolean shouldAbort = iteration.elem % 3 == 0;
                records.foreach((Function1 & Serializable)record -> testBrokerFailure_producer.send(TestUtils$.MODULE$.producerRecordWithExpectedTransactionStatus(this.kafka$api$TransactionsBounceTest$$outputTopic(), null, (byte[])record.key(), (byte[])record.value(), !testBrokerFailure_shouldAbort), (Callback)new ErrorLoggingCallback(this.kafka$api$TransactionsBounceTest$$outputTopic(), (byte[])record.key(), (byte[])record.value(), true)));
                this.trace((Function0<String>)(Function0 & Serializable)() -> "Sent " + testBrokerFailure_records.size() + " messages. Committing offsets.");
                commit.apply(producer, (Object)consumerGroup, consumer);
                if (shouldAbort) {
                    this.trace((Function0<String>)(Function0 & Serializable)() -> "Committed offsets. Aborting transaction of " + testBrokerFailure_records.size() + " messages.");
                    producer.abortTransaction();
                    TestUtils$.MODULE$.resetToCommittedPositions(consumer);
                } else {
                    this.trace((Function0<String>)(Function0 & Serializable)() -> "Committed offsets. committing transaction of " + testBrokerFailure_records.size() + " messages.");
                    producer.commitTransaction();
                    numMessagesProcessed.elem += records.size();
                }
                ++iteration.elem;
            }
        }
        finally {
            scheduler.shutdown();
        }
        Consumer<byte[], byte[]> verifyingConsumer = this.createConsumerAndSubscribe("randomGroup", (List<String>)new .colon.colon((Object)this.kafka$api$TransactionsBounceTest$$outputTopic(), (List)Nil$.MODULE$), true);
        HashMap recordsByPartition = new HashMap();
        TestUtils$.MODULE$.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords, this.consumeRecordTimeout()).foreach((Function1 & Serializable)record -> {
            int value = StringOps$.MODULE$.toInt$extension(Predef$.MODULE$.augmentString(TestUtils$.MODULE$.assertCommittedAndGetValue((ConsumerRecord<byte[], byte[]>)record)));
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            return (ListBuffer)((Buffer)testBrokerFailure_recordsByPartition.getOrElseUpdate((Object)topicPartition, (Function0 & Serializable)() -> new ListBuffer())).append((Object)BoxesRunTime.boxToInteger((int)value));
        });
        ListBuffer outputRecords = new ListBuffer();
        recordsByPartition.values().foreach((Function1 & Serializable)partitionValues -> {
            Assertions.assertEquals((Object)partitionValues, (Object)partitionValues.sorted((Ordering)Ordering.Int$.MODULE$), (String)"Out of order messages detected");
            return (ListBuffer)testBrokerFailure_outputRecords.appendAll((IterableOnce)partitionValues);
        });
        Set recordSet = outputRecords.toSet();
        Assertions.assertEquals((int)numInputRecords, (int)recordSet.size());
        Set expectedValues = RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), numInputRecords).toSet();
        Assertions.assertEquals((Object)expectedValues, (Object)recordSet, (String)("Missing messages: " + expectedValues.$minus$minus((IterableOnce)recordSet)));
    }

    /**
     * Creates a byte-array producer configured for transactions.
     *
     * <p>The producer is configured with {@code acks=all}, {@code batch.size=512},
     * {@code enable.idempotence=true} and the given transactional id, and is built through the
     * harness's {@code createProducer} using its default first and second arguments.
     *
     * @param transactionalId value for the {@code transactional.id} setting
     * @return the producer returned by {@code createProducer}
     */
    private KafkaProducer<byte[], byte[]> createTransactionalProducer(String transactionalId) {
        Properties producerProps = new Properties();
        producerProps.put("acks", "all");
        producerProps.put("batch.size", "512");
        producerProps.put("transactional.id", transactionalId);
        producerProps.put("enable.idempotence", "true");
        return this.createProducer(this.createProducer$default$1(), this.createProducer$default$2(), producerProps);
    }

    /**
     * Creates a consumer with auto-commit disabled and subscribes it to the given topics.
     *
     * @param groupId       value for the {@code group.id} setting
     * @param topics        topics to subscribe to
     * @param readCommitted {@code true} for {@code isolation.level=read_committed},
     *                      {@code false} for {@code read_uncommitted}
     * @return the subscribed consumer
     */
    private Consumer<byte[], byte[]> createConsumerAndSubscribe(String groupId, List<String> topics, boolean readCommitted) {
        String isolationLevel;
        if (readCommitted) {
            isolationLevel = "read_committed";
        } else {
            isolationLevel = "read_uncommitted";
        }
        Properties consumerProps = new Properties();
        consumerProps.put("group.id", groupId);
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("isolation.level", isolationLevel);
        // Harness defaults are used for the first, second and fourth createConsumer arguments.
        Consumer subscribed = this.createConsumer(this.createConsumer$default$1(), this.createConsumer$default$2(), consumerProps, this.createConsumer$default$4());
        subscribed.subscribe((Collection)CollectionConverters$.MODULE$.SeqHasAsJava(topics).asJava());
        return subscribed;
    }

    /**
     * Default value for the third parameter ({@code readCommitted}) of
     * {@code createConsumerAndSubscribe}; appears to be the compiler-generated default-argument
     * accessor.
     *
     * @return always {@code false}
     */
    private boolean createConsumerAndSubscribe$default$3() {
        return false;
    }

    /**
     * Creates the input and output topics with identical settings.
     *
     * <p>Both topics get {@code numPartitions} partitions, 3 as the third {@code createTopic}
     * argument (presumably the replication factor -- confirm) and a topic-level
     * {@code ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG} of 2.
     *
     * @return the result of creating the output topic (the input topic's result is discarded)
     */
    private Map<Object, Object> createTopics() {
        Properties minIsrConfig = new Properties();
        minIsrConfig.put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, String.valueOf(2));
        this.createTopic(this.inputTopic(), this.kafka$api$TransactionsBounceTest$$numPartitions(), 3, minIsrConfig, this.createTopic$default$5(), this.createTopic$default$6());
        Map<Object, Object> outputTopicResult = this.createTopic(this.kafka$api$TransactionsBounceTest$$outputTopic(), this.kafka$api$TransactionsBounceTest$$numPartitions(), 3, minIsrConfig, this.createTopic$default$5(), this.createTopic$default$6());
        return outputTopicResult;
    }

    public TransactionsBounceTest() {
        this.consumeRecordTimeout = 30000;
        this.producerBufferSize = 65536;
        this.kafka$api$TransactionsBounceTest$$numPartitions = 3;
        this.kafka$api$TransactionsBounceTest$$outputTopic = "output-topic";
        this.inputTopic = "input-topic";
        this.overridingProps().put("auto.create.topics.enable", Boolean.toString(false));
        this.overridingProps().put("message.max.bytes", Integer.toString(this.serverMessageMaxBytes()));
        this.overridingProps().put("controlled.shutdown.enable", Boolean.toString(true));
        this.overridingProps().put(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, Boolean.toString(false));
        this.overridingProps().put("auto.leader.rebalance.enable", Boolean.toString(false));
        this.overridingProps().put("offsets.topic.num.partitions", Integer.toString(1));
        this.overridingProps().put("offsets.topic.replication.factor", Integer.toString(3));
        this.overridingProps().put(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG, Integer.toString(2));
        this.overridingProps().put("group.min.session.timeout.ms", "10");
        this.overridingProps().put("group.initial.rebalance.delay.ms", "0");
        this.overridingProps().put("transaction.state.log.num.partitions", Integer.toString(1));
        this.overridingProps().put("transaction.state.log.replication.factor", Integer.toString(3));
    }

    /**
     * Synthetic body of the commit callback: sends the consumer's current positions, together
     * with its group metadata, to the producer's transaction.
     *
     * @param producer producer whose transaction receives the offsets
     * @param x$2      unused
     * @param consumer consumer whose positions and group metadata are sent
     * @return {@code BoxedUnit.UNIT}
     */
    public static final /* synthetic */ Object $anonfun$testWithGroupMetadata$1$adapted(KafkaProducer producer, String x$2, Consumer consumer) {
        java.util.Map positions = CollectionConverters$.MODULE$.MapHasAsJava(TestUtils$.MODULE$.consumerPositions((Consumer<byte[], byte[]>)consumer)).asJava();
        producer.sendOffsetsToTransaction(positions, consumer.groupMetadata());
        return BoxedUnit.UNIT;
    }

    /**
     * Background thread that restarts the test brokers one after another while the test body
     * produces and consumes. Built on {@code ShutdownableThread}; {@code doWork()} is presumably
     * invoked repeatedly by that base class until {@code shutdown()} is called -- confirm.
     */
    private class BounceScheduler
    extends ShutdownableThread {
        /**
         * Performs one bounce round: restarts every broker of the enclosing test in turn, then
         * waits for a leader on each partition of the output topic.
         */
        public void doWork() {
            // Restart each broker sequentially; the per-broker steps live in $anonfun$doWork$1.
            this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().brokers().foreach((Function1 & Serializable)server -> {
                BounceScheduler.$anonfun$doWork$1(this, server);
                return BoxedUnit.UNIT;
            });
            // For every partition index in [0, numPartitions) wait on the output topic's leader.
            // A new admin client is requested from the harness for each partition; 30000L is
            // presumably the wait timeout in milliseconds -- confirm.
            RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$numPartitions()).foreach((Function1)(JFunction1.mcII.sp & Serializable)partition -> TestUtils$.MODULE$.waitUntilLeaderIsElectedOrChangedWithAdmin(this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().createAdminClient(this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().createAdminClient$default$1(), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().createAdminClient$default$2()), this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().kafka$api$TransactionsBounceTest$$outputTopic(), partition, 30000L, (Option<Object>)None$.MODULE$, (Option<Object>)None$.MODULE$));
        }

        /** Delegates directly to {@code ShutdownableThread.shutdown()}; adds no behavior of its own. */
        public void shutdown() {
            super.shutdown();
        }

        /** Synthetic accessor returning the enclosing {@code TransactionsBounceTest} instance. */
        public /* synthetic */ TransactionsBounceTest kafka$api$TransactionsBounceTest$BounceScheduler$$$outer() {
            return TransactionsBounceTest.this;
        }

        /**
         * Bounces a single broker: shuts it down, waits for the shutdown to complete, pauses
         * 500 ms, starts it again and pauses another 500 ms. Each step is trace-logged with the
         * broker id.
         *
         * @param $this  the scheduler whose enclosing test provides the trace logger
         * @param server the broker to restart
         */
        public static final /* synthetic */ void $anonfun$doWork$1(BounceScheduler $this, KafkaBroker server) {
            $this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().trace((Function0<String>)(Function0 & Serializable)() -> StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("Shutting down server : %s"), (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            server.shutdown();
            server.awaitShutdown();
            // Pause while the broker is down, before bringing it back.
            Thread.sleep(500L);
            $this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().trace((Function0<String>)(Function0 & Serializable)() -> StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("Server %s shut down. Starting it up again."), (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            server.startup();
            $this.kafka$api$TransactionsBounceTest$BounceScheduler$$$outer().trace((Function0<String>)(Function0 & Serializable)() -> StringOps$.MODULE$.format$extension(Predef$.MODULE$.augmentString("Restarted server: %s"), (scala.collection.immutable.Seq)ScalaRunTime$.MODULE$.genericWrapArray((Object)new Object[]{BoxesRunTime.boxToInteger((int)server.config().brokerId())})));
            // Pause again after the restart, before the next broker is bounced.
            Thread.sleep(500L);
        }

        /**
         * Creates the scheduler thread named "daemon-broker-bouncer". The second constructor
         * argument {@code false} is presumably the base class's interruptible flag -- confirm.
         */
        public BounceScheduler() {
            // Decompiler rendering of the outer-instance null check emitted for inner classes.
            if (TransactionsBounceTest.this == null) {
                throw null;
            }
            super("daemon-broker-bouncer", false);
        }
    }
}

