1+
2+ import { newLogger } from '@dbux/common/src/log/logger' ;
3+ import NestedError from '@dbux/common/src/NestedError' ;
4+ import { uploadPathways } from '@dbux/projects/src/firestore/upload' ;
5+ import SafetyStorage from './SafetyStorage' ;
6+
7+ // eslint-disable-next-line no-unused-vars
8+ const { log, debug, warn, error : logError } = newLogger ( 'PathwaysDataBuffer' ) ;
9+
10+ const DefaultBufferSize = 50 ;
11+ const DefaultBufferFlushTime = 2 * 60 * 1000 ; // 2 minutes
12+
13+ export default class PathwaysDataBuffer {
14+ constructor ( sessionId , collectionName ) {
15+ const storageKeyName = `dbux.pathwaysDataBuffer.${ collectionName } ` ;
16+ this . buffer = new SafetyStorage ( storageKeyName ) ;
17+ this . sessionId = sessionId ;
18+ this . collectionName = collectionName ;
19+ this . _previousFlushTime = Date . now ( ) ;
20+ }
21+
22+ /**
23+ * @return {Array }
24+ */
25+ safeGetBuffer ( ) {
26+ return this . buffer . get ( ) || [ ] ;
27+ }
28+
29+ async add ( entries ) {
30+ await this . buffer . acquireLock ( ) ;
31+
32+ try {
33+ let buffer = this . safeGetBuffer ( ) ;
34+ buffer . push ( ...entries ) ;
35+ await this . buffer . set ( buffer ) ;
36+ }
37+ finally {
38+ this . buffer . releaseLock ( ) ;
39+ }
40+ }
41+
42+ async maybeFlush ( ) {
43+ if ( ! this . _flushing && ( this . safeGetBuffer ( ) . length >= DefaultBufferSize || Date . now ( ) - this . _previousFlushTime >= DefaultBufferFlushTime ) ) {
44+ await this . forceFlush ( ) ;
45+ }
46+ }
47+
48+ async forceFlush ( ) {
49+ this . _flushing = true ;
50+ await this . _flush ( ) ;
51+ }
52+
53+ async _flush ( ) {
54+ this . _flushing = true ;
55+ await this . buffer . acquireLock ( ) ;
56+
57+ let buffer ;
58+ try {
59+ buffer = this . safeGetBuffer ( ) ;
60+ await this . buffer . set ( [ ] ) ;
61+ }
62+ finally {
63+ this . buffer . releaseLock ( ) ;
64+ }
65+
66+ try {
67+ await this . addDoc ( buffer ) ;
68+ }
69+ catch ( err ) {
70+ this . _flushing = false ;
71+ throw new NestedError ( `Failed when flushing` , err ) ;
72+ }
73+
74+ this . _previousFlushTime = Date . now ( ) ;
75+ }
76+
77+ async addDoc ( entries ) {
78+ return await uploadPathways ( this . sessionId , this . collectionName , entries ) ;
79+ }
80+ }
0 commit comments