blob: b6f7c1c90a8769916821747ec00587ed5689a373 [file] [edit]
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
#include <jni.h>
#include <aws/common/atomics.h>
#include <aws/common/condition_variable.h>
#include <aws/common/logging.h>
#include <aws/common/mutex.h>
#include <aws/common/string.h>
#include <aws/common/thread.h>
#include <aws/http/connection.h>
#include <aws/http/proxy.h>
#include <aws/http/request_response.h>
#include <aws/io/channel.h>
#include <aws/io/channel_bootstrap.h>
#include <aws/io/event_loop.h>
#include <aws/io/host_resolver.h>
#include <aws/io/socket.h>
#include <aws/io/socket_channel_handler.h>
#include <aws/io/tls_channel_handler.h>
#include <aws/mqtt/client.h>
#include <ctype.h>
#include <string.h>
#include "crt.h"
#include "http_request_utils.h"
#include "java_class_ids.h"
#include "mqtt5_client_jni.h"
/*******************************************************************************
* mqtt_jni_async_callback - carries an AsyncCallback around as user data to mqtt
* async ops, and is used to deliver callbacks. Also hangs on to JNI references
* to buffers and strings that need to outlive the request
******************************************************************************/
struct mqtt_jni_async_callback {
struct mqtt_jni_connection *connection;
jobject async_callback;
struct aws_byte_buf buffer; /* payloads or other pinned resources go in here, freed when callback is delivered */
};
/*******************************************************************************
* mqtt_jni_connection - represents an aws_mqtt_client_connection to Java
******************************************************************************/
struct mqtt_jni_connection {
struct aws_mqtt_client *client; /* Provided to mqtt_connect */
struct aws_mqtt_client_connection *client_connection;
struct aws_socket_options socket_options;
struct aws_tls_connection_options tls_options;
JavaVM *jvm;
jweak java_mqtt_connection; /* MqttClientConnection instance */
struct mqtt_jni_async_callback *on_message;
struct aws_atomic_var ref_count;
};
/*******************************************************************************
* mqtt_jni_ws_handshake - Data needed to perform the async websocket handshake
* transform operations. Destroyed when transform is complete.
******************************************************************************/
struct mqtt_jni_ws_handshake {
struct mqtt_jni_connection *connection;
struct aws_http_message *http_request;
aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn;
void *complete_ctx;
};
static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection);
static void s_mqtt_jni_connection_acquire(struct mqtt_jni_connection *connection) {
size_t old_value = aws_atomic_fetch_add(&connection->ref_count, 1);
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection acquire, ref count now = %d", (int)old_value + 1);
}
static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data);
static void s_mqtt_jni_connection_release(struct mqtt_jni_connection *connection) {
size_t old_value = aws_atomic_fetch_sub(&connection->ref_count, 1);
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection release, ref count now = %d", (int)old_value - 1);
}
/* The destroy function is called on Java MqttClientConnection resource release. */
static void s_mqtt_jni_connection_destroy(struct mqtt_jni_connection *connection) {
/* For mqtt311 client, we have to call aws_mqtt_client_connection_disconnect before releasing the underlying c
* connection.*/
if (aws_mqtt_client_connection_disconnect(
connection->client_connection, s_on_shutdown_disconnect_complete, connection) != AWS_OP_SUCCESS) {
/*
* This can happen under normal code paths if the client happens to be disconnected at cleanup/shutdown
* time. Log it (in case it was unexpected) and then shutdown the underlying connection manually.
*/
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "Client disconnect failed. Release the client connection.");
s_on_shutdown_disconnect_complete(connection->client_connection, NULL);
}
}
static struct mqtt_jni_async_callback *s_mqtt_jni_async_callback_new(
struct mqtt_jni_connection *connection,
jobject async_callback,
JNIEnv *env) {
if (env == NULL) {
return NULL;
}
struct aws_allocator *allocator = aws_jni_get_allocator();
/* allocate cannot fail */
struct mqtt_jni_async_callback *callback = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_async_callback));
callback->connection = connection;
callback->async_callback = async_callback ? (*env)->NewGlobalRef(env, async_callback) : NULL;
aws_byte_buf_init(&callback->buffer, aws_jni_get_allocator(), 0);
return callback;
}
static void s_mqtt_jni_async_callback_destroy(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
AWS_FATAL_ASSERT(callback && callback->connection);
if (env == NULL) {
return;
}
if (callback->async_callback) {
(*env)->DeleteGlobalRef(env, callback->async_callback);
}
aws_byte_buf_clean_up(&callback->buffer);
struct aws_allocator *allocator = aws_jni_get_allocator();
aws_mem_release(allocator, callback);
}
static jobject s_new_mqtt_exception(JNIEnv *env, int error_code) {
jobject exception = (*env)->NewObject(
env, mqtt_exception_properties.jni_mqtt_exception, mqtt_exception_properties.jni_constructor, error_code);
return exception;
}
/* on 32-bit platforms, casting pointers to longs throws a warning we don't need */
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(push)
# pragma warning(disable : 4305) /* 'type cast': truncation from 'jlong' to 'jni_tls_ctx_options *' */
# else
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wpointer-to-int-cast"
# pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
# endif
#endif
/*******************************************************************************
* new
******************************************************************************/
static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data);
static void s_on_connection_complete(
struct aws_mqtt_client_connection *client_connection,
int error_code,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *user_data) {
(void)client_connection;
(void)return_code;
struct mqtt_jni_async_callback *connect_callback = user_data;
struct mqtt_jni_connection *connection = connect_callback->connection;
/********** JNI ENV ACQUIRE **********/
JavaVM *jvm = connection->jvm;
JNIEnv *env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection != NULL) {
(*env)->CallVoidMethod(
env, mqtt_connection, mqtt_connection_properties.on_connection_complete, error_code, session_present);
(*env)->DeleteLocalRef(env, mqtt_connection);
if (aws_jni_check_and_clear_exception(env)) {
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE EARLY OUT **********/
aws_mqtt_client_connection_disconnect(client_connection, s_on_connection_disconnected, connect_callback);
return; /* callback and ref count will be cleaned up in s_on_connection_disconnected */
}
}
s_mqtt_jni_async_callback_destroy(connect_callback, env);
aws_jni_release_thread_env(jvm, env);
/********** JNI ENV RELEASE **********/
s_mqtt_jni_connection_release(connection);
}
static void s_on_connection_interrupted_internal(
struct mqtt_jni_connection *connection,
int error_code,
jobject ack_callback,
JNIEnv *env) {
AWS_FATAL_ASSERT(env);
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection) {
(*env)->CallVoidMethod(
env, mqtt_connection, mqtt_connection_properties.on_connection_interrupted, error_code, ack_callback);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
}
static void s_on_connection_interrupted(
struct aws_mqtt_client_connection *client_connection,
int error_code,
void *user_data) {
(void)client_connection;
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
s_on_connection_interrupted_internal(user_data, error_code, NULL, env);
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_connection_success(
struct aws_mqtt_client_connection *client_connection,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *user_data) {
(void)client_connection;
(void)return_code;
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection) {
(*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_success, session_present);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_connection_failure(
struct aws_mqtt_client_connection *client_connection,
int error_code,
void *user_data) {
(void)client_connection;
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection) {
(*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_failure, error_code);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_connection_resumed(
struct aws_mqtt_client_connection *client_connection,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *user_data) {
(void)client_connection;
(void)return_code;
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection) {
(*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_resumed, session_present);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_connection_disconnected(struct aws_mqtt_client_connection *client_connection, void *user_data) {
(void)client_connection;
struct mqtt_jni_async_callback *connect_callback = user_data;
struct mqtt_jni_connection *jni_connection = connect_callback->connection;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
s_on_connection_interrupted_internal(connect_callback->connection, 0, connect_callback->async_callback, env);
s_mqtt_jni_async_callback_destroy(connect_callback, env);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
aws_jni_release_thread_env(jni_connection->jvm, env);
/********** JNI ENV RELEASE **********/
/* Do not call release here: s_on_connection_closed will (normally) be called
* right after and so we can call the release there instead. */
}
static void s_on_connection_closed(
struct aws_mqtt_client_connection *client_connection,
struct on_connection_closed_data *data,
void *user_data) {
(void)client_connection;
(void)data;
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
// Make sure the Java object has not been garbage collected
if (!(*env)->IsSameObject(env, connection->java_mqtt_connection, NULL)) {
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection) {
(*env)->CallVoidMethod(env, mqtt_connection, mqtt_connection_properties.on_connection_closed);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
}
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_connection_terminated(void *user_data) {
struct mqtt_jni_connection *jni_connection = (struct mqtt_jni_connection *)user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(jni_connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, jni_connection->java_mqtt_connection);
if (mqtt_connection != NULL) {
(*env)->CallVoidMethod(env, mqtt_connection, crt_resource_properties.release_references);
(*env)->DeleteLocalRef(env, mqtt_connection);
aws_jni_check_and_clear_exception(env);
}
JavaVM *jvm = jni_connection->jvm;
s_mqtt_connection_destroy(env, jni_connection);
aws_jni_release_thread_env(jvm, env);
/********** JNI ENV RELEASE **********/
}
static struct mqtt_jni_connection *s_mqtt_connection_new(
JNIEnv *env,
struct aws_mqtt_client *client3,
struct aws_mqtt5_client_java_jni *client5_jni,
jobject java_mqtt_connection) {
struct aws_allocator *allocator = aws_jni_get_allocator();
struct mqtt_jni_connection *connection = aws_mem_calloc(allocator, 1, sizeof(struct mqtt_jni_connection));
if (!connection) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_connect: Out of memory allocating JNI connection");
return NULL;
}
aws_atomic_store_int(&connection->ref_count, 1);
connection->java_mqtt_connection = (*env)->NewWeakGlobalRef(env, java_mqtt_connection);
jint jvmresult = (*env)->GetJavaVM(env, &connection->jvm);
AWS_FATAL_ASSERT(jvmresult == 0);
if (client3 != NULL) {
connection->client = client3;
connection->client_connection = aws_mqtt_client_connection_new(client3);
} else if (client5_jni != NULL) {
connection->client_connection = aws_mqtt_client_connection_new_from_mqtt5_client(client5_jni->client);
}
if (!connection->client_connection) {
aws_jni_throw_runtime_exception(
env,
"MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to create new "
"connection");
goto on_error;
}
if (aws_mqtt_client_connection_set_connection_termination_handler(
connection->client_connection, s_on_connection_terminated, connection)) {
aws_jni_throw_runtime_exception(
env,
"MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_new failed, unable to set termination "
"callback");
goto on_error;
}
return connection;
on_error:
s_mqtt_jni_connection_release(connection);
return NULL;
}
static void s_mqtt_connection_destroy(JNIEnv *env, struct mqtt_jni_connection *connection) {
if (connection == NULL) {
return;
}
if (connection->on_message) {
s_mqtt_jni_async_callback_destroy(connection->on_message, env);
}
if (connection->java_mqtt_connection) {
(*env)->DeleteWeakGlobalRef(env, connection->java_mqtt_connection);
}
aws_tls_connection_options_clean_up(&connection->tls_options);
struct aws_allocator *allocator = aws_jni_get_allocator();
aws_mem_release(allocator, connection);
}
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom311Client(
JNIEnv *env,
jclass jni_class,
jlong jni_client,
jobject jni_mqtt_connection) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = NULL;
struct aws_mqtt_client *client3 = (struct aws_mqtt_client *)jni_client;
if (!client3) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt3 Client is invalid/null");
return (jlong)NULL;
}
connection = s_mqtt_connection_new(env, client3, NULL, jni_mqtt_connection);
if (!connection) {
return (jlong)NULL;
}
aws_mqtt_client_connection_set_connection_result_handlers(
connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
aws_mqtt_client_connection_set_connection_interruption_handlers(
connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
aws_mqtt_client_connection_set_connection_closed_handler(
connection->client_connection, s_on_connection_closed, connection);
return (jlong)connection;
}
JNIEXPORT jlong JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionNewFrom5Client(
JNIEnv *env,
jclass jni_class,
jlong jni_client,
jobject jni_mqtt_connection) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = NULL;
struct aws_mqtt5_client_java_jni *client5_jni = (struct aws_mqtt5_client_java_jni *)jni_client;
if (!client5_jni) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_new: Mqtt5 Client is invalid/null");
return (jlong)NULL;
}
connection = s_mqtt_connection_new(env, NULL, client5_jni, jni_mqtt_connection);
if (!connection) {
return (jlong)NULL;
}
aws_mqtt_client_connection_set_connection_result_handlers(
connection->client_connection, s_on_connection_success, connection, s_on_connection_failure, connection);
aws_mqtt_client_connection_set_connection_interruption_handlers(
connection->client_connection, s_on_connection_interrupted, connection, s_on_connection_resumed, connection);
aws_mqtt_client_connection_set_connection_closed_handler(
connection->client_connection, s_on_connection_closed, connection);
return (jlong)connection;
}
/* The disconnect callback called on shutdown. We will release the underlying connection here, which should init the
** client shutdown process. Then on termination callback, we will finally release all jni resources.
*/
static void s_on_shutdown_disconnect_complete(struct aws_mqtt_client_connection *connection, void *user_data) {
(void)user_data;
AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "mqtt_jni_connection shutdown complete, releasing references");
/* Release the underlying mqtt connection */
aws_mqtt_client_connection_release(connection);
}
/*******************************************************************************
* clean_up
******************************************************************************/
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDestroy(
JNIEnv *env,
jclass jni_class,
jlong jni_connection) {
(void)jni_class;
(void)env;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
s_mqtt_jni_connection_destroy(connection);
}
/*******************************************************************************
* connect
******************************************************************************/
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionConnect(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_endpoint,
jint jni_port,
jlong jni_socket_options,
jlong jni_tls_ctx,
jstring jni_client_id,
jboolean jni_clean_session,
jint keep_alive_secs,
jshort ping_timeout_ms,
jint protocol_operation_timeout_ms) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Connection is invalid/null");
return;
}
struct aws_byte_cursor client_id;
AWS_ZERO_STRUCT(client_id);
struct aws_byte_cursor endpoint = aws_jni_byte_cursor_from_jstring_acquire(env, jni_endpoint);
uint32_t port = (uint32_t)jni_port;
if (!port) {
aws_jni_throw_runtime_exception(
env,
"MqttClientConnection.mqtt_new: Endpoint should be in the format hostname:port and port must not be 0");
goto cleanup;
}
struct mqtt_jni_async_callback *connect_callback = s_mqtt_jni_async_callback_new(connection, NULL, env);
if (connect_callback == NULL) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_connect: Failed to create async callback");
goto cleanup;
}
s_mqtt_jni_connection_acquire(connection);
struct aws_socket_options default_socket_options;
AWS_ZERO_STRUCT(default_socket_options);
default_socket_options.type = AWS_SOCKET_STREAM;
default_socket_options.connect_timeout_ms = 3000;
struct aws_socket_options *socket_options = &default_socket_options;
if (jni_socket_options) {
socket_options = (struct aws_socket_options *)jni_socket_options;
}
memcpy(&connection->socket_options, socket_options, sizeof(struct aws_socket_options));
/* if a tls_ctx was provided, initialize tls options */
struct aws_tls_ctx *tls_ctx = (struct aws_tls_ctx *)jni_tls_ctx;
struct aws_tls_connection_options *tls_options = NULL;
if (tls_ctx) {
tls_options = &connection->tls_options;
aws_tls_connection_options_init_from_ctx(tls_options, tls_ctx);
aws_tls_connection_options_set_server_name(tls_options, aws_jni_get_allocator(), &endpoint);
}
client_id = aws_jni_byte_cursor_from_jstring_acquire(env, jni_client_id);
bool clean_session = jni_clean_session != 0;
struct aws_mqtt_connection_options connect_options;
AWS_ZERO_STRUCT(connect_options);
connect_options.host_name = endpoint;
connect_options.port = port;
connect_options.socket_options = &connection->socket_options;
connect_options.tls_options = tls_options;
connect_options.client_id = client_id;
connect_options.keep_alive_time_secs = (uint16_t)keep_alive_secs;
connect_options.ping_timeout_ms = ping_timeout_ms;
connect_options.protocol_operation_timeout_ms = protocol_operation_timeout_ms;
connect_options.clean_session = clean_session;
connect_options.on_connection_complete = s_on_connection_complete;
connect_options.user_data = connect_callback;
int result = aws_mqtt_client_connection_connect(connection->client_connection, &connect_options);
if (result != AWS_OP_SUCCESS) {
s_mqtt_jni_connection_release(connection);
s_mqtt_jni_async_callback_destroy(connect_callback, env);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_connect: aws_mqtt_client_connection_connect failed");
}
cleanup:
aws_jni_byte_cursor_from_jstring_release(env, jni_endpoint, endpoint);
aws_jni_byte_cursor_from_jstring_release(env, jni_client_id, client_id);
}
/*******************************************************************************
* disconnect
******************************************************************************/
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionDisconnect(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jobject jni_ack) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Invalid connection");
return;
}
struct mqtt_jni_async_callback *disconnect_callback = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
if (disconnect_callback == NULL) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_disconnect: Failed to create async callback");
return;
}
if (aws_mqtt_client_connection_disconnect(
connection->client_connection, s_on_connection_disconnected, disconnect_callback) != AWS_OP_SUCCESS) {
int error = aws_last_error();
/*
* Disconnect invoked on a disconnected connection can happen under normal circumstances. Invoke the callback
* manually since it won't get invoked otherwise.
*/
AWS_LOGF_WARN(
AWS_LS_MQTT_CLIENT,
"MqttClientConnection.mqtt_disconnect: error calling disconnect - %d(%s)",
error,
aws_error_str(error));
s_on_connection_disconnected(connection->client_connection, disconnect_callback);
}
}
/*******************************************************************************
* subscribe
******************************************************************************/
/* called from any sub, unsub, or pub ack */
static void s_deliver_ack_success(struct mqtt_jni_async_callback *callback, JNIEnv *env) {
AWS_FATAL_ASSERT(callback);
AWS_FATAL_ASSERT(callback->connection);
if (callback->async_callback) {
(*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_success);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
}
static void s_deliver_ack_failure(struct mqtt_jni_async_callback *callback, int error_code, JNIEnv *env) {
AWS_FATAL_ASSERT(callback);
AWS_FATAL_ASSERT(callback->connection);
AWS_FATAL_ASSERT(env);
if (callback->async_callback) {
jobject jni_reason = s_new_mqtt_exception(env, error_code);
(*env)->CallVoidMethod(env, callback->async_callback, async_callback_properties.on_failure, jni_reason);
(*env)->DeleteLocalRef(env, jni_reason);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
}
static void s_on_op_complete(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
int error_code,
void *user_data) {
AWS_FATAL_ASSERT(connection);
(void)packet_id;
struct mqtt_jni_async_callback *callback = user_data;
if (!callback) {
return;
}
/********** JNI ENV ACQUIRE **********/
JavaVM *jvm = callback->connection->jvm;
JNIEnv *env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
return;
}
if (error_code) {
s_deliver_ack_failure(callback, error_code, env);
} else {
s_deliver_ack_success(callback, env);
}
s_mqtt_jni_async_callback_destroy(callback, env);
aws_jni_release_thread_env(jvm, env);
/********** JNI ENV RELEASE **********/
}
static bool s_is_qos_successful(enum aws_mqtt_qos qos) {
return qos < 128;
}
static void s_on_ack(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
const struct aws_byte_cursor *topic,
enum aws_mqtt_qos qos,
int error_code,
void *user_data) {
(void)topic;
// Handle a case when the server processed SUBSCRIBE request successfully, but rejected a subscription for some
// reason, i.e. error_code is 0 and qos is 0x80.
// This mostly applies to mqtt5to3adapter, as MQTT3 client will be disconnected on unsuccessful subscribe.
if (error_code == 0 && !s_is_qos_successful(qos)) {
error_code = AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE;
}
s_on_op_complete(connection, packet_id, error_code, user_data);
}
static void s_cleanup_handler(void *user_data) {
struct mqtt_jni_async_callback *handler = user_data;
/********** JNI ENV ACQUIRE **********/
JavaVM *jvm = handler->connection->jvm;
JNIEnv *env = aws_jni_acquire_thread_env(jvm);
if (env == NULL) {
return;
}
s_mqtt_jni_async_callback_destroy(handler, env);
aws_jni_release_thread_env(jvm, env);
/********** JNI ENV RELEASE **********/
}
static void s_on_subscription_delivered(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
bool dup,
enum aws_mqtt_qos qos,
bool retain,
void *user_data) {
AWS_FATAL_ASSERT(connection);
AWS_FATAL_ASSERT(topic);
AWS_FATAL_ASSERT(payload);
AWS_FATAL_ASSERT(user_data);
struct mqtt_jni_async_callback *callback = user_data;
if (!callback->async_callback) {
return;
}
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(callback->connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
return;
}
jbyteArray jni_payload = (*env)->NewByteArray(env, (jsize)payload->len);
(*env)->SetByteArrayRegion(env, jni_payload, 0, (jsize)payload->len, (const signed char *)payload->ptr);
jstring jni_topic = aws_jni_string_from_cursor(env, topic);
(*env)->CallVoidMethod(
env, callback->async_callback, message_handler_properties.deliver, jni_topic, jni_payload, dup, qos, retain);
(*env)->DeleteLocalRef(env, jni_payload);
(*env)->DeleteLocalRef(env, jni_topic);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
aws_jni_release_thread_env(callback->connection->jvm, env);
/********** JNI ENV RELEASE **********/
}
JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSubscribe(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_topic,
jint jni_qos,
jobject jni_handler,
jobject jni_ack) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Invalid connection");
return 0;
}
struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
if (!handler) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate handler");
return 0;
}
/* from here, any failure requires error_cleanup */
struct mqtt_jni_async_callback *sub_ack = NULL;
if (jni_ack) {
sub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
if (!sub_ack) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_subscribe: Unable to allocate sub ack");
goto error_cleanup;
}
}
struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
enum aws_mqtt_qos qos = jni_qos;
uint16_t msg_id = aws_mqtt_client_connection_subscribe(
connection->client_connection,
&topic,
qos,
s_on_subscription_delivered,
handler,
s_cleanup_handler,
s_on_ack,
sub_ack);
aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
if (msg_id == 0) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_subscribe: aws_mqtt_client_connection_subscribe failed");
goto error_cleanup;
}
return msg_id;
error_cleanup:
if (handler) {
s_mqtt_jni_async_callback_destroy(handler, env);
}
if (sub_ack) {
s_mqtt_jni_async_callback_destroy(sub_ack, env);
}
return 0;
}
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionOnMessage(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jobject jni_handler) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid connection");
return;
}
if (!jni_handler) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqttClientConnectionOnMessage: Invalid handler");
return;
}
struct mqtt_jni_async_callback *handler = s_mqtt_jni_async_callback_new(connection, jni_handler, env);
if (!handler) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqttClientConnectionOnMessage: Unable to allocate handler");
return;
}
if (aws_mqtt_client_connection_set_on_any_publish_handler(
connection->client_connection, s_on_subscription_delivered, handler)) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqttClientConnectionOnMessage: Failed to install on_any_publish_handler");
goto error_cleanup;
}
if (connection->on_message) {
s_mqtt_jni_async_callback_destroy(connection->on_message, env);
}
connection->on_message = handler;
return;
error_cleanup:
if (handler) {
s_mqtt_jni_async_callback_destroy(handler, env);
}
}
/*******************************************************************************
* unsubscribe
******************************************************************************/
JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUnsubscribe(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_topic,
jobject jni_ack) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Invalid connection");
return 0;
}
struct mqtt_jni_async_callback *unsub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
if (!unsub_ack) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_unsubscribe: Unable to allocate unsub ack");
goto error_cleanup;
}
struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
uint16_t msg_id =
aws_mqtt_client_connection_unsubscribe(connection->client_connection, &topic, s_on_op_complete, unsub_ack);
aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
if (msg_id == 0) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_unsubscribe: aws_mqtt_client_connection_unsubscribe failed");
goto error_cleanup;
}
return msg_id;
error_cleanup:
if (unsub_ack) {
s_mqtt_jni_async_callback_destroy(unsub_ack, env);
}
return 0;
}
/*******************************************************************************
* publish
******************************************************************************/
JNIEXPORT
jshort JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionPublish(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_topic,
jint jni_qos,
jboolean jni_retain,
jbyteArray jni_payload,
jobject jni_ack) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid connection");
return 0;
}
if (!jni_topic) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Invalid/null topic");
return 0;
}
struct mqtt_jni_async_callback *pub_ack = s_mqtt_jni_async_callback_new(connection, jni_ack, env);
if (!pub_ack) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_publish: Unable to allocate pub ack");
goto error_cleanup;
}
struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
struct aws_byte_cursor payload;
AWS_ZERO_STRUCT(payload);
if (jni_payload != NULL) {
payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
}
enum aws_mqtt_qos qos = jni_qos;
bool retain = jni_retain != 0;
uint16_t msg_id = aws_mqtt_client_connection_publish(
connection->client_connection, &topic, qos, retain, &payload, s_on_op_complete, pub_ack);
aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
if (jni_payload != NULL) {
aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
}
if (msg_id == 0) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_publish: aws_mqtt_client_connection_publish failed");
goto error_cleanup;
}
return msg_id;
error_cleanup:
if (pub_ack) {
s_mqtt_jni_async_callback_destroy(pub_ack, env);
}
return 0;
}
JNIEXPORT jboolean JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetWill(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_topic,
jint jni_qos,
jboolean jni_retain,
jbyteArray jni_payload) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Invalid connection");
return false;
}
if (jni_topic == NULL) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_will: Topic must be non-null");
return false;
}
struct aws_byte_cursor topic = aws_jni_byte_cursor_from_jstring_acquire(env, jni_topic);
struct aws_byte_cursor payload;
AWS_ZERO_STRUCT(payload);
if (jni_payload != NULL) {
payload = aws_jni_byte_cursor_from_jbyteArray_acquire(env, jni_payload);
}
enum aws_mqtt_qos qos = jni_qos;
bool retain = jni_retain != 0;
int result = aws_mqtt_client_connection_set_will(connection->client_connection, &topic, qos, retain, &payload);
aws_jni_byte_cursor_from_jstring_release(env, jni_topic, topic);
if (jni_payload != NULL) {
aws_jni_byte_cursor_from_jbyteArray_release(env, jni_payload, payload);
}
return (result == AWS_OP_SUCCESS);
}
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetLogin(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jstring jni_user,
jstring jni_pass) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Invalid connection");
return;
}
struct aws_byte_cursor username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_user);
struct aws_byte_cursor password;
struct aws_byte_cursor *password_ptr = NULL;
AWS_ZERO_STRUCT(password);
if (jni_pass != NULL) {
password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_pass);
password_ptr = &password;
}
if (aws_mqtt_client_connection_set_login(connection->client_connection, &username, password_ptr)) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_set_login: Failed to set login");
}
aws_jni_byte_cursor_from_jstring_release(env, jni_user, username);
if (password.len > 0) {
aws_jni_byte_cursor_from_jstring_release(env, jni_pass, password);
}
}
JNIEXPORT void JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetReconnectTimeout(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jlong jni_min_timeout,
jlong jni_max_timeout) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.mqtt_reconnect_timeout: Invalid connection");
return;
}
if (aws_mqtt_client_connection_set_reconnect_timeout(
connection->client_connection, jni_min_timeout, jni_max_timeout)) {
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.mqtt_reconnect_timeout: Failed to set reconnect timeout");
}
}
///////
static void s_ws_handshake_destroy(struct mqtt_jni_ws_handshake *ws_handshake) {
if (!ws_handshake) {
return;
}
s_mqtt_jni_connection_release(ws_handshake->connection);
aws_mem_release(aws_jni_get_allocator(), ws_handshake);
}
static void s_ws_handshake_transform(
struct aws_http_message *request,
void *user_data,
aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
void *complete_ctx) {
struct mqtt_jni_connection *connection = user_data;
/********** JNI ENV ACQUIRE **********/
JNIEnv *env = aws_jni_acquire_thread_env(connection->jvm);
if (env == NULL) {
/* If we can't get an environment, then the JVM is probably shutting down. Don't crash. */
complete_fn(request, AWS_ERROR_INVALID_STATE, complete_ctx);
return;
}
struct aws_allocator *alloc = aws_jni_get_allocator();
struct mqtt_jni_ws_handshake *ws_handshake = aws_mem_calloc(alloc, 1, sizeof(struct mqtt_jni_ws_handshake));
if (!ws_handshake) {
goto error;
}
ws_handshake->connection = connection;
s_mqtt_jni_connection_acquire(ws_handshake->connection);
ws_handshake->complete_ctx = complete_ctx;
ws_handshake->complete_fn = complete_fn;
ws_handshake->http_request = request;
jobject java_http_request = aws_java_http_request_from_native(env, request, NULL);
if (!java_http_request) {
aws_raise_error(AWS_ERROR_UNKNOWN); /* TODO: given java exception, choose appropriate aws error code */
goto error;
}
jobject mqtt_connection = (*env)->NewLocalRef(env, connection->java_mqtt_connection);
if (mqtt_connection != NULL) {
(*env)->CallVoidMethod(
env, mqtt_connection, mqtt_connection_properties.on_websocket_handshake, java_http_request, ws_handshake);
(*env)->DeleteLocalRef(env, mqtt_connection);
AWS_FATAL_ASSERT(!aws_jni_check_and_clear_exception(env));
}
(*env)->DeleteLocalRef(env, java_http_request);
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE SUCCESS PATH **********/
return;
error:;
int error_code = aws_last_error();
s_ws_handshake_destroy(ws_handshake);
complete_fn(request, error_code, complete_ctx);
aws_jni_release_thread_env(connection->jvm, env);
/********** JNI ENV RELEASE FAILURE PATH **********/
}
JNIEXPORT void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionUseWebsockets(
JNIEnv *env,
jclass jni_class,
jlong jni_connection) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Invalid connection");
return;
}
if (aws_mqtt_client_connection_use_websockets(
connection->client_connection, s_ws_handshake_transform, connection, NULL, NULL)) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.useWebsockets: Failed to use websockets");
return;
}
}
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionWebsocketHandshakeComplete(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jbyteArray jni_marshalled_request,
jobject jni_throwable,
jlong jni_user_data) {
(void)jni_class;
(void)jni_connection;
aws_cache_jni_ids(env);
struct mqtt_jni_ws_handshake *ws_handshake = (void *)jni_user_data;
int error_code = AWS_ERROR_SUCCESS;
if (jni_throwable != NULL) {
if ((*env)->IsInstanceOf(env, jni_throwable, crt_runtime_exception_properties.crt_runtime_exception_class)) {
error_code = (*env)->GetIntField(env, jni_throwable, crt_runtime_exception_properties.error_code_field_id);
}
if (error_code == AWS_ERROR_SUCCESS) {
error_code = AWS_ERROR_UNKNOWN; /* is there anything more that could be done here? */
}
goto done;
}
if (aws_apply_java_http_request_changes_to_native_request(
env, jni_marshalled_request, NULL, ws_handshake->http_request)) {
error_code = aws_last_error();
goto done;
}
done:
ws_handshake->complete_fn(ws_handshake->http_request, error_code, ws_handshake->complete_ctx);
s_ws_handshake_destroy(ws_handshake);
}
JNIEXPORT
void JNICALL Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionSetHttpProxyOptions(
JNIEnv *env,
jclass jni_class,
jlong jni_connection,
jint jni_proxy_connection_type,
jstring jni_proxy_host,
jint jni_proxy_port,
jlong jni_proxy_tls_context,
jint jni_proxy_authorization_type,
jstring jni_proxy_authorization_username,
jstring jni_proxy_authorization_password) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
struct aws_http_proxy_options proxy_options;
AWS_ZERO_STRUCT(proxy_options);
if (!jni_proxy_host) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: proxyHost must not be null.");
return;
}
proxy_options.connection_type = (enum aws_http_proxy_connection_type)jni_proxy_connection_type;
proxy_options.host = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_host);
proxy_options.port = (uint32_t)jni_proxy_port;
proxy_options.auth_type = (enum aws_http_proxy_authentication_type)jni_proxy_authorization_type;
if (jni_proxy_authorization_username) {
proxy_options.auth_username = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_username);
}
if (jni_proxy_authorization_password) {
proxy_options.auth_password = aws_jni_byte_cursor_from_jstring_acquire(env, jni_proxy_authorization_password);
}
struct aws_tls_connection_options proxy_tls_conn_options;
AWS_ZERO_STRUCT(proxy_tls_conn_options);
if (jni_proxy_tls_context != 0) {
struct aws_tls_ctx *proxy_tls_ctx = (struct aws_tls_ctx *)jni_proxy_tls_context;
aws_tls_connection_options_init_from_ctx(&proxy_tls_conn_options, proxy_tls_ctx);
aws_tls_connection_options_set_server_name(
&proxy_tls_conn_options, aws_jni_get_allocator(), &proxy_options.host);
proxy_options.tls_options = &proxy_tls_conn_options;
}
if (aws_mqtt_client_connection_set_http_proxy_options(connection->client_connection, &proxy_options)) {
aws_jni_throw_runtime_exception(env, "MqttClientConnection.setHttpProxyOptions: Failed to set proxy options");
}
if (jni_proxy_authorization_password) {
aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_password, proxy_options.auth_password);
}
if (jni_proxy_authorization_username) {
aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_authorization_username, proxy_options.auth_username);
}
aws_jni_byte_cursor_from_jstring_release(env, jni_proxy_host, proxy_options.host);
aws_tls_connection_options_clean_up(&proxy_tls_conn_options);
}
JNIEXPORT jobject JNICALL
Java_software_amazon_awssdk_crt_mqtt_MqttClientConnection_mqttClientConnectionGetOperationStatistics(
JNIEnv *env,
jclass jni_class,
jlong jni_connection) {
(void)jni_class;
aws_cache_jni_ids(env);
struct mqtt_jni_connection *connection = (struct mqtt_jni_connection *)jni_connection;
if (!connection) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(env, "MqttClientConnection.getOperationStatistics: Invalid connection");
return NULL;
}
/* Construct Java object */
jobject jni_operation_statistics = (*env)->NewObject(
env,
mqtt_connection_operation_statistics_properties.statistics_class,
mqtt_connection_operation_statistics_properties.statistics_constructor_id);
if (jni_operation_statistics == NULL) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.getOperationStatistics: Could not create operation statistics object");
return NULL;
}
struct aws_mqtt_connection_operation_statistics connection_stats;
aws_mqtt_client_connection_get_stats(connection->client_connection, &connection_stats);
(*env)->SetLongField(
env,
jni_operation_statistics,
mqtt_connection_operation_statistics_properties.incomplete_operation_count_field_id,
(jlong)connection_stats.incomplete_operation_count);
if (aws_jni_check_and_clear_exception(env)) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation count");
return NULL;
}
(*env)->SetLongField(
env,
jni_operation_statistics,
mqtt_connection_operation_statistics_properties.incomplete_operation_size_field_id,
(jlong)connection_stats.incomplete_operation_size);
if (aws_jni_check_and_clear_exception(env)) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.getOperationStatistics: could not create incomplete operation size");
return NULL;
}
(*env)->SetLongField(
env,
jni_operation_statistics,
mqtt_connection_operation_statistics_properties.unacked_operation_count_field_id,
(jlong)connection_stats.unacked_operation_count);
if (aws_jni_check_and_clear_exception(env)) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.getOperationStatistics: could not create unacked operation count");
return NULL;
}
(*env)->SetLongField(
env,
jni_operation_statistics,
mqtt_connection_operation_statistics_properties.unacked_operation_size_field_id,
(jlong)connection_stats.unacked_operation_size);
if (aws_jni_check_and_clear_exception(env)) {
aws_raise_error(AWS_ERROR_INVALID_STATE);
aws_jni_throw_runtime_exception(
env, "MqttClientConnection.getOperationStatistics: could not create unacked operation size");
return NULL;
}
return jni_operation_statistics;
}
#if UINTPTR_MAX == 0xffffffff
# if defined(_MSC_VER)
# pragma warning(pop)
# else
# pragma GCC diagnostic pop
# endif
#endif