diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index fa0fa6367e..97f85c244e 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -19,19 +19,7 @@ func (j *XrayTrafficJob) Run() { return } - // get Client Traffic - - clientTraffics, err := j.xrayService.GetXrayClientTraffic() - if err != nil { - logger.Warning("get xray client traffic failed:", err) - return - } - err = j.inboundService.AddClientTraffic(clientTraffics) - if err != nil { - logger.Warning("add client traffic failed:", err) - } - - traffics, err := j.xrayService.GetXrayTraffic() + traffics, clientTraffics, err := j.xrayService.GetXrayTraffic() if err != nil { logger.Warning("get xray traffic failed:", err) return @@ -41,6 +29,10 @@ func (j *XrayTrafficJob) Run() { logger.Warning("add traffic failed:", err) } + err = j.inboundService.AddClientTraffic(clientTraffics) + if err != nil { + logger.Warning("add client traffic failed:", err) + } } diff --git a/web/service/inbound.go b/web/service/inbound.go index 411fe72510..f6514288a1 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -193,7 +193,7 @@ func (s *InboundService) AddClientTraffic(traffics []*xray.ClientTraffic) (err e err := db.Model(model.Inbound{}).Where("settings like ?", "%" + traffic.Email + "%").First(inbound).Error traffic.InboundId = inbound.Id if err != nil { - logger.Warning("AddClientTraffic find model ", err) + logger.Warning("AddClientTraffic find model ", err, traffic.Email) continue } // get settings clients diff --git a/web/service/xray.go b/web/service/xray.go index 4a062f1628..d4aa58b568 100644 --- a/web/service/xray.go +++ b/web/service/xray.go @@ -116,18 +116,12 @@ func (s *XrayService) GetXrayConfig() (*xray.Config, error) { return xrayConfig, nil } -func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, error) { +func (s *XrayService) GetXrayTraffic() ([]*xray.Traffic, []*xray.ClientTraffic, error) { if !s.IsXrayRunning() { - return nil, errors.New("xray is not running") + return nil, nil, errors.New("xray is not running") } return p.GetTraffic(true) } -func (s *XrayService) GetXrayClientTraffic() ([]*xray.ClientTraffic, error) { - if !s.IsXrayRunning() { - return nil, errors.New("xray is not running") - } - return p.GetClientTraffic(false) -} func (s *XrayService) RestartXray(isForce bool) error { lock.Lock() diff --git a/xray/process.go b/xray/process.go index 16d8a2706a..aa0040d54c 100644 --- a/xray/process.go +++ b/xray/process.go @@ -230,13 +230,13 @@ func (p *process) Stop() error { return p.cmd.Process.Kill() } -func (p *process) GetTraffic(reset bool) ([]*Traffic, error) { +func (p *process) GetTraffic(reset bool) ([]*Traffic, []*ClientTraffic, error) { if p.apiPort == 0 { - return nil, common.NewError("xray api port wrong:", p.apiPort) + return nil, nil, common.NewError("xray api port wrong:", p.apiPort) } conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithInsecure()) if err != nil { - return nil, err + return nil, nil, err } defer conn.Close() @@ -248,13 +248,43 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) { } resp, err := client.QueryStats(ctx, request) if err != nil { - return nil, err + return nil, nil, err } tagTrafficMap := map[string]*Traffic{} + emailTrafficMap := map[string]*ClientTraffic{} + + clientTraffics := make([]*ClientTraffic, 0) traffics := make([]*Traffic, 0) for _, stat := range resp.GetStat() { matchs := trafficRegex.FindStringSubmatch(stat.Name) if len(matchs) < 3 { + + matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name) + if len(matchs) < 3 { + continue + }else { + + isUser := matchs[1] == "user" + email := matchs[2] + isDown := matchs[3] == "downlink" + if ! isUser { + continue + } + traffic, ok := emailTrafficMap[email] + if !ok { + traffic = &ClientTraffic{ + Email: email, + } + emailTrafficMap[email] = traffic + clientTraffics = append(clientTraffics, traffic) + } + if isDown { + traffic.Down = stat.Value + } else { + traffic.Up = stat.Value + } + + } continue } isInbound := matchs[1] == "inbound" @@ -279,55 +309,5 @@ func (p *process) GetTraffic(reset bool) ([]*Traffic, error) { } } - return traffics, nil -} -func (p *process) GetClientTraffic(reset bool) ([]*ClientTraffic, error) { - if p.apiPort == 0 { - return nil, common.NewError("xray api port wrong:", p.apiPort) - } - conn, err := grpc.Dial(fmt.Sprintf("127.0.0.1:%v", p.apiPort), grpc.WithInsecure()) - if err != nil { - return nil, err - } - defer conn.Close() - - client := statsservice.NewStatsServiceClient(conn) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - defer cancel() - request := &statsservice.QueryStatsRequest{ - Reset_: reset, - } - resp, err := client.QueryStats(ctx, request) - if err != nil { - return nil, err - } - emailTrafficMap := map[string]*ClientTraffic{} - traffics := make([]*ClientTraffic, 0) - for _, stat := range resp.GetStat() { - matchs := ClientTrafficRegex.FindStringSubmatch(stat.Name) - if len(matchs) < 3 { - continue - } - isUser := matchs[1] == "user" - email := matchs[2] - isDown := matchs[3] == "downlink" - if ! isUser { - continue - } - traffic, ok := emailTrafficMap[email] - if !ok { - traffic = &ClientTraffic{ - Email: email, - } - emailTrafficMap[email] = traffic - traffics = append(traffics, traffic) - } - if isDown { - traffic.Down = stat.Value - } else { - traffic.Up = stat.Value - } - } - - return traffics, nil + return traffics, clientTraffics, nil }