-
Notifications
You must be signed in to change notification settings - Fork 3.9k
chore: graceful worker shutdown #20136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
rfratto
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice start!
| // LocalWorker2 is another address of the local worker. | ||
| LocalWorker2 net.Addr = localAddr("worker2") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, the package API shouldn't be leaking concepts used for tests.
Workers are allowed to dial themselves (worker connecting to worker), so you likely don't need to introduce another local address here.
| // Spin up the listener for peer connections | ||
| peerConnectionsCtx, peerConnectionsCancel := context.WithCancel(context.Background()) | ||
| defer peerConnectionsCancel() | ||
| listenerCtx, listenerCancel := context.WithCancel(context.Background()) | ||
| defer listenerCancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed offline: the most important part about a graceful shutdown is we eventually stop accepting new tasks, and wait for current tasks to finish.
We actually don't want to stop accepting new connections, since we may be running a task which depends on a yet-to-be-scheduled scan task, and we need to receive those results to properly compute results.
| func (t *thread) Run(ctx context.Context) error { | ||
| // thread is stopped. Run will not stop if any job failed, it will log the error and continue | ||
| // acceptinh other jobs. | ||
| func (t *thread) Run() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can simplify this a bit by continuing to pass the context into run, but not attaching the context to the readyRequest.Context. That way <-ctx.Done() acts as you're using t.stopped now, and you don't need to add the additional Stop method.
| // lifetime of the thread, but can also be canceled by the scheduler. | ||
| req := readyRequest{ | ||
| Context: ctx, | ||
| Context: context.Background(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
readyRequest.Context is always context.Background now, so I think we can remove the field from the struct now
What this PR does / why we need it:
This change allows a worker to finish processing current jobs and send results back after being stopped.
Which issue(s) this PR fixes:
Fixes #2175
Special notes for your reviewer:
Checklist
CONTRIBUTING.mdguide (required)featPRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.docs/sources/setup/upgrade/_index.mddeprecated-config.yamlanddeleted-config.yamlfiles respectively in thetools/deprecated-config-checkerdirectory. Example PR