Skip to content

Commit

Permalink
WIP: Add support for logs
Browse files Browse the repository at this point in the history
Note that this branch will be rebased on a periodic basis to stay
on top of changes in main. If you are checking this out early, do
remember to do `git pull -r` to avoid conflicts.
  • Loading branch information
jjatria committed Apr 19, 2024
1 parent 20bc2e0 commit fc22c58
Show file tree
Hide file tree
Showing 18 changed files with 804 additions and 297 deletions.
1 change: 1 addition & 0 deletions Changes
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Revision history for OpenTelemetry-SDK

* Clarify meaning of OTEL_BSP_EXPORT_TIMEOUT in POD
* Minor correction in Resource POD
* EXPERIMENTAL: Add support for logs

0.020 2023-11-26 16:18:52+00:00 Europe/London

Expand Down
29 changes: 29 additions & 0 deletions META.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"OpenTelemetry::Exporter::OTLP" : "0"
},
"requires" : {
"Const::Fast" : "0",
"Feature::Compat::Try" : "0",
"Future::AsyncAwait" : "0.38",
"IO::Async::Loop" : "0",
Expand Down Expand Up @@ -71,6 +72,14 @@
}
},
"provides" : {
"OpenTelemetry::Processor::Batch" : {
"file" : "lib/OpenTelemetry/Processor/Batch.pm",
"version" : "0.022"
},
"OpenTelemetry::Processor::Simple" : {
"file" : "lib/OpenTelemetry/Processor/Simple.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK" : {
"file" : "lib/OpenTelemetry/SDK.pm",
"version" : "0.022"
Expand All @@ -83,6 +92,26 @@
"file" : "lib/OpenTelemetry/SDK/InstrumentationScope.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Logs::LogRecord" : {
"file" : "lib/OpenTelemetry/SDK/Logs/LogRecord.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Logs::LogRecord::Processor::Batch" : {
"file" : "lib/OpenTelemetry/SDK/Logs/LogRecord/Processor/Batch.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Logs::LogRecord::Processor::Simple" : {
"file" : "lib/OpenTelemetry/SDK/Logs/LogRecord/Processor/Simple.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Logs::Logger" : {
"file" : "lib/OpenTelemetry/SDK/Logs/Logger.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Logs::LoggerProvider" : {
"file" : "lib/OpenTelemetry/SDK/Logs/LoggerProvider.pm",
"version" : "0.022"
},
"OpenTelemetry::SDK::Resource" : {
"file" : "lib/OpenTelemetry/SDK/Resource.pm",
"version" : "0.022"
Expand Down
1 change: 1 addition & 0 deletions cpanfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
requires 'Const::Fast';
requires 'Feature::Compat::Try';
requires 'Future::AsyncAwait', '0.38'; # Object::Pad compatibility
requires 'IO::Async::Loop';
Expand Down
200 changes: 200 additions & 0 deletions lib/OpenTelemetry/Processor/Batch.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use Object::Pad ':experimental(init_expr)';
# ABSTRACT: A batched OpenTelemetry processor

package OpenTelemetry::Processor::Batch;

our $VERSION = '0.014';

# TODO: If we move the different base implementations of processors
# to the top-level, that still leaves the specific implementations
# for Traces / Metrics / Logs to exist somewhere else.
# This works, although we need to be careful about clarifying what
# the expectations are for classes in each namespace, and the what
# are the interfaces they expose.
# In all honesty, this separation ended up looking a lot better than
# expected.
class OpenTelemetry::Processor::Batch :does(OpenTelemetry::Processor) {
use Feature::Compat::Defer;
use Feature::Compat::Try;
use Future::AsyncAwait;
use IO::Async::Function;
use IO::Async::Loop;
use Mutex;
use OpenTelemetry::Common qw( config timeout_timestamp maybe_timeout );
use OpenTelemetry::Constants -export;
use OpenTelemetry::X;
use OpenTelemetry;

use Log::Any;
my $logger = Log::Any->get_logger( category => 'OpenTelemetry' );

use Metrics::Any '$metrics', strict => 1,
name_prefix => [qw( otel processor batch )];

$metrics->make_gauge( 'buffer_use',
name => [qw( buffer use )],
description => 'Ratio between maximum queue size and the size of the queue at export time',
);

field $batch_size :param //= config('BSP_MAX_EXPORT_BATCH_SIZE') // 512;
field $exporter_timeout :param //= config('BSP_EXPORT_TIMEOUT') // 30_000;
field $max_queue_size :param //= config('BSP_MAX_QUEUE_SIZE') // 2_048;
field $schedule_delay :param //= config('BSP_SCHEDULE_DELAY') // 5_000;
field $exporter :param;

field $lock = Mutex->new;

field $done :reader;
field $function;
field @queue;

ADJUST {
die OpenTelemetry::X->create(
Invalid => "Exporter must implement the OpenTelemetry::Exporter interface: " . ( ref $exporter || $exporter )
) unless $exporter && $exporter->DOES('OpenTelemetry::Exporter');

if ( $batch_size > $max_queue_size ) {
$logger->warn(
'Max export batch size cannot be greater than maximum queue size when instantiating batch processor',
{
batch_size => $batch_size,
queue_size => $max_queue_size,
},
);
$batch_size = $max_queue_size;
}

# This is a non-standard variable, so we make it Perl-specific
my $max_workers = $ENV{OTEL_PERL_BSP_MAX_WORKERS};

$function = IO::Async::Function->new(
$max_workers ? ( max_workers => $max_workers ) : (),

code => sub ( $exporter, $batch, $timeout ) {
$exporter->export( $batch, $timeout );
},
);

IO::Async::Loop->new->add($function);

# TODO: Should this be made configurable? The Ruby SDK
# allows users to not start the thread on boot, although
# this is not standard
$function->start;
}

method process ( @items ) {
try {
my $batch = $lock->enter(
sub {
my $overflow = @queue + @items - $max_queue_size;
if ( $overflow > 0 ) {
# If the buffer is full, we drop old spans first
# The queue is always FIFO, even for dropped spans
# This behaviour is not in the spec, but is
# consistent with the Ruby implementation.
# For context, the Go implementation instead
# blocks until there is room in the buffer.
splice @queue, 0, $overflow;
$self->report_dropped( 'buffer-full', $overflow );
}

push @queue, @items;

return [] if @queue < $batch_size;

$metrics->set_gauge_to(
buffer_use => @queue / $max_queue_size
) if @queue;

[ splice @queue, 0, $batch_size ];
}
);

return unless @$batch;

$function->call(
args => [ $exporter, $batch, $exporter_timeout ],
on_result => sub ( $type, $result ) {
my $count = scalar @$batch;

return $self->report_result( EXPORT_RESULT_FAILURE, $count )
unless $type eq 'return';

$self->report_result( $result, $count );
},
);
}
catch($e) {
warn $e;
}

return;
}

method report_dropped ( $reason, $count ) { $self }

method report_result ( $result, $count ) { $result }

async method shutdown ( $timeout = undef ) {
return EXPORT_RESULT_SUCCESS if $done;

$done = 1;

my $start = timeout_timestamp;

# TODO: The Ruby implementation ignores whether the force_flush
# times out. Is this correct?
await $self->force_flush( maybe_timeout $timeout, $start );

$self->report_dropped( 'terminating', scalar @queue ) if @queue;
@queue = ();

await $function->stop if $function->workers;

await $exporter->shutdown( maybe_timeout $timeout, $start );
}

async method force_flush ( $timeout = undef ) {
return EXPORT_RESULT_SUCCESS if $done;

my $start = timeout_timestamp;

my @stack = $lock->enter( sub { splice @queue, 0, @queue } );

defer {
# If we still have any spans left it has to be because we
# timed out and couldn't export them. In that case, we drop
# them and report
$self->report_dropped( 'force-flush', scalar @stack ) if @stack;
}

while ( @stack ) {
my $remaining = maybe_timeout $timeout, $start;
return EXPORT_RESULT_TIMEOUT if $timeout and !$remaining;

my $batch = [ splice @stack, 0, $batch_size ];

my $count = scalar @$batch;
try {
my $result = await $function->call(
args => [ $exporter, $batch, $remaining ],
);

$self->report_result( $result, $count );

return $result unless $result == EXPORT_RESULT_SUCCESS;
}
catch ($e) {
return $self->report_result( EXPORT_RESULT_FAILURE, $count);
}
}

await $exporter->force_flush( maybe_timeout $timeout, $start );
}

method DESTROY {
try { $function->stop->get }
catch ($e) { }
}
}
39 changes: 39 additions & 0 deletions lib/OpenTelemetry/Processor/Simple.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use Object::Pad ':experimental(init_expr)';
# ABSTRACT: A basic OpenTelemetry span processor

package OpenTelemetry::Processor::Simple;

our $VERSION = '0.014';

class OpenTelemetry::Processor::Simple :does(OpenTelemetry::Processor) {
use Feature::Compat::Try;
use Future::AsyncAwait;
use OpenTelemetry::X;

field $exporter :param;

ADJUST {
die OpenTelemetry::X->create(
Invalid => "Exporter must implement the OpenTelemetry::Exporter interface: " . ( ref $exporter || $exporter )
) unless $exporter && $exporter->DOES('OpenTelemetry::Exporter');
}

method process ( @items ) {
try {
$exporter->export(\@items);
}
catch ($e) {
warn $e;
}

return;
}

async method shutdown ( $timeout = undef ) {
await $exporter->shutdown( $timeout );
}

async method force_flush ( $timeout = undef ) {
await $exporter->force_flush( $timeout );
}
}
Loading

0 comments on commit fc22c58

Please sign in to comment.