Skip to content

Commit 4cb2b87

Browse files
feat(NODE-7121): prevent connection churn on backpressure errors when establishing connections (#4800)
1 parent b1b6e81 commit 4cb2b87

21 files changed

+871
-422
lines changed

src/cmap/connect.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,11 @@ import {
3535
/** @public */
3636
export type Stream = Socket | TLSSocket;
3737

38+
function applyBackpressureLabels(error: MongoError) {
39+
error.addErrorLabel(MongoErrorLabel.SystemOverloadedError);
40+
error.addErrorLabel(MongoErrorLabel.RetryableError);
41+
}
42+
3843
export async function connect(options: ConnectionOptions): Promise<Connection> {
3944
let connection: Connection | null = null;
4045
try {
@@ -103,6 +108,8 @@ export async function performInitialHandshake(
103108
const authContext = new AuthContext(conn, credentials, options);
104109
conn.authContext = authContext;
105110

111+
// If we encounter an error preparing the handshake document, do NOT apply backpressure labels. Errors
112+
// encountered building the handshake document are all client-side, and do not indicate an overloaded server.
106113
const handshakeDoc = await prepareHandshakeDocument(authContext);
107114

108115
// @ts-expect-error: TODO(NODE-5141): The options need to be filtered properly, Connection options differ from Command options
@@ -163,12 +170,15 @@ export async function performInitialHandshake(
163170
try {
164171
await provider.auth(authContext);
165172
} catch (error) {
173+
// NOTE: If we encounter an error authenticating a connection, do NOT apply backpressure labels.
174+
166175
if (error instanceof MongoError) {
167176
error.addErrorLabel(MongoErrorLabel.HandshakeError);
168177
if (needsRetryableWriteLabel(error, response.maxWireVersion, conn.description.type)) {
169178
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
170179
}
171180
}
181+
172182
throw error;
173183
}
174184
}
@@ -189,6 +199,11 @@ export async function performInitialHandshake(
189199
if (error instanceof MongoError) {
190200
error.addErrorLabel(MongoErrorLabel.HandshakeError);
191201
}
202+
// If we encounter a network error executing the initial handshake, apply backpressure labels.
203+
if (error instanceof MongoNetworkError) {
204+
applyBackpressureLabels(error);
205+
}
206+
192207
throw error;
193208
}
194209
}
@@ -424,6 +439,10 @@ export async function makeSocket(options: MakeConnectionOptions): Promise<Stream
424439
socket = await connectedSocket;
425440
return socket;
426441
} catch (error) {
442+
// If we encounter an error while establishing a socket, apply the backpressure labels to it. We cannot
443+
// differentiate between DNS, TLS errors and network errors without refactoring our connection establishment to
444+
// handle all three steps separately.
445+
applyBackpressureLabels(error);
427446
socket.destroy();
428447
throw error;
429448
} finally {

src/error.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,9 @@ export const MongoErrorLabel = Object.freeze({
9999
ResetPool: 'ResetPool',
100100
PoolRequestedRetry: 'PoolRequestedRetry',
101101
InterruptInUseConnections: 'InterruptInUseConnections',
102-
NoWritesPerformed: 'NoWritesPerformed'
102+
NoWritesPerformed: 'NoWritesPerformed',
103+
RetryableError: 'RetryableError',
104+
SystemOverloadedError: 'SystemOverloadedError'
103105
} as const);
104106

105107
/** @public */

src/sdam/server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,8 @@ export class Server extends TypedEventEmitter<ServerEvents> {
400400
error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError);
401401
const isNetworkTimeoutBeforeHandshakeError =
402402
error instanceof MongoNetworkError && error.beforeHandshake;
403-
const isAuthHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
403+
const isAuthOrEstablishmentHandshakeError = error.hasErrorLabel(MongoErrorLabel.HandshakeError);
404+
const isSystemOverloadError = error.hasErrorLabel(MongoErrorLabel.SystemOverloadedError);
404405

405406
// Perhaps questionable and divergent from the spec, but considering MongoParseErrors like state change errors was legacy behavior.
406407
if (isStateChangeError(error) || error instanceof MongoParseError) {
@@ -424,8 +425,12 @@ export class Server extends TypedEventEmitter<ServerEvents> {
424425
} else if (
425426
isNetworkNonTimeoutError ||
426427
isNetworkTimeoutBeforeHandshakeError ||
427-
isAuthHandshakeError
428+
isAuthOrEstablishmentHandshakeError
428429
) {
430+
// Do NOT clear the pool if we encounter a system overloaded error.
431+
if (isSystemOverloadError) {
432+
return;
433+
}
429434
// from the SDAM spec: The driver MUST synchronize clearing the pool with updating the topology.
430435
// In load balanced mode: there is no monitoring, so there is no topology to update. We simply clear the pool.
431436
// For other topologies: the `ResetPool` label instructs the topology to clear the server's pool in `updateServer()`.

test/integration/server-discovery-and-monitoring/server_discovery_and_monitoring.prose.test.ts

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
import { expect } from 'chai';
22
import { once } from 'events';
33

4-
import { type MongoClient } from '../../../src';
4+
import {
5+
type ConnectionCheckOutFailedEvent,
6+
type ConnectionPoolClearedEvent,
7+
type MongoClient
8+
} from '../../../src';
59
import {
610
CONNECTION_POOL_CLEARED,
711
CONNECTION_POOL_READY,
812
SERVER_HEARTBEAT_FAILED,
913
SERVER_HEARTBEAT_SUCCEEDED
1014
} from '../../../src/constants';
15+
import { sleep } from '../../tools/utils';
1116

1217
describe('Server Discovery and Monitoring Prose Tests', function () {
1318
context('Monitors sleep at least minHeartbeatFrequencyMS between checks', function () {
@@ -187,4 +192,93 @@ describe('Server Discovery and Monitoring Prose Tests', function () {
187192
}
188193
});
189194
});
195+
196+
context('Connection Pool Backpressure', function () {
197+
let client: MongoClient;
198+
const checkoutFailedEvents: Array<ConnectionCheckOutFailedEvent> = [];
199+
const poolClearedEvents: Array<ConnectionPoolClearedEvent> = [];
200+
201+
beforeEach(async function () {
202+
// 1. Create a test client that listens to CMAP events, with maxConnecting=100.
203+
client = this.configuration.newClient({}, { maxConnecting: 100 });
204+
205+
client.on('connectionCheckOutFailed', e => checkoutFailedEvents.push(e));
206+
client.on('connectionPoolCleared', e => poolClearedEvents.push(e));
207+
208+
await client.connect();
209+
210+
// 2. Run the following commands to set up the rate limiter.
211+
// ```python
212+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=True)
213+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentRatePerSec=20)
214+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentBurstCapacitySecs=1)
215+
// client.admin.command("setParameter", 1, ingressConnectionEstablishmentMaxQueueDepth=1)
216+
// ```
217+
const admin = client.db('admin').admin();
218+
await admin.command({
219+
setParameter: 1,
220+
ingressConnectionEstablishmentRateLimiterEnabled: true
221+
});
222+
await admin.command({
223+
setParameter: 1,
224+
ingressConnectionEstablishmentRatePerSec: 20
225+
});
226+
await admin.command({
227+
setParameter: 1,
228+
ingressConnectionEstablishmentBurstCapacitySecs: 1
229+
});
230+
await admin.command({
231+
setParameter: 1,
232+
ingressConnectionEstablishmentMaxQueueDepth: 1
233+
});
234+
235+
// 3. Add a document to the test collection so that the sleep operations will actually block:
236+
// `client.test.test.insert_one({})`.
237+
await client.db('test').collection('test').insertOne({});
238+
});
239+
240+
afterEach(async function () {
241+
// 7. Sleep for 1 second to clear the rate limiter.
242+
await sleep(1000);
243+
244+
// 8. Ensure that the following command runs at test teardown even if the test fails.
245+
// `client.admin("setParameter", 1, ingressConnectionEstablishmentRateLimiterEnabled=False)`.
246+
const admin = client.db('admin').admin();
247+
await admin.command({
248+
setParameter: 1,
249+
ingressConnectionEstablishmentRateLimiterEnabled: false
250+
});
251+
252+
await client.close();
253+
});
254+
255+
it(
256+
'does not clear the pool when connections are closed due to connection storms',
257+
{
258+
requires: {
259+
// This test requires MongoDB 7.0+.
260+
mongodb: '>=7.0' // rate limiting added in 7.0
261+
}
262+
},
263+
async function () {
264+
// 4. Run the following find command on the collection in 100 parallel threads/coroutines. Run these commands concurrently
265+
// but block on their completion, and ignore errors raised by the command.
266+
// `client.test.test.find_one({"$where": "function() { sleep(2000); return true; }})`
267+
await Promise.allSettled(
268+
Array.from({ length: 100 }).map(() =>
269+
client
270+
.db('test')
271+
.collection('test')
272+
.findOne({ $where: 'function() { sleep(2000); return true; }' })
273+
)
274+
);
275+
276+
// 5. Assert that at least 10 `ConnectionCheckOutFailedEvent` occurred.
277+
expect(checkoutFailedEvents.length).to.be.at.least(10);
278+
279+
// 6. Assert that 0 `PoolClearedEvent` occurred.
280+
expect(poolClearedEvents).to.be.empty;
281+
}
282+
);
283+
});
190284
});

test/spec/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"isMaster",
1818
"hello"
1919
],
20-
"closeConnection": true,
20+
"errorCode": 91,
2121
"appName": "poolCreateMinSizeErrorTest"
2222
}
2323
},

test/spec/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ failPoint:
1111
mode: { times: 50 }
1212
data:
1313
failCommands: ["isMaster","hello"]
14-
closeConnection: true
14+
errorCode: 91
1515
appName: "poolCreateMinSizeErrorTest"
1616
poolOptions:
1717
minPoolSize: 1

test/spec/load-balancers/sdam-error-handling.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"description": "state change errors are correctly handled",
3-
"schemaVersion": "1.3",
3+
"schemaVersion": "1.4",
44
"runOnRequirements": [
55
{
66
"topologies": [
@@ -263,7 +263,7 @@
263263
"description": "errors during the initial connection hello are ignored",
264264
"runOnRequirements": [
265265
{
266-
"minServerVersion": "4.9"
266+
"minServerVersion": "4.4.7"
267267
}
268268
],
269269
"operations": [
@@ -282,7 +282,7 @@
282282
"isMaster",
283283
"hello"
284284
],
285-
"closeConnection": true,
285+
"errorCode": 11600,
286286
"appName": "lbSDAMErrorTestClient"
287287
}
288288
}
@@ -297,7 +297,7 @@
297297
}
298298
},
299299
"expectError": {
300-
"isClientError": true
300+
"isError": true
301301
}
302302
}
303303
],

test/spec/load-balancers/sdam-error-handling.yml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
description: state change errors are correctly handled
22

3-
schemaVersion: '1.3'
3+
schemaVersion: '1.4'
44

55
runOnRequirements:
66
- topologies: [ load-balanced ]
@@ -141,9 +141,8 @@ tests:
141141
# to the same mongos on which the failpoint is set.
142142
- description: errors during the initial connection hello are ignored
143143
runOnRequirements:
144-
# Server version 4.9+ is needed to set a fail point on the initial
145-
# connection handshake with the appName filter due to SERVER-49336.
146-
- minServerVersion: '4.9'
144+
# Require SERVER-49336 for failCommand + appName on the initial handshake.
145+
- minServerVersion: '4.4.7'
147146
operations:
148147
- name: failPoint
149148
object: testRunner
@@ -154,14 +153,14 @@ tests:
154153
mode: { times: 1 }
155154
data:
156155
failCommands: [isMaster, hello]
157-
closeConnection: true
156+
errorCode: 11600
158157
appName: *singleClientAppName
159158
- name: insertOne
160159
object: *singleColl
161160
arguments:
162161
document: { x: 1 }
163162
expectError:
164-
isClientError: true
163+
isError: true
165164
expectEvents:
166165
- client: *singleClient
167166
eventType: cmap

test/spec/server-discovery-and-monitoring/errors/error_handling_handshake.json

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,22 @@
9797
"outcome": {
9898
"servers": {
9999
"a:27017": {
100-
"type": "Unknown",
101-
"topologyVersion": null,
100+
"type": "RSPrimary",
101+
"setName": "rs",
102+
"topologyVersion": {
103+
"processId": {
104+
"$oid": "000000000000000000000001"
105+
},
106+
"counter": {
107+
"$numberLong": "1"
108+
}
109+
},
102110
"pool": {
103-
"generation": 1
111+
"generation": 0
104112
}
105113
}
106114
},
107-
"topologyType": "ReplicaSetNoPrimary",
115+
"topologyType": "ReplicaSetWithPrimary",
108116
"logicalSessionTimeoutMinutes": null,
109117
"setName": "rs"
110118
}

test/spec/server-discovery-and-monitoring/errors/error_handling_handshake.yml

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,4 @@ phases:
4343
when: beforeHandshakeCompletes
4444
maxWireVersion: 9
4545
type: timeout
46-
outcome:
47-
servers:
48-
a:27017:
49-
type: Unknown
50-
topologyVersion: null
51-
pool:
52-
generation: 1
53-
topologyType: ReplicaSetNoPrimary
54-
logicalSessionTimeoutMinutes: null
55-
setName: rs
46+
outcome: *outcome

0 commit comments

Comments
 (0)