Prevent leaking received dogstatsd metrics on agent shutdown #33383

Draft · wants to merge 1 commit into base: main
1 change: 0 additions & 1 deletion comp/dogstatsd/listeners/uds_common.go
@@ -204,7 +204,6 @@ func (l *UDSListener) handleConnection(conn netUnixConn, closeFunc CloseFunction
l.telemetryStore.tlmUDSConnections.Inc(tlmListenerID, l.transport)
defer func() {
_ = closeFunc(conn)
packetsBuffer.Flush()
packetsBuffer.Close()
if telemetryWithFullListenerID {
l.clearTelemetry(tlmListenerID)
21 changes: 19 additions & 2 deletions comp/dogstatsd/packets/assembler.go
@@ -9,6 +9,8 @@ package packets
import (
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

const messageSeparator = byte('\n')
@@ -24,6 +26,7 @@ type Assembler struct {
flushTimer *time.Ticker
closeChannel chan struct{}
packetSourceType SourceType
wg sync.WaitGroup
sync.Mutex
}

@@ -44,13 +47,18 @@ func NewAssembler(flushTimer time.Duration, packetsBuffer *Buffer, sharedPacketP
}

func (p *Assembler) flushLoop() {
p.wg.Add(1)
for {
select {
case <-p.flushTimer.C:
p.Lock()
p.flush()
p.Unlock()
case <-p.closeChannel:
p.Lock()
p.flush()
p.Unlock()
p.wg.Done()
return
}
}
@@ -87,7 +95,16 @@ func (p *Assembler) flush() {

// Close closes the packet assembler
func (p *Assembler) Close() {
p.Lock()
close(p.closeChannel)
p.Unlock()
finChan := make(chan struct{})
go func() {
defer close(finChan)
p.wg.Wait()
}()

select {
case <-finChan:
case <-time.After(time.Second):
log.Debug("Timeout flushing the dogstatsd assembler on stop")
}
}
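
The Close path above is an instance of a drain-with-deadline pattern: signal the flush loop, let it perform one last flush, and wait for it, but never block shutdown for more than a bounded time. A minimal, self-contained sketch of that pattern, not the agent's actual types (note the sketch registers the WaitGroup before the goroutine starts):

package main

import (
	"fmt"
	"sync"
	"time"
)

// drainer owns a background flush loop and supports a bounded shutdown.
type drainer struct {
	closeCh chan struct{}
	wg      sync.WaitGroup
}

func newDrainer() *drainer {
	d := &drainer{closeCh: make(chan struct{})}
	d.wg.Add(1) // registered before the goroutine starts, so Close cannot miss it
	go d.flushLoop()
	return d
}

func (d *drainer) flushLoop() {
	defer d.wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// periodic flush would go here
		case <-d.closeCh:
			// final flush of whatever is still buffered, then exit
			return
		}
	}
}

// Close signals the loop and waits for its final flush, but never blocks
// shutdown for more than one second.
func (d *drainer) Close() {
	close(d.closeCh)
	done := make(chan struct{})
	go func() {
		defer close(done)
		d.wg.Wait()
	}()
	select {
	case <-done:
	case <-time.After(time.Second):
		fmt.Println("timeout waiting for the final flush")
	}
}

func main() {
	d := newDrainer()
	d.Close()
}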
28 changes: 21 additions & 7 deletions comp/dogstatsd/packets/buffer.go
@@ -8,6 +8,8 @@ package packets
import (
"sync"
"time"

"github.com/DataDog/datadog-agent/pkg/util/log"
)

// Buffer is a buffer of packets that will automatically flush to the given
@@ -20,6 +22,7 @@ type Buffer struct {
outputChannel chan Packets
closeChannel chan struct{}
m sync.Mutex
wg sync.WaitGroup
telemetryStore *TelemetryStore
}

@@ -39,6 +42,7 @@ func NewBuffer(bufferSize uint, flushTimer time.Duration, outputChannel chan Pac
}

func (pb *Buffer) flushLoop() {
pb.wg.Add(1)
for {
select {
case <-pb.flushTimer.C:
@@ -47,6 +51,10 @@ func (pb *Buffer) flushLoop() {
pb.telemetryStore.tlmBufferFlushedTimer.Inc(pb.listenerID)
pb.m.Unlock()
case <-pb.closeChannel:
pb.m.Lock()
pb.flush()
pb.m.Unlock()
pb.wg.Done()
return
}
}
@@ -70,13 +78,6 @@ func (pb *Buffer) Append(packet *Packet) {
}
}

// Flush offers a thread-safe method to force a flush of the appended packets
func (pb *Buffer) Flush() {
pb.m.Lock()
pb.flush()
pb.m.Unlock()
}

func (pb *Buffer) flush() {
if len(pb.packets) > 0 {
t1 := time.Now()
@@ -97,6 +98,19 @@ func (pb *Buffer) flush() {
// Close closes the packet buffer
func (pb *Buffer) Close() {
close(pb.closeChannel)
finChan := make(chan struct{})

go func() {
defer close(finChan)
pb.wg.Wait()
}()

select {
case <-finChan:
case <-time.After(time.Second):
log.Debug("Timeout flushing the dogstatsd buffer on stop")
}

if pb.listenerID != "" {
pb.telemetryStore.tlmBufferSize.Delete(pb.listenerID)
pb.telemetryStore.tlmChannelSize.Delete(pb.listenerID)
2 changes: 1 addition & 1 deletion comp/dogstatsd/packets/buffer_test.go
@@ -144,6 +144,6 @@ func TestBufferFlush(t *testing.T) {
}

buffer.Append(packet)
buffer.Flush()
buffer.Close()
assert.Equal(t, 1, len(packetChannel))
}
3 changes: 2 additions & 1 deletion comp/dogstatsd/server/server.go
@@ -472,10 +472,11 @@ func (s *server) stop(context.Context) error {
if !s.IsRunning() {
return nil
}
close(s.stopChan)
for _, l := range s.listeners {
l.Stop()
}
close(s.stopChan)

if s.Statistics != nil {
s.Statistics.Stop()
}
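
Moving close(s.stopChan) after the listener loop means the producers are stopped, and with the buffer changes above flush their remaining packets, before the rest of the server is signalled; a hedged reading is that this ordering is what keeps packets received just before shutdown from being dropped. A toy sketch of the ordering, with illustrative names only:

package main

import "fmt"

type listener struct{ name string }

// Stop is assumed to flush whatever the listener has buffered before returning.
func (l *listener) Stop() { fmt.Printf("%s listener stopped and flushed\n", l.name) }

func main() {
	stopChan := make(chan struct{})
	listeners := []*listener{{name: "uds"}, {name: "udp"}}

	// 1. Stop the producers first so their buffered packets reach the pipeline.
	for _, l := range listeners {
		l.Stop()
	}

	// 2. Only then signal everything watching stopChan to shut down.
	close(stopChan)
	<-stopChan // a closed channel is immediately readable by every waiting worker
	fmt.Println("stop signal observed after listeners drained")
}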
5 changes: 5 additions & 0 deletions pkg/aggregator/demultiplexer.go
@@ -79,6 +79,11 @@ type trigger struct {
// service checks and such have to be waited for before returning
// from Flush()
waitForSerializer bool

// used to flush all available data during a flush procedure, even data belonging to
// incomplete buckets. It is generally only appropriate to set during shutdown, since
// subsequent transmissions of the same bucket will override earlier entries in the backend.
forceFlushAll bool
}

// flushTrigger is a trigger used to flush data, results is expected to be written
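
To make the caveat in the new comment concrete: re-sending the same bucket later replaces whatever was sent for it earlier, which is why forcing a flush of still-open buckets is only safe when no further flush will follow. A toy illustration, assuming nothing more than last-write-wins storage keyed by (metric, bucket) and a sampler that drops data it has already flushed:

package main

import "fmt"

func main() {
	type key struct {
		metric string
		bucket int64 // bucket start timestamp
	}
	// Assumed backend behavior: last write wins per (metric, bucket).
	backend := map[key]float64{}
	k := key{"requests", 1700000000}

	// Forced flush while the bucket is still open: 3 samples seen so far.
	backend[k] = 3

	// The bucket receives 4 more samples; a later, regular flush sends only
	// what was aggregated since the forced flush and overrides the first point.
	backend[k] = 4

	fmt.Println(backend[k]) // 4 — the 3 earlier samples are lost instead of summed to 7
}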
55 changes: 40 additions & 15 deletions pkg/aggregator/demultiplexer_agent.go
@@ -51,8 +51,9 @@ type AgentDemultiplexer struct {
m sync.RWMutex

// stopChan completely stops the flushLoop of the Demultiplexer when receiving
// a message, not doing anything else.
stopChan chan struct{}
// a message, not doing anything else. Passing a non-nil trigger will perform
// a final flush.
stopChan chan *trigger
// flushChan receives a trigger to run an internal flush of all
// samplers (TimeSampler, BufferedAggregator (CheckSampler, Events, ServiceChecks))
// to the shared serializer.
@@ -200,7 +201,7 @@ func initAgentDemultiplexer(log log.Component,
demux := &AgentDemultiplexer{
log: log,
options: options,
stopChan: make(chan struct{}),
stopChan: make(chan *trigger),
flushChan: make(chan trigger),

// Input
@@ -301,17 +302,24 @@ func (d *AgentDemultiplexer) flushLoop() {
for {
select {
// stop sequence
case <-d.stopChan:
case trigger, ok := <-d.stopChan:
if ok && trigger != nil {
// Final flush requested
d.flushToSerializer(trigger.time, trigger.waitForSerializer, trigger.forceFlushAll)
if trigger.blockChan != nil {
trigger.blockChan <- struct{}{}
}
}
return
// manual flush sequence
case trigger := <-d.flushChan:
d.flushToSerializer(trigger.time, trigger.waitForSerializer)
d.flushToSerializer(trigger.time, trigger.waitForSerializer, trigger.forceFlushAll)
if trigger.blockChan != nil {
trigger.blockChan <- struct{}{}
}
// automatic flush sequence
case t := <-flushTicker:
d.flushToSerializer(t, false)
d.flushToSerializer(t, false, false)
}
}
}
@@ -320,6 +328,7 @@ func (d *AgentDemultiplexer) flushLoop() {
// Resources are released, the instance should not be used after a call to `Stop()`.
func (d *AgentDemultiplexer) Stop(flush bool) {
timeout := pkgconfigsetup.Datadog().GetDuration("aggregator_stop_timeout") * time.Second
forceFlushAll := pkgconfigsetup.Datadog().GetBool("dogstatsd_flush_incomplete_buckets")

if d.noAggStreamWorker != nil {
d.noAggStreamWorker.stop(flush)
Expand All @@ -332,18 +341,31 @@ func (d *AgentDemultiplexer) Stop(flush bool) {
time: time.Now(),
blockChan: make(chan struct{}),
waitForSerializer: flush,
forceFlushAll: forceFlushAll,
}
timeoutStart := time.Now()

d.flushChan <- trigger
select {
case <-trigger.blockChan:
case <-time.After(timeout):
d.log.Errorf("flushing data on Stop() timed out")
d.log.Errorf("triggering flushing data on Stop() timed out")

case d.stopChan <- &trigger:
timeout = timeout - time.Since(timeoutStart)
select {
case <-trigger.blockChan:
case <-time.After(timeout):
d.log.Errorf("completing flushing data on Stop() timed out")
}
}
}

// stops the flushloop and makes sure no automatic flushes will happen anymore
d.stopChan <- struct{}{}
} else {
// stops the flushloop and makes sure no automatic flushes will happen anymore
select {
case d.stopChan <- nil:
case <-time.After(timeout):
d.log.Debug("unable to guarantee flush loop termination on Stop()")
}
}

d.m.Lock()
defer d.m.Unlock()
@@ -380,6 +402,7 @@ func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSeri
time: start,
waitForSerializer: waitForSerializer,
blockChan: make(chan struct{}),
forceFlushAll: false,
}
d.flushChan <- trigger
<-trigger.blockChan
Expand All @@ -397,7 +420,7 @@ func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSeri
// If one day a better (faster?) solution is needed, we could either consider:
// - to have an implementation of SendIterableSeries listening on multiple sinks in parallel, or,
// - to have a thread-safe implementation of the underlying `util.BufferedChan`.
func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerializer bool) {
func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerializer bool, forceFlushAll bool) {
d.m.RLock()
defer d.m.RUnlock()

@@ -419,8 +442,9 @@ func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerialize
// order the flush to the time sampler, and wait, in a different routine
t := flushTrigger{
trigger: trigger{
time: start,
blockChan: make(chan struct{}),
time: start,
blockChan: make(chan struct{}),
forceFlushAll: forceFlushAll,
},
sketchesSink: sketchesSink,
seriesSink: seriesSink,
Expand All @@ -439,6 +463,7 @@ func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerialize
time: start,
blockChan: make(chan struct{}),
waitForSerializer: waitForSerializer,
forceFlushAll: forceFlushAll,
},
sketchesSink: sketchesSink,
seriesSink: seriesSink,
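
Stop now hands a *trigger to the flush loop over stopChan instead of an empty struct, and spends a single timeout budget across the two waits: first handing over the trigger, then waiting on blockChan with whatever time remains. A reduced sketch of that budgeted two-stage wait, with illustrative names rather than the demultiplexer's real ones:

package main

import (
	"fmt"
	"time"
)

type trigger struct {
	blockChan chan struct{}
}

func main() {
	stopChan := make(chan *trigger)

	// The flush loop: a final flush is performed only when a non-nil trigger arrives.
	go func() {
		tr := <-stopChan
		if tr != nil {
			time.Sleep(50 * time.Millisecond) // stand-in for the final flush
			tr.blockChan <- struct{}{}
		}
	}()

	timeout := 2 * time.Second
	tr := &trigger{blockChan: make(chan struct{})}
	start := time.Now()

	// Stage 1: hand the trigger to the loop, bounded by the full budget.
	select {
	case stopChan <- tr:
		// Stage 2: wait for the final flush with whatever budget remains.
		remaining := timeout - time.Since(start)
		select {
		case <-tr.blockChan:
			fmt.Println("final flush completed")
		case <-time.After(remaining):
			fmt.Println("completing the final flush timed out")
		}
	case <-time.After(timeout):
		fmt.Println("triggering the final flush timed out")
	}
}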
8 changes: 7 additions & 1 deletion pkg/aggregator/demultiplexer_serverless.go
@@ -92,7 +92,8 @@ func (d *ServerlessDemultiplexer) Run() {
// Stop stops the wrapped aggregator and the forwarder.
func (d *ServerlessDemultiplexer) Stop(flush bool) {
if flush {
d.ForceFlushToSerializer(time.Now(), true)
forceFlushAll := pkgconfigsetup.Datadog().GetBool("dogstatsd_flush_incomplete_buckets")
d.forceFlushToSerializer(time.Now(), true, forceFlushAll)
}

d.statsdWorker.stop()
@@ -104,6 +105,10 @@

// ForceFlushToSerializer flushes all data from the time sampler to the serializer.
func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool) {
d.forceFlushToSerializer(start, waitForSerializer, false)
}

func (d *ServerlessDemultiplexer) forceFlushToSerializer(start time.Time, waitForSerializer bool, forceFlushAll bool) {
d.flushLock.Lock()
defer d.flushLock.Unlock()

Expand All @@ -119,6 +124,7 @@ func (d *ServerlessDemultiplexer) ForceFlushToSerializer(start time.Time, waitFo
time: start,
blockChan: make(chan struct{}),
waitForSerializer: waitForSerializer,
forceFlushAll: forceFlushAll,
},
sketchesSink: sketchesSink,
seriesSink: seriesSink,
20 changes: 13 additions & 7 deletions pkg/aggregator/time_sampler.go
@@ -8,6 +8,7 @@ package aggregator
import (
"fmt"
"io"
"math"
"strconv"

tagger "github.com/DataDog/datadog-agent/comp/core/tagger/def"
@@ -124,14 +125,14 @@ func (s *TimeSampler) newSketchSeries(ck ckey.ContextKey, points []metrics.Sketc
return ss
}

func (s *TimeSampler) flushSeries(cutoffTime int64, series metrics.SerieSink) {
func (s *TimeSampler) flushSeries(cutoffTime int64, series metrics.SerieSink, forceFullFlush bool) {
// Map to hold the expired contexts that will need to be deleted after the flush so that we stop sending zeros
contextMetricsFlusher := metrics.NewContextMetricsFlusher()

if len(s.metricsByTimestamp) > 0 {
for bucketTimestamp, contextMetrics := range s.metricsByTimestamp {
// disregard when the timestamp is too recent
if s.isBucketStillOpen(bucketTimestamp, cutoffTime) {
if s.isBucketStillOpen(bucketTimestamp, cutoffTime) && !forceFullFlush {
continue
}

@@ -199,10 +200,15 @@ func (s *TimeSampler) dedupSerieBySerieSignature(
}
}

func (s *TimeSampler) flushSketches(cutoffTime int64, sketchesSink metrics.SketchesSink) {
func (s *TimeSampler) flushSketches(cutoffTime int64, sketchesSink metrics.SketchesSink, forceFullFlush bool) {
pointsByCtx := make(map[ckey.ContextKey][]metrics.SketchPoint)

s.sketchMap.flushBefore(cutoffTime, func(ck ckey.ContextKey, p metrics.SketchPoint) {
flushAllBefore := cutoffTime
if forceFullFlush {
flushAllBefore = math.MaxInt64
}

s.sketchMap.flushBefore(flushAllBefore, func(ck ckey.ContextKey, p metrics.SketchPoint) {
if p.Sketch == nil {
return
}
Expand All @@ -218,12 +224,12 @@ func (s *TimeSampler) flushSketches(cutoffTime int64, sketchesSink metrics.Sketc
}
}

func (s *TimeSampler) flush(timestamp float64, series metrics.SerieSink, sketches metrics.SketchesSink) {
func (s *TimeSampler) flush(timestamp float64, series metrics.SerieSink, sketches metrics.SketchesSink, forceFlushAll bool) {
// Compute a limit timestamp
cutoffTime := s.calculateBucketStart(timestamp)

s.flushSeries(cutoffTime, series)
s.flushSketches(cutoffTime, sketches)
s.flushSeries(cutoffTime, series, forceFlushAll)
s.flushSketches(cutoffTime, sketches, forceFlushAll)
// expiring contexts
s.contextResolver.expireContexts(int64(timestamp))
s.lastCutOffTime = cutoffTime
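
The sampler changes thread a force-flush flag down both flush paths: series buckets skip the isBucketStillOpen check, and the sketch map is flushed with a cutoff of math.MaxInt64 so flushBefore sees every bucket. A minimal sketch of the cutoff-widening idea on a plain map of bucket timestamps, not the real sampler types:

package main

import (
	"fmt"
	"math"
)

// flushBefore returns the buckets whose start timestamp is strictly below the cutoff.
func flushBefore(buckets map[int64]int, cutoff int64) []int64 {
	var flushed []int64
	for ts := range buckets {
		if ts < cutoff {
			flushed = append(flushed, ts)
		}
	}
	return flushed
}

func main() {
	// Three 10s buckets; the last one is still open at the current cutoff.
	buckets := map[int64]int{1700000000: 5, 1700000010: 8, 1700000020: 2}
	cutoff := int64(1700000020)

	fmt.Println(len(flushBefore(buckets, cutoff))) // 2: the open bucket is kept

	// Forcing a full flush on shutdown is just widening the cutoff.
	forceFlushAll := true
	if forceFlushAll {
		cutoff = math.MaxInt64
	}
	fmt.Println(len(flushBefore(buckets, cutoff))) // 3: the open bucket is flushed too
}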
2 changes: 1 addition & 1 deletion pkg/aggregator/time_sampler_test.go
@@ -557,6 +557,6 @@ func flushSerie(sampler *TimeSampler, timestamp float64) (metrics.Series, metric
var series metrics.Series
var sketches metrics.SketchSeriesList

sampler.flush(timestamp, &series, &sketches)
sampler.flush(timestamp, &series, &sketches, false)
return series, sketches
}