/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import akka.pattern.AskTimeoutException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotStatusSyncer;
import org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedTaskManagerTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceTracker;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotState;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerSlotInformation;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerTracker;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.SlotStatus;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link DefaultSlotStatusSyncer}.
 *
 * <p>Each test wires a fresh {@link FineGrainedTaskManagerTracker} and {@link DefaultResourceTracker}
 * into a syncer and then checks the state of both trackers after allocating slots, freeing slots
 * or reporting slot status.
 */
public class DefaultSlotStatusSyncerTest
extends TestLogger {
    /** Timeout handed to every {@link DefaultSlotStatusSyncer} created by these tests. */
    private static final Time TASK_MANAGER_REQUEST_TIMEOUT = Time.seconds(10L);
    /** Shared connection backed by a default testing gateway, for tests that need no custom gateway behavior. */
    private static final TaskExecutorConnection TASK_EXECUTOR_CONNECTION = new TaskExecutorConnection(ResourceID.generate(), (TaskExecutorGateway)new TestingTaskExecutorGatewayBuilder().createTestingTaskExecutorGateway());

    /**
     * Allocates a slot and checks that, while the task executor has not yet answered the request,
     * the resource is recorded as acquired for the job and the slot is tracked as {@code PENDING}.
     */
    @Test
    public void testAllocateSlot() throws Exception {
        FineGrainedTaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
        // requestFuture captures the arguments of the slot request; responseFuture lets the test
        // decide when the gateway answers it.
        CompletableFuture requestFuture = new CompletableFuture();
        CompletableFuture<Acknowledge> responseFuture = new CompletableFuture<Acknowledge>();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(tuple6 -> {
            requestFuture.complete(tuple6);
            return responseFuture;
        }).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), (TaskExecutorGateway)taskExecutorGateway);
        taskManagerTracker.addTaskManager(taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        JobID jobId = new JobID();
        DefaultSlotStatusSyncer slotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        slotStatusSyncer.initialize((TaskManagerTracker)taskManagerTracker, (ResourceTracker)resourceTracker, ResourceManagerId.generate(), (Executor)TestingUtils.defaultExecutor());

        CompletableFuture allocatedFuture = slotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobId, "address", ResourceProfile.ANY);

        // The allocation id is read back from the captured request (field f2 of the tuple).
        AllocationID allocationId = (AllocationID)((Tuple6)requestFuture.get()).f2;
        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Matchers.contains((Object[])new ResourceRequirement[]{ResourceRequirement.create(ResourceProfile.ANY, 1)}));
        Assert.assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId).isPresent());
        Assert.assertThat((Object)((TaskManagerSlotInformation)taskManagerTracker.getAllocatedOrPendingSlot(allocationId).get()).getJobId(), (Matcher)Is.is((Object)jobId));
        Assert.assertThat((Object)((TaskManagerSlotInformation)taskManagerTracker.getAllocatedOrPendingSlot(allocationId).get()).getState(), (Matcher)Is.is((Object)SlotState.PENDING));

        responseFuture.complete(Acknowledge.get());
        // NOTE(review): this only checks that the future has not failed at this instant; it does
        // not wait for the future to complete. Confirm whether waiting is intended here.
        Assert.assertFalse(allocatedFuture.isCompletedExceptionally());
    }

    /**
     * Allocates a slot against a gateway whose slot request fails with an
     * {@link AskTimeoutException} and checks that the returned future fails with that cause and
     * that neither tracker keeps the allocation.
     */
    @Test
    public void testAllocateSlotFailsWithException() {
        FineGrainedTaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> FutureUtils.completedExceptionally((Throwable)new AskTimeoutException("timeout"))).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), (TaskExecutorGateway)taskExecutorGateway);
        taskManagerTracker.addTaskManager(taskExecutorConnection, ResourceProfile.ANY, ResourceProfile.ANY);
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        JobID jobId = new JobID();
        DefaultSlotStatusSyncer slotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        slotStatusSyncer.initialize((TaskManagerTracker)taskManagerTracker, (ResourceTracker)resourceTracker, ResourceManagerId.generate(), (Executor)TestingUtils.defaultExecutor());

        CompletableFuture allocatedFuture = slotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobId, "address", ResourceProfile.ANY);

        try {
            allocatedFuture.get();
            // Without this, a future that completes normally would skip the cause check below and
            // the test would pass silently. Assert.fail throws an AssertionError, which is not an
            // Exception and therefore is not swallowed by the catch clause.
            Assert.fail("Expected the allocation future to complete exceptionally.");
        }
        catch (Exception e) {
            Assert.assertThat((Object)e.getCause(), (Matcher)Matchers.instanceOf(AskTimeoutException.class));
        }
        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Is.is((Matcher)Matchers.empty()));
        Assert.assertThat(((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAllocatedSlots().keySet(), (Matcher)Is.is((Matcher)Matchers.empty()));
    }

    /**
     * Registers an allocated slot directly with both trackers, frees it through the syncer and
     * checks that both trackers no longer report it.
     */
    @Test
    public void testFreeSlot() {
        FineGrainedTaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        JobID jobId = new JobID();
        AllocationID allocationId = new AllocationID();
        DefaultSlotStatusSyncer slotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        slotStatusSyncer.initialize((TaskManagerTracker)taskManagerTracker, (ResourceTracker)resourceTracker, ResourceManagerId.generate(), (Executor)TestingUtils.defaultExecutor());
        // Set up the allocated state by hand instead of going through allocateSlot.
        taskManagerTracker.addTaskManager(TASK_EXECUTOR_CONNECTION, ResourceProfile.ANY, ResourceProfile.ANY);
        taskManagerTracker.notifySlotStatus(allocationId, jobId, TASK_EXECUTOR_CONNECTION.getInstanceID(), ResourceProfile.ANY, SlotState.ALLOCATED);
        resourceTracker.notifyAcquiredResource(jobId, ResourceProfile.ANY);

        slotStatusSyncer.freeSlot(allocationId);

        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Is.is((Matcher)Matchers.empty()));
        Assert.assertThat(((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(TASK_EXECUTOR_CONNECTION.getInstanceID()).get()).getAllocatedSlots().keySet(), (Matcher)Is.is((Matcher)Matchers.empty()));
    }

    /**
     * Feeds two slot reports into the syncer, with a slot allocation issued in between, and checks
     * the acquired resources, the available resource of the task manager and the tracked slots
     * after each step.
     */
    @Test
    public void testSlotStatusProcessing() {
        FineGrainedTaskManagerTracker taskManagerTracker = new FineGrainedTaskManagerTracker();
        DefaultResourceTracker resourceTracker = new DefaultResourceTracker();
        DefaultSlotStatusSyncer slotStatusSyncer = new DefaultSlotStatusSyncer(TASK_MANAGER_REQUEST_TIMEOUT);
        slotStatusSyncer.initialize((TaskManagerTracker)taskManagerTracker, (ResourceTracker)resourceTracker, ResourceManagerId.generate(), (Executor)TestingUtils.defaultExecutor());
        // The gateway returns a future that is never completed, so the slot requested through
        // allocateSlot below is never acknowledged by the task executor.
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setRequestSlotFunction(ignored -> new CompletableFuture()).createTestingTaskExecutorGateway();
        TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(ResourceID.generate(), (TaskExecutorGateway)taskExecutorGateway);
        JobID jobId = new JobID();
        AllocationID allocationId1 = new AllocationID();
        AllocationID allocationId2 = new AllocationID();
        SlotID slotId1 = new SlotID(taskExecutorConnection.getResourceID(), 0);
        SlotID slotId2 = new SlotID(taskExecutorConnection.getResourceID(), 1);
        SlotID slotId3 = new SlotID(taskExecutorConnection.getResourceID(), 2);
        ResourceProfile totalResource = ResourceProfile.fromResources(5.0, 20);
        ResourceProfile resource = ResourceProfile.fromResources(1.0, 4);
        // First report: slot 1 carries no job/allocation, slots 2 and 3 are assigned to the job.
        SlotReport slotReport1 = new SlotReport(Arrays.asList(new SlotStatus(slotId1, totalResource), new SlotStatus(slotId2, resource, jobId, allocationId1), new SlotStatus(slotId3, resource, jobId, allocationId2)));
        // Second report: slot 3 no longer carries an allocation, slot 2 is unchanged.
        SlotReport slotReport2 = new SlotReport(Arrays.asList(new SlotStatus(slotId3, resource), new SlotStatus(slotId2, resource, jobId, allocationId1)));
        taskManagerTracker.addTaskManager(taskExecutorConnection, totalResource, totalResource);

        // Step 1: the first report yields two acquired resources and both allocations are tracked.
        slotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport1);
        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Matchers.contains((Object[])new ResourceRequirement[]{ResourceRequirement.create(resource, 2)}));
        Assert.assertThat((Object)((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), (Matcher)IsEqual.equalTo((Object)ResourceProfile.fromResources(3.0, 12)));
        Assert.assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).isPresent());
        Assert.assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2).isPresent());

        // Step 2: requesting one more slot adds a third acquired resource.
        slotStatusSyncer.allocateSlot(taskExecutorConnection.getInstanceID(), jobId, "address", resource);
        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Matchers.contains((Object[])new ResourceRequirement[]{ResourceRequirement.create(resource, 3)}));
        Assert.assertThat((Object)((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), (Matcher)IsEqual.equalTo((Object)ResourceProfile.fromResources(2.0, 8)));
        // The id of the new allocation is generated by the syncer; find it as the only tracked
        // allocation that is neither of the two reported ones.
        AllocationID allocationId3 = ((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAllocatedSlots().keySet().stream().filter(allocationId -> !allocationId.equals(allocationId1) && !allocationId.equals(allocationId2)).findAny().get();

        // Step 3: the second report drops allocation 2, while the unacknowledged allocation 3
        // stays tracked as PENDING.
        slotStatusSyncer.reportSlotStatus(taskExecutorConnection.getInstanceID(), slotReport2);
        Assert.assertThat((Object)resourceTracker.getAcquiredResources(jobId), (Matcher)Matchers.contains((Object[])new ResourceRequirement[]{ResourceRequirement.create(resource, 2)}));
        Assert.assertThat((Object)((TaskManagerInfo)taskManagerTracker.getRegisteredTaskManager(taskExecutorConnection.getInstanceID()).get()).getAvailableResource(), (Matcher)IsEqual.equalTo((Object)ResourceProfile.fromResources(3.0, 12)));
        Assert.assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).isPresent());
        Assert.assertFalse(taskManagerTracker.getAllocatedOrPendingSlot(allocationId2).isPresent());
        Assert.assertTrue(taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).isPresent());
        Assert.assertThat((Object)((TaskManagerSlotInformation)taskManagerTracker.getAllocatedOrPendingSlot(allocationId1).get()).getState(), (Matcher)Is.is((Object)SlotState.ALLOCATED));
        Assert.assertThat((Object)((TaskManagerSlotInformation)taskManagerTracker.getAllocatedOrPendingSlot(allocationId3).get()).getState(), (Matcher)Is.is((Object)SlotState.PENDING));
    }
}

