/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.migration;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.DegradedBrokerComponent;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.image.AclsDelta;
import org.apache.kafka.image.AclsImage;
import org.apache.kafka.image.BrokerReplicaExclusionsImage;
import org.apache.kafka.image.ClientQuotaImage;
import org.apache.kafka.image.ClientQuotasImage;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.ClusterLinksDelta;
import org.apache.kafka.image.ClusterLinksImage;
import org.apache.kafka.image.ConfigurationsDelta;
import org.apache.kafka.image.ConfigurationsImage;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.ProducerIdsDelta;
import org.apache.kafka.image.ProducerIdsImage;
import org.apache.kafka.image.ScramImage;
import org.apache.kafka.image.TopicImage;
import org.apache.kafka.image.TopicsDelta;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerReplicaExclusion;
import org.apache.kafka.metadata.ClusterLink;
import org.apache.kafka.metadata.DegradedBrokerHealthState;
import org.apache.kafka.metadata.MirrorTopic;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.ScramCredentialData;
import org.apache.kafka.metadata.authorizer.ConfluentStandardAcl;
import org.apache.kafka.metadata.authorizer.StandardAcl;
import org.apache.kafka.metadata.migration.ClusterLinkMetadata;
import org.apache.kafka.metadata.migration.ConfigMigrationClient;
import org.apache.kafka.metadata.migration.KRaftMigrationOperationConsumer;
import org.apache.kafka.metadata.migration.MigrationClient;
import org.apache.kafka.metadata.migration.TopicMigrationClient;
import org.apache.kafka.server.common.ProducerIdsBlock;

public class KRaftMigrationZkWriter {
    // Operation-type labels. Each one is passed as the first argument of
    // KRaftMigrationOperationConsumer.accept(...) to identify the kind of ZooKeeper write
    // being requested.

    // Producer ID block.
    private static final String UPDATE_PRODUCER_ID = "UpdateProducerId";

    // Topics and partitions.
    private static final String CREATE_TOPIC = "CreateTopic";
    private static final String UPDATE_TOPIC = "UpdateTopic";
    private static final String DELETE_TOPIC = "DeleteTopic";
    // NOTE(review): the identifier is misspelled ("PARTITON"); the label value itself is spelled correctly.
    private static final String UPDATE_PARTITON = "UpdatePartition";
    private static final String DELETE_PARTITION = "DeletePartition";

    // Broker and topic configs.
    private static final String UPDATE_BROKER_CONFIG = "UpdateBrokerConfig";
    private static final String DELETE_BROKER_CONFIG = "DeleteBrokerConfig";
    private static final String UPDATE_TOPIC_CONFIG = "UpdateTopicConfig";
    private static final String DELETE_TOPIC_CONFIG = "DeleteTopicConfig";

    // Client quotas (also used for SCRAM credential writes) and ACLs.
    private static final String UPDATE_CLIENT_QUOTA = "UpdateClientQuota";
    private static final String UPDATE_ACL = "UpdateAcl";
    private static final String DELETE_ACL = "DeleteAcl";

    // Cluster links and their configs.
    private static final String CREATE_LINK = "CreateLink";
    private static final String CREATE_LINK_CONFIG = "CreateLinkConfig";
    private static final String UPDATE_LINK_CONFIG = "UpdateLinkConfig";
    // NOTE(review): the two delete labels use the past tense ("Deleted...") unlike every other label here.
    private static final String DELETE_LINK = "DeletedLink";
    private static final String DELETE_LINK_CONFIG = "DeletedLinkConfig";

    // Broker health and replica exclusions.
    private static final String UPDATE_BROKER_HEALTH = "UpdateBrokerHealth";
    private static final String CREATE_REPLICA_EXCLUSIONS = "CreateReplicaExclusions";
    private static final String UPDATE_REPLICA_EXCLUSIONS = "UpdateReplicaExclusions";

    // Client through which every ZooKeeper read and write in this class is issued.
    private final MigrationClient migrationClient;

    /**
     * Creates a writer bound to the given migration client.
     *
     * @param migrationClient client used for all ZooKeeper reads and writes performed by this
     *                        writer; stored as-is, no validation is done here
     */
    public KRaftMigrationZkWriter(MigrationClient migrationClient) {
        this.migrationClient = migrationClient;
    }

    /**
     * Reconciles ZooKeeper against a complete KRaft metadata image.
     *
     * <p>Each sub-image is handed to its dedicated handler in a fixed order. New and changed
     * cluster links are processed before topics because the topic handler needs the link
     * metadata found in ZooKeeper; links that exist only in ZooKeeper are removed after the
     * topic, config, quota, producer-id and ACL handlers have run.
     *
     * @param image             the full metadata image to reconcile against
     * @param operationConsumer receives every ZooKeeper write that has to be performed
     */
    public void handleSnapshot(MetadataImage image, KRaftMigrationOperationConsumer operationConsumer) {
        final ClusterImage cluster = image.cluster();
        final ClusterLinksImage links = image.clusterLinks();
        final ConfigurationsImage configs = image.configs();

        handleClusterImage(cluster, operationConsumer);

        // Link metadata currently stored in ZooKeeper, keyed by link id.
        final Map<Uuid, ClusterLinkMetadata> linksSeenInZk =
            handleNewAndChangedClusterLinksInSnapshot(links, configs, operationConsumer);

        handleTopicsSnapshot(image.topics(), linksSeenInZk, operationConsumer);
        handleBrokerAndTopicConfigsSnapshot(configs, operationConsumer);
        handleClientQuotasSnapshot(image.clientQuotas(), image.scram(), operationConsumer);
        handleProducerIdSnapshot(image.producerIds(), operationConsumer);
        handleAclsSnapshot(image.acls(), operationConsumer);
        handleDeletedClusterLinksInSnapshot(links, linksSeenInZk, operationConsumer);
        handleReplicaExclusionImage(image.brokerReplicaExclusions(), cluster.brokers().keySet(), operationConsumer);
    }

    /**
     * Applies an incremental metadata change to ZooKeeper.
     *
     * <p>Only the areas whose delta is non-null are processed. The cluster-link handlers are
     * always invoked and deal with null deltas themselves.
     *
     * @param previousImage     image before the delta; used here only to resolve the names of
     *                          deleted topics
     * @param image             image after the delta has been applied
     * @param delta             the change set
     * @param operationConsumer receives every ZooKeeper write that has to be performed
     */
    public void handleDelta(MetadataImage previousImage, MetadataImage image, MetadataDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        final ClusterLinksDelta linksDelta = delta.clusterLinksDelta();
        final ConfigurationsDelta configsDelta = delta.configsDelta();
        final TopicsDelta topicsDelta = delta.topicsDelta();

        if (delta.clusterDelta() != null) {
            handleClusterImage(image.cluster(), operationConsumer);
        }

        handleNewAndChangedClusterLinksDelta(image, linksDelta, configsDelta, operationConsumer);

        if (topicsDelta != null) {
            // Deleted topics are no longer in the new image, so their names come from the old one.
            handleTopicsDelta(previousImage.topics().topicIdToNameView()::get, image.topics(), topicsDelta, operationConsumer);
        }
        if (configsDelta != null) {
            handleConfigsDelta(image.configs(), configsDelta, operationConsumer);
        }

        final boolean quotasOrScramChanged = delta.clientQuotasDelta() != null || delta.scramDelta() != null;
        if (quotasOrScramChanged) {
            handleClientQuotasDelta(image, delta, operationConsumer);
        }
        if (delta.producerIdsDelta() != null) {
            handleProducerIdDelta(delta.producerIdsDelta(), operationConsumer);
        }
        if (delta.aclsDelta() != null) {
            handleAclsDelta(image.acls(), delta.aclsDelta(), operationConsumer);
        }

        handleDeletedClusterLinksInDelta(linksDelta, operationConsumer);

        if (delta.brokerReplicaExclusionsDelta() != null) {
            handleReplicaExclusionImage(image.brokerReplicaExclusions(), image.cluster().brokers().keySet(), operationConsumer);
        }
    }

    /**
     * Writes the degraded-component state of all brokers to ZooKeeper when it differs from
     * what ZooKeeper currently holds.
     *
     * <p>Only brokers with at least one degraded component appear in the KRaft-side map.
     *
     * @param clusterImage      the cluster image holding the broker registrations
     * @param operationConsumer receives the health-state write, if one is needed
     */
    void handleClusterImage(ClusterImage clusterImage, KRaftMigrationOperationConsumer operationConsumer) {
        final Map<Integer, Set<DegradedBrokerComponent>> healthInZk =
            this.migrationClient.brokerHealthClient().readBrokerHealthState(clusterImage.brokers().keySet());

        final Map<Integer, Set<DegradedBrokerComponent>> healthInKRaft = clusterImage.brokers().values().stream()
            .filter(broker -> !broker.degradedComponents().isEmpty())
            .collect(Collectors.toMap(
                BrokerRegistration::id,
                broker -> DegradedBrokerHealthState.toDegradedBrokerComponents(broker.degradedComponents())));

        if (healthInZk.equals(healthInKRaft)) {
            return;
        }
        operationConsumer.accept(
            UPDATE_BROKER_HEALTH,
            "Changing broker health to " + healthInKRaft,
            migrationState -> this.migrationClient.brokerHealthClient().writeBrokerHealthState(healthInKRaft, migrationState));
    }

    /**
     * Creates cluster links that are in the image but not in ZooKeeper, and rewrites the
     * configs of links whose ZooKeeper properties differ from the image.
     *
     * <p>After the operations have been submitted, a change notification is created for every
     * new or changed link.
     *
     * @param clusterLinksImage cluster links known to KRaft
     * @param configsImage      configs image used to look up each link's properties
     * @param operationConsumer receives the create/update operations
     * @return every link visited in ZooKeeper, keyed by link id (including links that are not
     *         in the image)
     */
    Map<Uuid, ClusterLinkMetadata> handleNewAndChangedClusterLinksInSnapshot(ClusterLinksImage clusterLinksImage, ConfigurationsImage configsImage, KRaftMigrationOperationConsumer operationConsumer) {
        // Starts as "all links in the image"; every link found in ZooKeeper is removed from it.
        final Set<Uuid> linksMissingInZk = new HashSet<>(clusterLinksImage.linksById().keySet());
        final Set<Uuid> linksWithStaleConfigs = new HashSet<>();
        final Map<Uuid, ClusterLinkMetadata> linksSeenInZk = new HashMap<>();

        this.migrationClient.clusterLinkClient().iterateClusterLinks((zkLink, isDeleted) -> {
            final Uuid linkId = zkLink.linkId();
            linksSeenInZk.put(linkId, zkLink);
            if (!linksMissingInZk.remove(linkId)) {
                // Not in the image (or already visited): nothing to compare.
                return;
            }
            final Properties kraftProperties =
                configsImage.configProperties(new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString()));
            if (!zkLink.linkProperties().equals(kraftProperties)) {
                linksWithStaleConfigs.add(linkId);
            }
        });

        for (Uuid linkId : linksMissingInZk) {
            final ClusterLink link = clusterLinksImage.linksById().get(linkId);
            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            final Map<String, String> linkProperties = configsImage.configMapForResource(resource);
            final ClusterLinkMetadata linkMetadata = new ClusterLinkMetadata(link, linkProperties);
            operationConsumer.accept(
                CREATE_LINK_CONFIG,
                "Creating link configs for link " + link.linkName(),
                migrationState -> this.migrationClient.configClient().writeConfigs(resource, linkProperties, migrationState));
            operationConsumer.accept(
                CREATE_LINK,
                "Creating link metadata for link " + link.linkName(),
                migrationState -> this.migrationClient.clusterLinkClient().createClusterLink(linkMetadata, migrationState));
        }

        for (Uuid linkId : linksWithStaleConfigs) {
            final ClusterLink link = clusterLinksImage.linksById().get(linkId);
            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            final Map<String, String> linkProperties = configsImage.configMapForResource(resource);
            operationConsumer.accept(
                UPDATE_LINK_CONFIG,
                "Changing link configs for link " + link.linkName(),
                migrationState -> this.migrationClient.configClient().writeConfigs(resource, linkProperties, migrationState));
        }

        // Notify for the union of new and changed links.
        linksMissingInZk.addAll(linksWithStaleConfigs);
        for (Uuid linkId : linksMissingInZk) {
            this.migrationClient.clusterLinkClient().createClusterLinkChangeNotification(linkId);
        }
        return linksSeenInZk;
    }

    /**
     * Removes from ZooKeeper every cluster link (metadata and configs) that is no longer part
     * of the image, and creates a change notification for each of them.
     *
     * @param clusterLinksImage cluster links known to KRaft
     * @param linksInZookeeper  link metadata found in ZooKeeper, keyed by link id
     * @param operationConsumer receives the delete operations
     */
    void handleDeletedClusterLinksInSnapshot(ClusterLinksImage clusterLinksImage, Map<Uuid, ClusterLinkMetadata> linksInZookeeper, KRaftMigrationOperationConsumer operationConsumer) {
        final Set<Uuid> linksOnlyInZk = new HashSet<>(linksInZookeeper.keySet());
        linksOnlyInZk.removeAll(clusterLinksImage.linksById().keySet());

        for (Uuid linkId : linksOnlyInZk) {
            // The image no longer has this link, so the name is taken from the ZooKeeper metadata.
            final String linkName = linksInZookeeper.get(linkId).linkName();
            operationConsumer.accept(
                DELETE_LINK,
                "Deleting link metadata for link " + linkName,
                migrationState -> this.migrationClient.clusterLinkClient().deleteClusterLink(linkId, migrationState));

            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            operationConsumer.accept(
                DELETE_LINK_CONFIG,
                "Deleting link configs for link " + linkName,
                migrationState -> this.migrationClient.configClient().deleteConfigs(resource, migrationState));

            this.migrationClient.clusterLinkClient().createClusterLinkChangeNotification(linkId);
        }
    }

    /**
     * Writes cluster links added by a delta, and the configs of cluster links whose configs
     * were changed by a delta, to ZooKeeper.
     *
     * <p>A link counts as "changed" when the configs delta contains a {@code CLUSTER_LINK}
     * resource whose properties in the new image are non-empty. After the operations have been
     * submitted, a change notification is created for every new or changed link.
     *
     * @param image             metadata image after the delta has been applied
     * @param clusterLinksDelta cluster-link changes, or {@code null} if there are none
     * @param configsDelta      config changes, or {@code null} if there are none
     * @param operationConsumer receives the create/update operations
     */
    void handleNewAndChangedClusterLinksDelta(MetadataImage image, ClusterLinksDelta clusterLinksDelta, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
        final Set<Uuid> newClusterLinks = new HashSet<>();
        final Set<Uuid> changedClusterLinks = new HashSet<>();

        if (clusterLinksDelta != null) {
            newClusterLinks.addAll(clusterLinksDelta.addedClusterLinks().keySet());
        }
        if (configsDelta != null) {
            changedClusterLinks.addAll(configsDelta.changes().keySet().stream()
                .filter(resource -> resource.type() == ConfigResource.Type.CLUSTER_LINK
                    && !image.configs().configProperties(resource).isEmpty())
                .map(resource -> Uuid.fromString(resource.name()))
                .collect(Collectors.toSet()));
        }

        for (Uuid linkId : newClusterLinks) {
            final ClusterLink link = image.clusterLinks().linksById().get(linkId);
            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            final Map<String, String> linkProperties = image.configs().configMapForResource(resource);
            final ClusterLinkMetadata linkMetadata = new ClusterLinkMetadata(link, linkProperties);
            operationConsumer.accept(
                CREATE_LINK_CONFIG,
                "Creating link configs for link " + link.linkName(),
                migrationState -> this.migrationClient.configClient().writeConfigs(resource, linkProperties, migrationState));
            operationConsumer.accept(
                CREATE_LINK,
                "Creating link metadata for link " + link.linkName(),
                migrationState -> this.migrationClient.clusterLinkClient().createClusterLink(linkMetadata, migrationState));
        }

        for (Uuid linkId : changedClusterLinks) {
            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            final Map<String, String> linkProperties = image.configs().configMapForResource(resource);
            // Describe the operation by link name, as the snapshot path does. The link may be
            // absent from the image (the id came from the configs delta), so fall back to the id
            // rather than fail while building a log message.
            final ClusterLink link = image.clusterLinks().linksById().get(linkId);
            final String linkLabel = link != null ? link.linkName() : linkId.toString();
            operationConsumer.accept(
                UPDATE_LINK_CONFIG,
                "Changing link configs for link " + linkLabel,
                migrationState -> this.migrationClient.configClient().writeConfigs(resource, linkProperties, migrationState));
        }

        // Notify for the union of new and changed links.
        newClusterLinks.addAll(changedClusterLinks);
        for (Uuid linkId : newClusterLinks) {
            this.migrationClient.clusterLinkClient().createClusterLinkChangeNotification(linkId);
        }
    }

    /**
     * Removes from ZooKeeper the metadata and configs of every cluster link deleted by a
     * delta, and creates a change notification for each of them.
     *
     * @param clusterLinksDelta cluster-link changes, or {@code null} if there are none (in
     *                          which case nothing is done)
     * @param operationConsumer receives the delete operations
     */
    void handleDeletedClusterLinksInDelta(ClusterLinksDelta clusterLinksDelta, KRaftMigrationOperationConsumer operationConsumer) {
        if (clusterLinksDelta == null) {
            return;
        }
        for (Uuid linkId : clusterLinksDelta.deletedClusterLinks()) {
            operationConsumer.accept(
                DELETE_LINK,
                "Deleting link metadata for link " + linkId,
                migrationState -> this.migrationClient.clusterLinkClient().deleteClusterLink(linkId, migrationState));

            final ConfigResource resource = new ConfigResource(ConfigResource.Type.CLUSTER_LINK, linkId.toString());
            operationConsumer.accept(
                DELETE_LINK_CONFIG,
                "Deleting link configs for link " + linkId,
                migrationState -> this.migrationClient.configClient().deleteConfigs(resource, migrationState));

            this.migrationClient.clusterLinkClient().createClusterLinkChangeNotification(linkId);
        }
    }

    /**
     * Reconciles the topics and partitions stored in ZooKeeper against a topics image.
     *
     * <p>The method first walks every topic and partition in ZooKeeper via a visitor and sorts
     * them into buckets (deleted, changed, new, extraneous). It then submits operations in this
     * order: topic creations, topic updates, topic deletions (each followed by a config
     * deletion), partition additions, partition updates, extraneous-partition deletions.
     *
     * @param topicsImage          topics known to KRaft
     * @param clusterLinksMetadata link metadata keyed by link id; returned by the visitor's
     *                             {@code resolveClusterLink} callback
     * @param operationConsumer    receives every ZooKeeper write that has to be performed
     */
    void handleTopicsSnapshot(final TopicsImage topicsImage, final Map<Uuid, ClusterLinkMetadata> clusterLinksMetadata, KRaftMigrationOperationConsumer operationConsumer) {
        // Topics present in ZooKeeper but absent from the image (id -> name).
        final HashMap<Uuid, String> deletedTopics = new HashMap<Uuid, String>();
        // Topics present in both ZooKeeper and the image.
        final HashSet topicsInZk = new HashSet();
        // Starts as "all topics in the image"; each topic visited in ZooKeeper is removed.
        final HashSet newTopics = new HashSet(topicsImage.topicsById().keySet());
        // Topics whose mirror state, partition assignment or partition set differs.
        final HashSet changedTopics = new HashSet();
        // Partition ids seen in ZooKeeper, per topic id.
        final HashMap partitionsInZk = new HashMap();
        // Partition ids that exist in ZooKeeper but not in the image, per topic name.
        HashMap<String, Set> extraneousPartitionsInZk = new HashMap<String, Set>();
        // KRaft registrations of partitions whose ZooKeeper registration differs, per topic id.
        final HashMap<Uuid, Map> changedPartitions = new HashMap<Uuid, Map>();
        // KRaft registrations of partitions that are missing from ZooKeeper, per topic id.
        HashMap<Uuid, Map> newPartitions = new HashMap<Uuid, Map>();
        this.migrationClient.topicClient().iterateTopics(EnumSet.of(TopicMigrationClient.TopicVisitorInterest.TOPICS, TopicMigrationClient.TopicVisitorInterest.MIRROR_STATE, TopicMigrationClient.TopicVisitorInterest.PARTITIONS), new TopicMigrationClient.TopicVisitor(){

            @Override
            public void visitTopic(String topicName, Uuid topicId, Map<Integer, List<Integer>> assignments, Optional<MirrorTopic> mirrorTopic) {
                TopicImage topic = topicsImage.getTopic(topicId);
                if (topic == null) {
                    // In ZooKeeper only: schedule for deletion.
                    deletedTopics.put(topicId, topicName);
                } else {
                    // remove() returns false when the id was not (or is no longer) in newTopics;
                    // such a topic is not processed further.
                    if (!newTopics.remove(topicId)) {
                        return;
                    }
                    if (!topic.mirrorTopic().equals(mirrorTopic)) {
                        changedTopics.add(topicId);
                    }
                    topicsInZk.add(topicId);
                }
            }

            @Override
            public void visitPartition(TopicIdPartition topicIdPartition, PartitionRegistration partitionRegistration) {
                TopicImage topic = topicsImage.getTopic(topicIdPartition.topicId());
                if (topic == null) {
                    // Partition of a topic that is not in the image; nothing to compare against.
                    return;
                }
                partitionsInZk.computeIfAbsent(topic.id(), __ -> new HashSet()).add(topicIdPartition.partition());
                PartitionRegistration kraftPartition = topic.partitions().get(topicIdPartition.partition());
                if (kraftPartition != null) {
                    // Any difference in the registration means the partition must be rewritten.
                    if (!kraftPartition.equals(partitionRegistration)) {
                        changedPartitions.computeIfAbsent(topicIdPartition.topicId(), __ -> new HashMap()).put(topicIdPartition.partition(), kraftPartition);
                    }
                    // An assignment difference additionally marks the whole topic as changed.
                    if (!kraftPartition.hasSameAssignment(partitionRegistration)) {
                        changedTopics.add(topic.id());
                    }
                }
            }

            @Override
            public ClusterLinkMetadata resolveClusterLink(Uuid linkId) {
                return (ClusterLinkMetadata)clusterLinksMetadata.get(linkId);
            }
        });
        // Compare the partition id sets of every topic that exists on both sides.
        topicsInZk.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            Set topicPartitionsInZk = partitionsInZk.computeIfAbsent(topicId, __ -> new HashSet());
            if (!topicPartitionsInZk.equals(topic.partitions().keySet())) {
                // KRaft partitions minus those already in ZooKeeper = partitions to add.
                // NOTE(review): this map is empty when ZooKeeper only has extra partitions, yet it
                // is still recorded and later produces an operation — confirm this is intended.
                HashMap<Integer, PartitionRegistration> newTopicPartitions = new HashMap<Integer, PartitionRegistration>(topic.partitions());
                topicPartitionsInZk.forEach(newTopicPartitions::remove);
                newPartitions.put((Uuid)topicId, newTopicPartitions);
                // ZooKeeper partitions minus KRaft partitions = partitions to delete.
                topicPartitionsInZk.removeAll(topic.partitions().keySet());
                if (!topicPartitionsInZk.isEmpty()) {
                    extraneousPartitionsInZk.put(topic.name(), topicPartitionsInZk);
                }
                changedTopics.add(topicId);
            }
        });
        // Whatever is left in newTopics was never seen in ZooKeeper.
        newTopics.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(CREATE_TOPIC, "Create Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().createTopic(topic.name(), (Uuid)topicId, topic.partitions(), topic.mirrorTopic(), migrationState));
        });
        changedTopics.forEach(topicId -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(UPDATE_TOPIC, "Changed Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopic(topic.name(), (Uuid)topicId, topic.partitions(), topic.mirrorTopic(), migrationState));
        });
        deletedTopics.forEach((topicId, topicName) -> {
            operationConsumer.accept(DELETE_TOPIC, "Delete Topic " + topicName + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().deleteTopic((String)topicName, migrationState));
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
            // NOTE(review): this deletes the topic's configs but is labelled UPDATE_TOPIC_CONFIG
            // with an "Updating Configs" description — confirm whether DELETE_TOPIC_CONFIG was meant.
            operationConsumer.accept(UPDATE_TOPIC_CONFIG, "Updating Configs for Topic " + topicName + ", ID " + topicId, migrationState -> this.migrationClient.configClient().deleteConfigs(resource, migrationState));
        });
        newPartitions.forEach((topicId, partitionMap) -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            // NOTE(review): partitions missing from ZooKeeper are written with
            // updateTopicPartitions here, while handleTopicsDelta uses createTopicPartitions
            // for new partitions — confirm which call is correct for this path.
            operationConsumer.accept(UPDATE_PARTITON, "Creating additional partitions for Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topic.name(), partitionMap), migrationState));
        });
        changedPartitions.forEach((topicId, partitionMap) -> {
            TopicImage topic = topicsImage.getTopic((Uuid)topicId);
            operationConsumer.accept(UPDATE_PARTITON, "Updating Partitions for Topic " + topic.name() + ", ID " + topicId, migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topic.name(), partitionMap), migrationState));
        });
        extraneousPartitionsInZk.forEach((topicName, partitions) -> operationConsumer.accept(DELETE_PARTITION, "Deleting extraneous Partitions " + partitions + " for Topic " + topicName, migrationState -> this.migrationClient.topicClient().deleteTopicPartitions(Collections.singletonMap(topicName, partitions), migrationState)));
    }

    /**
     * Applies topic deletions, creations and modifications from a delta to ZooKeeper.
     *
     * <p>Deletions are submitted first. For every other changed topic: a created topic results
     * in a single create operation; an existing topic results in an optional topic update
     * (assignment or mirror state changed), an optional partition creation and an optional
     * partition update.
     *
     * @param deletedTopicNameResolver maps the id of a deleted topic to its name
     * @param topicsImage              topics image after the delta has been applied
     * @param topicsDelta              the topic changes
     * @param operationConsumer        receives every ZooKeeper write that has to be performed
     */
    void handleTopicsDelta(Function<Uuid, String> deletedTopicNameResolver, TopicsImage topicsImage, TopicsDelta topicsDelta, KRaftMigrationOperationConsumer operationConsumer) {
        for (Uuid deletedId : topicsDelta.deletedTopicIds()) {
            final String deletedName = deletedTopicNameResolver.apply(deletedId);
            operationConsumer.accept(
                DELETE_TOPIC,
                "Deleting topic " + deletedName + ", ID " + deletedId,
                migrationState -> this.migrationClient.topicClient().deleteTopic(deletedName, migrationState));
        }

        topicsDelta.changedTopics().forEach((topicId, topicDelta) -> {
            final String topicName = topicDelta.name();

            if (topicsDelta.createdTopicIds().contains(topicId)) {
                operationConsumer.accept(
                    CREATE_TOPIC,
                    "Create Topic " + topicName + ", ID " + topicId,
                    migrationState -> this.migrationClient.topicClient().createTopic(topicName, topicId, topicDelta.partitionChanges(), topicDelta.latestMirrorTopicState(), migrationState));
                return;
            }

            final boolean topicLevelChange =
                topicDelta.hasPartitionsWithAssignmentChanges() || topicDelta.mirrorTopicDelta().changed();
            if (topicLevelChange) {
                operationConsumer.accept(
                    UPDATE_TOPIC,
                    "Updating Topic " + topicName + ", ID " + topicId,
                    migrationState -> this.migrationClient.topicClient().updateTopic(topicName, topicId, topicsImage.getTopic(topicId).partitions(), topicDelta.latestMirrorTopicState(), migrationState));
            }

            final Map<Integer, PartitionRegistration> addedPartitions = new HashMap<>(topicDelta.newPartitions());
            final Map<Integer, PartitionRegistration> modifiedPartitions = new HashMap<>(topicDelta.partitionChanges());

            if (!addedPartitions.isEmpty()) {
                operationConsumer.accept(
                    UPDATE_PARTITON,
                    "Create new partitions for Topic " + topicName + ", ID " + topicId,
                    migrationState -> this.migrationClient.topicClient().createTopicPartitions(Collections.singletonMap(topicName, addedPartitions), migrationState));
                // Partitions that were just created must not also be sent as updates.
                modifiedPartitions.keySet().removeAll(addedPartitions.keySet());
            }
            if (!modifiedPartitions.isEmpty()) {
                operationConsumer.accept(
                    UPDATE_PARTITON,
                    "Updating Partitions for Topic " + topicName + ", ID " + topicId,
                    migrationState -> this.migrationClient.topicClient().updateTopicPartitions(Collections.singletonMap(topicName, modifiedPartitions), migrationState));
            }
        });
    }

    /**
     * Picks the operation label that matches the type of the given resource.
     *
     * @param resource the config resource being written
     * @param brokerOp label to use when the resource type is {@code BROKER}
     * @param topicOp  label to use for every other resource type
     * @return {@code brokerOp} for broker resources, otherwise {@code topicOp}
     */
    private String brokerOrTopicOpType(ConfigResource resource, String brokerOp, String topicOp) {
        final boolean isBroker = resource.type().equals(ConfigResource.Type.BROKER);
        return isBroker ? brokerOp : topicOp;
    }

    /**
     * Reconciles the broker and topic configs stored in ZooKeeper against a configs image.
     *
     * <p>Resources of other types are ignored. Resources that are in the image but were not
     * seen in ZooKeeper are written if they have any properties. Resources seen in ZooKeeper
     * whose properties differ from the image are rewritten, or deleted when the image has no
     * properties for them. For brokers, the regular and the encrypted ZooKeeper configs are
     * merged before the comparison.
     *
     * @param configsImage      configs known to KRaft
     * @param operationConsumer receives every ZooKeeper write that has to be performed
     */
    void handleBrokerAndTopicConfigsSnapshot(ConfigurationsImage configsImage, KRaftMigrationOperationConsumer operationConsumer) {
        final Set<ConfigResource.Type> reconciledTypes = EnumSet.of(ConfigResource.Type.BROKER, ConfigResource.Type.TOPIC);

        // Starts as "all broker/topic resources in the image"; each one seen in ZooKeeper is removed.
        final Set<ConfigResource> missingInZk = new HashSet<>();
        for (ConfigResource kraftResource : configsImage.resourceData().keySet()) {
            if (reconciledTypes.contains(kraftResource.type())) {
                missingInZk.add(kraftResource);
            }
        }

        final Set<ConfigResource> staleInZk = new HashSet<>();
        final BiConsumer<ConfigResource, Map<?, ?>> compareWithKRaft = (zkResource, zkProps) -> {
            missingInZk.remove(zkResource);
            if (!configsImage.configMapForResource(zkResource).equals(zkProps)) {
                staleInZk.add(zkResource);
            }
        };

        this.migrationClient.configClient().iterateBrokerConfigs((broker, configs, encryptedConfigs) -> {
            final ConfigResource brokerResource = new ConfigResource(ConfigResource.Type.BROKER, broker);
            final Map<Object, Object> mergedZkProps = new HashMap<>(configs);
            mergedZkProps.putAll(encryptedConfigs);
            compareWithKRaft.accept(brokerResource, mergedZkProps);
        });
        this.migrationClient.configClient().iterateTopicConfigs((topic, configs) ->
            compareWithKRaft.accept(new ConfigResource(ConfigResource.Type.TOPIC, topic), configs));

        for (ConfigResource resource : missingInZk) {
            final Map<String, String> props = configsImage.configMapForResource(resource);
            if (props.isEmpty()) {
                continue;
            }
            final String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
            operationConsumer.accept(
                opType,
                "Create configs for " + resource.type().name() + " " + resource.name(),
                migrationState -> this.migrationClient.configClient().writeConfigs(resource, props, migrationState));
        }

        for (ConfigResource resource : staleInZk) {
            final Map<String, String> props = configsImage.configMapForResource(resource);
            if (!props.isEmpty()) {
                final String opType = brokerOrTopicOpType(resource, UPDATE_BROKER_CONFIG, UPDATE_TOPIC_CONFIG);
                operationConsumer.accept(
                    opType,
                    "Update configs for " + resource.type().name() + " " + resource.name(),
                    migrationState -> this.migrationClient.configClient().writeConfigs(resource, props, migrationState));
            } else {
                final String opType = brokerOrTopicOpType(resource, DELETE_BROKER_CONFIG, DELETE_TOPIC_CONFIG);
                operationConsumer.accept(
                    opType,
                    "Delete configs for " + resource.type().name() + " " + resource.name(),
                    migrationState -> this.migrationClient.configClient().deleteConfigs(resource, migrationState));
            }
        }
    }

    /**
     * Collects the SCRAM credentials of one user, rendered as strings.
     *
     * @param image    SCRAM image to read from; may be {@code null}
     * @param userName the user whose credentials are wanted
     * @return a new mutable map from mechanism name to credential string; empty when the image
     *         is {@code null} or holds no credential for the user
     */
    private Map<String, String> getScramCredentialStringsForUser(ScramImage image, String userName) {
        final HashMap<String, String> credentialsByMechanism = new HashMap<>();
        if (image == null) {
            return credentialsByMechanism;
        }
        image.mechanisms().forEach((mechanism, credentialsByUser) -> {
            final ScramCredentialData data = credentialsByUser.get(userName);
            if (data == null) {
                return;
            }
            final String rendered = ScramCredentialUtils.credentialToString(data.toCredential(mechanism));
            credentialsByMechanism.put(mechanism.mechanismName(), rendered);
        });
        return credentialsByMechanism;
    }

    /**
     * Reconciles client quotas and SCRAM credentials in ZooKeeper against the KRaft images.
     *
     * <p>Entities that consist of a user only (no client id) are tracked by user name, because
     * their quotas and SCRAM credentials are written together. All other entities are tracked
     * as quota entities. Every entity and every SCRAM user in the images is written; entities
     * and credentials found in ZooKeeper that differ from the images are written as well, using
     * the image values (empty when the image has none).
     *
     * <p>A {@code null} image is treated as an empty image throughout the method.
     *
     * @param clientQuotasImage quotas known to KRaft; may be {@code null}
     * @param scramImage        SCRAM credentials known to KRaft; may be {@code null}
     * @param opConsumer        receives every ZooKeeper write that has to be performed
     */
    void handleClientQuotasSnapshot(final ClientQuotasImage clientQuotasImage, final ScramImage scramImage, KRaftMigrationOperationConsumer opConsumer) {
        // Normalise both images once so that no later access has to repeat the null check.
        final Map<ClientQuotaEntity, ClientQuotaImage> kraftQuotas = clientQuotasImage == null
            ? Collections.<ClientQuotaEntity, ClientQuotaImage>emptyMap()
            : clientQuotasImage.entities();
        final Map<ScramMechanism, Map<String, ScramCredentialData>> kraftScram = scramImage == null
            ? Collections.<ScramMechanism, Map<String, ScramCredentialData>>emptyMap()
            : scramImage.mechanisms();

        final Set<ClientQuotaEntity> changedNonUserEntities = new HashSet<>();
        final Set<String> changedUsers = new HashSet<>();

        for (ClientQuotaEntity entity : kraftQuotas.keySet()) {
            if (entity.entries().containsKey("user") && !entity.entries().containsKey("client-id")) {
                changedUsers.add(entity.entries().get("user"));
            } else {
                changedNonUserEntities.add(entity);
            }
        }
        for (Map<String, ScramCredentialData> credentialsByUser : kraftScram.values()) {
            changedUsers.addAll(credentialsByUser.keySet());
        }

        this.migrationClient.configClient().iterateClientQuotas(new ConfigMigrationClient.ClientQuotaVisitor() {

            @Override
            public void visitClientQuota(List<ClientQuotaRecord.EntityData> entityDataList, Map<String, Double> quotas) {
                final Map<String, String> entityMap = new HashMap<>(2);
                entityDataList.forEach(entityData -> entityMap.put(entityData.entityType(), entityData.entityName()));
                final ClientQuotaEntity entity = new ClientQuotaEntity(entityMap);
                if (kraftQuotas.getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap().equals(quotas)) {
                    return;
                }
                // ZooKeeper disagrees with KRaft (including "KRaft has no such entity").
                if (entity.entries().containsKey("user") && !entity.entries().containsKey("client-id")) {
                    changedUsers.add(entityMap.get("user"));
                } else {
                    changedNonUserEntities.add(entity);
                }
            }

            @Override
            public void visitScramCredential(String userName, ScramMechanism scramMechanism, ScramCredential scramCredential) {
                final ScramCredentialData data = kraftScram.getOrDefault(scramMechanism, Collections.emptyMap()).get(userName);
                if (data == null || !data.toCredential(scramMechanism).equals(scramCredential)) {
                    changedUsers.add(userName);
                }
            }
        });

        for (ClientQuotaEntity entity : changedNonUserEntities) {
            final Map<String, Double> quotaMap = kraftQuotas.getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
            opConsumer.accept(
                UPDATE_CLIENT_QUOTA,
                "Update client quotas for " + entity,
                migrationState -> this.migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, Collections.emptyMap(), migrationState));
        }

        for (String userName : changedUsers) {
            final ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap("user", userName));
            final Map<String, Double> quotaMap = kraftQuotas.getOrDefault(entity, ClientQuotaImage.EMPTY).quotaMap();
            final Map<String, String> scramMap = this.getScramCredentialStringsForUser(scramImage, userName);
            opConsumer.accept(
                UPDATE_CLIENT_QUOTA,
                "Update client quotas for " + userName,
                migrationState -> this.migrationClient.configClient().writeClientQuotas(entity.entries(), quotaMap, scramMap, migrationState));
        }
    }

    /**
     * Reconciles the next producer ID held by the migration client with the given image.
     *
     * <p>Nothing is emitted when the image is empty, or when the block returned by
     * {@code migrationClient.readProducerId()} already starts at the image's next
     * producer ID. In every other case (no stored value, or a differing one) a single
     * {@code UPDATE_PRODUCER_ID} operation is handed to the consumer.
     *
     * @param image             producer ID image to compare against
     * @param operationConsumer receives the write operation, if one is needed
     */
    void handleProducerIdSnapshot(ProducerIdsImage image, KRaftMigrationOperationConsumer operationConsumer) {
        if (image.isEmpty()) {
            return;
        }
        Optional<ProducerIdsBlock> stored = migrationClient.readProducerId();
        boolean alreadyInSync = stored.isPresent()
            && stored.get().nextBlockFirstId() == image.nextProducerId();
        if (alreadyInSync) {
            return;
        }
        operationConsumer.accept(
            UPDATE_PRODUCER_ID,
            "Setting next producer ID",
            migrationState -> migrationClient.writeProducerId(image.nextProducerId(), migrationState));
    }

    /**
     * Emits one config operation per resource listed in {@code configsDelta.changes()}.
     *
     * <p>Resources of type {@code CLUSTER_LINK} are skipped. For every other resource the
     * current config map is looked up in {@code configsImage}: an empty map produces a
     * {@code "DeleteConfig"} operation, a non-empty map produces an {@code "UpdateConfig"}
     * operation carrying that map.
     *
     * @param configsImage      image used to resolve the current configs of each resource
     * @param configsDelta      delta whose changed resources are processed
     * @param operationConsumer receives the delete/update operations
     */
    void handleConfigsDelta(ConfigurationsImage configsImage, ConfigurationsDelta configsDelta, KRaftMigrationOperationConsumer operationConsumer) {
        for (ConfigResource resource : configsDelta.changes().keySet()) {
            if (resource.type().equals(ConfigResource.Type.CLUSTER_LINK)) {
                continue;
            }
            Map<String, String> currentConfigs = configsImage.configMapForResource(resource);
            if (!currentConfigs.isEmpty()) {
                operationConsumer.accept(
                    "UpdateConfig",
                    "Update configs for " + resource,
                    migrationState -> migrationClient.configClient().writeConfigs(resource, currentConfigs, migrationState));
            } else {
                operationConsumer.accept(
                    "DeleteConfig",
                    "Delete configs for " + resource,
                    migrationState -> migrationClient.configClient().deleteConfigs(resource, migrationState));
            }
        }
    }

    /**
     * Emits {@code UPDATE_CLIENT_QUOTA} operations for the quota and SCRAM changes in a delta.
     *
     * <p>Does nothing when the delta carries neither a client-quotas delta nor a SCRAM delta.
     * Otherwise:
     * <ul>
     *   <li>every user named in the SCRAM delta, and every quota entity that has a
     *       {@code "user"} entry but no {@code "client-id"} entry, is collected and written
     *       once at the end with both its quota map and its SCRAM credential strings;</li>
     *   <li>every other changed quota entity is written immediately with its quota map from
     *       the image and an empty SCRAM map.</li>
     * </ul>
     *
     * <p>A changed non-user entity that is no longer present in the image is written with the
     * quota map of {@code ClientQuotaImage.EMPTY}, mirroring the lookup used when handling
     * snapshots in this class, instead of failing with a {@link NullPointerException}.
     *
     * @param metadataImage     image that supplies the current quotas and SCRAM credentials
     * @param metadataDelta     delta naming the entities and users that changed
     * @param operationConsumer receives the resulting write operations
     */
    void handleClientQuotasDelta(MetadataImage metadataImage, MetadataDelta metadataDelta, KRaftMigrationOperationConsumer operationConsumer) {
        if (metadataDelta.clientQuotasDelta() == null && metadataDelta.scramDelta() == null) {
            return;
        }

        // Users touched by either a SCRAM change or a user-only quota change; written once below.
        Set<String> users = new HashSet<>();
        if (metadataDelta.scramDelta() != null) {
            metadataDelta.scramDelta().changes().forEach((scramMechanism, changes) ->
                changes.forEach((userName, changeOpt) -> users.add(userName)));
        }
        if (metadataDelta.clientQuotasDelta() != null) {
            metadataDelta.clientQuotasDelta().changes().forEach((clientQuotaEntity, clientQuotaDelta) -> {
                if (clientQuotaEntity.entries().containsKey("user") && !clientQuotaEntity.entries().containsKey("client-id")) {
                    users.add(clientQuotaEntity.entries().get("user"));
                } else {
                    // The entity may be missing from the image; fall back to the empty image
                    // rather than dereferencing null.
                    Map<String, Double> quotaMap = metadataImage.clientQuotas().entities()
                        .getOrDefault(clientQuotaEntity, ClientQuotaImage.EMPTY).quotaMap();
                    operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, Collections.emptyMap(), migrationState));
                }
            });
        }

        users.forEach(userName -> {
            Map<String, String> userScramMap = this.getScramCredentialStringsForUser(metadataImage.scram(), userName);
            ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(Collections.singletonMap("user", userName));
            if (metadataImage.clientQuotas() == null || metadataImage.clientQuotas().entities().get(clientQuotaEntity) == null) {
                // No quota entry for this user: only the SCRAM credentials are written.
                operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating scram credentials for " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), Collections.emptyMap(), userScramMap, migrationState));
            } else {
                Map<String, Double> quotaMap = metadataImage.clientQuotas().entities().get(clientQuotaEntity).quotaMap();
                operationConsumer.accept(UPDATE_CLIENT_QUOTA, "Updating client quota for " + clientQuotaEntity, migrationState -> this.migrationClient.configClient().writeClientQuotas(clientQuotaEntity.entries(), quotaMap, userScramMap, migrationState));
            }
        });
    }

    /**
     * Emits an {@code UPDATE_PRODUCER_ID} operation that writes the delta's next producer ID.
     *
     * <p>The ID is read from {@code delta} when the operation runs, not when it is queued.
     *
     * @param delta             producer IDs delta supplying the next producer ID
     * @param operationConsumer receives the write operation
     */
    void handleProducerIdDelta(ProducerIdsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        final String description = "Setting next producer ID";
        operationConsumer.accept(
            UPDATE_PRODUCER_ID,
            description,
            migrationState -> migrationClient.writeProducerId(delta.nextProducerId(), migrationState));
    }

    /**
     * Builds the {@link ResourcePattern} identifying the resource an ACL applies to.
     *
     * @param acl ACL whose resource type, resource name and pattern type are copied
     * @return a new pattern built from those three values
     */
    private ResourcePattern resourcePatternFromAcl(StandardAcl acl) {
        return new ResourcePattern(acl.resourceType(), acl.resourceName(), acl.patternType());
    }

    /**
     * Builds the {@link ResourcePattern} for the standard ACL wrapped by the given ACL.
     *
     * @param acl wrapper whose {@code standardAcl()} supplies the resource information
     * @return the pattern derived from the wrapped standard ACL
     */
    private ResourcePattern resourcePatternFromAcl(ConfluentStandardAcl acl) {
        StandardAcl wrapped = acl.standardAcl();
        return resourcePatternFromAcl(wrapped);
    }

    /**
     * Reconciles the ACLs reported by the migration ACL client with an ACL image.
     *
     * <p>The image's ACLs are grouped by resource pattern, then by standard ACL, collecting
     * the cluster link IDs attached to each (an absent link ID is recorded as
     * {@code Uuid.ZERO_UUID}). The ACLs reported by {@code aclClient().iterateAcls} are then
     * compared against that grouping, and operations are emitted in this order:
     * <ol>
     *   <li>{@code UPDATE_ACL} for each resource in the image that was not reported;</li>
     *   <li>{@code DELETE_ACL} for each reported resource that is not in the image;</li>
     *   <li>{@code UPDATE_ACL} for each resource present in both whose entry sets differ.</li>
     * </ol>
     *
     * @param image             ACL image treated as the desired state
     * @param operationConsumer receives the resulting write/delete operations
     */
    void handleAclsSnapshot(AclsImage image, KRaftMigrationOperationConsumer operationConsumer) {
        // resource pattern -> standard ACL -> cluster link IDs seen for that ACL
        Map<ResourcePattern, Map<StandardAcl, Set<Uuid>>> allAclsInSnapshot = new HashMap<>();
        image.acls().values().forEach(confluentStandardAcl -> {
            StandardAcl standardAcl = confluentStandardAcl.standardAcl();
            ResourcePattern resourcePattern = this.resourcePatternFromAcl(standardAcl);
            allAclsInSnapshot
                .computeIfAbsent(resourcePattern, __ -> new HashMap<>())
                .computeIfAbsent(standardAcl, __ -> new HashSet<>())
                .add(confluentStandardAcl.clusterLinkId().orElse(Uuid.ZERO_UUID));
        });

        // Converts the grouped image ACLs of one resource into access control entries.
        Function<ResourcePattern, Set<AccessControlEntry>> aclsForResource = pattern ->
            allAclsInSnapshot.getOrDefault(pattern, Collections.emptyMap()).entrySet().stream().map(entry -> {
                StandardAcl standardAcl = entry.getKey();
                Set<Uuid> linkIds = entry.getValue();
                // A lone ZERO_UUID is the placeholder for "no link ID"; pass an empty set instead.
                if (linkIds.size() == 1 && linkIds.contains(Uuid.ZERO_UUID)) {
                    linkIds = Collections.emptySet();
                }
                return new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType(), linkIds);
            }).collect(Collectors.toSet());

        // Starts as every resource in the image; reported resources are removed during iteration.
        Set<ResourcePattern> newResources = new HashSet<>(allAclsInSnapshot.keySet());
        Set<ResourcePattern> resourcesToDelete = new HashSet<>();
        Map<ResourcePattern, Set<AccessControlEntry>> changedResources = new HashMap<>();
        this.migrationClient.aclClient().iterateAcls((resourcePattern, accessControlEntries) -> {
            newResources.remove(resourcePattern);
            if (!allAclsInSnapshot.containsKey(resourcePattern)) {
                resourcesToDelete.add(resourcePattern);
            } else {
                Set<AccessControlEntry> snapshotEntries = aclsForResource.apply(resourcePattern);
                if (!snapshotEntries.equals(accessControlEntries)) {
                    changedResources.put(resourcePattern, snapshotEntries);
                }
            }
        });

        newResources.forEach(resourcePattern -> {
            Set<AccessControlEntry> accessControlEntries = aclsForResource.apply(resourcePattern);
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
        });
        resourcesToDelete.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no ACLs in snapshot";
            operationConsumer.accept(DELETE_ACL, name, migrationState -> this.migrationClient.aclClient().deleteResource(deletedResource, migrationState));
        });
        changedResources.forEach((resourcePattern, accessControlEntries) -> {
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
        });
    }

    /**
     * Emits ACL operations for the resources touched by an ACL delta.
     *
     * <p>A resource is "affected" when the delta contains a changed (present) ACL or a deleted
     * ACL for its pattern. For each affected resource, ALL of its ACLs in {@code image} are
     * gathered and written back with a single {@code UPDATE_ACL} operation. Affected resources
     * that have no ACLs left in {@code image} get a {@code DELETE_ACL} operation instead.
     * Deletes are emitted before writes.
     *
     * <p>Every surviving ACL of an affected resource is included in the write, regardless of
     * how many ACLs the resource has left or the order in which the image is iterated.
     *
     * @param image             ACL image used to resolve the remaining ACLs of each resource
     * @param delta             delta naming the changed and deleted ACLs
     * @param operationConsumer receives the resulting delete/write operations
     */
    void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
        Set<ResourcePattern> resourcesWithChangedAcls = delta.changes().values().stream()
            .filter(Optional::isPresent)
            .map(Optional::get)
            .map(this::resourcePatternFromAcl)
            .collect(Collectors.toSet());
        // Collected into an explicit HashSet because this set is mutated below and
        // Collectors.toSet() makes no mutability guarantee.
        Set<ResourcePattern> resourcesWithDeletedAcls = delta.deleted().stream()
            .map(this::resourcePatternFromAcl)
            .collect(Collectors.toCollection(HashSet::new));

        Set<ResourcePattern> affectedResources = new HashSet<>(resourcesWithChangedAcls);
        affectedResources.addAll(resourcesWithDeletedAcls);

        // resource pattern -> standard ACL -> cluster link IDs, for affected resources only
        Map<ResourcePattern, Map<StandardAcl, Set<Uuid>>> newAcls = new HashMap<>();
        image.acls().forEach((uuid, confluentStandardAcl) -> {
            StandardAcl standardAcl = confluentStandardAcl.standardAcl();
            ResourcePattern resourcePattern = this.resourcePatternFromAcl(standardAcl);
            if (affectedResources.contains(resourcePattern)) {
                newAcls
                    .computeIfAbsent(resourcePattern, __ -> new HashMap<>())
                    .computeIfAbsent(standardAcl, __ -> new HashSet<>())
                    .add(confluentStandardAcl.clusterLinkId().orElse(Uuid.ZERO_UUID));
            }
        });

        // A resource that still has ACLs in the image is rewritten, not deleted.
        resourcesWithDeletedAcls.removeAll(newAcls.keySet());

        resourcesWithDeletedAcls.forEach(deletedResource -> {
            String name = "Deleting resource " + deletedResource + " which has no more ACLs";
            operationConsumer.accept(DELETE_ACL, name, migrationState -> this.migrationClient.aclClient().deleteResource(deletedResource, migrationState));
        });
        newAcls.forEach((resourcePattern, aclMap) -> {
            Set<AccessControlEntry> accessControlEntries = aclMap.entrySet().stream().map(entry -> {
                StandardAcl standardAcl = entry.getKey();
                Set<Uuid> linkIds = entry.getValue();
                // A lone ZERO_UUID is the placeholder for "no link ID"; pass an empty set instead.
                if (linkIds.size() == 1 && linkIds.contains(Uuid.ZERO_UUID)) {
                    linkIds = Collections.emptySet();
                }
                return new AccessControlEntry(standardAcl.principal(), standardAcl.host(), standardAcl.operation(), standardAcl.permissionType(), linkIds);
            }).collect(Collectors.toSet());
            String name = "Writing " + accessControlEntries.size() + " for resource " + resourcePattern;
            operationConsumer.accept(UPDATE_ACL, name, migrationState -> this.migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
        });
    }

    /**
     * Reconciles stored replica exclusions with the exclusions image for the given brokers.
     *
     * <p>The stored exclusions are read through {@code replicaExclusionClient()}. The expected
     * set is built from the registered brokers that have an entry in
     * {@code exclusionsImage.activeBrokerReplicaExclusions()}. When the two sets differ (an
     * absent stored value counts as the empty set), one operation is emitted:
     * {@code CREATE_REPLICA_EXCLUSIONS} if nothing was stored, otherwise
     * {@code UPDATE_REPLICA_EXCLUSIONS}.
     *
     * @param exclusionsImage   image holding the active replica exclusions per broker
     * @param registeredBrokers broker IDs to consider
     * @param operationConsumer receives the write operation, if one is needed
     */
    void handleReplicaExclusionImage(BrokerReplicaExclusionsImage exclusionsImage, Set<Integer> registeredBrokers, KRaftMigrationOperationConsumer operationConsumer) {
        Optional<Set<BrokerReplicaExclusion>> zkExclusions =
            migrationClient.replicaExclusionClient().readReplicaExclusions(registeredBrokers);

        Set<BrokerReplicaExclusion> kraftExclusions = new HashSet<>();
        for (Integer brokerId : registeredBrokers) {
            if (exclusionsImage.activeBrokerReplicaExclusions().containsKey(brokerId)) {
                kraftExclusions.add(new BrokerReplicaExclusion((int) brokerId, exclusionsImage.activeBrokerReplicaExclusions().get(brokerId)));
            }
        }

        if (zkExclusions.orElseGet(Collections::emptySet).equals(kraftExclusions)) {
            return;
        }

        // Nothing stored yet means the exclusions must be created rather than updated.
        boolean nothingStored = !zkExclusions.isPresent();
        String replicaExclusionOperation = nothingStored ? CREATE_REPLICA_EXCLUSIONS : UPDATE_REPLICA_EXCLUSIONS;
        operationConsumer.accept(
            replicaExclusionOperation,
            "Changed replica exclusions to " + kraftExclusions,
            migrationState -> migrationClient.replicaExclusionClient().writeReplicaExclusions(kraftExclusions, migrationState, nothingStored));
    }
}

