Skip to content

Commit c1a4ba7

Browse files
committed
feat: Added ThreadsafeFunction class and runtime methods
1 parent 3f63ccf commit c1a4ba7

File tree

6 files changed

+460
-0
lines changed

6 files changed

+460
-0
lines changed

packages/host/android/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ add_library(node-api-host SHARED
2424
../cpp/RuntimeNodeApi.hpp
2525
../cpp/RuntimeNodeApiAsync.cpp
2626
../cpp/RuntimeNodeApiAsync.hpp
27+
../cpp/ThreadsafeFunction.cpp
28+
../cpp/ThreadsafeFunction.hpp
2729
)
2830

2931
target_include_directories(node-api-host PRIVATE

packages/host/cpp/RuntimeNodeApiAsync.cpp

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "RuntimeNodeApiAsync.hpp"
22
#include <ReactCommon/CallInvoker.h>
33
#include "Logger.hpp"
4+
#include "ThreadsafeFunction.hpp"
45

56
struct AsyncJob {
67
using IdType = uint64_t;
@@ -187,4 +188,86 @@ napi_status napi_cancel_async_work(
187188
job->state = AsyncJob::State::Cancelled;
188189
return napi_ok;
189190
}
191+
192+
napi_status napi_create_threadsafe_function(napi_env env,
193+
napi_value func,
194+
napi_value async_resource,
195+
napi_value async_resource_name,
196+
size_t max_queue_size,
197+
size_t initial_thread_count,
198+
void* thread_finalize_data,
199+
napi_finalize thread_finalize_cb,
200+
void* context,
201+
napi_threadsafe_function_call_js call_js_cb,
202+
napi_threadsafe_function* result) {
203+
const auto function = ThreadSafeFunction::create(getCallInvoker(env),
204+
env,
205+
func,
206+
async_resource,
207+
async_resource_name,
208+
max_queue_size,
209+
initial_thread_count,
210+
thread_finalize_data,
211+
thread_finalize_cb,
212+
context,
213+
call_js_cb);
214+
*result = reinterpret_cast<napi_threadsafe_function>(function.get());
215+
return napi_ok;
216+
}
217+
218+
napi_status napi_get_threadsafe_function_context(
219+
napi_threadsafe_function func, void** result) {
220+
const auto function = ThreadSafeFunction::get(func);
221+
if (!function) {
222+
return napi_invalid_arg;
223+
}
224+
return function->getContext(result);
225+
}
226+
227+
napi_status napi_call_threadsafe_function(napi_threadsafe_function func,
228+
void* data,
229+
napi_threadsafe_function_call_mode is_blocking) {
230+
const auto function = ThreadSafeFunction::get(func);
231+
if (!function) {
232+
return napi_invalid_arg;
233+
}
234+
return function->call(data, is_blocking);
235+
}
236+
237+
napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func) {
238+
const auto function = ThreadSafeFunction::get(func);
239+
if (!function) {
240+
return napi_invalid_arg;
241+
}
242+
return function->acquire();
243+
}
244+
245+
napi_status napi_release_threadsafe_function(
246+
napi_threadsafe_function func, napi_threadsafe_function_release_mode mode) {
247+
const auto function = ThreadSafeFunction::get(func);
248+
if (!function) {
249+
return napi_invalid_arg;
250+
}
251+
return function->release(mode);
252+
}
253+
254+
napi_status napi_unref_threadsafe_function(
255+
node_api_basic_env env, napi_threadsafe_function func) {
256+
const auto function = ThreadSafeFunction::get(func);
257+
if (!function) {
258+
return napi_invalid_arg;
259+
}
260+
// RN has no libuv loop to unref; we only update internal state for parity.
261+
return function->unref();
262+
}
263+
264+
napi_status napi_ref_threadsafe_function(
265+
node_api_basic_env env, napi_threadsafe_function func) {
266+
const auto function = ThreadSafeFunction::get(func);
267+
if (!function) {
268+
return napi_invalid_arg;
269+
}
270+
// RN has no libuv loop to ref; we only update internal state for parity.
271+
return function->ref();
272+
}
190273
} // namespace callstack::nodeapihost

packages/host/cpp/RuntimeNodeApiAsync.hpp

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,34 @@ napi_status napi_delete_async_work(
2323

2424
napi_status napi_cancel_async_work(
2525
node_api_basic_env env, napi_async_work work);
26+
27+
napi_status napi_create_threadsafe_function(napi_env env,
28+
napi_value func,
29+
napi_value async_resource,
30+
napi_value async_resource_name,
31+
size_t max_queue_size,
32+
size_t initial_thread_count,
33+
void* thread_finalize_data,
34+
napi_finalize thread_finalize_cb,
35+
void* context,
36+
napi_threadsafe_function_call_js call_js_cb,
37+
napi_threadsafe_function* result);
38+
39+
napi_status napi_get_threadsafe_function_context(
40+
napi_threadsafe_function func, void** result);
41+
42+
napi_status napi_call_threadsafe_function(napi_threadsafe_function func,
43+
void* data,
44+
napi_threadsafe_function_call_mode is_blocking);
45+
46+
napi_status napi_acquire_threadsafe_function(napi_threadsafe_function func);
47+
48+
napi_status napi_release_threadsafe_function(
49+
napi_threadsafe_function func, napi_threadsafe_function_release_mode mode);
50+
51+
napi_status napi_unref_threadsafe_function(
52+
node_api_basic_env env, napi_threadsafe_function func);
53+
54+
napi_status napi_ref_threadsafe_function(
55+
node_api_basic_env env, napi_threadsafe_function func);
2656
} // namespace callstack::nodeapihost
Lines changed: 260 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,260 @@
1+
#include "ThreadsafeFunction.hpp"
2+
#include "Logger.hpp"
3+
4+
// This file provides a React Native-friendly implementation of Node-API's
5+
// thread-safe function primitive. In RN we don't own/libuv, so we:
6+
// - Use CallInvoker to hop onto the JS thread instead of uv_async.
7+
// - Track a registry mapping native handles to shared_ptrs for lookup/lifetime.
8+
// - Emulate ref/unref semantics without affecting any event loop.
9+
10+
static std::unordered_map<napi_threadsafe_function,
11+
std::shared_ptr<callstack::nodeapihost::ThreadSafeFunction>>
12+
registry;
13+
static std::mutex registryMutex;
14+
15+
namespace callstack::nodeapihost {
16+
17+
ThreadSafeFunction::ThreadSafeFunction(
18+
std::weak_ptr<facebook::react::CallInvoker> callInvoker,
19+
napi_env env,
20+
napi_value jsFunc,
21+
napi_value asyncResource,
22+
napi_value asyncResourceName,
23+
size_t maxQueueSize,
24+
size_t initialThreadCount,
25+
void* threadFinalizeData,
26+
napi_finalize threadFinalizeCb,
27+
void* context,
28+
napi_threadsafe_function_call_js callJsCb)
29+
: callInvoker_{std::move(callInvoker)},
30+
env_{env},
31+
jsFunc_{jsFunc},
32+
asyncResource_{asyncResource},
33+
asyncResourceName_{asyncResourceName},
34+
maxQueueSize_{maxQueueSize},
35+
threadCount_{initialThreadCount},
36+
threadFinalizeData_{threadFinalizeData},
37+
threadFinalizeCb_{threadFinalizeCb},
38+
context_{context},
39+
callJsCb_{callJsCb},
40+
refCount_{initialThreadCount} {
41+
if (jsFunc) {
42+
// Keep JS function alive across async hops; fatal here mirrors Node-API's
43+
// behavior when environment is irrecoverable.
44+
const auto status = napi_create_reference(env, jsFunc, 1, &jsFuncRef_);
45+
if (status != napi_ok) {
46+
napi_fatal_error(nullptr,
47+
0,
48+
"Failed to create JS function reference",
49+
NAPI_AUTO_LENGTH);
50+
}
51+
}
52+
}
53+
54+
ThreadSafeFunction::~ThreadSafeFunction() {
55+
if (jsFuncRef_) {
56+
napi_delete_reference(env_, jsFuncRef_);
57+
}
58+
}
59+
60+
std::shared_ptr<ThreadSafeFunction> ThreadSafeFunction::create(
61+
std::weak_ptr<facebook::react::CallInvoker> callInvoker,
62+
napi_env env,
63+
napi_value jsFunc,
64+
napi_value asyncResource,
65+
napi_value asyncResourceName,
66+
size_t maxQueueSize,
67+
size_t initialThreadCount,
68+
void* threadFinalizeData,
69+
napi_finalize threadFinalizeCb,
70+
void* context,
71+
napi_threadsafe_function_call_js callJsCb) {
72+
const auto function =
73+
std::make_shared<ThreadSafeFunction>(std::move(callInvoker),
74+
env,
75+
jsFunc,
76+
asyncResource,
77+
asyncResourceName,
78+
maxQueueSize,
79+
initialThreadCount,
80+
threadFinalizeData,
81+
threadFinalizeCb,
82+
context,
83+
callJsCb);
84+
85+
{
86+
auto handle = reinterpret_cast<napi_threadsafe_function>(function.get());
87+
std::lock_guard lock{registryMutex};
88+
registry[handle] = function;
89+
}
90+
91+
return std::move(function);
92+
}
93+
94+
std::shared_ptr<ThreadSafeFunction> ThreadSafeFunction::get(
95+
napi_threadsafe_function func) {
96+
std::lock_guard lock{registryMutex};
97+
return registry.contains(func) ? registry[func] : nullptr;
98+
}
99+
100+
napi_status ThreadSafeFunction::getContext(void** result) {
101+
if (!result) {
102+
return napi_invalid_arg;
103+
}
104+
105+
*result = context_;
106+
return napi_ok;
107+
}
108+
109+
napi_status ThreadSafeFunction::call(
110+
void* data, napi_threadsafe_function_call_mode isBlocking) {
111+
if (aborted_ || closing_) {
112+
return napi_closing;
113+
}
114+
115+
{
116+
std::unique_lock lock{queueMutex_};
117+
// Backpressure: enforce maxQueueSize_. If nonblocking, fail fast; if
118+
// blocking, wait until space is available or closing/aborted.
119+
if (maxQueueSize_ && queue_.size() >= maxQueueSize_) {
120+
if (isBlocking == napi_tsfn_nonblocking) {
121+
return napi_queue_full;
122+
}
123+
queueCv_.wait(lock, [&] {
124+
return queue_.size() < maxQueueSize_ || aborted_ || closing_;
125+
});
126+
if (aborted_ || closing_) return napi_closing;
127+
}
128+
queue_.push(data);
129+
}
130+
131+
const auto invoker = callInvoker_.lock();
132+
if (!invoker) {
133+
log_debug("Error: No CallInvoker available for ThreadSafeFunction");
134+
return napi_generic_failure;
135+
}
136+
// Hop to JS thread; we drain one item per hop to keep latency predictable
137+
// and avoid long monopolization of the JS queue.
138+
invoker->invokeAsync([this] {
139+
void* queuedData{nullptr};
140+
auto empty{false};
141+
{
142+
std::lock_guard lock{queueMutex_};
143+
if (!queue_.empty()) {
144+
queuedData = queue_.front();
145+
const auto size = queue_.size();
146+
queue_.pop();
147+
empty = queue_.empty();
148+
if (size == maxQueueSize_ && maxQueueSize_) {
149+
queueCv_.notify_one();
150+
}
151+
}
152+
}
153+
if (queuedData && !aborted_) {
154+
// Prefer the user-provided callJsCb_ (Node-API compatible). If absent
155+
// but we have a JS function ref, call it directly with no args.
156+
if (callJsCb_) {
157+
napi_value fn{nullptr};
158+
if (jsFuncRef_) {
159+
napi_get_reference_value(env_, jsFuncRef_, &fn);
160+
}
161+
callJsCb_(env_, fn, context_, queuedData);
162+
} else if (jsFuncRef_) {
163+
napi_value fn;
164+
napi_get_reference_value(env_, jsFuncRef_, &fn);
165+
napi_value recv;
166+
napi_get_undefined(env_, &recv);
167+
napi_value result;
168+
napi_call_function(env_, recv, fn, 0, nullptr, &result);
169+
}
170+
}
171+
172+
// Auto-finalize when: no remaining threads (acquire/release balance),
173+
// queue drained, and not already closing.
174+
if (!threadCount_ && empty && !closing_) {
175+
if (maxQueueSize_) {
176+
std::lock_guard lock{queueMutex_};
177+
queueCv_.notify_all();
178+
}
179+
finalize();
180+
}
181+
});
182+
return napi_ok;
183+
}
184+
185+
napi_status ThreadSafeFunction::acquire() {
186+
if (closing_) {
187+
return napi_closing;
188+
}
189+
refCount_++;
190+
threadCount_++;
191+
return napi_ok;
192+
}
193+
194+
napi_status ThreadSafeFunction::release(
195+
napi_threadsafe_function_release_mode mode) {
196+
// Node-API semantics: abort prevents further JS calls and wakes any waiters.
197+
if (mode == napi_tsfn_abort) {
198+
aborted_ = true;
199+
closing_ = true;
200+
}
201+
if (refCount_) {
202+
refCount_--;
203+
}
204+
if (threadCount_) {
205+
threadCount_--;
206+
}
207+
// When the last ref is gone (or we're closing), queue is drained, notify and
208+
// finalize.
209+
std::lock_guard lock{queueMutex_};
210+
if (!refCount_ && !threadCount_ && queue_.empty() || closing_) {
211+
closing_ = true;
212+
if (maxQueueSize_) {
213+
queueCv_.notify_all();
214+
}
215+
finalize();
216+
}
217+
return napi_ok;
218+
}
219+
220+
napi_status ThreadSafeFunction::ref() {
221+
// In libuv, this would keep the loop alive. In RN we don't own or expose a
222+
// libuv loop. We just track the state for API parity.
223+
referenced_.store(true, std::memory_order_relaxed);
224+
return napi_ok;
225+
}
226+
227+
napi_status ThreadSafeFunction::unref() {
228+
// In libuv, this allows the loop to exit if nothing else is keeping it
229+
// alive. In RN this is a no-op beyond state tracking.
230+
referenced_.store(false, std::memory_order_relaxed);
231+
return napi_ok;
232+
}
233+
234+
void ThreadSafeFunction::finalize() {
235+
std::lock_guard lock{finalizeMutex_};
236+
if (handlesClosing_) {
237+
return;
238+
}
239+
handlesClosing_ = true;
240+
closing_ = true;
241+
242+
const auto onFinalize = [this] {
243+
// Invoke user finalizer and unregister the handle from the global map.
244+
if (threadFinalizeCb_) {
245+
threadFinalizeCb_(env_, threadFinalizeData_, context_);
246+
}
247+
std::lock_guard lock{registryMutex};
248+
registry.erase(reinterpret_cast<napi_threadsafe_function>(this));
249+
};
250+
251+
// Prefer running the finalizer on the JS thread to match expectations;
252+
// if CallInvoker is gone, run synchronously.
253+
if (const auto invoker = callInvoker_.lock()) {
254+
invoker->invokeAsync([=]() { onFinalize(); });
255+
} else {
256+
onFinalize();
257+
}
258+
}
259+
260+
} // namespace callstack::nodeapihost

0 commit comments

Comments
 (0)