/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testframe.container;

import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testframe.container.FlinkContainers;
import org.apache.flink.connector.testframe.container.FlinkContainersSettings;
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
import org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkContainerTestEnvironment
implements TestEnvironment,
ClusterControllable {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkContainerTestEnvironment.class);
    private final FlinkContainers flinkContainers;
    private final Collection<String> jarPaths = new ArrayList<String>();
    private String checkpointPath = FlinkContainersSettings.getDefaultCheckpointPath();

    public FlinkContainerTestEnvironment(int numTaskManagers, int numSlotsPerTaskManager, String ... jarPaths) {
        this(new Configuration(), numTaskManagers, numSlotsPerTaskManager, Arrays.asList(jarPaths));
    }

    public FlinkContainerTestEnvironment(Configuration clusterConfiguration, int numTaskManagers, int numSlotsPerTaskManager, String ... jarPaths) {
        this(clusterConfiguration, numTaskManagers, numSlotsPerTaskManager, Arrays.asList(jarPaths));
    }

    public FlinkContainerTestEnvironment(Configuration clusterConfiguration, int numTaskManagers, int numSlotsPerTaskManager, Collection<String> jarPaths) {
        TestcontainersSettings testcontainersSettings = TestcontainersSettings.builder().logger(LOG).build();
        FlinkContainersSettings flinkContainersSettings = FlinkContainersSettings.builder().basedOn(clusterConfiguration).numTaskManagers(numTaskManagers).numSlotsPerTaskManager(numSlotsPerTaskManager).enableZookeeperHA().jarPaths(jarPaths).build();
        this.flinkContainers = FlinkContainers.builder().withTestcontainersSettings(testcontainersSettings).withFlinkContainersSettings(flinkContainersSettings).build();
        this.jarPaths.addAll(jarPaths);
    }

    public static FlinkContainerTestEnvironment fromSettings(FlinkContainersSettings settings) {
        return new FlinkContainerTestEnvironment(settings);
    }

    public FlinkContainerTestEnvironment(FlinkContainersSettings settings) {
        TestcontainersSettings testcontainersSettings = TestcontainersSettings.builder().logger(LOG).build();
        this.flinkContainers = FlinkContainers.builder().withFlinkContainersSettings(settings).withTestcontainersSettings(testcontainersSettings).build();
        this.checkpointPath = settings.getCheckpointPath();
        this.jarPaths.addAll(settings.getJarPaths());
    }

    @Override
    public void startUp() throws Exception {
        if (!this.flinkContainers.isStarted()) {
            this.flinkContainers.start();
        }
    }

    @Override
    public void tearDown() {
        if (this.flinkContainers.isStarted()) {
            this.flinkContainers.stop();
        }
    }

    @Override
    public StreamExecutionEnvironment createExecutionEnvironment(TestEnvironmentSettings envOptions) {
        this.jarPaths.addAll(envOptions.getConnectorJarPaths().stream().map(URL::getPath).collect(Collectors.toList()));
        if (envOptions.getSavepointRestorePath() != null) {
            return new RemoteStreamEnvironment(this.flinkContainers.getJobManagerHost(), this.flinkContainers.getJobManagerPort(), null, this.jarPaths.toArray(new String[0]), null, SavepointRestoreSettings.forPath((String)envOptions.getSavepointRestorePath()));
        }
        return StreamExecutionEnvironment.createRemoteEnvironment((String)this.flinkContainers.getJobManagerHost(), (int)this.flinkContainers.getJobManagerPort(), (String[])this.jarPaths.toArray(new String[0]));
    }

    @Override
    public TestEnvironment.Endpoint getRestEndpoint() {
        return new TestEnvironment.Endpoint(this.flinkContainers.getJobManagerHost(), this.flinkContainers.getJobManagerPort());
    }

    @Override
    public String getCheckpointUri() {
        return this.checkpointPath;
    }

    @Override
    public void triggerJobManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception {
        this.flinkContainers.restartJobManager(afterFailAction::run);
    }

    @Override
    public void triggerTaskManagerFailover(JobClient jobClient, Runnable afterFailAction) throws Exception {
        this.flinkContainers.restartTaskManager(() -> {
            CommonTestUtils.waitForNoTaskRunning(() -> (JobDetailsInfo)this.flinkContainers.getRestClusterClient().getJobDetails(jobClient.getJobID()).get());
            afterFailAction.run();
        });
    }

    @Override
    public void isolateNetwork(JobClient jobClient, Runnable afterFailAction) {
    }

    public String toString() {
        return "FlinkContainers";
    }

    public FlinkContainers getFlinkContainers() {
        return this.flinkContainers;
    }
}

