@@ -50,7 +50,29 @@ static final boolean willTrace(){
50
50
static final void trace ( String s ){
51
51
TRACE_LOGGER .log ( TRACE_LEVEL , s );
52
52
}
53
-
53
+
54
+ static int chooseBatchSize (int batchSize , int limit , int fetched ) {
55
+ int bs = Math .abs (batchSize );
56
+ int remaining = limit > 0 ? limit - fetched : 0 ;
57
+ int res = 0 ;
58
+ if (bs == 0 && remaining > 0 )
59
+ res = remaining ;
60
+ else if (bs > 0 && remaining == 0 )
61
+ res = bs ;
62
+ else
63
+ res = Math .min (bs , remaining );
64
+
65
+ if (batchSize < 0 ) {
66
+ // force close
67
+ res = -res ;
68
+ }
69
+
70
+ if (res == 1 ) {
71
+ // optimization: use negative batchsize to close cursor
72
+ res = -1 ;
73
+ }
74
+ return res ;
75
+ }
54
76
55
77
/**
56
78
* @param mongo the Mongo instance
@@ -260,15 +282,15 @@ public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern )
260
282
}
261
283
262
284
@ Override
263
- Iterator <DBObject > __find ( DBObject ref , DBObject fields , int numToSkip , int batchSize , int options )
285
+ Iterator <DBObject > __find ( DBObject ref , DBObject fields , int numToSkip , int batchSize , int limit , int options )
264
286
throws MongoException {
265
287
266
288
if ( ref == null )
267
289
ref = new BasicDBObject ();
268
290
269
291
if ( willTrace () ) trace ( "find: " + _fullNameSpace + " " + JSON .serialize ( ref ) );
270
292
271
- OutMessage query = OutMessage .query ( _mongo , options , _fullNameSpace , numToSkip , batchSize , ref , fields );
293
+ OutMessage query = OutMessage .query ( _mongo , options , _fullNameSpace , numToSkip , chooseBatchSize ( batchSize , limit , 0 ) , ref , fields );
272
294
273
295
Response res = _connector .call ( _db , this , query , null , 2 );
274
296
@@ -282,7 +304,7 @@ Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int
282
304
throw e ;
283
305
}
284
306
285
- return new Result ( this , res , batchSize , options );
307
+ return new Result ( this , res , batchSize , limit , options );
286
308
}
287
309
288
310
@ Override
@@ -329,28 +351,34 @@ public void createIndex( final DBObject keys, final DBObject options )
329
351
330
352
class Result implements Iterator <DBObject > {
331
353
332
- Result ( MyCollection coll , Response res , int numToReturn , int options ){
333
- init ( res );
354
+ Result ( MyCollection coll , Response res , int batchSize , int limit , int options ){
334
355
_collection = coll ;
335
- _numToReturn = numToReturn ;
356
+ _batchSize = batchSize ;
357
+ _limit = limit ;
336
358
_options = options ;
337
359
_host = res ._host ;
360
+ init ( res );
338
361
}
339
362
340
363
private void init ( Response res ){
341
364
_totalBytes += res ._len ;
342
365
_curResult = res ;
343
366
_cur = res .iterator ();
344
367
_sizes .add ( res .size () );
368
+ _numFetched += res .size ();
345
369
346
370
if ( ( res ._flags & Bytes .RESULTFLAG_CURSORNOTFOUND ) > 0 ){
347
371
throw new MongoException .CursorNotFound ();
348
372
}
373
+
374
+ if (res ._cursor > 0 && _limit > 0 && _limit - _numFetched <= 0 ) {
375
+ // fetched all docs within limit, close cursor server-side
376
+ killCursor ();
377
+ }
349
378
}
350
379
351
380
public DBObject next (){
352
381
if ( _cur .hasNext () ) {
353
- _numSeen ++;
354
382
return _cur .next ();
355
383
}
356
384
@@ -382,7 +410,7 @@ private void _advance(){
382
410
383
411
m .writeInt ( 0 );
384
412
m .writeCString ( _collection ._fullNameSpace );
385
- m .writeInt ( _numToReturn - _numSeen ); // num to return
413
+ m .writeInt ( chooseBatchSize ( _batchSize , _limit , _numFetched ) );
386
414
m .writeLong ( _curResult .cursor () );
387
415
388
416
Response res = _connector .call ( DBApiLayer .this , _collection , m , _host );
@@ -394,12 +422,12 @@ public void remove(){
394
422
throw new RuntimeException ( "can't remove this way" );
395
423
}
396
424
397
- public int getNumberToReturn (){
398
- return _numToReturn ;
425
+ public int getBatchSize (){
426
+ return _batchSize ;
399
427
}
400
428
401
- public void setNumberToReturn (int num ){
402
- _numToReturn = num ;
429
+ public void setBatchSize (int size ){
430
+ _batchSize = size ;
403
431
}
404
432
405
433
public String toString (){
@@ -439,39 +467,47 @@ List<Integer> getSizes(){
439
467
void close (){
440
468
// not perfectly thread safe here, may need to use an atomicBoolean
441
469
if (_curResult != null ) {
442
- long curId = _curResult . cursor ();
470
+ killCursor ();
443
471
_curResult = null ;
444
472
_cur = null ;
445
-
446
- if (curId > 0 ) {
447
- List <Long > l = new ArrayList <Long >();
448
- l .add (curId );
449
-
450
- try {
451
- killCursors (_host , l );
452
- } catch (Throwable t ) {
453
- Bytes .LOGGER .log (Level .WARNING , "can't clean 1 cursor" , t );
454
- _deadCursorIds .add (new DeadCursor (curId , _host ));
455
- }
456
- }
457
473
}
458
474
}
459
475
476
+ void killCursor () {
477
+ if (_curResult == null )
478
+ return ;
479
+ long curId = _curResult .cursor ();
480
+ if (curId < 0 )
481
+ return ;
482
+
483
+ List <Long > l = new ArrayList <Long >();
484
+ l .add (curId );
485
+
486
+ try {
487
+ killCursors (_host , l );
488
+ } catch (Throwable t ) {
489
+ Bytes .LOGGER .log (Level .WARNING , "can't clean 1 cursor" , t );
490
+ _deadCursorIds .add (new DeadCursor (curId , _host ));
491
+ }
492
+ _curResult ._cursor = 0 ;
493
+ }
494
+
460
495
public ServerAddress getServerAddress () {
461
496
return _host ;
462
497
}
463
498
464
499
Response _curResult ;
465
500
Iterator <DBObject > _cur ;
466
- int _numToReturn ;
501
+ int _batchSize ;
502
+ int _limit ;
467
503
final MyCollection _collection ;
468
504
final int _options ;
469
505
final ServerAddress _host ; // host where first went. all subsequent have to go there
470
506
471
507
private long _totalBytes = 0 ;
472
508
private int _numGetMores = 0 ;
473
509
private List <Integer > _sizes = new ArrayList <Integer >();
474
- private int _numSeen = 0 ;
510
+ private int _numFetched = 0 ;
475
511
476
512
} // class Result
477
513
0 commit comments