/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.admin.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ClusterLinkDescription;
import org.apache.kafka.clients.admin.ClusterLinkTaskError;
import org.apache.kafka.clients.admin.MirrorTopicDescription;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import org.apache.kafka.clients.admin.internals.AllBrokersStrategy;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.CoordinatorStrategy;
import org.apache.kafka.common.ClusterLinkError;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.MirrorTopicError;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.DescribeClusterLinksResponseData;
import org.apache.kafka.common.message.DescribeMirrorsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.DescribeClusterLinksRequest;
import org.apache.kafka.common.requests.DescribeClusterLinksResponse;
import org.apache.kafka.common.requests.DescribeMirrorsRequest;
import org.apache.kafka.common.requests.DescribeMirrorsResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class ClusterLink {
    /**
     * Returns the API name used by the {@code DescribeClusterLinks} handlers declared in this class.
     *
     * @return the literal string {@code "describeClusterLinks"}
     */
    private static String describeClusterLinksApiName() {
        return "describeClusterLinks";
    }

    /**
     * Returns the API name used by the {@code DescribeMirrors} handler declared in this class; the
     * same value is passed to {@code handleError} as the RPC name for its log messages.
     *
     * @return the literal string {@code "describeMirrors"}
     */
    private static String describeMirrorsApiName() {
        return "describeMirrors";
    }

    /**
     * Returns the human-readable resource name that the {@code DescribeMirrors} handler passes to
     * {@code handleError} for use in log messages.
     *
     * @return the literal string {@code "mirror topic"}
     */
    private static String mirrorTopicResourceName() {
        return "mirror topic";
    }

    /**
     * Creates the request builder shared by both {@code DescribeClusterLinks} handlers.
     *
     * @param linkNames     the link names to describe; the all-brokers handler passes an empty
     *                      {@link Optional}
     * @param includeTopics forwarded unchanged to the request builder
     * @param includeTasks  forwarded unchanged to the request builder
     * @param timeoutMs     forwarded unchanged to the request builder
     * @return a new {@code DescribeClusterLinksRequest.Builder}
     */
    private static DescribeClusterLinksRequest.Builder buildDescribeClusterLinksRequestBuilder(Optional<Collection<String>> linkNames, boolean includeTopics, boolean includeTasks, Integer timeoutMs) {
        short newestVersion = ApiKeys.DESCRIBE_CLUSTER_LINKS.latestVersion();
        // NOTE(review): the literal 4 is presumably the oldest request version the builder may use,
        // paired with the newest version as the upper bound -- confirm against the Builder constructor.
        return new DescribeClusterLinksRequest.Builder(
                linkNames,
                includeTopics,
                includeTasks,
                timeoutMs,
                4,
                newestVersion);
    }

    /**
     * Converts one topic entry of a {@code DescribeMirrors} response into a
     * {@link MirrorTopicDescription}.
     *
     * <p>The mirror state is parsed once and reused both for the description itself and for
     * deciding how the mirror-topic error code is interpreted.
     *
     * @param topicData the response entry to convert
     * @return the description built from {@code topicData}
     * @throws IllegalArgumentException if the entry's link id is not a valid UUID string
     *                                  (thrown by {@link UUID#fromString(String)})
     */
    private static MirrorTopicDescription toMirrorTopicDescription(DescribeMirrorsResponseData.TopicData topicData) {
        MirrorTopicDescription.State state = ClusterLink.toState(topicData.state());
        boolean mirrorFailed = state == MirrorTopicDescription.State.FAILED;
        MirrorTopicError failureReason = MirrorTopicError.forCode(topicData.mirrorTopicError(), mirrorFailed);
        List<ClusterLinkTaskError> transitionErrors = ClusterLink.toMirrorStateTransitionErrors(topicData.mirrorStateTransitionErrors());
        return new MirrorTopicDescription(
                topicData.linkName(),
                Utils.toKafkaUuid(UUID.fromString(topicData.linkId())),
                topicData.mirrorTopic(),
                topicData.numPartitions(),
                // Reuse the state parsed above instead of parsing the same string a second time.
                state,
                topicData.stateTimeMs(),
                topicData.stoppedLogEndOffsets(),
                topicData.stoppedEpochs(),
                topicData.sourceTopicId(),
                failureReason,
                transitionErrors,
                topicData.stoppedSequenceNumber());
    }

    /**
     * Converts the mirror state transition errors of a response entry into
     * {@link ClusterLinkTaskError} instances.
     *
     * @param errs the response-level errors; may be {@code null} and may contain {@code null}
     *             elements, which are skipped
     * @return an empty list when {@code errs} is {@code null}, otherwise a new list holding one
     *         converted error per non-null element, in the original order
     */
    private static List<ClusterLinkTaskError> toMirrorStateTransitionErrors(List<DescribeMirrorsResponseData.MirrorStateTransitionErrorData> errs) {
        if (errs == null) {
            return Collections.emptyList();
        }
        List<ClusterLinkTaskError> converted = new ArrayList<>();
        for (DescribeMirrorsResponseData.MirrorStateTransitionErrorData err : errs) {
            if (err == null) {
                continue;
            }
            converted.add(new ClusterLinkTaskError(
                    ClusterLinkTaskError.ClusterLinkTaskErrorCode.fromShort(err.mirrorStateTransitionErrorCode()),
                    err.mirrorStateTransitionErrorMessage()));
        }
        return converted;
    }

    /**
     * Parses the textual mirror state carried in a response into a
     * {@link MirrorTopicDescription.State}.
     *
     * @param state the state name from the response; may be {@code null}
     * @return the matching enum constant, or {@link MirrorTopicDescription.State#UNKNOWN} when
     *         {@code state} is {@code null} or does not name a known constant
     */
    private static MirrorTopicDescription.State toState(String state) {
        // Enum.valueOf throws NullPointerException (not IllegalArgumentException) for a null name,
        // so a missing state has to be mapped to UNKNOWN explicitly.
        if (state == null) {
            return MirrorTopicDescription.State.UNKNOWN;
        }
        try {
            return MirrorTopicDescription.State.valueOf(state);
        }
        catch (IllegalArgumentException ignored) {
            // An unrecognised state name is reported as UNKNOWN rather than failing the conversion.
            return MirrorTopicDescription.State.UNKNOWN;
        }
    }

    /**
     * Converts one entry of a {@code DescribeClusterLinks} response into a
     * {@link ClusterLinkDescription}.
     *
     * <p>Numeric state, mode and error codes are translated through the corresponding
     * {@code fromShort} factories; the link coordinator is exposed as a {@link Node} built from the
     * coordinator id, host and port of the entry.
     *
     * @param entryData the response entry to convert
     * @return the description built from {@code entryData}
     */
    private static ClusterLinkDescription toClusterLinkDescription(DescribeClusterLinksResponseData.EntryData entryData) {
        Node linkCoordinator = new Node(entryData.linkCoordinatorId(), entryData.linkCoordinatorHost(), entryData.linkCoordinatorPort());
        // The remote link state is set exactly once; a second identical setRemoteLinkState call
        // that used to follow setRemoteLinkErrorMessage was redundant.
        return new ClusterLinkDescription.Builder()
                .setLinkName(entryData.linkName())
                .setLinkId(entryData.linkId())
                .setRemoteClusterId(entryData.remoteClusterId())
                .setLocalClusterId(entryData.localClusterId())
                .setTopics(entryData.topics())
                .setLinkState(ClusterLinkDescription.LinkState.fromShort(entryData.linkState()))
                .setLinkMode(ClusterLinkDescription.LinkMode.fromShort(entryData.linkMode()))
                .setConnectionMode(ClusterLinkDescription.ConnectionMode.fromShort(entryData.connectionMode()))
                .setClusterLinkError(ClusterLinkError.fromShort(entryData.linkErrorCode()))
                .setLinkErrorMessage(entryData.linkErrorMessage())
                .setLinkCoordinator(linkCoordinator)
                .setRemoteLinkState(ClusterLinkDescription.LinkState.fromShort(entryData.remoteLinkState()))
                .setRemoteLinkError(ClusterLinkError.fromShort(entryData.remoteLinkErrorCode()))
                .setRemoteLinkErrorMessage(entryData.remoteLinkErrorMessage())
                .setRemoteLinkStateTimeMs(entryData.remoteLinkStateTimeMs())
                .setTaskDescriptions(DescribeClusterLinksResponse.toAdminClientLinkTaskDescriptions(entryData.tasks()))
                .build();
    }

    /**
     * Merges the per-broker {@code DescribeClusterLinks} results into a single future.
     *
     * <p>The intermediate future completes with the union of all brokers' descriptions once every
     * per-broker future has succeeded, and fails as soon as the top-level future or any per-broker
     * future fails. If the broker map is empty it completes immediately with an empty collection
     * instead of never completing. The intermediate future is then routed through
     * {@code toFutureHandlingUnsupportedVersion}, so an "unsupported" failure is handed to
     * {@code unsupportedVersionHandler} rather than failing the returned future.
     *
     * @param brokerIdToDescriptionsFuture future of the map from broker id to that broker's
     *                                     descriptions future
     * @param unsupportedVersionHandler    callback invoked when the request turns out to be
     *                                     unsupported
     * @param log                          logger used to report failures
     * @return a future holding the de-duplicated descriptions gathered from all brokers
     */
    public static KafkaFuture<Collection<ClusterLinkDescription>> allBrokersFutureToResultFuture(KafkaFutureImpl<Map<Integer, KafkaFutureImpl<Collection<ClusterLinkDescription>>>> brokerIdToDescriptionsFuture, Consumer<DescribeClusterLinksToControllerInput> unsupportedVersionHandler, Logger log) {
        KafkaFutureImpl<Collection<ClusterLinkDescription>> futureOfDescriptions = new KafkaFutureImpl<>();
        // NOTE(review): these accumulators are plain HashSets; this assumes the per-broker
        // callbacks are never run concurrently -- confirm against the admin client's threading.
        Set<ClusterLinkDescription> descriptions = new HashSet<>();
        brokerIdToDescriptionsFuture.whenComplete((brokerFutures, topLevelException) -> {
            if (topLevelException != null) {
                log.warn("Failed to get descriptions from all brokers.", topLevelException);
                futureOfDescriptions.completeExceptionally(topLevelException);
                return;
            }
            if (brokerFutures.isEmpty()) {
                // With no brokers no per-broker callback would ever fire, so complete right away.
                futureOfDescriptions.complete(descriptions);
                return;
            }
            Set<Integer> remainingResponses = new HashSet<>(brokerFutures.keySet());
            brokerFutures.forEach((brokerId, future) -> future.whenComplete((desc, brokerException) -> {
                if (brokerException != null) {
                    futureOfDescriptions.completeExceptionally(brokerException);
                } else if (!futureOfDescriptions.isDone()) {
                    descriptions.addAll(desc);
                    remainingResponses.remove(brokerId);
                    if (remainingResponses.isEmpty()) {
                        futureOfDescriptions.complete(descriptions);
                    }
                }
            }));
        });
        return ClusterLink.toFutureHandlingUnsupportedVersion(futureOfDescriptions, unsupportedVersionHandler, log);
    }

    /**
     * Combines the per-link futures produced by the link-coordinator handler into one future.
     *
     * <p>Once all per-link futures have completed, the non-null descriptions are collected into a
     * set; {@code null} results are skipped. The combined future is then routed through
     * {@code toFutureHandlingUnsupportedVersion}, so an "unsupported" failure is handed to
     * {@code unsupportedVersionHandler} rather than failing the returned future.
     *
     * @param linkCoordinatorsHandlerFuture the future keyed by link coordinator key
     * @param unsupportedVersionHandler     callback invoked when the request turns out to be
     *                                      unsupported
     * @param log                           logger used to report failures
     * @return a future holding all non-null link descriptions
     */
    public static KafkaFuture<Collection<ClusterLinkDescription>> linkCoordinatorsFutureToResultFuture(AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ClusterLinkDescription> linkCoordinatorsHandlerFuture, Consumer<DescribeClusterLinksToControllerInput> unsupportedVersionHandler, Logger log) {
        Collection<KafkaFuture<ClusterLinkDescription>> futuresOfDescriptions = linkCoordinatorsHandlerFuture.all().values();
        KafkaFuture<Collection<ClusterLinkDescription>> futureOfDescriptions = KafkaFuture.allOf(futuresOfDescriptions.toArray(new KafkaFuture[0])).thenApply(v -> {
            try {
                HashSet<ClusterLinkDescription> descriptions = new HashSet<ClusterLinkDescription>(futuresOfDescriptions.size());
                for (KafkaFuture<ClusterLinkDescription> entry : futuresOfDescriptions) {
                    ClusterLinkDescription desc = entry.get();
                    if (desc != null) {
                        descriptions.add(desc);
                    }
                }
                return descriptions;
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        return ClusterLink.toFutureHandlingUnsupportedVersion(futureOfDescriptions, unsupportedVersionHandler, log);
    }

    /**
     * Re-keys the per-mirror-topic futures by topic name and defers "unsupported" failures to the
     * controller path.
     *
     * <p>For every key of {@code adminApiFuture} a fresh future is registered under the key's
     * {@code idValue}. When the original future completes, the fresh one is completed with the
     * same value or failure, except when the failure is classified as unsupported by
     * {@code isUnsupported}: such futures are left pending and collected. After the last original
     * future has completed, the collected futures (if any) are passed to
     * {@code unsupportedVersionHandler} in a single {@link DescribeMirrorsToControllerInput}.
     *
     * @param adminApiFuture            the future keyed by mirror topic coordinator key
     * @param unsupportedVersionHandler callback that receives the futures still to be completed
     * @param log                       logger used for debug output
     * @return an unmodifiable map from mirror topic name to its description future
     */
    public static Map<String, KafkaFuture<MirrorTopicDescription>> toDescribeMirrorsResultFuture(AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, MirrorTopicDescription> adminApiFuture, Consumer<DescribeMirrorsToControllerInput> unsupportedVersionHandler, Logger log) {
        Map<CoordinatorKey, KafkaFuture<MirrorTopicDescription>> originalMapOfFutures = adminApiFuture.all();
        // NOTE(review): the bookkeeping collections below are not thread-safe; this assumes the
        // completion callbacks are never run concurrently -- confirm.
        Set<CoordinatorKey> remainingResponses = new HashSet<>(originalMapOfFutures.keySet());
        Map<String, KafkaFutureImpl<MirrorTopicDescription>> mirrorsToCompleteToController = new HashMap<>();
        Map<String, KafkaFuture<MirrorTopicDescription>> result = new HashMap<>();
        originalMapOfFutures.forEach((key, originalFuture) -> {
            String mirrorTopic = key.idValue;
            KafkaFutureImpl<MirrorTopicDescription> newFuture = new KafkaFutureImpl<>();
            result.put(mirrorTopic, newFuture);
            originalFuture.whenComplete((desc, throwable) -> {
                if (throwable == null) {
                    newFuture.complete(desc);
                } else {
                    // Unwrap CompletionException, but never down to a null cause: passing null to
                    // completeExceptionally would throw and leave the future pending forever.
                    Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                            ? throwable.getCause()
                            : throwable;
                    if (ClusterLink.isUnsupported(cause)) {
                        mirrorsToCompleteToController.put(mirrorTopic, newFuture);
                    } else {
                        newFuture.completeExceptionally(cause);
                    }
                }
                remainingResponses.remove(key);
                if (remainingResponses.isEmpty()) {
                    if (!mirrorsToCompleteToController.isEmpty()) {
                        log.debug("Sending `DescribeMirrors` request to controller for {}.", mirrorsToCompleteToController.keySet());
                        unsupportedVersionHandler.accept(new DescribeMirrorsToControllerInput(mirrorsToCompleteToController));
                    } else {
                        log.debug("No mirrors need to be sent to controller.");
                    }
                }
            });
        });
        return Collections.unmodifiableMap(result);
    }

    /**
     * Wraps a descriptions future so that "unsupported" failures are redirected to a fallback.
     *
     * <p>A successful result is propagated unchanged. A failure classified as unsupported by
     * {@code isUnsupported} leaves the returned future pending and hands it to
     * {@code unsupportedVersionHandler}, which is expected to complete it. Any other failure is
     * propagated, with a wrapping {@link CompletionException} removed.
     *
     * @param futureOfDescriptions      the future to observe
     * @param unsupportedVersionHandler callback that receives the pending result future
     * @param log                       logger used to report failures
     * @return the wrapping future
     */
    private static KafkaFuture<Collection<ClusterLinkDescription>> toFutureHandlingUnsupportedVersion(KafkaFuture<Collection<ClusterLinkDescription>> futureOfDescriptions, Consumer<DescribeClusterLinksToControllerInput> unsupportedVersionHandler, Logger log) {
        KafkaFutureImpl<Collection<ClusterLinkDescription>> result = new KafkaFutureImpl<>();
        futureOfDescriptions.whenComplete((descriptions, throwable) -> {
            if (throwable == null) {
                result.complete(descriptions);
                return;
            }
            log.warn("Failed to get link descriptions.", throwable);
            // Unwrap CompletionException, but never down to a null cause: passing null to
            // completeExceptionally would throw and leave the result pending forever.
            Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;
            if (ClusterLink.isUnsupported(cause)) {
                unsupportedVersionHandler.accept(new DescribeClusterLinksToControllerInput(result));
            } else {
                result.completeExceptionally(cause);
            }
        });
        return result;
    }

    /**
     * Tells whether a failure means the request is not supported and should take the fallback path.
     *
     * <p>This is the case for any {@link UnsupportedVersionException}, and for an
     * {@link InvalidRequestException} whose message contains
     * {@code "FindCoordinator request for key"}.
     *
     * @param cause the failure to classify; may be {@code null}
     * @return {@code true} if the failure is treated as unsupported, {@code false} otherwise
     */
    private static boolean isUnsupported(Throwable cause) {
        if (cause instanceof UnsupportedVersionException) {
            return true;
        }
        if (cause instanceof InvalidRequestException) {
            // getMessage() may legitimately be null; treat that as "not the FindCoordinator error".
            String message = cause.getMessage();
            return message != null && message.contains("FindCoordinator request for key");
        }
        return false;
    }

    /**
     * Classifies an error returned for one coordinator key and records the outcome.
     *
     * <ul>
     *   <li>{@code COORDINATOR_LOAD_IN_PROGRESS}: only logged; the key is added to neither
     *       collection.</li>
     *   <li>{@code NOT_CONTROLLER}, {@code COORDINATOR_NOT_AVAILABLE}, {@code NOT_COORDINATOR}:
     *       the key is added to {@code keysToUnmap}.</li>
     *   <li>{@code CLUSTER_AUTHORIZATION_FAILED} and every other error: the key is added to
     *       {@code failed} with the error's exception; only the log level and message differ.</li>
     * </ul>
     *
     * @param key          the key the error was reported for
     * @param error        the error to classify
     * @param failed       receives keys that failed, mapped to their exception
     * @param keysToUnmap  receives keys recorded for unmapping
     * @param rpcName      RPC name used in log messages
     * @param resourceName resource kind used in log messages
     * @param log          logger to write to
     */
    private static void handleError(CoordinatorKey key, Errors error, Map<CoordinatorKey, Throwable> failed, Set<CoordinatorKey> keysToUnmap, String rpcName, String resourceName, Logger log) {
        String id = key.idValue;
        switch (error) {
            case COORDINATOR_LOAD_IN_PROGRESS:
                log.debug("`{}` request for {} {} failed because the coordinator is still in the process of loading state. Will retry", rpcName, resourceName, id);
                return;
            case NOT_CONTROLLER:
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR:
                log.debug("`{}` request for {} {} returned error {}. Will attempt to find the coordinator again and retry", rpcName, resourceName, id, error);
                keysToUnmap.add(key);
                return;
            case CLUSTER_AUTHORIZATION_FAILED:
                log.debug("`{}` request for {} {} failed due to error {}", rpcName, resourceName, id, error);
                break;
            default:
                log.error("`{}` request for {} {} failed due to unexpected error {}", rpcName, resourceName, id, error);
                break;
        }
        // Both remaining branches are terminal failures for this key.
        failed.put(key, error.exception());
    }

    /**
     * Argument handed to the unsupported-version callback of
     * {@code toDescribeMirrorsResultFuture}: the still-pending futures, keyed by mirror topic name.
     */
    public static class DescribeMirrorsToControllerInput {
        /** Pending futures keyed by mirror topic name; stored as passed in, without copying. */
        public final Map<String, KafkaFutureImpl<MirrorTopicDescription>> result;

        /**
         * @param result the pending futures keyed by mirror topic name
         */
        public DescribeMirrorsToControllerInput(Map<String, KafkaFutureImpl<MirrorTopicDescription>> result) {
            this.result = result;
        }
    }

    /**
     * Argument handed to the unsupported-version callback of
     * {@code toFutureHandlingUnsupportedVersion}: the still-pending result future.
     */
    public static class DescribeClusterLinksToControllerInput {
        /** The pending future of link descriptions. */
        public final KafkaFutureImpl<Collection<ClusterLinkDescription>> result;

        /**
         * @param result the pending future of link descriptions
         */
        public DescribeClusterLinksToControllerInput(KafkaFutureImpl<Collection<ClusterLinkDescription>> result) {
            this.result = result;
        }
    }

    /**
     * Admin API handler that sends a {@code DescribeClusterLinks} request (with no link-name
     * filter) using an {@link AllBrokersStrategy} and converts each broker's response into a
     * collection of {@link ClusterLinkDescription}s.
     */
    public static class DescribeClusterLinksAllBrokersHandler
    extends AdminApiHandler.Batched<AllBrokersStrategy.BrokerKey, Collection<ClusterLinkDescription>> {
        private final Logger log;
        private final AllBrokersStrategy lookupStrategy;
        private final boolean includeTopics;
        private final boolean includeTasks;
        private final Integer timeoutMs;

        /**
         * @param logContext    context used to create the logger and the lookup strategy
         * @param includeTopics forwarded to the request builder
         * @param includeTasks  forwarded to the request builder
         * @param timeoutMs     forwarded to the request builder
         */
        public DescribeClusterLinksAllBrokersHandler(LogContext logContext, boolean includeTopics, boolean includeTasks, Integer timeoutMs) {
            this.includeTopics = includeTopics;
            this.includeTasks = includeTasks;
            this.timeoutMs = timeoutMs;
            this.log = logContext.logger(DescribeClusterLinksAllBrokersHandler.class);
            this.lookupStrategy = new AllBrokersStrategy(logContext);
        }

        /** Creates the future type that pairs with this handler. */
        public static AllBrokersStrategy.AllBrokersFuture<Collection<ClusterLinkDescription>> newFuture() {
            return new AllBrokersStrategy.AllBrokersFuture<Collection<ClusterLinkDescription>>();
        }

        @Override
        public String apiName() {
            return ClusterLink.describeClusterLinksApiName();
        }

        /**
         * Interprets one broker's response.
         *
         * <p>Success completes the broker key with the converted entries. A
         * {@code COORDINATOR_LOAD_IN_PROGRESS} error or any error whose exception is a
         * {@link RetriableException} yields an empty result. Every other error fails the key.
         *
         * @throws IllegalArgumentException if {@code keys} is not exactly the key of {@code broker}
         */
        @Override
        public AdminApiHandler.ApiResult<AllBrokersStrategy.BrokerKey, Collection<ClusterLinkDescription>> handleResponse(Node broker, Set<AllBrokersStrategy.BrokerKey> keys, AbstractResponse abstractResponse) {
            int brokerId = broker.id();
            AllBrokersStrategy.BrokerKey brokerKey = this.requireSingleton(keys, brokerId);
            DescribeClusterLinksResponseData data = ((DescribeClusterLinksResponse)abstractResponse).data();
            Errors error = Errors.forCode(data.errorCode());

            if (error == Errors.NONE) {
                Collection<ClusterLinkDescription> descriptions = data.entries().stream()
                        .map(ClusterLink::toClusterLinkDescription)
                        .collect(Collectors.toList());
                return AdminApiHandler.ApiResult.completed(brokerKey, descriptions);
            }
            // The load-in-progress check must precede the generic retriable check so that it keeps
            // its dedicated log message.
            if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                this.log.debug("The `DescribeClusterLinks` request sent to broker {} failed because the coordinator is still loading state. Will try again after backing off", brokerId);
                return AdminApiHandler.ApiResult.empty();
            }
            if (error.exception() instanceof RetriableException) {
                this.log.debug("The `DescribeClusterLinks` request sent to broker {} failed because of a retriable error. Will try again after backing off", brokerId);
                return AdminApiHandler.ApiResult.empty();
            }
            this.log.error("The `DescribeClusterLinks` request sent to broker {} failed because of an unexpected error {}", brokerId, error);
            String failureMessage = "DescribeClusterLinks request sent to broker " + brokerId + " failed with an unexpected exception";
            return AdminApiHandler.ApiResult.failed(brokerKey, error.exception(failureMessage));
        }

        @Override
        public AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> lookupStrategy() {
            return this.lookupStrategy;
        }

        /** Builds the request for one broker; the request carries no link-name filter. */
        DescribeClusterLinksRequest.Builder buildBatchedRequest(int brokerId, Set<AllBrokersStrategy.BrokerKey> keys) {
            this.log.debug("Building request destined for {} containing keys {}.", brokerId, keys);
            Optional<Collection<String>> noLinkNameFilter = Optional.empty();
            return ClusterLink.buildDescribeClusterLinksRequestBuilder(noLinkNameFilter, this.includeTopics, this.includeTasks, this.timeoutMs);
        }

        /**
         * Returns the only element of {@code keys}, verifying that it identifies {@code brokerId}.
         *
         * @throws IllegalArgumentException if {@code keys} does not hold exactly one key, or that
         *                                  key has no broker id or a different one
         */
        private AllBrokersStrategy.BrokerKey requireSingleton(Set<AllBrokersStrategy.BrokerKey> keys, int brokerId) {
            if (keys.size() != 1) {
                throw new IllegalArgumentException("Unexpected key set: " + keys);
            }
            AllBrokersStrategy.BrokerKey only = keys.iterator().next();
            boolean matchesBroker = only.brokerId.isPresent() && only.brokerId.getAsInt() == brokerId;
            if (!matchesBroker) {
                throw new IllegalArgumentException("Unexpected broker key: " + only);
            }
            return only;
        }
    }

    /**
     * Admin API handler that sends {@code DescribeMirrors} requests, looked up with a
     * {@link CoordinatorStrategy} of type {@code MIRROR_TOPIC}, and converts the per-topic response
     * entries into {@link MirrorTopicDescription}s.
     */
    public static class DescribeMirrorsLinkCoordinatorsHandler
    extends AdminApiHandler.Batched<CoordinatorKey, MirrorTopicDescription> {
        private final Logger log;
        private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
        private final Collection<String> linkNames;
        private final Collection<String> states;
        private final Integer timeoutMs;
        private final boolean includeTransitionErrors;

        /**
         * @param logContext              context used to create the logger and the lookup strategy
         * @param linkNames               forwarded to the request builder
         * @param states                  forwarded to the request builder
         * @param timeoutMs               forwarded to the request builder
         * @param includeTransitionErrors forwarded to the request builder
         */
        public DescribeMirrorsLinkCoordinatorsHandler(LogContext logContext, Collection<String> linkNames, Collection<String> states, Integer timeoutMs, boolean includeTransitionErrors) {
            this.linkNames = linkNames;
            this.states = states;
            this.timeoutMs = timeoutMs;
            this.includeTransitionErrors = includeTransitionErrors;
            this.log = logContext.logger(DescribeMirrorsLinkCoordinatorsHandler.class);
            this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.MIRROR_TOPIC, logContext);
        }

        /** Creates the future that pairs with this handler, with one key per mirror topic. */
        public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, MirrorTopicDescription> newFuture(Collection<String> topics) {
            Set<CoordinatorKey> keySet = DescribeMirrorsLinkCoordinatorsHandler.buildKeySet(topics);
            return AdminApiFuture.forKeys(keySet);
        }

        /** Maps every topic name to its mirror-topic coordinator key. */
        private static Set<CoordinatorKey> buildKeySet(Collection<String> topics) {
            return topics.stream().map(CoordinatorKey::byMirrorTopic).collect(Collectors.toSet());
        }

        @Override
        public String apiName() {
            return ClusterLink.describeMirrorsApiName();
        }

        /**
         * Interprets a {@code DescribeMirrors} response.
         *
         * <p>A top-level error is applied to every requested key through
         * {@code ClusterLink.handleError}. Otherwise each returned topic is either completed with
         * its description or passed to {@code handleError} with its own error code, and every
         * requested topic missing from the response is completed with a {@code null} description.
         */
        @Override
        public AdminApiHandler.ApiResult<CoordinatorKey, MirrorTopicDescription> handleResponse(Node coordinator, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) {
            this.log.debug("Handling response {} from {} for keys {}", abstractResponse, coordinator, keys);
            Map<CoordinatorKey, MirrorTopicDescription> completed = new HashMap<CoordinatorKey, MirrorTopicDescription>();
            Map<CoordinatorKey, Throwable> failed = new HashMap<CoordinatorKey, Throwable>();
            Set<CoordinatorKey> mirrorTopicsToUnmap = new HashSet<CoordinatorKey>();
            DescribeMirrorsResponseData responseData = ((DescribeMirrorsResponse)abstractResponse).data();
            Errors topLevelError = Errors.forCode(responseData.errorCode());

            if (topLevelError != Errors.NONE) {
                for (CoordinatorKey key : keys) {
                    ClusterLink.handleError(key, topLevelError, failed, mirrorTopicsToUnmap, ClusterLink.describeMirrorsApiName(), ClusterLink.mirrorTopicResourceName(), this.log);
                }
                return new AdminApiHandler.ApiResult<CoordinatorKey, MirrorTopicDescription>(completed, failed, new ArrayList<CoordinatorKey>(mirrorTopicsToUnmap));
            }

            Set<String> returnedMirrorTopics = new HashSet<String>();
            for (DescribeMirrorsResponseData.TopicData topicData : responseData.topics()) {
                CoordinatorKey topicKey = CoordinatorKey.byMirrorTopic(topicData.topic());
                if (topicData.errorCode() != Errors.NONE.code()) {
                    Errors mirrorError = Errors.forCode(topicData.errorCode());
                    ClusterLink.handleError(topicKey, mirrorError, failed, mirrorTopicsToUnmap, ClusterLink.describeMirrorsApiName(), ClusterLink.mirrorTopicResourceName(), this.log);
                } else {
                    completed.put(topicKey, ClusterLink.toMirrorTopicDescription(topicData));
                }
                returnedMirrorTopics.add(topicData.topic());
            }
            // Requested topics absent from the response are completed with a null description.
            for (CoordinatorKey requested : keys) {
                if (!returnedMirrorTopics.contains(requested.idValue)) {
                    completed.put(CoordinatorKey.byMirrorTopic(requested.idValue), null);
                }
            }
            return new AdminApiHandler.ApiResult<CoordinatorKey, MirrorTopicDescription>(completed, failed, new ArrayList<CoordinatorKey>(mirrorTopicsToUnmap));
        }

        @Override
        public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
            return this.lookupStrategy;
        }

        /**
         * Builds the request for one coordinator from the topic names of {@code keys}.
         *
         * @throws IllegalArgumentException if a key is not of type {@code MIRROR_TOPIC}
         */
        DescribeMirrorsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> keys) {
            this.log.debug("Building request destined for {} containing keys {}.", brokerId, keys);
            List<String> topics = new ArrayList<String>();
            for (CoordinatorKey key : keys) {
                if (key.type != FindCoordinatorRequest.CoordinatorType.MIRROR_TOPIC) {
                    throw new IllegalArgumentException("Invalid mirror topic coordinator key " + key + " when building `DescribeMirrors` request");
                }
                topics.add(key.idValue);
            }
            // NOTE(review): the literal 6 is presumably the oldest request version the builder may
            // use, paired with the newest version as the upper bound -- confirm.
            return new DescribeMirrorsRequest.Builder(topics, this.linkNames, this.states, this.timeoutMs, this.includeTransitionErrors, 6, ApiKeys.DESCRIBE_MIRRORS.latestVersion());
        }
    }

    /**
     * Admin API handler that sends {@code DescribeClusterLinks} requests for specific link names,
     * looked up with a {@link CoordinatorStrategy} of type {@code CLUSTER_LINK}, and converts the
     * response entries into {@link ClusterLinkDescription}s.
     */
    public static class DescribeClusterLinksLinkCoordinatorsHandler
    extends AdminApiHandler.Batched<CoordinatorKey, ClusterLinkDescription> {
        private final Logger log;
        private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;
        private final boolean includeTopics;
        private final boolean includeTasks;
        private final Integer timeoutMs;

        /**
         * @param logContext    context used to create the logger and the lookup strategy
         * @param includeTopics forwarded to the request builder
         * @param includeTasks  forwarded to the request builder
         * @param timeoutMs     forwarded to the request builder
         */
        public DescribeClusterLinksLinkCoordinatorsHandler(LogContext logContext, boolean includeTopics, boolean includeTasks, Integer timeoutMs) {
            this.includeTopics = includeTopics;
            this.includeTasks = includeTasks;
            this.timeoutMs = timeoutMs;
            this.log = logContext.logger(DescribeClusterLinksLinkCoordinatorsHandler.class);
            this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.CLUSTER_LINK, logContext);
        }

        /** Creates the future that pairs with this handler, with one key per link name. */
        public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ClusterLinkDescription> newFuture(Collection<String> linkNames) {
            Set<CoordinatorKey> keySet = DescribeClusterLinksLinkCoordinatorsHandler.buildKeySet(linkNames);
            return AdminApiFuture.forKeys(keySet);
        }

        /** Maps every link name to its cluster-link coordinator key. */
        private static Set<CoordinatorKey> buildKeySet(Collection<String> linkNames) {
            return linkNames.stream().map(CoordinatorKey::byLinkName).collect(Collectors.toSet());
        }

        @Override
        public String apiName() {
            return ClusterLink.describeClusterLinksApiName();
        }

        /**
         * Interprets a {@code DescribeClusterLinks} response.
         *
         * <p>A top-level error is applied to every requested key through
         * {@code ClusterLink.handleError}. Otherwise each returned entry completes the key of its
         * link name, and every requested link missing from the response is completed with a
         * {@code null} description.
         */
        @Override
        public AdminApiHandler.ApiResult<CoordinatorKey, ClusterLinkDescription> handleResponse(Node coordinator, Set<CoordinatorKey> keys, AbstractResponse abstractResponse) {
            this.log.debug("Handling response {} from {} for keys {}", abstractResponse, coordinator, keys);
            Map<CoordinatorKey, ClusterLinkDescription> completed = new HashMap<CoordinatorKey, ClusterLinkDescription>();
            Map<CoordinatorKey, Throwable> failed = new HashMap<CoordinatorKey, Throwable>();
            Set<CoordinatorKey> linksToUnmap = new HashSet<CoordinatorKey>();
            DescribeClusterLinksResponseData responseData = ((DescribeClusterLinksResponse)abstractResponse).data();
            Errors topLevelError = Errors.forCode(responseData.errorCode());

            if (topLevelError != Errors.NONE) {
                for (CoordinatorKey key : keys) {
                    ClusterLink.handleError(key, topLevelError, failed, linksToUnmap, this.apiName(), "link", this.log);
                }
                return new AdminApiHandler.ApiResult<CoordinatorKey, ClusterLinkDescription>(completed, failed, new ArrayList<CoordinatorKey>(linksToUnmap));
            }

            Set<String> returnedLinkNames = new HashSet<String>();
            for (DescribeClusterLinksResponseData.EntryData entryData : responseData.entries()) {
                completed.put(CoordinatorKey.byLinkName(entryData.linkName()), ClusterLink.toClusterLinkDescription(entryData));
                returnedLinkNames.add(entryData.linkName());
            }
            // Requested links absent from the response are completed with a null description.
            for (CoordinatorKey requested : keys) {
                if (!returnedLinkNames.contains(requested.idValue)) {
                    completed.put(CoordinatorKey.byLinkName(requested.idValue), null);
                }
            }
            return new AdminApiHandler.ApiResult<CoordinatorKey, ClusterLinkDescription>(completed, failed, new ArrayList<CoordinatorKey>(linksToUnmap));
        }

        @Override
        public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
            return this.lookupStrategy;
        }

        /**
         * Builds the request for one coordinator from the link names of {@code keys}.
         *
         * @throws IllegalArgumentException if a key is not of type {@code CLUSTER_LINK}
         */
        DescribeClusterLinksRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> keys) {
            this.log.debug("Building request destined for {} containing keys {}.", brokerId, keys);
            Collection<String> linkNames = new ArrayList<String>();
            for (CoordinatorKey key : keys) {
                if (key.type != FindCoordinatorRequest.CoordinatorType.CLUSTER_LINK) {
                    throw new IllegalArgumentException("Invalid link coordinator key " + key + " when building `DescribeClusterLinks` request");
                }
                linkNames.add(key.idValue);
            }
            return ClusterLink.buildDescribeClusterLinksRequestBuilder(Optional.of(linkNames), this.includeTopics, this.includeTasks, this.timeoutMs);
        }
    }
}

