blob: ba13adce0ed457ba1d3973dd6580f79187ba951e [file] [log] [blame]
/*
* Copyright (C) 2017 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "chre_host/socket_client.h"

#include <errno.h>
#include <inttypes.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#include <chrono>

#include <cutils/sockets.h>
#include <utils/RefBase.h>
#include <utils/StrongPointer.h>

#include "chre_host/log.h"
namespace android {
namespace chre {
// Creates a client that holds no socket yet: mSockFd starts out as
// INVALID_SOCKET.
SocketClient::SocketClient() : mSockFd(INVALID_SOCKET) {}
// Destruction delegates all teardown work to disconnect().
SocketClient::~SocketClient() {
  this->disconnect();
}
// Forwards to doConnect() with background connection disabled, and returns
// whatever doConnect() reports.
bool SocketClient::connect(const char *socketName,
                           const sp<ICallbacks> &callbacks) {
  constexpr bool kInBackground = false;
  return doConnect(socketName, callbacks, kInBackground);
}
// Forwards to doConnect() with background connection enabled, and returns
// whatever doConnect() reports.
bool SocketClient::connectInBackground(const char *socketName,
                                       const sp<ICallbacks> &callbacks) {
  constexpr bool kInBackground = true;
  return doConnect(socketName, callbacks, kInBackground);
}
// Requests a graceful shutdown of the receive thread and waits for it to exit.
//
// Calling this from the receive thread itself only logs an error. If the
// receive thread is not running (mRxThread is not joinable), nothing happens.
void SocketClient::disconnect() {
  if (inReceiveThread()) {
    LOGE("disconnect() can't be called from a receive thread callback");
    return;
  }
  if (!receiveThreadRunning()) {
    return;
  }

  // Flag the shutdown under the mutex, then wake the RX thread in case it is
  // parked in the reconnect retry wait.
  {
    std::lock_guard<std::mutex> shutdownLock(mShutdownMutex);
    mGracefulShutdown = true;
  }
  mShutdownCond.notify_all();

  // Shutting the socket down kicks the RX thread out of recv() if it is
  // currently blocked there.
  if (mSockFd != INVALID_SOCKET) {
    if (shutdown(mSockFd, SHUT_RDWR) != 0) {
      LOG_ERROR("Couldn't shut down socket", errno);
    }
  }

  if (mRxThread.joinable()) {
    LOGD("Waiting for RX thread to exit");
    mRxThread.join();
  }
}
// Reports whether mSockFd currently holds something other than
// INVALID_SOCKET.
bool SocketClient::isConnected() const {
  const bool haveSocket = (mSockFd != INVALID_SOCKET);
  return haveSocket;
}
// Sends one message over the currently connected socket.
//
// @param data   Pointer to the bytes to send.
// @param length Number of bytes to send.
// @return true only if send() reports that exactly `length` bytes went out;
//         false if there is no socket, send() failed, it returned 0, or it
//         sent fewer bytes than requested. Every failure is logged.
bool SocketClient::sendMessage(const void *data, size_t length) {
  bool success = false;
  if (mSockFd == INVALID_SOCKET) {
    LOGW("Tried sending a message, but don't have a valid socket handle");
  } else {
    // MSG_NOSIGNAL: without it, sending after the peer has gone away raises
    // SIGPIPE, whose default action terminates the process before the error
    // handling below gets a chance to run. With it, send() simply fails and
    // the failure is reported through the return value.
    ssize_t bytesSent = send(mSockFd, data, length, MSG_NOSIGNAL);
    if (bytesSent < 0) {
      LOGE("Failed to send %zu bytes of data: %s", length, strerror(errno));
    } else if (bytesSent == 0) {
      LOGW("Failed to send data; remote side disconnected");
    } else if (static_cast<size_t>(bytesSent) != length) {
      LOGW("Truncated packet, tried sending %zu bytes, only %zd went through",
           length, bytesSent);
    } else {
      success = true;
    }
  }
  return success;
}
// Shared implementation behind connect() and connectInBackground().
//
// @param socketName          Name of the socket to connect to; must be
//                            non-null and fit in mSocketName.
// @param callbacks           Callback sink handed to the receive thread; must
//                            be non-null.
// @param connectInBackground When true, no connection attempt is made here
//                            and the receive thread is started right away;
//                            when false, tryConnect() must succeed first.
// @return true if the receive thread was started, false otherwise (the
//         reason is logged).
//
// A call from the receive thread, or with a null socket name, is rejected up
// front without touching any existing connection. Otherwise, if a receive
// thread already exists, it is implicitly disconnected before the remaining
// parameters are validated.
bool SocketClient::doConnect(const char *socketName,
                             const sp<ICallbacks> &callbacks,
                             bool connectInBackground) {
  bool success = false;
  if (inReceiveThread()) {
    LOGE("Can't attempt to connect from a receive thread callback");
  } else if (socketName == nullptr) {
    // strlcpy() below would dereference the null pointer.
    LOGE("Socket name parameter must be provided");
  } else {
    if (receiveThreadRunning()) {
      LOGW("Re-connecting socket with implicit disconnect");
      disconnect();
    }
    // strlcpy() returns the length of the source string, so a result that
    // does not fit in the destination means the name was truncated.
    size_t socketNameLen =
        strlcpy(mSocketName, socketName, sizeof(mSocketName));
    if (socketNameLen >= sizeof(mSocketName)) {
      LOGE("Socket name length parameter is too long (%zu, max %zu)",
           socketNameLen, sizeof(mSocketName));
    } else if (callbacks == nullptr) {
      LOGE("Callbacks parameter must be provided");
    } else if (connectInBackground || tryConnect()) {
      mGracefulShutdown = false;
      mCallbacks = callbacks;
      mRxThread = std::thread([this]() { receiveThread(); });
      success = true;
    }
  }
  return success;
}
// Reports whether the calling thread is the one held in mRxThread.
bool SocketClient::inReceiveThread() const {
  const std::thread::id callerId = std::this_thread::get_id();
  return (mRxThread.get_id() == callerId);
}
// Body of the RX thread.
//
// Outer loop: runs while no graceful shutdown has been requested and either a
// socket is already open or reconnect() manages to open one.
// Inner loop: reads from the socket and hands each message to
// mCallbacks->onMessageReceived() until the socket fails, the remote end
// closes it, or a graceful shutdown is requested. The socket is then closed
// and mSockFd is reset to INVALID_SOCKET.
//
// If the thread ends without a graceful shutdown having been requested,
// onConnectionAborted() is invoked. The callbacks reference is dropped before
// the thread exits.
void SocketClient::receiveThread() {
  constexpr size_t kReceiveBufferSize = 4096;
  uint8_t buffer[kReceiveBufferSize];
  LOGV("Receive thread started");
  while (!mGracefulShutdown && (mSockFd != INVALID_SOCKET || reconnect())) {
    while (!mGracefulShutdown) {
      ssize_t bytesReceived = recv(mSockFd, buffer, sizeof(buffer), 0);
      if (bytesReceived < 0) {
        if (errno == EINTR) {
          // Interrupted by a signal before any data arrived: the socket is
          // still healthy, so retry instead of closing it. The loop condition
          // re-checks the shutdown flag first.
          continue;
        }
        LOG_ERROR("Exiting RX thread", errno);
        break;
      } else if (bytesReceived == 0) {
        // recv() returning 0 is also what we see after disconnect() shuts the
        // socket down, so only report it when no shutdown was requested.
        if (!mGracefulShutdown) {
          LOGI("Socket disconnected on remote end");
          mCallbacks->onDisconnected();
        }
        break;
      }
      mCallbacks->onMessageReceived(buffer, bytesReceived);
    }
    if (close(mSockFd) != 0) {
      LOG_ERROR("Couldn't close socket", errno);
    }
    mSockFd = INVALID_SOCKET;
  }
  if (!mGracefulShutdown) {
    mCallbacks->onConnectionAborted();
  }
  mCallbacks.clear();
  LOGV("Exiting receive thread");
}
// Reports whether mRxThread is joinable, i.e. a receive thread has been
// started and not yet joined.
bool SocketClient::receiveThreadRunning() const {
  const bool rxThreadJoinable = mRxThread.joinable();
  return rxThreadJoinable;
}
// Repeatedly tries to (re)establish the socket connection, waiting before
// each attempt.
//
// The wait starts at kMinDelay; once the attempt count exceeds
// kExponentialBackoffDelay the wait doubles after every failure, capped at
// kMaxDelay. Each wait ends early if a graceful shutdown is requested.
//
// @return true if tryConnect() succeeded (onConnected() has been invoked);
//         false if a graceful shutdown was requested or kRetryLimit attempts
//         were used up.
bool SocketClient::reconnect() {
  constexpr auto kMinDelay = std::chrono::duration<int32_t, std::milli>(250);
  constexpr auto kMaxDelay = std::chrono::minutes(5);
  // Try reconnecting at initial delay this many times before backing off
  constexpr unsigned int kExponentialBackoffDelay =
      std::chrono::seconds(10) / kMinDelay;
  // Give up after this many tries (~2.5 hours)
  constexpr unsigned int kRetryLimit = kExponentialBackoffDelay + 40;
  auto delay = kMinDelay;
  unsigned int retryCount = 0;
  while (retryCount++ < kRetryLimit) {
    {
      std::unique_lock<std::mutex> lock(mShutdownMutex);
      mShutdownCond.wait_for(lock, delay,
                             [this]() { return mGracefulShutdown.load(); });
      if (mGracefulShutdown) {
        break;
      }
    }
    // Stay quiet while still retrying at the initial (fast) rate.
    const bool suppressErrorLogs = (delay == kMinDelay);
    if (tryConnect(suppressErrorLogs)) {
      LOGD("Successfully (re)connected");
      mCallbacks->onConnected();
      return true;
    }
    // Work out the delay for the next attempt before logging, so that the
    // "next try in" value is the wait that will actually be used rather than
    // the one that just elapsed.
    if (retryCount > kExponentialBackoffDelay) {
      delay *= 2;
    }
    if (delay > kMaxDelay) {
      delay = kMaxDelay;
    }
    if (!suppressErrorLogs) {
      LOGW("Failed to (re)connect, next try in %" PRId32 " ms",
           delay.count());
    }
  }
  return false;
}
// Makes a single attempt to create, configure and connect the client socket.
//
// @param suppressErrorLogs When true, failures are not logged.
// @return true if the connect call yielded something other than
//         INVALID_SOCKET (the result is stored in mSockFd); false otherwise,
//         in which case the locally created socket is closed again.
bool SocketClient::tryConnect(bool suppressErrorLogs) {
  errno = 0;
  const int sockFd = socket(AF_LOCAL, SOCK_SEQPACKET, 0);
  if (sockFd < 0) {
    if (!suppressErrorLogs) {
      LOGE("Couldn't create local socket: %s", strerror(errno));
    }
    return false;
  }

  // 2MB send buffer, to allow plenty of room for nanoapp loading
  const int sendBufferBytes = 2 * 1024 * 1024;
  // send() should normally return right away; if flow control blocks it,
  // give up after 3 seconds rather than staying blocked
  struct timeval sendTimeout = {
      .tv_sec = 3,
      .tv_usec = 0,
  };

  bool connected = false;
  if (setsockopt(sockFd, SOL_SOCKET, SO_SNDBUF, &sendBufferBytes,
                 sizeof(sendBufferBytes)) != 0) {
    if (!suppressErrorLogs) {
      LOGE("Failed to set SO_SNDBUF to %d: %s", sendBufferBytes,
           strerror(errno));
    }
  } else if (setsockopt(sockFd, SOL_SOCKET, SO_SNDTIMEO, &sendTimeout,
                        sizeof(sendTimeout)) != 0) {
    if (!suppressErrorLogs) {
      LOGE("Failed to set SO_SNDTIMEO: %s", strerror(errno));
    }
  } else {
    mSockFd = socket_local_client_connect(sockFd, mSocketName,
                                          ANDROID_SOCKET_NAMESPACE_RESERVED,
                                          SOCK_SEQPACKET);
    connected = (mSockFd != INVALID_SOCKET);
    if (!connected && !suppressErrorLogs) {
      LOGE("Couldn't connect client socket to '%s': %s", mSocketName,
           strerror(errno));
    }
  }

  // Any failure after socket() succeeded must release the descriptor.
  if (!connected) {
    close(sockFd);
  }
  return connected;
}
} // namespace chre
} // namespace android