Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f3e9126
lazy cluster topology reload
ndyakov Nov 24, 2025
6ed2349
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 1, 2025
e9da546
fix discrepancies between options structs
ndyakov Dec 2, 2025
644a1aa
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 2, 2025
fc05874
Update osscluster_lazy_reload_test.go
ndyakov Dec 2, 2025
f3bab0d
Update osscluster.go
ndyakov Dec 2, 2025
60c6edc
wip fault with mock proxy
ndyakov Dec 4, 2025
1902cbb
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 4, 2025
853c7f2
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 4, 2025
908d602
make lint happy
ndyakov Dec 4, 2025
aa1f970
fix linter issues
ndyakov Dec 5, 2025
e097b3e
faster tests with mocks
ndyakov Dec 5, 2025
ce7fb37
linter once again
ndyakov Dec 5, 2025
e390e2d
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/CA…
ndyakov Dec 5, 2025
3923484
add complex node test
ndyakov Dec 5, 2025
7971f01
add ci e2e
ndyakov Dec 5, 2025
8e4ea91
use correct redis container
ndyakov Dec 6, 2025
7decf55
e2e fix
ndyakov Dec 6, 2025
9c0d34d
additional e2e tests
ndyakov Dec 11, 2025
27b0f87
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/ad…
ndyakov Dec 11, 2025
a99127f
fix data race
ndyakov Dec 11, 2025
8eef599
Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/ad…
ndyakov Dec 16, 2025
115ad8a
fix random shard picker
ndyakov Dec 17, 2025
4ec6958
fix e2e tests
ndyakov Dec 17, 2025
6deefcf
fix for empty endpoint
ndyakov Dec 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions maintnotifications/e2e/logcollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,34 @@ func (tlc *TestLogCollector) DoPrint() {
// MatchFunc is a slice of functions that check the logs for a specific condition
// use in WaitForLogMatchFunc
type MatchFunc struct {
completed atomic.Bool
F func(lstring string) bool
matches []string
found chan struct{} // channel to notify when match is found, will be closed
done func()
completed atomic.Bool
F func(lstring string) bool
matches []string
matchesMu sync.Mutex // protects matches slice
found chan struct{} // channel to notify when match is found, will be closed
done func()
}

func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...interface{}) {
tlc.mu.Lock()
defer tlc.mu.Unlock()
lstr := fmt.Sprintf(format, v...)
if len(tlc.matchFuncs) > 0 {

// Check if there are match functions to process
// Use matchFuncsMutex to safely read matchFuncs
tlc.matchFuncsMutex.Lock()
hasMatchFuncs := len(tlc.matchFuncs) > 0
// Create a copy of matchFuncs to avoid holding the lock while processing
matchFuncsCopy := make([]*MatchFunc, len(tlc.matchFuncs))
copy(matchFuncsCopy, tlc.matchFuncs)
tlc.matchFuncsMutex.Unlock()

if hasMatchFuncs {
go func(lstr string) {
for _, matchFunc := range tlc.matchFuncs {
for _, matchFunc := range matchFuncsCopy {
if matchFunc.F(lstr) {
matchFunc.matchesMu.Lock()
matchFunc.matches = append(matchFunc.matches, lstr)
matchFunc.matchesMu.Unlock()
matchFunc.done()
return
}
Expand All @@ -118,6 +130,7 @@ func (tlc *TestLogCollector) Printf(_ context.Context, format string, v ...inter
fmt.Println(lstr)
}
tlc.l = append(tlc.l, fmt.Sprintf(format, v...))
tlc.mu.Unlock()
}

func (tlc *TestLogCollector) WaitForLogContaining(searchString string, timeout time.Duration) bool {
Expand Down Expand Up @@ -170,7 +183,12 @@ func (tlc *TestLogCollector) WaitForLogMatchFunc(mf func(string) bool, timeout t

select {
case <-matchFunc.found:
return matchFunc.matches[0], true
matchFunc.matchesMu.Lock()
defer matchFunc.matchesMu.Unlock()
if len(matchFunc.matches) > 0 {
return matchFunc.matches[0], true
}
return "", false
case <-time.After(timeout):
return "", false
}
Expand Down
59 changes: 50 additions & 9 deletions maintnotifications/e2e/notification_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,21 @@ type NotificationInjector interface {
InjectSMIGRATED(ctx context.Context, seqID int64, hostPort string, slots ...string) error

// InjectMOVING injects a MOVING notification (for standalone)
InjectMOVING(ctx context.Context, seqID int64, slot int) error
// Format: ["MOVING", seqID, timeS, endpoint]
InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error

// InjectMIGRATING injects a MIGRATING notification (for standalone)
InjectMIGRATING(ctx context.Context, seqID int64, slot int) error

// InjectMIGRATED injects a MIGRATED notification (for standalone)
InjectMIGRATED(ctx context.Context, seqID int64, slot int) error

// InjectFAILING_OVER injects a FAILING_OVER notification
InjectFAILING_OVER(ctx context.Context, seqID int64) error

// InjectFAILED_OVER injects a FAILED_OVER notification
InjectFAILED_OVER(ctx context.Context, seqID int64) error

// Start starts the injector (if needed)
Start() error

Expand Down Expand Up @@ -475,8 +482,8 @@ func (p *ProxyNotificationInjector) InjectSMIGRATED(ctx context.Context, seqID i
return p.injectNotification(notification)
}

func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error {
notification := formatMovingNotification(seqID, slot)
func (p *ProxyNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error {
notification := formatMovingNotification(seqID, timeS, endpoint)
return p.injectNotification(notification)
}

Expand All @@ -490,6 +497,16 @@ func (p *ProxyNotificationInjector) InjectMIGRATED(ctx context.Context, seqID in
return p.injectNotification(notification)
}

func (p *ProxyNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error {
notification := formatFailingOverNotification(seqID)
return p.injectNotification(notification)
}

func (p *ProxyNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error {
notification := formatFailedOverNotification(seqID)
return p.injectNotification(notification)
}

func (p *ProxyNotificationInjector) injectNotification(notification string) error {
url := p.apiBaseURL + "/send-to-all-clients?encoding=raw"
resp, err := p.httpClient.Post(url, "application/octet-stream", strings.NewReader(notification))
Expand Down Expand Up @@ -541,9 +558,14 @@ func formatSMigratedNotification(seqID int64, endpoints ...string) string {
return strings.Join(parts, "")
}

func formatMovingNotification(seqID int64, slot int) string {
slotStr := fmt.Sprintf("%d", slot)
return fmt.Sprintf(">3\r\n$6\r\nMOVING\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr)
func formatMovingNotification(seqID int64, timeS int64, endpoint string) string {
// Format: ["MOVING", seqID, timeS, endpoint]
if endpoint == "" {
// 3 elements: MOVING, seqID, timeS
return fmt.Sprintf(">3\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n", seqID, timeS)
}
// 4 elements: MOVING, seqID, timeS, endpoint
return fmt.Sprintf(">4\r\n$6\r\nMOVING\r\n:%d\r\n:%d\r\n$%d\r\n%s\r\n", seqID, timeS, len(endpoint), endpoint)
}

func formatMigratingNotification(seqID int64, slot int) string {
Expand All @@ -556,6 +578,16 @@ func formatMigratedNotification(seqID int64, slot int) string {
return fmt.Sprintf(">3\r\n$8\r\nMIGRATED\r\n:%d\r\n$%d\r\n%s\r\n", seqID, len(slotStr), slotStr)
}

func formatFailingOverNotification(seqID int64) string {
// Format: ["FAILING_OVER", seqID]
return fmt.Sprintf(">2\r\n$12\r\nFAILING_OVER\r\n:%d\r\n", seqID)
}

func formatFailedOverNotification(seqID int64) string {
// Format: ["FAILED_OVER", seqID]
return fmt.Sprintf(">2\r\n$11\r\nFAILED_OVER\r\n:%d\r\n", seqID)
}


// FaultInjectorNotificationInjector implements NotificationInjector using the real fault injector
type FaultInjectorNotificationInjector struct {
Expand Down Expand Up @@ -646,9 +678,9 @@ func (f *FaultInjectorNotificationInjector) InjectSMIGRATED(ctx context.Context,
return fmt.Errorf("SMIGRATED cannot be directly injected with real fault injector - it's generated when migration completes")
}

func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, slot int) error {
// MOVING notifications are generated during slot migration
return fmt.Errorf("MOVING cannot be directly injected with real fault injector - it's generated during migration")
func (f *FaultInjectorNotificationInjector) InjectMOVING(ctx context.Context, seqID int64, timeS int64, endpoint string) error {
// MOVING notifications are generated during bind action
return fmt.Errorf("MOVING cannot be directly injected with real fault injector - it's generated during bind action")
}

func (f *FaultInjectorNotificationInjector) InjectMIGRATING(ctx context.Context, seqID int64, slot int) error {
Expand All @@ -667,4 +699,13 @@ func (f *FaultInjectorNotificationInjector) InjectMIGRATED(ctx context.Context,
return fmt.Errorf("MIGRATED cannot be directly injected with real fault injector - it's generated when migration completes")
}

func (f *FaultInjectorNotificationInjector) InjectFAILING_OVER(ctx context.Context, seqID int64) error {
// FAILING_OVER is generated automatically when failover starts
return fmt.Errorf("FAILING_OVER cannot be directly injected with real fault injector - it's generated when failover starts")
}

func (f *FaultInjectorNotificationInjector) InjectFAILED_OVER(ctx context.Context, seqID int64) error {
// FAILED_OVER is generated automatically when failover completes
return fmt.Errorf("FAILED_OVER cannot be directly injected with real fault injector - it's generated when failover completes")
}

Loading
Loading