/*
 * Decompiled with CFR 0.152.
 */
package kafka.server;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.DescribeFeaturesResult;
import org.apache.kafka.clients.admin.DescribeMetadataQuorumResult;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.FinalizedVersionRange;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.clients.admin.UpdateFeaturesResult;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.InvalidUpdateVersionException;
import org.apache.kafka.common.errors.MismatchedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedEndpointTypeException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Timeout;

@Timeout(value=120L)
@ClusterTestDefaults(types={Type.KRAFT})
public class BootstrapControllersIntegrationTest {
    /**
     * Builds the single-entry Admin client configuration used by the tests in this class.
     *
     * @param clusterInstance the cluster under test, asked for its bootstrap address string
     * @param usingBootstrapControllers {@code true} to map {@code bootstrap.controllers} to
     *        {@code clusterInstance.bootstrapControllers()}; {@code false} to map
     *        {@code bootstrap.servers} to {@code clusterInstance.bootstrapServers()}
     * @return a singleton map holding exactly one of the two settings
     */
    private Map<String, Object> adminConfig(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
        if (usingBootstrapControllers) {
            return Collections.singletonMap("bootstrap.controllers", clusterInstance.bootstrapControllers());
        }
        return Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers());
    }

    /**
     * Puts the value of {@code clusterInstance.bootstrapServers()} into the
     * {@code bootstrap.controllers} setting and expects {@code describeCluster} to fail with a
     * {@link MismatchedEndpointTypeException} carrying the exact message asserted below.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTest
    public void testPutBrokersInBootstrapControllersConfig(ClusterInstance clusterInstance) {
        // Admin.create accepts Map<String, Object>; a Map<String, String> is not assignable to it.
        Map<String, Object> config = Collections.singletonMap("bootstrap.controllers", clusterInstance.bootstrapServers());
        try (Admin admin = Admin.create(config)) {
            ExecutionException exception = Assertions.assertThrows(
                ExecutionException.class,
                () -> admin.describeCluster().clusterId().get(1L, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(MismatchedEndpointTypeException.class, cause.getClass());
            Assertions.assertEquals(
                "The request was sent to an endpoint of type BROKER, but we wanted an endpoint of type CONTROLLER",
                cause.getMessage());
        }
    }

    /**
     * Puts the value of {@code clusterInstance.bootstrapControllers()} into the
     * {@code bootstrap.servers} setting and expects {@code describeCluster} to fail with an
     * {@link UnsupportedVersionException} carrying the exact message asserted below.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTest
    public void testPutControllersInBootstrapBrokersConfig(ClusterInstance clusterInstance) {
        // Admin.create accepts Map<String, Object>; a Map<String, String> is not assignable to it.
        Map<String, Object> config = Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapControllers());
        try (Admin admin = Admin.create(config)) {
            ExecutionException exception = Assertions.assertThrows(
                ExecutionException.class,
                () -> admin.describeCluster().clusterId().get(1L, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(UnsupportedVersionException.class, cause.getClass());
            Assertions.assertEquals("The node does not support METADATA", cause.getMessage());
        }
    }

    /**
     * Runs the shared describe-cluster scenario with the Admin client configured through
     * {@code bootstrap.controllers}.
     */
    @ClusterTest
    public void testDescribeClusterByControllers(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = true;
        testDescribeCluster(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared describe-cluster scenario with the Admin client configured through
     * {@code bootstrap.servers}.
     */
    @ClusterTest
    public void testDescribeCluster(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = false;
        testDescribeCluster(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Describes the cluster and checks that the reported cluster id equals
     * {@code clusterInstance.clusterId()}. When bootstrapping from controllers it additionally
     * checks that the reported controller node id is one of {@code clusterInstance.controllerIds()}.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     * @throws Exception if either future fails, times out, or the waiting thread is interrupted
     */
    private void testDescribeCluster(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            DescribeClusterResult result = admin.describeCluster();
            Assertions.assertEquals(clusterInstance.clusterId(), result.clusterId().get(1L, TimeUnit.MINUTES));
            if (usingBootstrapControllers) {
                // Bound this wait the same way as the cluster-id future above instead of blocking indefinitely.
                Node controller = result.controller().get(1L, TimeUnit.MINUTES);
                Assertions.assertTrue(clusterInstance.controllerIds().contains(controller.id()));
            }
        }
    }

    /**
     * Runs the shared describe-features scenario with the Admin client configured through
     * {@code bootstrap.controllers}.
     */
    @ClusterTest
    public void testDescribeFeaturesByControllers(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = true;
        testDescribeFeatures(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared describe-features scenario with the Admin client configured through
     * {@code bootstrap.servers}.
     */
    @ClusterTest
    public void testDescribeFeatures(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = false;
        testDescribeFeatures(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Describes the cluster features and checks that the finalized {@code metadata.version} range
     * has both bounds equal to the feature level of the cluster's configured metadata version.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     * @throws Exception if the feature-metadata future fails, times out, or is interrupted
     */
    private void testDescribeFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            DescribeFeaturesResult describeResult = admin.describeFeatures();
            short level = clusterInstance.config().metadataVersion().featureLevel();
            FinalizedVersionRange expectedRange = new FinalizedVersionRange(level, level);
            FeatureMetadata featureMetadata = describeResult.featureMetadata().get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(expectedRange, featureMetadata.finalizedFeatures().get("metadata.version"));
        }
    }

    /**
     * Runs the shared update-features scenario with the Admin client configured through
     * {@code bootstrap.controllers}.
     */
    @ClusterTest
    public void testUpdateFeaturesByControllers(ClusterInstance clusterInstance) {
        final boolean usingBootstrapControllers = true;
        testUpdateFeatures(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared update-features scenario with the Admin client configured through
     * {@code bootstrap.servers}.
     */
    @ClusterTest
    public void testUpdateFeatures(ClusterInstance clusterInstance) {
        final boolean usingBootstrapControllers = false;
        testUpdateFeatures(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Requests an upgrade of the feature {@code foo.bar.feature} to level 1 and expects the
     * request to fail with an {@link InvalidUpdateVersionException} whose message ends with
     * {@code "does not support this feature."}.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     */
    private void testUpdateFeatures(ClusterInstance clusterInstance, boolean usingBootstrapControllers) {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            // FeatureUpdate takes a short max version level; an int literal needs the explicit cast.
            FeatureUpdate update = new FeatureUpdate((short) 1, FeatureUpdate.UpgradeType.UPGRADE);
            UpdateFeaturesResult result = admin.updateFeatures(
                Collections.singletonMap("foo.bar.feature", update),
                new UpdateFeaturesOptions());
            ExecutionException exception = Assertions.assertThrows(
                ExecutionException.class,
                () -> result.all().get(1L, TimeUnit.MINUTES));
            Throwable cause = exception.getCause();
            Assertions.assertNotNull(cause);
            Assertions.assertEquals(InvalidUpdateVersionException.class, cause.getClass());
            Assertions.assertTrue(
                cause.getMessage().endsWith("does not support this feature."),
                "expected message to end with 'does not support this feature', but it was: " + cause.getMessage());
        }
    }

    /**
     * Runs the shared describe-metadata-quorum scenario with the Admin client configured through
     * {@code bootstrap.controllers}.
     */
    @ClusterTest
    public void testDescribeMetadataQuorumByControllers(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = true;
        testDescribeMetadataQuorum(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared describe-metadata-quorum scenario with the Admin client configured through
     * {@code bootstrap.servers}.
     */
    @ClusterTest
    public void testDescribeMetadataQuorum(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = false;
        testDescribeMetadataQuorum(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Describes the metadata quorum and checks that the reported leader id is one of
     * {@code clusterInstance.controllerIds()}.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     * @throws Exception if the quorum-info future fails, times out, or is interrupted
     */
    private void testDescribeMetadataQuorum(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            DescribeMetadataQuorumResult describeResult = admin.describeMetadataQuorum();
            Set<Integer> controllerIds = clusterInstance.controllerIds();
            QuorumInfo quorumInfo = describeResult.quorumInfo().get(1L, TimeUnit.MINUTES);
            int leaderId = quorumInfo.leaderId();
            Assertions.assertTrue(controllerIds.contains(leaderId));
        }
    }

    /**
     * Calls {@code listOffsets} through an Admin client configured with
     * {@code bootstrap.controllers} and expects an {@link UnsupportedEndpointTypeException}
     * carrying the exact message asserted below.
     *
     * @param clusterInstance the cluster under test
     */
    @ClusterTest
    public void testUsingBootstrapControllersOnUnsupportedAdminApi(ClusterInstance clusterInstance) {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, true))) {
            TopicPartition partition = new TopicPartition("foo", 0);
            Map<TopicPartition, OffsetSpec> request = Collections.singletonMap(partition, OffsetSpec.earliest());
            ListOffsetsResult offsetsResult = admin.listOffsets(request);
            ExecutionException failure = Assertions.assertThrows(
                ExecutionException.class,
                () -> offsetsResult.all().get(1L, TimeUnit.MINUTES));
            Assertions.assertNotNull(failure.getCause());
            Assertions.assertEquals(UnsupportedEndpointTypeException.class, failure.getCause().getClass());
            Assertions.assertEquals(
                "This Admin API is not yet supported when communicating directly with the controller quorum.",
                failure.getCause().getMessage());
        }
    }

    /**
     * Runs the shared incremental-alter-configs scenario with the Admin client configured through
     * {@code bootstrap.controllers}.
     */
    @ClusterTest
    public void testIncrementalAlterConfigsByControllers(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = true;
        testIncrementalAlterConfigs(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared incremental-alter-configs scenario with the Admin client configured through
     * {@code bootstrap.servers}.
     */
    @ClusterTest
    public void testIncrementalAlterConfigs(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = false;
        testIncrementalAlterConfigs(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Sets {@code my.custom.config} on one specific node and on the default (empty-name) BROKER
     * resource, then polls {@code describeConfigs} for the node until the entry is reported with
     * source {@code DYNAMIC_BROKER_CONFIG}.
     *
     * <p>The target node is the first controller when bootstrapping from controllers, otherwise the
     * first broker.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     * @throws Exception if the alter request fails or the expected entry is not observed in time
     */
    private void testIncrementalAlterConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            int nodeId = usingBootstrapControllers
                ? clusterInstance.controllers().values().iterator().next().config().nodeId()
                : clusterInstance.brokers().values().iterator().next().config().nodeId();
            ConfigResource nodeResource = new ConfigResource(ConfigResource.Type.BROKER, "" + nodeId);
            ConfigResource defaultResource = new ConfigResource(ConfigResource.Type.BROKER, "");

            // incrementalAlterConfigs takes Map<ConfigResource, Collection<AlterConfigOp>>; a map
            // whose value type is List<AlterConfigOp> is not assignable to that parameter.
            Map<ConfigResource, Collection<AlterConfigOp>> alterations = new HashMap<>();
            alterations.put(nodeResource, Collections.singletonList(
                new AlterConfigOp(new ConfigEntry("my.custom.config", "foo"), AlterConfigOp.OpType.SET)));
            alterations.put(defaultResource, Collections.singletonList(
                new AlterConfigOp(new ConfigEntry("my.custom.config", "bar"), AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(alterations).all().get(1L, TimeUnit.MINUTES);

            TestUtils.retryOnExceptionWithTimeout(30000L, () -> {
                Config config = admin.describeConfigs(Collections.singletonList(nodeResource))
                    .all()
                    .get(1L, TimeUnit.MINUTES)
                    .get(nodeResource);
                // Report a missing entry with context rather than a bare NoSuchElementException.
                ConfigEntry entry = config.entries().stream()
                    .filter(e -> e.name().equals("my.custom.config"))
                    .findFirst()
                    .orElseThrow(() -> new AssertionError("No my.custom.config entry was returned for " + nodeResource));
                Assertions.assertEquals(
                    ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG,
                    entry.source(),
                    "Expected entry for my.custom.config to come from DYNAMIC_BROKER_CONFIG. Instead, the entry was: " + entry);
            });
        }
    }

    /**
     * Creates topic {@code foo} with three explicitly assigned partitions through a broker-bootstrapped
     * Admin client, submits new replica assignments through a controller-bootstrapped Admin client,
     * waits until no reassignment is listed any more, and finally waits until the topic description
     * reports exactly the requested replica lists.
     *
     * @param clusterInstance the cluster under test (declared with three brokers)
     * @throws ExecutionException if topic creation or the reassignment request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @ClusterTest(brokers=3)
    public void testAlterReassignmentsWithBootstrapControllers(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        String topicName = "foo";
        try (Admin admin = Admin.create(adminConfig(clusterInstance, false))) {
            // Initial replica assignment, keyed by partition index.
            Map<Integer, List<Integer>> assignments = new HashMap<>();
            assignments.put(0, Arrays.asList(0, 1, 2));
            assignments.put(1, Arrays.asList(1, 2, 0));
            assignments.put(2, Arrays.asList(2, 1, 0));
            CreateTopicsResult createTopicResult = admin.createTopics(Collections.singletonList(new NewTopic(topicName, assignments)));
            createTopicResult.all().get();
            waitForTopics(admin, Collections.singleton(topicName));

            // Target replica lists; partition 2 is reassigned to only two replicas.
            List<Integer> part0Reassignment = Arrays.asList(2, 1, 0);
            List<Integer> part1Reassignment = Arrays.asList(0, 1, 2);
            List<Integer> part2Reassignment = Arrays.asList(1, 2);
            Map<TopicPartition, Optional<NewPartitionReassignment>> reassignments = new HashMap<>();
            reassignments.put(new TopicPartition(topicName, 0), Optional.of(new NewPartitionReassignment(part0Reassignment)));
            reassignments.put(new TopicPartition(topicName, 1), Optional.of(new NewPartitionReassignment(part1Reassignment)));
            reassignments.put(new TopicPartition(topicName, 2), Optional.of(new NewPartitionReassignment(part2Reassignment)));

            try (Admin adminWithBootstrapControllers = Admin.create(adminConfig(clusterInstance, true))) {
                adminWithBootstrapControllers.alterPartitionReassignments(reassignments).all().get();
                TestUtils.waitForCondition(
                    () -> adminWithBootstrapControllers.listPartitionReassignments().reassignments().get().isEmpty(),
                    "The reassignment never completed.");
            }

            // Fully parameterized so the comparison below is type-checked instead of using raw lists.
            List<List<Integer>> expectedMapping = Arrays.asList(part0Reassignment, part1Reassignment, part2Reassignment);
            TestUtils.waitForCondition(() -> {
                Map<String, TopicDescription> topicInfoMap =
                    admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get();
                if (topicInfoMap.containsKey(topicName)) {
                    List<List<Integer>> currentMapping =
                        translatePartitionInfoToNodeIdList(topicInfoMap.get(topicName).partitions());
                    return expectedMapping.equals(currentMapping);
                }
                return false;
            }, "Timed out waiting for replica assignments for topic " + topicName);
        }
    }

    /**
     * Polls {@code admin.listTopics()} until the returned topic names contain every expected topic.
     *
     * @param admin the client used to list topics
     * @param expectedTopics topic names that must all be present
     * @throws InterruptedException declared by {@code TestUtils.waitForCondition}, which performs
     *         the polling
     */
    private static void waitForTopics(Admin admin, Set<String> expectedTopics) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            Set<String> existingTopics = admin.listTopics().names().get();
            return existingTopics.containsAll(expectedTopics);
        }, "timed out waiting for topics");
    }

    /**
     * Converts partition descriptions into lists of replica node ids.
     *
     * @param partitions the partitions to convert
     * @return one list per partition, in the order of {@code partitions}, holding the ids of that
     *         partition's replicas in the order reported by {@code replicas()}
     */
    private static List<List<Integer>> translatePartitionInfoToNodeIdList(List<TopicPartitionInfo> partitions) {
        return partitions.stream()
            .map(TopicPartitionInfo::replicas)
            .map(replicas -> replicas.stream().map(Node::id).collect(Collectors.toList()))
            .collect(Collectors.toList());
    }

    /**
     * Runs the shared ACL create/describe/delete scenario with the Admin client configured through
     * {@code bootstrap.controllers}, on a cluster configured with {@code StandardAuthorizer} and
     * {@code User:ANONYMOUS} as super user.
     */
    @ClusterTest(serverProperties={
        @ClusterConfigProperty(key="super.users", value="User:ANONYMOUS"),
        @ClusterConfigProperty(key="authorizer.class.name", value="org.apache.kafka.metadata.authorizer.StandardAuthorizer")
    })
    public void testAclsByControllers(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = true;
        testAcls(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Runs the shared ACL create/describe/delete scenario with the Admin client configured through
     * {@code bootstrap.servers}, on a cluster configured with {@code StandardAuthorizer} and
     * {@code User:ANONYMOUS} as super user.
     */
    @ClusterTest(serverProperties={
        @ClusterConfigProperty(key="super.users", value="User:ANONYMOUS"),
        @ClusterConfigProperty(key="authorizer.class.name", value="org.apache.kafka.metadata.authorizer.StandardAuthorizer")
    })
    public void testAcls(ClusterInstance clusterInstance) throws Exception {
        final boolean usingBootstrapControllers = false;
        testAcls(clusterInstance, usingBootstrapControllers);
    }

    /**
     * Creates one ALLOW/DESCRIBE ACL for {@code User:ANONYMOUS} on the literal topic
     * {@code mytopic3}, waits for it via {@code clusterInstance.waitAcls}, then checks that
     * describing all ACLs and deleting all ACLs each return exactly that binding.
     *
     * @param clusterInstance the cluster under test
     * @param usingBootstrapControllers whether the Admin client uses {@code bootstrap.controllers}
     *        ({@code true}) or {@code bootstrap.servers} ({@code false})
     * @throws Exception if any Admin request fails, times out, or is interrupted
     */
    private void testAcls(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
        try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
            ResourcePattern resourcePattern = new ResourcePattern(ResourceType.TOPIC, "mytopic3", PatternType.LITERAL);
            AccessControlEntry accessControlEntry = new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW);
            AclBinding aclBinding = new AclBinding(resourcePattern, accessControlEntry);

            Assertions.assertDoesNotThrow(
                () -> admin.createAcls(Collections.singleton(aclBinding)).all().get(1L, TimeUnit.MINUTES));
            clusterInstance.waitAcls(
                new AclBindingFilter(resourcePattern.toFilter(), AccessControlEntryFilter.ANY),
                Collections.singleton(accessControlEntry));

            // Typed collections replace the raw Collection results so the element type is checked.
            Collection<AclBinding> aclBindings = admin.describeAcls(AclBindingFilter.ANY).values().get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, aclBindings.size());
            Assertions.assertEquals(aclBinding, aclBindings.iterator().next());

            Collection<AclBinding> deletedAclBindings =
                admin.deleteAcls(Collections.singleton(AclBindingFilter.ANY)).all().get(1L, TimeUnit.MINUTES);
            Assertions.assertEquals(1, deletedAclBindings.size());
            Assertions.assertEquals(aclBinding, deletedAclBindings.iterator().next());
        }
    }
}

