diff --git a/include/channel.h b/include/channel.h index 2f9655a2827..1fd80fe6e4d 100644 --- a/include/channel.h +++ b/include/channel.h @@ -28,7 +28,7 @@ struct notify_msg_t struct timeout_msg_t { Channel *chan; - coroutine_t *co; + Coroutine *co; bool error; swTimer_node *timer; }; @@ -36,8 +36,8 @@ struct timeout_msg_t class Channel { private: - std::list producer_queue; - std::list consumer_queue; + std::list producer_queue; + std::list consumer_queue; std::queue data_queue; size_t capacity; @@ -68,7 +68,7 @@ class Channel return producer_queue.size(); } - inline void remove(coroutine_t *co) + inline void remove(Coroutine *co) { consumer_queue.remove(co); } @@ -87,9 +87,9 @@ class Channel return data; } - inline coroutine_t* pop_coroutine(enum channel_op type) + inline Coroutine* pop_coroutine(enum channel_op type) { - coroutine_t* co; + Coroutine* co; if (type == PRODUCER) { co = producer_queue.front(); diff --git a/include/context.h b/include/context.h index 56d78558ff2..be3df7dfed5 100644 --- a/include/context.h +++ b/include/context.h @@ -17,7 +17,6 @@ #endif #include "swoole.h" -#include "coroutine.h" #include "error.h" #if __linux__ @@ -28,6 +27,8 @@ #include #endif +typedef void (*coroutine_func_t)(void*); + namespace swoole { //namespace start diff --git a/include/coroutine.h b/include/coroutine.h index 7954e91df0d..62e6745edb2 100644 --- a/include/coroutine.h +++ b/include/coroutine.h @@ -14,15 +14,10 @@ +----------------------------------------------------------------------+ */ -#ifndef SW_COROUTINE_H_ -#define SW_COROUTINE_H_ +#pragma once #include "swoole.h" - -#ifdef __cplusplus -extern "C" -{ -#endif +#include "context.h" #define DEFAULT_MAX_CORO_NUM 3000 #define DEFAULT_STACK_SIZE 8192 @@ -32,41 +27,67 @@ extern "C" #define CORO_LIMIT -1 #define CORO_INVALID -2 -typedef struct coroutine_s coroutine_t; -typedef void (*coroutine_func_t)(void*); + +typedef enum +{ + SW_CORO_INIT = 0, SW_CORO_YIELD, SW_CORO_RUNNING, SW_CORO_END, +} sw_coro_state; typedef void (*coro_php_create_t)(); typedef void (*coro_php_yield_t)(void*); typedef void (*coro_php_resume_t)(void*); typedef void (*coro_php_close_t)(); -typedef enum +namespace swoole { - SW_CORO_INIT = 0, SW_CORO_YIELD, SW_CORO_RUNNING, SW_CORO_END, -} sw_coro_state; +class Coroutine +{ +public: + swoole::Context ctx; + long cid; + sw_coro_state state; + void *task; + + Coroutine(long _cid, size_t stack_size, coroutine_func_t fn, void *private_data) : + ctx(stack_size, fn, private_data) + { + cid = _cid; + task = nullptr; + state = SW_CORO_INIT; + } + void resume(); + void yield(); -void coro_yield(); -void coro_handle_timeout(); + void resume_naked(); + void yield_naked(); -/* basic api */ -long coroutine_create(coroutine_func_t func, void* args); -void coroutine_resume(coroutine_t *co); -void coroutine_yield(coroutine_t *co); -void coroutine_resume_naked(coroutine_t *co); -void coroutine_yield_naked(coroutine_t *co); -void coroutine_release(coroutine_t *co); + void release(); + + inline void set_task(void *_task) + { + task = _task; + } + + inline long get_cid() + { + return cid; + } + + static long create(coroutine_func_t fn, void* args); + static int sleep(double sec); + static swString* read_file(const char *file, int lock); + static ssize_t write_file(const char *file, char *buf, size_t length, int lock, int flags); +}; +} /* co task */ -void coroutine_set_task(coroutine_t *co, void *ptr); void* coroutine_get_current_task(); void* coroutine_get_task_by_cid(long cid); -void coroutine_add_defer_task(coroutine_t *co, swCallback cb, void *data); -void coroutine_do_defer_task(coroutine_t *co); /* get coroutine */ -coroutine_t* coroutine_get_current(); -coroutine_t *coroutine_get_by_id(long cid); +swoole::Coroutine* coroutine_get_current(); +swoole::Coroutine* coroutine_get_by_id(long cid); /* get cid */ -long coroutine_get_cid(coroutine_t *co); +long coroutine_get_cid(swoole::Coroutine *co); long coroutine_get_current_cid(); void coroutine_set_stack_size(int stack_size); /* callback */ @@ -78,10 +99,3 @@ void coroutine_set_onClose(coro_php_close_t func); void internal_coro_yield(void *arg); void internal_coro_resume(void *arg); -swString* swoole_coroutine_read_file(const char *file, int lock); -ssize_t swoole_coroutine_write_file(const char *file, char *buf, size_t length, int lock, int flags); - -#ifdef __cplusplus -} /* end extern "C" */ -#endif -#endif diff --git a/src/coroutine/base.cc b/src/coroutine/base.cc index c8e55ece2c5..7b825bad3e6 100644 --- a/src/coroutine/base.cc +++ b/src/coroutine/base.cc @@ -15,7 +15,7 @@ */ #include "coroutine.h" -#include "context.h" +#include "async.h" #include #include @@ -23,37 +23,20 @@ using namespace swoole; -struct coroutine_s -{ -public: - Context ctx; - long cid; - sw_coro_state state; - void *task; - - coroutine_s(long _cid, size_t stack_size, coroutine_func_t fn, void *private_data) : - ctx(stack_size, fn, private_data) - { - cid = _cid; - task = nullptr; - state = SW_CORO_INIT; - } -}; - static struct { int stack_size; int call_stack_size; long last_cid; - struct coroutine_s* call_stack[SW_MAX_CORO_NESTING_LEVEL]; + Coroutine* call_stack[SW_MAX_CORO_NESTING_LEVEL]; coro_php_yield_t onYield; /* before php yield coro */ coro_php_resume_t onResume; /* before php resume coro */ coro_php_close_t onClose; /* before php close coro */ } swCoroG = { SW_DEFAULT_C_STACK_SIZE, 0, 1, { nullptr, }, nullptr, nullptr, nullptr }; -static std::unordered_map coroutines; +static std::unordered_map coroutines; -long coroutine_create(coroutine_func_t fn, void* args) +long Coroutine::create(coroutine_func_t fn, void* args) { if (unlikely(swCoroG.call_stack_size == SW_MAX_CORO_NESTING_LEVEL)) { @@ -61,7 +44,7 @@ long coroutine_create(coroutine_func_t fn, void* args) return CORO_LIMIT; } long cid = swCoroG.last_cid++; - coroutine_t *co = new coroutine_s(cid, swCoroG.stack_size, fn, args); + Coroutine *co = new Coroutine(cid, swCoroG.stack_size, fn, args); coroutines[cid] = co; swCoroG.call_stack[swCoroG.call_stack_size++] = co; co->state = SW_CORO_RUNNING; @@ -69,75 +52,70 @@ long coroutine_create(coroutine_func_t fn, void* args) if (co->ctx.end) { co->state = SW_CORO_END; - coroutine_release(co); + co->release(); } return cid; } -void coroutine_yield(coroutine_t *co) +void Coroutine::yield() { if (swCoroG.onYield) { - swCoroG.onYield(co->task); + swCoroG.onYield(task); } swCoroG.call_stack_size--; - co->state = SW_CORO_YIELD; - co->ctx.SwapOut(); + state = SW_CORO_YIELD; + ctx.SwapOut(); } -void coroutine_resume(coroutine_t *co) +void Coroutine::resume() { if (swCoroG.onResume) { - swCoroG.onResume(co->task); + swCoroG.onResume(task); } - swCoroG.call_stack[swCoroG.call_stack_size++] = co; - co->state = SW_CORO_RUNNING; - co->ctx.SwapIn(); - if (co->ctx.end) + swCoroG.call_stack[swCoroG.call_stack_size++] = this; + state = SW_CORO_RUNNING; + ctx.SwapIn(); + if (ctx.end) { - coroutine_release(co); + release(); } } -void coroutine_yield_naked(coroutine_t *co) +void Coroutine::yield_naked() { swCoroG.call_stack_size--; - co->state = SW_CORO_YIELD; - co->ctx.SwapOut(); + state = SW_CORO_YIELD; + ctx.SwapOut(); } -void coroutine_resume_naked(coroutine_t *co) +void Coroutine::resume_naked() { - swCoroG.call_stack[swCoroG.call_stack_size++] = co; - co->state = SW_CORO_RUNNING; - co->ctx.SwapIn(); - if (co->ctx.end) + swCoroG.call_stack[swCoroG.call_stack_size++] = this; + state = SW_CORO_RUNNING; + ctx.SwapIn(); + if (ctx.end) { - coroutine_release(co); + release(); } } -void coroutine_release(coroutine_t *co) +void Coroutine::release() { - co->state = SW_CORO_END; + state = SW_CORO_END; if (swCoroG.onClose) { swCoroG.onClose(); } swCoroG.call_stack_size--; - coroutines.erase(co->cid); - delete co; -} - -void coroutine_set_task(coroutine_t *co, void *task) -{ - co->task = task; + coroutines.erase(cid); + delete this; } void* coroutine_get_task_by_cid(long cid) { - coroutine_t *co = coroutine_get_by_id(cid); + Coroutine *co = coroutine_get_by_id(cid); if (co == nullptr) { return nullptr; @@ -148,9 +126,9 @@ void* coroutine_get_task_by_cid(long cid) } } -coroutine_t* coroutine_get_by_id(long cid) +Coroutine* coroutine_get_by_id(long cid) { - std::unordered_map::iterator i = coroutines.find(cid); + std::unordered_map::iterator i = coroutines.find(cid); if (i == coroutines.end()) { return nullptr; @@ -161,14 +139,14 @@ coroutine_t* coroutine_get_by_id(long cid) } } -coroutine_t* coroutine_get_current() +Coroutine* coroutine_get_current() { return likely(swCoroG.call_stack_size > 0) ? swCoroG.call_stack[swCoroG.call_stack_size - 1] : nullptr; } void* coroutine_get_current_task() { - coroutine_t* co = coroutine_get_current(); + Coroutine* co = coroutine_get_current(); if (co == nullptr) { return nullptr; @@ -181,12 +159,7 @@ void* coroutine_get_current_task() long coroutine_get_current_cid() { - coroutine_t* co = coroutine_get_current(); - return likely(co) ? co->cid : -1; -} - -long coroutine_get_cid(coroutine_t *co) -{ + Coroutine* co = coroutine_get_current(); return likely(co) ? co->cid : -1; } diff --git a/src/coroutine/channel.cc b/src/coroutine/channel.cc index bab65e032a1..1411523d2a8 100644 --- a/src/coroutine/channel.cc +++ b/src/coroutine/channel.cc @@ -26,7 +26,7 @@ static void channel_operation_timeout(swTimer *timer, swTimer_node *tnode) msg->error = true; msg->timer = nullptr; msg->chan->remove(msg->co); - coroutine_resume(msg->co); + msg->co->resume(); } Channel::Channel(size_t _capacity) @@ -37,7 +37,7 @@ Channel::Channel(size_t _capacity) void Channel::yield(enum channel_op type) { - coroutine_t *co = coroutine_get_current(); + Coroutine *co = coroutine_get_current(); if (unlikely(!co)) { swError("Channel::yield() must be called in the coroutine."); @@ -52,7 +52,7 @@ void Channel::yield(enum channel_op type) consumer_queue.push_back(co); swTraceLog(SW_TRACE_CHANNEL, "consumer cid=%ld", coroutine_get_cid(co)); } - coroutine_yield(co); + co->yield(); } void* Channel::pop(double timeout) @@ -95,8 +95,8 @@ void* Channel::pop(double timeout) */ if (producer_queue.size() > 0) { - coroutine_t *co = pop_coroutine(PRODUCER); - coroutine_resume(co); + Coroutine *co = pop_coroutine(PRODUCER); + co->resume(); } return data; } @@ -141,8 +141,8 @@ bool Channel::push(void *data, double timeout) */ if (consumer_queue.size() > 0) { - coroutine_t *co = pop_coroutine(CONSUMER); - coroutine_resume(co); + Coroutine *co = pop_coroutine(CONSUMER); + co->resume(); } return true; } @@ -157,13 +157,13 @@ bool Channel::close() closed = true; while (producer_queue.size() > 0) { - coroutine_t *co = pop_coroutine(PRODUCER); - coroutine_resume(co); + Coroutine *co = pop_coroutine(PRODUCER); + co->resume(); } while (consumer_queue.size() > 0) { - coroutine_t *co = pop_coroutine(CONSUMER); - coroutine_resume(co); + Coroutine *co = pop_coroutine(CONSUMER); + co->resume(); } return true; } diff --git a/src/coroutine/hook.cc b/src/coroutine/hook.cc index 02163dca508..4b67c3fc5e8 100644 --- a/src/coroutine/hook.cc +++ b/src/coroutine/hook.cc @@ -36,7 +36,7 @@ extern "C" { struct aio_task { - coroutine_t *co; + Coroutine *co; swAio_event *event; }; @@ -299,7 +299,7 @@ static void aio_onCompleted(swAio_event *event) swAio_event *ev = (swAio_event *) event->req; ev->ret = event->ret; errno = event->error; - coroutine_resume((coroutine_t *) event->object); + ((Coroutine *) event->object)->resume(); } static void aio_onReadFileCompleted(swAio_event *event) @@ -308,7 +308,7 @@ static void aio_onReadFileCompleted(swAio_event *event) task->event->buf = event->buf; task->event->nbytes = event->ret; task->event->error = event->error; - coroutine_resume((coroutine_t *) task->co); + ((Coroutine *) task->co)->resume(); } static void aio_onWriteFileCompleted(swAio_event *event) @@ -316,12 +316,7 @@ static void aio_onWriteFileCompleted(swAio_event *event) aio_task *task = (aio_task *) event->object; task->event->ret = event->ret; task->event->error = event->error; - coroutine_resume((coroutine_t *) task->co); -} - -static void sleep_timeout(swTimer *timer, swTimer_node *tnode) -{ - coroutine_resume((coroutine_t *) tnode->data); + ((Coroutine *) task->co)->resume(); } int swoole_coroutine_open(const char *pathname, int flags, mode_t mode) @@ -346,7 +341,7 @@ int swoole_coroutine_open(const char *pathname, int flags, mode_t mode) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -379,7 +374,7 @@ ssize_t swoole_coroutine_read(int fd, void *buf, size_t count) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -412,7 +407,7 @@ ssize_t swoole_coroutine_write(int fd, const void *buf, size_t count) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -438,7 +433,7 @@ off_t swoole_coroutine_lseek(int fd, off_t offset, int whence) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -463,7 +458,7 @@ int swoole_coroutine_fstat(int fd, struct stat *statbuf) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -487,7 +482,7 @@ int swoole_coroutine_unlink(const char *pathname) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -512,7 +507,7 @@ int swoole_coroutine_statvfs(const char *path, struct statvfs *buf) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -537,7 +532,7 @@ int swoole_coroutine_mkdir(const char *pathname, mode_t mode) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -561,7 +556,7 @@ int swoole_coroutine_rmdir(const char *pathname) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -586,7 +581,7 @@ int swoole_coroutine_rename(const char *oldpath, const char *newpath) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } @@ -611,21 +606,10 @@ int swoole_coroutine_access(const char *pathname, int mode) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } -int swoole_coroutine_sleep(double sec) -{ - coroutine_t* co = coroutine_get_current(); - if (swTimer_add(&SwooleG.timer, sec * 1000, 0, co, sleep_timeout) == NULL) - { - return -1; - } - coroutine_yield(co); - return 0; -} - int swoole_coroutine_flock(int fd, int operation) { if (SwooleG.main_reactor == nullptr || coroutine_get_current_cid() == -1) @@ -647,11 +631,27 @@ int swoole_coroutine_flock(int fd, int operation) { return SW_ERR; } - coroutine_yield((coroutine_t *) ev.object); + ((Coroutine *) ev.object)->yield(); return ev.ret; } -swString* swoole_coroutine_read_file(const char *file, int lock) +static void sleep_timeout(swTimer *timer, swTimer_node *tnode) +{ + ((Coroutine *) tnode->data)->resume(); +} + +int Coroutine::sleep(double sec) +{ + Coroutine* co = coroutine_get_current(); + if (swTimer_add(&SwooleG.timer, sec * 1000, 0, co, sleep_timeout) == NULL) + { + return -1; + } + co->yield(); + return 0; +} + +swString* Coroutine::read_file(const char *file, int lock) { aio_task task; @@ -673,7 +673,7 @@ swString* swoole_coroutine_read_file(const char *file, int lock) { return NULL; } - coroutine_yield(task.co); + task.co->yield(); if (ev.error == 0) { swString *str = (swString *) sw_malloc(sizeof(swString)); @@ -688,7 +688,7 @@ swString* swoole_coroutine_read_file(const char *file, int lock) } } -ssize_t swoole_coroutine_write_file(const char *file, char *buf, size_t length, int lock, int flags) +ssize_t Coroutine::write_file(const char *file, char *buf, size_t length, int lock, int flags) { aio_task task; @@ -713,7 +713,7 @@ ssize_t swoole_coroutine_write_file(const char *file, char *buf, size_t length, { return -1; } - coroutine_yield(task.co); + task.co->yield(); if (ev.error != 0) { SwooleG.error = ev.error; diff --git a/src/coroutine/socket.cc b/src/coroutine/socket.cc index 02b131b889f..6d04939cbfa 100644 --- a/src/coroutine/socket.cc +++ b/src/coroutine/socket.cc @@ -1,5 +1,5 @@ #include "socket.h" -#include "context.h" +#include "coroutine.h" #include "async.h" #include "buffer.h" @@ -979,14 +979,13 @@ ssize_t Socket::recvmsg(struct msghdr *msg, int flags) void Socket::yield(int operation) { - coroutine_t *co = coroutine_get_current(); - int cid = coroutine_get_cid(co); - + Coroutine *co = coroutine_get_current(); if (unlikely(!co)) { swError("Socket::yield() must be called in the coroutine."); } + int cid = co->get_cid(); errCode = 0; int ms = (int) (_timeout * 1000); if (ms <= 0 || ms >= SW_TIMER_MAX_VALUE) @@ -1026,9 +1025,7 @@ void Socket::yield(int operation) read_cid = cid; } //=== yield === - - coroutine_yield(co); - + co->yield(); //=== resume === if (operation & SOCKET_LOCK_WRITE) { @@ -1067,7 +1064,7 @@ void Socket::resume(int operation) { assert(0); } - coroutine_resume(coroutine_get_by_id(cid)); + coroutine_get_by_id(cid)->resume(); } bool Socket::bind(std::string address, int port) diff --git a/src/os/wait.cc b/src/os/wait.cc index a0f87c5e60d..e487bca38c3 100644 --- a/src/os/wait.cc +++ b/src/os/wait.cc @@ -20,10 +20,11 @@ #include using namespace std; +using namespace swoole; struct wait_task { - coroutine_t *co; + Coroutine *co; pid_t pid; int status; }; @@ -34,9 +35,6 @@ static queue wait_list; bool signal_init = false; -extern "C" -{ - static void signal_handler(int signo) { if (signo == SIGCHLD) @@ -69,12 +67,15 @@ static void signal_handler(int signo) { task->status = __stat_loc; task->pid = __pid; - coroutine_resume((coroutine_t *) task->co); + task->co->resume(); } } } } +extern "C" +{ + void swoole_coroutine_signal_init() { if (!signal_init) @@ -106,7 +107,7 @@ pid_t swoole_coroutine_waitpid(pid_t __pid, int *__stat_loc, int __options) wait_task task; task.co = coroutine_get_current();; waitpid_map[__pid] = &task; - coroutine_yield(task.co); + task.co->yield(); *__stat_loc = task.status; return task.pid; @@ -132,7 +133,7 @@ pid_t swoole_coroutine_wait(int *__stat_loc) wait_task task; task.co = coroutine_get_current();; waitpid_map[__pid] = &task; - coroutine_yield(task.co); + task.co->yield(); *__stat_loc = task.status; return task.pid; diff --git a/src/reactor/base.c b/src/reactor/base.c index 492e9cfe332..e4442fcdfb0 100644 --- a/src/reactor/base.c +++ b/src/reactor/base.c @@ -27,10 +27,6 @@ #endif #endif -#ifdef SW_COROUTINE -#include "coroutine.h" -#endif - static void swReactor_onTimeout_and_Finish(swReactor *reactor); static void swReactor_onTimeout(swReactor *reactor); static void swReactor_onFinish(swReactor *reactor); diff --git a/swoole_coroutine.cc b/swoole_coroutine.cc index 3fec9418987..3a0597d089f 100644 --- a/swoole_coroutine.cc +++ b/swoole_coroutine.cc @@ -16,9 +16,10 @@ */ #include "php_swoole.h" - #include "swoole_coroutine.h" +using namespace swoole; + #define TASK_SLOT \ ((int)((ZEND_MM_ALIGNED_SIZE(sizeof(coro_task)) + ZEND_MM_ALIGNED_SIZE(sizeof(zval)) - 1) / ZEND_MM_ALIGNED_SIZE(sizeof(zval)))) #define SWCC(x) sw_current_context->x @@ -249,8 +250,9 @@ static void php_coro_create(void *arg) php_coro_save_vm_stack(task); task->output_ptr = nullptr; task->co = coroutine_get_current(); - coroutine_set_task(task->co, (void *) task); + task->co->set_task((void *) task); task->origin_task = origin_task; + task->defer_tasks = nullptr; php_coro_og_create(origin_task); swTraceLog( @@ -381,7 +383,7 @@ long sw_coro_create(zend_fcall_info_cache *fci_cache, int argc, zval *argv, zval php_args.retval = retval; php_args.origin_task = php_coro_get_current_task(); - return coroutine_create(php_coro_create, (void*) &php_args); + return Coroutine::create(php_coro_create, (void*) &php_args); } void sw_coro_save(zval *return_value, php_context *sw_current_context) @@ -402,7 +404,7 @@ void sw_coro_yield() } coro_task *task = php_coro_get_current_task(); php_coro_yield(task); - coroutine_yield_naked(task->co); + task->co->yield_naked(); } int sw_coro_resume(php_context *sw_current_context, zval *retval, zval *coro_retval) @@ -414,7 +416,7 @@ int sw_coro_resume(php_context *sw_current_context, zval *retval, zval *coro_ret ZVAL_COPY(SWCC(current_coro_return_value_ptr), retval); } - coroutine_resume_naked(task->co); + task->co->resume_naked(); if (unlikely(EG(exception))) { diff --git a/swoole_coroutine.h b/swoole_coroutine.h index 0dd77914d4f..aee8c75b5a8 100644 --- a/swoole_coroutine.h +++ b/swoole_coroutine.h @@ -37,7 +37,7 @@ struct coro_task zend_execute_data *execute_data; zend_output_globals *output_ptr; SW_DECLARE_EG_SCOPE (scope); - coroutine_t *co; + swoole::Coroutine *co; std::stack *defer_tasks; coro_task *origin_task; }; @@ -83,6 +83,8 @@ struct php_context extern coro_global COROG; long sw_get_current_cid(); +void sw_coro_add_defer_task(swCallback cb, void *data); + void coro_init(void); void coro_check(void); @@ -99,4 +101,4 @@ int sw_coro_resume(php_context *sw_current_context, zval *retval, zval *coro_ret void sw_coro_save(zval *return_value, php_context *sw_php_context); void sw_coro_set_stack_size(int stack_size); -extern int swoole_coroutine_sleep(double msec); + diff --git a/swoole_coroutine_util.cc b/swoole_coroutine_util.cc index c13d4d3b213..db7b901a6bc 100644 --- a/swoole_coroutine_util.cc +++ b/swoole_coroutine_util.cc @@ -149,7 +149,7 @@ static PHP_METHOD(swoole_coroutine_iterator, __destruct); static PHP_METHOD(swoole_exit_exception, getFlags); static PHP_METHOD(swoole_exit_exception, getStatus); -static std::unordered_map user_yield_coros; +static std::unordered_map user_yield_coros; static zend_class_entry swoole_coroutine_util_ce; static zend_class_entry *swoole_coroutine_util_class_entry_ptr; @@ -160,10 +160,7 @@ static zend_class_entry *swoole_coroutine_iterator_class_entry_ptr; static zend_class_entry swoole_exit_exception_ce; static zend_class_entry *swoole_exit_exception_class_entry_ptr; -extern "C" -{ -int swoole_coroutine_statvfs(const char *path, struct statvfs *buf); -} +extern int swoole_coroutine_statvfs(const char *path, struct statvfs *buf); static const zend_function_entry swoole_coroutine_util_methods[] = { @@ -336,14 +333,14 @@ static PHP_METHOD(swoole_exit_exception, getStatus) */ static PHP_METHOD(swoole_coroutine_util, yield) { - coroutine_t* co = coroutine_get_current(); + Coroutine* co = coroutine_get_current(); if (unlikely(!co)) { swoole_php_fatal_error(E_ERROR, "can not yield outside coroutine"); RETURN_FALSE; } - user_yield_coros[coroutine_get_current_cid()] = 1; - coroutine_yield(co); + user_yield_coros[co->cid] = co; + co->yield(); RETURN_TRUE; } @@ -437,24 +434,21 @@ PHP_FUNCTION(swoole_coroutine_create) static PHP_METHOD(swoole_coroutine_util, resume) { long cid; - coroutine_t* co; if (zend_parse_parameters(ZEND_NUM_ARGS(), "l", &cid) == FAILURE) { RETURN_FALSE; } - if (user_yield_coros.find(cid) == user_yield_coros.end()) + std::unordered_map::iterator _i_co = user_yield_coros.find(cid); + if (_i_co == user_yield_coros.end()) { swoole_php_fatal_error(E_WARNING, "you can not resume the coroutine which is in IO operation."); RETURN_FALSE; } - else if (!(co = coroutine_get_by_id(cid))) - { - swoole_php_fatal_error(E_WARNING, "no coroutine can resume."); - RETURN_FALSE; - } + + Coroutine* co = _i_co->second; user_yield_coros.erase(cid); - coroutine_resume(co); + co->resume(); RETURN_TRUE; } @@ -505,8 +499,7 @@ static PHP_METHOD(swoole_coroutine_util, sleep) } php_swoole_check_reactor(); - - swoole_coroutine_sleep(seconds); + Coroutine::sleep(seconds); RETURN_TRUE; } @@ -1012,7 +1005,7 @@ static PHP_METHOD(swoole_coroutine_util, readFile) php_swoole_check_aio(); - swString *result = swoole_coroutine_read_file(filename, flags & LOCK_EX); + swString *result = Coroutine::read_file(filename, flags & LOCK_EX); if (result == NULL) { RETURN_FALSE; @@ -1041,12 +1034,6 @@ static PHP_METHOD(swoole_coroutine_util, writeFile) Z_PARAM_LONG(flags) ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE); - swAio_event ev; - bzero(&ev, sizeof(swAio_event)); - - ev.nbytes = l_data; - ev.buf = data; - int _flags = O_CREAT | O_WRONLY; if (flags & PHP_FILE_APPEND) { @@ -1057,7 +1044,7 @@ static PHP_METHOD(swoole_coroutine_util, writeFile) _flags |= O_TRUNC; } - ssize_t retval = swoole_coroutine_write_file(filename, data, l_data, flags & LOCK_EX, _flags); + ssize_t retval = Coroutine::write_file(filename, data, l_data, flags & LOCK_EX, _flags); if (retval < 0) { RETURN_FALSE @@ -1529,5 +1516,5 @@ PHP_FUNCTION(swoole_coroutine_defer) memcpy(defer->callback, callback, sizeof(zval)); Z_TRY_ADDREF_P(callback); - coroutine_add_defer_task(coroutine_get_current(), php_swoole_event_onDefer, defer); + sw_coro_add_defer_task(php_swoole_event_onDefer, defer); } diff --git a/swoole_runtime.cc b/swoole_runtime.cc index a4e4ff41701..db48ef1f615 100644 --- a/swoole_runtime.cc +++ b/swoole_runtime.cc @@ -1048,7 +1048,7 @@ static PHP_FUNCTION(_sleep) if (num >= 0.001 && sw_coro_is_in()) { php_swoole_check_reactor(); - RETURN_LONG(swoole_coroutine_sleep((double) num) < 0 ? num : 0); + RETURN_LONG(Coroutine::sleep((double ) num) < 0 ? num : 0); } else { @@ -1073,7 +1073,7 @@ static PHP_FUNCTION(_usleep) if (_time >= 0.001 && sw_coro_is_in()) { php_swoole_check_reactor(); - swoole_coroutine_sleep((double) num / 1000000); + Coroutine::sleep((double) num / 1000000); } else { @@ -1103,7 +1103,7 @@ static PHP_FUNCTION(_time_nanosleep) if (_time >= 0.001 && sw_coro_is_in()) { php_swoole_check_reactor(); - swoole_coroutine_sleep(_time); + Coroutine::sleep(_time); } else { @@ -1162,7 +1162,7 @@ static PHP_FUNCTION(_time_sleep_until) if (_time >= 0.001 && sw_coro_is_in()) { php_swoole_check_reactor(); - swoole_coroutine_sleep(_time); + Coroutine::sleep(_time); } else { diff --git a/tests/swoole_coroutine/defer.phpt b/tests/swoole_coroutine/defer.phpt index 9c4da021c21..10ed47917df 100644 --- a/tests/swoole_coroutine/defer.phpt +++ b/tests/swoole_coroutine/defer.phpt @@ -35,6 +35,7 @@ go(function () { assert(co::getuid() === 1); echo "1\n"; }); +swoole_event_wait(); ?> --EXPECT-- 1