feat(server): API and Event processing improvements #1152
base: main
Conversation
Eyeballed the PR and left some comments. Going to come back to it.
@@ -1004,6 +1001,40 @@ impl Display for StateChangeId {
    }
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ProcessorId {
pub struct ProcessorId(ProcessorType) would do the job
@@ -1012,13 +1043,175 @@ pub struct StateChange {
    pub created_at: u64,
    pub processed_at: Option<u64>,
}

impl StateChange {
If we are changing the key prefix for state changes, let's add namespaces to the keys so that we can scan by namespace. This would make it easy to avoid processing state changes serially across all namespaces. Global state changes, such as cluster topology changes, could use global as a prefix, and we'd make sure global is never allowed as a namespace name.
Great idea actually; it also makes it possible to migrate to a processor per namespace very easily.
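A rough sketch of what such a namespaced key layout could look like, purely for illustration (the GLOBAL_PREFIX constant, the separator, and the padding are assumptions, not the PR's actual encoding):

```rust
/// Reserved prefix for cluster-wide state changes (e.g. topology changes);
/// namespace creation would reject this value as a namespace name.
const GLOBAL_PREFIX: &str = "global";

/// Builds the key a state change is stored under, prefixed by namespace so a
/// processor can scan a single namespace's changes without touching the rest.
fn state_change_key(namespace: Option<&str>, id: u64) -> String {
    let prefix = namespace.unwrap_or(GLOBAL_PREFIX);
    // Zero-padding keeps lexicographic order identical to numeric order.
    format!("{}|{:020}", prefix, id)
}

fn main() {
    // Scanning with the "ns-1|" prefix returns only that namespace's changes.
    println!("{}", state_change_key(Some("ns-1"), 42));
    println!("{}", state_change_key(None, 7));
}
```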
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Namespace {
    pub name: String,
    pub created_at: u64,
}

pub struct StateMachineUpdateRequest {
I would suggest moving these types into a requests.rs file in the interest of keeping this file smaller. Long files are hard for editors to handle.
Thanks, let me see if they can go back in state_store based on crate dependencies.
let scheduler_invocations = meter
    .f64_histogram("scheduler_invocations")
    .with_description("Scheduler invocation latencies in seconds")
let processors_duration = meter
Why not measure both processor duration and scheduler invocation duration?
We are, but using a label on the metric, so the same metric name is used for all processors.
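For illustration, a minimal sketch of how one histogram with a processor label can serve every processor. The instrument and label names follow the metrics example below; the builder finalizer may be `.init()` instead of `.build()` depending on the opentelemetry version in use.

```rust
use opentelemetry::metrics::{Histogram, Meter};
use opentelemetry::KeyValue;

struct DispatcherMetrics {
    processors_duration: Histogram<f64>,
}

impl DispatcherMetrics {
    fn new(meter: &Meter) -> Self {
        Self {
            processors_duration: meter
                .f64_histogram("processor_duration_seconds")
                .with_description("Processors latencies in seconds")
                .build(), // `.init()` on older opentelemetry releases
        }
    }

    /// One metric name for all processors; the `processor` label tells them apart.
    fn record(&self, processor: &str, seconds: f64) {
        self.processors_duration
            .record(seconds, &[KeyValue::new("processor", processor.to_string())]);
    }
}
```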
#[derive(Debug, Clone)]
struct ProcessorHandle {
    state_change_tx: tokio::sync::watch::Sender<()>,
    request_tx: tokio::sync::mpsc::UnboundedSender<DispatchedRequest>,
This is probably not a good idea. Keeping this to even 5k might be better than making this unbounded. We could keep this in the first iteration of this PR though if it makes things easy.
I'll change this as part of splitting things up into a processor per namespace; currently this behaves a lot like the unbounded API requests coming in, except it takes more memory.
Edit: I'll change this in this PR.
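For reference, a sketch of what the bounded variant could look like; the 5_000 capacity and the placeholder DispatchedRequest type are illustrative only.

```rust
use tokio::sync::{mpsc, watch};

// Placeholder for the real request type carried to a processor.
struct DispatchedRequest;

struct ProcessorHandle {
    state_change_tx: watch::Sender<()>,
    // Bounded queue: senders get backpressure instead of unbounded memory growth.
    request_tx: mpsc::Sender<DispatchedRequest>,
}

#[tokio::main]
async fn main() {
    let (request_tx, mut request_rx) = mpsc::channel::<DispatchedRequest>(5_000);
    let (state_change_tx, _state_change_rx) = watch::channel(());
    let handle = ProcessorHandle { state_change_tx, request_tx };

    // `send` awaits when the queue is full, pushing back on API callers
    // instead of letting the in-memory queue grow without limit.
    handle
        .request_tx
        .send(DispatchedRequest)
        .await
        .expect("processor receiver dropped");
    let _request = request_rx.recv().await;

    // Nudge the processor that new state changes are available (watch channel).
    handle.state_change_tx.send(()).expect("state change receiver dropped");
}
```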
impl StateChangeDispatcher for Dispatcher {
    fn dispatch_event(&self, changes: Vec<StateChange>) -> Result<()> {
        changes
What is happening here?
I am adding comments and tweaking it a bit:
/// Dispatches state changes to the appropriate processors based on the change types.
fn dispatch_state_change(&self, changes: Vec<StateChange>) -> Result<()> {
changes
.iter()
// get all processor ids that are subscribed to the state change
.flat_map(|sc| self.subscribed_processor_ids(sc.clone()))
// dedupe the processor ids
.unique()
// dispatch the state change to each processor
.try_for_each(|processor_id| -> Result<()> {
let handle = match self.processors.get(&processor_id) {
Some(tx) => Ok(tx.value().clone()),
None => Err(anyhow::anyhow!("processor not found: {:?}", processor_id)),
}?;
handle.state_change_tx.send(())?;
Ok(())
})?;
Ok(())
}
New change.
Context
We've seen that API calls that can mutate the state while jobs are running (task creation, allocation, replay) result in a multitude of edge cases.
What
We decided to take a no-locking approach to fix the root cause, which requires a significant overhaul of the contract between the API, state machines, and tasks/workers.
This is as opposed to the alternative, which would require leveraging RocksDB get_for_update locks.
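To make that contract concrete, here is a rough sketch of the intended flow under this approach; the type shapes, field names, and channel size are illustrative, not the PR's exact definitions. An API handler queues a StateMachineUpdateRequest to the owning processor and awaits the outcome; because each processor drains its queue serially, concurrent mutations cannot interleave and no get_for_update locking is required.

```rust
use anyhow::Result;
use tokio::sync::{mpsc, oneshot};

// Illustrative shapes only; the real definitions live in the PR.
struct StateMachineUpdateRequest {
    description: String,
}

struct DispatchedRequest {
    request: StateMachineUpdateRequest,
    // The API handler awaits this to learn the outcome of its mutation.
    reply: oneshot::Sender<Result<()>>,
}

/// Processor loop: requests are applied one at a time, so writes in this
/// processor's domain never race each other and no key-level locks are needed.
async fn run_processor(mut rx: mpsc::Receiver<DispatchedRequest>) {
    while let Some(dispatched) = rx.recv().await {
        // Apply the update to the state machine here; sketched as a no-op.
        println!("applying: {}", dispatched.request.description);
        let _ = dispatched.reply.send(Ok(()));
    }
}

/// API side: enqueue the mutation and wait for the processor's answer.
async fn submit(
    tx: &mpsc::Sender<DispatchedRequest>,
    request: StateMachineUpdateRequest,
) -> Result<()> {
    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send(DispatchedRequest { request, reply: reply_tx })
        .await
        .map_err(|_| anyhow::anyhow!("processor queue closed"))?;
    reply_rx.await?
}

#[tokio::main]
async fn main() -> Result<()> {
    let (tx, rx) = mpsc::channel(1024);
    tokio::spawn(run_processor(rx));
    submit(&tx, StateMachineUpdateRequest { description: "create namespace".into() }).await
}
```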
The benefits of the chosen approach are:
The downsides of this approach are:
Changes:
- requests_inflight: number of requests in flight (per processor)
- requests_queue_duration_seconds: time spent waiting for a processor, in seconds (per processor)
- processor_duration_seconds: processor latencies in seconds (per processor)

Diagram
Metrics example:
# HELP requests_inflight number of requests in flight
# TYPE requests_inflight gauge
requests_inflight{processor="Namespace",otel_scope_name="dispatcher_metrics"} 292
requests_inflight{processor="TaskAllocator",otel_scope_name="dispatcher_metrics"} 10
# HELP requests_queue_duration_seconds time spent waiting for a processor in seconds
# TYPE requests_queue_duration_seconds histogram
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.001"} 132
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.005"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.01"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.05"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.1"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="0.5"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="1"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="5"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="10"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="25"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="50"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="75"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="100"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="250"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="750"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="1000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="2500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="5000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="7500"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="10000"} 146
requests_queue_duration_seconds_bucket{processor="Namespace",otel_scope_name="dispatcher_metrics",le="+Inf"} 146
requests_queue_duration_seconds_sum{processor="Namespace",otel_scope_name="dispatcher_metrics"} 0.035963831999999994
requests_queue_duration_seconds_count{processor="Namespace",otel_scope_name="dispatcher_metrics"} 146
Future:
Testing
Contribution Checklist
- make fmt in the package directory.
- make fmt in server/.