/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.postgresql;

import io.netty.buffer.ByteBufAllocator;
import io.r2dbc.postgresql.ConnectionFunction;
import io.r2dbc.postgresql.ConnectionStrategy;
import io.r2dbc.postgresql.ConnectionStrategyFactory;
import io.r2dbc.postgresql.DefaultPortalNameSupplier;
import io.r2dbc.postgresql.DefaultPostgresqlReplicationConnection;
import io.r2dbc.postgresql.Extensions;
import io.r2dbc.postgresql.MultiHostConnectionStrategy;
import io.r2dbc.postgresql.PostgresqlConnection;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactoryMetadata;
import io.r2dbc.postgresql.StatementCache;
import io.r2dbc.postgresql.api.ErrorDetails;
import io.r2dbc.postgresql.api.PostgresqlException;
import io.r2dbc.postgresql.api.PostgresqlReplicationConnection;
import io.r2dbc.postgresql.client.Client;
import io.r2dbc.postgresql.client.ConnectionSettings;
import io.r2dbc.postgresql.client.ReactorNettyClient;
import io.r2dbc.postgresql.codec.DefaultCodecs;
import io.r2dbc.postgresql.extension.CodecRegistrar;
import io.r2dbc.postgresql.util.Assert;
import io.r2dbc.postgresql.util.Operators;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

public final class PostgresqlConnectionFactory
implements ConnectionFactory {
    private static final ConnectionFunction DEFAULT_CONNECTION_FUNCTION = (endpoint, settings) -> ReactorNettyClient.connect(endpoint, settings).cast(Client.class);
    private static final String REPLICATION_OPTION = "replication";
    private static final String REPLICATION_DATABASE = "database";
    private final ConnectionFunction connectionFunction;
    private final PostgresqlConnectionConfiguration configuration;
    private final Extensions extensions;

    public PostgresqlConnectionFactory(PostgresqlConnectionConfiguration configuration) {
        this(DEFAULT_CONNECTION_FUNCTION, configuration);
    }

    PostgresqlConnectionFactory(ConnectionFunction connectionFunction, PostgresqlConnectionConfiguration configuration) {
        this.connectionFunction = Assert.requireNonNull(connectionFunction, "connectionFunction must not be null");
        this.configuration = Assert.requireNonNull(configuration, "configuration must not be null");
        this.extensions = PostgresqlConnectionFactory.getExtensions(configuration);
    }

    private static Extensions getExtensions(PostgresqlConnectionConfiguration configuration) {
        Extensions extensions = Extensions.from(configuration.getExtensions());
        if (configuration.isAutodetectExtensions()) {
            extensions = extensions.mergeWith(Extensions.autodetect());
        }
        return extensions;
    }

    public Mono<io.r2dbc.postgresql.api.PostgresqlConnection> create() {
        if (this.isReplicationConnection()) {
            throw new UnsupportedOperationException("Cannot create replication connection through create(). Use replication() method instead.");
        }
        ConnectionStrategy connectionStrategy = ConnectionStrategyFactory.getConnectionStrategy(this.connectionFunction, this.configuration, this.configuration.getConnectionSettings());
        return this.doCreateConnection(false, connectionStrategy).cast(io.r2dbc.postgresql.api.PostgresqlConnection.class);
    }

    public Mono<PostgresqlReplicationConnection> replication() {
        LinkedHashMap<String, String> options = new LinkedHashMap<String, String>(this.configuration.getOptions());
        options.put(REPLICATION_OPTION, REPLICATION_DATABASE);
        ConnectionSettings connectionSettings = this.configuration.getConnectionSettings().mutate(builder -> builder.startupOptions(options));
        ConnectionStrategy connectionStrategy = ConnectionStrategyFactory.getConnectionStrategy(this.connectionFunction, this.configuration, connectionSettings);
        return this.doCreateConnection(true, connectionStrategy).map(DefaultPostgresqlReplicationConnection::new);
    }

    private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, ConnectionStrategy connectionStrategy) {
        ZoneId defaultZone = TimeZone.getDefault().toZoneId();
        return ((Flux)connectionStrategy.connect().flatMap(client -> {
            DefaultCodecs codecs = new DefaultCodecs(client.getByteBufAllocator(), this.configuration.isPreferAttachedBuffers(), () -> client.getTimeZone().map(TimeZone::toZoneId).orElse(defaultZone));
            StatementCache statementCache = StatementCache.fromPreparedStatementCacheQueries(client, this.configuration.getPreparedStatementCacheQueries());
            PostgresqlConnection earlyConnection = new PostgresqlConnection((Client)client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, IsolationLevel.READ_COMMITTED, this.configuration);
            Mono<IsolationLevel> isolationLevelMono = Mono.just((Object)IsolationLevel.READ_COMMITTED);
            if (!forReplication) {
                isolationLevelMono = this.getIsolationLevel(earlyConnection);
            }
            return isolationLevelMono.map(isolationLevel -> new PostgresqlConnection((Client)client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, (IsolationLevel)isolationLevel, this.configuration)).delayUntil(connection -> this.prepareConnection((PostgresqlConnection)connection, client.getByteBufAllocator(), codecs, forReplication)).onErrorResume(throwable -> this.closeWithError((Client)client, (Throwable)throwable));
        }).onErrorMap(e -> this.cannotConnect((Throwable)e, connectionStrategy)).flux().as(Operators::discardOnCancel)).single().doOnDiscard(PostgresqlConnection.class, client -> client.close().subscribe());
    }

    private boolean isReplicationConnection() {
        Map<String, String> options = this.configuration.getOptions();
        return REPLICATION_DATABASE.equalsIgnoreCase(options.get(REPLICATION_OPTION));
    }

    private Publisher<?> prepareConnection(PostgresqlConnection connection, ByteBufAllocator byteBufAllocator, DefaultCodecs codecs, boolean forReplication) {
        ArrayList publishers = new ArrayList();
        if (!forReplication) {
            this.extensions.forEach(CodecRegistrar.class, it -> publishers.add(it.register(connection, byteBufAllocator, codecs)));
        }
        return Flux.concat(publishers).then();
    }

    private Mono<PostgresqlConnection> closeWithError(Client client, Throwable throwable) {
        return client.close().then(Mono.error((Throwable)throwable));
    }

    private Throwable cannotConnect(Throwable throwable, ConnectionStrategy strategy) {
        if (throwable instanceof MultiHostConnectionStrategy.ExceptionAggregator) {
            String message = throwable.getMessage() != null ? String.format("Cannot connect to %s: %s", strategy, throwable.getMessage()) : String.format("Cannot connect to %s", strategy);
            PostgresConnectionException exception = new PostgresConnectionException(message, throwable.getCause());
            for (Throwable t : throwable.getSuppressed()) {
                exception.addSuppressed(t);
            }
            return exception;
        }
        if (throwable instanceof R2dbcException) {
            return throwable;
        }
        return new PostgresConnectionException(String.format("Cannot connect to %s", strategy), throwable);
    }

    public ConnectionFactoryMetadata getMetadata() {
        return PostgresqlConnectionFactoryMetadata.INSTANCE;
    }

    PostgresqlConnectionConfiguration getConfiguration() {
        return this.configuration;
    }

    public String toString() {
        return "PostgresqlConnectionFactory{, configuration=" + this.configuration + ", extensions=" + this.extensions + '}';
    }

    private Mono<IsolationLevel> getIsolationLevel(io.r2dbc.postgresql.api.PostgresqlConnection connection) {
        return connection.createStatement("SHOW TRANSACTION ISOLATION LEVEL").fetchSize(0).execute().flatMap(it -> it.map((row, rowMetadata) -> {
            String level = (String)row.get(0, String.class);
            if (level == null) {
                return IsolationLevel.READ_COMMITTED;
            }
            return IsolationLevel.valueOf((String)level.toUpperCase(Locale.US));
        })).defaultIfEmpty((Object)IsolationLevel.READ_COMMITTED).last();
    }

    static class PostgresConnectionException
    extends R2dbcNonTransientResourceException
    implements PostgresqlException {
        private static final String CONNECTION_DOES_NOT_EXIST = "08003";
        private final ErrorDetails errorDetails;

        public PostgresConnectionException(String reason, @Nullable Throwable cause) {
            super(reason, CONNECTION_DOES_NOT_EXIST, 0, null, cause);
            this.errorDetails = ErrorDetails.fromCodeAndMessage(CONNECTION_DOES_NOT_EXIST, reason);
        }

        @Override
        public ErrorDetails getErrorDetails() {
            return this.errorDetails;
        }
    }
}

