/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.config;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.common.config.PulsarConfigValidator;
import org.apache.flink.connector.pulsar.common.config.PulsarOptions;
import org.apache.flink.connector.pulsar.source.PulsarSourceOptions;
import org.apache.flink.connector.pulsar.source.config.PulsarConsumerBuilder;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

/**
 * Static helpers for turning a {@link SourceConfiguration} into a Pulsar {@link ConsumerBuilder}
 * and for validating the options a Pulsar source needs.
 *
 * <p>The class only holds static members and cannot be instantiated.
 */
@Internal
public final class PulsarSourceConfigUtils {

    /**
     * Batch receive policy built with a zero timeout and a maximum of one message per batch. It is
     * applied unconditionally to every builder returned by {@link #createConsumerBuilder}.
     */
    private static final BatchReceivePolicy DISABLED_BATCH_RECEIVE_POLICY =
            BatchReceivePolicy.builder()
                    .timeout(0, TimeUnit.MILLISECONDS)
                    .maxNumMessages(1)
                    .build();

    /**
     * Validator for source configurations: the service URL, the admin URL and the subscription
     * name are registered as required options, and the two authentication parameter options
     * ({@code PULSAR_AUTH_PARAMS} and {@code PULSAR_AUTH_PARAM_MAP}) are registered as conflicting.
     */
    public static final PulsarConfigValidator SOURCE_CONFIG_VALIDATOR =
            PulsarConfigValidator.builder()
                    .requiredOption(PulsarOptions.PULSAR_SERVICE_URL)
                    .requiredOption(PulsarOptions.PULSAR_ADMIN_URL)
                    .requiredOption(PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME)
                    .conflictOptions(
                            PulsarOptions.PULSAR_AUTH_PARAMS, PulsarOptions.PULSAR_AUTH_PARAM_MAP)
                    .build();

    /** Not instantiable: this class only exposes static members. */
    private PulsarSourceConfigUtils() {
    }

    /**
     * Creates a consumer builder for the given client and schema and feeds the source options from
     * {@code configuration} into it.
     *
     * <p>Each option is passed to the matching builder setter through
     * {@code SourceConfiguration#useOption}. NOTE(review): presumably the setter is only invoked
     * when the option is present in the configuration; confirm against {@code useOption}.
     *
     * @param client the Pulsar client the consumer builder is bound to
     * @param schema the schema used by the consumer built from the returned builder
     * @param configuration the source configuration supplying the consumer options
     * @param <T> the message type described by {@code schema}
     * @return the configured consumer builder
     */
    public static <T> ConsumerBuilder<T> createConsumerBuilder(
            PulsarClient client, Schema<T> schema, SourceConfiguration configuration) {
        // Parameterized (instead of raw) so the builder stays tied to the schema's message type.
        PulsarConsumerBuilder<T> builder = new PulsarConsumerBuilder<>(client, schema);

        configuration.useOption(
                PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME, builder::subscriptionName);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_ACK_TIMEOUT_MILLIS,
                millis -> builder.ackTimeout(millis, TimeUnit.MILLISECONDS));
        configuration.useOption(
                PulsarSourceOptions.PULSAR_ACK_RECEIPT_ENABLED, builder::isAckReceiptEnabled);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_TICK_DURATION_MILLIS,
                millis -> builder.ackTimeoutTickTime(millis, TimeUnit.MILLISECONDS));
        configuration.useOption(
                PulsarSourceOptions.PULSAR_NEGATIVE_ACK_REDELIVERY_DELAY_MICROS,
                micros -> builder.negativeAckRedeliveryDelay(micros, TimeUnit.MICROSECONDS));
        configuration.useOption(
                PulsarSourceOptions.PULSAR_SUBSCRIPTION_TYPE, builder::subscriptionType);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_SUBSCRIPTION_MODE, builder::subscriptionMode);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION, builder::cryptoFailureAction);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_RECEIVER_QUEUE_SIZE, builder::receiverQueueSize);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_ACKNOWLEDGEMENTS_GROUP_TIME_MICROS,
                micros -> builder.acknowledgmentGroupTime(micros, TimeUnit.MICROSECONDS));
        configuration.useOption(
                PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE,
                builder::replicateSubscriptionState);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
                builder::maxTotalReceiverQueueSizeAcrossPartitions);
        // The configured consumer name is used as a format string; a random UUID is its argument.
        configuration.useOption(
                PulsarSourceOptions.PULSAR_CONSUMER_NAME,
                consumerName -> String.format(consumerName, UUID.randomUUID()),
                builder::consumerName);
        configuration.useOption(PulsarSourceOptions.PULSAR_READ_COMPACTED, builder::readCompacted);
        configuration.useOption(PulsarSourceOptions.PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
        createDeadLetterPolicy(configuration).ifPresent(builder::deadLetterPolicy);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
                seconds -> builder.autoUpdatePartitionsInterval(seconds, TimeUnit.SECONDS));
        configuration.useOption(PulsarSourceOptions.PULSAR_RETRY_ENABLE, builder::enableRetry);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_MAX_PENDING_CHUNKED_MESSAGE,
                builder::maxPendingChunkedMessage);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_AUTO_ACK_OLDEST_CHUNKED_MESSAGE_ON_QUEUE_FULL,
                builder::autoAckOldestChunkedMessageOnQueueFull);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_EXPIRE_TIME_OF_INCOMPLETE_CHUNKED_MESSAGE_MILLIS,
                millis -> builder.expireTimeOfIncompleteChunkedMessage(millis, TimeUnit.MILLISECONDS));
        configuration.useOption(PulsarSourceOptions.PULSAR_POOL_MESSAGES, builder::poolMessages);

        // Consumer properties are only forwarded when at least one entry is configured.
        Map<String, String> properties =
                configuration.getProperties(PulsarSourceOptions.PULSAR_CONSUMER_PROPERTIES);
        if (!properties.isEmpty()) {
            builder.properties(properties);
        }

        // Always set last and unconditionally, regardless of the options above.
        builder.batchReceivePolicy(DISABLED_BATCH_RECEIVE_POLICY);
        return builder;
    }

    /**
     * Builds a dead letter policy when the configuration contains at least one of the max
     * redeliver count, retry letter topic or dead letter topic options.
     *
     * @param configuration the source configuration to read the dead letter options from
     * @return the policy, or an empty {@link Optional} when none of the three options is contained
     *     in the configuration
     */
    private static Optional<DeadLetterPolicy> createDeadLetterPolicy(
            SourceConfiguration configuration) {
        // Guard clause; the options are checked in the same short-circuit order as before.
        if (!configuration.contains(PulsarSourceOptions.PULSAR_MAX_REDELIVER_COUNT)
                && !configuration.contains(PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC)
                && !configuration.contains(PulsarSourceOptions.PULSAR_DEAD_LETTER_TOPIC)) {
            return Optional.empty();
        }

        DeadLetterPolicy.DeadLetterPolicyBuilder policyBuilder = DeadLetterPolicy.builder();
        configuration.useOption(
                PulsarSourceOptions.PULSAR_MAX_REDELIVER_COUNT, policyBuilder::maxRedeliverCount);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC, policyBuilder::retryLetterTopic);
        configuration.useOption(
                PulsarSourceOptions.PULSAR_DEAD_LETTER_TOPIC, policyBuilder::deadLetterTopic);
        return Optional.of(policyBuilder.build());
    }
}

