Implemented reconnection logic in queue-worker #52
This time, you don't have concurrent access between connect/reconnect and a publish; however, you may still unsubscribe and close the connection from the signal handler.
So I would replace the individual connect and subscribe (and connectAndSubscribe) with something like setup() or init(), and have that hold the lock the whole time.
NatsQueue.unsubscribe() and .close() would need to acquire the lock before attempting to do anything.
Finally, this time I would recommend making the sleep in reconnect() cancellable so that it does not block Ctrl+C.
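The suggestions above can be sketched roughly as follows. This is only an illustration, not the code from this PR: the `queue` type, `newQueue`, `setup`, `shutdown`, `waitOrQuit` and the `reconnectDelay` constant are hypothetical stand-ins, and no real NATS connection is involved.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// reconnectDelay is a hypothetical value chosen for this sketch.
const reconnectDelay = 5 * time.Second

// queue is a hypothetical stand-in for NatsQueue; it holds no real connection.
type queue struct {
	mu        sync.Mutex
	connected bool
	delay     time.Duration
	quitCh    chan struct{}
}

func newQueue(delay time.Duration) *queue {
	return &queue{delay: delay, quitCh: make(chan struct{})}
}

// setup connects and subscribes while holding the lock for the whole operation.
func (q *queue) setup() {
	q.mu.Lock()
	defer q.mu.Unlock()
	// A real implementation would connect and subscribe here.
	q.connected = true
}

// shutdown takes the same lock before touching the connection state,
// so it cannot interleave with setup.
func (q *queue) shutdown() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.connected = false
}

// waitOrQuit waits for the reconnect delay, but returns false early
// if quitCh is closed (for example by a signal handler).
func (q *queue) waitOrQuit() bool {
	timer := time.NewTimer(q.delay)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true
	case <-q.quitCh:
		return false
	}
}

func main() {
	q := newQueue(reconnectDelay)
	q.setup()
	close(q.quitCh) // simulate the signal handler asking for shutdown
	fmt.Println("waited for the full delay:", q.waitOrQuit())
	q.shutdown()
	fmt.Println("connected:", q.connected)
}
```

Because the wait selects on both the timer and the quit channel, Ctrl+C does not have to wait for the delay to elapse.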
Thanks @kozlovic. I'll work on that tomorrow!
6190d33 to 5a1855f
@kozlovic I think I have addressed everything from your comment. Would you be so kind as to review my PR once again? :-) Thanks for your time!
I think the for loop in reconnect could be simplified to remove the label and continue. My preference is not to use context, but that is just a preference. I think the context usage is correct, but it is not something I usually use, so I am not too familiar with it.
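One possible shape for a retry loop without a label or continue is sketched below. It is not the code under review: the `reconnect` signature, the `connect` callback and both error variables are assumptions made for illustration, and a plain quit channel stands in for context.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// Both errors are hypothetical and exist only for this sketch.
var (
	errUnavailable = errors.New("server unavailable")
	errCancelled   = errors.New("reconnect cancelled")
)

// reconnect calls connect up to maxReconnect times, waiting delay after each
// failed attempt. It returns early when connect succeeds or quit is closed.
func reconnect(connect func() error, maxReconnect int, delay time.Duration, quit <-chan struct{}) error {
	var lastErr error
	for attempt := 1; attempt <= maxReconnect; attempt++ {
		if lastErr = connect(); lastErr == nil {
			return nil
		}
		log.Printf("reconnect attempt %d/%d failed: %v", attempt, maxReconnect, lastErr)

		// Wait for the delay, but give up immediately on shutdown.
		select {
		case <-time.After(delay):
		case <-quit:
			return errCancelled
		}
	}
	return fmt.Errorf("giving up after %d attempts, last error: %v", maxReconnect, lastErr)
}

func main() {
	calls := 0
	connect := func() error {
		calls++
		if calls < 3 {
			return errUnavailable // fail the first two attempts
		}
		return nil
	}
	err := reconnect(connect, 5, 10*time.Millisecond, make(chan struct{}))
	fmt.Println("calls:", calls, "err:", err)
}
```

Returning from inside the loop on success or cancellation is what removes the need for a labelled continue.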
5a1855f to 0c37ef2
LGTM!
I guess
@@ -58,6 +58,26 @@ func (ReadConfig) Read() QueueWorkerConfig {
		}
	}

	if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
Not sure we should have the faas prefix here.
So, do you want me to remove it or not?
		}
	}
	if err := natsQueue.closeConnection(); err != nil {
		log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
Should this be a panic or just logged?
Panic should be fine, because it will still unwind the stack, and it happens after we have received the signal to close everything.
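To illustrate the point about unwinding: `log.Panicf` logs the message and then panics, so deferred functions still run on the way up the stack (unlike `log.Fatalf`, which exits the process without running them). The sketch below is hypothetical; `shutdown` and `runShutdown` are illustrative names and no connection is actually closed.

```go
package main

import (
	"errors"
	"fmt"
	"log"
)

// shutdown is a hypothetical function that mimics a failed connection close.
func shutdown(cleaned *bool) {
	// This deferred function still runs while the panic unwinds the stack.
	defer func() { *cleaned = true }()
	log.Panicf("Cannot close connection because of an error: %v", errors.New("demo failure"))
}

// runShutdown calls shutdown, recovers from the panic, and reports
// whether a panic happened and whether the deferred cleanup ran.
func runShutdown() (panicked, cleaned bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	shutdown(&cleaned)
	return
}

func main() {
	panicked, cleaned := runShutdown()
	fmt.Println("panicked:", panicked, "deferred cleanup ran:", cleaned)
}
```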
val, err := strconv.Atoi(value)

if err != nil {
	log.Println("converting faas_max_reconnect to int error:", err)
So what is the default otherwise? 0?
Yes
reconnectDelayVal, durationErr := time.ParseDuration(value)

if durationErr != nil {
	log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
What's the default?
0
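Since both settings fall back to 0 when parsing fails, one alternative is to make the fallback explicit. The sketch below only illustrates that idea and is not what this PR does: the helper names and both default values are assumptions, and only the environment variable names come from the diff.

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
	"time"
)

// Hypothetical defaults chosen for this sketch; the PR itself falls back to 0.
const (
	defaultMaxReconnect   = 120
	defaultReconnectDelay = 2 * time.Second
)

// maxReconnectFrom parses raw as an int and falls back to the default
// when raw is empty or invalid.
func maxReconnectFrom(raw string) int {
	if raw == "" {
		return defaultMaxReconnect
	}
	val, err := strconv.Atoi(raw)
	if err != nil {
		log.Println("converting faas_max_reconnect to int error:", err)
		return defaultMaxReconnect
	}
	return val
}

// reconnectDelayFrom parses raw as a time.Duration (for example "500ms" or
// "2s") and falls back to the default when raw is empty or invalid.
func reconnectDelayFrom(raw string) time.Duration {
	if raw == "" {
		return defaultReconnectDelay
	}
	val, err := time.ParseDuration(raw)
	if err != nil {
		log.Println("parse env var: faas_reconnect_delay as time.Duration error:", err)
		return defaultReconnectDelay
	}
	return val
}

func main() {
	maxReconnect := maxReconnectFrom(os.Getenv("faas_max_reconnect"))
	reconnectDelay := reconnectDelayFrom(os.Getenv("faas_reconnect_delay"))
	fmt.Println("max reconnect:", maxReconnect, "reconnect delay:", reconnectDelay)
}
```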
main.go
@@ -49,17 +50,134 @@ func makeClient() http.Client {
	return proxyClient
}

type NatsQueue struct {
Can we put it in its own file to help with the diff and maintenance?
With methods, or just plain struct?
connMutex:      &sync.RWMutex{},
maxReconnect:   config.MaxReconnect,
reconnectDelay: config.ReconnectDelay,
quitCh:         make(chan struct{}),
Whilst making changes for the other comments, can struct{} be exchanged for bool or int?
I strongly disagree with that one. Using struct{} is a common pattern for signalling channels when you are not interested in the value being sent to the channel, because it does not allocate any memory for that value.
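A small sketch of the pattern being defended here: a channel of `struct{}` used purely as a signal, closed to notify listeners. The `worker` and `emptyStructSize` names are hypothetical; the size comparison uses `unsafe.Sizeof` only to show that an empty struct value occupies zero bytes (the channel itself is still allocated as usual).

```go
package main

import (
	"fmt"
	"unsafe"
)

// worker blocks until quitCh is closed, then signals completion on done.
// No value is ever sent on either channel; closing is the signal.
func worker(quitCh <-chan struct{}, done chan<- struct{}) {
	<-quitCh
	close(done)
}

// emptyStructSize reports the size in bytes of an empty struct value.
func emptyStructSize() uintptr {
	return unsafe.Sizeof(struct{}{})
}

func main() {
	quitCh := make(chan struct{})
	done := make(chan struct{})

	go worker(quitCh, done)
	close(quitCh) // broadcast shutdown to every receiver
	<-done

	fmt.Println("size of struct{}:", emptyStructSize())
	fmt.Println("size of bool:", unsafe.Sizeof(false))
	fmt.Println("size of int:", unsafe.Sizeof(int(0)))
}
```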
reconnectDelay time.Duration
conn           stan.Conn
connMutex      *sync.RWMutex
quitCh         chan struct{}
If quitCh is for a graceful shutdown and we're editing the file, how about shutdownCh?
Do we even need a suffix of ch in that instance?
With the ch suffix, the code is easier to understand when reading it. At least it is for me, and I have seen that pattern a lot in Go code. If you want, I can remove it, but I stand behind the idea of keeping the suffix.
0c37ef2 to 08424a2
How it was tested: I have tested it by deploying it to my local cluster with and without a persistence store in NATS (MySQL), and I tried to simulate disconnections from the NATS server by killing its containers and then trying to async invoke functions.
Signed-off-by: Bart Smykla <[email protected]>
08424a2 to 1f14fad
Hi @bartsmykla thanks for editing the commit. Do you know why the build failed?
Please see above.
I'm not sure why this line is in the PR, but it doesn't appear to work?
@alexellis It shouldn't be here. While rebasing, I committed by mistake a line from testing the nats-streaming Prometheus exporter. :-/
OK. I'll remove it and fix another bug I found in the code via #54
It's the second part of: #49 which should resolve openfaas/faas#1031 and #33
Signed-off-by: Bart Smykla [email protected]
Description
Implemented reconnection logic in queue-worker
Motivation and Context
How Has This Been Tested?
I have tested it by deploying it to my local cluster with and without
a persistence store in NATS (MySQL), and I tried to simulate
disconnections from the NATS server by killing its containers
and then trying to async invoke functions.
Logs after I scaled nats server to zero, waited some time (few seconds), and scaled it back again:
Types of changes
Checklist:
git commit -s