@@ -7,9 +7,16 @@ import SafetyStorage from './SafetyStorage';
77// eslint-disable-next-line no-unused-vars
88const { log, debug, warn, error : logError } = newLogger ( 'PathwaysDataBuffer' ) ;
99
10- const DefaultBufferSize = 50 ;
10+ // const Verbose = true;
11+ const Verbose = false ;
12+
13+ const DefaultBufferSize = 10 ;
1114const DefaultBufferFlushTime = 2 * 60 * 1000 ; // 2 minutes
15+ // const DefaultBufferFlushTime = 1 * 1000; // 1 second
1216
17+ /**
18+ * @template T
19+ */
1320export default class PathwaysDataBuffer {
1421 constructor ( sessionId , collectionName ) {
1522 const storageKeyName = `dbux.pathwaysDataBuffer.${ collectionName } ` ;
@@ -26,11 +33,14 @@ export default class PathwaysDataBuffer {
2633 return this . buffer . get ( ) || [ ] ;
2734 }
2835
36+ /**
37+ * @param {T[] } entries
38+ */
2939 async add ( entries ) {
3040 await this . buffer . acquireLock ( ) ;
3141
3242 try {
33- let buffer = this . safeGetBuffer ( ) ;
43+ const buffer = this . safeGetBuffer ( ) ;
3444 buffer . push ( ...entries ) ;
3545 await this . buffer . set ( buffer ) ;
3646 }
@@ -41,40 +51,47 @@ export default class PathwaysDataBuffer {
4151
4252 async maybeFlush ( ) {
4353 if ( ! this . _flushing && ( this . safeGetBuffer ( ) . length >= DefaultBufferSize || Date . now ( ) - this . _previousFlushTime >= DefaultBufferFlushTime ) ) {
44- await this . forceFlush ( ) ;
54+ await this . flush ( ) ;
4555 }
4656 }
4757
48- async forceFlush ( ) {
49- this . _flushing = true ;
50- await this . _flush ( ) ;
51- }
52-
53- async _flush ( ) {
58+ async flush ( ) {
59+ Verbose && log ( `Flushing collection ${ this . collectionName } ` ) ;
5460 this . _flushing = true ;
55- await this . buffer . acquireLock ( ) ;
5661
57- let buffer ;
5862 try {
59- buffer = this . safeGetBuffer ( ) ;
60- await this . buffer . set ( [ ] ) ;
61- }
62- finally {
63- this . buffer . releaseLock ( ) ;
64- }
63+ await this . buffer . acquireLock ( ) ;
6564
66- try {
67- await this . addDoc ( buffer ) ;
65+ let buffer ;
66+ try {
67+ buffer = this . safeGetBuffer ( ) ;
68+ await this . buffer . set ( [ ] ) ;
69+ }
70+ finally {
71+ this . buffer . releaseLock ( ) ;
72+ }
73+
74+ try {
75+ await this . addDoc ( buffer ) ;
76+ }
77+ catch ( err ) {
78+ throw new NestedError ( `Failed when flushing` , err ) ;
79+ }
80+
81+ this . _previousFlushTime = Date . now ( ) ;
6882 }
69- catch ( err ) {
83+ finally {
7084 this . _flushing = false ;
71- throw new NestedError ( `Failed when flushing` , err ) ;
7285 }
73-
74- this . _previousFlushTime = Date . now ( ) ;
7586 }
7687
7788 async addDoc ( entries ) {
78- return await uploadPathways ( this . sessionId , this . collectionName , entries ) ;
89+ if ( entries . length ) {
90+ Verbose && log ( `Uploading ${ entries . length } "${ this . collectionName } " of session "${ this . sessionId } "` ) ;
91+ return await uploadPathways ( this . sessionId , this . collectionName , entries ) ;
92+ }
93+ else {
94+ return null ;
95+ }
7996 }
8097}
0 commit comments