blob: fb112316e30f0000e0444ade206239b284fe23ce [file] [log] [blame]
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <grpc/support/port_platform.h>
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/port.h"
#ifdef GRPC_CFSTREAM
#import <CoreFoundation/CoreFoundation.h>
#import "src/core/lib/iomgr/cfstream_handle.h"
#include <grpc/grpc.h>
#include <grpc/support/atm.h>
#include <grpc/support/sync.h>
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error_cfstream.h"
#include "src/core/lib/iomgr/ev_apple.h"
#include "src/core/lib/iomgr/exec_ctx.h"
extern grpc_core::TraceFlag grpc_tcp_trace;
GrpcLibraryInitHolder::GrpcLibraryInitHolder() { grpc_init(); }
GrpcLibraryInitHolder::~GrpcLibraryInitHolder() { grpc_shutdown(); }
void* CFStreamHandle::Retain(void* info) {
CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
CFSTREAM_HANDLE_REF(handle, "retain");
return info;
}
void CFStreamHandle::Release(void* info) {
CFStreamHandle* handle = static_cast<CFStreamHandle*>(info);
CFSTREAM_HANDLE_UNREF(handle, "release");
}
CFStreamHandle* CFStreamHandle::CreateStreamHandle(
CFReadStreamRef read_stream, CFWriteStreamRef write_stream) {
return new CFStreamHandle(read_stream, write_stream);
}
void CFStreamHandle::ReadCallback(CFReadStreamRef stream,
CFStreamEventType type,
void* client_callback_info) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error_handle error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(client_callback_info);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream ReadCallback (%p, %p, %lu, %p)", handle,
stream, type, client_callback_info);
}
switch (type) {
case kCFStreamEventOpenCompleted:
handle->open_event_.SetReady();
break;
case kCFStreamEventHasBytesAvailable:
case kCFStreamEventEndEncountered:
handle->read_event_.SetReady();
break;
case kCFStreamEventErrorOccurred:
stream_error = CFReadStreamCopyError(stream);
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "read error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
CFRelease(stream_error);
handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
break;
default:
GPR_UNREACHABLE_CODE(return );
}
}
void CFStreamHandle::WriteCallback(CFWriteStreamRef stream,
CFStreamEventType type,
void* clientCallBackInfo) {
grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
grpc_core::ExecCtx exec_ctx;
grpc_error_handle error;
CFErrorRef stream_error;
CFStreamHandle* handle = static_cast<CFStreamHandle*>(clientCallBackInfo);
if (grpc_tcp_trace.enabled()) {
gpr_log(GPR_DEBUG, "CFStream WriteCallback (%p, %p, %lu, %p)", handle,
stream, type, clientCallBackInfo);
}
switch (type) {
case kCFStreamEventOpenCompleted:
handle->open_event_.SetReady();
break;
case kCFStreamEventCanAcceptBytes:
case kCFStreamEventEndEncountered:
handle->write_event_.SetReady();
break;
case kCFStreamEventErrorOccurred:
stream_error = CFWriteStreamCopyError(stream);
error = grpc_error_set_int(
GRPC_ERROR_CREATE_FROM_CFERROR(stream_error, "write error"),
GRPC_ERROR_INT_GRPC_STATUS, GRPC_STATUS_UNAVAILABLE);
CFRelease(stream_error);
handle->open_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->write_event_.SetShutdown(GRPC_ERROR_REF(error));
handle->read_event_.SetShutdown(GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
break;
default:
GPR_UNREACHABLE_CODE(return );
}
}
CFStreamHandle::CFStreamHandle(CFReadStreamRef read_stream,
CFWriteStreamRef write_stream) {
gpr_ref_init(&refcount_, 1);
open_event_.InitEvent();
read_event_.InitEvent();
write_event_.InitEvent();
dispatch_queue_ = dispatch_queue_create(nullptr, DISPATCH_QUEUE_SERIAL);
CFStreamClientContext ctx = {0, static_cast<void*>(this),
CFStreamHandle::Retain, CFStreamHandle::Release,
nil};
CFReadStreamSetClient(
read_stream,
kCFStreamEventOpenCompleted | kCFStreamEventHasBytesAvailable |
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
CFStreamHandle::ReadCallback, &ctx);
CFWriteStreamSetClient(
write_stream,
kCFStreamEventOpenCompleted | kCFStreamEventCanAcceptBytes |
kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered,
CFStreamHandle::WriteCallback, &ctx);
grpc_apple_register_read_stream(read_stream, dispatch_queue_);
grpc_apple_register_write_stream(write_stream, dispatch_queue_);
}
CFStreamHandle::~CFStreamHandle() {
open_event_.DestroyEvent();
read_event_.DestroyEvent();
write_event_.DestroyEvent();
dispatch_release(dispatch_queue_);
}
void CFStreamHandle::NotifyOnOpen(grpc_closure* closure) {
open_event_.NotifyOn(closure);
}
void CFStreamHandle::NotifyOnRead(grpc_closure* closure) {
read_event_.NotifyOn(closure);
}
void CFStreamHandle::NotifyOnWrite(grpc_closure* closure) {
write_event_.NotifyOn(closure);
}
void CFStreamHandle::Shutdown(grpc_error_handle error) {
open_event_.SetShutdown(GRPC_ERROR_REF(error));
read_event_.SetShutdown(GRPC_ERROR_REF(error));
write_event_.SetShutdown(GRPC_ERROR_REF(error));
GRPC_ERROR_UNREF(error);
}
void CFStreamHandle::Ref(const char* file, int line, const char* reason) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
gpr_log(file, line, GPR_LOG_SEVERITY_DEBUG,
"CFStream Handle ref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
reason, val, val + 1);
}
gpr_ref(&refcount_);
}
void CFStreamHandle::Unref(const char* file, int line, const char* reason) {
if (grpc_tcp_trace.enabled()) {
gpr_atm val = gpr_atm_no_barrier_load(&refcount_.count);
gpr_log(GPR_DEBUG,
"CFStream Handle unref %p : %s %" PRIdPTR " -> %" PRIdPTR, this,
reason, val, val - 1);
}
if (gpr_unref(&refcount_)) {
delete this;
}
}
#else
/* Creating a phony function so that the grpc_cfstream library will be
* non-empty.
*/
void CFStreamPhony() {}
#endif