Skip to content

Commit

Permalink
Add usePipelineRead to support streaming call (#3354)
Browse files Browse the repository at this point in the history
* Add usePipelineRead to support streaming call

* Fix warning

* Fix typo
  • Loading branch information
twose authored Jun 1, 2020
1 parent 713e752 commit 1f5a30d
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 22 deletions.
1 change: 1 addition & 0 deletions include/http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ enum swHttp2_stream_flag
SW_HTTP2_STREAM_REQUEST_END = 1 << 0,
SW_HTTP2_STREAM_PIPELINE_REQUEST = 1 << 1,
SW_HTTP2_STREAM_PIPELINE_RESPONSE = 1 << 2,
SW_HTTP2_STREAM_USE_PIPELINE_READ = 1 << 3,
};

#define SW_HTTP2_FRAME_HEADER_SIZE 9
Expand Down
50 changes: 28 additions & 22 deletions swoole_http2_client_coro.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class http2_client
}

bool connect();
http2_client_stream* create_stream(uint32_t stream_id, bool pipeline);
http2_client_stream* create_stream(uint32_t stream_id, uint8_t flags);
void destroy_stream(http2_client_stream *stream);

inline bool delete_stream(uint32_t stream_id)
Expand All @@ -173,7 +173,7 @@ class http2_client
bool send_window_update(int stream_id, uint32_t size);
bool send_ping_frame();
bool send_data(uint32_t stream_id, const char *p, size_t len, int flag);
uint32_t send_request(zval *req);
uint32_t send_request(zval *zrequest);
bool write_data(uint32_t stream_id, zval *zdata, bool end);
bool send_goaway_frame(zend_long error_code, const char *debug_data, size_t debug_data_len);
enum swReturn_code parse_frame(zval *return_value, bool pipeline_read = false);
Expand Down Expand Up @@ -728,7 +728,7 @@ enum swReturn_code http2_client::parse_frame(zval *return_value, bool pipeline_r
bool end = (flags & SW_HTTP2_FLAG_END_STREAM) ||
type == SW_HTTP2_TYPE_RST_STREAM ||
type == SW_HTTP2_TYPE_GOAWAY;
pipeline_read = (pipeline_read && (stream->flags & SW_HTTP2_STREAM_PIPELINE_RESPONSE));
pipeline_read = ((pipeline_read || (stream->flags & SW_HTTP2_STREAM_USE_PIPELINE_READ)) && (stream->flags & SW_HTTP2_STREAM_PIPELINE_RESPONSE));
if (end || pipeline_read)
{
zval *zresponse = &stream->zresponse;
Expand Down Expand Up @@ -1169,13 +1169,13 @@ void http2_client::destroy_stream(http2_client_stream *stream)
efree(stream);
}

http2_client_stream* http2_client::create_stream(uint32_t stream_id, bool pipeline)
http2_client_stream* http2_client::create_stream(uint32_t stream_id, uint8_t flags)
{
// malloc
http2_client_stream *stream = (http2_client_stream *) ecalloc(1, sizeof(http2_client_stream));
// init
stream->stream_id = stream_id;
stream->flags = pipeline ? SW_HTTP2_STREAM_PIPELINE_REQUEST : SW_HTTP2_STREAM_NORMAL;
stream->flags = flags;
stream->remote_window_size = SW_HTTP2_DEFAULT_WINDOW_SIZE;
stream->local_window_size = SW_HTTP2_DEFAULT_WINDOW_SIZE;
streams.emplace(stream_id, stream);
Expand Down Expand Up @@ -1225,11 +1225,12 @@ bool http2_client::send_data(uint32_t stream_id, const char *p, size_t len, int
return true;
}

uint32_t http2_client::send_request(zval *req)
uint32_t http2_client::send_request(zval *zrequest)
{
zval *zheaders = sw_zend_read_and_convert_property_array(swoole_http2_request_ce, req, ZEND_STRL("headers"), 0);
zval *zdata = sw_zend_read_property(swoole_http2_request_ce, req, ZEND_STRL("data"), 0);
zval *zpipeline = sw_zend_read_property(swoole_http2_request_ce, req, ZEND_STRL("pipeline"), 0);
zval *zheaders = sw_zend_read_and_convert_property_array(swoole_http2_request_ce, zrequest, ZEND_STRL("headers"), 0);
zval *zdata = sw_zend_read_property(swoole_http2_request_ce, zrequest, ZEND_STRL("data"), 0);
zval *zpipeline = sw_zend_read_property(swoole_http2_request_ce, zrequest, ZEND_STRL("pipeline"), 0);
zval ztmp, *zuse_pipeline_read = zend_read_property(Z_OBJCE_P(zrequest), zrequest, ZEND_STRL("usePipelineRead"), 1, &ztmp);
bool is_data_empty = Z_TYPE_P(zdata) == IS_STRING ? Z_STRLEN_P(zdata) == 0 : !zval_is_true(zdata);

if (ZVAL_IS_ARRAY(zdata))
Expand All @@ -1241,14 +1242,24 @@ uint32_t http2_client::send_request(zval *req)
* send headers
*/
char* buffer = SwooleTG.buffer_stack->str;
ssize_t bytes = http2_client_build_header(zobject, req, buffer + SW_HTTP2_FRAME_HEADER_SIZE);
ssize_t bytes = http2_client_build_header(zobject, zrequest, buffer + SW_HTTP2_FRAME_HEADER_SIZE);

if (bytes <= 0)
{
return 0;
}

auto stream = create_stream(stream_id, Z_BVAL_P(zpipeline));
uint8_t flags = 0;
if (zval_is_true(zpipeline))
{
flags |= SW_HTTP2_STREAM_PIPELINE_REQUEST;
}
if (zval_is_true(zuse_pipeline_read))
{
flags |= SW_HTTP2_STREAM_USE_PIPELINE_READ;
}

auto stream = create_stream(stream_id, flags);

if (is_data_empty)
{
Expand Down Expand Up @@ -1387,25 +1398,20 @@ bool http2_client::send_goaway_frame(zend_long error_code, const char *debug_dat

static PHP_METHOD(swoole_http2_client_coro, send)
{
zval *request;
http2_client *h2c = php_swoole_get_h2c(ZEND_THIS);

if (!h2c->is_available())
{
RETURN_FALSE;
}

if (zend_parse_parameters(ZEND_NUM_ARGS(), "z", &request) == FAILURE)
{
RETURN_FALSE;
}
if (Z_TYPE_P(request) != IS_OBJECT || !instanceof_function(Z_OBJCE_P(request), swoole_http2_request_ce))
{
zend_throw_exception_ex(swoole_http2_client_coro_exception_ce, SW_ERROR_INVALID_PARAMS, "Object is not a instanceof %s", ZSTR_VAL(swoole_http2_request_ce->name));
RETURN_FALSE;
}
zval *zrequest;

ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_OBJECT_OF_CLASS(zrequest, swoole_http2_request_ce)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

uint32_t stream_id = h2c->send_request(request);
uint32_t stream_id = h2c->send_request(zrequest);
if (stream_id == 0)
{
RETURN_FALSE;
Expand Down

0 comments on commit 1f5a30d

Please sign in to comment.