/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.heartbeat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitor;
import org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;

/**
 * Heartbeat manager that keeps one {@link HeartbeatMonitor} per monitored {@link ResourceID}
 * and forwards incoming heartbeat payloads to a {@link HeartbeatListener}.
 *
 * <p>Targets are kept in a {@link ConcurrentHashMap} and the stopped flag is {@code volatile},
 * so the methods of this class may be invoked from different threads.
 *
 * @param <I> type of the payload received with incoming heartbeats
 * @param <O> type of the payload sent to heartbeat targets
 */
@ThreadSafe
public class HeartbeatManagerImpl<I, O>
implements HeartbeatManager<I, O> {
    /** Timeout interval (in milliseconds) handed to every heartbeat monitor that is created. */
    private final long heartbeatTimeoutIntervalMs;

    /** Resource ID that identifies this manager as the sender of heartbeat responses. */
    private final ResourceID ownResourceID;

    /** Listener that receives incoming payloads and supplies outgoing payloads. */
    private final HeartbeatListener<I, O> heartbeatListener;

    /** Executor passed to the heartbeat monitors on creation. */
    private final ScheduledExecutor mainThreadExecutor;

    protected final Logger log;

    /** Currently monitored targets, keyed by their resource ID. */
    private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

    /** Factory used to create the monitor for every newly registered target. */
    private final HeartbeatMonitor.Factory<O> heartbeatMonitorFactory;

    /** Set to {@code true} by {@link #stop()}; once set, the mutating operations become no-ops. */
    protected volatile boolean stopped;

    /**
     * Creates a heartbeat manager that uses the default {@link HeartbeatMonitorImpl.Factory}.
     *
     * @param heartbeatTimeoutIntervalMs heartbeat timeout in milliseconds, must be larger than 0
     * @param ownResourceID resource ID of the component owning this manager
     * @param heartbeatListener listener for payloads (incoming and outgoing)
     * @param mainThreadExecutor executor handed to the created heartbeat monitors
     * @param log logger to use
     */
    public HeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor mainThreadExecutor, Logger log) {
        this(heartbeatTimeoutIntervalMs, ownResourceID, heartbeatListener, mainThreadExecutor, log, new HeartbeatMonitorImpl.Factory<>());
    }

    /**
     * Creates a heartbeat manager with an explicit heartbeat monitor factory.
     *
     * @param heartbeatTimeoutIntervalMs heartbeat timeout in milliseconds, must be larger than 0
     * @param ownResourceID resource ID of the component owning this manager
     * @param heartbeatListener listener for payloads (incoming and outgoing)
     * @param mainThreadExecutor executor handed to the created heartbeat monitors
     * @param log logger to use
     * @param heartbeatMonitorFactory factory creating the monitor of each registered target
     * @throws IllegalArgumentException if the timeout is not larger than 0
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public HeartbeatManagerImpl(long heartbeatTimeoutIntervalMs, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor mainThreadExecutor, Logger log, HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
        Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");
        this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
        this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
        this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
        this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
        this.log = Preconditions.checkNotNull(log);
        // Fail fast here instead of with a late NullPointerException in monitorTarget.
        this.heartbeatMonitorFactory = Preconditions.checkNotNull(heartbeatMonitorFactory, "heartbeatMonitorFactory");
        this.heartbeatTargets = new ConcurrentHashMap<>(16);
        this.stopped = false;
    }

    /** Returns the resource ID this manager reports as the origin of its heartbeats. */
    ResourceID getOwnResourceID() {
        return this.ownResourceID;
    }

    /** Returns the listener that handles heartbeat payloads. */
    HeartbeatListener<I, O> getHeartbeatListener() {
        return this.heartbeatListener;
    }

    /** Returns the live (not copied) map of monitored targets. */
    Map<ResourceID, HeartbeatMonitor<O>> getHeartbeatTargets() {
        return this.heartbeatTargets;
    }

    /**
     * Starts monitoring the given target unless the manager is stopped or the resource ID is
     * already monitored.
     *
     * @param resourceID resource ID identifying the target
     * @param heartbeatTarget target to which heartbeat responses are sent
     */
    @Override
    public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
        if (this.stopped) {
            return;
        }
        // Cheap pre-check that avoids creating a monitor for an already registered target.
        if (this.heartbeatTargets.containsKey(resourceID)) {
            this.log.debug("The target with resource ID {} is already been monitored.", resourceID);
            return;
        }
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatMonitorFactory.createHeartbeatMonitor(resourceID, heartbeatTarget, this.mainThreadExecutor, this.heartbeatListener, this.heartbeatTimeoutIntervalMs);
        // Register atomically: a concurrent registration of the same ID must not silently
        // replace (and thereby leave uncancelled) the monitor that was installed first.
        if (this.heartbeatTargets.putIfAbsent(resourceID, heartbeatMonitor) != null) {
            heartbeatMonitor.cancel();
            this.log.debug("The target with resource ID {} is already been monitored.", resourceID);
            return;
        }
        // stop() may have run between the initial check and the registration; undo it then.
        if (this.stopped) {
            heartbeatMonitor.cancel();
            // Remove only the monitor registered by this call.
            this.heartbeatTargets.remove(resourceID, heartbeatMonitor);
        }
    }

    /**
     * Stops monitoring the target with the given resource ID and cancels its monitor. Does
     * nothing if the manager is stopped or the target is not monitored.
     *
     * @param resourceID resource ID of the target to stop monitoring
     */
    @Override
    public void unmonitorTarget(ResourceID resourceID) {
        if (this.stopped) {
            return;
        }
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.remove(resourceID);
        if (heartbeatMonitor != null) {
            heartbeatMonitor.cancel();
        }
    }

    /** Marks the manager as stopped, cancels every registered monitor and clears the targets. */
    @Override
    public void stop() {
        // Set the flag first so that concurrent monitorTarget calls can detect the stop.
        this.stopped = true;
        for (HeartbeatMonitor<O> heartbeatMonitor : this.heartbeatTargets.values()) {
            heartbeatMonitor.cancel();
        }
        this.heartbeatTargets.clear();
    }

    /**
     * Returns the last heartbeat value recorded by the monitor of the given target.
     *
     * @param resourceId resource ID of the target
     * @return the monitor's last heartbeat, or {@code -1} if the target is not monitored
     */
    @Override
    public long getLastHeartbeatFrom(ResourceID resourceId) {
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceId);
        if (heartbeatMonitor != null) {
            return heartbeatMonitor.getLastHeartbeat();
        }
        return -1L;
    }

    /** Returns the executor that is handed to the heartbeat monitors. */
    ScheduledExecutor getMainThreadExecutor() {
        return this.mainThreadExecutor;
    }

    /**
     * Handles an incoming heartbeat: records it on the origin's monitor (if monitored) and
     * passes a non-null payload on to the listener. Ignored once the manager is stopped.
     *
     * @param heartbeatOrigin resource ID of the heartbeat sender
     * @param heartbeatPayload payload sent along with the heartbeat, may be {@code null}
     */
    @Override
    public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat from {}.", heartbeatOrigin);
        this.reportHeartbeat(heartbeatOrigin);
        if (heartbeatPayload != null) {
            this.heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }

    /**
     * Handles a heartbeat request: records it on the origin's monitor and, only if the origin
     * is a monitored target, passes a non-null payload to the listener and answers with a
     * heartbeat carrying the payload retrieved from the listener. Ignored once stopped.
     *
     * @param requestOrigin resource ID of the requester
     * @param heartbeatPayload payload sent along with the request, may be {@code null}
     */
    @Override
    public void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload) {
        if (this.stopped) {
            return;
        }
        this.log.debug("Received heartbeat request from {}.", requestOrigin);
        HeartbeatTarget<O> heartbeatTarget = this.reportHeartbeat(requestOrigin);
        if (heartbeatTarget == null) {
            // Requests from origins that are not monitored are neither reported nor answered.
            return;
        }
        if (heartbeatPayload != null) {
            this.heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
        }
        heartbeatTarget.receiveHeartbeat(this.getOwnResourceID(), this.heartbeatListener.retrievePayload(requestOrigin));
    }

    /**
     * Records a heartbeat on the monitor of the given resource ID.
     *
     * @param resourceID resource ID the heartbeat originates from
     * @return the heartbeat target of the monitor, or {@code null} if the ID is not monitored
     */
    HeartbeatTarget<O> reportHeartbeat(ResourceID resourceID) {
        // Single lookup: a containsKey/get pair could observe a concurrent removal in between
        // and dereference null.
        HeartbeatMonitor<O> heartbeatMonitor = this.heartbeatTargets.get(resourceID);
        if (heartbeatMonitor == null) {
            return null;
        }
        heartbeatMonitor.reportHeartbeat();
        return heartbeatMonitor.getHeartbeatTarget();
    }
}

