/**
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */
#include <jni.h>

#include <aws/common/atomics.h>
#include <aws/common/condition_variable.h>
#include <aws/common/logging.h>
#include <aws/common/mutex.h>
#include <aws/common/string.h>
#include <aws/common/thread.h>
#include <aws/http/connection.h>
#include <aws/http/proxy.h>
#include <aws/http/request_response.h>
#include <aws/io/channel.h>
#include <aws/io/channel_bootstrap.h>
#include <aws/io/event_loop.h>
#include <aws/io/host_resolver.h>
#include <aws/io/socket.h>
#include <aws/io/socket_channel_handler.h>
#include <aws/io/tls_channel_handler.h>
#include <aws/mqtt/client.h>

#include <ctype.h>
#include <string.h>

#include "crt.h"

#include "http_request_utils.h"
#include "java_class_ids.h"
#include "mqtt5_client_jni.h"

/*******************************************************************************
 * mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
 * async ops, and is used to deliver callbacks. Also hangs on to JNI references
 * to buffers and strings that need to outlive the request
 ******************************************************************************/
struct mqtt_jni_async_callback {
    struct mqtt_jni_connection *connection;
    jobject async_callback;
    struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
};

/*******************************************************************************
 * mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
 ******************************************************************************/
struct mqtt_jni_connection {
    struct aws_mqtt_client *client; /* Provided to mqtt_connect */
    struct aws_mqtt_client_connection *client_connection;
    struct aws_socket_options socket_options;
    struct aws_tls_connection_options tls_options;

    JavaVM *jvm;
    jweak java_mqtt_connection; /* MqttClientConnection instance */
    struct mqtt_jni_async_callback *on_message;

    struct aws_atomic_var ref_count;
};

/*******************************************************************************
 * mqtt_jni_ws_handshake - Data needed to perform the async websocket handshake
 * transform operations. Destroyed when transform is complete.
 ******************************************************************************/
struct mqtt_jni_ws_handshake {
    struct mqtt_jni_connection *connection;
    struct aws_http_message *http_request;
    aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn;
    void *complete_ctx;
};

static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection);

static void s_mqtt_jni_connection_acquire(struct mqtt_jni_connection *connection) {
    size_t old_value = aws_atomic_fetch_add(&connection->ref_count, 1);

    AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection acquire, ref count now = %d", (int)old_value + 1);
}

static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data);

static void s_mqtt_jni_connection_release(struct mqtt_jni_connection *connection) {
    size_t old_value = aws_atomic_fetch_sub(&connection->ref_count, 1);

    AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection release, ref count now = %d", (int)old_value - 1);
}

/* The destroy function is called on Java MqttClientConnection resource release. */
static void s_mqtt_jni_connection_destroy(struct mqtt_jni_connection *connection) {
    /* For mqtt311 client, we have to call aws_mqtt_client_connection_disconnect before releasing the underlying c
     * connection.*/
    if (aws_mqtt_client_connection_disconnect(
            connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) {

        /*
         * This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown
         * time. Log it (in case it was unexpected) and then shutdown the underlying connection manually.
         */
        AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Client disconnect failed. Release the client connection.");
        s_on_shutdown_disconnect_complete(connection->client_connection, NULL);
    }
}

static struct mqtt_jni_async_callback *s_mqtt_jni_async_callback_new(
    struct mqtt_jni_connection *connection,
    jobject async_callback,
    JNIEnv *env) {

    if (env == NULL) {
        return NULL;
    }

    struct aws_allocator *allocator = aws_jni_get_allocator();
    /* allocate cannot fail */
    struct mqtt_jni_async_callback *callback = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_async_callback));
    callback->connection = connection;
    callback->async_callback = async_callback ? (*env)->NewGlobalRef(env, async_callback) : NULL;
    aws_byte_buf_init(&callback->buffer, aws_jni_get_allocator(), 0);

    return callback;
}

static void s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
    AWS_FATAL_ASSERT(callback && callback->connection);

    if (env == NULL) {
        return;
    }

    if (callback->async_callback) {
        (*env)->DeleteGlobalRef(env, callback->async_callback);
    }

    aws_byte_buf_clean_up(&callback->buffer);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    aws_mem_release(allocator, callback);
}

static jobject s_new_mqtt_exception(JNIEnv *env, int error_code) {
    jobject exception = (*env)->NewObject(
        env, mqtt_exception_properties.jni_mqtt_exception, mqtt_exception_properties.jni_constructor, error_code);
    return exception;
}

/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
#if UINTPTR_MAX == 0xffffffff
#    if defined(_MSC_VER)
#        pragma warning(push)
#        pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
#    else
#        pragma GCC diagnostic push
#        pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
#        pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
#    endif
#endif

/*******************************************************************************
 * new
 ******************************************************************************/
static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data);
static void s_on_connection_complete(
    struct aws_mqtt_client_connection *client_connection,
    int error_code,
    enum aws_mqtt_connect_return_code return_code,
    bool session_present,
    void *user_data) {
    (void)client_connection;
    (void)return_code;

    struct mqtt_jni_async_callback *connect_callback = user_data;
    struct mqtt_jni_connection *connection = connect_callback->connection;

    /********** JNI ENV ACQUIRE **********/
    JavaVM *jvm = connection->jvm;
    JNIEnv *env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection != NULL) {
        (*env)->CallVoidMethod(
            env, mqtt_connection, mqtt_connection_properties.on_connection_complete, error_code, session_present);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        if (aws_jni_check_and_clear_exception(env)) {
            aws_jni_release_thread_env(connection->jvm, env);
            /********** JNI ENV RELEASE EARLY OUT **********/
            aws_mqtt_client_connection_disconnect(client_connection, s_on_connection_disconnected, connect_callback);
            return; /* callback and ref count will be cleaned up in s_on_connection_disconnected */
        }
    }

    s_mqtt_jni_async_callback_destroy(connect_callback, env);

    aws_jni_release_thread_env(jvm, env);
    /********** JNI ENV RELEASE **********/
    s_mqtt_jni_connection_release(connection);
}

static void s_on_connection_interrupted_internal(
    struct mqtt_jni_connection *connection,
    int error_code,
    jobject ack_callback,
    JNIEnv *env) {

    AWS_FATAL_ASSERT(env);

    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection) {
        (*env)->CallVoidMethod(
            env, mqtt_connection, mqtt_connection_properties.on_connection_interrupted, error_code, ack_callback);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }
}

static void s_on_connection_interrupted(
    struct aws_mqtt_client_connection *client_connection,
    int error_code,
    void *user_data) {
    (void)client_connection;

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    s_on_connection_interrupted_internal(user_data, error_code, NULL, env);

    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_connection_success(
    struct aws_mqtt_client_connection *client_connection,
    enum aws_mqtt_connect_return_code return_code,
    bool session_present,
    void *user_data) {
    (void)client_connection;
    (void)return_code;

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }
    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection) {

        (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_success, session_present);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }
    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_connection_failure(
    struct aws_mqtt_client_connection *client_connection,
    int error_code,
    void *user_data) {
    (void)client_connection;

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }
    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection) {
        (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_failure, error_code);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }
    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_connection_resumed(
    struct aws_mqtt_client_connection *client_connection,
    enum aws_mqtt_connect_return_code return_code,
    bool session_present,
    void *user_data) {
    (void)client_connection;
    (void)return_code;

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection) {

        (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_resumed, session_present);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }

    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data) {
    (void)client_connection;

    struct mqtt_jni_async_callback *connect_callback = user_data;
    struct mqtt_jni_connection *jni_connection = connect_callback->connection;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    s_on_connection_interrupted_internal(connect_callback->connection, 0, connect_callback->async_callback, env);

    s_mqtt_jni_async_callback_destroy(connect_callback, env);

    AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));

    aws_jni_release_thread_env(jni_connection->jvm, env);
    /********** JNI ENV RELEASE **********/

    /* Do not call release here: s_on_connection_closed will (normally) be called
     * right after and so we can call the release there instead. */
}

static void s_on_connection_closed(
    struct aws_mqtt_client_connection *client_connection,
    struct on_connection_closed_data *data,
    void *user_data) {
    (void)client_connection;
    (void)data;

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    // Make sure the Java object has not been garbage collected
    if (!(*env)->IsSameObject(env, connection->java_mqtt_connection, NULL)) {
        jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
        if (mqtt_connection) {
            (*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_closed);
            (*env)->DeleteLocalRef(env, mqtt_connection);
            AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
        }
    }
    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_connection_terminated(void *user_data) {

    struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection);
    if (mqtt_connection != NULL) {
        (*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        aws_jni_check_and_clear_exception(env);
    }

    JavaVM *jvm = jni_connection->jvm;

    s_mqtt_connection_destroy(env, jni_connection);
    aws_jni_release_thread_env(jvm, env);
    /********** JNI ENV RELEASE **********/
}

static struct mqtt_jni_connection *s_mqtt_connection_new(
    JNIEnv *env,
    struct aws_mqtt_client *client3,
    struct aws_mqtt5_client_java_jni *client5_jni,
    jobject java_mqtt_connection) {
    struct aws_allocator *allocator = aws_jni_get_allocator();

    struct mqtt_jni_connection *connection = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_connection));
    if (!connection) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_connect: Out of memory allocating JNI connection");
        return NULL;
    }

    aws_atomic_store_int(&connection->ref_count, 1);
    connection->java_mqtt_connection = (*env)->NewWeakGlobalRef(env, java_mqtt_connection);
    jint jvmresult = (*env)->GetJavaVM(env, &connection->jvm);
    AWS_FATAL_ASSERT(jvmresult == 0);

    if (client3 != NULL) {
        connection->client = client3;
        connection->client_connection = aws_mqtt_client_connection_new(client3);
    } else if (client5_jni != NULL) {
        connection->client_connection = aws_mqtt_client_connection_new_from_mqtt5_client(client5_jni->client);
    }

    if (!connection->client_connection) {
        aws_jni_throw_runtime_exception(
            env,
            "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to create new "
            "connection");
        goto on_error;
    }

    if (aws_mqtt_client_connection_set_connection_termination_handler(
            connection->client_connection, s_on_connection_terminated, connection)) {
        aws_jni_throw_runtime_exception(
            env,
            "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to set termination "
            "callback");
        goto on_error;
    }

    return connection;

on_error:

    s_mqtt_jni_connection_release(connection);

    return NULL;
}

static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection) {
    if (connection == NULL) {
        return;
    }

    if (connection->on_message) {
        s_mqtt_jni_async_callback_destroy(connection->on_message, env);
    }

    if (connection->java_mqtt_connection) {
        (*env)->DeleteWeakGlobalRef(env, connection->java_mqtt_connection);
    }

    aws_tls_connection_options_clean_up(&connection->tls_options);

    struct aws_allocator *allocator = aws_jni_get_allocator();
    aws_mem_release(allocator, connection);
}

JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_mqtt_connection) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = NULL;
    struct aws_mqtt_client *client3 = (struct aws_mqtt_client *)jni_client;
    if (!client3) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt3 Client is invalid/null");
        return (jlong)NULL;
    }

    connection = s_mqtt_connection_new(env, client3, NULL, jni_mqtt_connection);
    if (!connection) {
        return (jlong)NULL;
    }

    aws_mqtt_client_connection_set_connection_result_handlers(
        connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
    aws_mqtt_client_connection_set_connection_interruption_handlers(
        connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
    aws_mqtt_client_connection_set_connection_closed_handler(
        connection->client_connection, s_on_connection_closed, connection);

    return (jlong)connection;
}

JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_client,
    jobject jni_mqtt_connection) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = NULL;
    struct aws_mqtt5_client_java_jni *client5_jni = (struct aws_mqtt5_client_java_jni *)jni_client;
    if (!client5_jni) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt5 Client is invalid/null");
        return (jlong)NULL;
    }

    connection = s_mqtt_connection_new(env, NULL, client5_jni, jni_mqtt_connection);
    if (!connection) {
        return (jlong)NULL;
    }

    aws_mqtt_client_connection_set_connection_result_handlers(
        connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
    aws_mqtt_client_connection_set_connection_interruption_handlers(
        connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
    aws_mqtt_client_connection_set_connection_closed_handler(
        connection->client_connection, s_on_connection_closed, connection);

    return (jlong)connection;
}

/* The disconnect callback called on shutdown. We will release the underlying connection here, which should init the
** client shutdown process. Then on termination callback, we will finally release all jni resources.
*/
static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) {
    (void)user_data;

    AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references");

    /* Release the underlying mqtt connection */
    aws_mqtt_client_connection_release(connection);
}

/*******************************************************************************
 * clean_up
 ******************************************************************************/
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection) {
    (void)jni_class;
    (void)env;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    s_mqtt_jni_connection_destroy(connection);
}

/*******************************************************************************
 * connect
 ******************************************************************************/
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_endpoint,
    jint jni_port,
    jlong jni_socket_options,
    jlong jni_tls_ctx,
    jstring jni_client_id,
    jboolean jni_clean_session,
    jint keep_alive_secs,
    jshort ping_timeout_ms,
    jint protocol_operation_timeout_ms) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Connection is invalid/null");
        return;
    }

    struct aws_byte_cursor client_id;
    AWS_ZERO_STRUCT(client_id);
    struct aws_byte_cursor endpoint = aws_jni_byte_cursor_from_jstring_acquire(env, jni_endpoint);
    uint32_t port = (uint32_t)jni_port;
    if (!port) {
        aws_jni_throw_runtime_exception(
            env,
            "MqttClientConnection.mqtt_new: Endpoint should be in the format hostname:port and port must not be 0");
        goto cleanup;
    }

    struct mqtt_jni_async_callback *connect_callback = s_mqtt_jni_async_callback_new(connection, NULL, env);
    if (connect_callback == NULL) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Failed to create async callback");
        goto cleanup;
    }

    s_mqtt_jni_connection_acquire(connection);

    struct aws_socket_options default_socket_options;
    AWS_ZERO_STRUCT(default_socket_options);
    default_socket_options.type = AWS_SOCKET_STREAM;
    default_socket_options.connect_timeout_ms = 3000;
    struct aws_socket_options *socket_options = &default_socket_options;
    if (jni_socket_options) {
        socket_options = (struct aws_socket_options *)jni_socket_options;
    }
    memcpy(&connection->socket_options, socket_options, sizeof(struct aws_socket_options));

    /* if a tls_ctx was provided, initialize tls options */
    struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_ctx;
    struct aws_tls_connection_options *tls_options = NULL;
    if (tls_ctx) {
        tls_options = &connection->tls_options;
        aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx);
        aws_tls_connection_options_set_server_name(tls_options, aws_jni_get_allocator(), &endpoint);
    }

    client_id = aws_jni_byte_cursor_from_jstring_acquire(env, jni_client_id);
    bool clean_session = jni_clean_session != 0;

    struct aws_mqtt_connection_options connect_options;
    AWS_ZERO_STRUCT(connect_options);
    connect_options.host_name = endpoint;
    connect_options.port = port;
    connect_options.socket_options = &connection->socket_options;
    connect_options.tls_options = tls_options;
    connect_options.client_id = client_id;
    connect_options.keep_alive_time_secs = (uint16_t)keep_alive_secs;
    connect_options.ping_timeout_ms = ping_timeout_ms;
    connect_options.protocol_operation_timeout_ms = protocol_operation_timeout_ms;
    connect_options.clean_session = clean_session;
    connect_options.on_connection_complete = s_on_connection_complete;
    connect_options.user_data = connect_callback;

    int result = aws_mqtt_client_connection_connect(connection->client_connection, &connect_options);
    if (result != AWS_OP_SUCCESS) {
        s_mqtt_jni_connection_release(connection);
        s_mqtt_jni_async_callback_destroy(connect_callback, env);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_connect failed");
    }

cleanup:
    aws_jni_byte_cursor_from_jstring_release(env, jni_endpoint, endpoint);
    aws_jni_byte_cursor_from_jstring_release(env, jni_client_id, client_id);
}

/*******************************************************************************
 * disconnect
 ******************************************************************************/
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jobject jni_ack) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Invalid connection");
        return;
    }

    struct mqtt_jni_async_callback *disconnect_callback = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
    if (disconnect_callback == NULL) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Failed to create async callback");
        return;
    }

    if (aws_mqtt_client_connection_disconnect(
            connection->client_connection, s_on_connection_disconnected, disconnect_callback) != AWS_OP_SUCCESS) {
        int error = aws_last_error();
        /*
         * Disconnect invoked on a disconnected connection can happen under normal circumstances.  Invoke the callback
         * manually since it won't get invoked otherwise.
         */
        AWS_LOGF_WARN(
            AWS_LS_MQTT_CLIENT,
            "MqttClientConnection.mqtt_disconnect: error calling disconnect - %d(%s)",
            error,
            aws_error_str(error));
        s_on_connection_disconnected(connection->client_connection, disconnect_callback);
    }
}

/*******************************************************************************
 * subscribe
 ******************************************************************************/
/* called from any sub, unsub, or pub ack */
static void s_deliver_ack_success(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
    AWS_FATAL_ASSERT(callback);
    AWS_FATAL_ASSERT(callback->connection);

    if (callback->async_callback) {
        (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_success);
        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }
}

static void s_deliver_ack_failure(struct mqtt_jni_async_callback *callback, int error_code, JNIEnv *env) {
    AWS_FATAL_ASSERT(callback);
    AWS_FATAL_ASSERT(callback->connection);
    AWS_FATAL_ASSERT(env);

    if (callback->async_callback) {
        jobject jni_reason = s_new_mqtt_exception(env, error_code);
        (*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_failure, jni_reason);
        (*env)->DeleteLocalRef(env, jni_reason);
        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }
}

static void s_on_op_complete(
    struct aws_mqtt_client_connection *connection,
    uint16_t packet_id,
    int error_code,
    void *user_data) {
    AWS_FATAL_ASSERT(connection);
    (void)packet_id;

    struct mqtt_jni_async_callback *callback = user_data;
    if (!callback) {
        return;
    }

    /********** JNI ENV ACQUIRE **********/
    JavaVM *jvm = callback->connection->jvm;
    JNIEnv *env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        return;
    }

    if (error_code) {
        s_deliver_ack_failure(callback, error_code, env);
    } else {
        s_deliver_ack_success(callback, env);
    }

    s_mqtt_jni_async_callback_destroy(callback, env);

    aws_jni_release_thread_env(jvm, env);
    /********** JNI ENV RELEASE **********/
}

static bool s_is_qos_successful(enum aws_mqtt_qos qos) {
    return qos < 128;
}

static void s_on_ack(
    struct aws_mqtt_client_connection *connection,
    uint16_t packet_id,
    const struct aws_byte_cursor *topic,
    enum aws_mqtt_qos qos,
    int error_code,
    void *user_data) {
    (void)topic;

    // Handle a case when the server processed SUBSCRIBE request successfully, but rejected a subscription for some
    // reason, i.e. error_code is 0 and qos is 0x80.
    // This mostly applies to mqtt5to3adapter, as MQTT3 client will be disconnected on unsuccessful subscribe.
    if (error_code == 0 && !s_is_qos_successful(qos)) {
        error_code = AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE;
    }

    s_on_op_complete(connection, packet_id, error_code, user_data);
}

static void s_cleanup_handler(void *user_data) {
    struct mqtt_jni_async_callback *handler = user_data;

    /********** JNI ENV ACQUIRE **********/
    JavaVM *jvm = handler->connection->jvm;
    JNIEnv *env = aws_jni_acquire_thread_env(jvm);
    if (env == NULL) {
        return;
    }

    s_mqtt_jni_async_callback_destroy(handler, env);

    aws_jni_release_thread_env(jvm, env);
    /********** JNI ENV RELEASE **********/
}

static void s_on_subscription_delivered(
    struct aws_mqtt_client_connection *connection,
    const struct aws_byte_cursor *topic,
    const struct aws_byte_cursor *payload,
    bool dup,
    enum aws_mqtt_qos qos,
    bool retain,
    void *user_data) {

    AWS_FATAL_ASSERT(connection);
    AWS_FATAL_ASSERT(topic);
    AWS_FATAL_ASSERT(payload);
    AWS_FATAL_ASSERT(user_data);

    struct mqtt_jni_async_callback *callback = user_data;
    if (!callback->async_callback) {
        return;
    }

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(callback->connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        return;
    }

    jbyteArray jni_payload = (*env)->NewByteArray(env, (jsize)payload->len);
    (*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)payload->len, (const signed char *)payload->ptr);

    jstring jni_topic = aws_jni_string_from_cursor(env, topic);

    (*env)->CallVoidMethod(
        env, callback->async_callback, message_handler_properties.deliver, jni_topic, jni_payload, dup, qos, retain);

    (*env)->DeleteLocalRef(env, jni_payload);
    (*env)->DeleteLocalRef(env, jni_topic);

    AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));

    aws_jni_release_thread_env(callback->connection->jvm, env);
    /********** JNI ENV RELEASE **********/
}

JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_topic,
    jint jni_qos,
    jobject jni_handler,
    jobject jni_ack) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Invalid connection");
        return 0;
    }

    struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
    if (!handler) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate handler");
        return 0;
    }

    /* from here, any failure requires error_cleanup */
    struct mqtt_jni_async_callback *sub_ack = NULL;
    if (jni_ack) {
        sub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
        if (!sub_ack) {
            aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate sub ack");
            goto error_cleanup;
        }
    }

    struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
    enum aws_mqtt_qos qos = jni_qos;

    uint16_t msg_id = aws_mqtt_client_connection_subscribe(
        connection->client_connection,
        &topic,
        qos,
        s_on_subscription_delivered,
        handler,
        s_cleanup_handler,
        s_on_ack,
        sub_ack);
    aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
    if (msg_id == 0) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_subscribe: aws_mqtt_client_connection_subscribe failed");
        goto error_cleanup;
    }

    return msg_id;

error_cleanup:
    if (handler) {
        s_mqtt_jni_async_callback_destroy(handler, env);
    }

    if (sub_ack) {
        s_mqtt_jni_async_callback_destroy(sub_ack, env);
    }

    return 0;
}

JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jobject jni_handler) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid connection");
        return;
    }

    if (!jni_handler) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid handler");
        return;
    }

    struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
    if (!handler) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqttClientConnectionOnMessage: Unable to allocate handler");
        return;
    }

    if (aws_mqtt_client_connection_set_on_any_publish_handler(
            connection->client_connection, s_on_subscription_delivered, handler)) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqttClientConnectionOnMessage: Failed to install on_any_publish_handler");
        goto error_cleanup;
    }

    if (connection->on_message) {
        s_mqtt_jni_async_callback_destroy(connection->on_message, env);
    }

    connection->on_message = handler;

    return;

error_cleanup:
    if (handler) {
        s_mqtt_jni_async_callback_destroy(handler, env);
    }
}

/*******************************************************************************
 * unsubscribe
 ******************************************************************************/
JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_topic,
    jobject jni_ack) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Invalid connection");
        return 0;
    }

    struct mqtt_jni_async_callback *unsub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
    if (!unsub_ack) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Unable to allocate unsub ack");
        goto error_cleanup;
    }

    struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
    uint16_t msg_id =
        aws_mqtt_client_connection_unsubscribe(connection->client_connection, &topic, s_on_op_complete, unsub_ack);
    aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
    if (msg_id == 0) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_unsubscribe: aws_mqtt_client_connection_unsubscribe failed");
        goto error_cleanup;
    }

    return msg_id;

error_cleanup:
    if (unsub_ack) {
        s_mqtt_jni_async_callback_destroy(unsub_ack, env);
    }
    return 0;
}

/*******************************************************************************
 * publish
 ******************************************************************************/
JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_topic,
    jint jni_qos,
    jboolean jni_retain,
    jbyteArray jni_payload,
    jobject jni_ack) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid connection");
        return 0;
    }

    if (!jni_topic) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid/null topic");
        return 0;
    }

    struct mqtt_jni_async_callback *pub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
    if (!pub_ack) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Unable to allocate pub ack");
        goto error_cleanup;
    }

    struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);

    struct aws_byte_cursor payload;
    AWS_ZERO_STRUCT(payload);
    if (jni_payload != NULL) {
        payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
    }

    enum aws_mqtt_qos qos = jni_qos;
    bool retain = jni_retain != 0;

    uint16_t msg_id = aws_mqtt_client_connection_publish(
        connection->client_connection, &topic, qos, retain, &payload, s_on_op_complete, pub_ack);
    aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);

    if (jni_payload != NULL) {
        aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
    }

    if (msg_id == 0) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_publish: aws_mqtt_client_connection_publish failed");
        goto error_cleanup;
    }

    return msg_id;

error_cleanup:
    if (pub_ack) {
        s_mqtt_jni_async_callback_destroy(pub_ack, env);
    }

    return 0;
}

JNIEXPORT jboolean JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_topic,
    jint jni_qos,
    jboolean jni_retain,
    jbyteArray jni_payload) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Invalid connection");
        return false;
    }

    if (jni_topic == NULL) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Topic must be non-null");
        return false;
    }
    struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);

    struct aws_byte_cursor payload;
    AWS_ZERO_STRUCT(payload);
    if (jni_payload != NULL) {
        payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
    }

    enum aws_mqtt_qos qos = jni_qos;
    bool retain = jni_retain != 0;

    int result = aws_mqtt_client_connection_set_will(connection->client_connection, &topic, qos, retain, &payload);
    aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);

    if (jni_payload != NULL) {
        aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
    }

    return (result == AWS_OP_SUCCESS);
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jstring jni_user,
    jstring jni_pass) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Invalid connection");
        return;
    }

    struct aws_byte_cursor username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_user);
    struct aws_byte_cursor password;
    struct aws_byte_cursor *password_ptr = NULL;
    AWS_ZERO_STRUCT(password);
    if (jni_pass != NULL) {
        password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_pass);
        password_ptr = &password;
    }

    if (aws_mqtt_client_connection_set_login(connection->client_connection, &username, password_ptr)) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Failed to set login");
    }

    aws_jni_byte_cursor_from_jstring_release(env, jni_user, username);

    if (password.len > 0) {
        aws_jni_byte_cursor_from_jstring_release(env, jni_pass, password);
    }
}

JNIEXPORT void JNICALL
    Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(
        JNIEnv *env,
        jclass jni_class,
        jlong jni_connection,
        jlong jni_min_timeout,
        jlong jni_max_timeout) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_reconnect_timeout: Invalid connection");
        return;
    }

    if (aws_mqtt_client_connection_set_reconnect_timeout(
            connection->client_connection, jni_min_timeout, jni_max_timeout)) {
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.mqtt_reconnect_timeout: Failed to set reconnect timeout");
    }
}

///////
static void s_ws_handshake_destroy(struct mqtt_jni_ws_handshake *ws_handshake) {
    if (!ws_handshake) {
        return;
    }

    s_mqtt_jni_connection_release(ws_handshake->connection);
    aws_mem_release(aws_jni_get_allocator(), ws_handshake);
}

static void s_ws_handshake_transform(
    struct aws_http_message *request,
    void *user_data,
    aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
    void *complete_ctx) {

    struct mqtt_jni_connection *connection = user_data;

    /********** JNI ENV ACQUIRE **********/
    JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
    if (env == NULL) {
        /* If we can't get an environment, then the JVM is probably shutting down.  Don't crash. */
        complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
        return;
    }

    struct aws_allocator *alloc = aws_jni_get_allocator();

    struct mqtt_jni_ws_handshake *ws_handshake = aws_mem_calloc(alloc, 1, sizeof(struct mqtt_jni_ws_handshake));
    if (!ws_handshake) {
        goto error;
    }

    ws_handshake->connection = connection;
    s_mqtt_jni_connection_acquire(ws_handshake->connection);

    ws_handshake->complete_ctx = complete_ctx;
    ws_handshake->complete_fn = complete_fn;
    ws_handshake->http_request = request;

    jobject java_http_request = aws_java_http_request_from_native(env, request, NULL);
    if (!java_http_request) {
        aws_raise_error(AWS_ERROR_UNKNOWN); /* TODO: given java exception, choose appropriate aws error code */
        goto error;
    }

    jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
    if (mqtt_connection != NULL) {
        (*env)->CallVoidMethod(
            env, mqtt_connection, mqtt_connection_properties.on_websocket_handshake, java_http_request, ws_handshake);

        (*env)->DeleteLocalRef(env, mqtt_connection);

        AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
    }

    (*env)->DeleteLocalRef(env, java_http_request);
    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE SUCCESS PATH **********/

    return;

error:;

    int error_code = aws_last_error();
    s_ws_handshake_destroy(ws_handshake);
    complete_fn(request, error_code, complete_ctx);
    aws_jni_release_thread_env(connection->jvm, env);
    /********** JNI ENV RELEASE FAILURE PATH **********/
}

JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Invalid connection");
        return;
    }

    if (aws_mqtt_client_connection_use_websockets(
            connection->client_connection, s_ws_handshake_transform, connection, NULL, NULL)) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Failed to use websockets");
        return;
    }
}

JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jbyteArray jni_marshalled_request,
    jobject jni_throwable,
    jlong jni_user_data) {
    (void)jni_class;
    (void)jni_connection;
    aws_cache_jni_ids(env);

    struct mqtt_jni_ws_handshake *ws_handshake = (void *)jni_user_data;
    int error_code = AWS_ERROR_SUCCESS;

    if (jni_throwable != NULL) {
        if ((*env)->IsInstanceOf(env, jni_throwable, crt_runtime_exception_properties.crt_runtime_exception_class)) {
            error_code = (*env)->GetIntField(env, jni_throwable, crt_runtime_exception_properties.error_code_field_id);
        }

        if (error_code == AWS_ERROR_SUCCESS) {
            error_code = AWS_ERROR_UNKNOWN; /* is there anything more that could be done here? */
        }

        goto done;
    }

    if (aws_apply_java_http_request_changes_to_native_request(
            env, jni_marshalled_request, NULL, ws_handshake->http_request)) {
        error_code = aws_last_error();
        goto done;
    }

done:
    ws_handshake->complete_fn(ws_handshake->http_request, error_code, ws_handshake->complete_ctx);
    s_ws_handshake_destroy(ws_handshake);
}

JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(
    JNIEnv *env,
    jclass jni_class,
    jlong jni_connection,
    jint jni_proxy_connection_type,
    jstring jni_proxy_host,
    jint jni_proxy_port,
    jlong jni_proxy_tls_context,
    jint jni_proxy_authorization_type,
    jstring jni_proxy_authorization_username,
    jstring jni_proxy_authorization_password) {

    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;

    struct aws_http_proxy_options proxy_options;
    AWS_ZERO_STRUCT(proxy_options);

    if (!jni_proxy_host) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: proxyHost must not be null.");
        return;
    }

    proxy_options.connection_type = (enum aws_http_proxy_connection_type)jni_proxy_connection_type;

    proxy_options.host = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_host);
    proxy_options.port = (uint32_t)jni_proxy_port;

    proxy_options.auth_type = (enum aws_http_proxy_authentication_type)jni_proxy_authorization_type;

    if (jni_proxy_authorization_username) {
        proxy_options.auth_username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_username);
    }

    if (jni_proxy_authorization_password) {
        proxy_options.auth_password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_password);
    }

    struct aws_tls_connection_options proxy_tls_conn_options;
    AWS_ZERO_STRUCT(proxy_tls_conn_options);

    if (jni_proxy_tls_context != 0) {
        struct aws_tls_ctx *proxy_tls_ctx = (struct aws_tls_ctx *)jni_proxy_tls_context;
        aws_tls_connection_options_init_from_ctx(&proxy_tls_conn_options, proxy_tls_ctx);
        aws_tls_connection_options_set_server_name(
            &proxy_tls_conn_options, aws_jni_get_allocator(), &proxy_options.host);
        proxy_options.tls_options = &proxy_tls_conn_options;
    }

    if (aws_mqtt_client_connection_set_http_proxy_options(connection->client_connection, &proxy_options)) {
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: Failed to set proxy options");
    }

    if (jni_proxy_authorization_password) {
        aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_password, proxy_options.auth_password);
    }

    if (jni_proxy_authorization_username) {
        aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_username, proxy_options.auth_username);
    }

    aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_host, proxy_options.host);
    aws_tls_connection_options_clean_up(&proxy_tls_conn_options);
}

JNIEXPORT jobject JNICALL
    Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(
        JNIEnv *env,
        jclass jni_class,
        jlong jni_connection) {
    (void)jni_class;
    aws_cache_jni_ids(env);

    struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
    if (!connection) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(env, "MqttClientConnection.getOperationStatistics: Invalid connection");
        return NULL;
    }

    /* Construct Java object */
    jobject jni_operation_statistics = (*env)->NewObject(
        env,
        mqtt_connection_operation_statistics_properties.statistics_class,
        mqtt_connection_operation_statistics_properties.statistics_constructor_id);
    if (jni_operation_statistics == NULL) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.getOperationStatistics: Could not create operation statistics object");
        return NULL;
    }

    struct aws_mqtt_connection_operation_statistics connection_stats;
    aws_mqtt_client_connection_get_stats(connection->client_connection, &connection_stats);

    (*env)->SetLongField(
        env,
        jni_operation_statistics,
        mqtt_connection_operation_statistics_properties.incomplete_operation_count_field_id,
        (jlong)connection_stats.incomplete_operation_count);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation count");
        return NULL;
    }

    (*env)->SetLongField(
        env,
        jni_operation_statistics,
        mqtt_connection_operation_statistics_properties.incomplete_operation_size_field_id,
        (jlong)connection_stats.incomplete_operation_size);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation size");
        return NULL;
    }

    (*env)->SetLongField(
        env,
        jni_operation_statistics,
        mqtt_connection_operation_statistics_properties.unacked_operation_count_field_id,
        (jlong)connection_stats.unacked_operation_count);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.getOperationStatistics: could not create unacked operation count");
        return NULL;
    }

    (*env)->SetLongField(
        env,
        jni_operation_statistics,
        mqtt_connection_operation_statistics_properties.unacked_operation_size_field_id,
        (jlong)connection_stats.unacked_operation_size);
    if (aws_jni_check_and_clear_exception(env)) {
        aws_raise_error(AWS_ERROR_INVALID_STATE);
        aws_jni_throw_runtime_exception(
            env, "MqttClientConnection.getOperationStatistics: could not create unacked operation size");
        return NULL;
    }

    return jni_operation_statistics;
}

#if UINTPTR_MAX == 0xffffffff
#    if defined(_MSC_VER)
#        pragma warning(pop)
#    else
#        pragma GCC diagnostic pop
#    endif
#endif
