/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.admin.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.clients.admin.internals.AdminApiFuture;
import org.apache.kafka.clients.admin.internals.AdminApiHandler;
import org.apache.kafka.clients.admin.internals.AdminApiLookupStrategy;
import org.apache.kafka.clients.admin.internals.AdminUtils;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.admin.internals.CoordinatorStrategy;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeRequest;
import org.apache.kafka.common.requests.StreamsGroupDescribeResponse;
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

/**
 * Admin API handler for describing streams groups.
 *
 * <p>Keys are {@link CoordinatorKey}s built from group ids. Requests are batched per coordinator
 * into a single {@code StreamsGroupDescribe} request, and every described group in the response
 * is converted into a {@link StreamsGroupDescription} or recorded as a failure / coordinator
 * re-lookup, depending on the error code it carries.
 */
public class DescribeStreamsGroupsHandler
extends AdminApiHandler.Batched<CoordinatorKey, StreamsGroupDescription> {
    private final boolean includeAuthorizedOperations;
    private final Logger log;
    private final AdminApiLookupStrategy<CoordinatorKey> lookupStrategy;

    /**
     * Creates a handler.
     *
     * @param includeAuthorizedOperations value forwarded to the request's
     *                                    {@code includeAuthorizedOperations} field
     * @param logContext                  context used to create this handler's logger and the
     *                                    group coordinator lookup strategy
     */
    public DescribeStreamsGroupsHandler(boolean includeAuthorizedOperations, LogContext logContext) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        this.log = logContext.logger(DescribeStreamsGroupsHandler.class);
        this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
    }

    /** Maps each group id to its group {@link CoordinatorKey}; duplicates collapse in the set. */
    private static Set<CoordinatorKey> buildKeySet(Collection<String> groupIds) {
        return groupIds.stream().map(CoordinatorKey::byGroupId).collect(Collectors.toSet());
    }

    /**
     * Creates the future that tracks one result per requested group.
     *
     * @param groupIds ids of the groups to describe
     * @return a future keyed by the coordinator keys derived from {@code groupIds}
     */
    public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, StreamsGroupDescription> newFuture(Collection<String> groupIds) {
        return AdminApiFuture.forKeys(buildKeySet(groupIds));
    }

    @Override
    public String apiName() {
        return "describeStreamsGroups";
    }

    @Override
    public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
        return this.lookupStrategy;
    }

    /**
     * Builds one {@code StreamsGroupDescribe} request covering every key in the batch.
     *
     * @param coordinatorId id of the destination coordinator (not used when building the request)
     * @param keys          coordinator keys of the groups to describe
     * @return a builder for the batched request
     * @throws IllegalArgumentException if any key is not a group coordinator key
     */
    @Override
    public StreamsGroupDescribeRequest.Builder buildBatchedRequest(int coordinatorId, Set<CoordinatorKey> keys) {
        List<String> groupIds = keys.stream()
            .map(DescribeStreamsGroupsHandler::requireGroupId)
            .collect(Collectors.toList());
        StreamsGroupDescribeRequestData data = new StreamsGroupDescribeRequestData()
            .setGroupIds(groupIds)
            .setIncludeAuthorizedOperations(this.includeAuthorizedOperations);
        return new StreamsGroupDescribeRequest.Builder(data);
    }

    /** Returns the group id held by {@code key}, rejecting keys of any other coordinator type. */
    private static String requireGroupId(CoordinatorKey key) {
        if (key.type != FindCoordinatorRequest.CoordinatorType.GROUP) {
            throw new IllegalArgumentException("Invalid group coordinator key " + key + " when building `DescribeStreamsGroups` request");
        }
        return key.idValue;
    }

    /**
     * Converts a {@code StreamsGroupDescribe} response into per-key results.
     *
     * <p>For each described group in the response:
     * <ul>
     *   <li>a non-{@code NONE} error code is delegated to {@code handleError};</li>
     *   <li>a missing topology fails the key with an {@link IllegalStateException};</li>
     *   <li>otherwise the key completes with a fully populated {@link StreamsGroupDescription}.</li>
     * </ul>
     *
     * @param coordinator      node that answered the request; recorded in each description
     * @param groupIds         keys that were sent in the request (not consulted here; results are
     *                         keyed by the group ids present in the response)
     * @param abstractResponse the response, which must be a {@link StreamsGroupDescribeResponse}
     * @return completed descriptions, failures, and keys whose coordinator must be looked up again
     */
    @Override
    public AdminApiHandler.ApiResult<CoordinatorKey, StreamsGroupDescription> handleResponse(Node coordinator, Set<CoordinatorKey> groupIds, AbstractResponse abstractResponse) {
        StreamsGroupDescribeResponse response = (StreamsGroupDescribeResponse) abstractResponse;
        Map<CoordinatorKey, StreamsGroupDescription> completed = new HashMap<>();
        Map<CoordinatorKey, Throwable> failed = new HashMap<>();
        Set<CoordinatorKey> groupsToUnmap = new HashSet<>();

        for (StreamsGroupDescribeResponseData.DescribedGroup describedGroup : response.data().groups()) {
            CoordinatorKey groupIdKey = CoordinatorKey.byGroupId(describedGroup.groupId());
            Errors error = Errors.forCode(describedGroup.errorCode());
            if (error != Errors.NONE) {
                handleError(groupIdKey, describedGroup, coordinator, error, describedGroup.errorMessage(), completed, failed, groupsToUnmap);
                continue;
            }
            // The topology is dereferenced below, so a response without one cannot be converted.
            if (describedGroup.topology() == null) {
                this.log.error("`DescribeStreamsGroups` response for group id {} is missing the topology information", groupIdKey.idValue);
                failed.put(groupIdKey, new IllegalStateException("Topology information is missing"));
                continue;
            }
            Set<AclOperation> authorizedOperations = AdminUtils.validAclOperations(describedGroup.authorizedOperations());
            StreamsGroupDescription streamsGroupDescription = new StreamsGroupDescription(
                describedGroup.groupId(),
                describedGroup.groupEpoch(),
                describedGroup.assignmentEpoch(),
                describedGroup.topology().epoch(),
                convertSubtopologies(describedGroup.topology().subtopologies()),
                convertMembers(describedGroup.members()),
                GroupState.parse(describedGroup.groupState()),
                coordinator,
                authorizedOperations);
            completed.put(groupIdKey, streamsGroupDescription);
        }
        return new AdminApiHandler.ApiResult<>(completed, failed, new ArrayList<>(groupsToUnmap));
    }

    /** Converts every response member into its public description, preserving order. */
    private Collection<StreamsGroupMemberDescription> convertMembers(List<StreamsGroupDescribeResponseData.Member> members) {
        return members.stream().map(this::convertMember).collect(Collectors.toList());
    }

    /**
     * Converts a single response member. Instance id, rack id and user endpoint are wrapped in
     * {@link Optional} because they may be {@code null} in the response.
     */
    private StreamsGroupMemberDescription convertMember(StreamsGroupDescribeResponseData.Member groupMember) {
        return new StreamsGroupMemberDescription(
            groupMember.memberId(),
            groupMember.memberEpoch(),
            Optional.ofNullable(groupMember.instanceId()),
            Optional.ofNullable(groupMember.rackId()),
            groupMember.clientId(),
            groupMember.clientHost(),
            groupMember.topologyEpoch(),
            groupMember.processId(),
            Optional.ofNullable(groupMember.userEndpoint()).map(this::convertEndpoint),
            convertClientTags(groupMember.clientTags()),
            convertTaskOffsets(groupMember.taskOffsets()),
            convertTaskOffsets(groupMember.taskEndOffsets()),
            convertAssignment(groupMember.assignment()),
            convertAssignment(groupMember.targetAssignment()),
            groupMember.isClassic());
    }

    /** Converts every response subtopology into its public description, preserving order. */
    private Collection<StreamsGroupSubtopologyDescription> convertSubtopologies(List<StreamsGroupDescribeResponseData.Subtopology> subtopologies) {
        return subtopologies.stream().map(this::convertSubtopology).collect(Collectors.toList());
    }

    /** Converts a single response subtopology, including its changelog and repartition topics. */
    private StreamsGroupSubtopologyDescription convertSubtopology(StreamsGroupDescribeResponseData.Subtopology subtopology) {
        return new StreamsGroupSubtopologyDescription(
            subtopology.subtopologyId(),
            subtopology.sourceTopics(),
            subtopology.repartitionSinkTopics(),
            convertTopicInfos(subtopology.stateChangelogTopics()),
            convertTopicInfos(subtopology.repartitionSourceTopics()));
    }

    /**
     * Indexes topic infos by topic name, flattening each topic's config key/value list into a map.
     * As with any {@link Collectors#toMap}, duplicate names or config keys raise
     * {@link IllegalStateException}.
     */
    private Map<String, StreamsGroupSubtopologyDescription.TopicInfo> convertTopicInfos(List<StreamsGroupDescribeResponseData.TopicInfo> topicInfos) {
        return topicInfos.stream().collect(Collectors.toMap(
            StreamsGroupDescribeResponseData.TopicInfo::name,
            topicInfo -> new StreamsGroupSubtopologyDescription.TopicInfo(
                topicInfo.partitions(),
                topicInfo.replicationFactor(),
                topicInfo.topicConfigs().stream().collect(Collectors.toMap(
                    StreamsGroupDescribeResponseData.KeyValue::key,
                    StreamsGroupDescribeResponseData.KeyValue::value)))));
    }

    /** Converts a response task-id group (subtopology id plus partitions). */
    private StreamsGroupMemberAssignment.TaskIds convertTaskIds(StreamsGroupDescribeResponseData.TaskIds taskIds) {
        return new StreamsGroupMemberAssignment.TaskIds(taskIds.subtopologyId(), taskIds.partitions());
    }

    /** Converts a response assignment: active, standby and warm-up tasks, in that order. */
    private StreamsGroupMemberAssignment convertAssignment(StreamsGroupDescribeResponseData.Assignment assignment) {
        return new StreamsGroupMemberAssignment(
            convertTaskIdList(assignment.activeTasks()),
            convertTaskIdList(assignment.standbyTasks()),
            convertTaskIdList(assignment.warmupTasks()));
    }

    /** Converts a list of response task-id groups, preserving order. */
    private List<StreamsGroupMemberAssignment.TaskIds> convertTaskIdList(List<StreamsGroupDescribeResponseData.TaskIds> taskIds) {
        return taskIds.stream().map(this::convertTaskIds).collect(Collectors.toList());
    }

    /** Converts response task offsets (subtopology id, partition, offset), preserving order. */
    private List<StreamsGroupMemberDescription.TaskOffset> convertTaskOffsets(List<StreamsGroupDescribeResponseData.TaskOffset> taskOffsets) {
        return taskOffsets.stream()
            .map(taskOffset -> new StreamsGroupMemberDescription.TaskOffset(taskOffset.subtopologyId(), taskOffset.partition(), taskOffset.offset()))
            .collect(Collectors.toList());
    }

    /** Flattens the client tag key/value list into a map. */
    private Map<String, String> convertClientTags(List<StreamsGroupDescribeResponseData.KeyValue> keyValues) {
        return keyValues.stream().collect(Collectors.toMap(
            StreamsGroupDescribeResponseData.KeyValue::key,
            StreamsGroupDescribeResponseData.KeyValue::value));
    }

    /** Converts a response endpoint (host and port). */
    private StreamsGroupMemberDescription.Endpoint convertEndpoint(StreamsGroupDescribeResponseData.Endpoint endpoint) {
        return new StreamsGroupMemberDescription.Endpoint(endpoint.host(), endpoint.port());
    }

    /**
     * Records the outcome for a described group that carries a non-{@code NONE} error.
     *
     * <ul>
     *   <li>Authorization failures and unexpected errors fail the key with the error's exception.</li>
     *   <li>{@code COORDINATOR_LOAD_IN_PROGRESS} adds the key to no collection; per the log message
     *       the request is expected to be retried.</li>
     *   <li>{@code COORDINATOR_NOT_AVAILABLE} / {@code NOT_COORDINATOR} add the key to
     *       {@code groupsToUnmap} so the coordinator is looked up again.</li>
     *   <li>{@code GROUP_ID_NOT_FOUND} completes the key with a placeholder description: epochs of
     *       {@code -1}, no subtopologies or members, and state {@link GroupState#DEAD}.</li>
     * </ul>
     */
    private void handleError(CoordinatorKey groupId, StreamsGroupDescribeResponseData.DescribedGroup describedGroup, Node coordinator, Errors error, String errorMsg, Map<CoordinatorKey, StreamsGroupDescription> completed, Map<CoordinatorKey, Throwable> failed, Set<CoordinatorKey> groupsToUnmap) {
        switch (error) {
            case GROUP_AUTHORIZATION_FAILED:
            case TOPIC_AUTHORIZATION_FAILED: {
                this.log.debug("`DescribeStreamsGroups` request for group id {} failed due to error {}", groupId.idValue, error);
                failed.put(groupId, error.exception(errorMsg));
                break;
            }
            case COORDINATOR_LOAD_IN_PROGRESS: {
                // Deliberately neither completed, failed nor unmapped.
                this.log.debug("`DescribeStreamsGroups` request for group id {} failed because the coordinator is still in the process of loading state. Will retry", groupId.idValue);
                break;
            }
            case COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR: {
                this.log.debug("`DescribeStreamsGroups` request for group id {} returned error {}. Will attempt to find the coordinator again and retry", groupId.idValue, error);
                groupsToUnmap.add(groupId);
                break;
            }
            case GROUP_ID_NOT_FOUND: {
                this.log.debug("`DescribeStreamsGroups` request for group id {} failed because the group does not exist. {}", groupId.idValue, errorMsg != null ? errorMsg : "");
                StreamsGroupDescription streamsGroupDescription = new StreamsGroupDescription(
                    groupId.idValue,
                    -1,
                    -1,
                    -1,
                    Collections.emptySet(),
                    Collections.emptySet(),
                    GroupState.DEAD,
                    coordinator,
                    AdminUtils.validAclOperations(describedGroup.authorizedOperations()));
                completed.put(groupId, streamsGroupDescription);
                break;
            }
            default: {
                this.log.error("`DescribeStreamsGroups` request for group id {} failed due to unexpected error {}", groupId.idValue, error);
                failed.put(groupId, error.exception(errorMsg));
                break;
            }
        }
    }
}

