/*
 * Decompiled with CFR 0.152.
 */
package kafka.coordinator.transaction;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import kafka.network.SocketServer;
import kafka.server.IntegrationTestUtils$;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TransactionState;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.message.InitProducerIdRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.ClusterTests;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import org.junit.jupiter.api.Assertions;
import scala.Array$;
import scala.Function0;
import scala.Function1;
import scala.Predef$;
import scala.collection.SeqOps;
import scala.collection.immutable.Seq;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction0;

@ClusterTestDefaults(types={Type.KRAFT}, serverProperties={@ClusterConfigProperty(key="transaction.state.log.replication.factor", value="1"), @ClusterConfigProperty(key="transaction.state.log.num.partitions", value="1"), @ClusterConfigProperty(key="transaction.state.log.min.isr", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="offsets.topic.num.partitions", value="1")})
@ScalaSignature(bytes="\u0006\u0005\u0005\rd\u0001\u0002\u0005\n\u0001AAQa\u0006\u0001\u0005\u0002aAQa\u0007\u0001\u0005\u0002qAQ!\u0012\u0001\u0005\u0002\u0019CQA\u0019\u0001\u0005\u0002\rDQ!\u001c\u0001\u0005\u00029DQ\u0001\u001f\u0001\u0005\neDQa\u001f\u0001\u0005\nq\u0014q\u0003\u0015:pIV\u001cWM]%oi\u0016<'/\u0019;j_:$Vm\u001d;\u000b\u0005)Y\u0011a\u0003;sC:\u001c\u0018m\u0019;j_:T!\u0001D\u0007\u0002\u0017\r|wN\u001d3j]\u0006$xN\u001d\u0006\u0002\u001d\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u0012!\t\u0011R#D\u0001\u0014\u0015\u0005!\u0012!B:dC2\f\u0017B\u0001\f\u0014\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012!\u0007\t\u00035\u0001i\u0011!C\u0001\u0016i\u0016\u001cH/\u00168jcV,\u0007K]8ek\u000e,'/\u00133t)\ti\u0002\u0005\u0005\u0002\u0013=%\u0011qd\u0005\u0002\u0005+:LG\u000fC\u0003\"\u0005\u0001\u0007!%A\bdYV\u001cH/\u001a:J]N$\u0018M\\2f!\t\u0019S&D\u0001%\u0015\t)c%\u0001\u0003uKN$(BA\u0014)\u0003\u0019\u0019w.\\7p]*\u0011a\"\u000b\u0006\u0003U-\na!\u00199bG\",'\"\u0001\u0017\u0002\u0007=\u0014x-\u0003\u0002/I\ty1\t\\;ti\u0016\u0014\u0018J\\:uC:\u001cW\r\u000b\u0003\u0003aY:\u0004CA\u00195\u001b\u0005\u0011$BA\u001a%\u0003\r\t\u0007/[\u0005\u0003kI\u0012Ab\u00117vgR,'\u000fV3tiN\fQA^1mk\u0016d\u0013\u0001O\u0016\u0004sqj\u0004CA\u0019;\u0013\tY$GA\u0006DYV\u001cH/\u001a:UKN$\u0018aD7fi\u0006$\u0017\r^1WKJ\u001c\u0018n\u001c8%\u0003yJ!a\u0010!\u0002\u0017%\u0013\u0005kX\u001a`g}Kek\r\u0006\u0003\u0003\n\u000bq\"T3uC\u0012\fG/\u0019,feNLwN\u001c\u0006\u0003O\rS!\u0001\u0012\u0015\u0002\rM,'O^3s\u0003\u0005\"Xm\u001d;Ue\u0006t7/Y2uS>tw+\u001b;i\u0003:$w+\u001b;i_V$8+\u001a8e)\tir\tC\u0003I\u0007\u0001\u0007!%A\u0004dYV\u001cH/\u001a:)\t\r\u0001dG\u0013\u0017\u0004\u0017js6fA\u001dM\u001b\u0006Aa-Z1ukJ,7\u000fL\u0001OW\u0015y%k\u0015-Z!\t\t\u0004+\u0003\u0002Re\tq1\t\\;ti\u0016\u0014h)Z1ukJ,\u0017a\u00024fCR,(/\u001a\u0013\u0002)&\u0011QKV\u0001\u0014)J\u000bejU!D)&{ej\u0018,F%NKuJ\u0014\u0006\u0003/\n\u000bqAR3biV\u0014X-A\u0004wKJ\u001c\u0018n\u001c8\u001c\u0003\u0001Y3!\u000f'\\Y\u0005a6&B(S'bk6$A\u0001,\u0007ebu\fL\u0001aW\u0015y%k\u0015-b7\u0005\u0011\u0011A\r;fgR$&/\u00198tC\u000e$\u0018n\u001c8XSRD\u0017J\u001c<bY&$7+\u001a8e\u0003:$WI\u001c3Uq:\u0014V-];fgR\u001cVM\u001c;\u0015\u0005u!\u0007\"\u0002%\u0005\u0001\u0004\u0011\u0003\u0006\u0002\u00031m\u0019d3aZ5lW\rID\n\u001b\u0017\u0002\u001d.\u001a\u0011\b\u00146-\u0003q[3!\u000f'mY\u0005\u0001\u0017!\b;fgR$&/\u00198tC\u000e$\u0018n\u001c8XSRD7+\u001a8e\u001f\u001a47/\u001a;\u0015\u0005uy\u0007\"\u0002%\u0006\u0001\u0004\u0011\u0003\u0006B\u00031mEd3A\u001d;wW\rIDj\u001d\u0017\u0002\u001d.\u001a\u0011\bT;-\u0003q[3!\u000f'xY\u0005\u0001\u0017a\u0004<fe&4\u00170\u00168jcV,\u0017\nZ:\u0015\u0005uQ\b\"B\u0011\u0007\u0001\u0004\u0011\u0013A\u00048fqR\u0004&o\u001c3vG\u0016\u0014\u0018\n\u001a\u000b\u0006{\u0006\u0005\u0011\u0011\u0003\t\u0003%yL!a`\n\u0003\t1{gn\u001a\u0005\b\u0003\u00079\u0001\u0019AA\u0003\u0003\u0019\u0011'o\\6feB!\u0011qAA\u0007\u001b\t\tIAC\u0002\u0002\f5\tqA\\3uo>\u00148.\u0003\u0003\u0002\u0010\u0005%!\u0001D*pG.,GoU3sm\u0016\u0014\bbBA\n\u000f\u0001\u0007\u0011QC\u0001\tY&\u001cH/\u001a8feB!\u0011qCA\u000e\u001b\t\tIBC\u0002\u0002\f\u0019JA!!\b\u0002\u001a\taA*[:uK:,'OT1nK\"Z\u0001!!\t\u0002(\u0005%\u0012QGA\u001c!\r\t\u00141E\u0005\u0004\u0003K\u0011$aE\"mkN$XM\u001d+fgR$UMZ1vYR\u001c\u0018!\u0002;za\u0016\u001cHFAA\u0016I\t\ti#\u0003\u0003\u00020\u0005E\u0012!B&S\u0003\u001a#&bAA\u001ae\u0005!A+\u001f9f\u0003A\u0019XM\u001d<feB\u0013x\u000e]3si&,7\u000f\f\u0006\u0002:\u0005-\u0013\u0011KA,\u0003;Z\u0013\"a\u000f\u0002B\u0005\rc'a\u0012\u0011\u0007E\ni$C\u0002\u0002@I\u0012Qc\u00117vgR,'oQ8oM&<\u0007K]8qKJ$\u00180A\u0002lKf\f#!!\u0012\u0002QQ\u0014\u0018M\\:bGRLwN\u001c\u0018ti\u0006$XM\f7pO:\u0012X\r\u001d7jG\u0006$\u0018n\u001c8/M\u0006\u001cGo\u001c:\"\u0005\u0005%\u0013!A\u0019,\u0013\u0005m\u0012\u0011IA'm\u0005\u001d\u0013EAA(\u0003\u0011\"(/\u00198tC\u000e$\u0018n\u001c8/gR\fG/\u001a\u0018m_\u001etc.^7/a\u0006\u0014H/\u001b;j_:\u001c8&CA\u001e\u0003\u0003\n\u0019FNA$C\t\t)&A\u000fue\u0006t7/Y2uS>tgf\u001d;bi\u0016tCn\\4/[&tg&[:sW%\tY$!\u0011\u0002ZY\n9%\t\u0002\u0002\\\u0005\u0001sN\u001a4tKR\u001ch\u0006^8qS\u000et#/\u001a9mS\u000e\fG/[8o]\u0019\f7\r^8sW%\tY$!\u0011\u0002`Y\n9%\t\u0002\u0002b\u0005arN\u001a4tKR\u001ch\u0006^8qS\u000etc.^7/a\u0006\u0014H/\u001b;j_:\u001c\b")
public class ProducerIntegrationTest {
    @ClusterTests(value={@ClusterTest(metadataVersion=MetadataVersion.IBP_3_3_IV3)})
    public void testUniqueProducerIds(ClusterInstance clusterInstance) {
        this.verifyUniqueIds(clusterInstance);
    }

    @ClusterTests(value={@ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=0)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=1)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=2)})})
    public void testTransactionWithAndWithoutSend(ClusterInstance cluster) {
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("transactional.id", "foobar");
        properties.put("client.id", "test");
        properties.put("enable.idempotence", "true");
        Producer producer = cluster.producer(properties);
        try {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord("test", (Object)"key".getBytes(), (Object)"value".getBytes()));
            producer.commitTransaction();
            producer.beginTransaction();
            producer.commitTransaction();
        }
        catch (Throwable throwable) {
            if (producer != null) {
                producer.close();
            }
            throw throwable;
        }
        producer.close();
    }

    @ClusterTests(value={@ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=0)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=1)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=2)})})
    public void testTransactionWithInvalidSendAndEndTxnRequestSent(ClusterInstance cluster) {
        NewTopic topic = new NewTopic("foobar", 1, (short)1).configs(Map.of("max.message.bytes", "100"));
        String txnId = "test-txn";
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("transactional.id", txnId);
        properties.put("client.id", "test");
        properties.put("enable.idempotence", "true");
        Admin admin = cluster.admin();
        Producer producer = cluster.producer(properties);
        try {
            admin.createTopics(List.of(topic));
            producer.initTransactions();
            producer.beginTransaction();
            Assertions.assertInstanceOf(RecordTooLargeException.class, (Object)Assertions.assertThrows(ExecutionException.class, () -> producer.send(new ProducerRecord(topic.name(), Array$.MODULE$.fill(100, (Function0)(JFunction0.mcB.sp & Serializable)() -> 0, (ClassTag)ClassTag$.MODULE$.Byte()), Array$.MODULE$.fill(100, (Function0)(JFunction0.mcB.sp & Serializable)() -> 0, (ClassTag)ClassTag$.MODULE$.Byte()))).get()).getCause());
            producer.abortTransaction();
        }
        catch (Throwable throwable) {
            if (admin != null) {
                admin.close();
            }
            if (producer != null) {
                producer.close();
            }
            throw throwable;
        }
        admin.close();
        producer.close();
    }

    @ClusterTests(value={@ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=0)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=1)}), @ClusterTest(features={@ClusterFeature(feature=Feature.TRANSACTION_VERSION, version=2)})})
    public void testTransactionWithSendOffset(ClusterInstance cluster) {
        String inputTopic = "my-input-topic";
        ObjectRef producer = ObjectRef.create((Object)cluster.producer());
        try {
            RichInt$.MODULE$.until$extension(Predef$.MODULE$.intWrapper(0), 5).foreach((Function1 & Serializable)i -> ProducerIntegrationTest.$anonfun$testTransactionWithSendOffset$1(producer, inputTopic, BoxesRunTime.unboxToInt((Object)i)));
        }
        finally {
            if ((Producer)producer.elem != null) {
                ((Producer)producer.elem).close();
            }
        }
        String txnId = "foobar";
        HashMap<String, String> producerProperties = new HashMap<String, String>();
        producerProperties.put("transactional.id", txnId);
        producerProperties.put("client.id", "test");
        producerProperties.put("enable.idempotence", "true");
        HashMap<String, String> consumerProperties = new HashMap<String, String>();
        consumerProperties.put("group.id", "test-consumer-group");
        consumerProperties.put("auto.offset.reset", "earliest");
        producer.elem = cluster.producer(producerProperties);
        Consumer consumer = cluster.consumer(consumerProperties);
        try {
            ((Producer)producer.elem).initTransactions();
            ((Producer)producer.elem).beginTransaction();
            consumer.subscribe(List.of(inputTopic));
            ObjectRef records = ObjectRef.create(null);
            TestUtils.waitForCondition(() -> {
                records$1.elem = consumer.poll(Duration.ZERO);
                return Predef$.MODULE$.boolean2Boolean(((ConsumerRecords)records$1.elem).count() == 5);
            }, (String)"poll records size not match");
            ConsumerRecord lastRecord = StreamSupport.stream(((ConsumerRecords)records.elem).spliterator(), false).reduce((x$1, second) -> second).orElse(null);
            Map<TopicPartition, OffsetAndMetadata> offsets = Map.of(new TopicPartition(lastRecord.topic(), lastRecord.partition()), new OffsetAndMetadata(lastRecord.offset() + 1L));
            ((Producer)producer.elem).sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            ((Producer)producer.elem).commitTransaction();
        }
        catch (Throwable throwable) {
            if ((Producer)producer.elem != null) {
                ((Producer)producer.elem).close();
            }
            if (consumer != null) {
                consumer.close();
            }
            throw throwable;
        }
        if ((Producer)producer.elem != null) {
            ((Producer)producer.elem).close();
        }
        consumer.close();
        try (Admin admin = cluster.admin();){
            TestUtils.waitForCondition(() -> Predef$.MODULE$.boolean2Boolean(((Collection)admin.listTransactions().all().get()).stream().filter(txn -> {
                String string = txn.transactionalId();
                return !(string != null ? !string.equals(txnId) : txnId != null);
            }).anyMatch(txn -> txn.state() == TransactionState.COMPLETE_COMMIT)), (String)"transaction is not in COMPLETE_COMMIT state");
        }
    }

    private void verifyUniqueIds(ClusterInstance clusterInstance) {
        Seq ids = CollectionConverters$.MODULE$.ListHasAsScala(clusterInstance.brokerSocketServers().stream().flatMap(broker -> IntStream.range(0, 1001).parallel().mapToObj(x$2 -> BoxesRunTime.boxToLong((long)this.nextProducerId(broker, clusterInstance.clientListener())))).collect(Collectors.toList())).asScala().toSeq();
        int brokerCount = clusterInstance.brokerIds().size();
        int expectedTotalCount = 1001 * brokerCount;
        Assertions.assertEquals((int)expectedTotalCount, (int)ids.size(), (String)("Expected exactly " + expectedTotalCount + " IDs"));
        Assertions.assertEquals((int)expectedTotalCount, (int)((SeqOps)ids.distinct()).size(), (String)"Found duplicate producer IDs");
    }

    private long nextProducerId(SocketServer broker, ListenerName listener) {
        Deadline deadline = new package.DurationInt(package$.MODULE$.DurationInt(5)).seconds().fromNow();
        boolean shouldRetry = true;
        InitProducerIdResponse response = null;
        while (shouldRetry && deadline.hasTimeLeft()) {
            InitProducerIdRequestData data = new InitProducerIdRequestData().setProducerEpoch((short)-1).setProducerId(-1L).setTransactionalId(null).setTransactionTimeoutMs(10);
            InitProducerIdRequest request = (InitProducerIdRequest)new InitProducerIdRequest.Builder(data).build();
            response = (InitProducerIdResponse)IntegrationTestUtils$.MODULE$.connectAndReceive((AbstractRequest)request, broker, listener, ClassTag$.MODULE$.apply(InitProducerIdResponse.class));
            shouldRetry = response.data().errorCode() == Errors.COORDINATOR_LOAD_IN_PROGRESS.code();
        }
        Assertions.assertTrue((boolean)deadline.hasTimeLeft());
        Assertions.assertEquals((short)Errors.NONE.code(), (short)response.data().errorCode());
        return response.data().producerId();
    }

    public static final /* synthetic */ RecordMetadata $anonfun$testTransactionWithSendOffset$1(ObjectRef producer$2, String inputTopic$1, int i) {
        byte[] key = ("key-" + i).getBytes();
        byte[] value = ("value-" + i).getBytes();
        return (RecordMetadata)((Producer)producer$2.elem).send(new ProducerRecord(inputTopic$1, (Object)key, (Object)value)).get();
    }
}

