/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.slf4j.Logger;

public class HeartbeatManagerSenderImpl<I, O>
extends HeartbeatManagerImpl<I, O>
implements Runnable {
    private final ScheduledFuture<?> triggerFuture;

    public HeartbeatManagerSenderImpl(long heartbeatPeriod, long heartbeatTimeout, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger log) {
        super(heartbeatTimeout, ownResourceID, heartbeatListener, executor, scheduledExecutor, log);
        this.triggerFuture = scheduledExecutor.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!this.stopped) {
            this.log.debug("Trigger heartbeat request.");
            for (HeartbeatManagerImpl.HeartbeatMonitor heartbeatMonitor : this.getHeartbeatTargets()) {
                CompletableFuture futurePayload = this.getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
                HeartbeatTarget heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
                if (futurePayload != null) {
                    CompletionStage requestHeartbeatFuture = futurePayload.thenAcceptAsync(payload -> heartbeatTarget.requestHeartbeat(this.getOwnResourceID(), payload), this.getExecutor());
                    ((CompletableFuture)requestHeartbeatFuture).exceptionally(failure -> {
                        this.log.warn("Could not request the heartbeat from target {}.", (Object)heartbeatTarget, failure);
                        return null;
                    });
                    continue;
                }
                heartbeatTarget.requestHeartbeat(this.getOwnResourceID(), null);
            }
        }
    }

    @Override
    public void stop() {
        this.triggerFuture.cancel(true);
        super.stop();
    }
}

