/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ChannelException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AkkaRpcServiceUtils {
    private static final Logger LOG = LoggerFactory.getLogger(AkkaRpcServiceUtils.class);
    private static final String AKKA_TCP = "akka.tcp";
    private static final String AkKA_SSL_TCP = "akka.ssl.tcp";
    private static final AtomicLong nextNameOffset = new AtomicLong(0L);

    public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception {
        ActorSystem actorSystem;
        LOG.info("Starting AkkaRpcService at {}.", (Object)NetUtils.unresolvedHostAndPortToNormalizedString((String)hostname, (int)port));
        try {
            Config akkaConfig = hostname != null && !hostname.isEmpty() ? AkkaUtils.getAkkaConfig(configuration, hostname, port) : AkkaUtils.getAkkaConfig(configuration);
            LOG.debug("Using akka configuration \n {}.", (Object)akkaConfig);
            actorSystem = AkkaUtils.createActorSystem(akkaConfig);
        }
        catch (Throwable t) {
            Throwable cause;
            if (t instanceof ChannelException && (cause = t.getCause()) != null && t.getCause() instanceof BindException) {
                String address = NetUtils.hostAndPortToUrlString((String)hostname, (int)port);
                throw new IOException("Unable to bind AkkaRpcService actor system to address " + address + " - " + cause.getMessage(), t);
            }
            throw new Exception("Could not create TaskManager actor system", t);
        }
        Time timeout = Time.milliseconds((long)AkkaUtils.getTimeout(configuration).toMillis());
        return new AkkaRpcService(actorSystem, timeout);
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, Configuration config) throws UnknownHostException {
        Preconditions.checkNotNull((Object)config, (String)"config is null");
        boolean sslEnabled = config.getBoolean(AkkaOptions.SSL_ENABLED) && SSLUtils.getSSLEnabled(config);
        return AkkaRpcServiceUtils.getRpcUrl(hostname, port, endpointName, addressResolution, sslEnabled ? AkkaProtocol.SSL_TCP : AkkaProtocol.TCP);
    }

    public static String getRpcUrl(String hostname, int port, String endpointName, HighAvailabilityServicesUtils.AddressResolution addressResolution, AkkaProtocol akkaProtocol) throws UnknownHostException {
        String protocolPrefix;
        Preconditions.checkNotNull((Object)hostname, (String)"hostname is null");
        Preconditions.checkNotNull((Object)endpointName, (String)"endpointName is null");
        Preconditions.checkArgument((port > 0 && port <= 65535 ? 1 : 0) != 0, (Object)"port must be in [1, 65535]");
        String string = protocolPrefix = akkaProtocol == AkkaProtocol.SSL_TCP ? AkKA_SSL_TCP : AKKA_TCP;
        if (addressResolution == HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION) {
            InetAddress.getByName(hostname);
        }
        String hostPort = NetUtils.unresolvedHostAndPortToNormalizedString((String)hostname, (int)port);
        return String.format("%s://flink@%s/user/%s", protocolPrefix, hostPort, endpointName);
    }

    public static String createRandomName(String prefix) {
        long nameOffset;
        Preconditions.checkNotNull((Object)prefix, (String)"Prefix must not be null.");
        while (!nextNameOffset.compareAndSet(nameOffset = nextNameOffset.get(), nameOffset + 1L)) {
        }
        return prefix + '_' + nameOffset;
    }

    private AkkaRpcServiceUtils() {
    }

    public static enum AkkaProtocol {
        TCP,
        SSL_TCP;

    }
}

