Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate size based flush threshold per topic #14765

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

itschrispeck
Copy link
Collaborator

@itschrispeck itschrispeck commented Jan 7, 2025

With multi-stream ingestion added, it makes sense to calculate segment flush thresholds based on individual topics, not at a table level. This patch has been running internally, behavior was validated by checking logs for calculated thresholds.

The metrics should be backwards compatible (at least for our metrics system they are, not familiar w/ all). For example, the gauge essentially changes from numRowsThreshold.tableName to numRowsThreshold.tableName.topicName

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate more on why do we want to separate size info for each topic? If the data is ingested into the same table, they should have the same schema, and most of the case similar data distribution. I guess in most cases user still want to track per table threshold.
If this is indeed needed, can we make it configurable, and add a config flag to enable it?

@codecov-commenter
Copy link

codecov-commenter commented Jan 7, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 63.90%. Comparing base (59551e4) to head (c012d4f).
Report is 1539 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #14765      +/-   ##
============================================
+ Coverage     61.75%   63.90%   +2.15%     
- Complexity      207     1607    +1400     
============================================
  Files          2436     2703     +267     
  Lines        133233   150742   +17509     
  Branches      20636    23289    +2653     
============================================
+ Hits          82274    96339   +14065     
- Misses        44911    47203    +2292     
- Partials       6048     7200    +1152     
Flag Coverage Δ
custom-integration1 100.00% <ø> (+99.99%) ⬆️
integration 100.00% <ø> (+99.99%) ⬆️
integration1 100.00% <ø> (+99.99%) ⬆️
integration2 0.00% <ø> (ø)
java-11 63.83% <100.00%> (+2.12%) ⬆️
java-21 63.77% <100.00%> (+2.15%) ⬆️
skip-bytebuffers-false 63.90% <100.00%> (+2.15%) ⬆️
skip-bytebuffers-true 63.71% <100.00%> (+35.98%) ⬆️
temurin 63.90% <100.00%> (+2.15%) ⬆️
unittests 63.90% <100.00%> (+2.15%) ⬆️
unittests1 56.30% <ø> (+9.41%) ⬆️
unittests2 34.21% <100.00%> (+6.48%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@itschrispeck
Copy link
Collaborator Author

Can you elaborate more on why do we want to separate size info for each topic? If the data is ingested into the same table, they should have the same schema, and most of the case similar data distribution. I guess in most cases user still want to track per table threshold.

For our multi-stream ingestion use case data rarely has the same schema or a similar data distribution. For us, topics are logically separate datasets. The current implementation lead to many index build failures, e.g. forward index size, too many MV values, etc. Beyond build failures, we also saw wild swings in segment sizes as many segments from one topic flushed, and then many segments from another topic flushed.

A concrete example is for observability, we have service A and service B emitting to topics A and B, both ingesting into a single table. Generally they are not emitting the same shape or volume of logs, which causes inaccurate segment size estimations. Even if they are emitting the same shape/size, deployments that change the log fingerprints cannot always happen in sync, so if computed at a table level and service A was upgraded and B wasn't yet, there would be a period where segment build failures are likely. Another case is that we turn on debug level logging temporarily for A, and the same issues happen.

If this is indeed needed, can we make it configurable, and add a config flag to enable it?

What's the downside of leaving this per topic by default? If we make the above assumption, that data should have the same schema/similar data distribution, then computing it at a per topic level should not be measurably different than computing it per table. If the assumption doesn't hold, then we have a better behavior by default. I feel that an extra SegmentSizeBasedFlushThresholdUpdater instance per topic should not be too expensive to hold.

@Jackie-Jiang
Copy link
Contributor

While in most cases, there is only one topic per table (single-stream ingestion). I don't want to make size based threshold updater to be per topic by default. It will cause backward incompatibility for metrics as well.
Another drawback is slower converge time for segment size.
If you can make this only apply to multi-stream ingestion, I'm okay with that. But again, suggest making it configurable

@itschrispeck
Copy link
Collaborator Author

itschrispeck commented Jan 14, 2025

While in most cases, there is only one topic per table (single-stream ingestion). I don't want to make size based threshold updater to be per topic by default. It will cause backward incompatibility for metrics as well.
Another drawback is slower converge time for segment size.
If you can make this only apply to multi-stream ingestion, I'm okay with that. But again, suggest making it configurable

To clarify, the ask here would be to have a table based ctor and table+topic based ctor? and then use the latter for multi-stream ingestion only or if enabled? Wouldn't this add branching/make the code less maintainable for the same functionality?

For the metrics compatibility issue, I can add a second set that excludes the topic tag. The metrics systems I've used all break tags based on ., so I had hoped adding an additional tag would be generally compatible.

re: size convergence time, I feel that making the currently hardcoded weights/multipliers configurable would be a more robust way of optimizing for this

The version we're running is too old to show metrics on segment sizes, but we can see the effect of per stream sizing from the amount of mapped buffers:
image

edit: I pulled on-disk segment sizes from a single node and graphed them over time for a table, we see less variance and notably no more 20GB segments
image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants