/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.policy;

import io.confluent.kafka.multitenant.MultiTenantConfigRestrictions;
import io.confluent.kafka.server.plugins.policy.AbstractPolicyConfig;
import io.confluent.kafka.server.plugins.policy.PolicyUtils;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkUtils;
import kafka.server.link.ConnectionMode;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.PolicyViolationException;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.utils.Utils;

public class ClusterLinkPolicyConfig
extends AbstractPolicyConfig {
    // ---- Property-name prefixes and shared documentation fragments ----
    public static final String BASE_PREFIX = "confluent.plugins.";
    public static final String CLUSTER_LINK_PREFIX = BASE_PREFIX + "link.policy.";
    public static final String DOC_MIN_PREFIX = "The minimum allowed value for the ";
    public static final String DOC_MAX_PREFIX = "The maximum allowed value for the ";
    public static final String DOC_SUFFIX = " cluster link config property.";

    // ---- ACL sync interval bounds ----
    public static final String ACL_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AclSyncMsProp() + ".min";
    public static final int DEFAULT_ACL_SYNC_MS_MIN = 1000;
    protected static final String ACL_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.AclSyncMsProp() + DOC_SUFFIX;
    public static final String ACL_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AclSyncMsProp() + ".max";
    public static final int DEFAULT_ACL_SYNC_MS_MAX = 300000;
    protected static final String ACL_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.AclSyncMsProp() + DOC_SUFFIX;

    // ---- Consumer offset sync interval bounds ----
    public static final String CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + ".min";
    public static final int DEFAULT_CONSUMER_OFFSET_SYNC_MS_MIN = 1000;
    protected static final String CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + DOC_SUFFIX;
    public static final String CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + ".max";
    public static final int DEFAULT_CONSUMER_OFFSET_SYNC_MS_MAX = 300000;
    protected static final String CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.ConsumerOffsetSyncMsProp() + DOC_SUFFIX;

    // ---- Topic config sync interval bounds ----
    public static final String TOPIC_CONFIG_SYNC_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + ".min";
    public static final int DEFAULT_TOPIC_CONFIG_SYNC_MS_MIN = 1000;
    protected static final String TOPIC_CONFIG_SYNC_MS_MIN_CONFIG_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + DOC_SUFFIX;
    public static final String TOPIC_CONFIG_SYNC_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + ".max";
    public static final int DEFAULT_TOPIC_CONFIG_SYNC_MS_MAX = 300000;
    protected static final String TOPIC_CONFIG_SYNC_MS_MAX_CONFIG_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.TopicConfigSyncMsProp() + DOC_SUFFIX;

    // ---- SASL allow-lists ----
    // The default lists are wrapped as unmodifiable: they are public, and the very same
    // instances are registered as ConfigDef defaults in the static initializer, so a
    // caller mutating one through List.set would silently change the policy defaults.
    public static final String SASL_MECHANISMS_ALLOWED_CONFIG = CLUSTER_LINK_PREFIX + "sasl.mechanism.allowed";
    public static final List<String> DEFAULT_SASL_MECHANISMS_ALLOWED = Collections.unmodifiableList(Arrays.asList("PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512", "OAUTHBEARER"));
    protected static final String SASL_MECHANISMS_ALLOWED_CONFIG_DOC = "The allowed values for the sasl.mechanism cluster link config property.";
    public static final String SASL_LOGIN_MODULES_ALLOWED_CONFIG = CLUSTER_LINK_PREFIX + "sasl.login.module.allowed";
    public static final List<String> DEFAULT_SASL_LOGIN_MODULES_ALLOWED = Collections.unmodifiableList(Arrays.asList(PlainLoginModule.class.getName(), ScramLoginModule.class.getName(), OAuthBearerLoginModule.class.getName()));
    protected static final String SASL_LOGIN_MODULES_ALLOWED_CONFIG_DOC = "The allowed values for the login module specified in sasl.jaas.config cluster link config property.";
    public static final String SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG = CLUSTER_LINK_PREFIX + "sasl.login.callback.handler.class.allowed";
    // Collections.singletonList is already immutable, so no extra wrapping is needed.
    public static final List<String> DEFAULT_SASL_LOGIN_CALLBACK_HANDLER_ALLOWED = Collections.singletonList(OAuthBearerLoginCallbackHandler.class.getName());
    protected static final String SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG_DOC = "The allowed values for the login callback handler specified in sasl.login.callback.handler.class cluster link config property.";

    // ---- Replica socket receive buffer bounds (bytes) ----
    public static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG = CLUSTER_LINK_PREFIX + KafkaConfig.ReplicaSocketReceiveBufferBytesProp() + ".min";
    public static final int DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN = 32 * 1024;
    protected static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_DOC = DOC_MIN_PREFIX + KafkaConfig.ReplicaSocketReceiveBufferBytesProp() + DOC_SUFFIX;
    public static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG = CLUSTER_LINK_PREFIX + KafkaConfig.ReplicaSocketReceiveBufferBytesProp() + ".max";
    public static final int DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX = 1024 * 1024;
    protected static final String REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_DOC = DOC_MAX_PREFIX + KafkaConfig.ReplicaSocketReceiveBufferBytesProp() + DOC_SUFFIX;

    // ---- Availability check interval bounds ----
    public static final String AVAILABILITY_CHECK_MS_MIN_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + ".min";
    public static final int DEFAULT_AVAILABILITY_CHECK_MS_MIN = 3000;
    public static final String AVAILABILITY_CHECK_MS_MIN_DOC = DOC_MIN_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + DOC_SUFFIX;
    public static final String AVAILABILITY_CHECK_MS_MAX_CONFIG = CLUSTER_LINK_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + ".max";
    public static final int DEFAULT_AVAILABILITY_CHECK_MS_MAX = 60000;
    public static final String AVAILABILITY_CHECK_MS_MAX_DOC = DOC_MAX_PREFIX + ClusterLinkConfig.AvailabilityCheckMsProp() + DOC_SUFFIX;

    // ---- Per-tenant link count limits ----
    // NOTE(review): these two names start with "confluent.plugins.cluster.link.policy.",
    // not CLUSTER_LINK_PREFIX ("confluent.plugins.link.policy."). They are kept verbatim
    // because renaming a property would break existing configurations.
    public static final String MAX_DEST_LINKS_PER_TENANT_CONFIG = "confluent.plugins.cluster.link.policy.max.destination.links.per.tenant";
    public static final String MAX_SOURCE_LINKS_PER_TENANT_CONFIG = "confluent.plugins.cluster.link.policy.max.source.links.per.tenant";
    /** Definition of every policy property; assigned in the static initializer. */
    private static final ConfigDef CONFIG;

    // Per-instance allow-lists, filled by the constructor from the parsed configuration.
    private final Set<String> allowedSaslMechanisms = new HashSet<>();
    private final Set<String> allowedSaslLoginModules = new HashSet<>();
    private final Set<String> allowedSaslLoginCallbackHandlers = new HashSet<>();

    /** Security protocol names checked by validateSecurityProtocol; assigned in the static initializer. */
    private static final Set<String> ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS;

    /**
     * Builds the policy configuration from the supplied settings and caches the three
     * SASL allow-lists as sets.
     *
     * @param clientConfigs settings handed to the superclass together with {@code CONFIG}
     */
    public ClusterLinkPolicyConfig(Map<String, ?> clientConfigs) {
        super(CONFIG, clientConfigs);
        allowedSaslMechanisms.addAll(getList(SASL_MECHANISMS_ALLOWED_CONFIG));
        allowedSaslLoginModules.addAll(getList(SASL_LOGIN_MODULES_ALLOWED_CONFIG));
        allowedSaslLoginCallbackHandlers.addAll(getList(SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG));
    }

    /**
     * Prints the reStructuredText rendering of the policy config definition to standard output.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        String rstDocumentation = CONFIG.toRst();
        System.out.println(rstDocumentation);
    }

    /**
     * Applies the min/max policy checks to each bounded cluster link property, then rejects
     * a topic-config sync include list that names any topic config which
     * {@code MultiTenantConfigRestrictions.updatableTopicConfig(name, false)} does not accept.
     *
     * @param configs the cluster link configs being validated
     * @throws PolicyViolationException if the sync include list contains a rejected topic config
     */
    private void validateConfigsAreInRange(Map<String, String> configs) {
        String aclSyncMs = ClusterLinkConfig.AclSyncMsProp();
        checkPolicyMin(configs, ACL_SYNC_MS_MIN_CONFIG, aclSyncMs);
        checkPolicyMax(configs, ACL_SYNC_MS_MAX_CONFIG, aclSyncMs);

        String consumerOffsetSyncMs = ClusterLinkConfig.ConsumerOffsetSyncMsProp();
        checkPolicyMin(configs, CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG, consumerOffsetSyncMs);
        checkPolicyMax(configs, CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG, consumerOffsetSyncMs);

        String topicConfigSyncMs = ClusterLinkConfig.TopicConfigSyncMsProp();
        checkPolicyMin(configs, TOPIC_CONFIG_SYNC_MS_MIN_CONFIG, topicConfigSyncMs);
        checkPolicyMax(configs, TOPIC_CONFIG_SYNC_MS_MAX_CONFIG, topicConfigSyncMs);

        String receiveBufferBytes = KafkaConfig.ReplicaSocketReceiveBufferBytesProp();
        checkPolicyMin(configs, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG, receiveBufferBytes);
        checkPolicyMax(configs, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG, receiveBufferBytes);

        String availabilityCheckMs = ClusterLinkConfig.AvailabilityCheckMsProp();
        checkPolicyMin(configs, AVAILABILITY_CHECK_MS_MIN_CONFIG, availabilityCheckMs);
        checkPolicyMax(configs, AVAILABILITY_CHECK_MS_MAX_CONFIG, availabilityCheckMs);

        List<String> includedTopicConfigs = parseList(configs, ClusterLinkConfig.TopicConfigSyncIncludeProp());
        if (includedTopicConfigs == null) {
            return;
        }
        Set<String> rejected = includedTopicConfigs.stream()
            .filter(name -> !MultiTenantConfigRestrictions.updatableTopicConfig(name, false))
            .collect(Collectors.toSet());
        if (!rejected.isEmpty()) {
            throw new PolicyViolationException(ClusterLinkConfig.TopicConfigSyncIncludeProp() + " should not include " + rejected);
        }
    }

    /**
     * Runs every cluster link policy check against the supplied configs: updatable-key
     * check, range checks, security checks, then bootstrap address checks, in that order.
     * A {@code null} map is accepted and nothing is validated.
     *
     * @param configs the cluster link configs to validate, may be {@code null}
     */
    public void validateClusterLinkConfigs(Map<String, String> configs) {
        if (configs != null) {
            PolicyUtils.validateConfigsAreUpdatable(configs, MultiTenantConfigRestrictions.UPDATABLE_CLUSTER_LINK_CONFIGS::contains);
            validateConfigsAreInRange(configs);
            validateSecurityConfigs(configs);
            validateBootstrap(configs);
        }
    }

    /**
     * Validates the security-related cluster link settings against this policy.
     *
     * <p>Nothing is checked when the link uses inbound connections. Otherwise the security
     * protocol, SASL mechanism, JAAS login modules, login callback handler class and OAuth
     * token endpoint URL are checked in that order; the first failure is thrown.
     *
     * @param configs the cluster link configs being validated
     * @throws PolicyViolationException if the SASL mechanism or the login callback handler
     *         is not in the corresponding allow-list (the delegated checks may throw as well)
     */
    void validateSecurityConfigs(Map<String, String> configs) {
        if (ClusterLinkPolicyConfig.usesInboundConnections(configs)) {
            return;
        }
        this.validateSecurityProtocol(configs);

        String saslMechanism = configs.get("sasl.mechanism");
        // The mechanism is upper-cased before the lookup, so the comparison is
        // case-insensitive against upper-case allow-list entries.
        if (saslMechanism != null && !this.allowedSaslMechanisms.contains(saslMechanism.toUpperCase(Locale.ROOT))) {
            throw new PolicyViolationException("sasl.mechanism=" + saslMechanism + " must be one of: " + this.allowedSaslMechanisms);
        }

        this.validateSaslJaasConfig(configs.get("sasl.jaas.config"));

        String loginCallbackHandler = configs.get("sasl.login.callback.handler.class");
        if (loginCallbackHandler != null && !this.allowedSaslLoginCallbackHandlers.contains(loginCallbackHandler)) {
            // Report the configured allow-list rather than a hard-coded class: the list is
            // configurable, so naming only OAuthBearerLoginCallbackHandler could be wrong.
            throw new PolicyViolationException("sasl.login.callback.handler.class=" + loginCallbackHandler + " must be one of: " + this.allowedSaslLoginCallbackHandlers);
        }

        this.validateOAuthTokenEndpoint(configs.get("sasl.oauthbearer.token.endpoint.url"));
    }

    /**
     * Rejects a security protocol outside {@code ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS}
     * when {@code ClusterLinkUtils.isOutboundBootstrapCCloudHost} reports true for the
     * configured bootstrap servers. The check is skipped when the bootstrap servers, the
     * security protocol or the connection mode is absent.
     *
     * @param configs the cluster link configs being validated
     * @throws PolicyViolationException if the protocol is not allowed for such a link
     */
    private void validateSecurityProtocol(Map<String, String> configs) {
        String protocolName = configs.get("security.protocol");
        ConnectionMode mode = ClusterLinkConfig.clusterLinkConnectionMode(configs);
        String bootstrapValue = configs.get("bootstrap.servers");
        boolean nothingToCheck = bootstrapValue == null || protocolName == null || mode == null;
        if (nothingToCheck) {
            return;
        }
        // Raw types are kept here because the parameter types of the Scala helper
        // called below are not visible from this file.
        List bootstrapServers = (List) ConfigDef.parseType("bootstrap.servers", bootstrapValue, ConfigDef.Type.LIST);
        if (ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS.contains(protocolName.toUpperCase(Locale.ROOT))) {
            return;
        }
        if (ClusterLinkUtils.isOutboundBootstrapCCloudHost((Map) this.values(), mode, bootstrapServers)) {
            throw new PolicyViolationException(String.format("Invalid security protocol %s for a Confluent Cloud to Confluent Cloud link, it must be SASL_SSL.", protocolName));
        }
    }

    /**
     * Checks that every login module named in the given {@code sasl.jaas.config} value is
     * in the allowed login module set. A {@code null} or blank value is accepted unchecked.
     *
     * @param saslJaasConfig the raw {@code sasl.jaas.config} value, may be {@code null}
     * @throws InvalidConfigurationException if the value cannot be loaded as a JAAS client context
     * @throws PolicyViolationException if it names a login module that is not allowed
     */
    private void validateSaslJaasConfig(String saslJaasConfig) {
        if (saslJaasConfig == null || saslJaasConfig.trim().isEmpty()) {
            return;
        }
        JaasContext jaasContext;
        try {
            jaasContext = JaasContext.loadClientContext(Collections.singletonMap("sasl.jaas.config", new Password(saslJaasConfig)));
        }
        catch (Exception e) {
            // The raw JAAS config is deliberately left out of the message: it can carry
            // credentials, and exception messages end up in logs and client responses.
            // The underlying failure is preserved as the cause for diagnosis.
            throw new InvalidConfigurationException("sasl.jaas.config could not be loaded.", e);
        }
        jaasContext.configurationEntries().forEach(entry -> {
            String loginModule = entry.getLoginModuleName();
            if (loginModule != null && !this.allowedSaslLoginModules.contains(loginModule)) {
                throw new PolicyViolationException("sasl.jaas.config contains unsupported login module '" + loginModule + "', must be one of: " + this.allowedSaslLoginModules);
            }
        });
    }

    /**
     * Checks that the OAuth token endpoint, when set, is a well-formed URL whose protocol
     * is {@code http} or {@code https} (compared case-insensitively). A {@code null} or
     * empty value is accepted unchecked.
     *
     * @param tokenEndpoint the {@code sasl.oauthbearer.token.endpoint.url} value, may be {@code null}
     * @throws InvalidConfigurationException if the value is not a well-formed URL
     * @throws PolicyViolationException if the URL uses any other protocol
     */
    private void validateOAuthTokenEndpoint(String tokenEndpoint) {
        if (tokenEndpoint == null || tokenEndpoint.isEmpty()) {
            return;
        }
        String protocol;
        try {
            protocol = new URL(tokenEndpoint).getProtocol().toLowerCase(Locale.ROOT);
        }
        catch (MalformedURLException e) {
            // Message text is unchanged; the exception is additionally chained as the cause
            // so that its stack trace is not lost.
            throw new InvalidConfigurationException("sasl.oauthbearer.token.endpoint.url=" + tokenEndpoint + " contains malformed URL : " + e, e);
        }
        if (!protocol.equals("http") && !protocol.equals("https")) {
            throw new PolicyViolationException("sasl.oauthbearer.token.endpoint.url=" + tokenEndpoint + " contains unsupported protocol '" + protocol + "', only http and https are supported.");
        }
    }

    /**
     * Rejects bootstrap servers for which {@code ClusterLinkUtils.isInternalNetworkOrPort}
     * returns true. The check is skipped when no bootstrap servers are configured or when
     * the link uses inbound connections. Addresses are parsed with the configured
     * {@code client.dns.lookup} mode, or {@code USE_ALL_DNS_IPS} when that is not set.
     *
     * @param configs the cluster link configs being validated
     * @throws PolicyViolationException listing every rejected address
     */
    static void validateBootstrap(Map<String, String> configs) {
        String bootstrapValue = configs.get("bootstrap.servers");
        String dnsLookupValue = configs.get("client.dns.lookup");
        if (bootstrapValue == null || usesInboundConnections(configs)) {
            return;
        }

        ClientDnsLookup dnsLookup;
        if (dnsLookupValue == null) {
            dnsLookup = ClientDnsLookup.USE_ALL_DNS_IPS;
        } else {
            dnsLookup = ClientDnsLookup.forConfig(dnsLookupValue);
        }

        @SuppressWarnings("unchecked") // parseType yields a list of strings for Type.LIST
        List<String> bootstrapServers = (List<String>) ConfigDef.parseType("bootstrap.servers", bootstrapValue, ConfigDef.Type.LIST);

        List<InetSocketAddress> rejected = ClientUtils.parseAndValidateAddresses(bootstrapServers, dnsLookup)
            .stream()
            .filter(address -> ClusterLinkUtils.isInternalNetworkOrPort(address))
            .collect(Collectors.toList());
        if (!rejected.isEmpty()) {
            throw new PolicyViolationException("Invalid bootstrap addresses or ports that cannot be used for cluster links on Confluent Cloud: " + rejected);
        }
    }

    /**
     * Tells whether the connection mode resolved from the given configs is the inbound mode.
     *
     * @param configs the cluster link configs to inspect
     * @return {@code true} if the resolved mode is the {@code ConnectionMode.Inbound} singleton
     */
    private static boolean usesInboundConnections(Map<String, String> configs) {
        // Identity comparison against the Scala object singleton.
        return ClusterLinkConfig.clusterLinkConnectionMode(configs) == ConnectionMode.Inbound$.MODULE$;
    }

    // Initializes the allowed protocol set and registers every policy property with its
    // type, default value, importance and documentation string.
    static {
        ALLOWED_CCLOUD_TO_CCLOUD_SECURITY_PROTOCOLS = Utils.mkSet(SecurityProtocol.SASL_SSL.name);
        CONFIG = new ConfigDef()
            .define(ACL_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, DEFAULT_ACL_SYNC_MS_MIN, ConfigDef.Importance.LOW, ACL_SYNC_MS_MIN_CONFIG_DOC)
            .define(ACL_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, DEFAULT_ACL_SYNC_MS_MAX, ConfigDef.Importance.LOW, ACL_SYNC_MS_MAX_CONFIG_DOC)
            .define(CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, DEFAULT_CONSUMER_OFFSET_SYNC_MS_MIN, ConfigDef.Importance.LOW, CONSUMER_OFFSET_SYNC_MS_MIN_CONFIG_DOC)
            .define(CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, DEFAULT_CONSUMER_OFFSET_SYNC_MS_MAX, ConfigDef.Importance.LOW, CONSUMER_OFFSET_SYNC_MS_MAX_CONFIG_DOC)
            .define(TOPIC_CONFIG_SYNC_MS_MIN_CONFIG, ConfigDef.Type.INT, DEFAULT_TOPIC_CONFIG_SYNC_MS_MIN, ConfigDef.Importance.LOW, TOPIC_CONFIG_SYNC_MS_MIN_CONFIG_DOC)
            .define(TOPIC_CONFIG_SYNC_MS_MAX_CONFIG, ConfigDef.Type.INT, DEFAULT_TOPIC_CONFIG_SYNC_MS_MAX, ConfigDef.Importance.LOW, TOPIC_CONFIG_SYNC_MS_MAX_CONFIG_DOC)
            .define(SASL_MECHANISMS_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_MECHANISMS_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_MECHANISMS_ALLOWED_CONFIG_DOC)
            .define(SASL_LOGIN_MODULES_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_LOGIN_MODULES_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_LOGIN_MODULES_ALLOWED_CONFIG_DOC)
            .define(SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG, ConfigDef.Type.LIST, DEFAULT_SASL_LOGIN_CALLBACK_HANDLER_ALLOWED, ConfigDef.Importance.MEDIUM, SASL_LOGIN_CALLBACK_HANDLER_ALLOWED_CONFIG_DOC)
            .define(REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_CONFIG, ConfigDef.Type.INT, DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN, ConfigDef.Importance.LOW, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MIN_DOC)
            .define(REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_CONFIG, ConfigDef.Type.INT, DEFAULT_REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX, ConfigDef.Importance.LOW, REPLICA_SOCKET_RECEIVE_BUFFER_BYTES_MAX_DOC)
            .define(AVAILABILITY_CHECK_MS_MIN_CONFIG, ConfigDef.Type.INT, DEFAULT_AVAILABILITY_CHECK_MS_MIN, ConfigDef.Importance.LOW, AVAILABILITY_CHECK_MS_MIN_DOC)
            .define(AVAILABILITY_CHECK_MS_MAX_CONFIG, ConfigDef.Type.INT, DEFAULT_AVAILABILITY_CHECK_MS_MAX, ConfigDef.Importance.LOW, AVAILABILITY_CHECK_MS_MAX_DOC)
            // The per-tenant link limits have no DEFAULT_* constant; their default is the literal 5.
            .define(MAX_DEST_LINKS_PER_TENANT_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Importance.LOW, "The maximum destination cluster links per tenant.")
            .define(MAX_SOURCE_LINKS_PER_TENANT_CONFIG, ConfigDef.Type.INT, 5, ConfigDef.Importance.LOW, "The maximum source cluster links per tenant. This limit is only applicable to source initiated links.");
    }
}

