Skip to content

Commit

Permalink
feat: Add support for multi-account query
Browse files Browse the repository at this point in the history
  • Loading branch information
gtn3010 committed Nov 27, 2024
1 parent 49a296a commit 66d16cd
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 35 deletions.
6 changes: 3 additions & 3 deletions pkg/clients/cloudwatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Client interface {
// ListMetrics returns the list of metrics and dimensions for a given namespace
// and metric name. Results pagination is handled automatically: the caller can
// optionally pass a non-nil func in order to handle results pages.
ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) error
ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, includeLinkedAccounts []string, recentlyActiveOnly bool, fn func(page []*model.Metric)) error

// GetMetricData returns the output of the GetMetricData CloudWatch API.
// Results pagination is handled automatically.
Expand Down Expand Up @@ -87,9 +87,9 @@ func (c limitedConcurrencyClient) GetMetricData(ctx context.Context, getMetricDa
return res
}

func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
func (c limitedConcurrencyClient) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, includeLinkedAccounts []string, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
c.limiter.Acquire(listMetricsCall)
err := c.client.ListMetrics(ctx, namespace, metric, recentlyActiveOnly, fn)
err := c.client.ListMetrics(ctx, namespace, metric, includeLinkedAccounts, recentlyActiveOnly, fn)
c.limiter.Release(listMetricsCall)
return err
}
46 changes: 35 additions & 11 deletions pkg/clients/cloudwatch/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package v1

import (
"context"
"slices"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -38,11 +39,14 @@ func NewClient(logger logging.Logger, cloudwatchAPI cloudwatchiface.CloudWatchAP
}
}

func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, includeLinkedAccounts []string, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
filter := &cloudwatch.ListMetricsInput{
MetricName: aws.String(metric.Name),
Namespace: aws.String(namespace),
}
if len(includeLinkedAccounts) > 0 {
filter.IncludeLinkedAccounts = aws.Bool(true)
}
if recentlyActiveOnly {
filter.RecentlyActive = aws.String("PT3H")
}
Expand All @@ -54,7 +58,7 @@ func (c client) ListMetrics(ctx context.Context, namespace string, metric *model
err := c.cloudwatchAPI.ListMetricsPagesWithContext(ctx, filter, func(page *cloudwatch.ListMetricsOutput, lastPage bool) bool {
promutil.CloudwatchAPICounter.WithLabelValues("ListMetrics").Inc()

metricsPage := toModelMetric(page)
metricsPage := toModelMetric(page, includeLinkedAccounts)

if c.logger.IsDebugEnabled() {
c.logger.Debug("ListMetrics", "output", metricsPage, "last_page", lastPage)
Expand All @@ -72,15 +76,31 @@ func (c client) ListMetrics(ctx context.Context, namespace string, metric *model
return nil
}

func toModelMetric(page *cloudwatch.ListMetricsOutput) []*model.Metric {
func toModelMetric(page *cloudwatch.ListMetricsOutput, includeLinkedAccounts []string) []*model.Metric {
modelMetrics := make([]*model.Metric, 0, len(page.Metrics))
for _, cloudwatchMetric := range page.Metrics {
modelMetric := &model.Metric{
MetricName: *cloudwatchMetric.MetricName,
Namespace: *cloudwatchMetric.Namespace,
Dimensions: toModelDimensions(cloudwatchMetric.Dimensions),
if len(includeLinkedAccounts) > 0 {
for i := 0; i < len(page.Metrics); i++ {
linkedAccountId := *page.OwningAccounts[i]
if !slices.Contains(includeLinkedAccounts, "*") && !slices.Contains(includeLinkedAccounts, linkedAccountId) {
continue
}
modelMetric := &model.Metric{
MetricName: *page.Metrics[i].MetricName,
Namespace: *page.Metrics[i].Namespace,
Dimensions: toModelDimensions(page.Metrics[i].Dimensions),
LinkedAccountId: linkedAccountId,
}
modelMetrics = append(modelMetrics, modelMetric)
}
} else {
for _, cloudwatchMetric := range page.Metrics {
modelMetric := &model.Metric{
MetricName: *cloudwatchMetric.MetricName,
Namespace: *cloudwatchMetric.Namespace,
Dimensions: toModelDimensions(cloudwatchMetric.Dimensions),
}
modelMetrics = append(modelMetrics, modelMetric)
}
modelMetrics = append(modelMetrics, modelMetric)
}
return modelMetrics
}
Expand Down Expand Up @@ -109,11 +129,15 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
Period: &data.GetMetricDataProcessingParams.Period,
Stat: &data.GetMetricDataProcessingParams.Statistic,
}
metricDataQueries = append(metricDataQueries, &cloudwatch.MetricDataQuery{
metricDataQuery := &cloudwatch.MetricDataQuery{
Id: &data.GetMetricDataProcessingParams.QueryID,
MetricStat: metricStat,
ReturnData: aws.Bool(true),
})
}
if data.LinkedAccountId != "" {
metricDataQuery.AccountId = aws.String(data.LinkedAccountId)
}
metricDataQueries = append(metricDataQueries, metricDataQuery)
}
input := &cloudwatch.GetMetricDataInput{
EndTime: &endTime,
Expand Down
46 changes: 35 additions & 11 deletions pkg/clients/cloudwatch/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package v2

import (
"context"
"slices"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -38,11 +39,14 @@ func NewClient(logger logging.Logger, cloudwatchAPI *cloudwatch.Client) cloudwat
}
}

func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
func (c client) ListMetrics(ctx context.Context, namespace string, metric *model.MetricConfig, includeLinkedAccounts []string, recentlyActiveOnly bool, fn func(page []*model.Metric)) error {
filter := &cloudwatch.ListMetricsInput{
MetricName: aws.String(metric.Name),
Namespace: aws.String(namespace),
}
if len(includeLinkedAccounts) > 0 {
filter.IncludeLinkedAccounts = aws.Bool(true)
}
if recentlyActiveOnly {
filter.RecentlyActive = types.RecentlyActivePt3h
}
Expand All @@ -64,7 +68,7 @@ func (c client) ListMetrics(ctx context.Context, namespace string, metric *model
return err
}

metricsPage := toModelMetric(page)
metricsPage := toModelMetric(page, includeLinkedAccounts)
if c.logger.IsDebugEnabled() {
c.logger.Debug("ListMetrics", "output", metricsPage)
}
Expand All @@ -75,15 +79,31 @@ func (c client) ListMetrics(ctx context.Context, namespace string, metric *model
return nil
}

func toModelMetric(page *cloudwatch.ListMetricsOutput) []*model.Metric {
func toModelMetric(page *cloudwatch.ListMetricsOutput, includeLinkedAccounts []string) []*model.Metric {
modelMetrics := make([]*model.Metric, 0, len(page.Metrics))
for _, cloudwatchMetric := range page.Metrics {
modelMetric := &model.Metric{
MetricName: *cloudwatchMetric.MetricName,
Namespace: *cloudwatchMetric.Namespace,
Dimensions: toModelDimensions(cloudwatchMetric.Dimensions),
if len(includeLinkedAccounts) > 0 {
for i := 0; i < len(page.Metrics); i++ {
linkedAccountId := page.OwningAccounts[i]
if !slices.Contains(includeLinkedAccounts, "*") && !slices.Contains(includeLinkedAccounts, linkedAccountId) {
continue
}
modelMetric := &model.Metric{
MetricName: *page.Metrics[i].MetricName,
Namespace: *page.Metrics[i].Namespace,
Dimensions: toModelDimensions(page.Metrics[i].Dimensions),
LinkedAccountId: linkedAccountId,
}
modelMetrics = append(modelMetrics, modelMetric)
}
} else {
for _, cloudwatchMetric := range page.Metrics {
modelMetric := &model.Metric{
MetricName: *cloudwatchMetric.MetricName,
Namespace: *cloudwatchMetric.Namespace,
Dimensions: toModelDimensions(cloudwatchMetric.Dimensions),
}
modelMetrics = append(modelMetrics, modelMetric)
}
modelMetrics = append(modelMetrics, modelMetric)
}
return modelMetrics
}
Expand Down Expand Up @@ -112,11 +132,15 @@ func (c client) GetMetricData(ctx context.Context, getMetricData []*model.Cloudw
Period: aws.Int32(int32(data.GetMetricDataProcessingParams.Period)),
Stat: &data.GetMetricDataProcessingParams.Statistic,
}
metricDataQueries = append(metricDataQueries, types.MetricDataQuery{
metricDataQuery := types.MetricDataQuery{
Id: &data.GetMetricDataProcessingParams.QueryID,
MetricStat: metricStat,
ReturnData: aws.Bool(true),
})
}
if data.LinkedAccountId != "" {
metricDataQuery.AccountId = aws.String(data.LinkedAccountId)
}
metricDataQueries = append(metricDataQueries, metricDataQuery)
}

input := &cloudwatch.GetMetricDataInput{
Expand Down
7 changes: 7 additions & 0 deletions pkg/clients/tagging/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (c client) GetResources(ctx context.Context, job model.DiscoveryJob, region
var resources []*model.TaggedResource
shouldHaveDiscoveredResources := false

if len(job.IncludeLinkedAccounts) > 0 {
// when setting `includeLinkedAccounts`, don't get resources in cross accounts (because we need more permissions in cross accounts)
c.logger.Debug("Return empty resources when enable includeLinkedAccounts")
resources = []*model.TaggedResource{}
return resources, nil
}

if len(svc.ResourceFilters) > 0 {
shouldHaveDiscoveredResources = true

Expand Down
7 changes: 7 additions & 0 deletions pkg/clients/tagging/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func (c client) GetResources(ctx context.Context, job model.DiscoveryJob, region
var resources []*model.TaggedResource
shouldHaveDiscoveredResources := false

if len(job.IncludeLinkedAccounts) > 0 {
// when setting `includeLinkedAccounts`, don't get resources in cross accounts (because we need more permissions in cross accounts)
c.logger.Debug("Return empty resources when enable includeLinkedAccounts")
resources = []*model.TaggedResource{}
return resources, nil
}

if len(svc.ResourceFilters) > 0 {
shouldHaveDiscoveredResources = true
filters := make([]string, 0, len(svc.ResourceFilters))
Expand Down
2 changes: 1 addition & 1 deletion pkg/clients/v2/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (t testClient) GetAccountAlias(_ context.Context) (string, error) {
return "", nil
}

func (t testClient) ListMetrics(_ context.Context, _ string, _ *model.MetricConfig, _ bool, _ func(page []*model.Metric)) error {
func (t testClient) ListMetrics(_ context.Context, _ string, _ *model.MetricConfig, _ []string, _ bool, _ func(page []*model.Metric)) error {
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Job struct {
RoundingPeriod *int64 `yaml:"roundingPeriod"`
RecentlyActiveOnly bool `yaml:"recentlyActiveOnly"`
IncludeContextOnInfoMetrics bool `yaml:"includeContextOnInfoMetrics"`
IncludeLinkedAccounts []string `yaml:"includeLinkedAccounts"`
JobLevelMetricFields `yaml:",inline"`
}

Expand All @@ -88,6 +89,7 @@ type CustomNamespace struct {
CustomTags []Tag `yaml:"customTags"`
DimensionNameRequirements []string `yaml:"dimensionNameRequirements"`
RoundingPeriod *int64 `yaml:"roundingPeriod"`
IncludeLinkedAccounts []string `yaml:"includeLinkedAccounts"`
JobLevelMetricFields `yaml:",inline"`
}

Expand Down Expand Up @@ -421,6 +423,7 @@ func (c *ScrapeConf) toModelConfig() model.JobsConfig {
job.CustomTags = toModelTags(discoveryJob.CustomTags)
job.Metrics = toModelMetricConfig(discoveryJob.Metrics)
job.IncludeContextOnInfoMetrics = discoveryJob.IncludeContextOnInfoMetrics
job.IncludeLinkedAccounts = discoveryJob.IncludeLinkedAccounts
job.DimensionsRegexps = svc.ToModelDimensionsRegexp()

job.ExportedTagsOnMetrics = []string{}
Expand Down Expand Up @@ -456,6 +459,7 @@ func (c *ScrapeConf) toModelConfig() model.JobsConfig {
job.Roles = toModelRoles(customNamespaceJob.Roles)
job.CustomTags = toModelTags(customNamespaceJob.CustomTags)
job.Metrics = toModelMetricConfig(customNamespaceJob.Metrics)
job.IncludeLinkedAccounts = customNamespaceJob.IncludeLinkedAccounts
jobsCfg.CustomNamespaceJobs = append(jobsCfg.CustomNamespaceJobs, job)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/job/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func getMetricDataForQueriesForCustomNamespace(

go func(metric *model.MetricConfig) {
defer wg.Done()
err := clientCloudwatch.ListMetrics(ctx, customNamespaceJob.Namespace, metric, customNamespaceJob.RecentlyActiveOnly, func(page []*model.Metric) {
err := clientCloudwatch.ListMetrics(ctx, customNamespaceJob.Namespace, metric, customNamespaceJob.IncludeLinkedAccounts, customNamespaceJob.RecentlyActiveOnly, func(page []*model.Metric) {
var data []*model.CloudwatchData

for _, cwMetric := range page {
Expand All @@ -75,6 +75,7 @@ func getMetricDataForQueriesForCustomNamespace(
data = append(data, &model.CloudwatchData{
MetricName: metric.Name,
ResourceName: customNamespaceJob.Name,
LinkedAccountId: cwMetric.LinkedAccountId,
Namespace: customNamespaceJob.Namespace,
Dimensions: cwMetric.Dimensions,
GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{
Expand Down
3 changes: 2 additions & 1 deletion pkg/job/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func getMetricDataForQueries(
go func(metric *model.MetricConfig) {
defer wg.Done()

err := clientCloudwatch.ListMetrics(ctx, svc.Namespace, metric, discoveryJob.RecentlyActiveOnly, func(page []*model.Metric) {
err := clientCloudwatch.ListMetrics(ctx, svc.Namespace, metric, discoveryJob.IncludeLinkedAccounts, discoveryJob.RecentlyActiveOnly, func(page []*model.Metric) {
data := getFilteredMetricDatas(logger, discoveryJob.Type, discoveryJob.ExportedTagsOnMetrics, page, discoveryJob.DimensionNameRequirements, metric, assoc)

mux.Lock()
Expand Down Expand Up @@ -169,6 +169,7 @@ func getFilteredMetricDatas(
getMetricsData = append(getMetricsData, &model.CloudwatchData{
MetricName: m.Name,
ResourceName: resource.ARN,
LinkedAccountId: cwMetric.LinkedAccountId,
Namespace: namespace,
Dimensions: cwMetric.Dimensions,
GetMetricDataProcessingParams: &model.GetMetricDataProcessingParams{
Expand Down
28 changes: 21 additions & 7 deletions pkg/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type DiscoveryJob struct {
RecentlyActiveOnly bool
ExportedTagsOnMetrics []string
IncludeContextOnInfoMetrics bool
IncludeLinkedAccounts []string
DimensionsRegexps []DimensionsRegexp
}

Expand All @@ -65,6 +66,17 @@ type CustomNamespaceJob struct {
Metrics []*MetricConfig
CustomTags []Tag
DimensionNameRequirements []string
IncludeLinkedAccounts []string
JobLevelMetricFields
}

type JobLevelMetricFields struct {
Statistics []string
Period int64
Length int64
Delay int64
NilToZero *bool
AddCloudwatchTimestamp *bool
}

type Role struct {
Expand Down Expand Up @@ -106,9 +118,10 @@ type Dimension struct {

type Metric struct {
// The dimensions for the metric.
Dimensions []Dimension
MetricName string
Namespace string
Dimensions []Dimension
MetricName string
Namespace string
LinkedAccountId string
}

type Datapoint struct {
Expand Down Expand Up @@ -160,10 +173,11 @@ type CloudwatchData struct {
// DiscoveryJob = Resource ARN associated with the metric or global when it could not be associated but shouldn't be dropped
// StaticJob = Resource Name from static job config
// CustomNamespace = Custom Namespace job name
ResourceName string
Namespace string
Tags []Tag
Dimensions []Dimension
ResourceName string
Namespace string
Tags []Tag
Dimensions []Dimension
LinkedAccountId string
// GetMetricDataProcessingParams includes necessary fields to run GetMetricData
GetMetricDataProcessingParams *GetMetricDataProcessingParams

Expand Down
5 changes: 5 additions & 0 deletions pkg/promutil/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ func BuildMetrics(results []model.CloudwatchMetricResult, labelsSnakeCase bool,
name := BuildMetricName(metric.Namespace, metric.MetricName, statistic)

promLabels := createPrometheusLabels(metric, labelsSnakeCase, contextLabels, logger)
if metric.LinkedAccountId != "" {
contextLabels["account_id"] = metric.LinkedAccountId
}
maps.Copy(promLabels, contextLabels)

observedMetricLabels = recordLabelsForMetric(name, promLabels, observedMetricLabels)

output = append(output, &PrometheusMetric{
Expand Down

0 comments on commit 66d16cd

Please sign in to comment.