/*
 * Decompiled with CFR 0.152.
 */
package software.amazon.awssdk.crt.mqtt;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.crt.AsyncCallback;
import software.amazon.awssdk.crt.CrtResource;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.http.HttpProxyOptions;
import software.amazon.awssdk.crt.http.HttpRequest;
import software.amazon.awssdk.crt.io.SocketOptions;
import software.amazon.awssdk.crt.io.TlsContext;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionEvents;
import software.amazon.awssdk.crt.mqtt.MqttClientConnectionOperationStatistics;
import software.amazon.awssdk.crt.mqtt.MqttConnectionConfig;
import software.amazon.awssdk.crt.mqtt.MqttException;
import software.amazon.awssdk.crt.mqtt.MqttMessage;
import software.amazon.awssdk.crt.mqtt.OnConnectionClosedReturn;
import software.amazon.awssdk.crt.mqtt.OnConnectionFailureReturn;
import software.amazon.awssdk.crt.mqtt.OnConnectionSuccessReturn;
import software.amazon.awssdk.crt.mqtt.QualityOfService;
import software.amazon.awssdk.crt.mqtt.WebsocketHandshakeTransformArgs;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
import software.amazon.awssdk.crt.mqtt5.Mqtt5ClientOptions;

public class MqttClientConnection
extends CrtResource {
    private static final int MAX_PORT = 65535;
    private MqttConnectionConfig config;
    private AsyncCallback connectAck;

    private static MqttConnectionConfig s_toMqtt3ConnectionConfig(Mqtt5ClientOptions mqtt5options) {
        MqttConnectionConfig options = new MqttConnectionConfig();
        options.setEndpoint(mqtt5options.getHostName());
        options.setPort(mqtt5options.getPort() != null ? Math.toIntExact(mqtt5options.getPort()) : 0);
        options.setSocketOptions(mqtt5options.getSocketOptions());
        if (mqtt5options.getConnectOptions() != null) {
            options.setClientId(mqtt5options.getConnectOptions().getClientId());
            options.setKeepAliveSecs(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds() != null ? Math.toIntExact(mqtt5options.getConnectOptions().getKeepAliveIntervalSeconds()) : 0);
        }
        options.setCleanSession(mqtt5options.getSessionBehavior().compareTo(Mqtt5ClientOptions.ClientSessionBehavior.CLEAN) <= 0);
        options.setPingTimeoutMs(mqtt5options.getPingTimeoutMs() != null ? Math.toIntExact(mqtt5options.getPingTimeoutMs()) : 0);
        options.setProtocolOperationTimeoutMs(mqtt5options.getAckTimeoutSeconds() != null ? Math.toIntExact(mqtt5options.getAckTimeoutSeconds()) * 1000 : 0);
        return options;
    }

    public MqttClientConnection(MqttConnectionConfig config) throws MqttException {
        if (config.getMqttClient() == null) {
            throw new MqttException("mqttClient must not be null");
        }
        if (config.getClientId() == null) {
            throw new MqttException("clientId must not be null");
        }
        if (config.getEndpoint() == null) {
            throw new MqttException("endpoint must not be null");
        }
        if (config.getPort() <= 0 || config.getPort() > 65535) {
            throw new MqttException("port must be a positive integer between 1 and 65535");
        }
        try {
            this.acquireNativeHandle(MqttClientConnection.mqttClientConnectionNewFrom311Client(config.getMqttClient().getNativeHandle(), this));
            this.SetupConfig(config);
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
        }
    }

    public MqttClientConnection(Mqtt5Client mqtt5client, MqttClientConnectionEvents callbacks) throws MqttException {
        if (mqtt5client == null) {
            throw new MqttException("mqttClient must not be null");
        }
        try (MqttConnectionConfig config = MqttClientConnection.s_toMqtt3ConnectionConfig(mqtt5client.getClientOptions());){
            config.setMqtt5Client(mqtt5client);
            if (callbacks != null) {
                config.setConnectionCallbacks(callbacks);
            }
            if (config.getClientId() == null) {
                throw new MqttException("clientId must not be null");
            }
            if (config.getEndpoint() == null) {
                throw new MqttException("endpoint must not be null");
            }
            if (config.getPort() <= 0 || config.getPort() > 65535) {
                throw new MqttException("port must be a positive integer between 1 and 65535");
            }
            try {
                this.acquireNativeHandle(MqttClientConnection.mqttClientConnectionNewFrom5Client(config.getMqtt5Client().getNativeHandle(), this));
                this.SetupConfig(config);
            }
            catch (CrtRuntimeException ex) {
                throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
            }
        }
        catch (Exception e) {
            throw new MqttException("Failed to setup mqtt3 connection : " + e.getMessage());
        }
    }

    private void SetupConfig(MqttConnectionConfig config) throws MqttException {
        try {
            MqttMessage message;
            if (config.getUsername() != null) {
                MqttClientConnection.mqttClientConnectionSetLogin(this.getNativeHandle(), config.getUsername(), config.getPassword());
            }
            if (config.getMinReconnectTimeoutSecs() != 0L && config.getMaxReconnectTimeoutSecs() != 0L) {
                MqttClientConnection.mqttClientConnectionSetReconnectTimeout(this.getNativeHandle(), config.getMinReconnectTimeoutSecs(), config.getMaxReconnectTimeoutSecs());
            }
            if ((message = config.getWillMessage()) != null) {
                MqttClientConnection.mqttClientConnectionSetWill(this.getNativeHandle(), message.getTopic(), message.getQos().getValue(), message.getRetain(), message.getPayload());
            }
            if (config.getUseWebsockets()) {
                MqttClientConnection.mqttClientConnectionUseWebsockets(this.getNativeHandle());
            }
            if (config.getHttpProxyOptions() != null) {
                HttpProxyOptions options = config.getHttpProxyOptions();
                TlsContext proxyTlsContext = options.getTlsContext();
                MqttClientConnection.mqttClientConnectionSetHttpProxyOptions(this.getNativeHandle(), options.getConnectionType().getValue(), options.getHost(), options.getPort(), proxyTlsContext != null ? proxyTlsContext.getNativeHandle() : 0L, options.getAuthorizationType().getValue(), options.getAuthorizationUsername(), options.getAuthorizationPassword());
            }
            this.addReferenceTo(config);
            this.config = config;
        }
        catch (CrtRuntimeException ex) {
            throw new MqttException("Exception during mqttClientConnectionNew: " + ex.getMessage());
        }
    }

    @Override
    protected void releaseNativeHandle() {
        MqttClientConnection.mqttClientConnectionDestroy(this.getNativeHandle());
    }

    @Override
    protected boolean canReleaseReferencesImmediately() {
        return false;
    }

    private void onConnectionComplete(int errorCode, boolean sessionPresent) {
        MqttClientConnectionEvents callbacks;
        if (this.connectAck != null) {
            if (errorCode == 0) {
                this.connectAck.onSuccess(sessionPresent);
            } else {
                this.connectAck.onFailure(new MqttException(errorCode));
            }
            this.connectAck = null;
        }
        if ((callbacks = this.config.getConnectionCallbacks()) != null) {
            if (errorCode == 0) {
                OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent);
                callbacks.onConnectionSuccess(returnData);
            } else {
                OnConnectionFailureReturn returnData = new OnConnectionFailureReturn(errorCode);
                callbacks.onConnectionFailure(returnData);
            }
        }
    }

    private void onConnectionInterrupted(int errorCode, AsyncCallback callback) {
        MqttClientConnectionEvents callbacks;
        if (callback != null) {
            if (errorCode == 0) {
                callback.onSuccess();
            } else {
                callback.onFailure(new MqttException(errorCode));
            }
        }
        if ((callbacks = this.config.getConnectionCallbacks()) != null) {
            callbacks.onConnectionInterrupted(errorCode);
        }
    }

    private void onConnectionSuccess(boolean sessionPresent) {
        MqttClientConnectionEvents callbacks = this.config.getConnectionCallbacks();
        if (callbacks != null) {
            OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent);
            callbacks.onConnectionSuccess(returnData);
        }
    }

    private void onConnectionFailure(int errorCode) {
        MqttClientConnectionEvents callbacks = this.config.getConnectionCallbacks();
        if (callbacks != null) {
            OnConnectionFailureReturn returnData = new OnConnectionFailureReturn(errorCode);
            callbacks.onConnectionFailure(returnData);
        }
    }

    private void onConnectionResumed(boolean sessionPresent) {
        MqttClientConnectionEvents callbacks = this.config.getConnectionCallbacks();
        if (callbacks != null) {
            callbacks.onConnectionResumed(sessionPresent);
            OnConnectionSuccessReturn returnData = new OnConnectionSuccessReturn(sessionPresent);
            callbacks.onConnectionSuccess(returnData);
        }
    }

    private void onConnectionClosed() {
        MqttClientConnectionEvents callbacks;
        if (this.config != null && (callbacks = this.config.getConnectionCallbacks()) != null) {
            OnConnectionClosedReturn returnData = new OnConnectionClosedReturn();
            callbacks.onConnectionClosed(returnData);
        }
    }

    public CompletableFuture<Boolean> connect() throws MqttException {
        TlsContext tls = null;
        if (this.config.getMqttClient() != null) {
            tls = this.config.getMqttClient().getTlsContext();
        } else if (this.config.getMqtt5Client() != null) {
            tls = this.config.getMqtt5Client().getClientOptions().getTlsContext();
        }
        short pingTimeout = (short)Math.max(0, Math.min(this.config.getPingTimeoutMs(), Short.MAX_VALUE));
        int port = this.config.getPort();
        if (port > 65535 || port <= 0) {
            throw new MqttException("Port must be betweeen 0 and 65535");
        }
        CompletableFuture<Boolean> future = new CompletableFuture<Boolean>();
        this.connectAck = AsyncCallback.wrapFuture(future, null);
        SocketOptions socketOptions = this.config.getSocketOptions();
        try {
            MqttClientConnection.mqttClientConnectionConnect(this.getNativeHandle(), this.config.getEndpoint(), port, socketOptions != null ? socketOptions.getNativeHandle() : 0L, tls != null ? tls.getNativeHandle() : 0L, this.config.getClientId(), this.config.getCleanSession(), this.config.getKeepAliveSecs(), pingTimeout, this.config.getProtocolOperationTimeoutMs());
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
        }
        return future;
    }

    public CompletableFuture<Void> disconnect() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        if (this.isNull()) {
            future.complete(null);
            return future;
        }
        AsyncCallback disconnectAck = AsyncCallback.wrapFuture(future, null);
        MqttClientConnection.mqttClientConnectionDisconnect(this.getNativeHandle(), disconnectAck);
        return future;
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos, Consumer<MqttMessage> handler) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during subscribe"));
            return future;
        }
        AsyncCallback subAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionSubscribe(this.getNativeHandle(), topic, qos.getValue(), handler != null ? new MessageHandler(handler) : null, subAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    public CompletableFuture<Integer> subscribe(String topic, QualityOfService qos) {
        return this.subscribe(topic, qos, null);
    }

    public void onMessage(Consumer<MqttMessage> handler) {
        MqttClientConnection.mqttClientConnectionOnMessage(this.getNativeHandle(), new MessageHandler(handler));
    }

    public CompletableFuture<Integer> unsubscribe(String topic) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during unsubscribe"));
            return future;
        }
        AsyncCallback unsubAck = AsyncCallback.wrapFuture(future, 0);
        short packetId = MqttClientConnection.mqttClientConnectionUnsubscribe(this.getNativeHandle(), topic, unsubAck);
        return future.thenApply(unused -> packetId);
    }

    public CompletableFuture<Integer> publish(MqttMessage message) {
        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
        if (this.isNull()) {
            future.completeExceptionally(new MqttException("Invalid connection during publish"));
        }
        AsyncCallback pubAck = AsyncCallback.wrapFuture(future, 0);
        try {
            short packetId = MqttClientConnection.mqttClientConnectionPublish(this.getNativeHandle(), message.getTopic(), message.getQos().getValue(), message.getRetain(), message.getPayload(), pubAck);
            return future.thenApply(unused -> packetId);
        }
        catch (CrtRuntimeException ex) {
            future.completeExceptionally(ex);
            return future;
        }
    }

    @Deprecated
    public CompletableFuture<Integer> publish(MqttMessage message, QualityOfService qos, boolean retain) {
        return this.publish(new MqttMessage(message.getTopic(), message.getPayload(), qos, retain));
    }

    private void onWebsocketHandshake(HttpRequest handshakeRequest, long nativeUserData) {
        CompletableFuture<HttpRequest> future = new CompletableFuture<HttpRequest>();
        future.whenComplete((x, throwable) -> MqttClientConnection.mqttClientConnectionWebsocketHandshakeComplete(this.getNativeHandle(), x != null ? x.marshalForJni() : null, throwable, nativeUserData));
        WebsocketHandshakeTransformArgs args = new WebsocketHandshakeTransformArgs(this, handshakeRequest, future);
        Consumer<WebsocketHandshakeTransformArgs> transform = this.config.getWebsocketHandshakeTransform();
        if (transform != null) {
            transform.accept(args);
        } else {
            args.complete(handshakeRequest);
        }
    }

    public MqttClientConnectionOperationStatistics getOperationStatistics() {
        return MqttClientConnection.mqttClientConnectionGetOperationStatistics(this.getNativeHandle());
    }

    private static native long mqttClientConnectionNewFrom311Client(long var0, MqttClientConnection var2) throws CrtRuntimeException;

    private static native long mqttClientConnectionNewFrom5Client(long var0, MqttClientConnection var2) throws CrtRuntimeException;

    private static native void mqttClientConnectionDestroy(long var0);

    private static native void mqttClientConnectionConnect(long var0, String var2, int var3, long var4, long var6, String var8, boolean var9, int var10, short var11, int var12) throws CrtRuntimeException;

    private static native void mqttClientConnectionDisconnect(long var0, AsyncCallback var2);

    private static native short mqttClientConnectionSubscribe(long var0, String var2, int var3, MessageHandler var4, AsyncCallback var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionOnMessage(long var0, MessageHandler var2) throws CrtRuntimeException;

    private static native short mqttClientConnectionUnsubscribe(long var0, String var2, AsyncCallback var3);

    private static native short mqttClientConnectionPublish(long var0, String var2, int var3, boolean var4, byte[] var5, AsyncCallback var6) throws CrtRuntimeException;

    private static native boolean mqttClientConnectionSetWill(long var0, String var2, int var3, boolean var4, byte[] var5) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetLogin(long var0, String var2, String var3) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetReconnectTimeout(long var0, long var2, long var4) throws CrtRuntimeException;

    private static native void mqttClientConnectionUseWebsockets(long var0) throws CrtRuntimeException;

    private static native void mqttClientConnectionWebsocketHandshakeComplete(long var0, byte[] var2, Throwable var3, long var4) throws CrtRuntimeException;

    private static native void mqttClientConnectionSetHttpProxyOptions(long var0, int var2, String var3, int var4, long var5, int var7, String var8, String var9) throws CrtRuntimeException;

    private static native MqttClientConnectionOperationStatistics mqttClientConnectionGetOperationStatistics(long var0);

    private class MessageHandler {
        Consumer<MqttMessage> callback;

        private MessageHandler(Consumer<MqttMessage> callback) {
            this.callback = callback;
        }

        void deliver(String topic, byte[] payload, boolean dup, int qos, boolean retain) {
            QualityOfService qosEnum = QualityOfService.getEnumValueFromInteger(qos);
            this.callback.accept(new MqttMessage(topic, payload, qosEnum, retain, dup));
        }
    }
}

