/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.integration;

import java.io.File;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import scala.Function0;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;

public class EligibleLeaderReplicasIntegrationTest
extends KafkaServerTestHarness
implements Logging {
    private String bootstrapServer;
    private String testTopicName;
    private Admin adminClient;

    @Override
    public MetadataVersion metadataVersion() {
        return MetadataVersion.IBP_4_0_IV1;
    }

    @Override
    public Seq<KafkaConfig> generateConfigs() {
        ArrayList brokerConfigs = new ArrayList();
        brokerConfigs.addAll(JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(5, true, true, (Option<SecurityProtocol>)Option.empty(), (Option<File>)Option.empty(), (Option<Properties>)Option.empty(), true, false, false, false, (Map<Object, String>)new HashMap(), 1, false, 1, (short)4, 0, false)));
        ArrayList<KafkaConfig> configs = new ArrayList<KafkaConfig>();
        for (Properties props : brokerConfigs) {
            configs.add(KafkaConfig.fromProps((Properties)props));
        }
        return JavaConverters.asScalaBuffer(configs).toSeq();
    }

    @Override
    @BeforeEach
    public void setUp(TestInfo info) {
        super.setUp(info);
        Properties props = new Properties();
        this.bootstrapServer = this.bootstrapServers(this.listenerName());
        props.put("bootstrap.servers", this.bootstrapServer);
        this.adminClient = Admin.create((Properties)props);
        this.adminClient.updateFeatures(java.util.Map.of("eligible.leader.replicas.version", new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)), new UpdateFeaturesOptions());
        this.testTopicName = String.format("%s-%s", ((Method)info.getTestMethod().get()).getName(), "ELR-test");
    }

    @AfterEach
    public void close() throws Exception {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(List.of(new NewTopic(this.testTopicName, 1, 4))).all().get();
        TestUtils.waitForPartitionMetadata(this.brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        java.util.Map configOps = java.util.Map.of(configResource, ops);
        this.adminClient.incrementalAlterConfigs(configOps).all().get();
        KafkaProducer producer = null;
        KafkaConsumer consumer = null;
        try {
            TopicDescription testTopicDescription = (TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName);
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo)testTopicDescription.partitions().get(0);
            List initialReplicas = topicPartitionInfo.replicas();
            Assertions.assertEquals((int)4, (int)topicPartitionInfo.isr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.elr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size());
            Properties producerProps = new Properties();
            producerProps.putIfAbsent("key.serializer", StringSerializer.class.getName());
            producerProps.putIfAbsent("value.serializer", StringSerializer.class.getName());
            producerProps.put("bootstrap.servers", this.bootstrapServer);
            producerProps.put("acks", "1");
            producer = new KafkaProducer(producerProps);
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", this.bootstrapServer);
            consumerProps.put("group.id", "test");
            consumerProps.put("fetch.max.wait.ms", "10");
            consumerProps.put("auto.offset.reset", "earliest");
            consumerProps.putIfAbsent("key.deserializer", StringDeserializer.class.getName());
            consumerProps.putIfAbsent("value.deserializer", StringDeserializer.class.getName());
            consumer = new KafkaConsumer(consumerProps);
            consumer.subscribe(Set.of(this.testTopicName));
            producer.send(new ProducerRecord(this.testTopicName, (Object)"0", (Object)"0")).get();
            this.waitUntilOneMessageIsConsumed((Consumer)consumer);
            this.killBroker(((Node)initialReplicas.get(0)).id());
            this.killBroker(((Node)initialReplicas.get(1)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 2 && elrSize == 1);
            producer.send(new ProducerRecord(this.testTopicName, (Object)"1", (Object)"1")).get();
            Thread.sleep(100L);
            Assertions.assertEquals((int)0, (int)consumer.poll(Duration.ofSeconds(1L)).count());
            this.startBroker(((Node)initialReplicas.get(1)).id());
            this.startBroker(((Node)initialReplicas.get(0)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 4 && elrSize == 0);
            this.waitUntilOneMessageIsConsumed((Consumer)consumer);
        }
        finally {
            this.restartDeadBrokers(false);
            if (consumer != null) {
                consumer.close();
            }
            if (producer != null) {
                producer.close();
            }
        }
    }

    void waitUntilOneMessageIsConsumed(Consumer consumer) {
        TestUtils.waitUntilTrue((Function0<Object>)((Function0)() -> {
            try {
                ConsumerRecords record = consumer.poll(Duration.ofMillis(100L));
                return record.count() >= 1;
            }
            catch (Exception e) {
                return false;
            }
        }), (Function0<String>)((Function0)() -> "fail to consume messages"), 15000L, 100L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(List.of(new NewTopic(this.testTopicName, 1, 4))).all().get();
        TestUtils.waitForPartitionMetadata(this.brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        java.util.Map configOps = java.util.Map.of(configResource, ops);
        this.adminClient.incrementalAlterConfigs(configOps).all().get();
        try {
            TopicDescription testTopicDescription = (TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName);
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo)testTopicDescription.partitions().get(0);
            List initialReplicas = topicPartitionInfo.replicas();
            Assertions.assertEquals((int)4, (int)topicPartitionInfo.isr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.elr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size());
            this.killBroker(((Node)initialReplicas.get(0)).id());
            this.killBroker(((Node)initialReplicas.get(1)).id());
            this.killBroker(((Node)initialReplicas.get(2)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2);
            this.killBroker(((Node)initialReplicas.get(3)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals((int)1, (int)topicPartitionInfo.lastKnownElr().size(), (String)topicPartitionInfo.toString());
            int expectLastKnownLeader = ((Node)initialReplicas.get(3)).id();
            Assertions.assertEquals((int)expectLastKnownLeader, (int)((Node)topicPartitionInfo.lastKnownElr().get(0)).id(), (String)topicPartitionInfo.toString());
            int expectLeader = topicPartitionInfo.elr().stream().filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id();
            this.startBroker(expectLeader);
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 1 && elrSize == 2);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size(), (String)topicPartitionInfo.toString());
            Assertions.assertEquals((int)expectLeader, (int)topicPartitionInfo.leader().id(), (String)topicPartitionInfo.toString());
            topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2L).forEach(node -> this.startBroker(node.id()));
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 3 && elrSize == 0);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size(), (String)topicPartitionInfo.toString());
            Assertions.assertEquals((int)expectLeader, (int)topicPartitionInfo.leader().id(), (String)topicPartitionInfo.toString());
        }
        finally {
            this.restartDeadBrokers(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(List.of(new NewTopic(this.testTopicName, 1, 4))).all().get();
        TestUtils.waitForPartitionMetadata(this.brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        java.util.Map configOps = java.util.Map.of(configResource, ops);
        this.adminClient.incrementalAlterConfigs(configOps).all().get();
        try {
            TopicDescription testTopicDescription = (TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName);
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo)testTopicDescription.partitions().get(0);
            List initialReplicas = topicPartitionInfo.replicas();
            Assertions.assertEquals((int)4, (int)topicPartitionInfo.isr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.elr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size());
            this.killBroker(((Node)initialReplicas.get(0)).id());
            this.killBroker(((Node)initialReplicas.get(1)).id());
            this.killBroker(((Node)initialReplicas.get(2)).id());
            this.killBroker(((Node)initialReplicas.get(3)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            int brokerToBeUncleanShutdown = ((Node)topicPartitionInfo.elr().get(0)).id();
            KafkaBroker broker = (KafkaBroker)this.brokers().find(b -> b.config().brokerId() == brokerToBeUncleanShutdown).get();
            Seq dirs = broker.logManager().liveLogDirs();
            Assertions.assertEquals((int)1, (int)dirs.size());
            CleanShutdownFileHandler handler = new CleanShutdownFileHandler(((File)dirs.apply(0)).toString());
            Assertions.assertTrue((boolean)handler.exists());
            Assertions.assertDoesNotThrow(() -> handler.delete());
            this.startBroker(brokerToBeUncleanShutdown);
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 2);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertNull((Object)topicPartitionInfo.leader());
            Assertions.assertEquals((int)1, (int)topicPartitionInfo.lastKnownElr().size());
        }
        finally {
            this.restartDeadBrokers(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException {
        this.adminClient.createTopics(List.of(new NewTopic(this.testTopicName, 1, 4))).all().get();
        TestUtils.waitForPartitionMetadata(this.brokers(), this.testTopicName, 0, 1000L);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, this.testTopicName);
        ArrayList<AlterConfigOp> ops = new ArrayList<AlterConfigOp>();
        ops.add(new AlterConfigOp(new ConfigEntry("min.insync.replicas", "3"), AlterConfigOp.OpType.SET));
        java.util.Map configOps = java.util.Map.of(configResource, ops);
        this.adminClient.incrementalAlterConfigs(configOps).all().get();
        try {
            TopicDescription testTopicDescription = (TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName);
            TopicPartitionInfo topicPartitionInfo = (TopicPartitionInfo)testTopicDescription.partitions().get(0);
            List initialReplicas = topicPartitionInfo.replicas();
            Assertions.assertEquals((int)4, (int)topicPartitionInfo.isr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.elr().size());
            Assertions.assertEquals((int)0, (int)topicPartitionInfo.lastKnownElr().size());
            this.killBroker(((Node)initialReplicas.get(0)).id());
            this.killBroker(((Node)initialReplicas.get(1)).id());
            this.killBroker(((Node)initialReplicas.get(2)).id());
            this.killBroker(((Node)initialReplicas.get(3)).id());
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            int lastKnownLeader = ((Node)topicPartitionInfo.lastKnownElr().get(0)).id();
            Set initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet());
            this.brokers().foreach(broker -> {
                if (initialReplicaSet.contains(broker.config().brokerId())) {
                    Seq dirs = broker.logManager().liveLogDirs();
                    Assertions.assertEquals((int)1, (int)dirs.size());
                    CleanShutdownFileHandler handler = new CleanShutdownFileHandler(((File)dirs.apply(0)).toString());
                    Assertions.assertDoesNotThrow(() -> handler.delete());
                }
                return true;
            });
            topicPartitionInfo.replicas().forEach(replica -> {
                if (replica.id() != lastKnownLeader) {
                    this.startBroker(replica.id());
                }
            });
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 1);
            topicPartitionInfo = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
            Assertions.assertNull((Object)topicPartitionInfo.leader());
            Assertions.assertEquals((int)1, (int)topicPartitionInfo.lastKnownElr().size());
            this.startBroker(lastKnownLeader);
            this.waitForIsrAndElr((isrSize, elrSize) -> isrSize > 0 && elrSize == 0);
            TestUtils.waitUntilTrue((Function0<Object>)((Function0)() -> {
                try {
                    TopicPartitionInfo partition = (TopicPartitionInfo)((TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName)).partitions().get(0);
                    if (partition.leader() == null) {
                        return false;
                    }
                    return partition.lastKnownElr().isEmpty() && partition.elr().isEmpty() && partition.leader().id() == lastKnownLeader;
                }
                catch (Exception e) {
                    return false;
                }
            }), (Function0<String>)((Function0)() -> String.format("Partition metadata for %s is not correct", this.testTopicName)), 15000L, 100L);
        }
        finally {
            this.restartDeadBrokers(false);
        }
    }

    void waitForIsrAndElr(BiFunction<Integer, Integer, Boolean> isIsrAndElrSizeSatisfied) {
        TestUtils.waitUntilTrue((Function0<Object>)((Function0)() -> {
            try {
                TopicDescription topicDescription = (TopicDescription)((java.util.Map)this.adminClient.describeTopics(List.of(this.testTopicName)).allTopicNames().get()).get(this.testTopicName);
                TopicPartitionInfo partition = (TopicPartitionInfo)topicDescription.partitions().get(0);
                return isIsrAndElrSizeSatisfied.apply(partition.isr().size(), partition.elr().size());
            }
            catch (Exception e) {
                return false;
            }
        }), (Function0<String>)((Function0)() -> String.format("Partition metadata for %s is not propagated", this.testTopicName)), 15000L, 100L);
    }
}

