Skip to content

Commit

Permalink
Add queue metrics exporter
Browse files Browse the repository at this point in the history
Changes:
- Added `x/metrics` package
- Added `tools/metrics_exporter` binary
  • Loading branch information
hibiken authored Dec 16, 2021
1 parent ddfc674 commit 43cb4dd
Show file tree
Hide file tree
Showing 8 changed files with 585 additions and 6 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
# Ignore examples for now
/examples

# Ignore command binary
# Ignore tool binaries
/tools/asynq/asynq
/tools/metrics_exporter/metrics_exporter

# Ignore asynq config file
.asynq.*

# Ignore editor config files
.vscode
.idea
.idea
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Package `x/metrics` is added.
- Tool `tools/metrics_exporter` binary is added.

## [0.19.1] - 2021-12-12

### Added
Expand Down
13 changes: 9 additions & 4 deletions tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ go 1.13

require (
github.com/fatih/color v1.9.0
github.com/go-redis/redis/v8 v8.11.2
github.com/google/uuid v1.2.0
github.com/hibiken/asynq v0.17.1
github.com/go-redis/redis/v8 v8.11.4
github.com/google/uuid v1.3.0
github.com/hibiken/asynq v0.19.0
github.com/hibiken/asynq/x v0.0.0-00010101000000-000000000000
github.com/mitchellh/go-homedir v1.1.0
github.com/prometheus/client_golang v1.11.0
github.com/spf13/cobra v1.1.1
github.com/spf13/viper v1.7.0
)

replace github.com/hibiken/asynq => ./..
replace (
github.com/hibiken/asynq => ./..
github.com/hibiken/asynq/x => ./../x
)
80 changes: 80 additions & 0 deletions tools/go.sum

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions tools/metrics_exporter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package main

import (
"flag"
"fmt"
"log"
"net/http"

"github.com/hibiken/asynq"
"github.com/hibiken/asynq/x/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

// Declare command-line flags.
// These variables are binded to flags in init().
var (
flagRedisAddr string
flagRedisDB int
flagRedisPassword string
flagRedisUsername string
flagPort int
)

func init() {
flag.StringVar(&flagRedisAddr, "redis-addr", "127.0.0.1:6379", "host:port of redis server to connect to")
flag.IntVar(&flagRedisDB, "redis-db", 0, "redis DB number to use")
flag.StringVar(&flagRedisPassword, "redis-password", "", "password used to connect to redis server")
flag.StringVar(&flagRedisUsername, "redis-username", "", "username used to connect to redis server")
flag.IntVar(&flagPort, "port", 9876, "port to use for the HTTP server")
}

func main() {
flag.Parse()
// Using NewPedanticRegistry here to test the implementation of Collectors and Metrics.
reg := prometheus.NewPedanticRegistry()

inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: flagRedisAddr,
DB: flagRedisDB,
Password: flagRedisPassword,
Username: flagRedisUsername,
})

reg.MustRegister(
metrics.NewQueueMetricsCollector(inspector),
// Add the standard process and go metrics to the registry
collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}),
collectors.NewGoCollector(),
)

http.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
log.Printf("exporter server is listening on port: %d\n", flagPort)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", flagPort), nil))
}
12 changes: 12 additions & 0 deletions x/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
module github.com/hibiken/asynq/x

go 1.16

require (
github.com/go-redis/redis/v8 v8.11.4
github.com/google/uuid v1.3.0
github.com/hibiken/asynq v0.19.0
github.com/prometheus/client_golang v1.11.0
)

replace github.com/hibiken/asynq => ./..
256 changes: 256 additions & 0 deletions x/go.sum

Large diffs are not rendered by default.

164 changes: 164 additions & 0 deletions x/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
// Package metrics provides implementations of prometheus.Collector to collect Asynq queue metrics.
package metrics

import (
"fmt"
"log"

"github.com/hibiken/asynq"
"github.com/prometheus/client_golang/prometheus"
)

// Namespace used in fully-qualified metrics names.
const namespace = "asynq"

// QueueMetricsCollector gathers queue metrics.
// It implements prometheus.Collector interface.
//
// All metrics exported from this collector have prefix "asynq".
type QueueMetricsCollector struct {
inspector *asynq.Inspector
}

// collectQueueInfo gathers QueueInfo of all queues.
// Since this operation is expensive, it must be called once per collection.
func (qmc *QueueMetricsCollector) collectQueueInfo() ([]*asynq.QueueInfo, error) {
qnames, err := qmc.inspector.Queues()
if err != nil {
return nil, fmt.Errorf("failed to get queue names: %v", err)
}
infos := make([]*asynq.QueueInfo, len(qnames))
for i, qname := range qnames {
qinfo, err := qmc.inspector.GetQueueInfo(qname)
if err != nil {
return nil, fmt.Errorf("failed to get queue info: %v", err)
}
infos[i] = qinfo
}
return infos, nil
}

// Descriptors used by QueueMetricsCollector
var (
tasksQueuedDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "tasks_enqueued_total"),
"Number of tasks enqueued; broken down by queue and state.",
[]string{"queue", "state"}, nil,
)

queueSizeDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "queue_size"),
"Number of tasks in a queue",
[]string{"queue"}, nil,
)

queueLatencyDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "queue_latency_seconds"),
"Number of seconds the oldest pending task is waiting in pending state to be processed.",
[]string{"queue"}, nil,
)

queueMemUsgDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "queue_memory_usage_approx_bytes"),
"Number of memory used by a given queue (approximated number by sampling).",
[]string{"queue"}, nil,
)

pausedQueues = prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "queue_paused_total"),
"Number of queues paused",
[]string{"queue"}, nil,
)
)

func (qmc *QueueMetricsCollector) Describe(ch chan<- *prometheus.Desc) {
prometheus.DescribeByCollect(qmc, ch)
}

func (qmc *QueueMetricsCollector) Collect(ch chan<- prometheus.Metric) {
queueInfos, err := qmc.collectQueueInfo()
if err != nil {
log.Printf("Failed to collect metrics data: %v", err)
}
for _, info := range queueInfos {
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Active),
info.Queue,
"active",
)
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Pending),
info.Queue,
"pending",
)
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Scheduled),
info.Queue,
"scheduled",
)
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Retry),
info.Queue,
"retry",
)
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Archived),
info.Queue,
"archived",
)
ch <- prometheus.MustNewConstMetric(
tasksQueuedDesc,
prometheus.GaugeValue,
float64(info.Completed),
info.Queue,
"completed",
)

ch <- prometheus.MustNewConstMetric(
queueSizeDesc,
prometheus.GaugeValue,
float64(info.Size),
info.Queue,
)

ch <- prometheus.MustNewConstMetric(
queueLatencyDesc,
prometheus.GaugeValue,
info.Latency.Seconds(),
info.Queue,
)

ch <- prometheus.MustNewConstMetric(
queueMemUsgDesc,
prometheus.GaugeValue,
float64(info.MemoryUsage),
info.Queue,
)

pausedValue := 0 // zero to indicate "not paused"
if info.Paused {
pausedValue = 1
}
ch <- prometheus.MustNewConstMetric(
pausedQueues,
prometheus.GaugeValue,
float64(pausedValue),
info.Queue,
)
}
}

// NewQueueMetricsCollector returns a collector that exports metrics about Asynq queues.
func NewQueueMetricsCollector(inspector *asynq.Inspector) *QueueMetricsCollector {
return &QueueMetricsCollector{inspector: inspector}
}

0 comments on commit 43cb4dd

Please sign in to comment.