/*
 * Decompiled with CFR 0.152.
 */
package akka.stream.alpakka.cassandra;

import akka.ConfigurationException;
import akka.actor.ActorSystem;
import akka.discovery.Discovery$;
import akka.stream.alpakka.cassandra.CqlSessionProvider$;
import akka.stream.alpakka.cassandra.DriverConfigLoaderFromConfig$;
import akka.util.JavaDurationConverters;
import akka.util.JavaDurationConverters$;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigMergeable;
import java.io.Serializable;
import scala.Function0;
import scala.Function1;
import scala.collection.immutable.Seq;
import scala.compat.java8.FutureConverters;
import scala.compat.java8.FutureConverters$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

public final class AkkaDiscoverySessionProvider$ {
    public static final AkkaDiscoverySessionProvider$ MODULE$ = new AkkaDiscoverySessionProvider$();

    public Future<CqlSession> connect(ActorSystem system, Config config, ExecutionContext ec) {
        return this.readNodes(config, system, ec).flatMap((Function1 & Serializable)contactPoints -> {
            Config driverConfigWithContactPoints = ConfigFactory.parseString((String)new StringBuilder(43).append("\n        basic.contact-points = [").append(contactPoints.mkString("\"", "\", \"", "\"")).append("]\n        ").toString()).withFallback((ConfigMergeable)CqlSessionProvider$.MODULE$.driverConfig(system, config));
            DriverConfigLoader driverConfigLoader = DriverConfigLoaderFromConfig$.MODULE$.fromConfig(driverConfigWithContactPoints);
            return FutureConverters.CompletionStageOps$.MODULE$.toScala$extension(FutureConverters$.MODULE$.CompletionStageOps(CqlSession.builder().withConfigLoader(driverConfigLoader).buildAsync()));
        }, ec);
    }

    private Future<Seq<String>> readNodes(Config config, ActorSystem system, ExecutionContext ec) {
        Config serviceConfig = config.getConfig("service-discovery");
        String serviceName = serviceConfig.getString("name");
        FiniteDuration lookupTimeout = JavaDurationConverters.JavaDurationOps$.MODULE$.asScala$extension(JavaDurationConverters$.MODULE$.JavaDurationOps(serviceConfig.getDuration("lookup-timeout")));
        return this.readNodes(serviceName, lookupTimeout, system, ec);
    }

    private Future<Seq<String>> readNodes(String serviceName, FiniteDuration lookupTimeout, ActorSystem system, ExecutionContext ec) {
        return Discovery$.MODULE$.apply(system).discovery().lookup(serviceName, lookupTimeout).map((Function1 & Serializable)resolved -> (Seq)resolved.addresses().map((Function1 & Serializable)target -> new StringBuilder(1).append(target.host()).append(":").append(target.port().getOrElse((Function0 & Serializable)() -> {
            throw new ConfigurationException(new StringBuilder(66).append("Akka Discovery for Cassandra service [").append(serviceName).append("] must provide a port for [").append(target.host()).append("]").toString());
        })).toString()), ec);
    }

    private AkkaDiscoverySessionProvider$() {
    }
}

