Skip to content

Commit

Permalink
convert to C++ project
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Nov 22, 2018
1 parent 78a9c7a commit 28a41b4
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 208 deletions.
12 changes: 6 additions & 6 deletions include/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ struct notify_msg_t
struct timeout_msg_t
{
Channel *chan;
coroutine_t *co;
Coroutine *co;
bool error;
swTimer_node *timer;
};

class Channel
{
private:
std::list<coroutine_t *> producer_queue;
std::list<coroutine_t *> consumer_queue;
std::list<Coroutine *> producer_queue;
std::list<Coroutine *> consumer_queue;
std::queue<void *> data_queue;
size_t capacity;

Expand Down Expand Up @@ -68,7 +68,7 @@ class Channel
return producer_queue.size();
}

inline void remove(coroutine_t *co)
inline void remove(Coroutine *co)
{
consumer_queue.remove(co);
}
Expand All @@ -87,9 +87,9 @@ class Channel
return data;
}

inline coroutine_t* pop_coroutine(enum channel_op type)
inline Coroutine* pop_coroutine(enum channel_op type)
{
coroutine_t* co;
Coroutine* co;
if (type == PRODUCER)
{
co = producer_queue.front();
Expand Down
3 changes: 2 additions & 1 deletion include/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#endif

#include "swoole.h"
#include "coroutine.h"
#include "error.h"

#if __linux__
Expand All @@ -28,6 +27,8 @@
#include <valgrind/valgrind.h>
#endif

typedef void (*coroutine_func_t)(void*);

namespace swoole
{
//namespace start
Expand Down
82 changes: 48 additions & 34 deletions include/coroutine.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
+----------------------------------------------------------------------+
*/

#ifndef SW_COROUTINE_H_
#define SW_COROUTINE_H_
#pragma once

#include "swoole.h"

#ifdef __cplusplus
extern "C"
{
#endif
#include "context.h"

#define DEFAULT_MAX_CORO_NUM 3000
#define DEFAULT_STACK_SIZE 8192
Expand All @@ -32,41 +27,67 @@ extern "C"
#define CORO_LIMIT -1
#define CORO_INVALID -2

typedef struct coroutine_s coroutine_t;
typedef void (*coroutine_func_t)(void*);

typedef enum
{
SW_CORO_INIT = 0, SW_CORO_YIELD, SW_CORO_RUNNING, SW_CORO_END,
} sw_coro_state;

typedef void (*coro_php_create_t)();
typedef void (*coro_php_yield_t)(void*);
typedef void (*coro_php_resume_t)(void*);
typedef void (*coro_php_close_t)();

typedef enum
namespace swoole
{
SW_CORO_INIT = 0, SW_CORO_YIELD, SW_CORO_RUNNING, SW_CORO_END,
} sw_coro_state;
class Coroutine
{
public:
swoole::Context ctx;
long cid;
sw_coro_state state;
void *task;

Coroutine(long _cid, size_t stack_size, coroutine_func_t fn, void *private_data) :
ctx(stack_size, fn, private_data)
{
cid = _cid;
task = nullptr;
state = SW_CORO_INIT;
}

void resume();
void yield();

void coro_yield();
void coro_handle_timeout();
void resume_naked();
void yield_naked();

/* basic api */
long coroutine_create(coroutine_func_t func, void* args);
void coroutine_resume(coroutine_t *co);
void coroutine_yield(coroutine_t *co);
void coroutine_resume_naked(coroutine_t *co);
void coroutine_yield_naked(coroutine_t *co);
void coroutine_release(coroutine_t *co);
void release();

inline void set_task(void *_task)
{
task = _task;
}

inline long get_cid()
{
return cid;
}

static long create(coroutine_func_t fn, void* args);
static int sleep(double sec);
static swString* read_file(const char *file, int lock);
static ssize_t write_file(const char *file, char *buf, size_t length, int lock, int flags);
};
}
/* co task */
void coroutine_set_task(coroutine_t *co, void *ptr);
void* coroutine_get_current_task();
void* coroutine_get_task_by_cid(long cid);
void coroutine_add_defer_task(coroutine_t *co, swCallback cb, void *data);
void coroutine_do_defer_task(coroutine_t *co);
/* get coroutine */
coroutine_t* coroutine_get_current();
coroutine_t *coroutine_get_by_id(long cid);
swoole::Coroutine* coroutine_get_current();
swoole::Coroutine* coroutine_get_by_id(long cid);
/* get cid */
long coroutine_get_cid(coroutine_t *co);
long coroutine_get_cid(swoole::Coroutine *co);
long coroutine_get_current_cid();
void coroutine_set_stack_size(int stack_size);
/* callback */
Expand All @@ -78,10 +99,3 @@ void coroutine_set_onClose(coro_php_close_t func);
void internal_coro_yield(void *arg);
void internal_coro_resume(void *arg);

swString* swoole_coroutine_read_file(const char *file, int lock);
ssize_t swoole_coroutine_write_file(const char *file, char *buf, size_t length, int lock, int flags);

#ifdef __cplusplus
} /* end extern "C" */
#endif
#endif
99 changes: 36 additions & 63 deletions src/coroutine/base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,129 +15,107 @@
*/

#include "coroutine.h"
#include "context.h"
#include "async.h"

#include <stack>
#include <unordered_map>
#include <string>

using namespace swoole;

struct coroutine_s
{
public:
Context ctx;
long cid;
sw_coro_state state;
void *task;

coroutine_s(long _cid, size_t stack_size, coroutine_func_t fn, void *private_data) :
ctx(stack_size, fn, private_data)
{
cid = _cid;
task = nullptr;
state = SW_CORO_INIT;
}
};

static struct
{
int stack_size;
int call_stack_size;
long last_cid;
struct coroutine_s* call_stack[SW_MAX_CORO_NESTING_LEVEL];
Coroutine* call_stack[SW_MAX_CORO_NESTING_LEVEL];
coro_php_yield_t onYield; /* before php yield coro */
coro_php_resume_t onResume; /* before php resume coro */
coro_php_close_t onClose; /* before php close coro */
} swCoroG = { SW_DEFAULT_C_STACK_SIZE, 0, 1, { nullptr, }, nullptr, nullptr, nullptr };

static std::unordered_map<long, coroutine_s*> coroutines;
static std::unordered_map<long, Coroutine*> coroutines;

long coroutine_create(coroutine_func_t fn, void* args)
long Coroutine::create(coroutine_func_t fn, void* args)
{
if (unlikely(swCoroG.call_stack_size == SW_MAX_CORO_NESTING_LEVEL))
{
swWarn("reaches the max coroutine nesting level %d", SW_MAX_CORO_NESTING_LEVEL);
return CORO_LIMIT;
}
long cid = swCoroG.last_cid++;
coroutine_t *co = new coroutine_s(cid, swCoroG.stack_size, fn, args);
Coroutine *co = new Coroutine(cid, swCoroG.stack_size, fn, args);
coroutines[cid] = co;
swCoroG.call_stack[swCoroG.call_stack_size++] = co;
co->state = SW_CORO_RUNNING;
co->ctx.SwapIn();
if (co->ctx.end)
{
co->state = SW_CORO_END;
coroutine_release(co);
co->release();
}
return cid;
}

void coroutine_yield(coroutine_t *co)
void Coroutine::yield()
{
if (swCoroG.onYield)
{
swCoroG.onYield(co->task);
swCoroG.onYield(task);
}
swCoroG.call_stack_size--;
co->state = SW_CORO_YIELD;
co->ctx.SwapOut();
state = SW_CORO_YIELD;
ctx.SwapOut();
}

void coroutine_resume(coroutine_t *co)
void Coroutine::resume()
{
if (swCoroG.onResume)
{
swCoroG.onResume(co->task);
swCoroG.onResume(task);
}
swCoroG.call_stack[swCoroG.call_stack_size++] = co;
co->state = SW_CORO_RUNNING;
co->ctx.SwapIn();
if (co->ctx.end)
swCoroG.call_stack[swCoroG.call_stack_size++] = this;
state = SW_CORO_RUNNING;
ctx.SwapIn();
if (ctx.end)
{
coroutine_release(co);
release();
}
}

void coroutine_yield_naked(coroutine_t *co)
void Coroutine::yield_naked()
{
swCoroG.call_stack_size--;
co->state = SW_CORO_YIELD;
co->ctx.SwapOut();
state = SW_CORO_YIELD;
ctx.SwapOut();
}

void coroutine_resume_naked(coroutine_t *co)
void Coroutine::resume_naked()
{
swCoroG.call_stack[swCoroG.call_stack_size++] = co;
co->state = SW_CORO_RUNNING;
co->ctx.SwapIn();
if (co->ctx.end)
swCoroG.call_stack[swCoroG.call_stack_size++] = this;
state = SW_CORO_RUNNING;
ctx.SwapIn();
if (ctx.end)
{
coroutine_release(co);
release();
}
}

void coroutine_release(coroutine_t *co)
void Coroutine::release()
{
co->state = SW_CORO_END;
state = SW_CORO_END;
if (swCoroG.onClose)
{
swCoroG.onClose();
}
swCoroG.call_stack_size--;
coroutines.erase(co->cid);
delete co;
}

void coroutine_set_task(coroutine_t *co, void *task)
{
co->task = task;
coroutines.erase(cid);
delete this;
}

void* coroutine_get_task_by_cid(long cid)
{
coroutine_t *co = coroutine_get_by_id(cid);
Coroutine *co = coroutine_get_by_id(cid);
if (co == nullptr)
{
return nullptr;
Expand All @@ -148,9 +126,9 @@ void* coroutine_get_task_by_cid(long cid)
}
}

coroutine_t* coroutine_get_by_id(long cid)
Coroutine* coroutine_get_by_id(long cid)
{
std::unordered_map<long, coroutine_s*>::iterator i = coroutines.find(cid);
std::unordered_map<long, Coroutine*>::iterator i = coroutines.find(cid);
if (i == coroutines.end())
{
return nullptr;
Expand All @@ -161,14 +139,14 @@ coroutine_t* coroutine_get_by_id(long cid)
}
}

coroutine_t* coroutine_get_current()
Coroutine* coroutine_get_current()
{
return likely(swCoroG.call_stack_size > 0) ? swCoroG.call_stack[swCoroG.call_stack_size - 1] : nullptr;
}

void* coroutine_get_current_task()
{
coroutine_t* co = coroutine_get_current();
Coroutine* co = coroutine_get_current();
if (co == nullptr)
{
return nullptr;
Expand All @@ -181,12 +159,7 @@ void* coroutine_get_current_task()

long coroutine_get_current_cid()
{
coroutine_t* co = coroutine_get_current();
return likely(co) ? co->cid : -1;
}

long coroutine_get_cid(coroutine_t *co)
{
Coroutine* co = coroutine_get_current();
return likely(co) ? co->cid : -1;
}

Expand Down
Loading

0 comments on commit 28a41b4

Please sign in to comment.