/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.security.authenticator;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkTestUtils;
import org.apache.kafka.common.network.NioEchoServer;
import org.apache.kafka.common.security.TestSecurityConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.authenticator.LoginManager;
import org.apache.kafka.common.security.authenticator.TestJaasConfig;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Checks that each Kafka client type (consumer, producer, admin client and
 * transactional producer) reports a {@link SaslAuthenticationException} when it
 * connects to a SASL_PLAINTEXT/PLAIN {@link NioEchoServer} using the client
 * credentials installed by {@link #setup()}.
 *
 * <p>NOTE(review): the tests rely on the client password set in {@link #setup()}
 * being rejected by the server; presumably it differs from the password in the
 * server-side JAAS entry created by {@code TestJaasConfig} -- confirm there.
 */
public class ClientAuthenticationFailureTest {
    // NOTE(review): 50L is presumably MockTime's auto-tick in milliseconds -- confirm against MockTime.
    private static final MockTime TIME = new MockTime(50L);
    private NioEchoServer server;
    private Map<String, Object> saslServerConfigs;
    private Map<String, Object> saslClientConfigs;
    /** Topic name used by every test in this class. */
    private final String topic = "test";

    /**
     * Resets cached logins, builds the server and client SASL configuration,
     * installs the JAAS client options and starts the echo server.
     *
     * @throws Exception if the echo server cannot be created
     */
    @BeforeEach
    public void setup() throws Exception {
        LoginManager.closeAll();

        saslServerConfigs = new HashMap<>();
        saslServerConfigs.put("sasl.enabled.mechanisms", Collections.singletonList("PLAIN"));

        saslClientConfigs = new HashMap<>();
        saslClientConfigs.put("security.protocol", "SASL_PLAINTEXT");
        saslClientConfigs.put("sasl.mechanism", "PLAIN");

        TestJaasConfig testJaasConfig = TestJaasConfig.createConfiguration("PLAIN", Collections.singletonList("PLAIN"));
        // Credentials the clients will present; the tests expect the server to reject them.
        testJaasConfig.setClientOptions("PLAIN", "myuser", "anotherpassword");

        server = createEchoServer(SecurityProtocol.SASL_PLAINTEXT);
    }

    /**
     * Closes the echo server if {@link #setup()} managed to create it.
     *
     * @throws Exception if closing the server fails
     */
    @AfterEach
    public void teardown() throws Exception {
        if (server != null) {
            server.close();
        }
    }

    /** Assigning a partition and polling must fail with {@link SaslAuthenticationException}. */
    @Test
    public void testConsumerWithInvalidCredentials() {
        Map<String, Object> props = clientProps();
        StringDeserializer deserializer = new StringDeserializer();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, deserializer, deserializer)) {
            Assertions.assertThrows(SaslAuthenticationException.class, () -> {
                consumer.assign(Collections.singleton(new TopicPartition(topic, 0)));
                consumer.poll(Duration.ofSeconds(10L));
            });
        }
    }

    /** The future returned by {@code send} must fail with {@link SaslAuthenticationException}. */
    @Test
    public void testProducerWithInvalidCredentials() {
        Map<String, Object> props = clientProps();
        StringSerializer serializer = new StringSerializer();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message");
            Future<?> future = producer.send(record);
            TestUtils.assertFutureThrows(SaslAuthenticationException.class, future);
        }
    }

    /** The future returned by {@code describeTopics} must fail with {@link SaslAuthenticationException}. */
    @Test
    public void testAdminClientWithInvalidCredentials() {
        Map<String, Object> props = clientProps();
        try (Admin client = Admin.create(props)) {
            KafkaFuture<?> future = client.describeTopics(Collections.singleton(topic)).allTopicNames();
            TestUtils.assertFutureThrows(SaslAuthenticationException.class, future);
        }
    }

    /** {@code initTransactions} must fail with {@link SaslAuthenticationException}. */
    @Test
    public void testTransactionalProducerWithInvalidCredentials() {
        Map<String, Object> props = clientProps();
        props.put("transactional.id", "txclient-1");
        props.put("enable.idempotence", "true");
        StringSerializer serializer = new StringSerializer();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props, serializer, serializer)) {
            Assertions.assertThrows(SaslAuthenticationException.class, producer::initTransactions);
        }
    }

    /**
     * Returns a fresh, mutable copy of the SASL client configuration with
     * {@code bootstrap.servers} pointing at the echo server's port.
     */
    private Map<String, Object> clientProps() {
        Map<String, Object> props = new HashMap<>(saslClientConfigs);
        props.put("bootstrap.servers", "localhost:" + server.port());
        return props;
    }

    /**
     * Creates an echo server whose listener name is derived from the given protocol.
     *
     * @param securityProtocol protocol the server should speak
     * @return the server returned by {@link NetworkTestUtils#createEchoServer}
     * @throws Exception if the server cannot be created
     */
    private NioEchoServer createEchoServer(SecurityProtocol securityProtocol) throws Exception {
        return createEchoServer(ListenerName.forSecurityProtocol(securityProtocol), securityProtocol);
    }

    /**
     * Creates an echo server configured from {@code saslServerConfigs}, with a new
     * empty {@link CredentialCache} and the shared {@code TIME} instance.
     *
     * @param listenerName     listener name passed through to the server
     * @param securityProtocol protocol the server should speak
     * @return the server returned by {@link NetworkTestUtils#createEchoServer}
     * @throws Exception if the server cannot be created
     */
    private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
        return NetworkTestUtils.createEchoServer(listenerName, securityProtocol, new TestSecurityConfig(saslServerConfigs), new CredentialCache(), TIME);
    }
}

