feat(instrumentation): Add event-based tracking implementation across providers #2541

Open · wants to merge 10 commits into main
2 changes: 1 addition & 1 deletion package-lock.json

(Generated file; diff not rendered by default.)

@@ -4,12 +4,22 @@
import os
from typing import Collection
from opentelemetry.instrumentation.alephalpha.config import Config
from opentelemetry.instrumentation.alephalpha.utils import dont_throw
from opentelemetry.instrumentation.alephalpha.utils import (
dont_throw,
get_llm_request_attributes,
set_span_attribute,
handle_span_exception,
CompletionBuffer,
)
from opentelemetry.instrumentation.ai_providers.utils import (
create_prompt_event,
create_completion_event,
)
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace import get_tracer, SpanKind, Status, StatusCode
from opentelemetry._events import EventLogger, get_event_logger

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
@@ -35,138 +45,160 @@
},
]


def should_send_prompts():
return (
os.getenv("TRACELOOP_TRACE_CONTENT") or "true"
).lower() == "true" or context_api.get_value("override_enable_content_tracing")


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


@dont_throw
def _set_input_attributes(span, llm_request_type, args, kwargs):
_set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model"))

if should_send_prompts():
if llm_request_type == LLMRequestTypeValues.COMPLETION:
_set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")
_set_span_attribute(
span,
f"{SpanAttributes.LLM_PROMPTS}.0.content",
args[0].prompt.items[0].text,
)


@dont_throw
def _set_response_attributes(span, llm_request_type, response):
if should_send_prompts():
if llm_request_type == LLMRequestTypeValues.COMPLETION:
_set_span_attribute(
span,
f"{SpanAttributes.LLM_COMPLETIONS}.0.content",
response.completions[0].completion,
)
_set_span_attribute(
span, f"{SpanAttributes.LLM_COMPLETIONS}.0.role", "assistant"
)

input_tokens = getattr(response, "num_tokens_prompt_total", 0)
output_tokens = getattr(response, "num_tokens_generated", 0)

_set_span_attribute(
span,
SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
input_tokens + output_tokens,
)
_set_span_attribute(
span,
SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
output_tokens,
)
_set_span_attribute(
span,
SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
input_tokens,
)


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)

return wrapper

return _with_tracer


def _llm_request_type_by_method(method_name):
if method_name == "complete":
return LLMRequestTypeValues.COMPLETION
else:
return LLMRequestTypeValues.UNKNOWN


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
class StreamWrapper:
"""Wrapper for streaming responses."""
def __init__(self, stream, span, event_logger: EventLogger, capture_content: bool):
self.stream = stream
self.span = span
self.event_logger = event_logger
self.capture_content = capture_content
self.completion_buffer = CompletionBuffer(0)
self._span_started = False
self.setup()

def setup(self):
if not self._span_started:
self._span_started = True

def cleanup(self):
if self._span_started:
if self.completion_buffer.text_content:
# Emit completion event with buffered content
self.event_logger.emit(
create_completion_event(
{"content": self.completion_buffer.get_content()},
system="AlephAlpha",
capture_content=self.capture_content
)
)
self.span.end()
self._span_started = False

def __iter__(self):
return self

def __next__(self):
try:
chunk = next(self.stream)
if chunk.completions:
self.completion_buffer.append_content(chunk.completions[0].completion)
return chunk
except StopIteration:
self.cleanup()
raise
except Exception as error:
handle_span_exception(self.span, error)
raise

def _wrap(tracer, event_logger: EventLogger, capture_content: bool):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
):
return wrapped(*args, **kwargs)

name = to_wrap.get("span_name")
llm_request_type = _llm_request_type_by_method(to_wrap.get("method"))
span = tracer.start_span(
name,
kind=SpanKind.CLIENT,
attributes={
SpanAttributes.LLM_SYSTEM: "AlephAlpha",
SpanAttributes.LLM_REQUEST_TYPE: llm_request_type.value,
},
)
if span.is_recording():
_set_input_attributes(span, llm_request_type, args, kwargs)

response = wrapped(*args, **kwargs)

if response:
if span.is_recording():
_set_response_attributes(span, llm_request_type, response)
span.set_status(Status(StatusCode.OK))

span.end()
return response

def wrapper(wrapped, instance, args, kwargs):
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
):
return wrapped(*args, **kwargs)

span_attributes = get_llm_request_attributes(kwargs)
with tracer.start_as_current_span(
name="alephalpha.completion",
kind=SpanKind.CLIENT,
attributes=span_attributes,
end_on_exit=False,
) as span:
try:
if span.is_recording():
# Emit prompt event
if should_send_prompts():
prompt_text = args[0].prompt.items[0].text
event_logger.emit(
create_prompt_event(
{"content": prompt_text},
system="AlephAlpha",
capture_content=capture_content
)
)

result = wrapped(*args, **kwargs)

# Handle streaming responses
if kwargs.get("stream", False):
return StreamWrapper(result, span, event_logger, capture_content)

if span.is_recording() and should_send_prompts():
# Emit completion event
completion_text = result.completions[0].completion
event_logger.emit(
create_completion_event(
{"content": completion_text},
system="AlephAlpha",
capture_content=capture_content
)
)

# Set usage attributes
input_tokens = getattr(result, "num_tokens_prompt_total", 0)
output_tokens = getattr(result, "num_tokens_generated", 0)
set_span_attribute(
span,
SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS,
input_tokens + output_tokens,
)
set_span_attribute(
span,
SpanAttributes.GEN_AI_USAGE_COMPLETION_TOKENS,
output_tokens,
)
set_span_attribute(
span,
SpanAttributes.GEN_AI_USAGE_PROMPT_TOKENS,
input_tokens,
)

span.set_status(Status(StatusCode.OK))
span.end()
return result

except Exception as error:
handle_span_exception(span, error)
raise

return wrapper

class AlephAlphaInstrumentor(BaseInstrumentor):
"""An instrumentor for Aleph Alpha's client library."""

def __init__(self, exception_logger=None):
super().__init__()
Config.exception_logger = exception_logger
Config.use_legacy_attributes = True

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):
tracer_provider = kwargs.get("tracer_provider")
Config.use_legacy_attributes = kwargs.get("use_legacy_attributes", True)

tracer = get_tracer(__name__, __version__, tracer_provider)
# EventLogger is abstract in opentelemetry._events; obtain a concrete
# instance from the API factory instead of constructing it directly.
event_logger = get_event_logger(__name__, __version__)

capture_content = kwargs.get("capture_content", True)

for wrapped_method in WRAPPED_METHODS:
wrap_method = wrapped_method.get("method")
wrap_function_wrapper(
"aleph_alpha_client",
f"Client.{wrap_method}",
_wrap(tracer, wrapped_method),
_wrap(tracer, event_logger, capture_content),
)

def _uninstrument(self, **kwargs):
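For reference, a minimal sketch of how this instrumentor would be enabled with the new options. The use_legacy_attributes and capture_content kwargs are the ones read in _instrument above; the rest is ordinary OpenTelemetry setup assumed for illustration, not part of this diff.

from opentelemetry.instrumentation.alephalpha import AlephAlphaInstrumentor

# Opt in to the event-based path and redact prompt/completion payloads.
# BaseInstrumentor.instrument() forwards these kwargs to _instrument().
AlephAlphaInstrumentor().instrument(
    use_legacy_attributes=False,
    capture_content=False,
)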
@@ -1,2 +1,5 @@
class Config:
"""Config for AlephAlpha instrumentation"""

exception_logger = None
use_legacy_attributes = True # Controls whether to use legacy attribute-based approach or new event-based approach
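
Since use_legacy_attributes is plain class state, a caller can also flip it directly before instrumenting; a one-line sketch:

from opentelemetry.instrumentation.alephalpha.config import Config

Config.use_legacy_attributes = False  # opt in to the event-based approach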
@@ -1,16 +1,17 @@
import logging
from typing import Optional, Dict, Any
from opentelemetry.instrumentation.alephalpha.config import Config
import traceback

from opentelemetry.trace import Status, StatusCode
from opentelemetry.semconv_ai import (
SpanAttributes,
LLMRequestTypeValues,
)

def dont_throw(func):
"""
A decorator that wraps the passed in function and logs exceptions instead of throwing them.

@param func: The function to wrap
@return: The wrapper function
"""
# Obtain a logger specific to the function's module
logger = logging.getLogger(func.__module__)

def wrapper(*args, **kwargs):
@@ -26,3 +27,40 @@ def wrapper(*args, **kwargs):
Config.exception_logger(e)

return wrapper
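
# Illustrative usage (not part of this diff): exceptions raised inside a
# @dont_throw-decorated function are logged and handed to
# Config.exception_logger instead of propagating to the caller.
@dont_throw
def _example_safe_call():
    raise RuntimeError("logged, never raised")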

def get_llm_request_attributes(kwargs: Dict[str, Any], instance: Any = None) -> Dict[str, Any]:
"""Get common LLM request attributes."""
attributes = {
SpanAttributes.LLM_SYSTEM: "AlephAlpha",
SpanAttributes.LLM_REQUEST_TYPE: LLMRequestTypeValues.COMPLETION.value,
}

if "model" in kwargs:
attributes[SpanAttributes.GEN_AI_REQUEST_MODEL] = kwargs["model"]

return attributes

def set_span_attribute(span, name: str, value: Any):
"""Set span attribute if value is not None and not empty."""
if value is not None and value != "":
span.set_attribute(name, value)

def handle_span_exception(span, error: Exception):
"""Handle span exception by recording error and ending span."""
if span.is_recording():
span.record_exception(error)
span.set_status(Status(StatusCode.ERROR))
span.end()

class CompletionBuffer:
"""Buffer for streaming completions."""
def __init__(self, index: int):
self.index = index
self.text_content = []
self.finish_reason = None

def append_content(self, content: str):
self.text_content.append(content)

def get_content(self) -> str:
return "".join(self.text_content)
@@ -8,7 +8,7 @@
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.instrumentation.alephalpha import AlephAlphaInstrumentor

pytest_plugins = []
pytest_plugins = ["pytest_recording"]


@pytest.fixture(scope="session")
@@ -38,4 +38,10 @@ def environment():

@pytest.fixture(scope="module")
def vcr_config():
return {"filter_headers": ["authorization"], "decode_compressed_response": True}
return {
"filter_headers": ["authorization", "Authorization"],
"decode_compressed_response": True,
"record_mode": os.getenv("VCR_RECORD_MODE", "none"),
"filter_post_data_parameters": ["token"],
"match_on": ["method", "scheme", "host", "port", "path", "query"],
}
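
With pytest_recording registered above, individual tests opt into cassette playback via the vcr marker. A sketch of such a test; the exporter fixture and the model name are assumptions of this example, not shown in the diff:

import os
import pytest
from aleph_alpha_client import Client, CompletionRequest, Prompt

@pytest.mark.vcr  # provided by pytest-recording; honors the vcr_config fixture above
def test_complete_emits_span(exporter):
    client = Client(token=os.environ.get("AA_TOKEN", "test"))
    client.complete(CompletionRequest(prompt=Prompt.from_text("Hello")), model="luminous-base")
    spans = exporter.get_finished_spans()
    assert spans[0].name == "alephalpha.completion"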