Skip to content

Commit

Permalink
[5.0] Fix FTP(multiple connections) in curl hook (#4927)
Browse files Browse the repository at this point in the history
* Fix multiple connections in curl hook

# Conflicts:
#	ext-src/swoole_curl.cc

* Fix

* Add curl ftp test

# Conflicts:
#	tests/include/skipif.inc

* Fix memory leak

* Optimize

* Fix

* Fix

* Fix

* Fix

* Fix test
  • Loading branch information
Yurunsoft authored Dec 19, 2022
1 parent 8cd8aa7 commit 3ec40d2
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 46 deletions.
12 changes: 8 additions & 4 deletions ext-src/php_swoole_curl.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@ namespace curl {

class Multi;

struct Handle {
CURL *cp;
struct HandleSocket {
Socket *socket;
Multi *multi;
int event_bitmask;
int event_fd;
int action;
};

struct Handle {
CURL *cp;
Multi *multi;
std::map<int, HandleSocket *> sockets;
};

struct Selector {
bool timer_callback = false;
std::set<Handle *> active_handles;
Expand Down Expand Up @@ -149,7 +153,7 @@ class Multi {

CURLcode exec(php_curl *ch);
long select(php_curlm *mh, double timeout = -1);
void callback(Handle *handle, int event_bitmask);
void callback(Handle *handle, int event_bitmask, int sockfd = -1);

static int cb_readable(Reactor *reactor, Event *event);
static int cb_writable(Reactor *reactor, Event *event);
Expand Down
144 changes: 102 additions & 42 deletions ext-src/swoole_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ namespace curl {

static int execute_callback(Event *event, int bitmask) {
Handle *handle = (Handle *) event->socket->object;
handle->event_bitmask |= bitmask;
handle->event_fd = event->fd;
handle->multi->callback(handle, bitmask);
auto it = handle->sockets.find(event->fd);
if (it != handle->sockets.end()) {
it->second->event_bitmask |= bitmask;
it->second->event_fd = event->fd;
}
handle->multi->callback(handle, bitmask, event->fd);
return 0;
}

Expand Down Expand Up @@ -76,7 +79,9 @@ Socket *Multi::create_socket(CURL *cp, curl_socket_t sockfd) {
curl_multi_assign(multi_handle_, sockfd, (void *) socket);

Handle *handle = get_handle(cp);
handle->socket = socket;
HandleSocket *handle_socket = new HandleSocket();
handle_socket->socket = socket;
handle->sockets[sockfd] = handle_socket;
handle->cp = cp;
socket->object = handle;

Expand All @@ -95,7 +100,11 @@ void Multi::del_event(CURL *cp, void *socket_ptr, curl_socket_t sockfd) {

Handle *handle = get_handle(cp);
if (handle) {
handle->socket = nullptr;
auto it = handle->sockets.find(sockfd);
if (it != handle->sockets.end()) {
handle->sockets.erase(it);
delete it->second;
}
}

swoole_trace_log(SW_TRACE_CO_CURL, SW_ECHO_RED " handle=%p, curl=%p, fd=%d", "[DEL_EVENT]", handle, cp, sockfd);
Expand All @@ -120,7 +129,10 @@ void Multi::set_event(CURL *cp, void *socket_ptr, curl_socket_t sockfd, int acti
}
}
Handle *handle = get_handle(cp);
handle->action = action;
auto it = handle->sockets.find(sockfd);
if (it != handle->sockets.end()) {
it->second->action = action;
}

swoole_trace_log(SW_TRACE_CO_CURL,
SW_ECHO_GREEN " handle=%p, curl=%p, fd=%d, events=%d",
Expand Down Expand Up @@ -164,15 +176,19 @@ CURLcode Multi::exec(php_curl *ch) {
}

Handle *handle = get_handle(ch->cp);
HandleSocket *handle_socket = nullptr;
bool is_canceled = false;

SW_LOOP {
if (handle->socket && handle->socket->removed) {
if (swoole_event_add(handle->socket, get_event(handle->action)) == SW_OK) {
event_count_++;
for (auto it : handle->sockets) {
handle_socket = it.second;
if (handle_socket->socket && handle_socket->socket->removed) {
if (swoole_event_add(handle_socket->socket, get_event(handle_socket->action)) == SW_OK) {
event_count_++;
}
swoole_trace_log(
SW_TRACE_CO_CURL, "resume, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle_socket->socket->get_fd());
}
swoole_trace_log(
SW_TRACE_CO_CURL, "resume, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
}

co = check_bound_co();
Expand All @@ -188,9 +204,13 @@ CURLcode Multi::exec(php_curl *ch) {
int sockfd = last_sockfd;
int bitmask = 0;
if (sockfd >= 0) {
bitmask = handle->event_bitmask;
if (handle->socket && !handle->socket->removed && swoole_event_del(handle->socket) == SW_OK) {
event_count_--;
auto it = handle->sockets.find(sockfd);
if (it != handle->sockets.end()) {
handle_socket = it->second;
bitmask = handle_socket->event_bitmask;
if (!handle_socket->socket->removed && swoole_event_del(handle_socket->socket) == SW_OK) {
event_count_--;
}
}
}
del_timer();
Expand All @@ -206,13 +226,36 @@ CURLcode Multi::exec(php_curl *ch) {
break;
}
set_timer();
if (sockfd >= 0 && handle->socket && handle->socket->removed) {
if (swoole_event_add(handle->socket, get_event(handle->action)) == SW_OK) {
event_count_++;
if (sockfd >= 0) {
auto it = handle->sockets.find(sockfd);
if (it != handle->sockets.end()) {
handle_socket = it->second;
if (handle_socket->socket && handle_socket->socket->removed) {
if (swoole_event_add(handle_socket->socket, get_event(handle_socket->action)) == SW_OK) {
event_count_++;
}
}
}
}
if (!timer && handle->socket->removed) {
break;

if (!timer) {
bool removed = true;
for (auto it = handle->sockets.begin(); it != handle->sockets.end();) {
handle_socket = it->second;
if (handle_socket->socket) {
if (handle_socket->socket->removed) {
it = handle->sockets.erase(it);
delete handle_socket;
continue;
} else {
removed = false;
}
}
++it;
}
if (removed) {
break;
}
}
}

Expand Down Expand Up @@ -246,7 +289,7 @@ int Multi::handle_timeout(CURLM *mh, long timeout_ms, void *userp) {
Multi *multi = (Multi *) userp;
swoole_trace_log(SW_TRACE_CO_CURL, SW_ECHO_BLUE "timeout_ms=%ld", "[HANDLE_TIMEOUT]", timeout_ms);
if (!swoole_event_is_available()) {
return 0;
return -1;
}
if (timeout_ms < 0) {
multi->del_timer();
Expand All @@ -268,6 +311,8 @@ long Multi::select(php_curlm *mh, double timeout) {
return CURLE_FAILED_INIT;
}

Socket *socket = nullptr;

for (zend_llist_element *element = mh->easyh.head; element; element = element->next) {
zval *z_ch = (zval *) element->data;
php_curl *ch;
Expand All @@ -276,18 +321,20 @@ long Multi::select(php_curlm *mh, double timeout) {
}
Handle *handle = get_handle(ch->cp);

swoole_trace_log(SW_TRACE_CO_CURL,
"handle=%p, handle->socket=%p, handle->socket->removed=%d",
handle,
handle ? handle->socket : nullptr,
handle ? (handle->socket ? handle->socket->removed : 1) : 1);

if (handle && handle->socket && handle->socket->removed) {
if (swoole_event_add(handle->socket, get_event(handle->action)) == SW_OK) {
event_count_++;
if (handle) {
for (auto it : handle->sockets) {
socket = it.second->socket;

swoole_trace_log(SW_TRACE_CO_CURL, "handle=%p, socket=%p, socket->removed=%d", handle, socket, socket ? socket->removed : 0);

if (socket && socket->removed) {
if (swoole_event_add(socket, get_event(it.second->action)) == SW_OK) {
event_count_++;
}
swoole_trace_log(
SW_TRACE_CO_CURL, "resume, handle=%p, curl=%p, fd=%d", handle, ch->cp, socket->get_fd());
}
}
swoole_trace_log(
SW_TRACE_CO_CURL, "resume, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
}
}
set_timer();
Expand All @@ -312,10 +359,15 @@ long Multi::select(php_curlm *mh, double timeout) {
continue;
}
Handle *handle = get_handle(ch->cp);
if (handle && handle->socket && !handle->socket->removed && swoole_event_del(handle->socket) == SW_OK) {
swoole_trace_log(
SW_TRACE_CO_CURL, "suspend, handle=%p, curl=%p, fd=%d", handle, ch->cp, handle->socket->get_fd());
event_count_--;
if (handle) {
for (auto it : handle->sockets) {
socket = it.second->socket;
if (socket && !socket->removed && swoole_event_del(socket) == SW_OK) {
swoole_trace_log(
SW_TRACE_CO_CURL, "suspend, handle=%p, curl=%p, fd=%d", handle, ch->cp, socket->get_fd());
event_count_--;
}
}
}
}
del_timer();
Expand All @@ -328,19 +380,25 @@ long Multi::select(php_curlm *mh, double timeout) {

for (auto iter = selector->active_handles.begin(); iter != selector->active_handles.end(); iter++) {
Handle *handle = *iter;
curl_multi_socket_action(multi_handle_, handle->event_fd, handle->event_bitmask, &running_handles_);
swoole_trace_log(SW_TRACE_CO_CURL, "socket_action[socket], running_handles=%d", running_handles_);
if (handle) {
for (auto it = handle->sockets.begin(); it != handle->sockets.end(); ) {
HandleSocket *handle_socket = it->second;
it++;
curl_multi_socket_action(multi_handle_, handle_socket->event_fd, handle_socket->event_bitmask, &running_handles_);
swoole_trace_log(SW_TRACE_CO_CURL, "socket_action[socket], running_handles=%d", running_handles_);
}
}
}

selector->active_handles.clear();

return count;
}

void Multi::callback(Handle *handle, int event_bitmask) {
swoole_trace_log(SW_TRACE_CO_CURL, "handle=%p, event_bitmask=%d, co=%p", handle, event_bitmask, co);
void Multi::callback(Handle *handle, int event_bitmask, int sockfd) {
swoole_trace_log(SW_TRACE_CO_CURL, "handle=%p, event_bitmask=%d, co=%p, sockfd=%d", handle, event_bitmask, co, sockfd);
if (handle) {
last_sockfd = handle->event_fd;
last_sockfd = sockfd;
} else {
last_sockfd = -1;
}
Expand All @@ -351,8 +409,10 @@ void Multi::callback(Handle *handle, int event_bitmask) {
}
if (!co) {
if (handle) {
if (swoole_event_del(handle->socket) == SW_OK) {
event_count_--;
for (auto it : handle->sockets) {
if (swoole_event_del(it.second->socket) == SW_OK) {
event_count_--;
}
}
} else {
del_timer();
Expand Down
7 changes: 7 additions & 0 deletions scripts/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,10 @@ services:
socks5:
container_name: "socks5"
image: "xkuma/socks5"
ftp:
container_name: "ftp"
image: "fauria/vsftpd"
environment:
FTP_USER: admin
FTP_PASS: admin

6 changes: 6 additions & 0 deletions tests/include/config.php
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,9 @@
define('MAX_LOOPS', [12, 24, 100, 1000][PRESSURE_LEVEL] * 1000);
define('MAX_PROCESS_NUM', [2, 4, 6, 8][PRESSURE_LEVEL]);
define('MAX_PACKET_NUM', [1024, 2048, 4096, 10000][PRESSURE_LEVEL]);

/** ============== FTP ============== */
define('FTP_HOST', IS_IN_CI ? 'ftp' : '127.0.0.1');
define('FTP_PORT', 21);
define('FTP_USER', 'admin');
define('FTP_PASS', 'admin');
5 changes: 5 additions & 0 deletions tests/include/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -830,3 +830,8 @@ function swoole_loop($fn)
$fn($i++);
}
}

function build_ftp_url(string $path = ''): string
{
return 'ftp://' . FTP_USER . ':' . FTP_PASS . '@' . FTP_HOST . ':' . FTP_PORT . '/' . $path;
}
6 changes: 6 additions & 0 deletions tests/include/skipif.inc
Original file line number Diff line number Diff line change
Expand Up @@ -262,3 +262,9 @@ function skip_if_no_coroutine_get_execute_time()
{
skip('no Swoole\Coroutine::getExecuteTime', !method_exists(Swoole\Coroutine::class, 'getExecuteTime'));
}

function skip_if_no_ftp()
{
require_once __DIR__ . '/config.php';
skip('no available proxy', !check_tcp_port(FTP_HOST, FTP_PORT));
}
47 changes: 47 additions & 0 deletions tests/swoole_curl/ftp.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
--TEST--
swoole_curl: ftp
--SKIPIF--
<?php
require __DIR__ . '/../include/skipif.inc';
skip_if_no_ftp();
?>
--FILE--
<?php
require __DIR__ . '/../include/bootstrap.php';

use Swoole\Runtime;

use function Swoole\Coroutine\run;

Runtime::enableCoroutine(SWOOLE_HOOK_NATIVE_CURL);

run(function () {
$fileName = __DIR__ . '/upload/curl_testdata1.txt';
$fp = fopen($fileName, 'r');
$ftpUrl = build_ftp_url('1.txt');

// upload
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $ftpUrl);
curl_setopt($ch, CURLOPT_UPLOAD, true);
curl_setopt($ch, CURLOPT_INFILE, $fp);
curl_setopt($ch, CURLOPT_INFILESIZE, filesize($fileName));
Assert::true(curl_exec($ch));
Assert::eq(curl_errno($ch), 0);
Assert::eq(curl_error($ch), '');
curl_close($ch);
fclose($fp);

// download
$ch = curl_init();
curl_setopt($ch, CURLOPT_URL, $ftpUrl);
curl_setopt($ch, \CURLOPT_RETURNTRANSFER, 1);
Assert::eq(curl_exec($ch), file_get_contents($fileName));
Assert::eq(curl_errno($ch), 0);
Assert::eq(curl_error($ch), '');
curl_close($ch);
});
echo "Done\n";
?>
--EXPECT--
Done

0 comments on commit 3ec40d2

Please sign in to comment.