Skip to content

Commit

Permalink
Cleanup useless code[9]
Browse files Browse the repository at this point in the history
  • Loading branch information
matyhtf committed Dec 14, 2018
1 parent e268dd6 commit a0beeff
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 265 deletions.
5 changes: 2 additions & 3 deletions include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ typedef struct

int swServer_master_onAccept(swReactor *reactor, swEvent *event);
void swServer_master_onTimer(swTimer *timer, swTimer_node *tnode);
void swServer_update_time(swServer *serv);
int swServer_master_send(swServer *serv, swSendData *_send);

int swServer_onFinish(swFactory *factory, swSendData *resp);
int swServer_onFinish2(swFactory *factory, swSendData *resp);
Expand Down Expand Up @@ -735,7 +735,7 @@ int swServer_create_task_worker(swServer *serv);
void swServer_enable_accept(swReactor *reactor);
void swServer_reopen_log_file(swServer *serv);

void swTaskWorker_init(swProcessPool *pool);
void swTaskWorker_init(swServer *serv);
int swTaskWorker_onTask(swProcessPool *pool, swEventData *task);
int swTaskWorker_onFinish(swReactor *reactor, swEvent *event);
void swTaskWorker_onStart(swProcessPool *pool, int worker_id);
Expand Down Expand Up @@ -1018,7 +1018,6 @@ void swReactorThread_free(swServer *serv);
int swReactorThread_close(swReactor *reactor, int fd);
int swReactorThread_onClose(swReactor *reactor, swEvent *event);
int swReactorThread_dispatch(swConnection *conn, char *data, uint32_t length);
int swReactorThread_send(swServer *serv, swSendData *_send);
int swReactorThread_send2worker(swServer *serv, void *data, int len, uint16_t target_worker_id);

int swReactorProcess_create(swServer *serv);
Expand Down
2 changes: 1 addition & 1 deletion src/factory/base.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ int swFactory_finish(swFactory *factory, swSendData *resp)
{
resp->length = resp->info.len;
}
if (swReactorThread_send(factory->ptr, resp) < 0)
if (swServer_master_send(factory->ptr, resp) < 0)
{
return SW_ERR;
}
Expand Down
6 changes: 2 additions & 4 deletions src/network/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,12 @@ int swManager_start(swFactory *factory)
{
return SW_ERR;
}

swProcessPool *pool = &serv->gs->task_workers;
swTaskWorker_init(pool);
swTaskWorker_init(serv);

swWorker *worker;
for (i = 0; i < serv->task_worker_num; i++)
{
worker = &pool->workers[i];
worker = &serv->gs->task_workers.workers[i];
if (swServer_worker_create(serv, worker) < 0)
{
return SW_ERR;
Expand Down
8 changes: 4 additions & 4 deletions src/network/port.c
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ static int swPort_http_static_handler(swServer *serv, swHttpRequest *request, sw
SW_HTTP_SERVER_SOFTWARE
);
response.data = header_buffer;
swReactorThread_send(serv, &response);
swServer_master_send(serv, &response);
goto _finish;
}
}
Expand Down Expand Up @@ -763,7 +763,7 @@ static int swPort_http_static_handler(swServer *serv, swHttpRequest *request, sw
conn->tcp_nopush = 1;
}
#endif
swReactorThread_send(serv, &response);
swServer_master_send(serv, &response);

buffer.offset = 0;
buffer.length = file_stat.st_size;
Expand All @@ -772,15 +772,15 @@ static int swPort_http_static_handler(swServer *serv, swHttpRequest *request, sw
response.length = response.info.len = sizeof(swSendFile_request) + buffer.length + 1;
response.data = (void*) &buffer;

swReactorThread_send(serv, &response);
swServer_master_send(serv, &response);

_finish:
if (!request->keep_alive)
{
response.info.type = SW_EVENT_CLOSE;
response.length = 0;
response.data = NULL;
swReactorThread_send(serv, &response);
swServer_master_send(serv, &response);
}

return SW_TRUE;
Expand Down
2 changes: 1 addition & 1 deletion src/network/reactor_process.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ int swReactorProcess_start(swServer *serv)
{
return SW_ERR;
}
swTaskWorker_init(&serv->gs->task_workers);
swTaskWorker_init(serv);
if (swProcessPool_start(&serv->gs->task_workers) < 0)
{
return SW_ERR;
Expand Down
239 changes: 4 additions & 235 deletions src/network/reactor_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ static void swReactorThread_onStreamResponse(swStream *stream, char *data, uint3
response.info.len = 0;
response.length = length;
response.data = data;
swReactorThread_send(SwooleG.serv, &response);
swServer_master_send(SwooleG.serv, &response);
}

/**
Expand Down Expand Up @@ -434,7 +434,7 @@ static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
{
_send.data = resp.data;
_send.length = resp.info.len;
swReactorThread_send(serv, &_send);
swServer_master_send(serv, &_send);
}
//use send shm
else if (_send.info.from_fd == SW_RESPONSE_SHM)
Expand All @@ -456,7 +456,7 @@ static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
memcpy(&pkg_header, _send.data + 4, sizeof(pkg_header));
swWarn("fd=%d, worker=%d, index=%d, serid=%d", _send.info.fd, pkg_header.worker, pkg_header.index, pkg_header.serid);
#endif
swReactorThread_send(serv, &_send);
swServer_master_send(serv, &_send);
worker->lock.unlock(&worker->lock);
}
//use tmp file
Expand All @@ -469,7 +469,7 @@ static int swReactorThread_onPipeReceive(swReactor *reactor, swEvent *ev)
}
_send.data = data->str;
_send.length = data->length;
swReactorThread_send(serv, &_send);
swServer_master_send(serv, &_send);
}
//reactor thread exit
else if (_send.info.from_fd == SW_RESPONSE_EXIT)
Expand Down Expand Up @@ -553,237 +553,6 @@ int swReactorThread_send2worker(swServer *serv, void *data, int len, uint16_t ta
return ret;
}

/**
* send to client or append to out_buffer
*/
int swReactorThread_send(swServer *serv, swSendData *_send)
{
uint32_t session_id = _send->info.fd;
void *_send_data = _send->data;
uint32_t _send_length = _send->length;

swConnection *conn;
if (_send->info.type != SW_EVENT_CLOSE)
{
conn = swServer_connection_verify(serv, session_id);
}
else
{
conn = swServer_connection_verify_no_ssl(serv, session_id);
}
if (!conn)
{
if (_send->info.type == SW_EVENT_TCP)
{
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "send %d byte failed, session#%d does not exist.", _send_length, session_id);
}
else
{
swoole_error_log(SW_LOG_NOTICE, SW_ERROR_SESSION_NOT_EXIST, "send event$[%d] failed, session#%d does not exist.", _send->info.type, session_id);
}
return SW_ERR;
}

int fd = conn->fd;
swReactor *reactor;

if (serv->factory_mode == SW_MODE_BASE)
{
reactor = &(serv->reactor_threads[0].reactor);
if (conn->overflow)
{
if (serv->send_yield)
{
SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
}
else
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "connection#%d output buffer overflow.", fd);
}
return SW_ERR;
}
}
else
{
reactor = &(serv->reactor_threads[conn->from_id].reactor);
assert(fd % serv->reactor_num == reactor->id);
assert(fd % serv->reactor_num == SwooleTG.id);
}

/**
* Reset send buffer, Immediately close the connection.
*/
if (_send->info.type == SW_EVENT_CLOSE && (conn->close_reset || conn->removed))
{
goto close_fd;
}
else if (_send->info.type == SW_EVENT_CONFIRM)
{
reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
conn->listen_wait = 0;
return SW_OK;
}
/**
* pause recv data
*/
else if (_send->info.type == SW_EVENT_PAUSE_RECV)
{
if (conn->events & SW_EVENT_WRITE)
{
return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_WRITE);
}
else
{
return reactor->del(reactor, conn->fd);
}
}
/**
* resume recv data
*/
else if (_send->info.type == SW_EVENT_RESUME_RECV)
{
if (conn->events & SW_EVENT_WRITE)
{
return reactor->set(reactor, conn->fd, conn->fdtype | SW_EVENT_READ | SW_EVENT_WRITE);
}
else
{
return reactor->add(reactor, conn->fd, conn->fdtype | SW_EVENT_READ);
}
}

if (swBuffer_empty(conn->out_buffer))
{
/**
* close connection.
*/
if (_send->info.type == SW_EVENT_CLOSE)
{
close_fd:
reactor->close(reactor, fd);
return SW_OK;
}
#ifdef SW_REACTOR_SYNC_SEND
//Direct send
if (_send->info.type != SW_EVENT_SENDFILE)
{
if (!conn->direct_send)
{
goto buffer_send;
}

int n;

direct_send:
n = swConnection_send(conn, _send_data, _send_length, 0);
if (n == _send_length)
{
return SW_OK;
}
else if (n > 0)
{
_send_data += n;
_send_length -= n;
goto buffer_send;
}
else if (errno == EINTR)
{
goto direct_send;
}
else
{
goto buffer_send;
}
}
#endif
//buffer send
else
{
#ifdef SW_REACTOR_SYNC_SEND
buffer_send:
#endif
if (!conn->out_buffer)
{
conn->out_buffer = swBuffer_new(SW_BUFFER_SIZE);
if (conn->out_buffer == NULL)
{
return SW_ERR;
}
}
}
}

swBuffer_chunk *chunk;
//close connection
if (_send->info.type == SW_EVENT_CLOSE)
{
chunk = swBuffer_new_chunk(conn->out_buffer, SW_CHUNK_CLOSE, 0);
chunk->store.data.val1 = _send->info.type;
conn->close_queued = 1;
}
//sendfile to client
else if (_send->info.type == SW_EVENT_SENDFILE)
{
swSendFile_request *req = (swSendFile_request *) _send_data;
swConnection_sendfile(conn, req->filename, req->offset, req->length);
}
//send data
else
{
//connection is closed
if (conn->removed)
{
swWarn("connection#%d is closed by client.", fd);
return SW_ERR;
}
//connection output buffer overflow
if (conn->out_buffer->length >= conn->buffer_size)
{
if (serv->send_yield)
{
SwooleG.error = SW_ERROR_OUTPUT_BUFFER_OVERFLOW;
}
else
{
swoole_error_log(SW_LOG_WARNING, SW_ERROR_OUTPUT_BUFFER_OVERFLOW, "connection#%d output buffer overflow.", fd);
}
conn->overflow = 1;
if (serv->onBufferEmpty && serv->onBufferFull == NULL)
{
conn->high_watermark = 1;
}
}

int _length = _send_length;
void* _pos = _send_data;
int _n;

//buffer enQueue
while (_length > 0)
{
_n = _length >= SW_BUFFER_SIZE_BIG ? SW_BUFFER_SIZE_BIG : _length;
swBuffer_append(conn->out_buffer, _pos, _n);
_pos += _n;
_length -= _n;
}

swListenPort *port = swServer_get_port(serv, fd);
if (serv->onBufferFull && conn->high_watermark == 0 && conn->out_buffer->length >= port->buffer_high_watermark)
{
swServer_tcp_notify(serv, conn, SW_EVENT_BUFFER_FULL);
conn->high_watermark = 1;
}
}

//listen EPOLLOUT event
if (reactor->set(reactor, fd, SW_EVENT_TCP | SW_EVENT_WRITE | SW_EVENT_READ) < 0
&& (errno == EBADF || errno == ENOENT))
{
goto close_fd;
}

return SW_OK;
}

/**
* [ReactorThread] worker pipe can write.
Expand Down
Loading

0 comments on commit a0beeff

Please sign in to comment.