-
Notifications
You must be signed in to change notification settings - Fork 3.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
onClose中产生socket错误无法继续往下执行? #3036
Comments
连接异常是目标服务器无法连接,和连接被重置 |
我理解没错的话, 那就是 thrift官方实现的Socket客户端 有问题? |
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
namespace Thrift\Transport;
use Thrift\Exception\TException;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
/**
* Sockets implementation of the TTransport interface.
*
* @package thrift.transport
*/
class TSocket extends TTransport
{
/**
* Handle to PHP socket
*
* @var resource
*/
protected $handle_ = null;
/**
* Remote hostname
*
* @var string
*/
protected $host_ = 'localhost';
/**
* Remote port
*
* @var int
*/
protected $port_ = '9090';
/**
* Send timeout in seconds.
*
* Combined with sendTimeoutUsec this is used for send timeouts.
*
* @var int
*/
protected $sendTimeoutSec_ = 0;
/**
* Send timeout in microseconds.
*
* Combined with sendTimeoutSec this is used for send timeouts.
*
* @var int
*/
protected $sendTimeoutUsec_ = 100000;
/**
* Recv timeout in seconds
*
* Combined with recvTimeoutUsec this is used for recv timeouts.
*
* @var int
*/
protected $recvTimeoutSec_ = 0;
/**
* Recv timeout in microseconds
*
* Combined with recvTimeoutSec this is used for recv timeouts.
*
* @var int
*/
protected $recvTimeoutUsec_ = 750000;
/**
* Persistent socket or plain?
*
* @var bool
*/
protected $persist_ = false;
/**
* Debugging on?
*
* @var bool
*/
protected $debug_ = false;
/**
* Debug handler
*
* @var mixed
*/
protected $debugHandler_ = null;
/**
* Socket constructor
*
* @param string $host Remote hostname
* @param int $port Remote port
* @param bool $persist Whether to use a persistent socket
* @param string $debugHandler Function to call for error logging
*/
public function __construct(
$host = 'localhost',
$port = 9090,
$persist = false,
$debugHandler = null
) {
$this->host_ = $host;
$this->port_ = $port;
$this->persist_ = $persist;
$this->debugHandler_ = $debugHandler ? $debugHandler : 'error_log';
}
/**
* @param resource $handle
* @return void
*/
public function setHandle($handle)
{
$this->handle_ = $handle;
stream_set_blocking($this->handle_, false);
}
/**
* Sets the send timeout.
*
* @param int $timeout Timeout in milliseconds.
*/
public function setSendTimeout($timeout)
{
$this->sendTimeoutSec_ = floor($timeout / 1000);
$this->sendTimeoutUsec_ =
($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000;
}
/**
* Sets the receive timeout.
*
* @param int $timeout Timeout in milliseconds.
*/
public function setRecvTimeout($timeout)
{
$this->recvTimeoutSec_ = floor($timeout / 1000);
$this->recvTimeoutUsec_ =
($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000;
}
/**
* Sets debugging output on or off
*
* @param bool $debug
*/
public function setDebug($debug)
{
$this->debug_ = $debug;
}
/**
* Get the host that this socket is connected to
*
* @return string host
*/
public function getHost()
{
return $this->host_;
}
/**
* Get the remote port that this socket is connected to
*
* @return int port
*/
public function getPort()
{
return $this->port_;
}
/**
* Tests whether this is open
*
* @return bool true if the socket is open
*/
public function isOpen()
{
return is_resource($this->handle_);
}
/**
* Connects the socket.
*/
public function open()
{
if ($this->isOpen()) {
throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN);
}
if (empty($this->host_)) {
throw new TTransportException('Cannot open null host', TTransportException::NOT_OPEN);
}
if ($this->port_ <= 0) {
throw new TTransportException('Cannot open without port', TTransportException::NOT_OPEN);
}
if ($this->persist_) {
$this->handle_ = @pfsockopen(
$this->host_,
$this->port_,
$errno,
$errstr,
$this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)
);
} else {
$this->handle_ = @fsockopen(
$this->host_,
$this->port_,
$errno,
$errstr,
$this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000)
);
}
// Connect failed?
if ($this->handle_ === false) {
$error = 'TSocket: Could not connect to ' .
$this->host_ . ':' . $this->port_ . ' (' . $errstr . ' [' . $errno . '])';
if ($this->debug_) {
call_user_func($this->debugHandler_, $error);
}
throw new TException($error);
}
if (function_exists('socket_import_stream') && function_exists('socket_set_option')) {
$socket = socket_import_stream($this->handle_);
socket_set_option($socket, SOL_TCP, TCP_NODELAY, 1);
}
}
/**
* Closes the socket.
*/
public function close()
{
@fclose($this->handle_);
$this->handle_ = null;
}
/**
* Read from the socket at most $len bytes.
*
* This method will not wait for all the requested data, it will return as
* soon as any data is received.
*
* @param int $len Maximum number of bytes to read.
* @return string Binary data
*/
public function read($len)
{
$null = null;
$read = array($this->handle_);
$readable = @stream_select(
$read,
$null,
$null,
$this->recvTimeoutSec_,
$this->recvTimeoutUsec_
);
if ($readable > 0) {
$data = fread($this->handle_, $len);
if ($data === false) {
throw new TTransportException('TSocket: Could not read ' . $len . ' bytes from ' .
$this->host_ . ':' . $this->port_);
} elseif ($data == '' && feof($this->handle_)) {
throw new TTransportException('TSocket read 0 bytes');
}
return $data;
} elseif ($readable === 0) {
throw new TTransportException('TSocket: timed out reading ' . $len . ' bytes from ' .
$this->host_ . ':' . $this->port_);
} else {
throw new TTransportException('TSocket: Could not read ' . $len . ' bytes from ' .
$this->host_ . ':' . $this->port_);
}
}
/**
* Write to the socket.
*
* @param string $buf The data to write
*/
public function write($buf)
{
$null = null;
$write = array($this->handle_);
// keep writing until all the data has been written
while (TStringFuncFactory::create()->strlen($buf) > 0) {
// wait for stream to become available for writing
$writable = @stream_select(
$null,
$write,
$null,
$this->sendTimeoutSec_,
$this->sendTimeoutUsec_
);
if ($writable > 0) {
// write buffer to stream
$written = fwrite($this->handle_, $buf);
if ($written === -1 || $written === false) {
throw new TTransportException(
'TSocket: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes ' .
$this->host_ . ':' . $this->port_
);
}
// determine how much of the buffer is left to write
$buf = TStringFuncFactory::create()->substr($buf, $written);
} elseif ($writable === 0) {
throw new TTransportException(
'TSocket: timed out writing ' . TStringFuncFactory::create()->strlen($buf) . ' bytes from ' .
$this->host_ . ':' . $this->port_
);
} else {
throw new TTransportException(
'TSocket: Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes ' .
$this->host_ . ':' . $this->port_
);
}
}
}
/**
* Flush output to the socket.
*
* Since read(), readAll() and write() operate on the sockets directly,
* this is a no-op
*
* If you wish to have flushable buffering behaviour, wrap this TSocket
* in a TBufferedTransport.
*/
public function flush()
{
// no-op
}
} 你可以看看这个代码,是官方客户端,很简单的操作,fsockopen打开连接,stream_select等待可读或可写,再调用fwrite或者fread。 |
我替换也就是把fsockopen换成swoole的协程Socket,以及对应的读写,取消掉了stream_select,因为协程客户端自动挂起和激活 |
<?php
use Swoole\Coroutine\Socket;
use Thrift\Exception\TTransportException;
use Thrift\Factory\TStringFuncFactory;
use Thrift\Transport\TTransport;
class SwooleSocket extends TTransport
{
/**
* Socket对象
* @var Socket
*/
protected $handle = null;
/**
* 目标地址
* @var string
*/
protected $host = 'localhost';
/**
* 目标端口
* @var int
*/
protected $port = 9090;
/**
* 超时时间,单位毫秒
* @var int
*/
protected $timeout;
/**
* SwooleSocket constructor.
* @param string $host 目标地址
* @param int $port 目标端口
* @param int $timeout 超时时间
*/
public function __construct(string $host, int $port, int $timeout = 30)
{
$this->host = $host;
$this->port = $port;
$this->timeout = $timeout * 1000;
}
/**
* 连接是否打开
* @return bool
*/
public function isOpen()
{
return is_resource($this->handle);
}
/**
* 打开连接
* @throws TTransportException
*/
public function open()
{
if ($this->isOpen()) {
throw new TTransportException('Socket already connected', TTransportException::ALREADY_OPEN);
}
if (empty($this->host)) {
throw new TTransportException('Cannot open null host', TTransportException::NOT_OPEN);
}
if ($this->port <= 0) {
throw new TTransportException('Cannot open without port', TTransportException::NOT_OPEN);
}
$this->handle = new Socket(AF_INET, SOCK_STREAM, 0);
$retval = $this->handle->connect($this->host, $this->port, $this->timeout);
if ($retval === false) {
if ($this->handle->errCode == SOCKET_ETIMEDOUT) {
throw new TTransportException('Connect timeout.', TTransportException::TIMED_OUT);
} else {
throw new TTransportException('Connect error. [' . $this->handle->errCode . ']',
TTransportException::NOT_OPEN);
}
}
}
/**
* 关闭Socket连接
*/
public function close()
{
$this->handle->close();
$this->handle = null;
}
/**
* 读取指定长度的数据
* @param int $len 需要读取的数据长度
* @return mixed|string
* @throws TTransportException
*/
public function read($len)
{
$data = $this->handle->recv($len, $this->timeout);
if ($data === false) {
throw new TTransportException(
'Could not read ' . $len . ' bytes. [' . $this->handle->errCode . ']',
TTransportException::UNKNOWN
);
}
return $data;
}
/**
* 发送指定数据
* @param string $buf 需要发送的数据
* @throws TTransportException
*/
public function write($buf)
{
while (TStringFuncFactory::create()->strlen($buf) > 0) {
$write_length = $this->handle->send($buf, $this->timeout);
if ($write_length === false) {
throw new TTransportException(
'Could not write ' . TStringFuncFactory::create()->strlen($buf) . ' bytes. [' . $this->handle->errCode . ']',
TTransportException::UNKNOWN
);
}
//确定剩下多少缓冲区要写
$buf = TStringFuncFactory::create()->substr($buf, $write_length);
}
}
} 这个是我封装替换的 |
是否开启了Runtime hook ? 是否是你自己的PHP代码中存在死循环? |
开启了RuntimeHook的。 |
有可能, 是否有能直接复现的例子打包给我或是可以提供ssh供调试? |
通过事后替换实现来看,只存在十秒不到的连接波动,后面就远程调用恢复正常了。 |
仅靠这些信息不好判断啊 得先确定是PHP层面死循环了还是底层死循环了 可以用gdb单步跟踪看看 如果有原始的strace日志最好 signal13是指向已关闭的连接发送了数据, 一般由于没有判断send返回值的代码导致, 但是看你的代码又不像 |
很奇怪, 确实看不出来原因, 代码没发现问题 不过提一嘴你改成Socket以后那个isOpen方法没改, 等价的应是is_object |
这个时候哪怕kill掉那些出现问题的woker进程后,也无法停止服务了,master进程一直残留 |
是不是特殊情况下send没返回false,也就没有抛出异常,导致一直在循环发送数据? |
isOpen是指是否创建了socket吧 不是判断连接是否存活(也没法准确判断), 有个socket->peek就是判断连接是否可读的, 不过你这里不用改 看代码是没问题的, socket->send失败了是返回false的, 这时候你抛异常就抛到trycatch的地方了(从这里开始就是PHP层的事了, 应该和swoole没关系了), 然后你就应该终止逻辑从onClose中返回了 |
抛出异常是到了onClose的trycatch,此时我没有再重复调用offLine,而是继续往下清理别的数据。 |
这里死循环 send 了,请检查你的 PHP 代码,是否没有检测 send 的返回值,如果 send 操作返回 false ,应当跳出循环。 |
代码就在上面,判断了send返回值,如果是false就直接抛出异常了,异常就直接到上面最开始贴的代码try里面捕获异常记录日志,然后就继续往下走没有涉及循环 |
@z5864703 能否给我一份可以稳定重现的 demo 程序,或者 给我一个现场机器,我ssh 来观察一下 |
最近没有出现,稳定复现只在线上,线下我也没复现出来。。。因为要模拟大量长连接,只有在维持大量长连接下,RPC请求连接超时或丢失情况下出现。下次出现我在QQ上@你现场看看了 |
@z5864703 请在 QQ 上联系我,到现场跟踪 |
onClose 回调函数中有一些逻辑使用了全局变量,存在不可重入的问题,正在寻找解决方案。 在 onClose 回调中使用了协程 API 产生了调度,可能使得 close 出现了问题。 |
重新看了一下 strace 日志,是因为 没有检测 $client->send 返回值引起了死循环。 |
这个问题定位到了,是swoole协程客户端在对端关闭连接情况下,recv返回值会存在""空字符串情况,上面代码只判断了返回false情况就死循环了。 |
连接关闭返回空字符串应该是合理的。如果是写C代码,连接关闭返回值是0,错误是-1。 |
但是文档描述是产生错误返回false,那是不是会造成误解?那就需要更新下文档,返回空字符串表示是连接关闭 |
Please answer these questions before submitting your issue. Thanks!
首先在onWorkerStart启动事件里面启用了Runtime::enableCoroutine
在onClose事件中调用了thrift协议的远程调用
用thrift官方实现的Socket客户端,如果try里发生连接异常,会在这里造成死循环,应该是重复回调onClose事件,主要是通过strace工具观察到woker进程一直在执行connect、epoll_ctl、epoll_wait、read、sendto、recvfrom,除了这些函数需要传入的ID不同,其他都是类似的在一直执行offLine。
后面我用swoole的协程Socket实现了socket客户端,替代了thrift官方的,之后就没有出现死循环造成进程100%占用了。
同时观测到在使用原生socket实现的客户端在执行过程中会有一次madvise调用,而用协程的没有这个调用。
What did you expect to see?
正常
What did you see instead?
死循环
What version of Swoole are you using (show your
php --ri swoole
)?The text was updated successfully, but these errors were encountered: