Skip to content

Conversation

@LeftHandCold
Copy link
Contributor

@LeftHandCold LeftHandCold commented Nov 6, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22296

What this PR does / why we need it:

GC support CDC


PR Type

Enhancement


Description

  • Add CDC (Change Data Capture) support to garbage collection system

  • Implement CDC watermark tracking for database-level protection

  • Extend GC filtering logic to protect CDC-related objects

  • Add comprehensive test coverage for CDC metadata handling


Diagram Walkthrough

flowchart LR
  A["CDC Watermark Table"] -->|GetCDC| B["SnapshotMeta"]
  B -->|CDCTables| C["checkpointCleaner"]
  C -->|cdcWatermarks| D["GCWindow"]
  D -->|ExecuteGlobalCheckpointBasedGC| E["CheckpointBasedGCJob"]
  E -->|MakeSnapshotAndPitrFineFilter| F["GC Filter Logic"]
  F -->|Protect Objects| G["CDC Database Objects"]
Loading

File Walkthrough

Relevant files
Enhancement
checkpoint.go
Add CDC watermark retrieval and passing to GC execution   

pkg/vm/engine/tae/db/gc/v3/checkpoint.go

  • Add cdcTablesFunc optional field for testing CDC table retrieval
  • Implement GetCDCsLocked() method to fetch CDC data from snapshot
    metadata
  • Implement CDCTables() method with fallback to actual CDC data
    retrieval
  • Pass cdcWatermarks parameter to ExecuteGlobalCheckpointBasedGC() calls
  • Add comment explaining CDC DB updates from watermark table data
+34/-0   
exec_v1.go
Integrate CDC watermarks into GC job filtering logic         

pkg/vm/engine/tae/db/gc/v3/exec_v1.go

  • Add cdcWatermarks field to CheckpointBasedGCJob struct
  • Update NewCheckpointBasedGCJob() constructor to accept CDC watermarks
  • Pass cdcWatermarks and file service parameters to
    MakeSnapshotAndPitrFineFilter()
  • Implement CDC protection logic in fine filter for both snapshot and
    non-snapshot objects
  • Check CDC database membership and protect CNCreated/Appendable objects
    based on watermark timestamps
+51/-0   
types.go
Add CDC tables method to Cleaner interface                             

pkg/vm/engine/tae/db/gc/v3/types.go

  • Add CDCTables() method to Cleaner interface
  • Method returns map of database IDs to CDC watermark timestamps
+1/-0     
window.go
Pass CDC watermarks through GC window execution                   

pkg/vm/engine/tae/db/gc/v3/window.go

  • Add cdcWatermarks parameter to ExecuteGlobalCheckpointBasedGC() method
    signature
  • Pass CDC watermarks to NewCheckpointBasedGCJob() constructor
+2/-0     
snapshot.go
Add CDC watermark metadata tracking and extraction             

pkg/vm/engine/tae/logtail/snapshot.go

  • Add CdcTidIdx constant to table info type enumeration
  • Define CDC watermark schema column constants (account ID, task ID, db
    name, table name, watermark, error message)
  • Introduce specialTableInfo struct to handle PITR and CDC table data
    uniformly
  • Refactor PITR handling to use new specialTableInfo structure
  • Add CDC table support with cdc field in SnapshotMeta
  • Implement GetCDC() method to extract database-level minimum watermarks
    from CDC records
  • Add GetTableIDToDBIDMap() method for efficient table-to-database ID
    mapping
  • Update metadata persistence and rebuild logic to handle CDC tables
  • Add RebuildCdc() method for restoring CDC table IDs from persisted
    metadata
+316/-26
Tests
mock_cleaner.go
Add CDC tables method to mock cleaner                                       

pkg/vm/engine/tae/db/gc/v3/mock_cleaner.go

  • Add CDCTables() method stub to MockCleaner interface implementation
  • Return nil watermarks and nil error for testing purposes
+4/-0     
db_test.go
Add comprehensive CDC metadata integration test                   

pkg/vm/engine/tae/db/test/db_test.go

  • Add comprehensive TestCdcMeta() test function with 400+ lines
  • Create CDC watermark table schema matching mo_cdc_watermark structure
  • Set up multiple test databases with CDC records at different watermark
    timestamps
  • Test minimum watermark selection logic per database
  • Verify CDC watermark persistence across database restart
  • Test dynamic watermark updates with new CDC records
+396/-0 
Formatting
snapshot_test.go
Fix import ordering in snapshot test                                         

pkg/vm/engine/tae/logtail/snapshot_test.go

  • Reorder imports to follow Go conventions (standard library first, then
    external packages)
+1/-1     

@qodo-merge-pro
Copy link

qodo-merge-pro bot commented Nov 6, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🟡
🎫 #22296
🟢 Implement support so CDC (and ISCP) are respected in GC logic by using appropriate
watermarks.
Provide necessary internal plumbing to discover CDC-related retention points from
metadata.
Ensure functionality is covered by tests validating CDC watermarks’ effect.
🔴 Use compacted PITR instead of standard PITR for ISCP data retention.
Compacted PITR should retain CN-created and appendable objects for a configurable
duration, pinning fewer objects while allowing long durations.
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Audit logging: New CDC-related operations and GC decisions add minimal Info/Warn logs without consistent
structure or complete context (e.g., user/task identifiers), so it is unclear if critical
CDC/GC actions are fully auditable.

Referred Code
					zap.String("ts", pitrTS.ToString()),
				)
			}
		}
	}
	return pitrInfo, nil
}

func (sm *SnapshotMeta) GetCDC(
	ctx context.Context,
	sid string,
	fs fileservice.FileService,
	mp *mpool.MPool,
) (map[uint64]types.TS, error) {
	idxes := []uint16{ColCdcAccountId, ColCdcDbName, ColCdcTableName, ColCdcWatermark}

	sm.RLock()
	cdcClone := sm.cdc.clone()
	tablePKIndexClone := make(map[string][]*tableInfo)
	for k, v := range sm.tablePKIndex {
		tablePKIndexClone[k] = v


 ... (clipped 69 lines)
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Error handling: Several new paths return errors upward without contextual wrapping or logging (e.g., CDC
processing and fine filter creation), which may hinder debugging of edge cases like empty
CDC maps or decode failures.

Referred Code
					zap.String("ts", pitrTS.ToString()),
				)
			}
		}
	}
	return pitrInfo, nil
}

func (sm *SnapshotMeta) GetCDC(
	ctx context.Context,
	sid string,
	fs fileservice.FileService,
	mp *mpool.MPool,
) (map[uint64]types.TS, error) {
	idxes := []uint16{ColCdcAccountId, ColCdcDbName, ColCdcTableName, ColCdcWatermark}

	sm.RLock()
	cdcClone := sm.cdc.clone()
	tablePKIndexClone := make(map[string][]*tableInfo)
	for k, v := range sm.tablePKIndex {
		tablePKIndexClone[k] = v


 ... (clipped 86 lines)
Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status:
Log content: Info/Warn logs added for CDC processing may log database and table names and watermarks,
which could be sensitive in some environments; verify this complies with logging policy
and redaction requirements.

Referred Code
		zap.Error(err),
		zap.Uint64("account-id", accountID),
		zap.String("db-name", dbName),
		zap.String("table-name", tableName))
	return nil
}
pk := tuple.ErrString(nil)

// Find tableInfo from tablePKIndex
if tInfos, ok := tablePKIndexClone[pk]; ok && len(tInfos) > 0 {
	tableInfo := tInfos[0]
	dbID := tableInfo.dbID

	// For the same dbID, take the smallest TS
	existingTS := dbWatermarks[dbID]
	if existingTS.IsEmpty() || cdcTS.LT(&existingTS) {
		dbWatermarks[dbID] = cdcTS
	}

	logutil.Info(
		"GC-GetCDC",


 ... (clipped 6 lines)
  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-merge-pro
Copy link

qodo-merge-pro bot commented Nov 6, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix incorrect CDC protection logic
Suggestion Impact:The commit adjusted the CDC protection conditions to first protect objects with empty drop timestamps and modified the logic around dropTS and createTS checks, aligning with the intent to avoid premature GC. Although not exactly the same condition as suggested, it implements the protection for non-dropped objects and changes the logic flow accordingly.

code diff:

-									if (!entry.dropTS.IsEmpty() && entry.dropTS.LT(&cdcTS)) ||
+									if entry.dropTS.IsEmpty() {
+										continue
+									}
+									if entry.dropTS.LT(&cdcTS) ||
 										entry.createTS.GT(&cdcTS) {
 										// Protect this object
 										continue
@@ -464,7 +467,10 @@
 						// This table is in a CDC database
 						// For CNCreated or Appendable objects, check if we should protect them
 						if stats.GetCNCreated() || stats.GetAppendable() {
-							if (!deleteTS.IsEmpty() && deleteTS.LT(&cdcTS)) ||
+							if deleteTS.IsEmpty() {
+								continue
+							}
+							if deleteTS.LT(&cdcTS) ||
 								createTS.GT(&cdcTS) {
 								// Protect this object
 								continue

Correct the CDC protection logic to prevent premature garbage collection of
objects. The current logic incorrectly protects objects dropped before the CDC
watermark and fails to protect objects dropped after it.

pkg/vm/engine/tae/db/gc/v3/exec_v1.go [417-440]

 					if !logtail.ObjectIsSnapshotRefers(
 						entry.stats, pitr, &entry.createTS, &entry.dropTS, sp,
 					) {
 						// Check CDC logic similar to ISCP
 						if cdcWatermarks == nil {
 							bm.Add(uint64(i))
 							continue
 						}
 						// Get dbID from tableID
 						if dbID, ok := tableIDToDBID[tableID]; ok {
 							if cdcTS, ok := cdcWatermarks[dbID]; ok {
 								// This table is in a CDC database
 								// For CNCreated or Appendable objects, check if we should protect them
 								if entry.stats.GetCNCreated() || entry.stats.GetAppendable() {
-									if (!entry.dropTS.IsEmpty() && entry.dropTS.LT(&cdcTS)) ||
-										entry.createTS.GT(&cdcTS) {
+									// An object is protected if it's not dropped, or dropped after the CDC watermark.
+									if entry.dropTS.IsEmpty() || entry.dropTS.GE(&cdcTS) {
 										// Protect this object
 										continue
 									}
 								}
 							}
 						}
 						bm.Add(uint64(i))
 					}

[Suggestion processed]

Suggestion importance[1-10]: 9

__

Why: The suggestion correctly identifies a critical bug in the CDC garbage collection logic that could lead to premature deletion of data needed by CDC consumers. The proposed fix is accurate and resolves this potential data loss issue.

High
  • Update

This reverts commit 4b24044.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kind/feature Review effort 4/5 size/L Denotes a PR that changes [500,999] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants