From cf17e132d1fcabb0e968f5cd1734e3f05a08a17c Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Fri, 16 Aug 2024 14:04:37 -0700 Subject: [PATCH] feat(libp2p): expose libp2p bandwidth metrics This exposes bandwidth metrics via async callback to avoid allocating/reporting metrics on any hot-paths. I'm using open telemetry as we've already setup a bridge for F3 and opencensus is deprecated in favor of open telemetry (so we're going to slowly move over anyways). --- chain/lf3/f3.go | 17 ----------------- metrics/otel_bridge.go | 21 +++++++++++++++++++++ node/modules/lp2p/host.go | 2 ++ node/modules/lp2p/metrics.go | 30 ++++++++++++++++++++++++++++++ node/modules/lp2p/transport.go | 29 +++++++++++++++++++++++++++-- 5 files changed, 80 insertions(+), 19 deletions(-) create mode 100644 metrics/otel_bridge.go create mode 100644 node/modules/lp2p/metrics.go diff --git a/chain/lf3/f3.go b/chain/lf3/f3.go index 2514320ea7b..eba612e77b1 100644 --- a/chain/lf3/f3.go +++ b/chain/lf3/f3.go @@ -10,9 +10,6 @@ import ( logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/prometheus" - "go.opentelemetry.io/otel/sdk/metric" "go.uber.org/fx" "golang.org/x/xerrors" @@ -53,20 +50,6 @@ type F3Params struct { var log = logging.Logger("f3") -func init() { - // Set up otel to prometheus reporting so that F3 metrics are reported via lotus - // prometheus metrics. This bridge eventually gets picked up by opencensus - // exporter as HTTP handler. This by default registers an otel collector against - // the global prometheus registry. In the future, we should clean up metrics in - // Lotus and move it all to use otel. For now, bridge away. - if bridge, err := prometheus.New(); err != nil { - log.Errorf("could not create the otel prometheus exporter: %v", err) - } else { - provider := metric.NewMeterProvider(metric.WithReader(bridge)) - otel.SetMeterProvider(provider) - } -} - func New(mctx helpers.MetricsCtx, lc fx.Lifecycle, params F3Params) (*F3, error) { ds := namespace.Wrap(params.Datastore, datastore.NewKey("/f3")) diff --git a/metrics/otel_bridge.go b/metrics/otel_bridge.go new file mode 100644 index 00000000000..76572f5afb2 --- /dev/null +++ b/metrics/otel_bridge.go @@ -0,0 +1,21 @@ +package metrics + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/prometheus" + "go.opentelemetry.io/otel/sdk/metric" +) + +func init() { + // Set up otel to prometheus reporting so that F3 metrics are reported via lotus + // prometheus metrics. This bridge eventually gets picked up by opencensus + // exporter as HTTP handler. This by default registers an otel collector against + // the global prometheus registry. In the future, we should clean up metrics in + // Lotus and move it all to use otel. For now, bridge away. + if bridge, err := prometheus.New(); err != nil { + log.Errorf("could not create the otel prometheus exporter: %v", err) + } else { + provider := metric.NewMeterProvider(metric.WithReader(bridge)) + otel.SetMeterProvider(provider) + } +} diff --git a/node/modules/lp2p/host.go b/node/modules/lp2p/host.go index baea4cf0656..599b5e12d7c 100644 --- a/node/modules/lp2p/host.go +++ b/node/modules/lp2p/host.go @@ -16,6 +16,7 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem" routedhost "github.com/libp2p/go-libp2p/p2p/host/routed" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" + "go.opentelemetry.io/otel/metric" "go.uber.org/fx" "github.com/filecoin-project/lotus/build" @@ -64,6 +65,7 @@ func Host(mctx helpers.MetricsCtx, buildVersion build.BuildVersion, lc fx.Lifecy lc.Append(fx.Hook{ OnStop: func(ctx context.Context) error { + _ = registration.Unregister() return h.Close() }, }) diff --git a/node/modules/lp2p/metrics.go b/node/modules/lp2p/metrics.go new file mode 100644 index 00000000000..386721dbe61 --- /dev/null +++ b/node/modules/lp2p/metrics.go @@ -0,0 +1,30 @@ +package lp2p + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var otelmeter = otel.Meter("libp2p") + +var attrPeerID = attribute.Key("peer") +var attrProtocolID = attribute.Key("protocol") +var attrDirectionInbound = attribute.String("direction", "inbound") +var attrDirectionOutbound = attribute.String("direction", "outbound") + +var otelmetrics = struct { + bandwidth metric.Int64ObservableGauge +}{ + bandwidth: must(otelmeter.Int64ObservableGauge("libp2p_bandwidth", + metric.WithDescription("Libp2p stream traffic."), + metric.WithUnit("By"), + )), +} + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/node/modules/lp2p/transport.go b/node/modules/lp2p/transport.go index 536f612b1aa..55ddd4cb702 100644 --- a/node/modules/lp2p/transport.go +++ b/node/modules/lp2p/transport.go @@ -1,11 +1,16 @@ package lp2p import ( + "context" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/libp2p/go-libp2p/core/peer" noise "github.com/libp2p/go-libp2p/p2p/security/noise" tls "github.com/libp2p/go-libp2p/p2p/security/tls" libp2pquic "github.com/libp2p/go-libp2p/p2p/transport/quic" + "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) var DefaultTransports = simpleOpt(libp2p.DefaultTransports) @@ -31,8 +36,28 @@ func Security(enabled, preferTLS bool) interface{} { } } -func BandwidthCounter() (opts Libp2pOpts, reporter metrics.Reporter) { +func BandwidthCounter(lc fx.Lifecycle, id peer.ID) (opts Libp2pOpts, reporter metrics.Reporter, err error) { reporter = metrics.NewBandwidthCounter() opts.Opts = append(opts.Opts, libp2p.BandwidthReporter(reporter)) - return opts, reporter + + // Register it with open telemetry. We report by-callback instead of implementing a custom + // bandwidth counter to avoid allocating every time we read/write to a stream (and to stay + // out of the hot path). + peerIdAttr := attrPeerID.String(id.String()) + registration, err := otelmeter.RegisterCallback(func(ctx context.Context, obs metric.Observer) error { + for p, bw := range reporter.GetBandwidthByProtocol() { + protoAttr := attrProtocolID.String(string(p)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalOut, + metric.WithAttributes(peerIdAttr, protoAttr, attrDirectionOutbound)) + obs.ObserveInt64(otelmetrics.bandwidth, bw.TotalIn, + metric.WithAttributes(peerIdAttr, protoAttr, attrDirectionInbound)) + } + return nil + }, otelmetrics.bandwidth) + if err != nil { + return Libp2pOpts{}, nil, err + } + lc.Append(fx.StopHook(registration.Unregister)) + + return opts, reporter, nil }