Skip to content

Commit 7408493

Browse files
vbabaninrozza
andauthored
Reuse ConnectionSource to avoid extra server selection. (#1813)
JAVA-5974 --------- Co-authored-by: Ross Lawley <ross@mongodb.com>
1 parent 07a7357 commit 7408493

File tree

6 files changed

+281
-13
lines changed

6 files changed

+281
-13
lines changed

driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java

Lines changed: 53 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818

1919

2020
import com.mongodb.MongoException;
21+
import com.mongodb.ReadPreference;
22+
import com.mongodb.assertions.Assertions;
2123
import com.mongodb.internal.TimeoutContext;
2224
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
2325
import com.mongodb.internal.async.SingleResultCallback;
26+
import com.mongodb.internal.binding.AsyncConnectionSource;
2427
import com.mongodb.internal.binding.AsyncReadBinding;
2528
import com.mongodb.internal.connection.OperationContext;
2629
import com.mongodb.lang.NonNull;
@@ -232,8 +235,9 @@ private void retryOperation(final AsyncBlock asyncBlock,
232235
} else {
233236
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken,
234237
assertNotNull(source).getServerDescription().getMaxWireVersion());
235-
source.release();
236-
changeStreamOperation.executeAsync(binding, operationContext, (asyncBatchCursor, t1) -> {
238+
// We wrap the binding so that the selected AsyncConnectionSource is reused, preventing redundant server selection.
239+
// Consequently, the same AsyncConnectionSource remains pinned to the resulting AsyncCommandCursor.
240+
changeStreamOperation.executeAsync(new AsyncSourceAwareReadBinding(source, binding), operationContext, (asyncBatchCursor, t1) -> {
237241
if (t1 != null) {
238242
callback.onResult(null, t1);
239243
} else {
@@ -242,6 +246,7 @@ private void retryOperation(final AsyncBlock asyncBlock,
242246
operationContext);
243247
} finally {
244248
try {
249+
source.release();
245250
binding.release(); // release the new change stream batch cursor's reference to the binding
246251
} finally {
247252
resumeableOperation(asyncBlock, callback, operationContext, tryNext);
@@ -252,5 +257,51 @@ private void retryOperation(final AsyncBlock asyncBlock,
252257
}
253258
});
254259
}
260+
261+
/**
262+
* Does not retain wrapped {@link AsyncReadBinding} as it serves as a wrapper only.
263+
*/
264+
private static class AsyncSourceAwareReadBinding implements AsyncReadBinding {
265+
private final AsyncConnectionSource source;
266+
private final AsyncReadBinding binding;
267+
268+
AsyncSourceAwareReadBinding(final AsyncConnectionSource source, final AsyncReadBinding binding) {
269+
this.source = source;
270+
this.binding = binding;
271+
}
272+
273+
@Override
274+
public ReadPreference getReadPreference() {
275+
return binding.getReadPreference();
276+
}
277+
278+
@Override
279+
public void getReadConnectionSource(final OperationContext operationContext, final SingleResultCallback<AsyncConnectionSource> callback) {
280+
source.retain();
281+
callback.onResult(source, null);
282+
}
283+
284+
@Override
285+
public void getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference,
286+
final OperationContext operationContext,
287+
final SingleResultCallback<AsyncConnectionSource> callback) {
288+
throw Assertions.fail();
289+
}
290+
291+
@Override
292+
public AsyncReadBinding retain() {
293+
return binding.retain();
294+
}
295+
296+
@Override
297+
public int release() {
298+
return binding.release();
299+
}
300+
301+
@Override
302+
public int getCount() {
303+
return binding.getCount();
304+
}
305+
}
255306
}
256307

driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import com.mongodb.MongoChangeStreamException;
2020
import com.mongodb.MongoException;
2121
import com.mongodb.MongoOperationTimeoutException;
22+
import com.mongodb.ReadPreference;
2223
import com.mongodb.ServerAddress;
2324
import com.mongodb.ServerCursor;
25+
import com.mongodb.assertions.Assertions;
2426
import com.mongodb.internal.TimeoutContext;
27+
import com.mongodb.internal.binding.ConnectionSource;
2528
import com.mongodb.internal.binding.ReadBinding;
2629
import com.mongodb.internal.connection.OperationContext;
2730
import com.mongodb.lang.Nullable;
@@ -251,9 +254,11 @@ private void resumeChangeStream(final OperationContext operationContext) {
251254
wrapped.close(operationContextWithDefaultMaxTime);
252255
withReadConnectionSource(binding, operationContext, (source, operationContextWithMinRtt) -> {
253256
changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion());
257+
// We wrap the binding so that the selected ConnectionSource is reused, preventing redundant server selection.
258+
// Consequently, the same ConnectionSource remains pinned to the resulting CommandCursor.
259+
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(new SourceAwareReadBinding(source, binding), operationContextWithDefaultMaxTime)).getWrapped();
254260
return null;
255261
});
256-
wrapped = ((ChangeStreamBatchCursor<T>) changeStreamOperation.execute(binding, operationContextWithDefaultMaxTime)).getWrapped();
257262
binding.release(); // release the new change stream batch cursor's reference to the binding
258263
}
259264

@@ -264,4 +269,48 @@ private boolean hasPreviousNextTimedOut() {
264269
private static boolean isTimeoutException(final Throwable exception) {
265270
return exception instanceof MongoOperationTimeoutException;
266271
}
272+
273+
/**
274+
* Does not retain wrapped {link @ReadBinding} as it serves as a wrapper only.
275+
*/
276+
private static class SourceAwareReadBinding implements ReadBinding {
277+
private final ConnectionSource source;
278+
private final ReadBinding binding;
279+
280+
SourceAwareReadBinding(final ConnectionSource source, final ReadBinding binding) {
281+
this.source = source;
282+
this.binding = binding;
283+
}
284+
285+
@Override
286+
public ReadPreference getReadPreference() {
287+
return binding.getReadPreference();
288+
}
289+
290+
@Override
291+
public ConnectionSource getReadConnectionSource(final OperationContext ignored) {
292+
source.retain();
293+
return source;
294+
}
295+
296+
@Override
297+
public ConnectionSource getReadConnectionSource(final int minWireVersion, final ReadPreference fallbackReadPreference, final OperationContext ignored) {
298+
throw Assertions.fail();
299+
}
300+
301+
@Override
302+
public int getCount() {
303+
return binding.getCount();
304+
}
305+
306+
@Override
307+
public ReadBinding retain() {
308+
return binding.retain();
309+
}
310+
311+
@Override
312+
public int release() {
313+
return binding.release();
314+
}
315+
}
267316
}

driver-core/src/test/unit/com/mongodb/internal/operation/ChangeStreamBatchCursorTest.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import static org.junit.jupiter.api.Assertions.assertThrows;
5454
import static org.junit.jupiter.api.Assertions.assertTrue;
5555
import static org.mockito.ArgumentMatchers.any;
56-
import static org.mockito.ArgumentMatchers.eq;
5756
import static org.mockito.Mockito.atLeastOnce;
5857
import static org.mockito.Mockito.clearInvocations;
5958
import static org.mockito.Mockito.doNothing;
@@ -216,7 +215,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() {
216215
verify(newCursor).next(operationContextCaptor.capture()));
217216
verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
218217
verify(changeStreamOperation, times(1)).getDecoder();
219-
verify(changeStreamOperation, times(1)).execute(eq(readBinding), any());
218+
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
220219
verifyNoMoreInteractions(changeStreamOperation);
221220
verify(newCursor, times(1)).next(any());
222221
verify(newCursor, atLeastOnce()).getPostBatchResumeToken();
@@ -245,7 +244,7 @@ void shouldResumeOnlyOnceOnSubsequentCallsAfterTimeoutError() {
245244
void shouldPropagateAnyErrorsOccurredInAggregateOperation() {
246245
when(cursor.next(any())).thenThrow(new MongoOperationTimeoutException("timeout"));
247246
MongoNotPrimaryException resumableError = new MongoNotPrimaryException(new BsonDocument(), new ServerAddress());
248-
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(resumableError);
247+
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(resumableError);
249248

250249
ChangeStreamBatchCursor<Document> cursor = createChangeStreamCursor();
251250
//when
@@ -272,12 +271,12 @@ void shouldResumeAfterTimeoutInAggregateOnNextCall() {
272271
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);
273272

274273
//second next operation times out on resume attempt when creating change stream
275-
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(
274+
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(
276275
new MongoOperationTimeoutException("timeout during resumption"));
277276
assertThrows(MongoOperationTimeoutException.class, cursor::next);
278-
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation);
277+
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);
279278

280-
doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(eq(readBinding), any());
279+
doReturn(newChangeStreamCursor).when(changeStreamOperation).execute(any(ReadBinding.class), any());
281280

282281
//when third operation succeeds to resume and call next
283282
sleep(TIMEOUT_CONSUMPTION_SLEEP_MS);
@@ -308,7 +307,7 @@ void shouldCloseChangeStreamWhenResumeOperationFailsDueToNonTimeoutError() {
308307
clearInvocations(this.cursor, newCursor, timeoutContext, changeStreamOperation, readBinding);
309308

310309
//when second next operation errors on resume attempt when creating change stream
311-
when(changeStreamOperation.execute(eq(readBinding), any())).thenThrow(
310+
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenThrow(
312311
new MongoNotPrimaryException(new BsonDocument(), new ServerAddress()));
313312
assertThrows(MongoNotPrimaryException.class, cursor::next);
314313

@@ -344,7 +343,11 @@ private void verifyNoResumeAttemptCalled() {
344343
private void verifyResumeAttemptCalled() {
345344
verify(cursor, times(1)).close(any());
346345
verify(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
347-
verify(changeStreamOperation, times(1)).execute(eq(readBinding), any());
346+
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
347+
verifyNoMoreInteractions(cursor);
348+
verify(changeStreamOperation, times(1)).execute(any(ReadBinding.class), any());
349+
// Verify server selection is done once for the resume attempt.
350+
verify(readBinding, times(1)).getReadConnectionSource(any());
348351
verifyNoMoreInteractions(cursor);
349352
}
350353

@@ -394,10 +397,14 @@ void setUp() {
394397
changeStreamOperation = mock(ChangeStreamOperation.class);
395398
when(changeStreamOperation.getDecoder()).thenReturn(new DocumentCodec());
396399
doNothing().when(changeStreamOperation).setChangeStreamOptionsForResume(resumeToken, maxWireVersion);
397-
when(changeStreamOperation.execute(eq(readBinding), any())).thenReturn(newChangeStreamCursor);
400+
when(changeStreamOperation.execute(any(ReadBinding.class), any())).thenAnswer(invocation -> {
401+
ReadBinding binding = invocation.getArgument(0);
402+
OperationContext operationContext = invocation.getArgument(1);
403+
binding.getReadConnectionSource(operationContext);
404+
return newChangeStreamCursor;
405+
});
398406
}
399407

400-
401408
private void assertTimeoutWasRefreshedForOperation(final TimeoutContext timeoutContextUsedForOperation) {
402409
assertNotNull(timeoutContextUsedForOperation.getTimeout(), "TimeoutMs was not set");
403410
timeoutContextUsedForOperation.getTimeout().run(TimeUnit.MILLISECONDS, () -> {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.reactivestreams.client;
18+
19+
import com.mongodb.MongoClientSettings;
20+
import com.mongodb.client.AbstractChangeSteamFunctionalTest;
21+
import com.mongodb.client.MongoClient;
22+
import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient;
23+
24+
public class ChangeStreamFunctionalTest extends AbstractChangeSteamFunctionalTest {
25+
@Override
26+
protected MongoClient createMongoClient(final MongoClientSettings mongoClientSettings) {
27+
return new SyncMongoClient(mongoClientSettings);
28+
}
29+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client;
18+
19+
import com.mongodb.ClusterFixture;
20+
import com.mongodb.MongoClientSettings;
21+
import com.mongodb.MongoNamespace;
22+
import com.mongodb.client.model.changestream.ChangeStreamDocument;
23+
import com.mongodb.client.test.CollectionHelper;
24+
import org.bson.BsonDocument;
25+
import org.bson.BsonTimestamp;
26+
import org.bson.Document;
27+
import org.bson.codecs.BsonDocumentCodec;
28+
import org.junit.jupiter.api.AfterEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.time.Instant;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
34+
import static com.mongodb.client.Fixture.getDefaultDatabaseName;
35+
import static java.lang.String.format;
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
import static org.junit.jupiter.api.Assertions.assertNotNull;
38+
import static org.junit.jupiter.api.Assumptions.assumeTrue;
39+
40+
/**
41+
* The {@link ChangeStreamProseTest}, which is defined only for sync driver, should be migrated to this class.
42+
* Once this done, this class should be renamed to ChangeStreamProseTest.
43+
*/
44+
public abstract class AbstractChangeSteamFunctionalTest {
45+
46+
private static final String FAIL_COMMAND_NAME = "failCommand";
47+
private static final MongoNamespace NAMESPACE = new MongoNamespace(getDefaultDatabaseName(), "test");
48+
private final CollectionHelper<BsonDocument> collectionHelper = new CollectionHelper<>(new BsonDocumentCodec(), NAMESPACE);
49+
50+
protected abstract MongoClient createMongoClient(MongoClientSettings mongoClientSettings);
51+
52+
@Test
53+
public void shouldDoOneServerSelectionForResumeAttempt() {
54+
//given
55+
assumeTrue(ClusterFixture.isDiscoverableReplicaSet());
56+
AtomicInteger serverSelectionCounter = new AtomicInteger();
57+
BsonTimestamp startTime = new BsonTimestamp((int) Instant.now().getEpochSecond(), 0);
58+
try (MongoClient mongoClient = createMongoClient(Fixture.getMongoClientSettingsBuilder()
59+
.applyToClusterSettings(builder -> builder.serverSelector(clusterDescription -> {
60+
serverSelectionCounter.incrementAndGet();
61+
return clusterDescription.getServerDescriptions();
62+
})).build())) {
63+
64+
MongoCollection<Document> collection = mongoClient
65+
.getDatabase(NAMESPACE.getDatabaseName())
66+
.getCollection(NAMESPACE.getCollectionName());
67+
68+
collectionHelper.runAdminCommand("{"
69+
+ " configureFailPoint: \"" + FAIL_COMMAND_NAME + "\","
70+
+ " mode: {"
71+
+ " times: 1"
72+
+ " },"
73+
+ " data: {"
74+
+ " failCommands: ['getMore'],"
75+
+ " errorCode: 9001,"
76+
+ " errorLabels: ['ResumableChangeStreamError']"
77+
+ " }"
78+
+ "}");
79+
// We insert document here, because async cursor performs aggregate and getMore right after we call cursor()
80+
collection.insertOne(Document.parse("{ x: 1 }"));
81+
serverSelectionCounter.set(0);
82+
83+
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch()
84+
.batchSize(0)
85+
.startAtOperationTime(startTime)
86+
.cursor()) {
87+
88+
//when
89+
ChangeStreamDocument<Document> changeStreamDocument = cursor.next();
90+
//then
91+
assertNotNull(changeStreamDocument);
92+
int actualCountOfServerSelections = serverSelectionCounter.get();
93+
assertEquals(2, actualCountOfServerSelections,
94+
format("Expected 2 server selections (initial aggregate command + resume attempt aggregate command), but there were %s",
95+
actualCountOfServerSelections));
96+
}
97+
}
98+
}
99+
100+
@AfterEach
101+
public void tearDown() throws InterruptedException {
102+
ClusterFixture.disableFailPoint(FAIL_COMMAND_NAME);
103+
collectionHelper.drop();
104+
}
105+
}

0 commit comments

Comments
 (0)