cpegeric commented Sep 5, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #21835

What this PR does / why we need it:

CDC (Change Data Capture) support for HNSW vector indexes.

For the design, visit https://github.com/cpegeric/mo-docs/blob/distributed_index/design/mo/sql/20250908-cpegeric-iscp_index_design.md


PR Type

Feature, Enhancement, Tests, Bug fix


Description

• Major Feature: Implements Change Data Capture (CDC) for vector indexes (HNSW, IVF) and fulltext indexes with asynchronous processing capabilities
• New ASYNC keyword: Adds support for ASYNC flag in index creation statements, enabling asynchronous index building and updates
• CDC Infrastructure: Introduces comprehensive CDC task management, SQL writers, and synchronization mechanisms for vector indexes
• HNSW Refactoring: Refactors HNSW implementation to use model-based architecture, replacing separate build/search index structures
• Enhanced Data Type Support: Enables additional data types in ISCP utilities including JSON, arrays, date/time, and UUID types
• Comprehensive Testing: Adds extensive test coverage for CDC operations, async indexing, and vector operations
• DDL Integration: Integrates CDC task lifecycle management into DDL operations (CREATE, DROP, ALTER, TRUNCATE)
• Bug Fixes: Resolves watermark updater issues and improves error handling across multiple components


Diagram Walkthrough

flowchart LR
  A["SQL DDL Operations"] --> B["CDC Task Manager"]
  B --> C["Index Consumer"]
  C --> D["Vector Index Models"]
  D --> E["HNSW/IVF/Fulltext Indexes"]
  F["ASYNC Keyword Parser"] --> A
  G["CDC SQL Writers"] --> C
  H["Sync Operations"] --> D

File Walkthrough

Relevant files
Feature
15 files
index_sqlwriter.go
Add vector index SQL writers for CDC operations                   

pkg/iscp/index_sqlwriter.go

• Implements SQL writers for different vector index algorithms (HNSW, IVFFLAT, Fulltext)
• Provides IndexSqlWriter interface with methods for Insert, Upsert, Delete operations
• Includes base implementation and specialized writers for each algorithm type
• Generates SQL statements for CDC operations on vector index tables

+649/-0 
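
To make the writer idea above concrete, here is a minimal sketch, not the code in pkg/iscp/index_sqlwriter.go. Only the interface name and the Insert/Upsert/Delete operations come from the summary; the method signatures, `ToSQL`, `batchWriter`, the table name, and the generated SQL shapes are assumptions for illustration.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// IndexSqlWriter mirrors the three operations named in the PR summary.
// The signatures and ToSQL are assumptions made for this sketch.
type IndexSqlWriter interface {
	Insert(pk int64, vec []float32) error
	Upsert(pk int64, vec []float32) error
	Delete(pk int64) error
	ToSQL() (string, error)
}

// batchWriter (hypothetical) buffers rows of one operation kind for one table.
type batchWriter struct {
	table string   // hypothetical index table name
	kind  string   // "INSERT", "REPLACE" or "DELETE"; empty until the first row
	rows  []string // rendered tuples (insert/upsert) or keys (delete)
}

// add rejects mixing operation kinds so each flush yields one statement.
func (w *batchWriter) add(kind, row string) error {
	if w.kind != "" && w.kind != kind {
		return fmt.Errorf("batch holds %s rows, cannot add %s; flush first", w.kind, kind)
	}
	w.kind = kind
	w.rows = append(w.rows, row)
	return nil
}

// tuple renders (pk, '[v1,v2,...]') for a VALUES list.
func tuple(pk int64, vec []float32) string {
	parts := make([]string, len(vec))
	for i, v := range vec {
		parts[i] = strconv.FormatFloat(float64(v), 'g', -1, 32)
	}
	return fmt.Sprintf("(%d, '[%s]')", pk, strings.Join(parts, ","))
}

func (w *batchWriter) Insert(pk int64, vec []float32) error { return w.add("INSERT", tuple(pk, vec)) }
func (w *batchWriter) Upsert(pk int64, vec []float32) error { return w.add("REPLACE", tuple(pk, vec)) }
func (w *batchWriter) Delete(pk int64) error                { return w.add("DELETE", strconv.FormatInt(pk, 10)) }

// ToSQL renders the buffered rows as a single statement.
func (w *batchWriter) ToSQL() (string, error) {
	if len(w.rows) == 0 {
		return "", fmt.Errorf("empty batch")
	}
	joined := strings.Join(w.rows, ", ")
	if w.kind == "DELETE" {
		return fmt.Sprintf("DELETE FROM %s WHERE pk IN (%s)", w.table, joined), nil
	}
	return fmt.Sprintf("%s INTO %s VALUES %s", w.kind, w.table, joined), nil
}

// Compile-time check that batchWriter satisfies the sketched interface.
var _ IndexSqlWriter = (*batchWriter)(nil)
```

In this sketch a consumer would call `Upsert` or `Delete` per changed row and then `ToSQL` once per batch; the per-algorithm writers in the PR presumably differ in the SQL they emit.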
sync.go
Implement HNSW vector index CDC synchronization                   

pkg/vectorindex/hnsw/sync.go

• Implements HNSW vector index CDC synchronization functionality
• Provides CdcSync function to update HNSW index via CDC operations
• Includes HnswSync struct for managing index updates and SQL generation
• Handles parallel and sequential update operations for vector data

+676/-0 
model.go
Add HNSW vector index model implementation                             

pkg/vectorindex/hnsw/model.go

• Implements HnswModel struct for managing HNSW vector index models
• Provides methods for index operations (Add, Remove, Contains, Search)
• Includes file I/O operations for saving/loading index data
• Supports SQL generation for database synchronization

+573/-0 
index_consumer.go
Add IndexConsumer for vector index CDC processing               

pkg/iscp/index_consumer.go

• Adds new IndexConsumer struct implementing CDC (Change Data Capture) for vector indexes
• Implements consumer interface with methods for processing insert/delete batches from data streams
• Handles both snapshot and tail data types with SQL generation and execution
• Includes row processing logic for upsert and delete operations with proper SQL batching

+435/-0 
ddl.go
Integrate CDC task management into DDL operations               

pkg/sql/compile/ddl.go

• Integrates CDC task management into DDL operations (CREATE, DROP, ALTER, TRUNCATE)
• Adds calls to create/drop index CDC tasks for async vector indexes
• Includes database-level CDC job cleanup on DROP DATABASE
• Handles CDC task lifecycle during table operations

+80/-3   
cdc_util.go
Add CDC utilities for vector index management                       

pkg/sql/compile/cdc_util.go

• Implements CDC task management utilities for vector indexes
• Provides functions to create/delete PITR (Point-in-Time Recovery) and CDC tasks
• Includes validation logic for async index types (HNSW, IVF, fulltext)
• Handles job registration/unregistration with proper error handling

+273/-0 
build_dml_util.go
Add async index support to DML operations                               

pkg/sql/plan/build_dml_util.go

• Adds async index support to DML operations
• Skips synchronous index updates for async-enabled indexes
• Updates MultiTableIndex struct to include IndexAlgoParams
• Implements async flag checking for IVF and fulltext indexes

+53/-4   
secondary_index_utils.go
Add async parameter support for secondary indexes               

pkg/catalog/secondary_index_utils.go

• Adds Async parameter support for index configurations
• Implements IsIndexAsync function to check async flag from parameters
• Updates parameter parsing to handle async flag in index options
• Extends index parameter string generation with async support

+38/-3   
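
A rough sketch of what an async-flag check over index parameters can look like. It assumes the parameters are stored as a flat JSON object of string values and that the key is `"async"`; neither the key name nor the storage format is stated in the summary, and `isIndexAsync` here is a hypothetical stand-in for the PR's `IsIndexAsync`.

```go
package main

import "encoding/json"

// indexParamAsync is an assumed key name for the async flag.
const indexParamAsync = "async"

// isIndexAsync reports whether a JSON-encoded parameter string carries
// async = "true". An empty string means no parameters, so the index is
// treated as synchronous.
func isIndexAsync(params string) (bool, error) {
	if params == "" {
		return false, nil
	}
	var m map[string]string
	if err := json.Unmarshal([]byte(params), &m); err != nil {
		return false, err
	}
	return m[indexParamAsync] == "true", nil
}
```

For example, `isIndexAsync(`{"lists":"10","async":"true"}`)` returns true, while a parameter string without the key returns false.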
types.go
Add CDC data structures for vector indexes                             

pkg/vectorindex/types.go

• Adds CDC-related constants and data structures for vector indexes
• Implements VectorIndexCdc and VectorIndexCdcEntry types for change tracking
• Adds HnswCdcParam struct for CDC parameter configuration
• Includes JSON serialization support for CDC data structures

+87/-0   
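
The change-tracking structures can be pictured with a small sketch like the one below. The type names `cdcEntry` and `cdcBatch`, the JSON field names, the one-letter operation codes, and the capacity handling are all assumptions for illustration and are not the layout of `VectorIndexCdc` or `VectorIndexCdcEntry`.

```go
package main

import "encoding/json"

// cdcEntry is a hypothetical single change: an operation code, a primary
// key, and (for inserts and upserts) the vector payload.
type cdcEntry struct {
	Type string    `json:"t"` // "I" insert, "U" upsert, "D" delete (assumed codes)
	PKey int64     `json:"pk"`
	Vec  []float32 `json:"v,omitempty"`
}

// cdcBatch is a hypothetical bounded list of changes.
type cdcBatch struct {
	Entries []cdcEntry `json:"cdc"`
	limit   int // unexported, so it is not serialized
}

func newCdcBatch(limit int) *cdcBatch {
	return &cdcBatch{Entries: make([]cdcEntry, 0, limit), limit: limit}
}

func (b *cdcBatch) Insert(pk int64, v []float32) {
	b.Entries = append(b.Entries, cdcEntry{Type: "I", PKey: pk, Vec: v})
}

func (b *cdcBatch) Upsert(pk int64, v []float32) {
	b.Entries = append(b.Entries, cdcEntry{Type: "U", PKey: pk, Vec: v})
}

func (b *cdcBatch) Delete(pk int64) {
	b.Entries = append(b.Entries, cdcEntry{Type: "D", PKey: pk})
}

func (b *cdcBatch) Empty() bool { return len(b.Entries) == 0 }
func (b *cdcBatch) Full() bool  { return len(b.Entries) >= b.limit }

// Reset drops the buffered entries but keeps the allocated capacity.
func (b *cdcBatch) Reset() { b.Entries = b.Entries[:0] }

// ToJSON serializes the batch so it can be passed on as a string argument.
func (b *cdcBatch) ToJSON() (string, error) {
	out, err := json.Marshal(b)
	if err != nil {
		return "", err
	}
	return string(out), nil
}
```

A caller in this sketch would append entries until `Full` reports true, serialize with `ToJSON`, and then `Reset` the batch for reuse.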
ddl_index_algo.go
Integrate async CDC tasks into index algorithm handling   

pkg/sql/compile/ddl_index_algo.go

• Integrates async CDC task creation for vector indexes
• Adds CDC task creation for IVF and fulltext indexes when async is enabled
• Updates index creation flow to handle async configuration
• Includes proper error handling and logging for CDC operations

+37/-5   
func_hnsw.go
Add HNSW CDC update function implementation                           

pkg/sql/plan/function/func_hnsw.go

• Implements hnswCdcUpdate function for processing HNSW CDC updates
• Handles JSON deserialization of CDC data and parameter validation
• Integrates with HNSW sync functionality for applying changes
• Includes proper error handling and logging for CDC operations

+77/-0   
sqlexec.go
Add transaction-aware SQL execution utility                           

pkg/vectorindex/sqlexec/sqlexec.go

• Adds RunTxn function for executing SQL within transactions
• Provides transaction-aware SQL execution with proper context handling
• Includes account ID and session information propagation
• Supports batch SQL execution with rollback capabilities

+27/-0   
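
The commit-or-rollback pattern behind a helper like RunTxn can be sketched as follows. This is not the signature used in pkg/vectorindex/sqlexec; the `txn` interface, `runTxn`, and the in-memory `recordingTxn` are hypothetical and leave out context, account ID, and session propagation.

```go
package main

import "errors"

// txn is a hypothetical minimal transaction handle.
type txn interface {
	Exec(sql string) error
	Commit() error
	Rollback() error
}

// runTxn begins a transaction, runs fn inside it, and commits only when fn
// succeeds. Any error from fn triggers a rollback and is returned.
func runTxn(begin func() (txn, error), fn func(txn) error) error {
	tx, err := begin()
	if err != nil {
		return err
	}
	if err := fn(tx); err != nil {
		// Report both failures if the rollback itself fails.
		return errors.Join(err, tx.Rollback())
	}
	return tx.Commit()
}

// recordingTxn is an in-memory stand-in used only to demonstrate the flow.
type recordingTxn struct {
	log    []string
	failOn string // statement that should fail, if any
}

func (t *recordingTxn) Exec(sql string) error {
	if sql == t.failOn {
		return errors.New("exec failed: " + sql)
	}
	t.log = append(t.log, sql)
	return nil
}

func (t *recordingTxn) Commit() error   { t.log = append(t.log, "COMMIT"); return nil }
func (t *recordingTxn) Rollback() error { t.log = append(t.log, "ROLLBACK"); return nil }
```

Passing a batch of statements through `fn` means either all of them are committed together or none are, which is the property the summary describes as batch execution with rollback.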
create.go
Add async flag support to index options                                   

pkg/sql/parsers/tree/create.go

• Adds Async boolean field to IndexOption struct
• Updates format method to output ASYNC keyword when enabled
• Extends index option parsing to support async flag
• Maintains backward compatibility with existing index options

+4/-0     
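
As an illustration of emitting the keyword only when the flag is set, here is a small sketch. The `indexOption` type, its `Lists` field, and the output layout are assumptions; only the `Async` boolean and the ASYNC keyword come from the summary.

```go
package main

import (
	"strconv"
	"strings"
)

// indexOption is a hypothetical cut-down option struct.
type indexOption struct {
	Lists int64 // assumed example option; 0 means unset
	Async bool
}

// format renders the options in SQL form. Options that are unset produce no
// output, so statements without ASYNC format exactly as before.
func (o indexOption) format() string {
	var parts []string
	if o.Lists > 0 {
		parts = append(parts, "LISTS = "+strconv.FormatInt(o.Lists, 10))
	}
	if o.Async {
		parts = append(parts, "ASYNC")
	}
	return strings.Join(parts, " ")
}
```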
list_builtIn.go
Register HNSW CDC update as built-in function                       

pkg/sql/plan/function/list_builtIn.go

• Registers new HNSW_CDC_UPDATE built-in function
• Defines function signature with proper parameter types
• Adds function to supported built-ins list with overload specification
• Includes proper return type definition for the function

+21/-0   
consumer.go
Add index sync consumer support to factory                             

pkg/iscp/consumer.go

• Adds support for ConsumerType_IndexSync in consumer factory
• Routes index sync consumer type to NewIndexConsumer
• Extends consumer creation logic for vector index CDC
• Maintains existing consumer types while adding new functionality

+3/-0     
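
The routing described above amounts to one more case in a factory switch. A minimal sketch, with hypothetical type and constant names (the real factory uses `ConsumerType_IndexSync` and `NewIndexConsumer`, whose signatures are not shown in the summary):

```go
package main

import "fmt"

// consumerType and its values are stand-ins for the real enum.
type consumerType int

const (
	consumerTypeCustom    consumerType = iota // stands in for a pre-existing type
	consumerTypeIndexSync                     // the newly routed type
)

// consumer is a hypothetical minimal consumer interface.
type consumer interface{ Kind() string }

type customConsumer struct{}

func (customConsumer) Kind() string { return "custom" }

type indexConsumer struct{}

func (indexConsumer) Kind() string { return "index-sync" }

// newConsumer keeps the existing case untouched and adds the index-sync route.
func newConsumer(t consumerType) (consumer, error) {
	switch t {
	case consumerTypeCustom:
		return customConsumer{}, nil
	case consumerTypeIndexSync:
		return indexConsumer{}, nil
	default:
		return nil, fmt.Errorf("unknown consumer type %d", t)
	}
}
```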
Enhancement
12 files
util.go
Enable additional data type support in ISCP utilities       

pkg/iscp/util.go

• Uncomments and enables support for additional data types in row extraction
• Adds support for JSON, bit, array types, date/time types, decimal types, UUID, and other specialized types
• Includes appendHex function for binary data formatting
• Enhances SQL conversion for NULL values with proper type casting

+134/-127
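
One plausible shape for a binary formatting helper is shown below. It assumes the bytes are rendered as a MySQL-style hexadecimal literal `x'...'`; the PR summary names the function but not its output format, so the signature and literal style here are assumptions.

```go
package main

import "encoding/hex"

// appendHex appends src to dst as a hexadecimal literal of the form x'..'.
// Hex-encoding sidesteps quoting and escaping problems with raw bytes.
func appendHex(dst, src []byte) []byte {
	dst = append(dst, 'x', '\'')
	n := len(dst)
	// Grow dst by the encoded length, then encode in place.
	dst = append(dst, make([]byte, hex.EncodedLen(len(src)))...)
	hex.Encode(dst[n:], src)
	return append(dst, '\'')
}
```

For instance, appending the bytes `0xde 0xad 0x00` to `SELECT ` yields `SELECT x'dead00'`.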
alter.go
Integrate ISCP cleanup in ALTER TABLE operations                 

pkg/sql/compile/alter.go

• Adds ISCP job cleanup during ALTER TABLE operations
• Calls DropAllIndexCdcTasks for temporary tables to prevent unwanted CDC jobs
• Includes fulltext index handling in the reindex process
• Improves error handling and logging for alter table operations

+32/-11 
fulltext.go
Enhance fulltext index tokenization with composite key support

pkg/sql/plan/fulltext.go

• Enhances fulltext index tokenization to support both table scan and values scan
• Adds support for composite primary keys in fulltext indexing
• Improves parameter handling for different scan types
• Updates function documentation with detailed usage examples

+54/-12 
func_cast.go
Add dimension validation for array type casting                   

pkg/sql/plan/function/func_cast.go

• Adds dimension validation for array type casting
• Implements proper array dimension checking against type width
• Bypasses dimension check for maximum dimension arrays
• Improves error reporting for array dimension mismatches

+11/-2   
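
The dimension check can be sketched as below. The constant value, the function name, and the exact message layout are assumptions; the summary only says the check compares the array length against the type width and is bypassed for the maximum dimension.

```go
package main

import "fmt"

// maxArrayDimension is an assumed sentinel width meaning "no declared
// dimension"; the real constant lives elsewhere in the codebase.
const maxArrayDimension = 65535

// checkArrayDimension validates a vector against the declared type width.
func checkArrayDimension(width int32, vec []float32) error {
	if width == maxArrayDimension {
		return nil // unconstrained target type: skip the check
	}
	if int(width) != len(vec) {
		return fmt.Errorf("expected vector dimension %d != actual dimension %d", width, len(vec))
	}
	return nil
}
```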
hnsw.go
Relax node type constraints in HNSW operations                     

pkg/sql/plan/hnsw.go

• Comments out table scan validation in HNSW create function
• Removes strict requirement for TABLE_SCAN node type
• Allows more flexible node types in HNSW operations
• Maintains existing functionality while reducing constraints

+6/-4     
types.go
Add description string support for vector array types       

pkg/container/types/types.go

• Adds description string support for vector array types
• Implements proper formatting for T_array_float32 and T_array_float64
• Includes dimension information in type descriptions
• Improves type introspection for vector data types

+4/-0     
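
A sketch of a description string that carries the dimension. The `VECF32(n)` and `VECF64(n)` spellings follow the SQL type names but are an assumption about the actual output, and `arrayKind` and `descString` are hypothetical names.

```go
package main

import "fmt"

// arrayKind is a stand-in for the two vector array type identifiers.
type arrayKind int

const (
	arrayFloat32 arrayKind = iota
	arrayFloat64
)

// descString renders the type name together with its declared dimension.
func descString(k arrayKind, width int32) string {
	switch k {
	case arrayFloat32:
		return fmt.Sprintf("VECF32(%d)", width)
	case arrayFloat64:
		return fmt.Sprintf("VECF64(%d)", width)
	default:
		return "UNKNOWN"
	}
}
```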
build_show_util.go
Add async flag display in SHOW CREATE TABLE                           

pkg/sql/plan/build_show_util.go

• Adds async flag display in CREATE TABLE SQL construction
• Checks for async parameter in index configuration
• Appends "ASYNC" keyword to index definition when enabled
• Improves SHOW CREATE TABLE output for async indexes

+5/-0     
data_retriever.go
Add account and table ID getter methods                                   

pkg/iscp/data_retriever.go

• Added GetAccountID() method returning uint32 account ID
• Added GetTableID() method returning uint64 table ID

+8/-0     
types.go
Add index algorithm parameters field to MultiTableIndex   

pkg/sql/plan/types.go

• Added IndexAlgoParams string field to MultiTableIndex struct
• Reformatted existing fields with proper alignment

+3/-2     
types.go
Extend DataRetriever interface with ID getters                     

pkg/iscp/types.go

• Added GetAccountID() method signature to DataRetriever interface
• Added GetTableID() method signature to DataRetriever interface

+2/-0     
types.go
Add async parameter to fulltext parser configuration         

pkg/fulltext/types.go

• Added Async string field to FullTextParserParam struct with JSON tag

+1/-0     
mysql_sql.y
Add ASYNC keyword support for index creation                         

pkg/sql/parsers/dialect/mysql/mysql_sql.y

• Added ASYNC token declaration
• Added parsing logic for ASYNC index option
• Added ASYNC to non-reserved keywords list
• Extended index option handling to support async flag

+10/-1   
Tests
16 files
index_consumer_test.go
Add test coverage for index consumer operations                   

pkg/iscp/index_consumer_test.go

• Adds comprehensive test cases for index consumer functionality
• Tests HNSW snapshot and tail operations with mock data
• Includes mock implementations for SQL executor and data retriever
• Validates SQL generation for CDC operations

+381/-0 
sync_test.go
Add comprehensive tests for HNSW sync operations                 

pkg/vectorindex/hnsw/sync_test.go

• Provides extensive test coverage for HNSW synchronization operations
• Tests various CDC scenarios including upsert, delete, insert operations
• Includes tests for multi-file scenarios and shuffled operations
• Uses mock functions for database operations

+370/-0 
index_sqlwriter_test.go
Add comprehensive tests for index SQL writers                       

pkg/iscp/index_sqlwriter_test.go

• Adds comprehensive test cases for different index SQL writers (IVF, HNSW, fulltext)
• Tests SQL generation for various table definitions and index configurations
• Includes tests for composite primary keys and multiple column scenarios
• Validates proper SQL output format for different vector index types

+242/-0 
model_test.go
Add comprehensive tests for HNSW model functionality         

pkg/vectorindex/hnsw/model_test.go

• Adds comprehensive tests for the new HnswModel struct
• Tests model loading, searching, adding/removing vectors, and persistence
• Includes tests for dirty state management and SQL generation
• Validates proper cleanup and error handling scenarios

+206/-0 
search_test.go
Update HNSW search tests for model-based architecture       

pkg/vectorindex/hnsw/search_test.go

• Updates existing tests to work with new model-based architecture
• Adds mock functions for testing multiple index files
• Includes tests for catalog SQL execution and metadata batches
• Extends test coverage for multi-file index scenarios

+112/-0 
func_hnsw_test.go
Add tests for HNSW CDC update function                                     

pkg/sql/plan/function/func_hnsw_test.go

• Adds test cases for the new hnswCdcUpdate function
• Tests various error conditions (null parameters, invalid JSON)
• Validates proper argument count and type checking
• Ensures function handles edge cases gracefully

+129/-0 
mysql_sql_test.go
Add tests for async index syntax parsing                                 

pkg/sql/parsers/dialect/mysql/mysql_sql_test.go

• Adds test cases for async keyword in index creation statements
• Tests async flag parsing for HNSW, IVF, and fulltext indexes
• Validates proper SQL output formatting with async keyword
• Ensures parser correctly handles async index syntax

+9/-1     
build_test.go
Update HNSW build tests for model refactoring                       

pkg/vectorindex/hnsw/build_test.go

• Updates build tests to use new HnswModel instead of HnswBuildIndex
• Modifies test setup to work with refactored model architecture
• Updates function calls to match new model-based API
• Maintains test coverage while adapting to structural changes

+5/-5     
types_test.go
Add tests for vector index CDC data structures                     

pkg/vectorindex/types_test.go

• Adds tests for CDC data structures and JSON serialization
• Tests insert, delete, and upsert operations on CDC objects
• Validates proper JSON output format for CDC entries
• Tests CDC object state management (empty, full, reset)

+63/-0   
function_id_test.go
Update function ID tests for HNSW CDC function                     

pkg/sql/plan/function/function_id_test.go

• Updates predefined function ID test map with new HNSW function
• Adds test case for HNSW_CDC_UPDATE function ID
• Updates function end number in test expectations
• Maintains test coverage for function ID registry

+3/-1     
vector_ivf_async.result
Test results for asynchronous IVF vector indexing               

test/distributed/cases/vector/vector_ivf_async.result

• Test results for IVF vector index with ASYNC option
• Covers table creation, index creation with ASYNC, data insertion, and vector similarity queries
• Includes sleep operations to test asynchronous behavior

+58/-0   
vector_ivf_async.sql
Test cases for asynchronous IVF vector indexing                   

test/distributed/cases/vector/vector_ivf_async.sql

• Test cases for IVF vector index with ASYNC option
• Tests index creation, data loading, and vector similarity searches
• Includes timing tests with sleep operations for async behavior

+59/-0   
vector_hnsw_async.result
Test results for asynchronous HNSW vector indexing             

test/distributed/cases/vector/vector_hnsw_async.result

• Test results for HNSW vector index with ASYNC option
• Covers CRUD operations, bulk data loading, and vector similarity queries
• Tests asynchronous index building behavior with timing

+66/-0   
vector_hnsw_async.sql
Test cases for asynchronous HNSW vector indexing                 

test/distributed/cases/vector/vector_hnsw_async.sql

• Test cases for HNSW vector index with ASYNC option
• Tests index creation on empty and populated tables
• Includes CDC-style operations and timing tests

+96/-0   
fulltext_async.sql
Test cases for asynchronous fulltext indexing                       

test/distributed/cases/fulltext/fulltext_async.sql

• Test cases for fulltext index with ASYNC option
• Tests index creation, data insertion, and fulltext search functionality
• Includes timing tests with sleep operations

+21/-0   
fulltext_async.result
Test results for asynchronous fulltext indexing                   

test/distributed/cases/fulltext/fulltext_async.result

• Test results for fulltext index with ASYNC option
• Shows successful creation and querying of async fulltext indexes
• Demonstrates search functionality after async index building

+19/-0   
Code refactoring
2 files
search.go
Refactor HNSW search to use model-based architecture         

pkg/vectorindex/hnsw/search.go

• Removes HnswSearchIndex struct and related loading/searching methods
• Refactors to use new HnswModel struct instead of HnswSearchIndex
• Extracts metadata loading logic into standalone LoadMetadata function
• Simplifies search implementation by delegating to model-based approach

+23/-199
build.go
Refactor HNSW build to use model-based architecture           

pkg/vectorindex/hnsw/build.go

• Removes HnswBuildIndex struct and related build methods
• Refactors to use HnswModel for building vector indexes
• Updates function signatures to work with new model-based approach
• Removes constants and helper methods that are now in the model

+11/-184
Bug fix
2 files
watermark_updater.go
Fix watermark updater bugs and improve error handling       

pkg/iscp/watermark_updater.go

• Fixes bug in job unregistration when no table IDs exist
• Adds null check for drop_at column access in query results
• Improves error handling in database cleanup operations
• Prevents SQL execution when table ID list is empty

+12/-8   
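
The empty-list guard matters because an `IN ()` clause with no values is not valid SQL. A minimal sketch of the idea follows; the function name, the `job_log` table, and the statement itself are hypothetical, and only the `drop_at` column name comes from the summary.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildUnregisterSQL returns the statement to run and true, or "" and false
// when there are no table IDs, in which case the caller should skip
// execution entirely.
func buildUnregisterSQL(tableIDs []uint64) (string, bool) {
	if len(tableIDs) == 0 {
		return "", false // nothing to unregister; avoid emitting "IN ()"
	}
	ids := make([]string, len(tableIDs))
	for i, id := range tableIDs {
		ids[i] = strconv.FormatUint(id, 10)
	}
	// job_log is a placeholder table name for this sketch.
	sql := fmt.Sprintf("UPDATE job_log SET drop_at = now() WHERE table_id IN (%s)",
		strings.Join(ids, ","))
	return sql, true
}
```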
build_ddl.go
Fix fulltext index option handling in DDL                               

pkg/sql/plan/build_ddl.go

• Fixes fulltext index option handling in table creation
• Ensures index algorithm parameters are set even without parser name
• Improves parameter extraction logic for fulltext indexes
• Maintains proper index configuration during DDL operations

+4/-4     
Error handling
2 files
util.go
Update fulltext index SQL generation with error handling 

pkg/sql/compile/util.go

• Updates genInsertIndexTableSqlForFullTextIndex to return error
• Changes function signature to include error handling
• Maintains backward compatibility while improving error propagation
• Minor refactoring for better error management

+2/-2     
iteration.go
Improve error handling and context in ISCP iteration         

pkg/iscp/iteration.go

• Adds proper error handling for CollectChanges function
• Sets system account context for consumer operations
• Improves error propagation in iteration execution
• Ensures proper cleanup of resources on errors

+5/-1     
Configuration changes
2 files
function_id.go
Add function ID for HNSW CDC update                                           

pkg/sql/plan/function/function_id.go

• Adds HNSW_CDC_UPDATE function ID constant
• Updates function end number to accommodate new function
• Registers function name in function ID registry
• Maintains proper function ID sequencing

+7/-1     
keywords.go
Add async keyword to MySQL parser                                               

pkg/sql/parsers/dialect/mysql/keywords.go

• Adds "async" keyword to MySQL parser keyword list
• Maps "async" string to ASYNC token type
• Enables parsing of async keyword in SQL statements
• Extends parser vocabulary for index options

+1/-0     
Miscellaneous
4 files
mock_consumer.go
Update mock consumer to use system account constant           

pkg/iscp/mock_consumer.go

• Updates context creation to use system account constant
• Replaces hardcoded account ID with proper catalog constant
• Improves consistency in account handling across ISCP components
• Minor refactoring for better code maintainability

+1/-1     
vector_hnsw.result
Update vector dimension error message format                         

test/distributed/cases/vector/vector_hnsw.result

• Updated error message format for vector dimension mismatch
• Changed from "vector ops between different dimensions" to "expected vector dimension != actual dimension"

+1/-1     
vector_index.result
Update vector dimension error message format                         

test/distributed/cases/vector/vector_index.result

• Updated error message format for vector dimension mismatch
• Changed from "vector ops between different dimensions" to "expected vector dimension != actual dimension"

+1/-1     
array.result
Update vector dimension error message format                         

test/distributed/cases/array/array.result

• Updated error message format for vector dimension mismatch
• Changed from "vector ops between different dimensions" to "expected vector dimension != actual dimension"

+2/-2     
Additional files
1 files
mysql_sql.go +8607/-8632

mergify bot commented Sep 22, 2025

⚠️ The sha of the head commit of this PR conflicts with #22519. Mergify cannot evaluate rules on this PR. ⚠️
