/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.environment;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
public class MiniClusterTestEnvironment
implements TestEnvironment,
ClusterControllable {
    private static final Logger LOG = LoggerFactory.getLogger(MiniClusterTestEnvironment.class);
    private final MiniClusterWithClientResource miniCluster;
    private final java.nio.file.Path checkpointPath;
    private int latestTMIndex = 0;
    private boolean isStarted = false;

    public MiniClusterTestEnvironment() {
        this(MiniClusterTestEnvironment.defaultMiniClusterResourceConfiguration());
    }

    public MiniClusterTestEnvironment(MiniClusterResourceConfiguration conf) {
        this.miniCluster = new MiniClusterWithClientResource(conf);
        try {
            this.checkpointPath = Files.createTempDirectory("minicluster-environment-checkpoint-", new FileAttribute[0]);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to create temporary checkpoint directory", e);
        }
    }

    private static MiniClusterResourceConfiguration defaultMiniClusterResourceConfiguration() {
        Configuration conf = new Configuration();
        conf.set(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL, (Object)1000L);
        return new MiniClusterResourceConfiguration.Builder().setConfiguration(conf).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(6).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build();
    }

    @Override
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions) {
        Configuration configuration = new Configuration();
        if (envOptions.getSavepointRestorePath() != null) {
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, envOptions.getSavepointRestorePath());
        }
        return new TestStreamEnvironment(this.miniCluster.getMiniCluster(), configuration, this.miniCluster.getNumberSlots(), (Collection)envOptions.getConnectorJarPaths().stream().map(url -> new Path(url.getPath())).collect(Collectors.toList()), Collections.emptyList());
    }

    @Override
    public TestEnvironment.Endpoint getRestEndpoint() {
        try {
            URI restAddress = (URI)this.miniCluster.getMiniCluster().getRestAddress().get();
            return new TestEnvironment.Endpoint(restAddress.getHost(), restAddress.getPort());
        }
        catch (Exception e) {
            throw new RuntimeException("Failed to get REST endpoint of MiniCluster", e);
        }
    }

    @Override
    public String getCheckpointUri() {
        return this.checkpointPath.toUri().toString();
    }

    @Override
    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction) throws ExecutionException, InterruptedException {
        Optional controlOptional = this.miniCluster.getMiniCluster().getHaLeadershipControl();
        if (!controlOptional.isPresent()) {
            throw new UnsupportedOperationException("This MiniCluster does not support JobManager HA");
        }
        HaLeadershipControl haLeadershipControl = (HaLeadershipControl)controlOptional.get();
        haLeadershipControl.revokeJobMasterLeadership(jobClient.getJobID()).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobClient.getJobID()).get();
    }

    @Override
    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception {
        this.terminateTaskManager();
        CommonTestUtils.waitForNoTaskRunning(() -> (JobDetailsInfo)this.miniCluster.getRestClusterClient().getJobDetails(jobClient.getJobID()).get());
        afterFailAction.run();
        this.startTaskManager();
    }

    @Override
    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
        throw new UnsupportedOperationException("Cannot isolate network in a MiniCluster");
    }

    @Override
    public void startUp() throws Exception {
        if (this.isStarted) {
            return;
        }
        this.miniCluster.before();
        LOG.debug("MiniCluster is running");
        this.isStarted = true;
    }

    @Override
    public void tearDown() throws Exception {
        if (!this.isStarted) {
            return;
        }
        this.isStarted = false;
        this.miniCluster.after();
        FileUtils.deleteDirectory((File)this.checkpointPath.toFile());
        LOG.debug("MiniCluster has been tear down");
    }

    private void terminateTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().terminateTaskManager(this.latestTMIndex).get();
        LOG.debug("TaskManager {} has been terminated.", (Object)this.latestTMIndex);
    }

    private void startTaskManager() throws Exception {
        this.miniCluster.getMiniCluster().startTaskManager();
        ++this.latestTMIndex;
        LOG.debug("New TaskManager {} has been launched.", (Object)this.latestTMIndex);
    }

    public String toString() {
        return "MiniCluster";
    }
}

