Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 68 additions & 29 deletions src/sessions.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setTimeout } from 'timers/promises';

import { Binary, type Document, Long, type Timestamp } from './bson';
import type { CommandOptions, Connection } from './cmap/connection';
import { ConnectionPoolMetrics } from './cmap/metrics';
Expand Down Expand Up @@ -732,10 +734,37 @@ export class ClientSession
: processTimeMS();

let committed = false;
let result: any;
let result: T;

let lastError: Error | null = null;

try {
while (!committed) {
retryTransaction: for (let attempt = 0, isRetry = attempt > 0; !committed; ++attempt) {
if (isRetry) {
const BACKOFF_INITIAL_MS = 5;
const BACKOFF_MAX_MS = 500;
const BACKOFF_GROWTH = 1.5;
const jitter = Math.random();
const backoffMS =
jitter * Math.min(BACKOFF_INITIAL_MS * BACKOFF_GROWTH ** attempt, BACKOFF_MAX_MS);

const willExceedTransactionDeadline =
(this.timeoutContext?.csotEnabled() &&
backoffMS > this.timeoutContext.remainingTimeMS) ||
processTimeMS() + backoffMS > startTime + MAX_TIMEOUT;

if (willExceedTransactionDeadline) {
throw (
lastError ??
new MongoRuntimeError(
`Transaction retry did not record an error: should never occur. Please file a bug.`
)
);
}

await setTimeout(backoffMS);
}

// 2. Invoke startTransaction on the session
// 3. If `startTransaction` reported an error, propagate that error to the caller of `withTransaction` and return immediately.
this.startTransaction(options); // may throw on error
Expand Down Expand Up @@ -783,11 +812,12 @@ export class ClientSession

if (
fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
(this.timeoutContext?.csotEnabled() || processTimeMS() - startTime < MAX_TIMEOUT)
) {
// 6.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of `withTransaction`
// is less than 120 seconds, jump back to step two.
continue;
lastError = fnError;
continue retryTransaction;
}

// 6.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction,
Expand All @@ -797,7 +827,7 @@ export class ClientSession
throw fnError;
}

while (!committed) {
retryCommit: while (!committed) {
try {
/*
* We will rely on ClientSession.commitTransaction() to
Expand All @@ -809,37 +839,46 @@ export class ClientSession
committed = true;
// 9. If commitTransaction reported an error:
} catch (commitError) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) &&
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
) {
// 9.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
continue;
}

if (
commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) &&
(this.timeoutContext != null || processTimeMS() - startTime < MAX_TIMEOUT)
) {
// 9.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
break;
// If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a
// timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will
// abort the withTransaction call).
// If CSOT is not enabled, do we still have time remaining or have we timed out?
const hasTimedOut =
!this.timeoutContext?.csotEnabled() && processTimeMS() - startTime >= MAX_TIMEOUT;

if (!hasTimedOut) {
/*
* Note: a maxTimeMS error will have the MaxTimeMSExpired
* code (50) and can be reported as a top-level error or
* inside writeConcernError, ex.
* { ok:0, code: 50, codeName: 'MaxTimeMSExpired' }
* { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } }
*/
if (
!isMaxTimeMSExpiredError(commitError) &&
commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult)
) {
// 9.i If the `commitTransaction` error includes a "UnknownTransactionCommitResult" label and the error is not
// MaxTimeMSExpired and the elapsed time of `withTransaction` is less than 120 seconds, jump back to step eight.
continue retryCommit;
}

if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) {
// 9.ii If the commitTransaction error includes a "TransientTransactionError" label
// and the elapsed time of withTransaction is less than 120 seconds, jump back to step two.
lastError = commitError;

continue retryTransaction;
}
}

// 9.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately.
throw commitError;
}
}
}

// @ts-expect-error Result is always defined if we reach here, the for-loop above convinces TS it is not.
return result;
} finally {
this.timeoutContext = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { expect } from 'chai';
import { test } from 'mocha';
import * as sinon from 'sinon';

import { type ClientSession, type Collection, type MongoClient } from '../../../src';
import { configureFailPoint, type FailCommandFailPoint, measureDuration } from '../../tools/utils';

const failCommand: FailCommandFailPoint = {
configureFailPoint: 'failCommand',
mode: {
times: 13
},
data: {
failCommands: ['commitTransaction'],
errorCode: 251 // no such transaction
}
};

describe('Retry Backoff is Enforced', function () {
// 1. let client be a MongoClient
let client: MongoClient;

// 2. let coll be a collection
let collection: Collection;

beforeEach(async function () {
client = this.configuration.newClient();
collection = client.db('foo').collection('bar');
});

afterEach(async function () {
sinon.restore();
await client?.close();
});

test(
'works',
{
requires: {
mongodb: '>=4.4', // failCommand
topology: '!single' // transactions can't run on standalone servers
}
},
async function () {
const randomStub = sinon.stub(Math, 'random');

// 3.i Configure the random number generator used for jitter to always return 0
randomStub.returns(0);

// 3.ii Configure a fail point that forces 13 retries
await configureFailPoint(this.configuration, failCommand);

// 3.iii
const callback = async (s: ClientSession) => {
await collection.insertOne({}, { session: s });
};

// 3.iv Let no_backoff_time be the duration of the withTransaction API call
const { duration: noBackoffTime } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(callback);
});
});

// 4.i Configure the random number generator used for jitter to always return 1.
randomStub.returns(1);

// 4.ii Configure a fail point that forces 13 retries like in step 3.2.
await configureFailPoint(this.configuration, failCommand);

// 4.iii Use the same callback defined in 3.3.
// 4.iv Let with_backoff_time be the duration of the withTransaction API call
const { duration: fullBackoffDuration } = await measureDuration(() => {
return client.withSession(async s => {
await s.withTransaction(callback);
});
});

// 5. Compare the two time between the two runs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is how we catch spec typos 😅 time -> times
let's update here and maybe include this in the next spec PR or as a one-off?

// The sum of 13 backoffs is roughly 2.2 seconds. There is a 1-second window to account for potential variance between the two runs.
expect(fullBackoffDuration).to.be.within(
noBackoffTime + 2200 - 1000,
noBackoffTime + 2200 + 1000
);
}
);
});