Commit 8eef599

Merge branch 'ndyakov/feature/CAE-1313-maint-cluster' into ndyakov/additional-e2e-proxy-tests
2 parents: a99127f + dbf6fd1

File tree: 9 files changed, +204 -13 lines

commands_test.go

Lines changed: 9 additions & 1 deletion
@@ -8870,11 +8870,15 @@ var _ = Describe("Commands", func() {
 		It("returns latencies", func() {
 			const key = "latency-monitor-threshold"
 
+			// reset all latencies first to ensure clean state
+			err := client.LatencyReset(ctx).Err()
+			Expect(err).NotTo(HaveOccurred())
+
 			old := client.ConfigGet(ctx, key).Val()
 			client.ConfigSet(ctx, key, "1")
 			defer client.ConfigSet(ctx, key, old[key])
 
-			err := client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
+			err = client.Do(ctx, "DEBUG", "SLEEP", 0.01).Err()
 			Expect(err).NotTo(HaveOccurred())
 
 			result, err := client.Latency(ctx).Result()
@@ -8921,6 +8925,10 @@ var _ = Describe("Commands", func() {
 		It("reset latencies by add event name args", func() {
 			const key = "latency-monitor-threshold"
 
+			// reset all latencies first to ensure clean state
+			err := client.LatencyReset(ctx).Err()
+			Expect(err).NotTo(HaveOccurred())
+
 			old := client.ConfigGet(ctx, key).Val()
 			// Use a higher threshold (100ms) to avoid capturing normal operations
 			// that could cause flakiness due to timing variations
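Both specs now clear the latency log before asserting, so events recorded by earlier tests cannot leak into the results. As a standalone illustration of that hygiene, a minimal sketch assuming the go-redis/v9 import path, a local server at an illustrative address, and the LatencyReset/Latency helpers the tests call:

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Clear previously recorded latency events so the next read reflects
	// only what happens after this point, the same reset the tests add.
	if err := rdb.LatencyReset(ctx).Err(); err != nil {
		panic(err)
	}

	result, err := rdb.Latency(ctx).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(result) // empty immediately after a reset
}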

error.go

Lines changed: 14 additions & 0 deletions
@@ -124,6 +124,9 @@ func shouldRetry(err error, retryTimeout bool) bool {
 	if proto.IsTryAgainError(err) {
 		return true
 	}
+	if proto.IsNoReplicasError(err) {
+		return true
+	}
 
 	// Fallback to string checking for backward compatibility with plain errors
 	s := err.Error()
@@ -145,6 +148,9 @@ func shouldRetry(err error, retryTimeout bool) bool {
 	if strings.HasPrefix(s, "MASTERDOWN ") {
 		return true
 	}
+	if strings.HasPrefix(s, "NOREPLICAS ") {
+		return true
+	}
 
 	return false
 }
@@ -342,6 +348,14 @@ func IsOOMError(err error) bool {
 	return proto.IsOOMError(err)
 }
 
+// IsNoReplicasError checks if an error is a Redis NOREPLICAS error, even if wrapped.
+// NOREPLICAS errors occur when not enough replicas acknowledge a write operation.
+// This typically happens with WAIT/WAITAOF commands or CLUSTER SETSLOT with synchronous
+// replication when the required number of replicas cannot confirm the write within the timeout.
+func IsNoReplicasError(err error) bool {
+	return proto.IsNoReplicasError(err)
+}
+
 //------------------------------------------------------------------------------
 
 type timeoutError interface {
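With NOREPLICAS now classified as retryable inside the client, the exported IsNoReplicasError also lets application code build its own policy. A minimal sketch, assuming the go-redis/v9 import path; doWithRetry, its key/value, and the backoff are hypothetical, not part of this commit:

package main

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// doWithRetry retries a write only when the failure is a NOREPLICAS error,
// i.e. not enough replicas acknowledged the write; every other error is
// returned to the caller immediately.
func doWithRetry(ctx context.Context, rdb *redis.Client, attempts int) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = rdb.Set(ctx, "key", "value", 0).Err()
		if err == nil || !redis.IsNoReplicasError(err) {
			return err // success, or an error we should not retry
		}
		// crude linear backoff before the next attempt
		time.Sleep(time.Duration(i+1) * 50 * time.Millisecond)
	}
	return err
}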

error_test.go

Lines changed: 2 additions & 1 deletion
@@ -45,7 +45,8 @@ var _ = Describe("error", func() {
 		proto.ParseErrorReply([]byte("-READONLY You can't write against a read only replica")):  true,
 		proto.ParseErrorReply([]byte("-CLUSTERDOWN The cluster is down")):                       true,
 		proto.ParseErrorReply([]byte("-TRYAGAIN Command cannot be processed, please try again")): true,
-		proto.ParseErrorReply([]byte("-ERR other")): false,
+		proto.ParseErrorReply([]byte("-NOREPLICAS Not enough good replicas to write")):          true,
+		proto.ParseErrorReply([]byte("-ERR other")):                                             false,
 	}
 
 	for err, expected := range data {

error_wrapping_test.go

Lines changed: 10 additions & 4 deletions
@@ -239,10 +239,10 @@ func TestErrorWrappingInHookScenario(t *testing.T) {
 // TestShouldRetryWithTypedErrors tests that shouldRetry works with typed errors
 func TestShouldRetryWithTypedErrors(t *testing.T) {
 	tests := []struct {
-		name        string
-		errorMsg    string
-		shouldRetry bool
-		retryTimeout bool
+		name         string
+		errorMsg     string
+		shouldRetry  bool
+		retryTimeout bool
 	}{
 		{
 			name: "LOADING error should retry",
@@ -280,6 +280,12 @@ func TestShouldRetryWithTypedErrors(t *testing.T) {
 			shouldRetry:  true,
 			retryTimeout: false,
 		},
+		{
+			name:         "NOREPLICAS error should retry",
+			errorMsg:     "NOREPLICAS Not enough good replicas to write",
+			shouldRetry:  true,
+			retryTimeout: false,
+		},
 	}
 
 	for _, tt := range tests {

internal/pool/pool.go

Lines changed: 6 additions & 0 deletions
@@ -321,6 +321,12 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
 		return nil, ErrPoolExhausted
 	}
 
+	// Protect against nil context due to race condition in queuedNewConn
+	// where the context can be set to nil after timeout/cancellation
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
 	dialCtx, cancel := context.WithTimeout(ctx, p.cfg.DialTimeout)
 	defer cancel()
 	cn, err := p.dialConn(dialCtx, pooled)
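The guard exists because deriving a child context from a nil parent panics at runtime. A standalone sketch of the failure mode and the fix, independent of the pool internals:

package main

import (
	"context"
	"fmt"
	"time"
)

// deriveDialCtx mirrors the guarded pattern above: context.WithTimeout
// panics with "cannot create context from nil parent" if handed a nil
// context, so a nil parent is normalized to context.Background() first.
func deriveDialCtx(ctx context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
	if ctx == nil {
		ctx = context.Background()
	}
	return context.WithTimeout(ctx, timeout)
}

func main() {
	// Without the nil check this call would panic, which is exactly what
	// the queuedNewConn race could trigger inside the pool.
	ctx, cancel := deriveDialCtx(nil, time.Second)
	defer cancel()
	fmt.Println(ctx.Err()) // <nil> until the timeout fires
}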

internal/pool/pool_test.go

Lines changed: 58 additions & 0 deletions
@@ -1037,6 +1037,64 @@ var _ = Describe("queuedNewConn", func() {
 		testPool.Put(ctx, reqBConn)
 		Eventually(func() int { return testPool.QueueLen() }, "600ms").Should(Equal(0))
 	})
+
+	// Test for race condition where nil context can be passed to newConn
+	// This reproduces the issue reported in GitHub where queuedNewConn panics
+	// with "cannot create context from nil parent"
+	It("should handle nil context race condition in queuedNewConn", func() {
+		// Create a pool with very short timeouts to trigger the race condition
+		testPool := pool.NewConnPool(&pool.Options{
+			Dialer: func(ctx context.Context) (net.Conn, error) {
+				// Add a small delay to increase chance of race condition
+				time.Sleep(50 * time.Millisecond)
+				return dummyDialer(ctx)
+			},
+			PoolSize:           int32(10),
+			MaxConcurrentDials: 10,
+			PoolTimeout:        10 * time.Millisecond, // Very short timeout
+			DialTimeout:        100 * time.Millisecond,
+			ConnMaxIdleTime:    time.Millisecond,
+		})
+		defer testPool.Close()
+
+		// Try to trigger the race condition by making many concurrent requests
+		// with short timeouts
+		const numGoroutines = 50
+		var wg sync.WaitGroup
+		errors := make(chan error, numGoroutines)
+
+		for i := 0; i < numGoroutines; i++ {
+			wg.Add(1)
+			go func() {
+				defer GinkgoRecover()
+				defer wg.Done()
+
+				// Use a very short context timeout to trigger the race
+				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
+				defer cancel()
+
+				_, err := testPool.Get(ctx)
+				if err != nil {
+					// We expect timeout errors, but not panics
+					errors <- err
+				}
+			}()
+		}
+
+		wg.Wait()
+		close(errors)
+
+		// Check that we got timeout errors (expected) but no panics
+		// The test passes if it doesn't panic
+		timeoutCount := 0
+		for err := range errors {
+			if err == context.DeadlineExceeded || err == pool.ErrPoolTimeout {
+				timeoutCount++
+			}
+		}
+
+		// We should have at least some timeouts due to the short timeout
+		Expect(timeoutCount).To(BeNumerically(">", 0))
+	})
 })
 
 func init() {

internal/pool/want_conn_test.go

Lines changed: 53 additions & 0 deletions
@@ -442,3 +442,56 @@ func BenchmarkWantConnQueue_EnqueueDequeue(b *testing.B) {
 		q.dequeue()
 	}
 }
+
+// TestWantConn_RaceConditionNilContext tests the race condition where
+// getCtxForDial can return nil after the context is cancelled.
+// This test verifies that the fix in newConn handles nil context gracefully.
+func TestWantConn_RaceConditionNilContext(t *testing.T) {
+	// This test simulates the race condition described in the issue:
+	// 1. Main goroutine creates a wantConn with a context
+	// 2. Background goroutine starts but hasn't called getCtxForDial yet
+	// 3. Main goroutine times out and calls cancel(), setting w.ctx to nil
+	// 4. Background goroutine calls getCtxForDial() and gets nil
+	// 5. Background goroutine calls newConn(nil, true) which should not panic
+
+	dialCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
+	defer cancel()
+
+	w := &wantConn{
+		ctx:       dialCtx,
+		cancelCtx: cancel,
+		result:    make(chan wantConnResult, 1),
+	}
+
+	// Simulate the race condition by canceling the context
+	// and then trying to get it
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go func() {
+		defer wg.Done()
+		// Small delay to ensure cancel happens first
+		time.Sleep(10 * time.Millisecond)
+
+		// This should return nil after cancel
+		ctx := w.getCtxForDial()
+
+		// Verify that we got nil context
+		if ctx != nil {
+			t.Errorf("Expected nil context after cancel, got %v", ctx)
+		}
+	}()
+
+	// Cancel the context immediately
+	w.cancel()
+
+	wg.Wait()
+
+	// Verify the wantConn state
+	if !w.done {
+		t.Error("wantConn should be marked as done after cancel")
+	}
+	if w.ctx != nil {
+		t.Error("wantConn.ctx should be nil after cancel")
+	}
+}
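For orientation, a simplified sketch of the wantConn shape this test exercises; the locking and field details are assumptions reconstructed from what the test touches, not the verbatim internal/pool source:

package pool

import (
	"context"
	"sync"
)

// Assumed shape of the type under test; not the actual implementation.
type wantConnResult struct {
	cn  *Conn
	err error
}

type wantConn struct {
	mu        sync.Mutex
	ctx       context.Context // nil once the request is cancelled
	cancelCtx context.CancelFunc
	done      bool
	result    chan wantConnResult
}

// getCtxForDial returns the context a background dialer should use.
// After cancel() has run it returns nil, which is the value newConn
// now tolerates.
func (w *wantConn) getCtxForDial() context.Context {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.ctx
}

// cancel marks the request done and drops the context reference, opening
// the window in which a concurrent dialer observes ctx == nil.
func (w *wantConn) cancel() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.done = true
	w.ctx = nil
	if w.cancelCtx != nil {
		w.cancelCtx()
	}
}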

internal/proto/redis_errors.go

Lines changed: 39 additions & 0 deletions
@@ -212,6 +212,25 @@ func NewOOMError(msg string) *OOMError {
 	return &OOMError{msg: msg}
 }
 
+// NoReplicasError is returned when not enough replicas acknowledge a write.
+// This error occurs when using WAIT/WAITAOF commands or CLUSTER SETSLOT with
+// synchronous replication, and the required number of replicas cannot confirm
+// the write within the timeout period.
+type NoReplicasError struct {
+	msg string
+}
+
+func (e *NoReplicasError) Error() string {
+	return e.msg
+}
+
+func (e *NoReplicasError) RedisError() {}
+
+// NewNoReplicasError creates a new NoReplicasError with the given message.
+func NewNoReplicasError(msg string) *NoReplicasError {
+	return &NoReplicasError{msg: msg}
+}
+
 // parseTypedRedisError parses a Redis error message and returns a typed error if applicable.
 // This function maintains backward compatibility by keeping the same error messages.
 func parseTypedRedisError(msg string) error {
@@ -235,6 +254,8 @@ func parseTypedRedisError(msg string) error {
 		return NewTryAgainError(msg)
 	case strings.HasPrefix(msg, "MASTERDOWN "):
 		return NewMasterDownError(msg)
+	case strings.HasPrefix(msg, "NOREPLICAS "):
+		return NewNoReplicasError(msg)
 	case msg == "ERR max number of clients reached":
 		return NewMaxClientsError(msg)
 	case strings.HasPrefix(msg, "NOAUTH "), strings.HasPrefix(msg, "WRONGPASS "), strings.Contains(msg, "unauthenticated"):
@@ -486,3 +507,21 @@ func IsOOMError(err error) bool {
 	// Fallback to string checking for backward compatibility
 	return strings.HasPrefix(err.Error(), "OOM ")
 }
+
+// IsNoReplicasError checks if an error is a NoReplicasError, even if wrapped.
+func IsNoReplicasError(err error) bool {
+	if err == nil {
+		return false
+	}
+	var noReplicasErr *NoReplicasError
+	if errors.As(err, &noReplicasErr) {
+		return true
+	}
+	// Check if wrapped error is a RedisError with NOREPLICAS prefix
+	var redisErr RedisError
+	if errors.As(err, &redisErr) && strings.HasPrefix(redisErr.Error(), "NOREPLICAS ") {
+		return true
+	}
+	// Fallback to string checking for backward compatibility
+	return strings.HasPrefix(err.Error(), "NOREPLICAS ")
+}
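IsNoReplicasError therefore matches along three paths: the typed error itself, anything wrapping it, and plain errors that merely carry the NOREPLICAS prefix. A small example-style sketch (as it might appear in a package proto test file) walking all three:

package proto

import (
	"errors"
	"fmt"
)

// ExampleIsNoReplicasError demonstrates the three detection paths above.
func ExampleIsNoReplicasError() {
	typed := NewNoReplicasError("NOREPLICAS Not enough good replicas to write")
	wrapped := fmt.Errorf("pipeline exec: %w", typed)
	plain := errors.New("NOREPLICAS Not enough good replicas to write")

	fmt.Println(IsNoReplicasError(typed))   // matched by errors.As on the typed error
	fmt.Println(IsNoReplicasError(wrapped)) // matched after unwrapping the %w chain
	fmt.Println(IsNoReplicasError(plain))   // matched by the string-prefix fallback
	// Output:
	// true
	// true
	// true
}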

internal/proto/redis_errors_test.go

Lines changed: 13 additions & 7 deletions
@@ -9,12 +9,12 @@ import (
 // TestTypedRedisErrors tests that typed Redis errors are created correctly
 func TestTypedRedisErrors(t *testing.T) {
 	tests := []struct {
-		name        string
-		errorMsg    string
-		expectedType interface{}
-		expectedMsg string
-		checkFunc   func(error) bool
-		extractAddr func(error) string
+		name         string
+		errorMsg     string
+		expectedType interface{}
+		expectedMsg  string
+		checkFunc    func(error) bool
+		extractAddr  func(error) string
 	}{
 		{
 			name: "LOADING error",
@@ -132,6 +132,13 @@ func TestTypedRedisErrors(t *testing.T) {
 			expectedMsg: "OOM command not allowed when used memory > 'maxmemory'",
 			checkFunc:   IsOOMError,
 		},
+		{
+			name:         "NOREPLICAS error",
+			errorMsg:     "NOREPLICAS Not enough good replicas to write",
+			expectedType: &NoReplicasError{},
+			expectedMsg:  "NOREPLICAS Not enough good replicas to write",
+			checkFunc:    IsNoReplicasError,
+		},
 	}
 
 	for _, tt := range tests {
@@ -389,4 +396,3 @@ func TestBackwardCompatibility(t *testing.T) {
 		})
 	}
 }
-
392-
