/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.active;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blocklist.BlocklistHandler;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
import org.apache.flink.runtime.metrics.ThresholdMeter;
import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
import org.apache.flink.runtime.resourcemanager.active.ResourceEventHandler;
import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
import org.apache.flink.runtime.resourcemanager.active.WorkerCounter;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocator;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceDeclaration;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.util.FlinkExpectedException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;

public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
extends ResourceManager<WorkerType>
implements ResourceEventHandler<WorkerType> {
    protected final Configuration flinkConfig;
    private final Duration startWorkerRetryInterval;
    private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
    private final Map<ResourceID, WorkerType> workerNodeMap;
    private final WorkerCounter pendingWorkerCounter;
    private final WorkerCounter totalWorkerCounter;
    private final Map<ResourceID, WorkerResourceSpec> workerResourceSpecs;
    private final Map<CompletableFuture<WorkerType>, WorkerResourceSpec> unallocatedWorkerFutures;
    private final Set<ResourceID> currentAttemptUnregisteredWorkers;
    private final Set<ResourceID> previousAttemptUnregisteredWorkers;
    private final ThresholdMeter startWorkerFailureRater;
    private final Duration workerRegistrationTimeout;
    private CompletableFuture<Void> startWorkerCoolDown;
    private final CompletableFuture<Void> readyToServeFuture;
    private final Duration previousWorkerRecoverTimeout;
    private Collection<ResourceDeclaration> resourceDeclarations;

    /**
     * Creates the resource manager with empty worker bookkeeping.
     *
     * <p>The RPC timeout handed to the parent class is read from {@code flinkConfig} via
     * {@link AkkaOptions#ASK_TIMEOUT_DURATION}, so {@code flinkConfig} must not be null.
     * Parameters not listed below are forwarded unchanged to the parent constructor.
     *
     * @param resourceManagerDriver driver that resource requests and releases are delegated to
     * @param flinkConfig configuration; must not be null
     * @param startWorkerFailureRater meter used to track worker start failures; must not be null
     * @param retryInterval delay applied before requesting workers again once the failure
     *     threshold has been exceeded
     * @param workerRegistrationTimeout delay after which a still unregistered worker is stopped
     * @param previousWorkerRecoverTimeout maximum time to wait for recovered workers before the
     *     ready-to-serve future is completed; zero disables the wait
     * @param ioExecutor executor forwarded to the parent class
     */
    public ActiveResourceManager(
            ResourceManagerDriver<WorkerType> resourceManagerDriver,
            Configuration flinkConfig,
            RpcService rpcService,
            UUID leaderSessionId,
            ResourceID resourceId,
            HeartbeatServices heartbeatServices,
            DelegationTokenManager delegationTokenManager,
            SlotManager slotManager,
            ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
            BlocklistHandler.Factory blocklistHandlerFactory,
            JobLeaderIdService jobLeaderIdService,
            ClusterInformation clusterInformation,
            FatalErrorHandler fatalErrorHandler,
            ResourceManagerMetricGroup resourceManagerMetricGroup,
            ThresholdMeter startWorkerFailureRater,
            Duration retryInterval,
            Duration workerRegistrationTimeout,
            Duration previousWorkerRecoverTimeout,
            Executor ioExecutor) {
        super(
                rpcService,
                leaderSessionId,
                resourceId,
                heartbeatServices,
                delegationTokenManager,
                slotManager,
                clusterPartitionTrackerFactory,
                blocklistHandlerFactory,
                jobLeaderIdService,
                clusterInformation,
                fatalErrorHandler,
                resourceManagerMetricGroup,
                Time.fromDuration(
                        Preconditions.checkNotNull(flinkConfig)
                                .get(AkkaOptions.ASK_TIMEOUT_DURATION)),
                ioExecutor);
        this.flinkConfig = flinkConfig;
        this.resourceManagerDriver = resourceManagerDriver;
        this.workerNodeMap = new HashMap<>();
        this.pendingWorkerCounter = new WorkerCounter();
        this.totalWorkerCounter = new WorkerCounter();
        this.workerResourceSpecs = new HashMap<>();
        this.unallocatedWorkerFutures = new HashMap<>();
        this.currentAttemptUnregisteredWorkers = new HashSet<>();
        this.previousAttemptUnregisteredWorkers = new HashSet<>();
        this.startWorkerFailureRater = Preconditions.checkNotNull(startWorkerFailureRater);
        this.startWorkerRetryInterval = retryInterval;
        this.workerRegistrationTimeout = workerRegistrationTimeout;
        // Starts completed: no cool-down is active until a failure threshold is hit.
        this.startWorkerCoolDown = FutureUtils.completedVoidFuture();
        this.previousWorkerRecoverTimeout = previousWorkerRecoverTimeout;
        this.readyToServeFuture = new CompletableFuture<>();
        this.resourceDeclarations = new HashSet<>();
    }

    /**
     * Initializes the {@link ResourceManagerDriver}, passing it this instance, a
     * {@code GatewayMainThreadExecutor}, the I/O executor and a reference to the blocklist
     * handler's {@code getAllBlockedNodeIds}.
     *
     * @throws ResourceManagerException wrapping any exception raised during initialization
     */
    @Override
    protected void initialize() throws ResourceManagerException {
        try {
            final ScheduledExecutor mainThreadExecutor = new GatewayMainThreadExecutor();
            resourceManagerDriver.initialize(
                    this, mainThreadExecutor, ioExecutor, blocklistHandler::getAllBlockedNodeIds);
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot initialize resource provider.", cause);
        }
    }

    /**
     * Terminates the {@link ResourceManagerDriver}.
     *
     * @throws ResourceManagerException wrapping any exception raised by the driver
     */
    @Override
    protected void terminate() throws ResourceManagerException {
        try {
            resourceManagerDriver.terminate();
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot terminate resource provider.", cause);
        }
    }

    /**
     * Forwards the application deregistration to the {@link ResourceManagerDriver}.
     *
     * @param finalStatus final status passed through to the driver
     * @param optionalDiagnostics diagnostics passed through to the driver; may be null
     * @throws ResourceManagerException wrapping any exception raised by the driver
     */
    @Override
    protected void internalDeregisterApplication(
            ApplicationStatus finalStatus, @Nullable String optionalDiagnostics)
            throws ResourceManagerException {
        try {
            resourceManagerDriver.deregisterApplication(finalStatus, optionalDiagnostics);
        } catch (Exception cause) {
            throw new ResourceManagerException("Cannot deregister application.", cause);
        }
    }

    /**
     * Looks up the worker tracked under the given resource id.
     *
     * @param resourceID id of the worker asking to register
     * @return the tracked worker, or an empty {@link Optional} if the id is unknown
     */
    @Override
    protected Optional<WorkerType> getWorkerNodeIfAcceptRegistration(ResourceID resourceID) {
        final WorkerType knownWorker = workerNodeMap.get(resourceID);
        return Optional.ofNullable(knownWorker);
    }

    /**
     * Replaces the current resource declarations and immediately reconciles the tracked
     * workers against them.
     *
     * <p>The declarations are stored as an unmodifiable view of the given collection, not as
     * a copy.
     *
     * @param resourceDeclarations the new declarations; must not be null
     */
    @VisibleForTesting
    public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
        final Collection<ResourceDeclaration> readOnlyView =
                Collections.unmodifiableCollection(resourceDeclarations);
        this.resourceDeclarations = readOnlyView;
        log.debug("Update resource declarations to {}.", resourceDeclarations);
        checkResourceDeclarations();
    }

    /**
     * Updates the bookkeeping once a worker has registered.
     *
     * <p>The worker is removed from the set of previous-attempt workers awaiting registration.
     * If no resource spec is recorded for it yet, the given spec is recorded and the total
     * worker counter is incremented. If the worker was requested in the current attempt and
     * was still awaiting registration, the pending worker counter is decremented.
     *
     * @param worker the worker that registered
     * @param workerResourceSpec resource spec of the registered worker
     */
    @Override
    protected void onWorkerRegistered(WorkerType worker, WorkerResourceSpec workerResourceSpec) {
        final ResourceID resourceId = worker.getResourceID();
        log.info("Worker {} is registered.", resourceId.getStringWithMetadata());

        tryRemovePreviousPendingRecoveryTaskManager(resourceId);

        final boolean specAlreadyKnown = workerResourceSpecs.containsKey(resourceId);
        if (!specAlreadyKnown) {
            // No spec on record means this worker was not requested through requestNewWorker.
            workerResourceSpecs.put(resourceId, workerResourceSpec);
            totalWorkerCounter.increaseAndGet(workerResourceSpec);
            log.info(
                    "Recovered worker {} with resource spec {} registered",
                    resourceId.getStringWithMetadata(),
                    workerResourceSpec);
        }

        if (!currentAttemptUnregisteredWorkers.remove(resourceId)) {
            return;
        }
        final int pendingCount = pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
        log.info(
                "Worker {} with resource spec {} was requested in current attempt. Current pending count after registering: {}.",
                resourceId.getStringWithMetadata(),
                workerResourceSpec,
                pendingCount);
    }

    /**
     * Registers the parent's metrics plus the start-worker failure meter and a gauge exposing
     * the total number of pending workers.
     */
    @Override
    protected void registerMetrics() {
        super.registerMetrics();
        resourceManagerMetricGroup.meter("startWorkFailurePerSecond", startWorkerFailureRater);
        resourceManagerMetricGroup.gauge(
                "numPendingTaskManagers", pendingWorkerCounter::getTotalNum);
    }

    /**
     * Starts tracking workers recovered from a previous attempt.
     *
     * <p>Each recovered worker is added to the worker map and to the set of previous-attempt
     * workers awaiting registration, and a registration timeout check is scheduled for it.
     *
     * <p>If at least one worker was recovered and the recover timeout is non-zero, the
     * ready-to-serve future is completed after that timeout at the latest. Otherwise it is
     * completed immediately.
     *
     * @param recoveredWorkers workers recovered from the previous attempt
     */
    @Override
    public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> recoveredWorkers) {
        getMainThreadExecutor().assertRunningInMainThread();
        log.info("Recovered {} workers from previous attempt.", recoveredWorkers.size());

        for (WorkerType worker : recoveredWorkers) {
            final ResourceID resourceId = worker.getResourceID();
            workerNodeMap.put(resourceId, worker);
            previousAttemptUnregisteredWorkers.add(resourceId);
            scheduleWorkerRegistrationTimeoutCheck(resourceId);
            log.info(
                    "Worker {} recovered from previous attempt.",
                    resourceId.getStringWithMetadata());
        }

        final boolean waitForRecoveredWorkers =
                !recoveredWorkers.isEmpty() && !previousWorkerRecoverTimeout.isZero();
        if (waitForRecoveredWorkers) {
            scheduleRunAsync(
                    () -> {
                        readyToServeFuture.complete(null);
                        log.info(
                                "Timeout to wait recovery taskmanagers, recovery future is completed.");
                    },
                    previousWorkerRecoverTimeout.toMillis(),
                    TimeUnit.MILLISECONDS);
        } else {
            readyToServeFuture.complete(null);
        }
    }

    /**
     * Handles the termination of a worker.
     *
     * <p>A worker that was requested in the current attempt and never registered counts as a
     * start failure. If the worker was tracked, its state is cleared and the resource
     * declarations are re-checked. The task manager connection is closed in every case.
     *
     * @param resourceId id of the terminated worker
     * @param diagnostics diagnostics used for logging and as the connection-close cause
     */
    @Override
    public void onWorkerTerminated(ResourceID resourceId, String diagnostics) {
        // Must be evaluated before clearStateForWorker, which removes the id from this set.
        final boolean terminatedBeforeRegistering =
                currentAttemptUnregisteredWorkers.contains(resourceId);
        if (terminatedBeforeRegistering) {
            recordWorkerFailureAndPauseWorkerCreationIfNeeded();
        }

        final boolean wasTracked = clearStateForWorker(resourceId);
        if (wasTracked) {
            log.info(
                    "Worker {} is terminated. Diagnostics: {}",
                    resourceId.getStringWithMetadata(),
                    diagnostics);
            checkResourceDeclarations();
        }

        closeTaskManagerConnection(resourceId, new Exception(diagnostics));
    }

    /**
     * Escalates the given error by delegating to {@code onFatalError}.
     *
     * @param exception the error to escalate
     */
    @Override
    public void onError(Throwable exception) {
        onFatalError(exception);
    }

    /**
     * Reconciles the tracked worker count with every current resource declaration.
     *
     * <p>For each declaration, the total worker count of its spec is compared with the declared
     * number:
     *
     * <ul>
     *   <li>Too many workers: release, in order, the declaration's unwanted workers, pending
     *       resource requests, current-attempt workers that have not registered yet, and
     *       finally any tracked worker of that spec. Fails with an illegal state if the
     *       surplus cannot be released completely.
     *   <li>Too few workers: request the missing workers, unless a start-worker cool-down is
     *       active, in which case this check is re-run once the cool-down completes.
     *   <li>Exact match: nothing to do.
     * </ul>
     */
    private void checkResourceDeclarations() {
        validateRunsInMainThread();

        for (ResourceDeclaration declaration : resourceDeclarations) {
            final WorkerResourceSpec spec = declaration.getSpec();
            final int declaredNumber = declaration.getNumNeeded();
            // Positive: workers to release; negative: workers to request.
            final int surplus = totalWorkerCounter.getNum(spec) - declaredNumber;

            if (surplus > 0) {
                log.info(
                        "need release {} workers, current worker number {}, declared worker number {}",
                        surplus,
                        totalWorkerCounter.getNum(spec),
                        declaredNumber);

                int stillToRelease =
                        releaseUnWantedResources(declaration.getUnwantedWorkers(), surplus);
                if (stillToRelease > 0) {
                    stillToRelease = releaseUnallocatedWorkers(spec, stillToRelease);
                }
                if (stillToRelease > 0) {
                    stillToRelease =
                            releaseAllocatedWorkers(
                                    currentAttemptUnregisteredWorkers, spec, stillToRelease);
                }
                if (stillToRelease > 0) {
                    stillToRelease =
                            releaseAllocatedWorkers(workerNodeMap.keySet(), spec, stillToRelease);
                }

                Preconditions.checkState(
                        stillToRelease == 0, "there are no more workers to release");
            } else if (surplus < 0) {
                if (!startWorkerCoolDown.isDone()) {
                    // Cool-down still active: retry the whole check once it has elapsed.
                    startWorkerCoolDown.thenRun(this::checkResourceDeclarations);
                } else {
                    final int missing = -surplus;
                    log.info(
                            "need request {} new workers, current worker number {}, declared worker number {}",
                            missing,
                            totalWorkerCounter.getNum(spec),
                            declaredNumber);
                    for (int requested = 0; requested < missing; requested++) {
                        requestNewWorker(spec);
                    }
                }
            } else {
                log.debug(
                        "current worker number {} meets the declared worker {}",
                        totalWorkerCounter.getNum(spec),
                        declaredNumber);
            }
        }
    }

    /**
     * Releases workers from the given unwanted set until enough have been released or the set
     * is exhausted.
     *
     * @param unwantedWorkers instance ids of workers that should be released first
     * @param needReleaseWorkerNumber number of workers that still have to be released
     * @return the number of workers that still have to be released afterwards
     */
    private int releaseUnWantedResources(
            Collection<InstanceID> unwantedWorkers, int needReleaseWorkerNumber) {
        final Exception releaseReason =
                new FlinkExpectedException(
                        "slot manager has determined that the resource is no longer needed");

        int stillToRelease = needReleaseWorkerNumber;
        for (InstanceID candidate : unwantedWorkers) {
            if (stillToRelease <= 0) {
                break;
            }
            if (releaseResource(candidate, releaseReason)) {
                stillToRelease--;
            }
        }
        return stillToRelease;
    }

    /**
     * Cancels pending resource requests of the given spec until enough have been cancelled or
     * none are left.
     *
     * @param workerResourceSpec spec whose pending requests may be cancelled
     * @param needReleaseWorkerNumber number of workers that still have to be released
     * @return the number of workers that still have to be released afterwards
     */
    private int releaseUnallocatedWorkers(
            WorkerResourceSpec workerResourceSpec, int needReleaseWorkerNumber) {
        // Collect the candidates first: the completion callback registered in requestNewWorker
        // removes the future from unallocatedWorkerFutures, so the map must not be iterated
        // while futures are being cancelled.
        final Set<CompletableFuture<WorkerType>> pendingRequests =
                unallocatedWorkerFutures.entrySet().stream()
                        .filter(entry -> entry.getValue().equals(workerResourceSpec))
                        .map(Map.Entry::getKey)
                        .collect(Collectors.toSet());

        int stillToRelease = needReleaseWorkerNumber;
        for (CompletableFuture<WorkerType> pendingRequest : pendingRequests) {
            if (stillToRelease <= 0) {
                break;
            }
            // cancel returns false if the future has already completed; that one does not count.
            if (pendingRequest.cancel(true)) {
                stillToRelease--;
            }
        }
        return stillToRelease;
    }

    /**
     * Releases workers of the given spec from the candidate collection until enough have been
     * released or no candidate is left.
     *
     * @param candidateWorkers ids of workers that may be released
     * @param workerResourceSpec only workers recorded with this spec are released
     * @param needReleaseWorkerNumber number of workers that still have to be released
     * @return the number of workers that still have to be released afterwards
     */
    private int releaseAllocatedWorkers(
            Collection<ResourceID> candidateWorkers,
            WorkerResourceSpec workerResourceSpec,
            int needReleaseWorkerNumber) {
        // Snapshot the matching ids: releasing a worker clears its state, which mutates the
        // collections callers pass in here (the unregistered set and the worker map key set).
        final List<ResourceID> releasableWorkers =
                candidateWorkers.stream()
                        .filter(id -> workerResourceSpec.equals(workerResourceSpecs.get(id)))
                        .collect(Collectors.toList());

        final Exception releaseReason = new FlinkExpectedException("resource is no longer needed");

        int stillToRelease = needReleaseWorkerNumber;
        for (ResourceID resourceID : releasableWorkers) {
            if (stillToRelease <= 0) {
                break;
            }
            if (releaseResource(resourceID, releaseReason)) {
                stillToRelease--;
            } else {
                log.warn("Resource {} could not release.", resourceID);
            }
        }
        return stillToRelease;
    }

    /**
     * Releases the worker that belongs to the given instance id.
     *
     * @param instanceId instance id of the worker to release
     * @param cause reason passed on when closing the task manager connection
     * @return {@code true} if a worker was found for the instance id and released,
     *     {@code false} otherwise
     */
    private boolean releaseResource(InstanceID instanceId, Exception cause) {
        final WorkerType worker = getWorkerByInstanceId(instanceId);
        if (worker == null) {
            log.debug("Instance {} not found in ResourceManager.", instanceId);
            return false;
        }
        return releaseResource(worker.getResourceID(), cause);
    }

    /**
     * Stops the worker with the given resource id and closes its task manager connection.
     *
     * @param resourceID id of the worker to release
     * @param cause reason passed on when closing the task manager connection
     * @return {@code true} if the worker was tracked and has been released, {@code false} if
     *     the id is unknown
     */
    private boolean releaseResource(ResourceID resourceID, Exception cause) {
        if (!workerNodeMap.containsKey(resourceID)) {
            return false;
        }
        internalStopWorker(resourceID);
        closeTaskManagerConnection(resourceID, cause);
        return true;
    }

    /**
     * Requests one new worker with the given resource spec from the driver.
     *
     * <p>The pending and total counters are incremented up front and the request future is
     * tracked until it completes. On completion:
     *
     * <ul>
     *   <li>Success: the worker is tracked as allocated but not yet registered, and a
     *       registration timeout check is scheduled.
     *   <li>Cancellation: both counters are decremented again.
     *   <li>Any other failure: both counters are decremented, the failure is recorded (which
     *       may start a cool-down) and the resource declarations are re-checked.
     * </ul>
     *
     * @param workerResourceSpec resource spec of the worker to request
     */
    @VisibleForTesting
    public void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
        final TaskExecutorProcessSpec taskExecutorProcessSpec =
                TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(
                        flinkConfig, workerResourceSpec);
        final int pendingCount = pendingWorkerCounter.increaseAndGet(workerResourceSpec);
        totalWorkerCounter.increaseAndGet(workerResourceSpec);
        log.info(
                "Requesting new worker with resource spec {}, current pending count: {}.",
                workerResourceSpec,
                pendingCount);

        final CompletableFuture<WorkerType> requestResourceFuture =
                resourceManagerDriver.requestResource(taskExecutorProcessSpec);
        // Track the request before attaching the callback, which removes it again.
        unallocatedWorkerFutures.put(requestResourceFuture, workerResourceSpec);

        FutureUtils.assertNoException(
                requestResourceFuture.handle(
                        (worker, exception) -> {
                            unallocatedWorkerFutures.remove(requestResourceFuture);

                            if (exception != null) {
                                final int count =
                                        pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
                                totalWorkerCounter.decreaseAndGet(workerResourceSpec);
                                if (exception instanceof CancellationException) {
                                    log.info(
                                            "Requesting worker with resource spec {} canceled, current pending count: {}",
                                            workerResourceSpec,
                                            count);
                                } else {
                                    log.warn(
                                            "Failed requesting worker with resource spec {}, current pending count: {}",
                                            workerResourceSpec,
                                            count,
                                            exception);
                                    recordWorkerFailureAndPauseWorkerCreationIfNeeded();
                                    checkResourceDeclarations();
                                }
                            } else {
                                final ResourceID resourceId = worker.getResourceID();
                                workerNodeMap.put(resourceId, worker);
                                workerResourceSpecs.put(resourceId, workerResourceSpec);
                                currentAttemptUnregisteredWorkers.add(resourceId);
                                scheduleWorkerRegistrationTimeoutCheck(resourceId);
                                log.info(
                                        "Requested worker {} with resource spec {}.",
                                        resourceId.getStringWithMetadata(),
                                        workerResourceSpec);
                            }
                            return null;
                        }));
    }

    /**
     * Schedules a check that runs after the worker registration timeout. If the worker is
     * still awaiting registration at that point, it is stopped and the resource declarations
     * are re-checked.
     *
     * @param resourceId id of the worker to check
     */
    private void scheduleWorkerRegistrationTimeoutCheck(ResourceID resourceId) {
        final Runnable timeoutCheck =
                () -> {
                    final boolean stillUnregistered =
                            currentAttemptUnregisteredWorkers.contains(resourceId)
                                    || previousAttemptUnregisteredWorkers.contains(resourceId);
                    if (!stillUnregistered) {
                        return;
                    }
                    log.warn(
                            "Worker {} did not register in {}, will stop it and request a new one if needed.",
                            resourceId,
                            workerRegistrationTimeout);
                    internalStopWorker(resourceId);
                    checkResourceDeclarations();
                };
        scheduleRunAsync(timeoutCheck, workerRegistrationTimeout);
    }

    /**
     * Stops the worker with the given id: asks the driver to release it if it is tracked, then
     * clears all local state kept for it.
     *
     * @param resourceId id of the worker to stop
     */
    private void internalStopWorker(ResourceID resourceId) {
        log.info("Stopping worker {}.", resourceId.getStringWithMetadata());

        final WorkerType worker = workerNodeMap.get(resourceId);
        if (worker != null) {
            resourceManagerDriver.releaseResource(worker);
        }
        clearStateForWorker(resourceId);
    }

    /**
     * Removes all bookkeeping kept for the given worker.
     *
     * <p>If a resource spec was recorded for the worker, the total worker counter is
     * decremented. If the worker was additionally still awaiting registration in the current
     * attempt, the pending worker counter is decremented as well.
     *
     * @param resourceId id of the worker whose state should be cleared
     * @return {@code true} if the worker was tracked, {@code false} if the id is unknown (in
     *     which case nothing else is changed)
     */
    private boolean clearStateForWorker(ResourceID resourceId) {
        final WorkerType removedWorker = workerNodeMap.remove(resourceId);
        if (removedWorker == null) {
            log.debug("Ignore unrecognized worker {}.", resourceId.getStringWithMetadata());
            return false;
        }

        final WorkerResourceSpec recordedSpec = workerResourceSpecs.remove(resourceId);
        tryRemovePreviousPendingRecoveryTaskManager(resourceId);

        if (recordedSpec == null) {
            return true;
        }

        totalWorkerCounter.decreaseAndGet(recordedSpec);
        if (currentAttemptUnregisteredWorkers.remove(resourceId)) {
            final int pendingCount = pendingWorkerCounter.decreaseAndGet(recordedSpec);
            log.info(
                    "Worker {} with resource spec {} was requested in current attempt and has not registered. Current pending count after removing: {}.",
                    resourceId.getStringWithMetadata(),
                    recordedSpec,
                    pendingCount);
        }
        return true;
    }

    /**
     * Records a worker start failure and, if the failure threshold is exceeded, starts a
     * cool-down during which no new workers are requested.
     */
    private void recordWorkerFailureAndPauseWorkerCreationIfNeeded() {
        final boolean thresholdExceeded = recordStartWorkerFailure();
        if (thresholdExceeded) {
            tryResetWorkerCreationCoolDown();
        }
    }

    /**
     * Marks a start-worker failure on the failure meter and checks it against its threshold.
     *
     * @return {@code true} if the threshold check reports that the threshold is exceeded,
     *     {@code false} otherwise
     */
    private boolean recordStartWorkerFailure() {
        startWorkerFailureRater.markEvent();

        try {
            startWorkerFailureRater.checkAgainstThreshold();
            return false;
        } catch (ThresholdMeter.ThresholdExceedException thresholdExceeded) {
            log.warn("Reaching max start worker failure rate: {}", thresholdExceeded.getMessage());
            return true;
        }
    }

    /**
     * Starts a new worker-creation cool-down unless one is already active.
     *
     * <p>The cool-down is a fresh, incomplete future that is completed after the start-worker
     * retry interval.
     */
    private void tryResetWorkerCreationCoolDown() {
        if (!startWorkerCoolDown.isDone()) {
            // A cool-down is already running; do not extend or restart it.
            return;
        }
        log.info("Will not retry creating worker in {}.", startWorkerRetryInterval);
        startWorkerCoolDown = new CompletableFuture<>();
        scheduleRunAsync(() -> startWorkerCoolDown.complete(null), startWorkerRetryInterval);
    }

    /**
     * Returns the future that this class completes once no previous-attempt worker is awaiting
     * registration any more, or once the recover timeout has elapsed.
     *
     * @return the ready-to-serve future
     */
    @Override
    public CompletableFuture<Void> getReadyToServeFuture() {
        return readyToServeFuture;
    }

    /**
     * Creates a new {@link ResourceAllocator} backed by this resource manager.
     *
     * @return a fresh allocator instance on every call
     */
    @Override
    protected ResourceAllocator getResourceAllocator() {
        final ResourceAllocator allocator = new ResourceAllocatorImpl();
        return allocator;
    }

    /**
     * Removes the worker from the set of previous-attempt workers awaiting registration and
     * completes the ready-to-serve future whenever that set is empty afterwards. The future is
     * completed even if the given worker was not part of the set.
     *
     * @param resourceID id of the worker to remove
     */
    private void tryRemovePreviousPendingRecoveryTaskManager(ResourceID resourceID) {
        final long sizeBeforeRemove = previousAttemptUnregisteredWorkers.size();

        if (previousAttemptUnregisteredWorkers.remove(resourceID)) {
            final int sizeAfterRemove = previousAttemptUnregisteredWorkers.size();
            final String readySuffix =
                    sizeAfterRemove == 0 ? " Resource manager is ready to serve." : "";
            log.info(
                    "Pending recovery taskmanagers {} -> {}.{}",
                    sizeBeforeRemove,
                    sizeAfterRemove,
                    readySuffix);
        }

        if (previousAttemptUnregisteredWorkers.isEmpty()) {
            readyToServeFuture.complete(null);
        }
    }

    /**
     * Runs the given callable via {@code callAsync} with the given timeout.
     *
     * @param callable the action to run
     * @param timeout timeout for the call
     * @param <T> result type of the callable
     * @return future holding the callable's result
     */
    @VisibleForTesting
    <T> CompletableFuture<T> runInMainThread(Callable<T> callable, Time timeout) {
        final Duration callTimeout = TimeUtils.toDuration(timeout);
        return callAsync(callable, callTimeout);
    }

    /**
     * {@link ResourceAllocator} that delegates to the enclosing {@link ActiveResourceManager}.
     * Mutating calls first validate that they run in the main thread.
     */
    private class ResourceAllocatorImpl implements ResourceAllocator {

        /** Always reports that resource allocation is supported. */
        @Override
        public boolean isSupported() {
            return true;
        }

        /** Stops the worker with the given id and clears the state kept for it. */
        @Override
        public void cleaningUpDisconnectedResource(ResourceID resourceID) {
            ActiveResourceManager.this.validateRunsInMainThread();
            ActiveResourceManager.this.internalStopWorker(resourceID);
        }

        /** Forwards the declarations to the enclosing resource manager. */
        @Override
        public void declareResourceNeeded(Collection<ResourceDeclaration> resourceDeclarations) {
            ActiveResourceManager.this.validateRunsInMainThread();
            // Qualified on purpose: an unqualified call would recurse into this method.
            ActiveResourceManager.this.declareResourceNeeded(resourceDeclarations);
        }
    }

    /**
     * {@link ScheduledExecutor} that forwards every call to the executor returned by the
     * enclosing resource manager's {@code getMainThreadExecutor()}. The executor is looked up
     * on each call rather than cached.
     */
    private class GatewayMainThreadExecutor implements ScheduledExecutor {

        /** Schedules the runnable on the main thread executor after the given delay. */
        public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
            return ActiveResourceManager.this
                    .getMainThreadExecutor()
                    .schedule(command, delay, unit);
        }

        /** Schedules the callable on the main thread executor after the given delay. */
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
            return ActiveResourceManager.this
                    .getMainThreadExecutor()
                    .schedule(callable, delay, unit);
        }

        /** Forwards a fixed-rate scheduling request to the main thread executor. */
        public ScheduledFuture<?> scheduleAtFixedRate(
                Runnable command, long initialDelay, long period, TimeUnit unit) {
            return ActiveResourceManager.this
                    .getMainThreadExecutor()
                    .scheduleAtFixedRate(command, initialDelay, period, unit);
        }

        /** Forwards a fixed-delay scheduling request to the main thread executor. */
        public ScheduledFuture<?> scheduleWithFixedDelay(
                Runnable command, long initialDelay, long delay, TimeUnit unit) {
            return ActiveResourceManager.this
                    .getMainThreadExecutor()
                    .scheduleWithFixedDelay(command, initialDelay, delay, unit);
        }

        /** Executes the runnable on the main thread executor. */
        public void execute(Runnable command) {
            ActiveResourceManager.this.getMainThreadExecutor().execute(command);
        }
    }
}

