/*
 * Decompiled with CFR 0.152.
 */
package kafka.admin;

import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FenceProducersOptions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(serverProperties={@ClusterConfigProperty(key="auto.create.topics.enable", value="false"), @ClusterConfigProperty(key="transaction.state.log.num.partitions", value="1"), @ClusterConfigProperty(key="transaction.state.log.replication.factor", value="1"), @ClusterConfigProperty(key="transaction.state.log.min.isr", value="1"), @ClusterConfigProperty(key="transaction.abort.timed.out.transaction.cleanup.interval.ms", value="2000")})
public class AdminFenceProducersTest {
    private static final String TOPIC_NAME = "mytopic";
    private static final String TXN_ID = "mytxnid";
    private static final String INCORRECT_BROKER_PORT = "225";
    private static final ProducerRecord<byte[], byte[]> RECORD = new ProducerRecord("mytopic", null, (Object)new byte[1]);
    private final ClusterInstance clusterInstance;

    AdminFenceProducersTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    private KafkaProducer<byte[], byte[]> createProducer() {
        Properties config = new Properties();
        config.put("bootstrap.servers", this.clusterInstance.bootstrapServers());
        config.put("transactional.id", TXN_ID);
        config.put("transaction.timeout.ms", "2000");
        config.put("key.serializer", ByteArraySerializer.class.getName());
        config.put("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer(config);
    }

    @ClusterTest
    void testFenceAfterProducerCommit() throws Exception {
        this.clusterInstance.createTopic(TOPIC_NAME, 1, (short)1);
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer();
             Admin adminClient = this.clusterInstance.admin();){
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(RECORD).get();
            producer.commitTransaction();
            adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
            producer.beginTransaction();
            ExecutionException exceptionDuringSend = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> producer.send(RECORD).get(), (String)"expected InvalidProducerEpochException");
            Assertions.assertInstanceOf(InvalidProducerEpochException.class, (Object)exceptionDuringSend.getCause());
            Assertions.assertThrows(InvalidProducerEpochException.class, () -> producer.commitTransaction());
        }
    }

    @ClusterTest
    void testFenceProducerTimeoutMs() {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put("bootstrap.servers", "localhost:225");
        try (Admin adminClient = this.clusterInstance.admin(config);){
            ExecutionException exception = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> adminClient.fenceProducers(Collections.singletonList(TXN_ID), (FenceProducersOptions)new FenceProducersOptions().timeoutMs(Integer.valueOf(0))).all().get());
            Assertions.assertInstanceOf(TimeoutException.class, (Object)exception.getCause());
        }
    }

    @ClusterTest
    void testFenceBeforeProducerCommit() throws Exception {
        this.clusterInstance.createTopic(TOPIC_NAME, 1, (short)1);
        try (KafkaProducer<byte[], byte[]> producer = this.createProducer();
             Admin adminClient = this.clusterInstance.admin();){
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(RECORD).get();
            adminClient.fenceProducers(Collections.singletonList(TXN_ID)).all().get();
            ExecutionException exceptionDuringSend = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> producer.send(RECORD).get(), (String)"expected ProducerFencedException");
            Assertions.assertTrue((exceptionDuringSend.getCause() instanceof ProducerFencedException || exceptionDuringSend.getCause() instanceof InvalidProducerEpochException ? 1 : 0) != 0);
            ApiException exceptionDuringCommit = (ApiException)Assertions.assertThrows(ApiException.class, () -> producer.commitTransaction(), (String)"Expected Exception");
            Assertions.assertTrue((exceptionDuringCommit instanceof ProducerFencedException || exceptionDuringCommit instanceof InvalidProducerEpochException ? 1 : 0) != 0);
        }
    }
}

