wait() with timeout #36217

Open
staticfloat opened this issue Jun 9, 2020 · 64 comments · May be fixed by #56974
Labels
parallelism Parallel or distributed computation

Comments

@staticfloat
Member

It appears that we do not support timeouts in wait(). To motivate the need for this, it's a pretty critical basic functionality for writing resilient servers; blocking forever due to a dropped packet is unacceptable, so servers must instead be written in an explicitly polling style, which is bad for latency and for CPU usage.

Implementation-wise, wait(c) waits upon a condition for a notify() to push it back onto the scheduler's work queue. It seems to me that timeouts can be implemented by starting a second task that sleep()'s for a certain amount of time, then unconditionally notify()'s that condition. The consumer of the wait() call would then need to disambiguate a timeout from an actual event.

Taking a look at the different things that can be wait()'ed upon, most are implemented in terms of Condition objects, so a very simple @async (sleep(timeout); notify(c)) should work. The _FDWatcher from FileWatching also notifies a condition in the end, so I believe a uniform API is possible here.
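A minimal sketch of that idea, assuming a plain Base Condition and a hypothetical helper name (naive_wait_timeout). The sentinel :timed_out is also an assumption; it is passed through notify() so the waiter can tell a timeout from a real event:

```julia
# Naive timeout: a helper task sleeps, then notifies the same condition
# with a sentinel value so the waiter can tell a timeout from a real event.
function naive_wait_timeout(c::Condition, timeout::Real)
    @async begin
        sleep(timeout)
        notify(c, :timed_out)   # wakes every task currently waiting on `c`
    end
    return wait(c)              # returns the value that was passed to notify
end

c = Condition()
@async (sleep(0.05); notify(c, :event))   # a "real" event arriving early
result = naive_wait_timeout(c, 1.0)
```

Note that the helper task keeps running after the function returns and will still call notify(c) once its sleep elapses, whether or not anyone is waiting for a timeout at that point.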

@staticfloat staticfloat added the parallelism Parallel or distributed computation label Jun 9, 2020
@staticfloat staticfloat added this to the 1.6 milestone Jun 9, 2020
@davidanthoff
Contributor

Maybe something like

function wait_timeout(c, timeout)
    ch = Channel(2)

    @async begin
        res = wait(c)
        push!(ch, Some(res))
    end

    @async begin
        sleep(timeout)
        push!(ch, nothing)
    end

    res = take!(ch)

    return res
end

That way, if the return value is nothing, you know the operation timed out.

@tkf
Member

tkf commented Jun 13, 2020

The two implementations suggested here seem to leak tasks. It might be possible to avoid this by using Timer since it's close-able.

In general, it'd be nice if we could take the concurrency API seriously (#33248) and avoid unstructured concurrency piling up. Timeouts are one of the biggest success stories of structured concurrency, after all: Timeouts and cancellation for humans (njs blog)

@davidanthoff
Contributor

The two implementations suggested here seem to leak tasks.

What does that mean?

@tkf
Member

tkf commented Jun 13, 2020

It means that tasks spawned by the function are still running after the function call ends. This violates the black box rule; i.e., all the relevant computations end at the end of the function (see the Julep and #33248 (comment)). This is bad because errors that occur in the leaked tasks are silently ignored and there is no backpressure to stop them from flooding the task scheduler.
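To make the leak concrete, here is a small sketch based on the Channel-based wait_timeout proposed earlier in the thread. The name leaky_wait_timeout is hypothetical, and the waiter task is returned only so that it can be inspected afterwards:

```julia
function leaky_wait_timeout(c, timeout)
    ch = Channel{Bool}(2)
    waiter = @async (wait(c); push!(ch, true))
    @async (sleep(timeout); push!(ch, false))
    return take!(ch), waiter   # `waiter` is returned only for inspection
end

c = Channel{Int}(1)
got_event, waiter = leaky_wait_timeout(c, 0.1)
still_running = !istaskdone(waiter)   # the waiter outlived the call that spawned it

put!(c, 1)     # only making `c` ready lets the leaked task finish
wait(waiter)
```

After the timeout, the function has returned but its waiter task is still blocked in wait(c), and nothing in the caller's scope would normally hold a reference to it.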

@staticfloat
Member Author

Can't that be solved by wrapping the entire function in a @sync block?

@tkf
Member

tkf commented Jun 13, 2020

Don't we need to cancel the other task for that? That's why the black box rule and task cancellation are the two must-have properties of structured concurrency.

(But it might be possible to solve this particular issue by just using Timer without implementing full-blown structured concurrency.)

@staticfloat
Member Author

Ah, you're right of course. I got confused and thought we were notifying c automatically here, but we're not.

@JeffBezanson is flooding the task scheduler with tasks all sitting and waiting on things an issue for the runtime? For instance, if I am timing out a lot, I could imagine adding a new task to the scheduler every couple of seconds, and for a long-running instance (like a StorageServer) this could add up to thousands of tasks.

@tkf
Member

tkf commented Jun 13, 2020

notifying c

Yeah, notifying c acts as the cancellation in one direction. We can close the Timer for cancellation in the other direction.

@staticfloat
Member Author

Even without closing the timer, the timer will eventually elapse, and the task will go away. It may throw an error, but that's fine, because we're not catching them. ;)

Cleaner to close the timer, for sure.

@tkf
Member

tkf commented Jun 13, 2020

What happens if you wait on the same thing twice and the first call didn't time out? If you don't clean up the timer in the first one, isn't there a chance that the first timeout calls notify during the second wait?

(Edit: edited twice; first I thought it was stupid but then realized that it might happen for re-waitable objects like Channel)

@tkf
Member

tkf commented Jun 13, 2020

Concretely:

c :: Channel
wait_timeout(c, 0.1) === nothing && return
x = take!(c)
wait_timeout(c, 0.1) === nothing && return  # the timer from the first `wait_timeout` can fire here
y = take!(c)

@staticfloat
Member Author

The timer within the first wait_timeout() is signaling on a separate Channel from the one in the second wait_timeout()? I don't see how the second wait_timeout() can be affected by the first.

@tkf
Member

tkf commented Jun 13, 2020

It's taking two things from the same channel c. I thought your plan was to notify one of the cond_ objects in the channel (maybe c.cond_wait)?

@staticfloat
Member Author

Oh, I understand now. Your concern is that if wait_timeout(c) does a notify(c.cond_wait) internally when its timeout has elapsed (in order to stop the task from its wait()) it will falsely notify someone else who is also waiting on it. That's a good concern.

Perhaps we can instead use throwto() in order to interrupt the other Task?

using Test

function wait_timeout(c, timeout)
    ch = Channel{Bool}(2)

    happy_task = @async begin
        wait(c)
        push!(ch, true)
    end

    timeout_task = @async begin
        sleep(timeout)
        push!(ch, false)

        # Stop the other task from waiting anymore
        Base.throwto(happy_task, InterruptException())
    end

    return take!(ch)
end

# First, test no wait, wait
@testset "no wait, wait" begin
	c = Channel(10)
	put!(c, 1)
	@test wait_timeout(c, 1.0) == true
	@test take!(c) == 1
	@test wait_timeout(c, 1.0) == false
end

# Next, test wait, no wait
@testset "wait, no wait" begin
	c = Channel(10)
	@test wait_timeout(c, 1.0) == false
	put!(c, 1)
	@test wait_timeout(c, 1.0) == true
	@test take!(c) == 1
end

# Next, test no wait, no wait, after sleeping for more than timeout:
@testset "no wait, no wait" begin
	c = Channel(10)
	put!(c, 1)
	@test wait_timeout(c, 1.0) == true
	@test take!(c) == 1

	sleep(2.0)
	put!(c, 2)
	@test wait_timeout(c, 1.0) == true
	@test take!(c) == 2
end

# Finally, test wait, wait
@testset "wait, wait" begin
	c = Channel(10)
	@test wait_timeout(c, 1.0) == false
	@test wait_timeout(c, 1.0) == false
end

@davidanthoff
Contributor

I guess the question is whether we are absolutely positive that happy_task won't leave some shared global state (either c or ch, I guess?) in some corrupted state if that exception is thrown? I don't know how throwto is implemented, but it looks somewhat evil and risky to me :)

Is it so bad to continue to just wait in the happy task? The downsides are that the task and the channel live longer than strictly necessary, I guess?

@tkf
Member

tkf commented Jun 14, 2020

IIUC it's unsafe to call schedule(task, err, error=true) and throwto(task, err) with the task that is already scheduled. That's how I understand the comments from @vtjnash in https://discourse.julialang.org/t/stop-terminate-a-sub-task-started-with-async/32193/6

Reading the code briefly, my impression is that these functions assume the ownership of the thread-local queue. So, I guess it's unsafe to run them outside the scheduler code. But wait can use some internal assumptions so maybe there is a way out?

@tkf
Member

tkf commented Jun 14, 2020

OK, so here is what I meant by Timer-based implementation (now I changed the return value to Bool):

using Base.Threads: Atomic
using Base: check_channel_state

function wait_timeout(c::Channel, timeout::Real)
    cancel = Atomic{Bool}(false)
    isready(c) && return true
    @sync begin
        timer = Timer(timeout)
        try
            @async begin
                try
                    wait(timer)
                catch
                    return
                end
                cancel[] = true
                lock(c.cond_wait) do
                    notify(c.cond_wait)
                end
            end

            lock(c)
            try
                while !isready(c)
                    check_channel_state(c)
                    wait(c.cond_wait)
                    cancel[] && return false
                end
            finally
                unlock(c)
            end
        finally
            close(timer)
        end
    end
    return true
end

I don't think it leaks a task (or more precisely a leaked task would be cleaned up soon enough; it'd be nice if @sync is exception-safe so that we don't have this quirk).

It passes @staticfloat's tests (thanks for sharing those, BTW!).

@davidanthoff
Contributor

Alright, this all inspired me to read up on the various links that @tkf posted.

All the structured concurrency stuff is interesting, but in terms of a short term solution that can be added incrementally to packages, I liked .Net's cancellation framework best, by far. We also need something like that for the language server sooner rather than later, and that is the same framework that the VS Code team is using across VS Code. Sooo, I created https://github.com/davidanthoff/CancellationTokens.jl :) For now it is not thread safe, so it can only be used with single threaded tasks, but the .Net original is thread safe and the source is MIT licensed, so we just need to copy their design over. But the public API of the package should be more or less done. Feedback and help welcome!

@tkf
Member

tkf commented Jun 14, 2020

read up on the various links that @tkf posted.

Mission accomplished! 😆

https://github.com/davidanthoff/CancellationTokens.jl

So my initial response was "why not just use Atomic{Bool}" but I guess you have the state machine to ensure that the cancel call ends when all the "receiver" sides acknowledged the request?

@davidanthoff
Contributor

There is also support for waiting on a cancel token, which I don't think would work with a pure atomic, right? And the .Net implementation supports callback handlers, which I haven't added, but I think those also need more than just an atomic.

@tkf
Member

tkf commented Jun 14, 2020

Ah, I see. That makes sense.

@tkf
Member

tkf commented Jun 15, 2020

My implementation wait_timeout(c::Channel, timeout::Real) above #36217 (comment) looks easily generalizable. In particular, I think we can implement waitfirst(waitables...) quite easily.

I wonder if it makes sense to define an internal interface, something like

wait′(waitable, cancel) -> success::Bool
_notify(waitable)

where cancel is a Ref{Bool}-like object that defines thread-safe cancel[]. For normal wait, we can use

struct NoCancel end
Base.getindex(::NoCancel) = false

It's pretty straightforward to implement this for Channel and Task:

using Base.Threads: Atomic
using Base: check_channel_state

function _wait(t::Task, cancel)
    if !istaskdone(t)
        lock(t.donenotify)
        try
            while !istaskdone(t)
                wait(t.donenotify)
                cancel[] && return false   # added this line
            end
        finally
            unlock(t.donenotify)
        end
    end
    return true   # returning `true` instead
end

function _notify(t::Task)
    lock(t.donenotify) do
        notify(t.donenotify)
    end
end

function wait′(t::Task, cancel = NoCancel())
    t === current_task() && error("deadlock detected: cannot wait on current task")
    ok = _wait(t, cancel)
    if istaskfailed(t)
        throw(TaskFailedException(t))
    end
    return ok
end

function _notify(c::Channel)
    lock(c.cond_wait) do
        notify(c.cond_wait)
    end
end

function wait′(c::Channel, cancel = NoCancel())
    isready(c) && return true   # returning `true` instead
    lock(c)
    try
        while !isready(c)
            check_channel_state(c)
            wait(c.cond_wait)
            cancel[] && return false   # added this line
        end
    finally
        unlock(c)
    end
    return true   # returning `true` instead
end

With this implementation, we can now define

function waitfirst(waitables...)
    winner = Ref{Any}()
    cancel = Atomic{Bool}(false)
    @sync for w in waitables
        @async begin
            if wait′($w, $cancel)
                $cancel[] = true
                foreach(_notify, $waitables)
                $winner[] = $w
            end
        end
    end
    return winner[]
end

I think it's possible to define wait′ and _notify for Timer. Once it's implemented, it's quite easy to derive wait_timeout from waitfirst:

function wait_timeout(w, timeout::Real)
    t = Timer(timeout)
    try
        return waitfirst(w, t) === w
    finally
        close(t)
    end
end

@JeffBezanson
Member

I don't think a function like one of these should be implemented by notifying the waited-for object. Others might be waiting for the same objects, and they would get spurious wakeups.
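The spurious-wakeup concern can be seen in a few lines. This is only an illustrative sketch using a bare Base Condition rather than a Channel's internal condition; the task names and the `woken` vector are made up for the example:

```julia
c = Condition()
woken = Symbol[]

# Two unrelated tasks wait on the same condition.
a = @async (wait(c); push!(woken, :a))
b = @async (wait(c); push!(woken, :b))
sleep(0.1)      # give both tasks time to start waiting

notify(c)       # intended to cancel one waiter, but `all=true` is the default
wait(a); wait(b)
```

Both tasks are woken by the single notify(c). Passing all=false would wake only one waiter, but the caller cannot choose which one.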

@tkf
Member

tkf commented Jun 15, 2020

What are the other implementation strategies?

Maybe we can define waitfirst(waitables...) if we have waitfirst(::Condition, ::Condition) as a primitive? I'm guessing that the cancellable wait can then use "shared" (e.g., c.cond_wait for Channel) and "unshared" Conditions. If we notify the unshared condition for cancellation, we can avoid the spurious wakeups?

@vtjnash
Member

vtjnash commented Jun 15, 2020

That seems rather complex, like you're trying to put a round peg (old multiplexing event systems) into a square hole (our Task-based ownership design). Our objects are intended to essentially already have both capabilities through the existing Task system, which is why (unlike Go and C#) we have been trying not to additionally bolt things like CancellationTokens and waitmultiple on afterwards (note that the implementation of CancellationTokens appears to itself mostly just be a thin wrapper around Timer and Future! 🙂). Here's how I intended the implementation of this to work:

function wait_until(c, timeout::Real) # `c` is any object that is both wait-able and cancel-able (e.g. any IO or a Channel, etc.)
    timer = Timer(timeout) do t
        isready(c) || close(c)
    end
    try
        return wait(c)
    finally
        close(timer)
    end
end

Short, sweet, simple, no minor resource leaks—and essentially how the timeouts are implemented for the various resources in the FileWatching module.
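For reference, a small usage sketch of this pattern with a Channel. The function is repeated so that the snippet is self-contained; the channel names and timeout values are arbitrary choices for the example. It shows that a timeout surfaces as an exception from wait() and leaves the channel closed:

```julia
function wait_until(c, timeout::Real)
    timer = Timer(timeout) do _
        isready(c) || close(c)   # on timeout, cancel by closing the resource
    end
    try
        return wait(c)
    finally
        close(timer)
    end
end

# Data already available: returns immediately and the channel stays open.
ready = Channel{Int}(1)
put!(ready, 42)
wait_until(ready, 0.5)

# Nothing arrives: the timer closes the channel and wait() throws.
idle = Channel{Int}(1)
timed_out = try
    wait_until(idle, 0.1)
    false
catch err
    err isa InvalidStateException
end
```

The caller has to be prepared both to catch the exception and to recreate the resource if it wants to keep using it after a timeout.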

@davidanthoff
Contributor

So essentially you close something that you can wait on to cancel it?

I don't see, though, how that would make something like a cancellation token framework entirely unnecessary, given that you can only really close a thing, not an operation?

@vtjnash
Member

vtjnash commented Jun 15, 2020

Because you can also only wait on a thing, and not an operation. The expectation thus is that the only reason you want to be able to cancel something is that there's already a resource attached. Otherwise, why are you trying to waste compute cycles on a pure operation with no output?

@tkf
Member

tkf commented Jun 15, 2020

Don't you need close(::Task) for wait_until to work in full generality? For example, wait_until(c::Channel, timeout) shouldn't close c, right? (Discussion above: #36217 (comment)). I can imagine wait_until would work if I do

function wait_until(c::Channel, timeout::Real)
    w = @async wait(c)
    timer = Timer(timeout) do t
        isready(c) || close(w)
    end
    try
        return wait(w)
    finally
        close(timer)
    end
end

I'd be very happy if we have close(::Task) 🙂

@staticfloat
Member Author

staticfloat commented Jun 23, 2020

Let's step away from the example of retransmitting on failure and think of the more general case; I want to recv() from a socket, or more generally, wait() upon a resource, get notified by a timeout and end the recv()/wait(), and not destroy the resource (via close()) in the event of a timeout. I assert that that basic functionality is fundamental to writing robust distributed processes. If we cannot express that in Julia, I argue that our event model is fundamentally broken. As weak evidence, I put forth the fact that every major distributed API integrates timeouts into its methods, including the C socket API, the Python threading/multiprocessing API and basically any other API you want to look at.

@vtjnash
Member

vtjnash commented Jun 23, 2020

That seems like poor evidence, given that we use a platform abstraction layer (libuv) to improve upon the timeout capabilities from those APIs. As we talked about offline, there are good design patterns from only using the few primitives we have already.

@tkf
Member

tkf commented Jun 24, 2020

State machines are hard to write, use, and debug.

I'm going to need a reference for that.

These are two examples of "structured programming" approach vs state machine:

  • yield in many languages (e.g., Python) vs Julia's iterate
  • async/await vs callback hell

As a more specific example in concurrent programming, Deadlocks in non-hierarchical CSP - Roman Elizarov - Medium is a nice post explaining how tricky communicating finite state machines can be. There are a few links to relevant discussions on select for building state machines in python-trio/trio#242 (comment).

@StefanKarpinski
Member

State machines are hard to write, use, and debug.

I'm going to need a reference for that.

Arguably, the entire point of both closures and coroutines is to express what is ultimately just a state machine in a more convenient, less error prone way. After going to that much language design trouble to not force users to explicitly write out state machines, it seems like a bit of weak sauce to say that here it's suddenly just fine.

@vtjnash
Member

vtjnash commented Jun 24, 2020

Thanks for the links. The conclusion in the trio link in particular seems to be exactly the same as mine. That first post (Medium) includes a CFSM; it's not an attempt to avoid them, only to demonstrate problems with getting them wrong. And the second one (trio) shows ways to simplify most FSMs into linear form (by breaking them up into their independent parts), but hypothesizes that they can't be simplified if any cleanup, timeout, or retry actions are present aside from close. Both links talk about how basing your program on timeouts or select is a recipe for headaches, while using coroutines (as we do) to build your FSM is less error prone, as it forces you to consider which parts are independent and isolate them.

I also appreciated this conclusion to the trio posts:

Well, no. The correct behaviour is such case [of reaching a limit such as time] is shutting the client down [by calling close] and possibly logging the event.

simeonschaub pushed a commit to simeonschaub/julia that referenced this issue Jun 28, 2020
Following my own advice at JuliaLang#36217. Avoids a minor temporary resource leak.
simeonschaub pushed a commit to simeonschaub/julia that referenced this issue Aug 11, 2020
Following my own advice at JuliaLang#36217. Avoids a minor temporary resource leak.
@staticfloat staticfloat removed this from the 1.6 milestone Aug 13, 2020
@jebej
Contributor

jebej commented Oct 21, 2020

I had also asked about this on Discourse, and I agree that there should be a way to time out on a wait/read. I would love to be able to use Julia to run my instruments, but this has proven impossible without this feature.

In my particular case, I want to work with a SCPI instrument over TCP. This involves sending text commands and reading back the response. If I make a typo in the command, I would want the read to time out so that I can try again.

I find the given solution of needing to close the socket (to abort the read) and recreate everything over again to be pretty poor.

As an example (this could be interactive or not, but I might not be able to send an interrupt):

using Sockets

tcp = connect(host, port)  # `host` and `port` stand for the instrument's address

println(tcp, ":MEASURE:VOLT?")
voltage = readline(tcp) # all good here

println(tcp, ":MEASURE:VILT?") # oh no!
voltage2 = readline(tcp) # blocks forever

Note that I tried the following:

function read_timeout(tcp::TCPSocket, timeout=10)
    # read asynchronously
    ch = Channel{String}(c -> put!(c,readline(tcp)), 0)
    # set timeout timer
    tm = Timer(t -> isready(ch) || close(ch), timeout)
    # wait until value is read or timeout
    out = nothing
    try
        out = take!(ch)
        close(ch)
    catch
        @info "Command timed out!"
    finally
        close(tm)
    end
    return out
end

But it stops working once a timeout does happen: I can't seem to read from the socket anymore, even though it is still open.

@jebej
Contributor

jebej commented Apr 20, 2021

For what it's worth, inspired by PR #39027, I was able to write a function to read from a TCPSocket with timeout, which doesn't require killing and re-establishing the whole connection. It seems to work well in the bit of testing I have done.

function read_timeout(tcp::TCPSocket, timeout=5)
    # create timeout timer
    status_before = tcp.status
    tm = Timer(timeout) do t
        lock(tcp.cond)  
        tcp.status = Base.StatusEOF
        notify(tcp.cond)
        unlock(tcp.cond)
    end
    # read synchronously
    out = readline(tcp)
    # check timer
    if isopen(tm) # readline worked
        close(tm)
        return out
    else # got EOF because of timer
        tcp.status = status_before
        @info "Read timed out!"
        return nothing
    end
end

@vtjnash
Member

vtjnash commented Apr 20, 2021

You should not mutate tcp.status on the way out–that is dangerous. Note that if you want unreliable connections, building those on top of reliable connections is rather awkward. It is more likely that you want UDP.

@jebej
Contributor

jebej commented Apr 20, 2021

It's not that the connection is unreliable, it's that sometimes there is nothing to read and I want to get control back (being able to write again) without having to close the socket. If there is a better way to do this (having a timeout read), I would very much appreciate knowing how.

@vtjnash
Member

vtjnash commented Feb 11, 2024

@async read(socket) gets you control back, without needing any other features, and without the complicated asynchronous state machine model necessitated by using wait with a timeout.
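A rough sketch of what that looks like in practice. The loopback server below is only a stand-in for the remote device (an assumption for illustration), and the port hint 8000 and the message strings are arbitrary:

```julia
using Sockets

# Loopback server standing in for the remote peer (assumption for illustration).
port, server = listenany(ip"127.0.0.1", 8000)
accepting = @async accept(server)
client = connect(ip"127.0.0.1", port)
peer = fetch(accepting)

reader = @async readline(client)      # the blocking read lives in its own task
sleep(0.1)
reply_pending = !istaskdone(reader)   # nothing to read yet, but we are not blocked

println(client, ":MEASURE:VOLT?")     # the socket is still usable for writing
request = readline(peer)
println(peer, "1.23")                 # the peer finally answers
voltage = fetch(reader)

close(client); close(peer); close(server)
```

The reader task itself stays blocked until data arrives or the socket is closed, so this returns control to the caller without cancelling the pending read.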

@NHDaly
Member

NHDaly commented Apr 19, 2024

An example where I want this:

# Attempt to gracefully kill the process, because force-killing it
# will get reported as a crash in our test suite. 
please_die_now(process)
# If the process failed to die after 5 seconds, forcibly kill it, since it's possibly stuck,
# which indicates a bug, but we do want to move on with the rest of our tests.
wait_until(process, 5)
if !process_exited(process)
    kill(process, 9)
end
# ...
run_more_test(...)
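Until something like wait_until exists, one way to approximate this today is Base's polling helper timedwait. The sketch below assumes a Unix-like system with a `sleep` command, and uses a plain kill() (SIGTERM) as a stand-in for the hypothetical please_die_now:

```julia
# Assumes a Unix-like system where the `sleep` command exists.
process = run(`sleep 30`; wait=false)

kill(process)   # polite request first (SIGTERM by default)

# `timedwait` polls the predicate and returns :ok or :timed_out.
if timedwait(() -> process_exited(process), 5.0) === :timed_out
    kill(process, Base.SIGKILL)   # still alive after 5 seconds: force it
end
```

This is polling (every 0.1 seconds by default), so it has the latency and CPU cost discussed at the top of the thread, but it does not leave a blocked task behind.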

@StefanKarpinski
Member

Just adding that I think that spelling this as wait( ; timeout=n) would be nicer. Perhaps we should triage whether we want this and how.

@StefanKarpinski StefanKarpinski added the triage This should be discussed on a triage call label Apr 22, 2024
@oschulz
Contributor

oschulz commented Apr 23, 2024

spelling this as wait( ; timeout=n) would be nicer

That would be awesome, I missed having that many times in the past. Would also be super helpful with channels, for interaction with hardware, in scenarios with distributed workers (remotecall) and so on.

@LilithHafner
Member

@NHDaly's example could be written as:

t = @spawn begin
    # Attempt to gracefully kill the process, because force-killing it
    # will get reported as a crash in our test suite. 
    please_die_now(process)
    sleep(5)
    # If the process failed to die after 5 seconds, forcibly kill it, since it's possibly stuck,
    # which indicates a bug, but we do want to move on with the rest of our tests.
    if !process_exited(process)
        kill(process, 9)
    end
end
Base.errormonitor(t)
wait(process) # hang if kill fails or if the task errors
# ...
run_more_test(...)

Which notably hangs if the process does not terminate after kill.

@LilithHafner
Member

Triage talked about this extensively and didn't come to any conclusions.

@oschulz
Contributor

oschulz commented Apr 25, 2024

From a user point I'd say it would be nice to have a wait_for_any_of(channel1, channel2, ...) (one of these could then, for example, receive a message from a timer). Don't know if that's feasible implementation-wise (without polling/spinning)?

See below.

@gbaraldi
Member

Julia now has waitall and waitany for tasks so I imagine this work would be an expansion of that work to be able to wait on a timer.

@oschulz
Contributor

oschulz commented Apr 25, 2024

Julia now has waitall and waitany for tasks

Oh, I hadn't seen that - thanks @gbaraldi !

@NHDaly
Member

NHDaly commented Apr 25, 2024

oh, me neither! Yeah, just adding a timer in there seems like it would address this well! Can we add support for that?

@mind6

mind6 commented Jun 25, 2024

Julia now has waitall and waitany for tasks so I imagine this work would be an expansion of that work to be able to wait on a timer.

If you're using waitany to wait for a condition plus a Timer, it still requires a separate task to wait on the condition, and it will not do anything to "unleak" the task for you.

Fundamentally, if you cannot interrupt tasks safely, you have to release the object it's waiting on to end the task. The user just has to manage the waitable object to prevent leaks. As previous posts have said, wait_timeout should not assume the object should be closed or notified or whatnot. It should only guarantee that releasing the waitable object later will not cause side effects, i.e. the waiter task it creates will terminate immediately and make no state changes.

@oschulz
Contributor

oschulz commented Jun 25, 2024

I now have workarounds in ParallelProcessingTools (wait_for_all and wait_for_any) that use waits (different methods for short and long waiting times) with a dynamic backoff and timeout support.

@LilithHafner
Member

Triage wants this (adding support for Timers to waitany would be enough).

@oschulz
Contributor

oschulz commented Aug 15, 2024

The one drawback with Timers is that they have a fairly high minimum wait time - possibly too long when doing I/O on low-latency channels, for example. That's why I ended up using this mix of yield and sleep in ParallelProcessingTools.
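As a rough illustration of that kind of yield/sleep mix (this is not the ParallelProcessingTools implementation; poll_until and its keyword arguments are hypothetical names and defaults chosen for the sketch):

```julia
# Poll `pred` until it returns true or `timeout` seconds have passed.
# Yields for the first `spin_for` seconds (low latency), then sleeps
# with exponentially growing delays capped at `max_sleep`.
function poll_until(pred, timeout::Real; spin_for::Real=0.001, max_sleep::Real=0.1)
    t0 = time()
    delay = 0.001
    while !pred()
        elapsed = time() - t0
        elapsed >= timeout && return false   # timed out
        if elapsed < spin_for
            yield()                          # cheap: let other tasks run
        else
            sleep(min(delay, timeout - elapsed))
            delay = min(2 * delay, max_sleep)
        end
    end
    return true
end

ch = Channel{Int}(1)
@async (sleep(0.05); put!(ch, 1))
arrived = poll_until(() -> isready(ch), 1.0)
```

The trade-off is the one raised at the start of the thread: short waits are served with low latency at the cost of some CPU, while long waits back off to timer-based sleeps.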

@LilithHafner LilithHafner removed the triage This should be discussed on a triage call label Aug 29, 2024
@kpamnany kpamnany linked a pull request Jan 6, 2025 that will close this issue