Skip to content

Commit

Permalink
improve task creation and cancellation
Browse files Browse the repository at this point in the history
If a FrameProcessor needs to create a task it should use
FrameProcessor.create_task() and FrameProcessor.cancel_task(). This gives
Pipecat more control over all the tasks that are created in Pipecat.

Both functions internally use the utils module: utils.create_task() and
utils.cancel_task() which should also be used outside of FrameProcessors. That
is, unless strictly necessary, we should avoid using asyncio.create_task().
  • Loading branch information
aconchillo committed Jan 24, 2025
1 parent b881dd5 commit 9017c22
Show file tree
Hide file tree
Showing 31 changed files with 480 additions and 565 deletions.
3 changes: 1 addition & 2 deletions examples/foundational/22b-natural-conversation-proposal.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())

async def _stop(self):
self._gate_task.cancel()
await self._gate_task
await self.cancel_task(self._gate_task)

async def _gate_task_handler(self):
while True:
Expand Down
39 changes: 19 additions & 20 deletions examples/foundational/22c-natural-conversation-mixed-llms.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@
Examples:
# Complete Wh-question
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "What's the fastest way to learn Spanish"}]
Output: YES
# Complete Yes/No question despite STT error
[{"role": "assistant", "content": "I know about planets."},
[{"role": "assistant", "content": "I know about planets."},
{"role": "user", "content": "Is is Jupiter the biggest planet"}]
Output: YES
Expand All @@ -118,12 +118,12 @@
Examples:
# Direct instruction
[{"role": "assistant", "content": "I can explain many topics."},
[{"role": "assistant", "content": "I can explain many topics."},
{"role": "user", "content": "Tell me about black holes"}]
Output: YES
# Action demand
[{"role": "assistant", "content": "I can help with math."},
[{"role": "assistant", "content": "I can help with math."},
{"role": "user", "content": "Solve this equation x plus 5 equals 12"}]
Output: YES
Expand All @@ -134,12 +134,12 @@
Examples:
# Specific answer
[{"role": "assistant", "content": "What's your favorite color?"},
[{"role": "assistant", "content": "What's your favorite color?"},
{"role": "user", "content": "I really like blue"}]
Output: YES
# Option selection
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
[{"role": "assistant", "content": "Would you prefer morning or evening?"},
{"role": "user", "content": "Morning"}]
Output: YES
Expand All @@ -153,17 +153,17 @@
Examples:
# Self-correction reaching completion
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Tell me about... no wait, explain how rainbows form"}]
Output: YES
# Topic change with complete thought
[{"role": "assistant", "content": "The weather is nice today."},
[{"role": "assistant", "content": "The weather is nice today."},
{"role": "user", "content": "Actually can you tell me who invented the telephone"}]
Output: YES
# Mid-sentence completion
[{"role": "assistant", "content": "Hello I'm ready."},
[{"role": "assistant", "content": "Hello I'm ready."},
{"role": "user", "content": "What's the capital of? France"}]
Output: YES
Expand All @@ -175,12 +175,12 @@
Examples:
# Acknowledgment
[{"role": "assistant", "content": "Should we talk about history?"},
[{"role": "assistant", "content": "Should we talk about history?"},
{"role": "user", "content": "Sure"}]
Output: YES
# Disagreement with completion
[{"role": "assistant", "content": "Is that what you meant?"},
[{"role": "assistant", "content": "Is that what you meant?"},
{"role": "user", "content": "No not really"}]
Output: YES
Expand All @@ -194,12 +194,12 @@
Examples:
# Word repetition but complete
[{"role": "assistant", "content": "I can help with that."},
[{"role": "assistant", "content": "I can help with that."},
{"role": "user", "content": "What what is the time right now"}]
Output: YES
# Missing punctuation but complete
[{"role": "assistant", "content": "I can explain that."},
[{"role": "assistant", "content": "I can explain that."},
{"role": "user", "content": "Please tell me how computers work"}]
Output: YES
Expand All @@ -211,12 +211,12 @@
Examples:
# Filler words but complete
[{"role": "assistant", "content": "What would you like to know?"},
[{"role": "assistant", "content": "What would you like to know?"},
{"role": "user", "content": "Um uh how do airplanes fly"}]
Output: YES
# Thinking pause but incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "Well um I want to know about the"}]
Output: NO
Expand All @@ -241,17 +241,17 @@
Examples:
# Incomplete despite corrections
[{"role": "assistant", "content": "What would you like to know about?"},
[{"role": "assistant", "content": "What would you like to know about?"},
{"role": "user", "content": "Can you tell me about"}]
Output: NO
# Complete despite multiple artifacts
[{"role": "assistant", "content": "I can help you learn."},
[{"role": "assistant", "content": "I can help you learn."},
{"role": "user", "content": "How do you I mean what's the best way to learn programming"}]
Output: YES
# Trailing off incomplete
[{"role": "assistant", "content": "I can explain anything."},
[{"role": "assistant", "content": "I can explain anything."},
{"role": "user", "content": "I was wondering if you could tell me why"}]
Output: NO
"""
Expand Down Expand Up @@ -374,8 +374,7 @@ async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())

async def _stop(self):
self._gate_task.cancel()
await self._gate_task
await cancel_task(self._gate_task)

async def _gate_task_handler(self):
while True:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@
)
from pipecat.processors.filters.function_filter import FunctionFilter
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.user_idle_processor import UserIdleProcessor
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.deepgram import DeepgramSTTService
from pipecat.services.google import GoogleLLMContext, GoogleLLMService
from pipecat.sync.base_notifier import BaseNotifier
from pipecat.sync.event_notifier import EventNotifier
Expand Down Expand Up @@ -440,11 +438,11 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):

if isinstance(frame, UserStartedSpeakingFrame):
if self._idle_task:
self._idle_task.cancel()
await self.cancel_task(self._idle_task)
elif isinstance(frame, TextFrame) and frame.text.startswith("YES"):
logger.debug("Completeness check YES")
if self._idle_task:
self._idle_task.cancel()
await self.cancel_task(self._idle_task)
await self.push_frame(UserStoppedSpeakingFrame())
await self._audio_accumulator.reset()
await self._notifier.notify()
Expand Down Expand Up @@ -602,8 +600,7 @@ async def _start(self):
self._gate_task = self.get_event_loop().create_task(self._gate_task_handler())

async def _stop(self):
self._gate_task.cancel()
await self._gate_task
await self.cancel_task(self._gate_task)

async def _gate_task_handler(self):
while True:
Expand Down
2 changes: 1 addition & 1 deletion src/pipecat/audio/vad/silero.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,5 +159,5 @@ def voice_confidence(self, buffer) -> float:
return new_confidence
except Exception as e:
# This comes from an empty audio array
logger.exception(f"Error analyzing audio with Silero VAD: {e}")
logger.error(f"Error analyzing audio with Silero VAD: {e}")
return 0
50 changes: 20 additions & 30 deletions src/pipecat/pipeline/parallel_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,18 @@ async def _start(self):

async def _stop(self):
# The up task doesn't receive an EndFrame, so we just cancel it.
self._up_task.cancel()
await self._up_task
# The down tasks waits for the last EndFrame send by the internal
await self.cancel_task(self._up_task)
# The down tasks waits for the last EndFrame sent by the internal
# pipelines.
await self._down_task

async def _cancel(self):
self._up_task.cancel()
await self._up_task
self._down_task.cancel()
await self._down_task
await self.cancel_task(self._up_task)
await self.cancel_task(self._down_task)

async def _create_tasks(self):
loop = self.get_event_loop()
self._up_task = loop.create_task(self._process_up_queue())
self._down_task = loop.create_task(self._process_down_queue())
self._up_task = self.create_task(self._process_up_queue())
self._down_task = self.create_task(self._process_down_queue())

async def _drain_queues(self):
while not self._up_queue.empty:
Expand All @@ -185,32 +181,26 @@ async def _parallel_push_frame(self, frame: Frame, direction: FrameDirection):

async def _process_up_queue(self):
while True:
try:
frame = await self._up_queue.get()
await self._parallel_push_frame(frame, FrameDirection.UPSTREAM)
self._up_queue.task_done()
except asyncio.CancelledError:
break
frame = await self._up_queue.get()
await self._parallel_push_frame(frame, FrameDirection.UPSTREAM)
self._up_queue.task_done()

async def _process_down_queue(self):
running = True
while running:
try:
frame = await self._down_queue.get()
frame = await self._down_queue.get()

endframe_counter = self._endframe_counter.get(frame.id, 0)
endframe_counter = self._endframe_counter.get(frame.id, 0)

# If we have a counter, decrement it.
if endframe_counter > 0:
self._endframe_counter[frame.id] -= 1
endframe_counter = self._endframe_counter[frame.id]
# If we have a counter, decrement it.
if endframe_counter > 0:
self._endframe_counter[frame.id] -= 1
endframe_counter = self._endframe_counter[frame.id]

# If we don't have a counter or we reached 0, push the frame.
if endframe_counter == 0:
await self._parallel_push_frame(frame, FrameDirection.DOWNSTREAM)
# If we don't have a counter or we reached 0, push the frame.
if endframe_counter == 0:
await self._parallel_push_frame(frame, FrameDirection.DOWNSTREAM)

running = not (endframe_counter == 0 and isinstance(frame, EndFrame))
running = not (endframe_counter == 0 and isinstance(frame, EndFrame))

self._down_queue.task_done()
except asyncio.CancelledError:
break
self._down_queue.task_done()
Loading

0 comments on commit 9017c22

Please sign in to comment.