/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobmaster.ServiceConnectionManager;
import org.apache.flink.runtime.jobmaster.slotpool.DeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclareResourceRequirementServiceConnectionManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class DefaultDeclareResourceRequirementServiceConnectionManagerTest
extends TestLogger {
    private final ManuallyTriggeredScheduledExecutorService scheduledExecutor = new ManuallyTriggeredScheduledExecutorService();
    private final JobID jobId = new JobID();

    @Test
    public void testIgnoreDeclareResourceRequirementsIfNotConnected() {
        DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager = this.createResourceManagerConnectionManager();
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(this.createResourceRequirements());
    }

    @Test
    public void testDeclareResourceRequirementsSendsRequirementsIfConnected() {
        DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager = this.createResourceManagerConnectionManager();
        CompletableFuture declareResourceRequirementsFuture = new CompletableFuture();
        declareResourceRequirementServiceConnectionManager.connect(resourceRequirements -> {
            declareResourceRequirementsFuture.complete(resourceRequirements);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        ResourceRequirements resourceRequirements2 = this.createResourceRequirements();
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(resourceRequirements2);
        Assert.assertThat(declareResourceRequirementsFuture.join(), (Matcher)CoreMatchers.is((Object)resourceRequirements2));
    }

    @Test
    public void testRetryDeclareResourceRequirementsIfTransmissionFailed() throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager = this.createResourceManagerConnectionManager();
        FailingDeclareResourceRequirementsService failingDeclareResourceRequirementsService = new FailingDeclareResourceRequirementsService(4);
        declareResourceRequirementServiceConnectionManager.connect((Object)failingDeclareResourceRequirementsService);
        ResourceRequirements resourceRequirements = this.createResourceRequirements();
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(resourceRequirements);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assert.assertThat((Object)failingDeclareResourceRequirementsService.nextResourceRequirements(), (Matcher)CoreMatchers.is((Object)resourceRequirements));
        Assert.assertThat((Object)failingDeclareResourceRequirementsService.hasResourceRequirements(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testDisconnectStopsSendingResourceRequirements() throws InterruptedException {
        this.runStopSendingResourceRequirementsTest(ServiceConnectionManager::disconnect);
    }

    @Test
    public void testCloseStopsSendingResourceRequirements() throws InterruptedException {
        this.runStopSendingResourceRequirementsTest(ServiceConnectionManager::close);
    }

    private void runStopSendingResourceRequirementsTest(Consumer<DeclareResourceRequirementServiceConnectionManager> testAction) throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager = this.createResourceManagerConnectionManager();
        FailingDeclareResourceRequirementsService declareResourceRequirementsService = new FailingDeclareResourceRequirementsService(1);
        declareResourceRequirementServiceConnectionManager.connect((Object)declareResourceRequirementsService);
        ResourceRequirements resourceRequirements = this.createResourceRequirements();
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(resourceRequirements);
        declareResourceRequirementsService.waitForResourceRequirementsDeclaration();
        testAction.accept(declareResourceRequirementServiceConnectionManager);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assert.assertThat((Object)declareResourceRequirementsService.hasResourceRequirements(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void testNewResourceRequirementsOverrideOldRequirements() throws InterruptedException {
        DeclareResourceRequirementServiceConnectionManager declareResourceRequirementServiceConnectionManager = this.createResourceManagerConnectionManager();
        ResourceRequirements resourceRequirements1 = this.createResourceRequirements(Arrays.asList(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)1)));
        ResourceRequirements resourceRequirements2 = this.createResourceRequirements(Arrays.asList(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)2)));
        FailingDeclareResourceRequirementsService failingDeclareResourceRequirementsService = new FailingDeclareResourceRequirementsService(1);
        declareResourceRequirementServiceConnectionManager.connect((Object)failingDeclareResourceRequirementsService);
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(resourceRequirements1);
        failingDeclareResourceRequirementsService.waitForResourceRequirementsDeclaration();
        declareResourceRequirementServiceConnectionManager.declareResourceRequirements(resourceRequirements2);
        this.scheduledExecutor.triggerNonPeriodicScheduledTasksWithRecursion();
        Assert.assertThat((Object)failingDeclareResourceRequirementsService.nextResourceRequirements(), (Matcher)CoreMatchers.is((Object)resourceRequirements2));
        Assert.assertThat((Object)failingDeclareResourceRequirementsService.hasResourceRequirements(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Nonnull
    private ResourceRequirements createResourceRequirements() {
        return this.createResourceRequirements(Arrays.asList(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)2)));
    }

    private ResourceRequirements createResourceRequirements(List<ResourceRequirement> requestedResourceRequirements) {
        return ResourceRequirements.create((JobID)this.jobId, (String)"localhost", requestedResourceRequirements);
    }

    @Nonnull
    private DeclareResourceRequirementServiceConnectionManager createResourceManagerConnectionManager() {
        return DefaultDeclareResourceRequirementServiceConnectionManager.create((ScheduledExecutor)this.scheduledExecutor);
    }

    private static final class FailingDeclareResourceRequirementsService
    implements DeclareResourceRequirementServiceConnectionManager.DeclareResourceRequirementsService {
        private final BlockingQueue<ResourceRequirements> resourceRequirements = new ArrayBlockingQueue<ResourceRequirements>(2);
        private final OneShotLatch declareResourceRequirementsLatch = new OneShotLatch();
        private int failureCounter;

        private FailingDeclareResourceRequirementsService(int failureCounter) {
            this.failureCounter = failureCounter;
        }

        public CompletableFuture<Acknowledge> declareResourceRequirements(ResourceRequirements resourceRequirements) {
            if (this.failureCounter > 0) {
                --this.failureCounter;
                this.declareResourceRequirementsLatch.trigger();
                return FutureUtils.completedExceptionally((Throwable)new FlinkException("Test exception"));
            }
            this.resourceRequirements.offer(resourceRequirements);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        private boolean hasResourceRequirements() {
            return !this.resourceRequirements.isEmpty();
        }

        private ResourceRequirements nextResourceRequirements() throws InterruptedException {
            return this.resourceRequirements.take();
        }

        public void waitForResourceRequirementsDeclaration() throws InterruptedException {
            this.declareResourceRequirementsLatch.await();
        }
    }
}

