/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/**
 * Request manager that issues {@link MetadataRequest}s for a single topic or for all topics and
 * exposes the outcome as a {@link CompletableFuture}.
 *
 * <p>Requests are de-duplicated by topic: asking for metadata of a topic that already has an
 * outstanding request returns the future of that outstanding request. A request stays registered
 * until it completes successfully or fails with a non-retriable error; retriable errors are
 * reported to the underlying {@link RequestState} so the request is attempted again on a later
 * {@link #poll(long)}.
 */
public class TopicMetadataRequestManager implements RequestManager {
    private final boolean allowAutoTopicCreation;
    /** Outstanding requests keyed by topic; the empty {@link Optional} key means "all topics". */
    private final Map<Optional<String>, TopicMetadataRequestState> inflightRequests;
    private final long retryBackoffMs;
    private final Logger log;
    private final LogContext logContext;

    /**
     * @param context log context used for this manager and for every request state it creates
     * @param config  consumer configuration supplying {@code retry.backoff.ms} and
     *                {@code allow.auto.create.topics}
     */
    public TopicMetadataRequestManager(LogContext context, ConsumerConfig config) {
        this.logContext = context;
        this.log = this.logContext.logger(this.getClass());
        this.inflightRequests = new HashMap<>();
        this.retryBackoffMs = config.getLong("retry.backoff.ms");
        this.allowAutoTopicCreation = config.getBoolean("allow.auto.create.topics");
    }

    /**
     * Collects the requests that are ready to be sent at {@code currentTimeMs}.
     *
     * @param currentTimeMs current time in milliseconds
     * @return a result carrying the sendable requests with a zero delay, or an empty result with
     *         a delay of {@link Long#MAX_VALUE} when nothing can be sent right now
     */
    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        List<NetworkClientDelegate.UnsentRequest> requests = this.inflightRequests.values().stream()
                .map(state -> state.send(currentTimeMs))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
        if (requests.isEmpty()) {
            return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, new ArrayList<NetworkClientDelegate.UnsentRequest>());
        }
        return new NetworkClientDelegate.PollResult(0L, Collections.unmodifiableList(requests));
    }

    /**
     * Registers a metadata request, or joins the one already outstanding for the same topic.
     *
     * @param topic the topic to describe, or {@link Optional#empty()} to describe all topics
     * @return a future completed with the partitions of each returned topic, or completed
     *         exceptionally when the request fails with a non-retriable error
     */
    public CompletableFuture<Map<String, List<PartitionInfo>>> requestTopicMetadata(Optional<String> topic) {
        return this.inflightRequests
                .computeIfAbsent(topic, key -> new TopicMetadataRequestState(this.logContext, key, this.retryBackoffMs))
                .future;
    }

    /** Returns a snapshot copy of the currently registered request states. */
    List<TopicMetadataRequestState> inflightRequests() {
        return new ArrayList<>(this.inflightRequests.values());
    }

    /**
     * State of one outstanding metadata request: the topic it targets, the future handed out to
     * callers, and (through {@link RequestState}) the send/retry bookkeeping.
     */
    class TopicMetadataRequestState extends RequestState {
        private final Optional<String> topic;
        CompletableFuture<Map<String, List<PartitionInfo>>> future;
        /**
         * Time passed to the most recent {@link #send(long)} that actually produced a request.
         * Used as the failure timestamp when a failure is reported without a response.
         */
        private long lastSendAttemptMs;

        public TopicMetadataRequestState(LogContext logContext, Optional<String> topic, long retryBackoffMs) {
            super(logContext, TopicMetadataRequestState.class.getSimpleName(), retryBackoffMs);
            this.future = new CompletableFuture<>();
            this.topic = topic;
        }

        /**
         * Builds the request to send if {@link #canSendRequest(long)} allows it at this time.
         *
         * @param currentTimeMs current time in milliseconds
         * @return the unsent request, or empty when the request may not be sent yet
         */
        private Optional<NetworkClientDelegate.UnsentRequest> send(long currentTimeMs) {
            if (!this.canSendRequest(currentTimeMs)) {
                return Optional.empty();
            }
            this.onSendAttempt(currentTimeMs);
            this.lastSendAttemptMs = currentTimeMs;
            // A specific topic yields a single-topic request; no topic means "all topics".
            MetadataRequest.Builder request = this.topic
                    .map(t -> new MetadataRequest.Builder(Collections.singletonList(t), TopicMetadataRequestManager.this.allowAutoTopicCreation))
                    .orElseGet(MetadataRequest.Builder::allTopics);
            return Optional.of(this.createUnsentRequest(request));
        }

        /** Wraps the builder in an unsent request whose completion is routed back to this state. */
        private NetworkClientDelegate.UnsentRequest createUnsentRequest(MetadataRequest.Builder request) {
            return new NetworkClientDelegate.UnsentRequest(request, Optional.empty(), this::processResponseOrException);
        }

        /**
         * Completion callback for the request: dispatches a response to
         * {@link #handleResponse(ClientResponse, long)}, records a failed attempt for retriable
         * exceptions, and fails the future for everything else.
         *
         * @param response  the response; may be {@code null} when {@code exception} is set
         * @param exception the failure, or {@code null} when a response was received
         */
        private void processResponseOrException(ClientResponse response, Throwable exception) {
            if (exception == null) {
                this.handleResponse(response, response.receivedTimeMs());
                return;
            }
            if (exception instanceof RetriableException) {
                // A failed request is not guaranteed to carry a response, so dereferencing it
                // unconditionally could throw and leave the future pending forever. Without a
                // response, fall back to the time of the last send attempt.
                long failureTimeMs = response != null ? response.receivedTimeMs() : this.lastSendAttemptMs;
                this.onFailedAttempt(failureTimeMs);
            } else {
                this.completeFutureAndRemoveRequest(new KafkaException(exception));
            }
        }

        /**
         * Translates the response into a result: completes the future and deregisters the
         * request on success, records a failed attempt on a retriable error, and fails the
         * future on any other exception.
         */
        private void handleResponse(ClientResponse response, long responseTimeMs) {
            try {
                Map<String, List<PartitionInfo>> res = this.handleTopicMetadataResponse((MetadataResponse) response.responseBody());
                this.future.complete(res);
                TopicMetadataRequestManager.this.inflightRequests.remove(this.topic);
            } catch (RetriableException e) {
                // Keep the request registered so a later poll can send it again.
                this.onFailedAttempt(responseTimeMs);
            } catch (Exception e) {
                this.completeFutureAndRemoveRequest(e);
            }
        }

        /** Fails the future with {@code throwable} and deregisters this request. */
        private void completeFutureAndRemoveRequest(Throwable throwable) {
            this.future.completeExceptionally(throwable);
            TopicMetadataRequestManager.this.inflightRequests.remove(this.topic);
        }

        /**
         * Extracts the per-topic partition information from a metadata response.
         *
         * @param response the metadata response
         * @return a map from each topic in the response's cluster view to its partitions
         * @throws TopicAuthorizationException if the response lists unauthorized topics
         * @throws InvalidTopicException       if a topic is reported as invalid
         * @throws RetriableException          if a topic reports a retriable error
         * @throws KafkaException              if a topic reports any other error except
         *                                     {@code UNKNOWN_TOPIC_OR_PARTITION}
         */
        private Map<String, List<PartitionInfo>> handleTopicMetadataResponse(MetadataResponse response) {
            Cluster cluster = response.buildCluster();
            Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
            if (!unauthorizedTopics.isEmpty()) {
                throw new TopicAuthorizationException(unauthorizedTopics);
            }
            Map<String, Errors> errors = response.errors();
            if (!errors.isEmpty()) {
                TopicMetadataRequestManager.this.log.debug("Topic metadata fetch included errors: {}", errors);
                for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
                    String topic = errorEntry.getKey();
                    Errors error = errorEntry.getValue();
                    if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                        throw new InvalidTopicException("Topic '" + topic + "' is invalid");
                    }
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        // Not treated as a failure: the topic is simply skipped here.
                        continue;
                    }
                    if (error.exception() instanceof RetriableException) {
                        throw error.exception();
                    }
                    throw new KafkaException("Unexpected error fetching metadata for topic " + topic, error.exception());
                }
            }
            Map<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
            for (String topic : cluster.topics()) {
                topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
            }
            return topicsPartitionInfos;
        }

        /** Returns the topic this request targets; empty means all topics. */
        public Optional<String> topic() {
            return this.topic;
        }
    }
}

