Skip to content

Commit

Permalink
PMM-8770 enable safe metrics collection (#389)
Browse files Browse the repository at this point in the history
* PMM-8489 Fixed topology labels on mongos

* Updated mog for go 1.16

* Updated dependency

* Updated toolkit dep to 3.x

* Updated go.sum for go 1.16

In my local I have go 1.17 so the sum was different and tests were
failing.

* Fixed mod conflicts

* Revert "Merge branch 'PMM-8489_topology_labels_on_mongos'"

This reverts commit 8905fce, reversing
changes made to 4553084.

* PMM-8770 Updated discovery mode

Now discovering mode automatically search for all databases and all
collections. It might receive a filter list and ignores system
collections.

* PMM-8770 Added common functions

* PMM-8770 New collstats limit

* Fixed race condition

* Top collector won't be enabled on mongos

* Fixes for CR

* Renamed collector.topmetric param

Co-authored-by: JiriCtvrtka <[email protected]>
  • Loading branch information
percona-csalguero and JiriCtvrtka authored Dec 1, 2021
1 parent 705db98 commit c8360b4
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 53 deletions.
25 changes: 14 additions & 11 deletions exporter/collstats_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,27 @@ func (d *collstatsCollector) Describe(ch chan<- *prometheus.Desc) {
}

func (d *collstatsCollector) Collect(ch chan<- prometheus.Metric) {
collections := d.collections

if d.discoveringMode {
databases := map[string][]string{}
for _, dbCollection := range d.collections {
parts := strings.Split(dbCollection, ".")
if _, ok := databases[parts[0]]; !ok {
db := parts[0]
databases[db], _ = d.client.Database(parts[0]).ListCollectionNames(d.ctx, bson.D{})
}
namespaces, err := listAllCollections(d.ctx, d.client, d.collections)
if err != nil {
d.logger.Errorf("cannot auto discover databases and collections")

return
}

d.collections = fromMapToSlice(databases)
collections = fromMapToSlice(namespaces)
}
for _, dbCollection := range d.collections {

for _, dbCollection := range collections {
parts := strings.Split(dbCollection, ".")
if len(parts) != 2 { //nolint:gomnd
if len(parts) < 2 { //nolint:gomnd
continue
}

database := parts[0]
collection := parts[1]
collection := strings.Join(parts[1:], ".") // support collections having a .

aggregation := bson.D{
{
Expand All @@ -82,12 +83,14 @@ func (d *collstatsCollector) Collect(ch chan<- prometheus.Metric) {
cursor, err := d.client.Database(database).Collection(collection).Aggregate(d.ctx, mongo.Pipeline{aggregation, project})
if err != nil {
d.logger.Errorf("cannot get $collstats cursor for collection %s.%s: %s", database, collection, err)

continue
}

var stats []bson.M
if err = cursor.All(d.ctx, &stats); err != nil {
d.logger.Errorf("cannot get $collstats for collection %s.%s: %s", database, collection, err)

continue
}

Expand Down
98 changes: 98 additions & 0 deletions exporter/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package exporter

import (
"context"

"github.com/AlekSi/pointer"
"github.com/pkg/errors"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)

var systemDBs = []string{"admin", "config", "local"} //nolint:gochecknoglobals

func listCollections(ctx context.Context, client *mongo.Client, collections []string, database string) ([]string, error) {
filter := bson.D{} // Default=empty -> list all collections

// if there is a filter with the list of collections we want, create a filter like
// $or: {
// {"$regex": "collection1"},
// {"$regex": "collection2"},
// }
if len(collections) > 0 {
matchExpressions := []bson.D{}

for _, collection := range collections {
matchExpressions = append(matchExpressions,
bson.D{{Key: "name", Value: primitive.Regex{Pattern: collection, Options: "i"}}})
}

filter = bson.D{{Key: "$or", Value: matchExpressions}}
}

databases, err := client.Database(database).ListCollectionNames(ctx, filter)
if err != nil {
return nil, errors.Wrap(err, "cannot get the list of collections for discovery")
}

return databases, nil
}

func databases(ctx context.Context, client *mongo.Client, exclude []string) ([]string, error) {
opts := &options.ListDatabasesOptions{NameOnly: pointer.ToBool(true), AuthorizedDatabases: pointer.ToBool(true)}
filterExpressions := []bson.D{}
for _, dbname := range exclude {
filterExpressions = append(filterExpressions,
bson.D{{Key: "name", Value: bson.D{{Key: "$ne", Value: dbname}}}},
)
}

filter := bson.D{{Key: "$and", Value: filterExpressions}}

dbNames, err := client.ListDatabaseNames(ctx, filter, opts)
if err != nil {
return nil, errors.Wrap(err, "cannot get the database names list")
}

return dbNames, nil
}

func listAllCollections(ctx context.Context, client *mongo.Client, filter []string) (map[string][]string, error) {
namespaces := make(map[string][]string)
// exclude system databases
dbnames, err := databases(ctx, client, systemDBs)
if err != nil {
return nil, errors.Wrap(err, "cannot get the list of all collections in the server")
}

for _, dbname := range dbnames {
colls, err := listCollections(ctx, client, filter, dbname)
if err != nil {
return nil, errors.Wrapf(err, "cannot list the collections for %q", dbname)
}
namespaces[dbname] = colls
}

return namespaces, nil
}

func allCollectionsCount(ctx context.Context, client *mongo.Client, filter []string) (int, error) {
databases, err := databases(ctx, client, systemDBs)
if err != nil {
return 0, errors.Wrap(err, "cannot retrieve the collection names for count collections")
}

var count int

for _, dbname := range databases {
colls, err := listCollections(ctx, client, filter, dbname)
if err != nil {
return 0, errors.Wrap(err, "cannot get collections count")
}
count += len(colls)
}

return count, nil
}
54 changes: 54 additions & 0 deletions exporter/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package exporter

import (
"context"
"sort"
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.mongodb.org/mongo-driver/bson"

"github.com/percona/mongodb_exporter/internal/tu"
)

func TestListCollections(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

client := tu.DefaultTestClient(ctx, t)

databases := []string{"testdb01", "testdb02"}
collections := []string{"col01", "col02", "colxx", "colyy"}

defer func() {
for _, dbname := range databases {
client.Database(dbname).Drop(ctx) //nolint:errcheck
}
}()

for _, dbname := range databases {
for _, coll := range collections {
for j := 0; j < 10; j++ {
_, err := client.Database(dbname).Collection(coll).InsertOne(ctx, bson.M{"f1": j, "f2": "2"})
assert.NoError(t, err)
}
}
}

want := []string{"col01", "col02", "colxx"}
collections, err := listCollections(ctx, client, []string{"col0", "colx"}, databases[0])
sort.Strings(collections)

assert.NoError(t, err)
assert.Equal(t, want, collections)

count, err := allCollectionsCount(ctx, client, nil)
assert.NoError(t, err)
assert.True(t, count > 8)

count, err = allCollectionsCount(ctx, client, []string{"col0", "colx"})
assert.NoError(t, err)
assert.Equal(t, 6, count)
}
83 changes: 59 additions & 24 deletions exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,35 @@ import (

// Exporter holds Exporter methods and attributes.
type Exporter struct {
path string
client *mongo.Client
clientMu sync.Mutex
logger *logrus.Logger
opts *Opts
webListenAddress string
path string
client *mongo.Client
clientMu sync.Mutex
logger *logrus.Logger
opts *Opts
webListenAddress string
lock *sync.Mutex
totalCollectionsCount int
}

// Opts holds new exporter options.
type Opts struct {
CollStatsCollections []string
CollStatsLimit int
CollectorTopMetrics bool
CompatibleMode bool
DiscoveringMode bool
GlobalConnPool bool
DirectConnect bool
URI string
Path string
WebListenAddress string
IndexStatsCollections []string
CollStatsCollections []string
Logger *logrus.Logger
DisableDefaultRegistry bool
DisableDiagnosticData bool
DisableReplicasetStatus bool
DisableDefaultRegistry bool
DiscoveringMode bool
EnableDBStats bool
CollectorTopMetrics bool
EnableTop bool
GlobalConnPool bool
IndexStatsCollections []string
Logger *logrus.Logger
Path string
URI string
WebListenAddress string
}

var (
Expand All @@ -78,21 +82,31 @@ func New(opts *Opts) *Exporter {
ctx := context.Background()

exp := &Exporter{
path: opts.Path,
logger: opts.Logger,
opts: opts,
webListenAddress: opts.WebListenAddress,
path: opts.Path,
logger: opts.Logger,
opts: opts,
webListenAddress: opts.WebListenAddress,
lock: &sync.Mutex{},
totalCollectionsCount: -1, // not calculated yet. waiting the db connection.
}
// Try initial connect. Connection will be retried with every scrape.
go func() {
if _, err := exp.getClient(ctx); err != nil {
_, err := exp.getClient(ctx)
if err != nil {
exp.logger.Errorf("Cannot connect to MongoDB: %v", err)
}
}()

return exp
}

func (e *Exporter) getTotalCollectionsCount() int {
e.lock.Lock()
defer e.lock.Unlock()

return e.totalCollectionsCount
}

func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topologyInfo labelsGetter) *prometheus.Registry {
registry := prometheus.NewRegistry()

Expand All @@ -108,7 +122,17 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
e.logger.Errorf("Cannot get node type to check if this is a mongos: %s", err)
}

if len(e.opts.CollStatsCollections) > 0 {
// enable collection dependant collectors like collstats and indexstats
enableCollStats := false
if e.opts.CollStatsLimit == -1 {
enableCollStats = true
}
if e.getTotalCollectionsCount() > 0 && e.getTotalCollectionsCount() < e.opts.CollStatsLimit {
enableCollStats = true
}

// if we manually set the collection names we want or auto discovery is set
if (len(e.opts.CollStatsCollections) > 0 || e.opts.DiscoveringMode) && enableCollStats {
cc := collstatsCollector{
ctx: ctx,
client: client,
Expand All @@ -121,7 +145,8 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
registry.MustRegister(&cc)
}

if len(e.opts.IndexStatsCollections) > 0 {
// if we manually set the collection names we want or auto discovery is set
if (len(e.opts.IndexStatsCollections) > 0 || e.opts.DiscoveringMode) && enableCollStats {
ic := indexstatsCollector{
ctx: ctx,
client: client,
Expand Down Expand Up @@ -155,7 +180,7 @@ func (e *Exporter) makeRegistry(ctx context.Context, client *mongo.Client, topol
registry.MustRegister(&cc)
}

if e.opts.CollectorTopMetrics {
if e.opts.CollectorTopMetrics && nodeType != typeMongos {
tc := topCollector{
ctx: ctx,
client: client,
Expand Down Expand Up @@ -229,6 +254,16 @@ func (e *Exporter) Handler() http.Handler {

return
}

if e.getTotalCollectionsCount() < 0 {
count, err := allCollectionsCount(ctx, client, nil)
if err == nil {
e.lock.Lock()
e.totalCollectionsCount = count
e.lock.Unlock()
}
}

// Close client after usage
if !e.opts.GlobalConnPool {
defer func() {
Expand Down
Loading

0 comments on commit c8360b4

Please sign in to comment.