/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ComponentHealthStatus;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.common.PartitionPlacementStrategy;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
import org.apache.kafka.common.errors.ClusterLinkNotFoundException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.AllocateProducerIdsRequestData;
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.message.AlterBrokerHealthRequestData;
import org.apache.kafka.common.message.AlterBrokerHealthResponseData;
import org.apache.kafka.common.message.AlterBrokerReplicaExclusionsRequestData;
import org.apache.kafka.common.message.AlterCellResponseData;
import org.apache.kafka.common.message.AlterMirrorTopicsRequestData;
import org.apache.kafka.common.message.AlterMirrorTopicsResponseData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.AlterPartitionRequestData;
import org.apache.kafka.common.message.AlterPartitionResponseData;
import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.AssignBrokersToCellResponseData;
import org.apache.kafka.common.message.AssignTenantsToCellResponseData;
import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.message.CreateCellResponseData;
import org.apache.kafka.common.message.CreateClusterLinksRequestData;
import org.apache.kafka.common.message.CreateClusterLinksResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.DeleteCellResponseData;
import org.apache.kafka.common.message.DeleteClusterLinksRequestData;
import org.apache.kafka.common.message.DeleteClusterLinksResponseData;
import org.apache.kafka.common.message.DeleteTenantsResponseData;
import org.apache.kafka.common.message.DescribeBrokerHealthResponseData;
import org.apache.kafka.common.message.DescribeCellsResponseData;
import org.apache.kafka.common.message.DescribeTenantsResponseData;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.UnAssignBrokersFromCellResponseData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData;
import org.apache.kafka.common.metadata.AccessControlEntryRecord;
import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
import org.apache.kafka.common.metadata.BrokerReplicaExclusionRecord;
import org.apache.kafka.common.metadata.CellRecord;
import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.ClusterLinkRecord;
import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.EncryptedEnvelopeRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.InstallMetadataEncryptorRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
import org.apache.kafka.common.metadata.MirrorTopicChangeRecord;
import org.apache.kafka.common.metadata.MirrorTopicRecord;
import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
import org.apache.kafka.common.metadata.RemoveCellRecord;
import org.apache.kafka.common.metadata.RemoveClusterLinkRecord;
import org.apache.kafka.common.metadata.RemoveTenantRecord;
import org.apache.kafka.common.metadata.RemoveTopicRecord;
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.TenantRecord;
import org.apache.kafka.common.metadata.TopicRecord;
import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.metadata.ZkMigrationStateRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.requests.AlterCellRequest;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.AssignBrokersToCellRequest;
import org.apache.kafka.common.requests.AssignTenantsToCellRequest;
import org.apache.kafka.common.requests.CreateCellRequest;
import org.apache.kafka.common.requests.DeleteCellRequest;
import org.apache.kafka.common.requests.DeleteTenantsRequest;
import org.apache.kafka.common.requests.DescribeCellsRequest;
import org.apache.kafka.common.requests.DescribeTenantsRequest;
import org.apache.kafka.common.requests.UnAssignBrokersFromCellRequest;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.AclControlManager;
import org.apache.kafka.controller.CellControlManager;
import org.apache.kafka.controller.ClientQuotaControlManager;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ClusterLinkControlManager;
import org.apache.kafka.controller.ConfigurationControlManager;
import org.apache.kafka.controller.ConfigurationValidator;
import org.apache.kafka.controller.Controller;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.ControllerResultAndOffset;
import org.apache.kafka.controller.EncryptionControlManager;
import org.apache.kafka.controller.FeatureControlManager;
import org.apache.kafka.controller.LogReplayTracker;
import org.apache.kafka.controller.MirrorTopicControlManager;
import org.apache.kafka.controller.ProducerIdControlManager;
import org.apache.kafka.controller.QuorumFeatures;
import org.apache.kafka.controller.ReplicationControlManager;
import org.apache.kafka.controller.ResultOrError;
import org.apache.kafka.controller.ScramControlManager;
import org.apache.kafka.controller.TenantControlManager;
import org.apache.kafka.controller.errors.ControllerExceptions;
import org.apache.kafka.controller.metrics.CellMetrics;
import org.apache.kafka.controller.metrics.ControllerMetrics;
import org.apache.kafka.controller.metrics.ControllerMetricsManager;
import org.apache.kafka.deferred.DeferredEvent;
import org.apache.kafka.deferred.DeferredEventQueue;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.DegradedBrokerHealthState;
import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.TopicPlacement;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.migration.ZkMigrationState;
import org.apache.kafka.metadata.migration.ZkRecordConsumer;
import org.apache.kafka.metadata.placement.CellAssignor;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.util.RecordRedactor;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.AlterReplicaExclusionOp;
import org.apache.kafka.server.common.AlterReplicaExclusionsReply;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateClusterLinkPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.snapshot.SnapshotReader;
import org.apache.kafka.snapshot.Snapshots;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;

public final class QuorumController
implements Controller {
    // Record-count limits. NOTE(review): presumably the default per-batch limit and the cap on the
    // records a single user operation may generate; their uses are outside this excerpt -- confirm.
    private static final int MAX_RECORDS_PER_BATCH = 10000;
    static final int MAX_RECORDS_PER_USER_OP = 10000;
    // NOTE(review): presumably part of the event-handler thread name -- confirm where it is used.
    public static final String CONTROLLER_THREAD_SUFFIX = "QuorumControllerEventHandler";
    // Names used for controller events (deferred-event tags and write-event names).
    static final String MAYBE_FENCE_REPLICAS = "maybeFenceReplicas";
    private static final String MAYBE_BALANCE_PARTITION_LEADERS = "maybeBalancePartitionLeaders";
    private static final String LEADER_ELECTION_FOR_DEMOTED_BROKERS = "leaderElectionForDemotedBrokers";
    private static final String LEADER_ELECTION_FOR_PROMOTED_BROKERS = "leaderElectionForPromotedBrokers";
    private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
    // Fault handlers: the non-fatal one is used for unexpected event failures, the fatal one for
    // failures while claiming or renouncing leadership.
    private final FaultHandler nonFatalFaultHandler;
    private final FaultHandler fatalFaultHandler;
    private final Logger log;
    private final int nodeId;
    private final String clusterId;
    // Event queue onto which all controller events are appended, prepended or scheduled.
    private final KafkaEventQueue queue;
    private final Time time;
    private final ControllerMetrics controllerMetrics;
    private final CellMetrics cellMetrics;
    private final ControllerMetricsManager controllerMetricsManager;
    // Registry of in-memory snapshots; renounce() reverts to the snapshot at lastCommittedOffset.
    private final SnapshotRegistry snapshotRegistry;
    private final DeferredEventQueue deferredEventQueue;
    private final Consumer<ConfigResource> resourceExists;
    // Control managers, one per area of controller state.
    private final EncryptionControlManager encryptionControl;
    private final ConfigurationControlManager configurationControl;
    private final ClientQuotaControlManager clientQuotaControlManager;
    private final ClusterControlManager clusterControl;
    private final FeatureControlManager featureControl;
    private final ProducerIdControlManager producerIdControlManager;
    private final ReplicationControlManager replicationControl;
    private final ScramControlManager scramControlManager;
    private final Optional<ClusterMetadataAuthorizer> authorizer;
    private final Map<String, Object> staticConfig;
    private final AclControlManager aclControlManager;
    private final LogReplayTracker logReplayTracker;
    private final ClusterLinkControlManager clusterLinkControl;
    private final MirrorTopicControlManager mirrorTopicControl;
    private final CellControlManager cellControl;
    private final TenantControlManager tenantControl;
    private final RaftClient<ApiMessageAndVersion> raftClient;
    private QuorumMetaLogListener metaLogListener;
    // Epoch at which this node claimed leadership; -1 means it is not the active controller.
    private volatile int curClaimEpoch;
    // Offset, epoch and timestamp of the last committed state; written by updateLastCommittedState.
    private long lastCommittedOffset = -1L;
    private int lastCommittedEpoch = -1;
    private long lastCommittedTimestamp = -1L;
    // True while the authorizer's initial load still has to be completed
    // (cleared in maybeCompleteAuthorizerInitialLoad).
    private boolean needToCompleteAuthorizerLoad;
    // Set by updateWriteOffset; reset to -1 when leadership is renounced.
    private long writeOffset;
    private long oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
    // When empty, the partition-leader balance check is never scheduled.
    private final OptionalLong leaderImbalanceCheckIntervalNs;
    // When empty, no-op records are never scheduled.
    private final OptionalLong maxIdleIntervalNs;
    // Scheduling state of the maybeBalancePartitionLeaders deferred event.
    private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
    // True while a writeNoOpRecord deferred event is pending.
    private boolean noOpRecordScheduled = false;
    private final BootstrapMetadata bootstrapMetadata;
    private final ZkRecordConsumer zkRecordConsumer;
    private final boolean zkMigrationEnabled;
    private final int maxRecordsPerBatch;
    private final RecordRedactor recordRedactor;

    /**
     * Returns the leader id currently reported by the Raft client.
     *
     * @return the leader id taken from the Raft client's current leader-and-epoch
     */
    private OptionalInt latestController() {
        LeaderAndEpoch current = raftClient.leaderAndEpoch();
        return current.leaderId();
    }

    /**
     * Returns the offset at which read operations should be performed.
     *
     * @return {@code lastCommittedOffset} when this node is the active controller,
     *         otherwise {@code Long.MAX_VALUE}
     */
    private long currentReadOffset() {
        return isActiveController() ? lastCommittedOffset : Long.MAX_VALUE;
    }

    /**
     * Records the successful completion of an event: logs the elapsed time in microseconds
     * and reports it (in milliseconds) to the controller metrics.
     *
     * @param name                  the event name used in the log message
     * @param startProcessingTimeNs the nanosecond timestamp at which processing started
     */
    private void handleEventEnd(String name, long startProcessingTimeNs) {
        long elapsedNs = time.nanoseconds() - startProcessingTimeNs;
        log.debug("Processed {} in {} us", name, TimeUnit.NANOSECONDS.toMicros(elapsedNs));
        controllerMetrics.updateEventQueueProcessingTime(TimeUnit.NANOSECONDS.toMillis(elapsedNs));
    }

    /**
     * Handles a failure raised for a controller event and returns the exception that
     * should be surfaced to the caller.
     *
     * <p>If processing never started, the failure is only logged. Expected exceptions are
     * logged at info level. Any other exception is reported to the non-fatal fault handler;
     * if this node is the active controller it additionally renounces leadership.
     *
     * @param name                  the event name used in log and fault messages
     * @param startProcessingTimeNs the time processing started, or empty if it never started
     * @param exception             the exception that was raised
     * @return the external form of {@code exception} as produced by
     *         {@code ControllerExceptions.toExternalException}
     */
    private Throwable handleEventException(String name, OptionalLong startProcessingTimeNs, Throwable exception) {
        Throwable externalException = ControllerExceptions.toExternalException(exception, () -> latestController());
        String exceptionName = exception.getClass().getSimpleName();

        if (!startProcessingTimeNs.isPresent()) {
            log.error("{}: unable to start processing because of {}. Reason: {}", name, exceptionName, exception.getMessage());
            return externalException;
        }

        long elapsedNs = time.nanoseconds() - startProcessingTimeNs.getAsLong();
        long elapsedUs = TimeUnit.NANOSECONDS.toMicros(elapsedNs);

        if (ControllerExceptions.isExpected(exception)) {
            log.info("{}: failed with {} in {} us. Reason: {}", name, exceptionName, elapsedUs, exception.getMessage());
            return externalException;
        }

        if (!isActiveController()) {
            String standbyMessage = String.format("%s: failed with unexpected server exception %s in %d us. The controller is already in standby mode.", name, exceptionName, elapsedUs);
            nonFatalFaultHandler.handleFault(standbyMessage, exception);
            return externalException;
        }

        String activeMessage = String.format("%s: failed with unexpected server exception %s at epoch %d in %d us. Renouncing leadership and reverting to the last committed offset %d.", name, exceptionName, curClaimEpoch, elapsedUs, lastCommittedOffset);
        nonFatalFaultHandler.handleFault(activeMessage, exception);
        renounce();
        return externalException;
    }

    /**
     * Wraps {@code handler} in a {@code ControllerEvent} and appends it to the event queue.
     *
     * @param name    the event name
     * @param handler the code to run when the event is processed
     */
    private void appendControlEvent(String name, Runnable handler) {
        queue.append(new ControllerEvent(name, handler));
    }

    /** @return the replication control manager held by this controller */
    ReplicationControlManager replicationControl() {
        return replicationControl;
    }

    /** @return the cluster control manager held by this controller */
    ClusterControlManager clusterControl() {
        return clusterControl;
    }

    /** @return the feature control manager held by this controller */
    FeatureControlManager featureControl() {
        return featureControl;
    }

    /** @return the configuration control manager held by this controller */
    ConfigurationControlManager configurationControl() {
        return configurationControl;
    }

    /** @return the ZK record consumer held by this controller */
    public ZkRecordConsumer zkRecordConsumer() {
        return zkRecordConsumer;
    }

    /** @return the cluster link control manager held by this controller */
    public ClusterLinkControlManager clusterLinkControlManager() {
        return clusterLinkControl;
    }

    /**
     * Enqueues a read event and returns the future that carries its result.
     *
     * @param name       the event name
     * @param deadlineNs optional deadline; when present the event is appended with that deadline
     * @param handler    supplies the value of the read
     * @param <T>        the result type
     * @return the future associated with the enqueued event
     */
    <T> CompletableFuture<T> appendReadEvent(String name, OptionalLong deadlineNs, Supplier<T> handler) {
        ControllerReadEvent<T> readEvent = new ControllerReadEvent<>(name, handler);
        if (!deadlineNs.isPresent()) {
            queue.append(readEvent);
        } else {
            queue.appendWithDeadline(deadlineNs.getAsLong(), readEvent);
        }
        return readEvent.future();
    }

    /**
     * Passes the records of {@code result} to {@code appender}, in one or more batches.
     *
     * <p>An atomic result is handed over as a single batch and must not contain more than
     * {@code maxRecordsPerBatch} records. A non-atomic result is split into consecutive
     * batches of at most {@code maxRecordsPerBatch} records; only the value returned by the
     * appender for the final batch is returned.
     *
     * <p>When the number of records is an exact non-zero multiple of
     * {@code maxRecordsPerBatch}, the last full batch is treated as the final batch, so no
     * empty trailing batch is passed to the appender.
     *
     * @param log                logger used for trace output
     * @param result             the controller result whose records are appended
     * @param maxRecordsPerBatch the maximum number of records per batch
     * @param appender           receives each batch and returns an offset for it
     * @return the offset returned by the appender for the last batch
     * @throws IllegalStateException if an atomic result exceeds {@code maxRecordsPerBatch}
     * @throws RuntimeException      wrapping any {@code ApiException} thrown while appending
     */
    static long appendRecords(Logger log, ControllerResult<?> result, int maxRecordsPerBatch, Function<List<ApiMessageAndVersion>, Long> appender) {
        try {
            List<ApiMessageAndVersion> records = result.records();
            if (result.isAtomic()) {
                if (records.size() > maxRecordsPerBatch) {
                    throw new IllegalStateException("Attempted to atomically commit " + records.size() + " records, but maxRecordsPerBatch is " + maxRecordsPerBatch);
                }
                long offset = appender.apply(records);
                if (log.isTraceEnabled()) {
                    log.trace("Atomically appended {} record(s) ending with offset {}.", records.size(), offset);
                }
                return offset;
            }
            int startIndex = 0;
            int numBatches = 0;
            while (true) {
                ++numBatches;
                // Compare the remaining count rather than startIndex + maxRecordsPerBatch: this
                // cannot overflow, and "<=" makes an exactly-full last batch the final one.
                int remaining = records.size() - startIndex;
                if (remaining <= maxRecordsPerBatch) {
                    long offset = appender.apply(records.subList(startIndex, records.size()));
                    if (log.isTraceEnabled()) {
                        log.trace("Appended {} record(s) in {} batch(es), ending with offset {}.", new Object[]{records.size(), numBatches, offset});
                    }
                    return offset;
                }
                appender.apply(records.subList(startIndex, startIndex + maxRecordsPerBatch));
                startIndex += maxRecordsPerBatch;
            }
        }
        catch (ApiException e) {
            // Rethrown as a plain RuntimeException, preserving the cause.
            throw new RuntimeException(e);
        }
    }

    /**
     * Enqueues a write event with no operation flags set.
     *
     * @param name       the event name
     * @param deadlineNs optional deadline for the event
     * @param op         the write operation to run
     * @param <T>        the result type
     * @return the future associated with the enqueued event
     */
    <T> CompletableFuture<T> appendWriteEvent(String name, OptionalLong deadlineNs, ControllerWriteOperation<T> op) {
        EnumSet<ControllerOperationFlag> noFlags = EnumSet.noneOf(ControllerOperationFlag.class);
        return appendWriteEvent(name, deadlineNs, op, noFlags);
    }

    /**
     * Enqueues a write event with the given flags and returns the future that carries its result.
     *
     * @param name       the event name
     * @param deadlineNs optional deadline; when present the event is appended with that deadline
     * @param op         the write operation to run
     * @param flags      flags passed to the created write event
     * @param <T>        the result type
     * @return the future associated with the enqueued event
     */
    <T> CompletableFuture<T> appendWriteEvent(String name, OptionalLong deadlineNs, ControllerWriteOperation<T> op, EnumSet<ControllerOperationFlag> flags) {
        ControllerWriteEvent<T> writeEvent = new ControllerWriteEvent<>(name, op, flags);
        if (!deadlineNs.isPresent()) {
            queue.append(writeEvent);
        } else {
            queue.appendWithDeadline(deadlineNs.getAsLong(), writeEvent);
        }
        return writeEvent.future();
    }

    /**
     * Completes the authorizer's initial load once the last committed offset has caught up
     * with the Raft high watermark (that is, {@code lastCommittedOffset + 1 >= highWatermark}).
     * Does nothing if the load is not pending or the high watermark is not yet known.
     */
    private void maybeCompleteAuthorizerInitialLoad() {
        if (!needToCompleteAuthorizerLoad) {
            return;
        }
        OptionalLong highWatermark = raftClient.highWatermark();
        if (!highWatermark.isPresent()) {
            log.trace("maybeCompleteAuthorizerInitialLoad: highWatermark not set.");
            return;
        }
        if (lastCommittedOffset + 1L < highWatermark.getAsLong()) {
            log.trace("maybeCompleteAuthorizerInitialLoad: can't proceed because lastCommittedOffset  = {}, but highWatermark = {}.", lastCommittedOffset, highWatermark.getAsLong());
            return;
        }
        log.info("maybeCompleteAuthorizerInitialLoad: completing authorizer initial load at last committed offset {}.", lastCommittedOffset);
        authorizer.get().completeInitialLoad();
        needToCompleteAuthorizerLoad = false;
    }

    /** @return true if this node currently holds a claim epoch, i.e. is the active controller */
    private boolean isActiveController() {
        return isActiveController(curClaimEpoch);
    }

    /**
     * Tells whether a claim epoch denotes an active controller.
     *
     * @param claimEpoch the claim epoch to test
     * @return false only when {@code claimEpoch} is -1
     */
    private static boolean isActiveController(int claimEpoch) {
        return !(claimEpoch == -1);
    }

    /**
     * Stores the new write offset and refreshes the "last applied record" metrics.
     *
     * <p>While active, the metrics report the write offset and the current wall-clock time;
     * otherwise they report the last committed offset and timestamp.
     *
     * @param offset the new write offset
     */
    private void updateWriteOffset(long offset) {
        writeOffset = offset;
        if (!isActiveController()) {
            controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset);
            controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp);
            return;
        }
        controllerMetrics.setLastAppliedRecordOffset(writeOffset);
        controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
    }

    /**
     * Makes this node the active controller for the given epoch.
     *
     * <p>Records the claim epoch, marks the controller active in the metrics, resets the write
     * offset to the last committed offset, activates the cluster control manager, ensures a
     * snapshot exists at the last committed offset and enqueues the follow-up events. Any
     * throwable raised along the way, including the "already active" check, is passed to the
     * fatal fault handler rather than propagated.
     *
     * @param epoch the epoch being claimed
     */
    private void claim(int epoch) {
        try {
            // Captured up front so the load-time metric covers the whole activation.
            long startMs = this.time.milliseconds();
            if (this.curClaimEpoch != -1) {
                throw new RuntimeException("Cannot claim leadership because we are already the active controller.");
            }
            this.curClaimEpoch = epoch;
            this.controllerMetrics.setActive(true);
            this.updateWriteOffset(this.lastCommittedOffset);
            this.clusterControl.activate();
            // renounce() requires a snapshot at lastCommittedOffset, so make sure one exists now.
            this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            ControllerWriteEvent<Void> activationEvent = new ControllerWriteEvent<Void>("completeActivation[" + epoch + "]", new CompleteActivationEvent(), EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME, ControllerOperationFlag.RUNS_IN_PREMIGRATION));
            // Prepended rather than appended -- presumably so activation runs ahead of events already queued.
            this.queue.prepend(activationEvent);
            this.queue.append(new ControllerWriteEvent(LEADER_ELECTION_FOR_DEMOTED_BROKERS, this.replicationControl::tryUnelectDemotedLeaders, EnumSet.noneOf(ControllerOperationFlag.class)));
            // Appended last; records the time from startMs until this event is processed.
            this.appendControlEvent("updateControllerLoadTime", () -> this.controllerMetrics.recordControllerLoadTime(startMs, this.time.milliseconds()));
        }
        catch (Throwable e) {
            this.fatalFaultHandler.handleFault("exception while claiming leadership", e);
        }
    }

    /**
     * Builds the records a newly activated controller should write.
     *
     * <p>For an empty log, the bootstrap records are emitted, followed by a ZK migration state
     * record ({@code PRE_MIGRATION} when migrations are enabled, {@code NONE} otherwise) if the
     * bootstrap metadata.version supports migrations. For a non-empty log, the loaded ZK
     * migration state is validated against {@code zkMigrationEnabled}; the only record that
     * may be emitted is {@code POST_MIGRATION}, when the state is {@code MIGRATION} and
     * migrations have been disabled.
     *
     * @param log                logger for informational output
     * @param isLogEmpty         whether the metadata log is empty
     * @param zkMigrationEnabled whether this controller is configured with ZK migrations enabled
     * @param bootstrapMetadata  the bootstrap records and metadata.version
     * @param featureControl     source of the current metadata.version and ZK migration state
     * @return the list of records to write on activation (possibly empty)
     * @throws RuntimeException if ZK migrations are enabled but the metadata.version does not
     *                          support them, or the cluster was created in KRaft mode
     */
    public static List<ApiMessageAndVersion> generateActivationRecords(Logger log, boolean isLogEmpty, boolean zkMigrationEnabled, BootstrapMetadata bootstrapMetadata, FeatureControlManager featureControl) {
        List<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>();
        if (isLogEmpty) {
            log.info("The metadata log appears to be empty. Appending {} bootstrap record(s) at metadata.version {} from {}.", new Object[]{bootstrapMetadata.records().size(), bootstrapMetadata.metadataVersion(), bootstrapMetadata.source()});
            records.addAll(bootstrapMetadata.records());
            if (bootstrapMetadata.metadataVersion().isMigrationSupported()) {
                if (zkMigrationEnabled) {
                    log.info("Putting the controller into pre-migration mode. No metadata updates will be allowed until the ZK metadata has been migrated");
                    records.add(ZkMigrationState.PRE_MIGRATION.toRecord());
                } else {
                    log.debug("Setting the ZK migration state to NONE since this is a de-novo KRaft cluster.");
                    records.add(ZkMigrationState.NONE.toRecord());
                }
            } else if (zkMigrationEnabled) {
                throw new RuntimeException("The bootstrap metadata.version " + bootstrapMetadata.metadataVersion() + " does not support ZK migrations. Cannot continue with ZK migrations enabled.");
            }
        } else {
            if (featureControl.metadataVersion().equals(MetadataVersion.MINIMUM_KRAFT_VERSION)) {
                log.info("No metadata.version feature level record was found in the log. Treating the log as version {}.", MetadataVersion.MINIMUM_KRAFT_VERSION);
            }
            if (featureControl.metadataVersion().isMigrationSupported()) {
                log.info("Loaded ZK migration state of {}", featureControl.zkMigrationState());
                switch (featureControl.zkMigrationState()) {
                    case NONE: {
                        if (!zkMigrationEnabled) break;
                        throw new RuntimeException("Should not have ZK migrations enabled on a cluster that was created in KRaft mode.");
                    }
                    case PRE_MIGRATION: {
                        log.warn("Activating pre-migration controller without empty log. There may be a partial migration");
                        break;
                    }
                    case MIGRATION: {
                        if (!zkMigrationEnabled) {
                            log.warn("Completing the ZK migration since this controller was configured with 'zookeeper.metadata.migration.enable' set to 'false'.");
                            records.add(ZkMigrationState.POST_MIGRATION.toRecord());
                            break;
                        }
                        log.info("Staying in the ZK migration since 'zookeeper.metadata.migration.enable' is still 'true'.");
                        break;
                    }
                    case POST_MIGRATION: {
                        if (!zkMigrationEnabled) break;
                        // Message fixed: the original read "migrationhas" (missing space).
                        log.info("Ignoring 'zookeeper.metadata.migration.enable' value of 'true' since the ZK migration has been completed.");
                        break;
                    }
                }
            } else if (zkMigrationEnabled) {
                throw new RuntimeException("Should not have ZK migrations enabled on a cluster running metadata.version " + featureControl.metadataVersion());
            }
        }
        return records;
    }

    /**
     * Records the latest committed offset, epoch and timestamp and publishes them to metrics.
     *
     * <p>The "last committed record offset" metric is always updated. The "last applied"
     * metrics are updated from the committed state only while this node is not active.
     *
     * @param offset    the last committed offset
     * @param epoch     the last committed epoch
     * @param timestamp the last committed timestamp
     */
    private void updateLastCommittedState(long offset, int epoch, long timestamp) {
        lastCommittedOffset = offset;
        lastCommittedEpoch = epoch;
        lastCommittedTimestamp = timestamp;

        controllerMetrics.setLastCommittedRecordOffset(offset);
        if (isActiveController()) {
            return;
        }
        controllerMetrics.setLastAppliedRecordOffset(offset);
        controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
    }

    /**
     * Gives up leadership and reverts in-memory state to the last committed offset.
     *
     * <p>Resigns from the Raft client at the current claim epoch, clears the claim epoch,
     * fails all deferred events, reverts the snapshot registry to {@code lastCommittedOffset},
     * reloads the authorizer (if any) from the ACL control manager, resets the write offset,
     * deactivates the cluster control manager and cancels the periodic deferred events. Any
     * throwable raised along the way, including the "not the leader" check, is passed to the
     * fatal fault handler rather than propagated.
     */
    private void renounce() {
        try {
            if (this.curClaimEpoch == -1) {
                throw new RuntimeException("Cannot renounce leadership because we are not the current leader.");
            }
            this.raftClient.resign(this.curClaimEpoch);
            this.curClaimEpoch = -1;
            this.controllerMetrics.setActive(false);
            this.deferredEventQueue.failAll((Exception)ControllerExceptions.newWrongControllerException(OptionalInt.empty()));
            if (!this.snapshotRegistry.hasSnapshot(this.lastCommittedOffset)) {
                // Report the offset that was looked up (the original printed lastCommittedEpoch).
                throw new RuntimeException("Unable to find last committed offset " + this.lastCommittedOffset + " in snapshot registry.");
            }
            this.snapshotRegistry.revertToSnapshot(this.lastCommittedOffset);
            // After the revert, reload the authorizer from the ACL control manager's state.
            this.authorizer.ifPresent(a -> a.loadSnapshot(this.aclControlManager.idToAcl()));
            this.replicationControl.resetConfluentPartitionsPerTopicListener();
            this.updateWriteOffset(-1L);
            this.clusterControl.deactivate();
            this.cancelMaybeFenceReplicas();
            this.cancelMaybeBalancePartitionLeaders();
            this.cancelNextWriteNoOpRecord();
        }
        catch (Throwable e) {
            this.fatalFaultHandler.handleFault("exception while renouncing leadership", e);
        }
    }

    /**
     * Schedules a deferred write event under the tag {@code name}.
     *
     * <p>If the event's future completes exceptionally, a timeout exception or a
     * {@code NotControllerException} is only logged; any other exception is logged and the
     * event is scheduled again with its deadline pushed back by one minute.
     *
     * @param name       the tag and name of the deferred event
     * @param deadlineNs the deadline, in nanoseconds, handed to the event queue
     * @param op         the write operation to run
     * @param flags      operation flags; must contain {@code DOES_NOT_UPDATE_QUEUE_TIME}
     * @param <T>        the result type of the operation
     * @throws RuntimeException if {@code flags} lacks {@code DOES_NOT_UPDATE_QUEUE_TIME}
     */
    private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs, ControllerWriteOperation<T> op, EnumSet<ControllerOperationFlag> flags) {
        boolean skipsQueueTime = flags.contains(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME);
        if (!skipsQueueTime) {
            throw new RuntimeException("deferred events should not update the queue time.");
        }
        ControllerWriteEvent<T> deferredEvent = new ControllerWriteEvent<>(name, op, flags);
        queue.scheduleDeferred(name, new EventQueue.EarliestDeadlineFunction(deadlineNs), deferredEvent);
        deferredEvent.future.exceptionally(failure -> {
            if (ControllerExceptions.isTimeoutException(failure)) {
                log.error("Cancelling deferred write event {} because the event queue is now closed.", name);
            } else if (failure instanceof NotControllerException) {
                log.debug("Cancelling deferred write event {} because this controller is no longer active.", name);
            } else {
                log.error("Unexpected exception while executing deferred write event {}. Rescheduling for a minute from now.", name, failure);
                long retryDeadlineNs = deadlineNs + TimeUnit.MINUTES.toNanos(1L);
                scheduleDeferredWriteEvent(name, retryDeadlineNs, op, flags);
            }
            return null;
        });
    }

    /**
     * Schedules the next stale-broker fencing check at the time reported by the heartbeat
     * manager, or cancels the pending check when that time is {@code Long.MAX_VALUE}.
     * The scheduled operation reschedules itself each time it runs.
     */
    private void rescheduleMaybeFenceStaleBrokers() {
        long nextCheckTimeNs = clusterControl.heartbeatManager().nextCheckTimeNs();
        if (nextCheckTimeNs != Long.MAX_VALUE) {
            EnumSet<ControllerOperationFlag> flags = EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME, ControllerOperationFlag.RUNS_IN_PREMIGRATION);
            scheduleDeferredWriteEvent(MAYBE_FENCE_REPLICAS, nextCheckTimeNs, () -> {
                ControllerResult<Void> fenceResult = replicationControl.maybeFenceOneStaleBroker();
                rescheduleMaybeFenceStaleBrokers();
                return fenceResult;
            }, flags);
        } else {
            cancelMaybeFenceReplicas();
        }
    }

    /** Cancels the deferred event registered under the {@code MAYBE_FENCE_REPLICAS} tag. */
    private void cancelMaybeFenceReplicas() {
        queue.cancelDeferred(MAYBE_FENCE_REPLICAS);
    }

    /**
     * Schedules a partition-leader balancing event unless one is already scheduled, no check
     * interval is configured, or the partition leaders are not imbalanced.
     *
     * <p>The event is deferred by the configured check interval when the schedule state is
     * {@code DEFERRED}, and by 100 ms otherwise. When the event runs it sets the state to
     * {@code RETRY_AFTER_BACKOFF} if more balancing is needed, or {@code DEFERRED} if not.
     */
    private void maybeScheduleNextBalancePartitionLeaders() {
        if (imbalancedScheduled == ImbalanceSchedule.SCHEDULED
                || !leaderImbalanceCheckIntervalNs.isPresent()
                || !replicationControl.arePartitionLeadersImbalanced()) {
            return;
        }
        log.debug("Scheduling write event for {} because scheduled ({}), checkIntervalNs ({}) and isImbalanced ({})", new Object[]{MAYBE_BALANCE_PARTITION_LEADERS, imbalancedScheduled, leaderImbalanceCheckIntervalNs, replicationControl.arePartitionLeadersImbalanced()});
        ControllerWriteEvent<Boolean> balanceEvent = new ControllerWriteEvent<>(MAYBE_BALANCE_PARTITION_LEADERS, () -> {
            ControllerResult<Boolean> balanceResult = replicationControl.maybeBalancePartitionLeaders();
            if (balanceResult.response()) {
                imbalancedScheduled = ImbalanceSchedule.RETRY_AFTER_BACKOFF;
            } else {
                imbalancedScheduled = ImbalanceSchedule.DEFERRED;
            }
            return balanceResult;
        }, EnumSet.of(ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME));
        long deadlineNs = time.nanoseconds();
        if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) {
            deadlineNs += leaderImbalanceCheckIntervalNs.getAsLong();
        } else {
            deadlineNs += TimeUnit.MILLISECONDS.toNanos(100L);
        }
        queue.scheduleDeferred(MAYBE_BALANCE_PARTITION_LEADERS, new EventQueue.EarliestDeadlineFunction(deadlineNs), balanceEvent);
        imbalancedScheduled = ImbalanceSchedule.SCHEDULED;
    }

    /**
     * Resets the leader-imbalance schedule state to {@code DEFERRED} and cancels the deferred
     * {@code MAYBE_BALANCE_PARTITION_LEADERS} event.
     */
    private void cancelMaybeBalancePartitionLeaders() {
        imbalancedScheduled = ImbalanceSchedule.DEFERRED;
        queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
    }

    /**
     * Schedules a deferred {@code WRITE_NO_OP_RECORD} write event when none is pending, a
     * maximum idle interval is configured, and the current metadata version supports no-op
     * records.
     *
     * <p>The event is due one idle interval from now. When it runs it clears the "scheduled"
     * flag, calls this method again to queue the next one, and returns a result carrying a
     * single {@code NoOpRecord}.
     */
    private void maybeScheduleNextWriteNoOpRecord() {
        if (noOpRecordScheduled
                || !maxIdleIntervalNs.isPresent()
                || !featureControl.metadataVersion().isNoOpRecordSupported()) {
            return;
        }

        log.debug(
            "Scheduling write event for {} because maxIdleIntervalNs ({}) and metadataVersion ({})",
            WRITE_NO_OP_RECORD,
            maxIdleIntervalNs.getAsLong(),
            featureControl.metadataVersion());

        ControllerWriteEvent noOpEvent = new ControllerWriteEvent(
            WRITE_NO_OP_RECORD,
            () -> {
                noOpRecordScheduled = false;
                maybeScheduleNextWriteNoOpRecord();
                return ControllerResult.of(
                    Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
                    null);
            },
            EnumSet.of(
                ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME,
                ControllerOperationFlag.RUNS_IN_PREMIGRATION));

        long deadlineNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
        queue.scheduleDeferred(
            WRITE_NO_OP_RECORD,
            new EventQueue.EarliestDeadlineFunction(deadlineNs),
            noOpEvent);
        noOpRecordScheduled = true;
    }

    /**
     * Clears the "no-op record scheduled" flag and cancels the deferred
     * {@code WRITE_NO_OP_RECORD} event.
     */
    private void cancelNextWriteNoOpRecord() {
        noOpRecordScheduled = false;
        queue.cancelDeferred(WRITE_NO_OP_RECORD);
    }

    /**
     * Reacts to a change in feature levels. Does nothing unless this node is the active
     * controller; otherwise it schedules the periodic no-op record write when the metadata
     * version supports no-op records, and cancels it when it does not.
     */
    private void handleFeatureControlChange() {
        if (!isActiveController()) {
            return;
        }
        boolean noOpSupported = featureControl.metadataVersion().isNoOpRecordSupported();
        if (noOpSupported) {
            maybeScheduleNextWriteNoOpRecord();
        } else {
            cancelNextWriteNoOpRecord();
        }
    }

    /**
     * Applies a single metadata record to the matching control manager.
     *
     * <p>The record as received is first handed to {@code logReplayTracker}. An encrypted
     * envelope is then decrypted through {@code encryptionControl} and dispatch continues on
     * the wrapped record; an envelope found inside an envelope is rejected.
     *
     * @param message the record to replay
     * @param snapshotId the id of the snapshot the record was read from, or empty when the
     *                   record comes from the log
     * @param batchLastOffset the last offset of the batch containing the record
     * @throws RuntimeException if the record type is not handled here, or if an encrypted
     *                          envelope wraps another encrypted envelope
     */
    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
        if (log.isTraceEnabled()) {
            if (!snapshotId.isPresent()) {
                log.trace("Replaying log record {} with batchLastOffset {}",
                    recordRedactor.toLoggableString(message), batchLastOffset);
            } else {
                log.trace("Replaying snapshot {} record {}",
                    Snapshots.filenameFromSnapshotId(snapshotId.get()),
                    recordRedactor.toLoggableString(message));
            }
        }
        logReplayTracker.replay(message);

        // Unwrap an encrypted envelope so that the switch below sees the inner record.
        ApiMessage payload = message;
        MetadataRecordType type = MetadataRecordType.fromId(payload.apiKey());
        if (type == MetadataRecordType.ENCRYPTED_ENVELOPE_RECORD) {
            payload = encryptionControl.decrypt((EncryptedEnvelopeRecord) payload).message();
            type = MetadataRecordType.fromId(payload.apiKey());
        }

        switch (type) {
            case REGISTER_BROKER_RECORD:
                clusterControl.replay((RegisterBrokerRecord) payload, batchLastOffset);
                break;
            case UNREGISTER_BROKER_RECORD:
                clusterControl.replay((UnregisterBrokerRecord) payload);
                break;
            case TOPIC_RECORD:
                replicationControl.replay((TopicRecord) payload);
                break;
            case PARTITION_RECORD:
                replicationControl.replay((PartitionRecord) payload);
                break;
            case CONFIG_RECORD:
                configurationControl.replay((ConfigRecord) payload);
                break;
            case PARTITION_CHANGE_RECORD:
                replicationControl.replay((PartitionChangeRecord) payload);
                break;
            case FENCE_BROKER_RECORD:
                clusterControl.replay((FenceBrokerRecord) payload);
                break;
            case UNFENCE_BROKER_RECORD:
                clusterControl.replay((UnfenceBrokerRecord) payload);
                break;
            case REMOVE_TOPIC_RECORD:
                replicationControl.replay((RemoveTopicRecord) payload);
                break;
            case FEATURE_LEVEL_RECORD:
                featureControl.replay((FeatureLevelRecord) payload);
                handleFeatureControlChange();
                break;
            case CLIENT_QUOTA_RECORD:
                clientQuotaControlManager.replay((ClientQuotaRecord) payload);
                break;
            case PRODUCER_IDS_RECORD:
                producerIdControlManager.replay((ProducerIdsRecord) payload);
                break;
            case BROKER_REGISTRATION_CHANGE_RECORD:
                clusterControl.replay((BrokerRegistrationChangeRecord) payload);
                break;
            case ACCESS_CONTROL_ENTRY_RECORD:
                aclControlManager.replay((AccessControlEntryRecord) payload, snapshotId);
                break;
            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                aclControlManager.replay((RemoveAccessControlEntryRecord) payload, snapshotId);
                break;
            case USER_SCRAM_CREDENTIAL_RECORD:
                scramControlManager.replay((UserScramCredentialRecord) payload);
                break;
            case REMOVE_USER_SCRAM_CREDENTIAL_RECORD:
                scramControlManager.replay((RemoveUserScramCredentialRecord) payload);
                break;
            case NO_OP_RECORD:
                // Nothing to apply.
                break;
            case ZK_MIGRATION_STATE_RECORD:
                featureControl.replay((ZkMigrationStateRecord) payload);
                break;
            case BROKER_REPLICA_EXCLUSION_RECORD:
                clusterControl.replay((BrokerReplicaExclusionRecord) payload);
                break;
            case ENCRYPTED_ENVELOPE_RECORD:
                // Only reachable when the decrypted payload is itself an envelope.
                throw new RuntimeException("Nested encrypted envelope records are not supported.");
            case INSTALL_METADATA_ENCRYPTOR_RECORD:
                encryptionControl.replay((InstallMetadataEncryptorRecord) payload);
                break;
            case CLUSTER_LINK_RECORD:
                clusterLinkControl.replay((ClusterLinkRecord) payload);
                break;
            case REMOVE_CLUSTER_LINK_RECORD:
                clusterLinkControl.replay((RemoveClusterLinkRecord) payload, Optional.empty());
                break;
            case MIRROR_TOPIC_RECORD:
                mirrorTopicControl.replay((MirrorTopicRecord) payload);
                break;
            case MIRROR_TOPIC_CHANGE_RECORD:
                mirrorTopicControl.replay((MirrorTopicChangeRecord) payload);
                break;
            case CELL_RECORD:
                cellControl.replay((CellRecord) payload);
                break;
            case REMOVE_CELL_RECORD:
                cellControl.replay((RemoveCellRecord) payload);
                break;
            case TENANT_RECORD:
                tenantControl.replay((TenantRecord) payload);
                break;
            case REMOVE_TENANT_RECORD:
                tenantControl.replay((RemoveTenantRecord) payload);
                break;
            default:
                throw new RuntimeException("Unhandled record type " + type);
        }
    }

    /**
     * Returns the controller to an empty state: resets the snapshot registry and the metrics
     * manager, resets the partitions-per-topic listener on {@code replicationControl}, and
     * sets the last committed offset, epoch and timestamp to -1.
     */
    private void resetToEmptyState() {
        snapshotRegistry.reset();
        controllerMetricsManager.reset();
        replicationControl.resetConfluentPartitionsPerTopicListener();
        updateLastCommittedState(-1L, -1, -1L);
    }

    /**
     * Builds a controller instance: stores the supplied collaborators, constructs every
     * control manager on top of a shared {@code SnapshotRegistry}, resets the in-memory state,
     * and finally registers the meta log listener with the Raft client.
     *
     * <p>The assignment order below matters: several managers receive previously constructed
     * managers as arguments (for example {@code cellControl} takes {@code featureControl}, and
     * {@code replicationControl} takes {@code configurationControl}, {@code clusterControl},
     * {@code featureControl}, {@code mirrorTopicControl} and {@code tenantControl}).
     *
     * <p>NOTE(review): the constructor is private; presumably instances are created through a
     * builder declared elsewhere in this class - confirm.
     */
    private QuorumController(FaultHandler nonFatalFaultHandler, FaultHandler fatalFaultHandler, LogContext logContext, int nodeId, String clusterId, KafkaEventQueue queue, Time time, KafkaConfigSchema configSchema, RaftClient<ApiMessageAndVersion> raftClient, QuorumFeatures quorumFeatures, short defaultReplicationFactor, int defaultNumPartitions, ReplicaPlacer replicaPlacer, OptionalLong leaderImbalanceCheckIntervalNs, OptionalLong maxIdleIntervalNs, long sessionTimeoutNs, ControllerMetrics controllerMetrics, Optional<CreateTopicPolicy> createTopicPolicy, Optional<AlterConfigPolicy> alterConfigPolicy, ConfigurationValidator configurationValidator, Optional<ClusterMetadataAuthorizer> authorizer, Map<String, Object> staticConfig, BootstrapMetadata bootstrapMetadata, Function<String, String> nameToTenantCallback, int defaultMinIsrCount, int maxRecordsPerBatch, boolean zkMigrationEnabled, Optional<CreateClusterLinkPolicy> createClusterLinkPolicy, CellAssignor cellAssignor, PartitionPlacementStrategy partitionPlacementStrategy, boolean isImplicitCellCreationEnabled, Optional<TopicPlacement> defaultTopicPlacement, CellMetrics cellMetrics) {
        this.nonFatalFaultHandler = nonFatalFaultHandler;
        this.fatalFaultHandler = fatalFaultHandler;
        this.log = logContext.logger(QuorumController.class);
        this.nodeId = nodeId;
        this.clusterId = clusterId;
        this.queue = queue;
        this.time = time;
        this.controllerMetrics = controllerMetrics;
        this.cellMetrics = cellMetrics;
        this.controllerMetricsManager = new ControllerMetricsManager(controllerMetrics, cellMetrics, Optional.ofNullable(nameToTenantCallback), defaultMinIsrCount, defaultReplicationFactor);
        // Every control manager below that keeps snapshottable state shares this registry.
        this.snapshotRegistry = new SnapshotRegistry(logContext);
        this.deferredEventQueue = new DeferredEventQueue(logContext);
        this.resourceExists = new ConfigResourceExistenceChecker();
        this.encryptionControl = new EncryptionControlManager(logContext, this.snapshotRegistry, staticConfig);
        this.clientQuotaControlManager = new ClientQuotaControlManager(this.snapshotRegistry);
        this.featureControl = new FeatureControlManager.Builder().setLogContext(logContext).setQuorumFeatures(quorumFeatures).setSnapshotRegistry(this.snapshotRegistry).setMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION).build();
        // Cell sizing and enablement are read from staticConfig, falling back to 15 / 6 / 15 / false when the keys are absent.
        this.cellControl = new CellControlManager(logContext, this.snapshotRegistry, this.featureControl, cellAssignor, Short.parseShort(staticConfig.getOrDefault("confluent.cells.default.size", (short)15).toString()), Short.parseShort(staticConfig.getOrDefault("confluent.cells.min.size", (short)6).toString()), Short.parseShort(staticConfig.getOrDefault("confluent.cells.max.size", (short)15).toString()), defaultReplicationFactor, isImplicitCellCreationEnabled, Boolean.parseBoolean(staticConfig.getOrDefault("confluent.cells.enable", false).toString()));
        this.tenantControl = new TenantControlManager(logContext, this.featureControl, this.cellControl, partitionPlacementStrategy, defaultReplicationFactor);
        this.clusterControl = new ClusterControlManager.Builder().setLogContext(logContext).setClusterId(clusterId).setTime(time).setSnapshotRegistry(this.snapshotRegistry).setSessionTimeoutNs(sessionTimeoutNs).setReplicaPlacer(replicaPlacer).setControllerMetrics(controllerMetrics).setFeatureControlManager(this.featureControl).setZkMigrationEnabled(zkMigrationEnabled).setCellControlManager(this.cellControl).build();
        // The two suppliers passed here read clusterControl / featureControl only when invoked.
        this.configurationControl = new ConfigurationControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).setKafkaConfigSchema(configSchema).setExistenceChecker(this.resourceExists).setAlterConfigPolicy(alterConfigPolicy).setValidator(configurationValidator).setStaticConfig(staticConfig).setNodeId(nodeId).setEncryptionControlManager(this.encryptionControl).setUsableBrokers(() -> this.clusterControl.usableBrokers()).setIsTopicPlacementSupport(() -> this.featureControl.isTopicPlacementSupported()).build();
        this.producerIdControlManager = new ProducerIdControlManager(this.clusterControl, this.snapshotRegistry);
        this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
        // resolveTopicId / resolveClusterLinkId use managers that are assigned further down.
        this.mirrorTopicControl = new MirrorTopicControlManager(this.snapshotRegistry, logContext, time, this::resolveTopicId, this::resolveClusterLinkId);
        this.maxIdleIntervalNs = maxIdleIntervalNs;
        this.authorizer = authorizer;
        // Hands this (still partially constructed) controller to the authorizer as its ACL mutator.
        authorizer.ifPresent(a -> a.setAclMutator(this));
        this.staticConfig = staticConfig;
        this.aclControlManager = new AclControlManager(logContext, this.snapshotRegistry, authorizer, this::isValidClusterLink);
        this.replicationControl = new ReplicationControlManager.Builder().setSnapshotRegistry(this.snapshotRegistry).setLogContext(logContext).setDefaultReplicationFactor(defaultReplicationFactor).setDefaultNumPartitions(defaultNumPartitions).setMaxElectionsPerImbalance(1000).setConfigurationControl(this.configurationControl).setClusterControl(this.clusterControl).setCreateTopicPolicy(createTopicPolicy).setFeatureControl(this.featureControl).setApplyCreateTopicsPolicyToCreatePartitions(QuorumController.shouldApplyCreateTopicsPolicyToCreatePartitions(staticConfig)).setMirrorTopicControl(this.mirrorTopicControl).setNameToTenantCallback(nameToTenantCallback).setTenantControl(this.tenantControl).setDefaultTopicPlacement(defaultTopicPlacement).build();
        this.clusterLinkControl = new ClusterLinkControlManager(this.snapshotRegistry, logContext, this.configurationControl, this.mirrorTopicControl, this.featureControl, this.replicationControl::unlinkMirrorTopic, this.aclControlManager::unlinkAcls, clusterId, createClusterLinkPolicy);
        this.scramControlManager = new ScramControlManager.Builder().setLogContext(logContext).setSnapshotRegistry(this.snapshotRegistry).build();
        this.logReplayTracker = new LogReplayTracker.Builder().setLogContext(logContext).build();
        this.raftClient = raftClient;
        this.bootstrapMetadata = bootstrapMetadata;
        this.maxRecordsPerBatch = maxRecordsPerBatch;
        this.metaLogListener = new QuorumMetaLogListener();
        // NOTE(review): -1 presumably means "no claimed epoch" - confirm against the field's users.
        this.curClaimEpoch = -1;
        this.needToCompleteAuthorizerLoad = authorizer.isPresent();
        this.zkRecordConsumer = new MigrationRecordConsumer();
        this.zkMigrationEnabled = zkMigrationEnabled;
        this.recordRedactor = new RecordRedactor(configSchema);
        this.updateWriteOffset(-1L);
        this.resetToEmptyState();
        this.log.info("Creating new QuorumController with clusterId {}, authorizer {}.{}", new Object[]{clusterId, authorizer, zkMigrationEnabled ? " ZK migration mode is enabled." : ""});
        // Registered last, after all fields above have been assigned.
        this.raftClient.register((RaftClient.Listener)this.metaLogListener);
    }

    /**
     * Reads the {@code confluent.apply.create.topic.policy.to.create.partitions} setting from
     * the given configuration map.
     *
     * <p>The value is converted with {@code toString()} and parsed by
     * {@link Boolean#parseBoolean(String)}, so both {@code Boolean} and {@code String} values
     * are accepted. A missing key and a key explicitly mapped to {@code null} both yield
     * {@code false}.
     *
     * @param config the static configuration map; must not be {@code null}
     * @return {@code true} only when the setting is present and parses as "true"
     */
    static boolean shouldApplyCreateTopicsPolicyToCreatePartitions(Map<String, Object> config) {
        // Map.getOrDefault only substitutes the default for an absent key, so a key mapped to
        // null would previously have caused a NullPointerException on toString().
        Object configured = config.get("confluent.apply.create.topic.policy.to.create.partitions");
        return configured != null && Boolean.parseBoolean(configured.toString());
    }

    /**
     * Returns the {@code confluent.alter.broker.health.max.demoted.brokers.percentage} setting
     * from the static configuration, parsed as a double. Falls back to
     * {@code ConfluentConfigs.ALTER_BROKER_HEALTH_MAX_DEMOTED_BROKERS_PERCENTAGE_DEFAULT} when
     * the key is absent.
     */
    private double maxDemotedBrokersPercentage() {
        Object configured = staticConfig.getOrDefault(
            "confluent.alter.broker.health.max.demoted.brokers.percentage",
            ConfluentConfigs.ALTER_BROKER_HEALTH_MAX_DEMOTED_BROKERS_PERCENTAGE_DEFAULT);
        return Double.parseDouble(configured.toString());
    }

    /**
     * Looks up the id of the cluster link with the given name via {@code clusterLinkControl}.
     *
     * @param linkName the cluster link name
     * @return whatever {@code clusterLinkControl.getClusterLinkId} returns for that name
     */
    private Optional<Uuid> resolveClusterLinkId(String linkName) {
        Optional<Uuid> linkId = clusterLinkControl.getClusterLinkId(linkName);
        return linkId;
    }

    /**
     * Asks {@code clusterLinkControl} whether the given id is a valid cluster link id.
     *
     * @param linkId the cluster link id to check
     * @return the answer from {@code clusterLinkControl.isValidLinkId}
     */
    private Boolean isValidClusterLink(Uuid linkId) {
        return clusterLinkControl.isValidLinkId(linkId);
    }

    /**
     * Looks up the id of the named topic via {@code replicationControl}.
     *
     * @param topicName the topic name
     * @return the topic id, or empty when {@code replicationControl.getTopicId} returns
     *         {@code null}
     */
    private Optional<Uuid> resolveTopicId(String topicName) {
        Uuid topicId = replicationControl.getTopicId(topicName);
        return Optional.ofNullable(topicId);
    }

    /**
     * Reports whether the named topic is a mirror topic according to
     * {@code mirrorTopicControl}.
     *
     * @param topicName the topic name
     * @return the answer from {@code mirrorTopicControl.isMirrorTopic}
     */
    @Override
    public boolean isMirrorTopic(String topicName) {
        boolean mirror = mirrorTopicControl.isMirrorTopic(topicName);
        return mirror;
    }

    /**
     * Submits a read event named {@code "fetchMetadata"} that asks {@code clusterControl} for
     * the unfenced broker endpoints on the given listener.
     *
     * @param context the request context supplying the deadline
     * @param listenerName the listener whose endpoints are requested
     * @return a future for the broker collection
     */
    @Override
    public CompletableFuture<MetadataResponseData.MetadataResponseBrokerCollection> unfencedBrokerEndpoints(
            ControllerRequestContext context,
            ListenerName listenerName) {
        return appendReadEvent(
            "fetchMetadata",
            context.deadlineNs(),
            () -> clusterControl.unfencedBrokerEndpoints(listenerName));
    }

    /**
     * Handles an AlterPartition request. A request with no topics completes immediately with
     * an empty response; otherwise a write event named {@code "alterPartition"} delegates to
     * {@code replicationControl}.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<AlterPartitionResponseData> alterPartition(
            ControllerRequestContext context,
            AlterPartitionRequestData request) {
        if (!request.topics().isEmpty()) {
            return appendWriteEvent(
                "alterPartition",
                context.deadlineNs(),
                () -> replicationControl.alterPartition(context, request));
        }
        return CompletableFuture.completedFuture(new AlterPartitionResponseData());
    }

    /**
     * Handles an AlterUserScramCredentials request. A request with neither deletions nor
     * upsertions completes immediately with an empty response; otherwise a write event named
     * {@code "alterUserScramCredentials"} delegates to {@code scramControlManager} together
     * with the current metadata version.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<AlterUserScramCredentialsResponseData> alterUserScramCredentials(
            ControllerRequestContext context,
            AlterUserScramCredentialsRequestData request) {
        boolean nothingToDo = request.deletions().isEmpty() && request.upsertions().isEmpty();
        if (nothingToDo) {
            return CompletableFuture.completedFuture(new AlterUserScramCredentialsResponseData());
        }
        return appendWriteEvent(
            "alterUserScramCredentials",
            context.deadlineNs(),
            () -> scramControlManager.alterCredentials(request, featureControl.metadataVersion()));
    }

    /**
     * Handles a CreateTopics request. A request with no topics completes immediately with an
     * empty response; otherwise a write event named {@code "createTopics"} delegates to
     * {@code replicationControl}.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @param describable passed through unchanged to {@code replicationControl.createTopics}
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<CreateTopicsResponseData> createTopics(
            ControllerRequestContext context,
            CreateTopicsRequestData request,
            Set<String> describable) {
        if (!request.topics().isEmpty()) {
            return appendWriteEvent(
                "createTopics",
                context.deadlineNs(),
                () -> replicationControl.createTopics(context, request, describable));
        }
        return CompletableFuture.completedFuture(new CreateTopicsResponseData());
    }

    /**
     * Submits a write event named {@code "unregisterBroker"} that asks
     * {@code replicationControl} to unregister the given broker. The event carries the
     * {@code RUNS_IN_PREMIGRATION} flag.
     *
     * @param context the request context supplying the deadline
     * @param brokerId the id of the broker to unregister
     * @return a future that completes when the event has been processed
     */
    @Override
    public CompletableFuture<Void> unregisterBroker(ControllerRequestContext context, int brokerId) {
        return appendWriteEvent(
            "unregisterBroker",
            context.deadlineNs(),
            () -> replicationControl.unregisterBroker(brokerId),
            EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
    }

    /**
     * Resolves topic names to ids. An empty collection of names completes immediately with an
     * empty map; otherwise a read event named {@code "findTopicIds"} delegates to
     * {@code replicationControl} at the current read offset.
     *
     * @param context the request context supplying the deadline
     * @param names the topic names to resolve
     * @return a future for a map from name to id or error
     */
    @Override
    public CompletableFuture<Map<String, ResultOrError<Uuid>>> findTopicIds(
            ControllerRequestContext context,
            Collection<String> names) {
        if (!names.isEmpty()) {
            return appendReadEvent(
                "findTopicIds",
                context.deadlineNs(),
                () -> replicationControl.findTopicIds(currentReadOffset(), names));
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Submits a read event named {@code "findAllTopicIds"} that asks
     * {@code replicationControl} for every topic id at the current read offset.
     *
     * @param context the request context supplying the deadline
     * @return a future for a map from topic name to topic id
     */
    @Override
    public CompletableFuture<Map<String, Uuid>> findAllTopicIds(ControllerRequestContext context) {
        return appendReadEvent(
            "findAllTopicIds",
            context.deadlineNs(),
            () -> replicationControl.findAllTopicIds(currentReadOffset()));
    }

    /**
     * Resolves topic ids to names. An empty collection of ids completes immediately with an
     * empty map; otherwise a read event named {@code "findTopicNames"} delegates to
     * {@code replicationControl} at the current read offset.
     *
     * @param context the request context supplying the deadline
     * @param ids the topic ids to resolve
     * @return a future for a map from id to name or error
     */
    @Override
    public CompletableFuture<Map<Uuid, ResultOrError<String>>> findTopicNames(
            ControllerRequestContext context,
            Collection<Uuid> ids) {
        if (!ids.isEmpty()) {
            return appendReadEvent(
                "findTopicNames",
                context.deadlineNs(),
                () -> replicationControl.findTopicNames(currentReadOffset(), ids));
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Deletes topics by id. An empty collection of ids completes immediately with an empty
     * map; otherwise a write event named {@code "deleteTopics"} delegates to
     * {@code replicationControl}.
     *
     * @param context the request context supplying the deadline
     * @param ids the ids of the topics to delete
     * @return a future for a map from topic id to the per-topic result
     */
    @Override
    public CompletableFuture<Map<Uuid, ApiError>> deleteTopics(
            ControllerRequestContext context,
            Collection<Uuid> ids) {
        if (!ids.isEmpty()) {
            return appendWriteEvent(
                "deleteTopics",
                context.deadlineNs(),
                () -> replicationControl.deleteTopics(context, ids));
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Submits a read event named {@code "describeConfigs"} that asks
     * {@code configurationControl} to describe the given resources at the current read
     * offset.
     *
     * @param context the request context supplying the deadline
     * @param resources passed through unchanged to {@code configurationControl.describeConfigs}
     * @return a future for a map from resource to its configs or an error
     */
    @Override
    public CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>> describeConfigs(
            ControllerRequestContext context,
            Map<ConfigResource, Collection<String>> resources) {
        return appendReadEvent(
            "describeConfigs",
            context.deadlineNs(),
            () -> configurationControl.describeConfigs(currentReadOffset(), resources));
    }

    /**
     * Handles an ElectLeaders request. A request whose topic-partition list is non-null but
     * empty completes immediately with an empty response; a {@code null} list is not
     * short-circuited. In every other case a write event named {@code "electLeaders"}
     * delegates to {@code replicationControl}.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<ElectLeadersResponseData> electLeaders(
            ControllerRequestContext context,
            ElectLeadersRequestData request) {
        boolean explicitlyEmpty = request.topicPartitions() != null && request.topicPartitions().isEmpty();
        if (explicitlyEmpty) {
            return CompletableFuture.completedFuture(new ElectLeadersResponseData());
        }
        return appendWriteEvent(
            "electLeaders",
            context.deadlineNs(),
            () -> replicationControl.electLeaders(request));
    }

    /**
     * Submits a read event named {@code "getFinalizedFeatures"} that asks
     * {@code featureControl} for the finalized features at the current read offset.
     *
     * @param context the request context supplying the deadline
     * @return a future for the finalized features
     */
    @Override
    public CompletableFuture<FinalizedControllerFeatures> finalizedFeatures(ControllerRequestContext context) {
        return appendReadEvent(
            "getFinalizedFeatures",
            context.deadlineNs(),
            () -> featureControl.finalizedFeatures(currentReadOffset()));
    }

    /**
     * Applies incremental config changes. An empty change map completes immediately with an
     * empty result; otherwise a write event named {@code "incrementalAlterConfigs"} delegates
     * to {@code configurationControl}. In validate-only mode the result is returned with its
     * records stripped via {@code withoutRecords()}.
     *
     * @param context the request context supplying the deadline and principal
     * @param configChanges the requested operations, keyed by resource and then config name
     * @param validateOnly whether to return the result without records
     * @return a future for a map from resource to the per-resource result
     */
    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
            ControllerRequestContext context,
            Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
            boolean validateOnly) {
        if (!configChanges.isEmpty()) {
            return appendWriteEvent("incrementalAlterConfigs", context.deadlineNs(), () -> {
                ControllerResult<Map<ConfigResource, ApiError>> outcome =
                    configurationControl.incrementalAlterConfigs(configChanges, false, context.principal());
                return validateOnly ? outcome.withoutRecords() : outcome;
            });
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Handles an AlterPartitionReassignments request. A request with no topics completes
     * immediately with an empty response; otherwise a write event named
     * {@code "alterPartitionReassignments"} delegates to {@code replicationControl} along with
     * the caller's principal.
     *
     * @param context the request context supplying the deadline and principal
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<AlterPartitionReassignmentsResponseData> alterPartitionReassignments(
            ControllerRequestContext context,
            AlterPartitionReassignmentsRequestData request) {
        if (!request.topics().isEmpty()) {
            return appendWriteEvent(
                "alterPartitionReassignments",
                context.deadlineNs(),
                () -> replicationControl.alterPartitionReassignments(request, context.principal()));
        }
        return CompletableFuture.completedFuture(new AlterPartitionReassignmentsResponseData());
    }

    /**
     * Handles a ListPartitionReassignments request. A request whose topic list is non-null
     * but empty completes immediately with a response whose error message is {@code null}; a
     * {@code null} topic list is not short-circuited. In every other case a read event named
     * {@code "listPartitionReassignments"} delegates to {@code replicationControl} at the
     * current read offset.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<ListPartitionReassignmentsResponseData> listPartitionReassignments(
            ControllerRequestContext context,
            ListPartitionReassignmentsRequestData request) {
        boolean explicitlyEmpty = request.topics() != null && request.topics().isEmpty();
        if (explicitlyEmpty) {
            ListPartitionReassignmentsResponseData emptyResponse =
                new ListPartitionReassignmentsResponseData().setErrorMessage(null);
            return CompletableFuture.completedFuture(emptyResponse);
        }
        return appendReadEvent(
            "listPartitionReassignments",
            context.deadlineNs(),
            () -> replicationControl.listPartitionReassignments(request.topics(), currentReadOffset()));
    }

    /**
     * Applies legacy (full replacement) config changes. An empty config map completes
     * immediately with an empty result; otherwise a write event named
     * {@code "legacyAlterConfigs"} delegates to {@code configurationControl}. In validate-only
     * mode the result is returned with its records stripped via {@code withoutRecords()}.
     *
     * @param context the request context supplying the deadline and principal
     * @param newConfigs the requested configs, keyed by resource and then config name
     * @param validateOnly whether to return the result without records
     * @return a future for a map from resource to the per-resource result
     */
    @Override
    public CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
            ControllerRequestContext context,
            Map<ConfigResource, Map<String, String>> newConfigs,
            boolean validateOnly) {
        if (!newConfigs.isEmpty()) {
            return appendWriteEvent("legacyAlterConfigs", context.deadlineNs(), () -> {
                ControllerResult<Map<ConfigResource, ApiError>> outcome =
                    configurationControl.legacyAlterConfigs(newConfigs, false, context.principal());
                return validateOnly ? outcome.withoutRecords() : outcome;
            });
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Processes a broker heartbeat through a write event named
     * {@code "processBrokerHeartbeat"} that carries the {@code RUNS_IN_PREMIGRATION} flag.
     *
     * <p>The write operation looks up the offset of the broker's registration record and
     * fails with {@link StaleBrokerEpochException} when there is none. Otherwise it delegates
     * to {@code replicationControl}, remembers whether the reply reports controlled shutdown,
     * and re-arms the stale-broker fencing check. Once the batch end offset is known and the
     * broker is in controlled shutdown, that offset is forwarded to the heartbeat manager.
     *
     * <p>If the returned future completes with what {@code ControllerExceptions} classifies as
     * a timeout, the expired heartbeat is reported to {@code replicationControl} and the
     * timed-out-heartbeats metric is incremented.
     *
     * @param context the request context supplying the deadline
     * @param request the heartbeat request data
     * @return a future for the heartbeat reply
     */
    @Override
    public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(ControllerRequestContext context, final BrokerHeartbeatRequestData request) {
        ControllerWriteOperation<BrokerHeartbeatReply> heartbeatOperation = new ControllerWriteOperation<BrokerHeartbeatReply>() {
            private final int brokerId = request.brokerId();
            private boolean inControlledShutdown = false;

            @Override
            public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
                OptionalLong registrationOffset = QuorumController.this.clusterControl.registerBrokerRecordOffset(this.brokerId);
                if (registrationOffset.isPresent()) {
                    ControllerResult<BrokerHeartbeatReply> heartbeatResult =
                        QuorumController.this.replicationControl.processBrokerHeartbeat(request, registrationOffset.getAsLong());
                    this.inControlledShutdown = heartbeatResult.response().inControlledShutdown();
                    QuorumController.this.rescheduleMaybeFenceStaleBrokers();
                    return heartbeatResult;
                }
                throw new StaleBrokerEpochException(String.format("Receive a heartbeat from broker %d before registration", this.brokerId));
            }

            @Override
            public void processBatchEndOffset(long offset) {
                if (!this.inControlledShutdown) {
                    return;
                }
                QuorumController.this.clusterControl.heartbeatManager().maybeUpdateControlledShutdownOffset(this.brokerId, offset);
            }
        };

        return appendWriteEvent(
                "processBrokerHeartbeat",
                context.deadlineNs(),
                heartbeatOperation,
                EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION))
            .whenComplete((ignoredReply, failure) -> {
                if (ControllerExceptions.isTimeoutException(failure)) {
                    replicationControl.processExpiredBrokerHeartbeat(request);
                    controllerMetrics.incrementTimedOutHeartbeats();
                }
            });
    }

    /**
     * Registers a broker through a write event named {@code "registerBroker"} that carries
     * the {@code RUNS_IN_PREMIGRATION} flag.
     *
     * <p>The operation passes {@code clusterControl} the request, the offset
     * {@code writeOffset + 1}, and the finalized features read at {@link Long#MAX_VALUE}, and
     * then re-arms the stale-broker fencing check.
     *
     * @param context the request context supplying the deadline
     * @param request the registration request data
     * @return a future for the registration reply
     */
    @Override
    public CompletableFuture<BrokerRegistrationReply> registerBroker(
            ControllerRequestContext context,
            BrokerRegistrationRequestData request) {
        return appendWriteEvent(
            "registerBroker",
            context.deadlineNs(),
            () -> {
                ControllerResult<BrokerRegistrationReply> registration = clusterControl.registerBroker(
                    request,
                    writeOffset + 1L,
                    featureControl.finalizedFeatures(Long.MAX_VALUE));
                rescheduleMaybeFenceStaleBrokers();
                return registration;
            },
            EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
    }

    /**
     * Applies client quota alterations. An empty collection completes immediately with an
     * empty result; otherwise a write event named {@code "alterClientQuotas"} delegates to
     * {@code clientQuotaControlManager}. In validate-only mode the result is returned with its
     * records stripped via {@code withoutRecords()}.
     *
     * @param context the request context supplying the deadline
     * @param quotaAlterations the alterations to apply
     * @param validateOnly whether to return the result without records
     * @return a future for a map from quota entity to the per-entity result
     */
    @Override
    public CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
            ControllerRequestContext context,
            Collection<ClientQuotaAlteration> quotaAlterations,
            boolean validateOnly) {
        if (!quotaAlterations.isEmpty()) {
            return appendWriteEvent("alterClientQuotas", context.deadlineNs(), () -> {
                ControllerResult<Map<ClientQuotaEntity, ApiError>> outcome =
                    clientQuotaControlManager.alterClientQuotas(quotaAlterations);
                return validateOnly ? outcome.withoutRecords() : outcome;
            });
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Handles an AlterBrokerReplicaExclusions request.
     *
     * <p>The requested entries are converted to a set of {@code AlterReplicaExclusionOp}
     * objects right away, on the calling thread. The set is then handed to
     * {@code clusterControl} inside a write event named
     * {@code "alterBrokerReplicaExclusions"}.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the reply
     */
    @Override
    public CompletableFuture<AlterReplicaExclusionsReply> alterBrokerReplicaExclusions(
            ControllerRequestContext context,
            AlterBrokerReplicaExclusionsRequestData request) {
        Set<AlterReplicaExclusionOp> requestedExclusions = request.brokersToExclude().stream()
            .map(entry -> new AlterReplicaExclusionOp(
                entry.brokerId(),
                entry.reason(),
                ExclusionOp.OpType.forId((byte) entry.exclusionOperationCode())))
            .collect(Collectors.toSet());
        return appendWriteEvent(
            "alterBrokerReplicaExclusions",
            context.deadlineNs(),
            () -> clusterControl.processAlterReplicaExclusions(requestedExclusions));
    }

    /**
     * Submits a read event named {@code "describeBrokerReplicaExclusions"} that returns
     * {@code clusterControl.activeBrokerReplicaExclusions()}.
     *
     * @param context the request context supplying the deadline
     * @return a future for the active exclusions (integer keys mapped to string values)
     */
    @Override
    public CompletableFuture<Map<Integer, String>> describeBrokerReplicaExclusions(ControllerRequestContext context) {
        return appendReadEvent(
            "describeBrokerReplicaExclusions",
            context.deadlineNs(),
            clusterControl::activeBrokerReplicaExclusions);
    }

    /**
     * Handles an AlterBrokerHealth request.
     *
     * <p>The status code is decoded first; an {@code UNKNOWN} status is rejected with an
     * exception thrown directly from this method rather than through the returned future.
     * Otherwise a write event named {@code "alterBrokerHealth"} delegates to
     * {@code clusterControl} with the configured maximum demoted-brokers percentage.
     *
     * <p>When that event completes with a non-null response whose error code is
     * {@code Errors.NONE}, a follow-up write event with no deadline is appended: one that
     * tries to unelect demoted leaders when the requested status is {@code DEGRADED}, or one
     * that tries to re-elect promoted leaders for the request's broker ids otherwise. The
     * future of the follow-up event is not returned to the caller.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return the future of the {@code "alterBrokerHealth"} event
     * @throws IllegalArgumentException if the request's status code maps to {@code UNKNOWN}
     */
    @Override
    public CompletableFuture<AlterBrokerHealthResponseData> alterBrokerHealth(ControllerRequestContext context, AlterBrokerHealthRequestData request) {
        ComponentHealthStatus requestedStatus = ComponentHealthStatus.forId((byte) request.statusCode());
        if (requestedStatus == ComponentHealthStatus.UNKNOWN) {
            throw new IllegalArgumentException("Invalid health status code from request: " + request.statusCode());
        }

        CompletableFuture<AlterBrokerHealthResponseData> healthFuture = appendWriteEvent(
            "alterBrokerHealth",
            context.deadlineNs(),
            () -> clusterControl.processAlterBrokerHealth(request, maxDemotedBrokersPercentage()));

        healthFuture.whenComplete((response, exception) -> {
            if (response == null || response.errorCode() != Errors.NONE.code()) {
                return;
            }
            if (requestedStatus == ComponentHealthStatus.DEGRADED) {
                appendWriteEvent(
                    LEADER_ELECTION_FOR_DEMOTED_BROKERS,
                    OptionalLong.empty(),
                    replicationControl::tryUnelectDemotedLeaders);
            } else {
                appendWriteEvent(
                    LEADER_ELECTION_FOR_PROMOTED_BROKERS,
                    OptionalLong.empty(),
                    () -> replicationControl.tryReelectPromotedLeaders(request.brokerIds()));
            }
        });
        return healthFuture;
    }

    /**
     * Submits a read event named {@code "describeBrokerHealth"} that converts
     * {@code clusterControl.activeBrokerComponentDegradations()} into response data.
     *
     * @param context the request context supplying the deadline
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<DescribeBrokerHealthResponseData> describeBrokerHealth(ControllerRequestContext context) {
        return appendReadEvent(
            "describeBrokerHealth",
            context.deadlineNs(),
            () -> toDescribeBrokerHealthResponseData(clusterControl.activeBrokerComponentDegradations()));
    }

    /**
     * Allocates a block of producer ids through a write event named
     * {@code "allocateProducerIds"}. The block produced by {@code producerIdControlManager}
     * is mapped to response data carrying its first producer id and its size.
     *
     * @param context the request context supplying the deadline
     * @param request the request data carrying the broker id and broker epoch
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<AllocateProducerIdsResponseData> allocateProducerIds(
            ControllerRequestContext context,
            AllocateProducerIdsRequestData request) {
        return appendWriteEvent(
                "allocateProducerIds",
                context.deadlineNs(),
                () -> producerIdControlManager.generateNextProducerId(request.brokerId(), request.brokerEpoch()))
            .thenApply(block -> new AllocateProducerIdsResponseData()
                .setProducerIdStart(block.firstProducerId())
                .setProducerIdLen(block.size()));
    }

    /**
     * Handles an UpdateFeatures request through a write event named
     * {@code "updateFeatures"}.
     *
     * <p>The operation collects, per feature name, the requested maximum version level and
     * upgrade type. It then delegates to {@code featureControl} together with the broker
     * supported versions from {@code clusterControl} and the request's validate-only flag.
     * The resulting per-feature errors are converted into response data.
     *
     * @param context the request context supplying the deadline
     * @param request the request data
     * @return a future for the response data
     */
    @Override
    public CompletableFuture<UpdateFeaturesResponseData> updateFeatures(ControllerRequestContext context, UpdateFeaturesRequestData request) {
        return appendWriteEvent("updateFeatures", context.deadlineNs(), () -> {
            Map<String, Short> requestedLevels = new HashMap<>();
            Map<String, FeatureUpdate.UpgradeType> requestedUpgradeTypes = new HashMap<>();
            request.featureUpdates().forEach(update -> {
                String feature = update.feature();
                requestedUpgradeTypes.put(feature, FeatureUpdate.UpgradeType.fromCode(update.upgradeType()));
                requestedLevels.put(feature, update.maxVersionLevel());
            });
            return featureControl.updateFeatures(
                requestedLevels,
                requestedUpgradeTypes,
                clusterControl.brokerSupportedVersions(),
                request.validateOnly());
        }).thenApply(errorsByFeature -> {
            UpdateFeaturesResponseData response = new UpdateFeaturesResponseData();
            response.setResults(new UpdateFeaturesResponseData.UpdatableFeatureResultCollection(errorsByFeature.size()));
            errorsByFeature.forEach((feature, apiError) ->
                response.results().add(
                    new UpdateFeaturesResponseData.UpdatableFeatureResult()
                        .setFeature(feature)
                        .setErrorCode(apiError.error().code())
                        .setErrorMessage(apiError.message())));
            return response;
        });
    }

    /**
     * Handles a CreatePartitions request. An empty topic list completes immediately with an
     * empty result; otherwise a write event named {@code "createPartitions"} delegates to
     * {@code replicationControl}. The outcome is logged at debug level, and in validate-only
     * mode it is returned with its records stripped via {@code withoutRecords()}.
     *
     * @param context the request context supplying the deadline
     * @param topics the topics whose partition counts should change
     * @param validateOnly whether to return the result without records
     * @return a future for the per-topic results
     */
    @Override
    public CompletableFuture<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> createPartitions(
            ControllerRequestContext context,
            List<CreatePartitionsRequestData.CreatePartitionsTopic> topics,
            boolean validateOnly) {
        if (!topics.isEmpty()) {
            return appendWriteEvent("createPartitions", context.deadlineNs(), () -> {
                ControllerResult<List<CreatePartitionsResponseData.CreatePartitionsTopicResult>> outcome =
                    replicationControl.createPartitions(context, topics);
                if (!validateOnly) {
                    log.debug("CreatePartitions result(s): {}", outcome.response());
                    return outcome;
                }
                log.debug("Validate-only CreatePartitions result(s): {}", outcome.response());
                return outcome.withoutRecords();
            });
        }
        return CompletableFuture.completedFuture(Collections.emptyList());
    }

    /**
     * Enqueues a "createClusterLinks" write event that delegates to the cluster link control
     * manager, wrapped in the metadata-version guard from {@code clusterLinkSupportedOrThrow}.
     *
     * @param context        the request context; supplies the deadline for the write event
     * @param request        the CreateClusterLinks request data
     * @param kafkaPrincipal the principal forwarded to the cluster link control manager
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<CreateClusterLinksResponseData> createClusterLinks(ControllerRequestContext context, CreateClusterLinksRequestData request, KafkaPrincipal kafkaPrincipal) {
        ControllerWriteOperation<CreateClusterLinksResponseData> guardedCreate =
            this.clusterLinkSupportedOrThrow(() -> this.clusterLinkControl.createClusterLinks(request, kafkaPrincipal));
        return this.appendWriteEvent("createClusterLinks", context.deadlineNs(), guardedCreate);
    }

    /**
     * Enqueues a "deleteClusterLinks" write event that delegates to the cluster link control
     * manager, wrapped in the metadata-version guard from {@code clusterLinkSupportedOrThrow}.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the DeleteClusterLinks request data
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<DeleteClusterLinksResponseData> deleteClusterLinks(ControllerRequestContext context, DeleteClusterLinksRequestData request) {
        ControllerWriteOperation<DeleteClusterLinksResponseData> guardedDelete =
            this.clusterLinkSupportedOrThrow(() -> this.clusterLinkControl.deleteClusterLinks(request));
        return this.appendWriteEvent("deleteClusterLinks", context.deadlineNs(), guardedDelete);
    }

    /**
     * Enqueues an "alterMirrorTopics" write event that delegates to the mirror topic control
     * manager, wrapped in the metadata-version guard from {@code clusterLinkSupportedOrThrow}.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the AlterMirrorTopics request data
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<AlterMirrorTopicsResponseData> alterMirrorTopic(ControllerRequestContext context, AlterMirrorTopicsRequestData request) {
        ControllerWriteOperation<AlterMirrorTopicsResponseData> guardedAlter =
            this.clusterLinkSupportedOrThrow(() -> this.mirrorTopicControl.alterMirrorTopics(request));
        return this.appendWriteEvent("alterMirrorTopics", context.deadlineNs(), guardedAlter);
    }

    /**
     * Wraps a controller call in a write operation that first checks the metadata version.
     *
     * <p>The check is evaluated when the returned operation runs, not when this method is
     * called.
     *
     * @param controllerCall the call to make once the version check has passed
     * @param <T>            the response type of the wrapped call
     * @return an operation that yields the call's result, or throws
     *         {@link UnsupportedVersionException} when the metadata version is below
     *         {@code IBP_3_3_IV0}
     */
    private <T> ControllerWriteOperation<T> clusterLinkSupportedOrThrow(Supplier<ControllerResult<T>> controllerCall) {
        return () -> {
            boolean supported = this.featureControl.metadataVersion().isAtLeast(MetadataVersion.IBP_3_3_IV0);
            if (!supported) {
                throw new UnsupportedVersionException("Cluster Linking needs a metadata.version of 3.3 or greater.");
            }
            return controllerCall.get();
        };
    }

    /**
     * Enqueues a "createAcls" write event that delegates to the ACL control manager.
     *
     * @param context     the request context; supplies the deadline for the write event
     * @param aclBindings the ACL bindings to create
     * @return a future holding the creation results produced by the write operation
     */
    @Override
    public CompletableFuture<List<AclCreateResult>> createAcls(ControllerRequestContext context, List<AclBinding> aclBindings) {
        ControllerWriteOperation<List<AclCreateResult>> createOp = () -> this.aclControlManager.createAcls(aclBindings);
        return this.appendWriteEvent("createAcls", context.deadlineNs(), createOp);
    }

    /**
     * Enqueues a "deleteAcls" write event that delegates to the ACL control manager.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param filters the filters selecting the ACL bindings to delete
     * @return a future holding the deletion results produced by the write operation
     */
    @Override
    public CompletableFuture<List<AclDeleteResult>> deleteAcls(ControllerRequestContext context, List<AclBindingFilter> filters) {
        ControllerWriteOperation<List<AclDeleteResult>> deleteOp = () -> this.aclControlManager.deleteAcls(filters);
        return this.appendWriteEvent("deleteAcls", context.deadlineNs(), deleteOp);
    }

    /**
     * Registers a future with the cluster control manager via a "waitForReadyBrokers" control
     * event and returns that same future to the caller.
     *
     * @param minBrokers the broker count passed to the cluster control manager with the future
     * @return the future that was handed to the cluster control manager
     */
    @Override
    public CompletableFuture<Void> waitForReadyBrokers(int minBrokers) {
        CompletableFuture<Void> readyFuture = new CompletableFuture<>();
        this.appendControlEvent("waitForReadyBrokers", () -> this.clusterControl.addReadyBrokersFuture(readyFuture, minBrokers));
        return readyFuture;
    }

    /**
     * Enqueues a "describeCells" write event that delegates to the cell control manager.
     * The usable-broker set is computed when the event runs, not when it is enqueued.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the DescribeCells request
     * @return a future holding the response produced by the operation
     */
    @Override
    public CompletableFuture<DescribeCellsResponseData> describeCells(ControllerRequestContext context, DescribeCellsRequest request) {
        ControllerWriteOperation<DescribeCellsResponseData> describeOp = () -> this.cellControl.describeCells(request, this.usableBrokers());
        return this.appendWriteEvent("describeCells", context.deadlineNs(), describeOp);
    }

    /**
     * Enqueues a "createCell" write event that delegates to the cell control manager.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the CreateCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<CreateCellResponseData> createCell(ControllerRequestContext context, CreateCellRequest request) {
        ControllerWriteOperation<CreateCellResponseData> createOp = () -> this.cellControl.createCell(request);
        return this.appendWriteEvent("createCell", context.deadlineNs(), createOp);
    }

    /**
     * Enqueues a "deleteCell" write event that delegates to the cell control manager.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the DeleteCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<DeleteCellResponseData> deleteCell(ControllerRequestContext context, DeleteCellRequest request) {
        ControllerWriteOperation<DeleteCellResponseData> deleteOp = () -> this.cellControl.deleteCell(request);
        return this.appendWriteEvent("deleteCell", context.deadlineNs(), deleteOp);
    }

    /**
     * Enqueues an "alterCell" write event that delegates to the cell control manager.
     * The usable-broker set is computed when the event runs, not when it is enqueued.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the AlterCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<AlterCellResponseData> alterCell(ControllerRequestContext context, AlterCellRequest request) {
        ControllerWriteOperation<AlterCellResponseData> alterOp = () -> this.cellControl.alterCell(request, this.usableBrokers());
        return this.appendWriteEvent("alterCell", context.deadlineNs(), alterOp);
    }

    /**
     * Enqueues an "assignBrokersToCell" write event that delegates to the cell control manager.
     * The usable-broker set is computed when the event runs, not when it is enqueued.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the AssignBrokersToCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<AssignBrokersToCellResponseData> assignBrokersToCell(ControllerRequestContext context, AssignBrokersToCellRequest request) {
        ControllerWriteOperation<AssignBrokersToCellResponseData> assignOp = () -> this.cellControl.assignBrokersToCell(request, this.usableBrokers());
        return this.appendWriteEvent("assignBrokersToCell", context.deadlineNs(), assignOp);
    }

    /**
     * Enqueues an "unassignBrokersFromCell" write event that delegates to the cell control
     * manager. The usable-broker set is computed when the event runs, not when it is enqueued.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the UnAssignBrokersFromCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<UnAssignBrokersFromCellResponseData> unassignBrokersFromCell(ControllerRequestContext context, UnAssignBrokersFromCellRequest request) {
        ControllerWriteOperation<UnAssignBrokersFromCellResponseData> unassignOp = () -> this.cellControl.unassignBrokersFromCell(request, this.usableBrokers());
        return this.appendWriteEvent("unassignBrokersFromCell", context.deadlineNs(), unassignOp);
    }

    /**
     * Enqueues an "assignTenantsToCell" write event that delegates to the tenant control
     * manager. The usable-broker set is computed when the event runs, not when it is enqueued.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the AssignTenantsToCell request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<AssignTenantsToCellResponseData> assignTenantsToCell(ControllerRequestContext context, AssignTenantsToCellRequest request) {
        ControllerWriteOperation<AssignTenantsToCellResponseData> assignOp = () -> this.tenantControl.assignTenantsToCell(request, this.usableBrokers());
        return this.appendWriteEvent("assignTenantsToCell", context.deadlineNs(), assignOp);
    }

    /**
     * Enqueues a "describeTenants" write event that delegates to the tenant control manager.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the DescribeTenants request
     * @return a future holding the response produced by the operation
     */
    @Override
    public CompletableFuture<DescribeTenantsResponseData> describeTenants(ControllerRequestContext context, DescribeTenantsRequest request) {
        ControllerWriteOperation<DescribeTenantsResponseData> describeOp = () -> this.tenantControl.describeTenants(request);
        return this.appendWriteEvent("describeTenants", context.deadlineNs(), describeOp);
    }

    /**
     * Enqueues a "deleteTenants" write event that delegates to the tenant control manager.
     *
     * @param context the request context; supplies the deadline for the write event
     * @param request the DeleteTenants request
     * @return a future holding the response produced by the write operation
     */
    @Override
    public CompletableFuture<DeleteTenantsResponseData> deleteTenants(ControllerRequestContext context, DeleteTenantsRequest request) {
        ControllerWriteOperation<DeleteTenantsResponseData> deleteOp = () -> this.tenantControl.deleteTenants(request);
        return this.appendWriteEvent("deleteTenants", context.deadlineNs(), deleteOp);
    }

    /**
     * Asks the controller's event queue to begin shutting down, tagging the request with the
     * source string "QuorumController#beginShutdown".
     */
    @Override
    public void beginShutdown() {
        final String source = "QuorumController#beginShutdown";
        queue.beginShutdown(source);
    }

    /**
     * @return the value of this controller's {@code nodeId} field
     */
    public int nodeId() {
        return nodeId;
    }

    /**
     * @return the value of this controller's {@code clusterId} field
     */
    public String clusterId() {
        return clusterId;
    }

    /**
     * @return the current value of the {@code curClaimEpoch} field, which is assigned when the
     *         raft listener handles a leader change
     */
    @Override
    public int curClaimEpoch() {
        return curClaimEpoch;
    }

    /**
     * Closes the event queue, then the controller metrics, then the cell metrics, in that order.
     *
     * @throws InterruptedException if closing the event queue is interrupted; in that case the
     *                              metrics objects are not closed by this call
     */
    @Override
    public void close() throws InterruptedException {
        queue.close();
        controllerMetrics.close();
        cellMetrics.close();
    }

    /**
     * Appends a "pause" control event that blocks on a latch until the caller counts it down.
     *
     * <p>If the waiting thread is interrupted, the interruption is logged and the event simply
     * ends; the interrupt status is not restored.
     *
     * @return the latch (count 1) that releases the pause event when counted down
     */
    public CountDownLatch pause() {
        CountDownLatch unpauseLatch = new CountDownLatch(1);
        this.appendControlEvent("pause", () -> {
            try {
                unpauseLatch.await();
            } catch (InterruptedException interrupted) {
                this.log.info("Interrupted while waiting for unpause.", interrupted);
            }
        });
        return unpauseLatch;
    }

    /**
     * @return the {@code Time} instance held in this controller's {@code time} field
     */
    Time time() {
        return time;
    }

    /**
     * Collects the ids of every broker yielded by the cluster control manager's usable-broker
     * iterator.
     *
     * @return a new mutable set of broker ids
     */
    private Set<Integer> usableBrokers() {
        Set<Integer> brokerIds = new HashSet<>();
        this.clusterControl.usableBrokers().forEachRemaining(broker -> brokerIds.add(broker.id()));
        return brokerIds;
    }

    /**
     * Converts a per-broker map of degraded component states into a DescribeBrokerHealth
     * response.
     *
     * <p>Each map entry becomes one {@code DegradedBroker} whose component list holds one
     * {@code DegradedBrokerComponent} (reason plus component id) per degraded state. The
     * response error code is always {@code Errors.NONE}.
     *
     * @param activeBrokerComponentDegradations degraded states keyed by broker id
     * @return the populated response; its broker list is empty when the map is empty
     */
    private DescribeBrokerHealthResponseData toDescribeBrokerHealthResponseData(Map<Integer, Set<DegradedBrokerHealthState>> activeBrokerComponentDegradations) {
        List<DescribeBrokerHealthResponseData.DegradedBroker> degradedBrokers = new ArrayList<>(activeBrokerComponentDegradations.size());
        for (Map.Entry<Integer, Set<DegradedBrokerHealthState>> entry : activeBrokerComponentDegradations.entrySet()) {
            Set<DegradedBrokerHealthState> degradedStates = entry.getValue();
            // Parameterized list (the original was a raw ArrayList) so the element type is checked.
            List<DescribeBrokerHealthResponseData.DegradedBrokerComponent> components = new ArrayList<>(degradedStates.size());
            for (DegradedBrokerHealthState degradedState : degradedStates) {
                components.add(new DescribeBrokerHealthResponseData.DegradedBrokerComponent()
                    .setReason(degradedState.reason())
                    .setComponentCode(degradedState.component().id()));
            }
            degradedBrokers.add(new DescribeBrokerHealthResponseData.DegradedBroker()
                .setBrokerId(entry.getKey().intValue())
                .setDegradedBrokerComponents(components));
        }
        return new DescribeBrokerHealthResponseData().setErrorCode(Errors.NONE.code()).setDegradedBrokers(degradedBrokers);
    }

    private static enum ImbalanceSchedule {
        SCHEDULED,
        DEFERRED,
        RETRY_AFTER_BACKOFF;

    }

    /**
     * Write operation that produces the controller activation records and, once the batch end
     * offset is known, schedules follow-up work.
     */
    class CompleteActivationEvent
    implements ControllerWriteOperation<Void> {

        /**
         * Generates the activation records as a single atomic result with a null response.
         * Any throwable raised while generating them is routed to the fatal fault handler.
         */
        @Override
        public ControllerResult<Void> generateRecordsAndResult() {
            final QuorumController controller = QuorumController.this;
            try {
                List<ApiMessageAndVersion> activationRecords = QuorumController.generateActivationRecords(
                    controller.log,
                    controller.logReplayTracker.empty(),
                    controller.zkMigrationEnabled,
                    controller.bootstrapMetadata,
                    controller.featureControl);
                return ControllerResult.atomicOf(activationRecords, null);
            } catch (Throwable cause) {
                throw controller.fatalFaultHandler.handleFault("exception while completing controller activation", cause);
            }
        }

        /**
         * Schedules the next partition-leader balance and the next no-op record write, then
         * prepends a "maybeInstallEncryptor" write event (allowed in pre-migration mode) to the
         * queue. The offset argument is not used.
         */
        @Override
        public void processBatchEndOffset(long offset) {
            final QuorumController controller = QuorumController.this;
            controller.maybeScheduleNextBalancePartitionLeaders();
            controller.maybeScheduleNextWriteNoOpRecord();
            EnumSet<ControllerOperationFlag> premigrationAllowed = EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION);
            controller.queue.prepend(new ControllerWriteEvent("maybeInstallEncryptor", controller.encryptionControl::maybeInstallEncryptor, premigrationAllowed));
        }
    }

    /**
     * Raft listener for the metadata log. Every callback is turned into a control event on the
     * controller's queue via {@link #appendRaftEvent}, so the handling code below runs as a
     * queued event rather than directly in the callback.
     */
    class QuorumMetaLogListener
    implements RaftClient.Listener<ApiMessageAndVersion> {
        QuorumMetaLogListener() {
        }

        /**
         * Handles newly committed batches.
         *
         * <p>When this node is the active controller, each batch completes deferred events and
         * deletes in-memory snapshots up to the batch's last offset; records are not replayed in
         * this branch. Otherwise every record in the batch is replayed, and a replay failure is
         * reported to the fatal fault handler. In both cases the metrics manager and the
         * last-committed state are updated per batch. The reader is always closed.
         */
        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + "]", () -> {
                try {
                    QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                    // Active/standby status is sampled once for the whole reader.
                    boolean isActive = QuorumController.this.isActiveController();
                    while (reader.hasNext()) {
                        Batch batch = (Batch)reader.next();
                        long offset = batch.lastOffset();
                        int epoch = batch.epoch();
                        List messages = batch.records();
                        if (isActive) {
                            QuorumController.this.log.debug("Completing purgatory items up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                            QuorumController.this.deferredEventQueue.completeUpTo(offset);
                            QuorumController.this.snapshotRegistry.deleteSnapshotsUpTo(offset);
                        } else {
                            if (QuorumController.this.log.isDebugEnabled()) {
                                QuorumController.this.log.debug("Replaying commits from the active node up to offset {} and epoch {}.", (Object)offset, (Object)epoch);
                            }
                            // 1-based position of the record within the batch; used only in the failure message.
                            int i = 1;
                            for (ApiMessageAndVersion message : messages) {
                                try {
                                    QuorumController.this.replay(message.message(), Optional.empty(), offset);
                                }
                                catch (Throwable e) {
                                    String failureMessage = String.format("Unable to apply %s record on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", message.message().getClass().getSimpleName(), i, messages.size(), batch.baseOffset());
                                    throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                                }
                                ++i;
                            }
                        }
                        QuorumController.this.controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
                        QuorumController.this.updateLastCommittedState(offset, epoch, batch.appendTimestamp());
                        // Batches whose last offset is below the latest snapshot's offset do not affect
                        // oldestNonSnapshottedTimestamp.
                        if (offset < QuorumController.this.raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) continue;
                        QuorumController.this.oldestNonSnapshottedTimestamp = Math.min(QuorumController.this.oldestNonSnapshottedTimestamp, batch.appendTimestamp());
                    }
                }
                finally {
                    reader.close();
                }
            });
        }

        /**
         * Handles a request to load a snapshot.
         *
         * <p>Reports a fatal fault if this node is the active controller. Otherwise the
         * controller state is reset to empty, every record in the snapshot is replayed (and fed
         * to the metrics manager), the last-committed state is set from the snapshot's
         * last-contained offset/epoch/timestamp, an in-memory snapshot is created at that offset,
         * and the authorizer (if present) is loaded with the current ACLs. The reader is always
         * closed.
         */
        public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
            this.appendRaftEvent(String.format("handleLoadSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                try {
                    String snapshotName = Snapshots.filenameFromSnapshotId((OffsetAndEpoch)reader.snapshotId());
                    if (QuorumController.this.isActiveController()) {
                        throw QuorumController.this.fatalFaultHandler.handleFault("Asked to load snapshot " + snapshotName + ", but we are the active controller at epoch " + QuorumController.this.curClaimEpoch);
                    }
                    QuorumController.this.log.info("Starting to replay snapshot {}, from last commit offset {} and epoch {}", new Object[]{snapshotName, QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.resetToEmptyState();
                    while (reader.hasNext()) {
                        Batch batch = (Batch)reader.next();
                        long offset = batch.lastOffset();
                        List messages = batch.records();
                        QuorumController.this.log.debug("Replaying snapshot {} batch with last offset of {}", (Object)snapshotName, (Object)offset);
                        // 1-based position of the record within the batch; used only in the failure message.
                        int i = 1;
                        for (ApiMessageAndVersion message : messages) {
                            try {
                                // Snapshot records are replayed at the snapshot's last contained log offset,
                                // not at the batch offset.
                                QuorumController.this.replay(message.message(), Optional.of(reader.snapshotId()), reader.lastContainedLogOffset());
                                QuorumController.this.controllerMetricsManager.replay(message.message());
                            }
                            catch (Throwable e) {
                                String failureMessage = String.format("Unable to apply %s record from snapshot %s on standby controller, which was %d of %d record(s) in the batch with baseOffset %d.", message.message().getClass().getSimpleName(), reader.snapshotId(), i, messages.size(), batch.baseOffset());
                                throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                            }
                            ++i;
                        }
                    }
                    QuorumController.this.log.info("Finished replaying snapshot {}", (Object)snapshotName);
                    QuorumController.this.updateLastCommittedState(reader.lastContainedLogOffset(), reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp());
                    QuorumController.this.snapshotRegistry.getOrCreateSnapshot(QuorumController.this.lastCommittedOffset);
                    QuorumController.this.authorizer.ifPresent(a -> a.loadSnapshot(QuorumController.this.aclControlManager.idToAcl()));
                }
                finally {
                    reader.close();
                }
            });
        }

        /**
         * Handles a leadership change. Four cases are distinguished:
         * active and still leader (the claim epoch is updated), active but no longer leader
         * (leadership is renounced), not active and now leader (leadership is claimed), and
         * not active with some other leader (logged only).
         */
        public void handleLeaderChange(LeaderAndEpoch newLeader) {
            this.appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
                String newLeaderName;
                // "string" is an unused duplicate left by the decompiler; newLeaderName is the value used below.
                String string = newLeaderName = newLeader.leaderId().isPresent() ? String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
                if (QuorumController.this.isActiveController()) {
                    if (newLeader.isLeader(QuorumController.this.nodeId)) {
                        QuorumController.this.log.warn("We were the leader in epoch {}, and are still the leader in the new epoch {}.", (Object)QuorumController.this.curClaimEpoch, (Object)newLeader.epoch());
                        QuorumController.this.curClaimEpoch = newLeader.epoch();
                    } else {
                        QuorumController.this.log.warn("Renouncing the leadership due to a metadata log event. We were the leader at epoch {}, but in the new epoch {}, the leader is {}. Reverting to last committed offset {}.", new Object[]{QuorumController.this.curClaimEpoch, newLeader.epoch(), newLeaderName, QuorumController.this.lastCommittedOffset});
                        QuorumController.this.renounce();
                    }
                } else if (newLeader.isLeader(QuorumController.this.nodeId)) {
                    QuorumController.this.log.info("Becoming the active controller at epoch {}, committed offset {}, committed epoch {}", new Object[]{newLeader.epoch(), QuorumController.this.lastCommittedOffset, QuorumController.this.lastCommittedEpoch});
                    QuorumController.this.claim(newLeader.epoch());
                } else {
                    QuorumController.this.log.info("In the new epoch {}, the leader is {}.", (Object)newLeader.epoch(), (Object)newLeaderName);
                }
            });
        }

        /**
         * Asks the controller's event queue to begin shutting down, tagged with the source
         * string "MetaLogManager.Listener".
         */
        public void beginShutdown() {
            QuorumController.this.queue.beginShutdown("MetaLogManager.Listener");
        }

        /**
         * Appends a control event that runs {@code runnable} only if this listener is still the
         * controller's current {@code metaLogListener}; events from a replaced listener are
         * logged at debug level and dropped. After the runnable finishes (normally or with an
         * exception) the authorizer initial-load check is invoked.
         */
        private void appendRaftEvent(String name, Runnable runnable) {
            QuorumController.this.appendControlEvent(name, () -> {
                if (this != QuorumController.this.metaLogListener) {
                    QuorumController.this.log.debug("Ignoring {} raft event from an old registration", (Object)name);
                } else {
                    try {
                        runnable.run();
                    }
                    finally {
                        QuorumController.this.maybeCompleteAuthorizerInitialLoad();
                    }
                }
            });
        }
    }

    /**
     * Accepts record batches produced by the ZK migration and writes them through the
     * controller's event queue as write events that are allowed to run in pre-migration mode.
     */
    class MigrationRecordConsumer
    implements ZkRecordConsumer {
        /** Offset and claim epoch recorded by the most recently processed migration write. */
        private volatile OffsetAndEpoch highestMigrationRecordOffset;

        @Override
        public void beginMigration() {
            QuorumController.this.log.info("Starting ZK Migration");
        }

        /**
         * Enqueues one migration batch, unless the controller queue currently holds more than
         * 100 events, in which case the returned future is already failed with
         * {@link NotControllerException}.
         */
        @Override
        public CompletableFuture<?> acceptBatch(List<ApiMessageAndVersion> recordBatch) {
            if (QuorumController.this.queue.size() > 100) {
                CompletableFuture<Void> rejected = new CompletableFuture<>();
                rejected.completeExceptionally(new NotControllerException("Cannot accept migration record batch. Controller queue is too large"));
                return rejected;
            }
            return this.enqueueMigrationWrite("ZK Migration Batch", recordBatch);
        }

        /**
         * Enqueues the final migration-state record and returns a future that yields the
         * highest migration record offset once that write completes.
         */
        @Override
        public CompletableFuture<OffsetAndEpoch> completeMigration() {
            QuorumController.this.log.info("Completing ZK Migration");
            List<ApiMessageAndVersion> finalRecord = Collections.singletonList(ZkMigrationState.MIGRATION.toRecord());
            return this.enqueueMigrationWrite("Complete ZK Migration", finalRecord)
                .thenApply(ignored -> this.highestMigrationRecordOffset);
        }

        @Override
        public void abortMigration() {
            QuorumController.this.fatalFaultHandler.handleFault("Aborting the ZK migration");
        }

        /** Wraps the records in a pre-migration-capable write event, appends it, and returns its future. */
        private CompletableFuture<Void> enqueueMigrationWrite(String eventName, List<ApiMessageAndVersion> records) {
            ControllerWriteEvent<Void> writeEvent = new ControllerWriteEvent<Void>(eventName, new MigrationWriteOperation(records), EnumSet.of(ControllerOperationFlag.RUNS_IN_PREMIGRATION));
            QuorumController.this.queue.append(writeEvent);
            return writeEvent.future();
        }

        /**
         * Write operation that emits the supplied batch atomically with a null response and
         * records the resulting end offset together with the current claim epoch.
         */
        class MigrationWriteOperation
        implements ControllerWriteOperation<Void> {
            private final List<ApiMessageAndVersion> batch;

            MigrationWriteOperation(List<ApiMessageAndVersion> batch) {
                this.batch = batch;
            }

            @Override
            public ControllerResult<Void> generateRecordsAndResult() {
                return ControllerResult.atomicOf(this.batch, null);
            }

            @Override
            public void processBatchEndOffset(long offset) {
                int claimEpoch = QuorumController.this.curClaimEpoch;
                MigrationRecordConsumer.this.highestMigrationRecordOffset = new OffsetAndEpoch(offset, claimEpoch);
            }
        }
    }

    /**
     * Queue event that runs a {@link ControllerWriteOperation}, appends any records it produces
     * to the raft log, and completes its future either immediately or later as a deferred
     * event keyed by a log offset.
     *
     * @param <T> the response type carried by the operation's result
     */
    class ControllerWriteEvent<T>
    implements EventQueue.Event,
    DeferredEvent {
        private final String name;
        private final CompletableFuture<T> future;
        private final ControllerWriteOperation<T> op;
        // Captured at construction; used to compute the time spent waiting in the queue.
        private final long eventCreatedTimeNs;
        private final EnumSet<ControllerOperationFlag> flags;
        // Empty until run() has passed its active-controller and pre-migration checks.
        private OptionalLong startProcessingTimeNs;
        // Null until run() has produced a result.
        private ControllerResultAndOffset<T> resultAndOffset;

        ControllerWriteEvent(String name, ControllerWriteOperation<T> op, EnumSet<ControllerOperationFlag> flags) {
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
            this.startProcessingTimeNs = OptionalLong.empty();
            this.name = name;
            this.future = new CompletableFuture();
            this.op = op;
            this.flags = flags;
            this.resultAndOffset = null;
        }

        /** Returns the future completed with the operation's response or failure. */
        CompletableFuture<T> future() {
            return this.future;
        }

        /**
         * Executes the write operation.
         *
         * <p>Throws (via {@code ControllerExceptions}) when this node is not the active
         * controller, or when the controller is in pre-migration mode and the event lacks
         * {@code RUNS_IN_PREMIGRATION}. If the operation yields no records it is treated as
         * read-only: it completes at once when no deferred event is pending, otherwise it waits
         * for the highest pending offset. If records are produced they are replayed locally and
         * scheduled for an atomic raft append, and the event waits for the resulting offset.
         */
        public void run() throws Exception {
            int controllerEpoch;
            long now = QuorumController.this.time.nanoseconds();
            if (!this.flags.contains((Object)ControllerOperationFlag.DOES_NOT_UPDATE_QUEUE_TIME)) {
                QuorumController.this.controllerMetrics.updateEventQueueTime(TimeUnit.NANOSECONDS.toMillis(now - this.eventCreatedTimeNs));
            }
            // The claim epoch is captured here and reused for the raft append below.
            if (!QuorumController.isActiveController(controllerEpoch = QuorumController.this.curClaimEpoch)) {
                throw ControllerExceptions.newWrongControllerException(QuorumController.this.latestController());
            }
            if (QuorumController.this.featureControl.inPreMigrationMode() && !this.flags.contains((Object)ControllerOperationFlag.RUNS_IN_PREMIGRATION)) {
                QuorumController.this.log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", (Object)this.name);
                throw ControllerExceptions.newPreMigrationException(QuorumController.this.latestController());
            }
            this.startProcessingTimeNs = OptionalLong.of(now);
            ControllerResult<T> result = this.op.generateRecordsAndResult();
            if (result.records().isEmpty()) {
                // No records: report the current write offset to the operation.
                this.op.processBatchEndOffset(QuorumController.this.writeOffset);
                OptionalLong maybeOffset = QuorumController.this.deferredEventQueue.highestPendingOffset();
                if (!maybeOffset.isPresent()) {
                    // Nothing is pending, so there is nothing to wait for; -1 is the offset recorded for this case.
                    this.resultAndOffset = ControllerResultAndOffset.of(-1L, result);
                    QuorumController.this.log.debug("Completing read-only operation {} immediately because the purgatory is empty.", (Object)this);
                    this.complete(null);
                } else {
                    this.resultAndOffset = ControllerResultAndOffset.of(maybeOffset.getAsLong(), result);
                    QuorumController.this.log.debug("Read-only operation {} will be completed when the log reaches offset {}", (Object)this, (Object)this.resultAndOffset.offset());
                }
            } else {
                // The function below is invoked by appendRecords with a list of records and returns
                // the end offset reported by the raft client for that list.
                long offset = QuorumController.appendRecords(QuorumController.this.log, result, QuorumController.this.maxRecordsPerBatch, new Function<List<ApiMessageAndVersion>, Long>(){
                    // Starts at the controller's current write offset and advances after each append.
                    private long prevEndOffset;
                    {
                        this.prevEndOffset = QuorumController.this.writeOffset;
                    }

                    @Override
                    public Long apply(List<ApiMessageAndVersion> records) {
                        // 1-based position of the record within the list; used only in the failure message.
                        int i = 1;
                        for (ApiMessageAndVersion message : records) {
                            try {
                                // Records are replayed before the append is scheduled.
                                QuorumController.this.replay(message.message(), Optional.empty(), this.prevEndOffset + (long)records.size());
                            }
                            catch (Throwable e) {
                                String failureMessage = String.format("Unable to apply %s record, which was %d of %d record(s) in the batch following last write offset %d.", message.message().getClass().getSimpleName(), i, records.size(), this.prevEndOffset);
                                throw QuorumController.this.fatalFaultHandler.handleFault(failureMessage, e);
                            }
                            ++i;
                        }
                        this.prevEndOffset = QuorumController.this.raftClient.scheduleAtomicAppend(controllerEpoch, records);
                        QuorumController.this.snapshotRegistry.getOrCreateSnapshot(this.prevEndOffset);
                        return this.prevEndOffset;
                    }
                });
                this.op.processBatchEndOffset(offset);
                QuorumController.this.updateWriteOffset(offset);
                this.resultAndOffset = ControllerResultAndOffset.of(offset, result);
                QuorumController.this.log.debug("Read-write operation {} will be completed when the log reaches offset {}.", (Object)this, (Object)this.resultAndOffset.offset());
            }
            QuorumController.this.maybeScheduleNextBalancePartitionLeaders();
            // Anything not completed above is parked in the deferred queue under its offset.
            if (!this.future.isDone()) {
                QuorumController.this.deferredEventQueue.add(this.resultAndOffset.offset(), (DeferredEvent)this);
            }
        }

        /** Fails the future with the given exception by delegating to {@link #complete}. */
        public void handleException(Throwable exception) {
            this.complete(exception);
        }

        /**
         * Completes the future: with the stored response when {@code exception} is null,
         * otherwise exceptionally with whatever the controller's exception handler returns.
         * The success path reads {@code startProcessingTimeNs} and {@code resultAndOffset}, so
         * it requires that {@link #run} has already set both.
         */
        public void complete(Throwable exception) {
            if (exception == null) {
                QuorumController.this.handleEventEnd(this.toString(), this.startProcessingTimeNs.getAsLong());
                this.future.complete(this.resultAndOffset.response());
            } else {
                this.future.completeExceptionally(QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception));
            }
        }

        /** Returns the event name followed by this object's identity hash code in parentheses. */
        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /**
     * A unit of work executed by a controller write event.
     *
     * @param <T> the response type carried by the result
     */
    static interface ControllerWriteOperation<T> {
        /**
         * Produces the result of the operation, including any records it wants written.
         *
         * @return the records and response for this operation
         * @throws Exception if the operation fails
         */
        ControllerResult<T> generateRecordsAndResult() throws Exception;

        /**
         * Callback invoked by the write event with an offset after the result has been
         * generated. The default implementation does nothing.
         *
         * @param offset the offset supplied by the write event
         */
        default void processBatchEndOffset(long offset) {
        }
    }

    /**
     * Flags that modify how a controller write event is executed.
     */
    enum ControllerOperationFlag {
        /** When set, the write event does not report its queue wait time to the controller metrics. */
        DOES_NOT_UPDATE_QUEUE_TIME,

        /** When set, the write event is allowed to run while the controller is in pre-migration mode. */
        RUNS_IN_PREMIGRATION
    }

    /**
     * Queue event that evaluates a supplier and completes a future with its value.
     *
     * @param <T> the type of value produced by the handler
     */
    class ControllerReadEvent<T>
    implements EventQueue.Event {
        private final String name;
        private final CompletableFuture<T> future = new CompletableFuture<>();
        private final Supplier<T> handler;
        /** Captured at construction; used to compute the time spent waiting in the queue. */
        private final long eventCreatedTimeNs;
        /** Empty until {@link #run} starts. */
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();

        ControllerReadEvent(String name, Supplier<T> handler) {
            this.name = name;
            this.handler = handler;
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
        }

        /** Returns the future completed with the handler's value or failure. */
        CompletableFuture<T> future() {
            return this.future;
        }

        /**
         * Reports the queue wait time, invokes the handler, reports the end of the event, and
         * completes the future with the handler's value.
         */
        public void run() throws Exception {
            long startNs = QuorumController.this.time.nanoseconds();
            long queuedMs = TimeUnit.NANOSECONDS.toMillis(startNs - this.eventCreatedTimeNs);
            QuorumController.this.controllerMetrics.updateEventQueueTime(queuedMs);
            this.startProcessingTimeNs = OptionalLong.of(startNs);
            T value = this.handler.get();
            QuorumController.this.handleEventEnd(this.toString(), startNs);
            this.future.complete(value);
        }

        /** Fails the future with whatever the controller's exception handler returns. */
        public void handleException(Throwable exception) {
            Throwable translated = QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception);
            this.future.completeExceptionally(translated);
        }

        /** Returns the event name followed by this object's identity hash code in parentheses. */
        @Override
        public String toString() {
            return this.name + "(" + System.identityHashCode(this) + ")";
        }
    }

    /**
     * Queue event that runs a plain {@link Runnable} and produces no result.
     */
    class ControllerEvent
    implements EventQueue.Event {
        private final String name;
        private final Runnable handler;
        /** Captured at construction; used to compute the time spent waiting in the queue. */
        private final long eventCreatedTimeNs;
        /** Empty until {@link #run} starts. */
        private OptionalLong startProcessingTimeNs = OptionalLong.empty();

        ControllerEvent(String name, Runnable handler) {
            this.name = name;
            this.handler = handler;
            this.eventCreatedTimeNs = QuorumController.this.time.nanoseconds();
        }

        /**
         * Reports the queue wait time, logs the execution at debug level, runs the handler, and
         * reports the end of the event.
         */
        public void run() throws Exception {
            long startNs = QuorumController.this.time.nanoseconds();
            long queuedMs = TimeUnit.NANOSECONDS.toMillis(startNs - this.eventCreatedTimeNs);
            QuorumController.this.controllerMetrics.updateEventQueueTime(queuedMs);
            this.startProcessingTimeNs = OptionalLong.of(startNs);
            QuorumController.this.log.debug("Executing {}.", (Object)this);
            this.handler.run();
            QuorumController.this.handleEventEnd(this.toString(), startNs);
        }

        /** Passes the failure to the controller's exception handler; its return value is discarded. */
        public void handleException(Throwable exception) {
            QuorumController.this.handleEventException(this.name, this.startProcessingTimeNs, exception);
        }

        /** Returns the event name. */
        @Override
        public String toString() {
            return this.name;
        }
    }

    /**
     * Verifies that the entity named by a {@link ConfigResource} exists, throwing a
     * type-specific exception when it does not. Only BROKER, TOPIC and CLUSTER_LINK resources
     * are looked up; every other resource type is accepted as-is.
     */
    class ConfigResourceExistenceChecker
    implements Consumer<ConfigResource> {

        @Override
        public void accept(ConfigResource configResource) {
            switch (configResource.type()) {
                case BROKER:
                    this.requireKnownNode(configResource.name());
                    break;
                case TOPIC:
                    this.requireKnownTopic(configResource.name());
                    break;
                case CLUSTER_LINK:
                    this.requireKnownClusterLink(configResource.name());
                    break;
                default:
                    // BROKER_LOGGER and all remaining types need no existence check.
                    break;
            }
        }

        /**
         * An empty name is accepted without a lookup. Any other name must parse as an integer
         * id that is either a registered broker or a controller id.
         */
        private void requireKnownNode(String brokerName) {
            if (brokerName.isEmpty()) {
                return;
            }
            int id;
            try {
                id = Integer.parseInt(brokerName);
            } catch (NumberFormatException e) {
                throw new InvalidRequestException("Invalid broker name " + brokerName);
            }
            boolean registeredBroker = QuorumController.this.clusterControl.brokerRegistrations().containsKey(id);
            if (!registeredBroker && !QuorumController.this.featureControl.isControllerId(id)) {
                throw new BrokerIdNotRegisteredException("No node with id " + id + " found.");
            }
        }

        /** The topic name must resolve to a topic id in the replication control manager. */
        private void requireKnownTopic(String topicName) {
            if (QuorumController.this.replicationControl.getTopicId(topicName) == null) {
                throw new UnknownTopicOrPartitionException("The topic '" + topicName + "' does not exist.");
            }
        }

        /** The link name must be present in the cluster link control manager. */
        private void requireKnownClusterLink(String linkName) {
            if (!QuorumController.this.clusterLinkControl.getClusterLink(linkName).isPresent()) {
                throw new ClusterLinkNotFoundException("The cluster link " + linkName + " does not exist.");
            }
        }
    }

    /**
     * Fluent builder that gathers the settings for a {@link QuorumController} and
     * constructs it in {@link #build()}.
     *
     * <p>The raft client, bootstrap metadata, quorum features, both fault handlers,
     * the cell assignor and the cell metrics have no default; {@link #build()} throws
     * {@link IllegalStateException} when any of them was never supplied. Every setter
     * returns this builder so calls can be chained.
     */
    public static class Builder {
        private final int nodeId;
        private final String clusterId;

        // Reference fields declared without an initializer stay null until their setter
        // is called; build() then rejects the null, substitutes a default, or passes it
        // through unchanged, depending on the field.
        private FaultHandler nonFatalFaultHandler;
        private FaultHandler fatalFaultHandler;
        private Function<String, String> nameToTenantCallback;
        private Time time = Time.SYSTEM;
        private String threadNamePrefix;
        private LogContext logContext;
        private KafkaConfigSchema configSchema = KafkaConfigSchema.EMPTY;
        private RaftClient<ApiMessageAndVersion> raftClient;
        private QuorumFeatures quorumFeatures;
        private short defaultReplicationFactor = 3;
        private int defaultNumPartitions = 1;
        private int defaultMinIsrCount = 2;
        private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
        private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
        private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
        private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
        private ControllerMetrics controllerMetrics;
        private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
        private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
        private Optional<CreateClusterLinkPolicy> createClusterLinkPolicy = Optional.empty();
        private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
        private Optional<ClusterMetadataAuthorizer> authorizer = Optional.empty();
        private Map<String, Object> staticConfig = Collections.emptyMap();
        private BootstrapMetadata bootstrapMetadata;
        private int maxRecordsPerBatch = 10_000;
        private boolean zkMigrationEnabled;
        private CellAssignor cellAssignor;
        private PartitionPlacementStrategy partitionPlacementStrategy = PartitionPlacementStrategy.CLUSTER_WIDE;
        private boolean isImplicitCellCreationEnabled;
        private Optional<TopicPlacement> defaultTopicPlacement = Optional.empty();
        private CellMetrics cellMetrics;

        /**
         * Creates a builder for the given node.
         *
         * @param nodeId    id handed to the controller; also used in the default thread
         *                  name prefix and log prefix
         * @param clusterId cluster id handed to the controller
         */
        public Builder(int nodeId, String clusterId) {
            this.nodeId = nodeId;
            this.clusterId = clusterId;
        }

        /** Sets the non-fatal fault handler; mandatory for {@link #build()}. */
        public Builder setNonFatalFaultHandler(FaultHandler nonFatalFaultHandler) {
            this.nonFatalFaultHandler = nonFatalFaultHandler;
            return this;
        }

        /** Sets the fatal fault handler; mandatory for {@link #build()}. */
        public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
            this.fatalFaultHandler = fatalFaultHandler;
            return this;
        }

        /** Sets the name-to-tenant callback; it is passed to the controller as-is, null included. */
        public Builder setNameToTenantCallback(Function<String, String> callback) {
            nameToTenantCallback = callback;
            return this;
        }

        /** Sets the default min ISR count (2 unless overridden). */
        public Builder setDefaultMinIsrCount(int count) {
            defaultMinIsrCount = count;
            return this;
        }

        /** @return the node id this builder was created with */
        public int nodeId() {
            return nodeId;
        }

        /** Sets the time source ({@code Time.SYSTEM} unless overridden). */
        public Builder setTime(Time time) {
            this.time = time;
            return this;
        }

        /** Sets the event queue thread name prefix; {@link #build()} derives one from the node id when null. */
        public Builder setThreadNamePrefix(String threadNamePrefix) {
            this.threadNamePrefix = threadNamePrefix;
            return this;
        }

        /** Sets the log context; {@link #build()} derives one from the node id when null. */
        public Builder setLogContext(LogContext logContext) {
            this.logContext = logContext;
            return this;
        }

        /**
         * @return the current log context; null until one is set explicitly or
         *         {@link #build()} has filled in its default
         */
        public LogContext logContext() {
            return logContext;
        }

        /** Sets the config schema ({@code KafkaConfigSchema.EMPTY} unless overridden). */
        public Builder setConfigSchema(KafkaConfigSchema configSchema) {
            this.configSchema = configSchema;
            return this;
        }

        /** Sets the raft client; mandatory for {@link #build()}. */
        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> logManager) {
            raftClient = logManager;
            return this;
        }

        /** Sets the quorum features; mandatory for {@link #build()}. */
        public Builder setQuorumFeatures(QuorumFeatures quorumFeatures) {
            this.quorumFeatures = quorumFeatures;
            return this;
        }

        /** Sets the default replication factor (3 unless overridden). */
        public Builder setDefaultReplicationFactor(short defaultReplicationFactor) {
            this.defaultReplicationFactor = defaultReplicationFactor;
            return this;
        }

        /** Sets the default number of partitions (1 unless overridden). */
        public Builder setDefaultNumPartitions(int defaultNumPartitions) {
            this.defaultNumPartitions = defaultNumPartitions;
            return this;
        }

        /** @return the replica placer currently configured on this builder */
        public ReplicaPlacer replicaPlacer() {
            return replicaPlacer;
        }

        /** Sets the replica placer (a {@code StripedReplicaPlacer} unless overridden). */
        public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
            this.replicaPlacer = replicaPlacer;
            return this;
        }

        /** Sets the leader imbalance check interval in nanoseconds (empty unless overridden). */
        public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong value) {
            leaderImbalanceCheckIntervalNs = value;
            return this;
        }

        /** Sets the maximum idle interval in nanoseconds (empty unless overridden). */
        public Builder setMaxIdleIntervalNs(OptionalLong value) {
            maxIdleIntervalNs = value;
            return this;
        }

        /** Sets the session timeout in nanoseconds. */
        public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
            this.sessionTimeoutNs = sessionTimeoutNs;
            return this;
        }

        /** Sets the controller metrics; {@link #build()} loads a mock implementation reflectively when null. */
        public Builder setMetrics(ControllerMetrics controllerMetrics) {
            this.controllerMetrics = controllerMetrics;
            return this;
        }

        /** Sets the bootstrap metadata; mandatory for {@link #build()}. */
        public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) {
            this.bootstrapMetadata = bootstrapMetadata;
            return this;
        }

        /** Sets the maximum number of records per batch (10000 unless overridden). */
        public Builder setMaxRecordsPerBatch(int maxRecordsPerBatch) {
            this.maxRecordsPerBatch = maxRecordsPerBatch;
            return this;
        }

        /** Sets the optional create-topic policy (empty unless overridden). */
        public Builder setCreateTopicPolicy(Optional<CreateTopicPolicy> createTopicPolicy) {
            this.createTopicPolicy = createTopicPolicy;
            return this;
        }

        /** Sets the optional alter-config policy (empty unless overridden). */
        public Builder setAlterConfigPolicy(Optional<AlterConfigPolicy> alterConfigPolicy) {
            this.alterConfigPolicy = alterConfigPolicy;
            return this;
        }

        /** Sets the optional create-cluster-link policy (empty unless overridden). */
        public Builder setCreateClusterLinkPolicy(Optional<CreateClusterLinkPolicy> createClusterLinkPolicy) {
            this.createClusterLinkPolicy = createClusterLinkPolicy;
            return this;
        }

        /** Sets the configuration validator ({@code ConfigurationValidator.NO_OP} unless overridden). */
        public Builder setConfigurationValidator(ConfigurationValidator configurationValidator) {
            this.configurationValidator = configurationValidator;
            return this;
        }

        /**
         * Sets the authorizer. The argument is wrapped with {@link Optional#of(Object)},
         * so passing null throws {@link NullPointerException}.
         */
        public Builder setAuthorizer(ClusterMetadataAuthorizer authorizer) {
            this.authorizer = Optional.of(authorizer);
            return this;
        }

        /** Sets the cell assignor; mandatory for {@link #build()}. */
        public Builder setCellAssignor(CellAssignor cellAssignor) {
            this.cellAssignor = cellAssignor;
            return this;
        }

        /** Sets the partition placement strategy ({@code CLUSTER_WIDE} unless overridden). */
        public Builder setPartitionPlacementStrategy(PartitionPlacementStrategy partitionPlacementStrategy) {
            this.partitionPlacementStrategy = partitionPlacementStrategy;
            return this;
        }

        /** Sets the implicit-cell-creation flag (false unless overridden). */
        public Builder setIsImplicitCellCreationEnabled(boolean isImplicitCellCreationEnabled) {
            this.isImplicitCellCreationEnabled = isImplicitCellCreationEnabled;
            return this;
        }

        /** Sets the static configuration map (an empty map unless overridden). */
        public Builder setStaticConfig(Map<String, Object> staticConfig) {
            this.staticConfig = staticConfig;
            return this;
        }

        /** Sets the ZK migration flag (false unless overridden). */
        public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
            this.zkMigrationEnabled = zkMigrationEnabled;
            return this;
        }

        /** Sets the optional default topic placement (empty unless overridden). */
        public Builder setDefaultTopicPlacement(Optional<TopicPlacement> defaultTopicPlacement) {
            this.defaultTopicPlacement = defaultTopicPlacement;
            return this;
        }

        /** Sets the cell metrics; mandatory for {@link #build()}. */
        public Builder setCellMetrics(CellMetrics cellMetrics) {
            this.cellMetrics = cellMetrics;
            return this;
        }

        /**
         * Validates the mandatory settings, fills in the defaults that depend on the
         * node id, and constructs the controller.
         *
         * <p>The derived thread name prefix, log context and metrics object are stored
         * back on this builder, so {@link #logContext()} returns the derived value
         * afterwards.
         *
         * @return the newly constructed controller
         * @throws IllegalStateException if a mandatory setting was never supplied
         * @throws Exception if the fallback metrics class cannot be loaded or
         *         instantiated, or if creating the event queue or the controller
         *         fails; in the latter case the event queue is closed before the
         *         exception is rethrown
         */
        public QuorumController build() throws Exception {
            requireSet(raftClient, "You must set a raft client.");
            requireSet(bootstrapMetadata, "You must specify an initial metadata.version using the kafka-storage tool.");
            requireSet(quorumFeatures, "You must specify the quorum features");
            requireSet(nonFatalFaultHandler, "You must specify a non-fatal fault handler.");
            requireSet(fatalFaultHandler, "You must specify a fatal fault handler.");
            requireSet(cellAssignor, "You must specify a cell assignor");
            requireSet(cellMetrics, "You must specify a cell metrics");

            if (threadNamePrefix == null) {
                threadNamePrefix = String.format("quorum-controller-%d-", nodeId);
            }
            if (logContext == null) {
                logContext = new LogContext(String.format("[QuorumController id=%d] ", nodeId));
            }
            if (controllerMetrics == null) {
                // The fallback metrics class is looked up by name at run time rather than
                // referenced directly, so there is no compile-time dependency on it.
                Class<?> mockMetricsClass =
                    Class.forName("org.apache.kafka.controller.metrics.MockControllerMetrics");
                controllerMetrics = (ControllerMetrics) mockMetricsClass.getConstructor().newInstance();
            }

            KafkaEventQueue eventQueue = null;
            try {
                eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix);
                // Argument order mirrors the QuorumController constructor; do not reorder.
                return new QuorumController(
                    nonFatalFaultHandler,
                    fatalFaultHandler,
                    logContext,
                    nodeId,
                    clusterId,
                    eventQueue,
                    time,
                    configSchema,
                    raftClient,
                    quorumFeatures,
                    defaultReplicationFactor,
                    defaultNumPartitions,
                    replicaPlacer,
                    leaderImbalanceCheckIntervalNs,
                    maxIdleIntervalNs,
                    sessionTimeoutNs,
                    controllerMetrics,
                    createTopicPolicy,
                    alterConfigPolicy,
                    configurationValidator,
                    authorizer,
                    staticConfig,
                    bootstrapMetadata,
                    nameToTenantCallback,
                    defaultMinIsrCount,
                    maxRecordsPerBatch,
                    zkMigrationEnabled,
                    createClusterLinkPolicy,
                    cellAssignor,
                    partitionPlacementStrategy,
                    isImplicitCellCreationEnabled,
                    defaultTopicPlacement,
                    cellMetrics);
            }
            catch (Exception failure) {
                // The controller never took ownership of the queue, so release it here.
                Utils.closeQuietly(eventQueue, "event queue");
                throw failure;
            }
        }

        /** Throws {@link IllegalStateException} carrying {@code message} when {@code value} is null. */
        private static void requireSet(Object value, String message) {
            if (value == null) {
                throw new IllegalStateException(message);
            }
        }
    }
}

