Contributor

@spiridonov spiridonov commented Dec 5, 2025

What this PR does / why we need it:

This change allows a worker to finish processing current jobs and send results back after being stopped.

Which issue(s) this PR fixes:
Fixes #2175

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@spiridonov spiridonov marked this pull request as ready for review December 5, 2025 19:55
@spiridonov spiridonov requested a review from a team as a code owner December 5, 2025 19:55
Member

@rfratto rfratto left a comment

nice start!

Comment on lines +19 to +20
// LocalWorker2 is another address of the local worker.
LocalWorker2 net.Addr = localAddr("worker2")

Ideally, the package API shouldn't be leaking concepts used for tests.

Workers are allowed to dial themselves (worker connecting to worker), so you likely don't need to introduce another local address here.

Comment on lines +182 to +186
// Spin up the listener for peer connections
peerConnectionsCtx, peerConnectionsCancel := context.WithCancel(context.Background())
defer peerConnectionsCancel()
listenerCtx, listenerCancel := context.WithCancel(context.Background())
defer listenerCancel()

As discussed offline: the most important part of a graceful shutdown is that we eventually stop accepting new tasks and wait for current tasks to finish.

We actually don't want to stop accepting new connections: we may be running a task that depends on a yet-to-be-scheduled scan task, and we need to receive those results to compute our own results properly.

-func (t *thread) Run(ctx context.Context) error {
+// thread is stopped. Run will not stop if any job failed, it will log the error and continue
+// accepting other jobs.
+func (t *thread) Run() {

You can simplify this a bit by continuing to pass the context into Run, but not attaching it to readyRequest.Context. That way <-ctx.Done() plays the role t.stopped plays now, and you don't need to add the additional Stop method.

// lifetime of the thread, but can also be canceled by the scheduler.
req := readyRequest{
-Context: ctx,
+Context: context.Background(),

readyRequest.Context is now always context.Background(), so I think we can remove the field from the struct.
