-
-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Supervisor shutdown options #65
base: main
Are you sure you want to change the base?
Changes from all commits
0e62918
b0144d7
23980a4
51d375f
7239299
8ca1bd2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
// TODO: specify amount of time permitted for shut-down | ||
import gleam/erlang/atom | ||
import gleam/erlang/node.{type Node} | ||
import gleam/erlang/process.{type Pid, type Subject} | ||
import gleam/option.{type Option, None, Some} | ||
|
@@ -40,6 +40,7 @@ pub opaque type ChildSpec(msg, argument, returning) { | |
// TODO: merge this into one field | ||
start: fn(argument) -> Result(Subject(msg), StartError), | ||
returning: fn(argument, Subject(msg)) -> returning, | ||
shutdown: ShutdownOptions, | ||
) | ||
} | ||
|
||
|
@@ -57,6 +58,11 @@ type Instruction { | |
StartFrom(Pid) | ||
} | ||
|
||
type ShutdownOptions { | ||
BrutalKill | ||
Timeout(Int) | ||
} | ||
|
||
type State(a) { | ||
State( | ||
restarts: IntensityTracker, | ||
|
@@ -96,10 +102,72 @@ fn start_child( | |
)) | ||
} | ||
|
||
// TODO: more sophsiticated stopping of processes. i.e. give supervisors | ||
// more time to shut down. | ||
fn shutdown_child(pid: Pid, _spec: ChildSpec(msg, arg_1, arg_2)) -> Nil { | ||
process.send_exit(pid) | ||
@external(erlang, "erlang", "exit") | ||
fn erlang_exit(pid: Pid, reason: atom.Atom) -> Nil | ||
|
||
fn shutdown_child(pid: Pid, spec: ChildSpec(msg, arg_1, arg_2)) { | ||
case spec.shutdown { | ||
BrutalKill -> shutdown_child_brutal_kill(pid) | ||
Timeout(timeout) -> shutdown_child_timeout(pid, timeout) | ||
} | ||
} | ||
|
||
fn shutdown_child_timeout(pid: Pid, timeout: Int) { | ||
let monitor = process.monitor_process(pid) | ||
erlang_exit(pid, atom.create_from_string("shutdown")) | ||
|
||
let result = | ||
process.new_selector() | ||
|> process.selecting_process_down(monitor, fn(a) { a }) | ||
|> process.select(timeout) | ||
|
||
case result { | ||
Ok(process.ProcessDown(pid, _reason)) -> { | ||
unlink_flush(pid) | ||
} | ||
|
||
Error(Nil) -> { | ||
process.kill(pid) | ||
|
||
let result = | ||
process.new_selector() | ||
|> process.selecting_process_down(monitor, fn(a) { a }) | ||
|> process.select_forever() | ||
|
||
case result { | ||
process.ProcessDown(pid, _reason) -> unlink_flush(pid) | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn shutdown_child_brutal_kill(pid: Pid) { | ||
let monitor = process.monitor_process(pid) | ||
process.kill(pid) | ||
|
||
let result = | ||
process.new_selector() | ||
|> process.selecting_process_down(monitor, fn(a) { a }) | ||
|> process.select_forever() | ||
|
||
case result { | ||
process.ProcessDown(pid, _reason) -> unlink_flush(pid) | ||
} | ||
} | ||
|
||
// We call unlink in order to guarantee that the 'EXIT' has arrived | ||
// from the dead process. See the [unlink](https://www.erlang.org/doc/apps/erts/erlang.html#unlink/1) docs for details. | ||
fn unlink_flush(pid) { | ||
process.unlink(pid) | ||
let result = | ||
process.new_selector() | ||
|> process.selecting_anything(fn(a) { a }) | ||
|> process.select(0) | ||
|
||
case result { | ||
Error(Nil) -> Nil | ||
Ok(_) -> Nil | ||
} | ||
} | ||
|
||
fn perform_instruction_for_child( | ||
|
@@ -201,7 +269,11 @@ pub fn add( | |
pub fn supervisor( | ||
start: fn(argument) -> Result(Subject(msg), StartError), | ||
) -> ChildSpec(msg, argument, argument) { | ||
ChildSpec(start: start, returning: fn(argument, _channel) { argument }) | ||
ChildSpec( | ||
start: start, | ||
returning: fn(argument, _channel) { argument }, | ||
shutdown: Timeout(5000), | ||
) | ||
} | ||
|
||
/// Prepare a new worker type child. | ||
|
@@ -223,7 +295,11 @@ pub fn supervisor( | |
pub fn worker( | ||
start: fn(argument) -> Result(Subject(msg), StartError), | ||
) -> ChildSpec(msg, argument, argument) { | ||
ChildSpec(start: start, returning: fn(argument, _channel) { argument }) | ||
ChildSpec( | ||
start: start, | ||
returning: fn(argument, _channel) { argument }, | ||
shutdown: Timeout(5000), | ||
) | ||
} | ||
|
||
// TODO: test | ||
|
@@ -237,7 +313,21 @@ pub fn returning( | |
child: ChildSpec(msg, argument_a, argument_b), | ||
updater: fn(argument_a, Subject(msg)) -> argument_c, | ||
) -> ChildSpec(msg, argument_a, argument_c) { | ||
ChildSpec(start: child.start, returning: updater) | ||
ChildSpec(start: child.start, returning: updater, shutdown: child.shutdown) | ||
} | ||
|
||
/// Change the shutdown timeout | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add more documentation here please 🙏 |
||
pub fn shutdown_timeout( | ||
child: ChildSpec(msg, argument_a, argument_b), | ||
timeout: Int, | ||
) -> ChildSpec(msg, argument_a, argument_b) { | ||
ChildSpec(..child, shutdown: Timeout(timeout)) | ||
} | ||
|
||
pub fn shutdown_brutal_kill( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's this function for? It is undocumented. |
||
child: ChildSpec(msg, argument_a, argument_b), | ||
) -> ChildSpec(msg, argument_a, argument_b) { | ||
ChildSpec(..child, shutdown: BrutalKill) | ||
} | ||
|
||
fn init( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,18 +43,23 @@ pub fn supervisor_test() { | |
|> should.be_ok | ||
|
||
// Assert children have started | ||
let assert Ok(#(1, p)) = process.receive(subject, 10) | ||
let assert Ok(#(2, _)) = process.receive(subject, 10) | ||
let assert Ok(#(3, _)) = process.receive(subject, 10) | ||
let assert Ok(#(1, pid1)) = process.receive(subject, 10) | ||
let assert Ok(#(2, pid2)) = process.receive(subject, 10) | ||
let assert Ok(#(3, pid3)) = process.receive(subject, 10) | ||
let assert Error(Nil) = process.receive(subject, 10) | ||
|
||
// Kill first child an assert they all restart | ||
process.kill(p) | ||
process.kill(pid1) | ||
let assert Ok(#(1, p1)) = process.receive(subject, 10) | ||
let assert Ok(#(2, p2)) = process.receive(subject, 10) | ||
let assert Ok(#(3, _)) = process.receive(subject, 10) | ||
let assert Error(Nil) = process.receive(subject, 10) | ||
|
||
// Ensure that the original processes are dead | ||
should.be_false(process.is_alive(pid1)) | ||
should.be_false(process.is_alive(pid2)) | ||
should.be_false(process.is_alive(pid3)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you move these to a new test please 🙏 |
||
|
||
// Kill second child an assert the following children restart | ||
process.kill(p2) | ||
let assert Ok(#(2, _)) = process.receive(subject, 10) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This drops the first message in the inbox, but this may not be the exit message. It needs to drop specifically the expected message here.