diff --git a/src/sessions.ts b/src/sessions.ts index 0788ff15386..af8d6e63ada 100644 --- a/src/sessions.ts +++ b/src/sessions.ts @@ -1,3 +1,5 @@ +import { setTimeout } from 'timers/promises'; + import { Binary, type Document, Long, type Timestamp } from './bson'; import type { CommandOptions, Connection } from './cmap/connection'; import { ConnectionPoolMetrics } from './cmap/metrics'; @@ -714,6 +716,8 @@ export class ClientSession timeoutMS?: number; } ): Promise { + // 1.iii Set TIMEOUT_MS to be timeoutMS if given, otherwise 120-seconds + // DP: We do this indirectly as a combination of timeoutMS->timeoutContext and this MAX_TIMEOUT const MAX_TIMEOUT = 120000; const timeoutMS = options?.timeoutMS ?? this.timeoutMS ?? null; @@ -724,18 +728,25 @@ export class ClientSession serverSelectionTimeoutMS: this.clientOptions.serverSelectionTimeoutMS, socketTimeoutMS: this.clientOptions.socketTimeoutMS }) - : null; + : null; // DP: this is always a CSOT context or null + // 1.i Record the current monotonic time, which will be used to enforce the 120-second / CSOT timeout before later retry attempts. const startTime = this.timeoutContext?.csotEnabled() ? this.timeoutContext.start : now(); + // DP: the CSOT check is redundant, because of the definition in L725 let committed = false; - let result: any; + let result: T; try { - while (!committed) { - this.startTransaction(options); // may throw on error + // 1.ii Set retry to 0. This will be used for backoff later in step 7. + for (let retry = 0; !committed; ++retry) { + // 2. Invoke startTransaction on the session + // 3. If startTransaction reported an error, propagate that error to the caller of withTransaction and return immediately. + this.startTransaction(options); // may throw on error (satisfies the spec requirement 3. above) try { + // 4. Invoke the callback. + // 5. Control returns to withTransaction. (continued below) const promise = fn(this); if (!isPromiseLike(promise)) { throw new MongoInvalidArgumentError( @@ -745,20 +756,27 @@ export class ClientSession result = await promise; + // 5. (cont.) Determine the current state of the ClientSession (continued below) if ( this.transaction.state === TxnState.NO_TRANSACTION || this.transaction.state === TxnState.TRANSACTION_COMMITTED || this.transaction.state === TxnState.TRANSACTION_ABORTED ) { - // Assume callback intentionally ended the transaction + // 7. If the ClientSession is in the "no transaction", "transaction aborted", or "transaction committed" state, + // assume the callback intentionally aborted or committed the transaction and return immediately. return result; } + // 5. (cont.) and whether the callback reported an error + // 6. If the callback reported an error: } catch (fnError) { + // DP: The preemptive abort isn't spec; this !MongoError would be an error thrown by the callback. + // DP: Is it safe to assume that the callback hasn't committed the transaction before throwing? if (!(fnError instanceof MongoError) || fnError instanceof MongoInvalidArgumentError) { await this.abortTransaction(); throw fnError; } + // 6.i If the ClientSession is in the "starting transaction" or "transaction in progress" state, invoke abortTransaction on the session if ( this.transaction.state === TxnState.STARTING_TRANSACTION || this.transaction.state === TxnState.TRANSACTION_IN_PROGRESS @@ -766,13 +784,20 @@ export class ClientSession await this.abortTransaction(); } + // 6.ii If the callback's error includes a "TransientTransactionError" label and the elapsed time of withTransaction is less than TIMEOUT_MS, calculate the backoffMS if ( fnError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) + (this.timeoutContext?.csotEnabled() || now() - startTime < MAX_TIMEOUT) ) { + // 6.ii (cont.) If elapsed time + backoffMS > TIMEOUT_MS, then raise last known error. Otherwise, sleep for backoffMS, increment retry, and jump back to step two. + // DP: The spec would have us apply backoff and jitter here instead of immediately retrying the startTransaction, which is where this continue is sending us continue; } + // 6.iii If the callback's error includes a "UnknownTransactionCommitResult" label, the callback must have manually committed a transaction, + // propagate the callback's error to the caller of withTransaction and return immediately. + // DP: The 6.iii check is redundant with 6.iv, so we don't write code for it + // 6.iv Otherwise, propagate the callback's error to the caller of withTransaction and return immediately. throw fnError; } @@ -783,35 +808,76 @@ export class ClientSession * apply a majority write concern if commitTransaction is * being retried (see: DRIVERS-601) */ + // 8. Invoke commitTransaction on the session. await this.commitTransaction(); committed = true; - } catch (commitError) { - /* - * Note: a maxTimeMS error will have the MaxTimeMSExpired - * code (50) and can be reported as a top-level error or - * inside writeConcernError, ex. - * { ok:0, code: 50, codeName: 'MaxTimeMSExpired' } - * { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } } - */ - if ( - !isMaxTimeMSExpiredError(commitError) && - commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) - ) { - continue; - } - if ( - commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError) && - (this.timeoutContext != null || now() - startTime < MAX_TIMEOUT) - ) { - break; + // 9. If commitTransaction reported an error: + } catch (commitError) { + // If CSOT is enabled, we repeatedly retry until timeoutMS expires. This is enforced by providing a + // timeoutContext to each async API, which know how to cancel themselves (i.e., the next retry will + // abort the withTransaction call). + // If CSOT is not enabled, do we still have time remaining or have we timed out? + const hasTimedOut = + !this.timeoutContext?.csotEnabled() && now() - startTime >= MAX_TIMEOUT; + + if (!hasTimedOut) { + // 9.i If the commitTransaction error includes a "UnknownTransactionCommitResult" label + // and the error is not MaxTimeMSExpired + // and the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step eight + if ( + !isMaxTimeMSExpiredError(commitError) && + commitError.hasErrorLabel(MongoErrorLabel.UnknownTransactionCommitResult) + ) { + /* + * Note: a maxTimeMS error will have the MaxTimeMSExpired + * code (50) and can be reported as a top-level error or + * inside writeConcernError, ex. + * { ok:0, code: 50, codeName: 'MaxTimeMSExpired' } + * { ok:1, writeConcernError: { code: 50, codeName: 'MaxTimeMSExpired' } } + */ + continue; + } + + // 9.ii If the commitTransaction error includes a "TransientTransactionError" label + // and the elapsed time of withTransaction is less than TIMEOUT_MS, jump back to step two. + // DP: Step two makes no sense here, perhaps the intent is to perform the instructions in 6.ii and then jump back to 8? + if (commitError.hasErrorLabel(MongoErrorLabel.TransientTransactionError)) { + const BACKOFF_INITIAL_MS = 5; // 6.ii.c BACKOFF_INITIAL is 5ms + const BACKOFF_MAX_MS = 500; // 6.ii.d BACKOFF_MAX is 500ms + const BACKOFF_GROWTH = 1.5; // DP: convenience var for the formula below + const jitter = Math.random(); // 6.ii.a jitter is a random float between [0, 1) + // 6.ii (cont.) calculate the backoffMS to bejitter * min(BACKOFF_INITIAL * (1.5**retry), BACKOFF_MAX) + const backoffMS = + jitter * Math.min(BACKOFF_INITIAL_MS * BACKOFF_GROWTH ** retry, BACKOFF_MAX_MS); + + const willExceedTransactionDeadline = + (this.timeoutContext?.csotEnabled() && + backoffMS > this.timeoutContext.remainingTimeMS) || + now() + backoffMS > startTime + MAX_TIMEOUT; + + // 6.ii (cont.) If elapsed time + backoffMS > TIMEOUT_MS, then raise last known error + // DP: the lines above do this math slightly indirectly, but this break does NOT raise the last known error + // because breaking the while loop takes us to the return result statement outside the while(!committed) + if (willExceedTransactionDeadline) { + break; + } + + // 6.ii (cont.) Otherwise, sleep for backoffMS, increment retry, and jump back to step two. + // DP: Assuming the spec means step 8 + await setTimeout(backoffMS); + + break; + } } + // 9.iii Otherwise, propagate the commitTransaction error to the caller of withTransaction and return immediately. throw commitError; } } } + + // @ts-expect-error Result is always defined if we reach here, the for-loop above convinces TS it is not. return result; } finally { this.timeoutContext = null; diff --git a/test/integration/transactions-convenient-api/transactions-convenient-api.prose.test.ts b/test/integration/transactions-convenient-api/transactions-convenient-api.prose.test.ts new file mode 100644 index 00000000000..398cce6317a --- /dev/null +++ b/test/integration/transactions-convenient-api/transactions-convenient-api.prose.test.ts @@ -0,0 +1,72 @@ +import { expect } from 'chai'; +import { test } from 'mocha'; +import * as sinon from 'sinon'; + +import { type MongoClient } from '../../../src'; +import { configureFailPoint, type FailCommandFailPoint, measureDuration } from '../../tools/utils'; + +const failCommand: FailCommandFailPoint = { + configureFailPoint: 'failCommand', + mode: { + times: 13 + }, + data: { + failCommands: ['commitTransaction'], + errorCode: 251 + } +}; + +describe('Retry Backoff is Enforced', function () { + let client: MongoClient; + + beforeEach(async function () { + client = this.configuration.newClient(); + }); + + afterEach(async function () { + sinon.restore(); + await client?.close(); + }); + + test( + 'works', + { + requires: { + mongodb: '>=4.4', // failCommand + topology: '!single' // transactions can't run on standalone servers + } + }, + async function () { + const randomStub = sinon.stub(Math, 'random'); + + randomStub.returns(0); + + await configureFailPoint(this.configuration, failCommand); + + const { duration: noBackoffTime } = await measureDuration(() => { + return client.withSession(async s => { + await s.withTransaction(async s => { + await client.db('foo').collection('bar').insertOne({ name: 'bailey' }, { session: s }); + }); + }); + }); + + randomStub.returns(1); + + await configureFailPoint(this.configuration, failCommand); + + const { duration: fullBackoffDuration } = await measureDuration(() => { + return client.withSession(async s => { + await s.withTransaction(async s => { + await client.db('foo').collection('bar').insertOne({ name: 'bailey' }, { session: s }); + }); + }); + }); + + expect(fullBackoffDuration).to.be.within( + noBackoffTime + 2200 - 1000, + noBackoffTime + 2200 + 1000 + ); + } + ); +});