Skip to content
Draft
Show file tree
Hide file tree
Changes from 109 commits
Commits
Show all changes
257 commits
Select commit Hold shift + click to select a range
a91f165
function
cpegeric Apr 28, 2025
a1ce79c
add hnsw function
cpegeric Apr 28, 2025
035ba59
cdc param
cpegeric Apr 29, 2025
4bf9ed9
cdc param
cpegeric Apr 29, 2025
e11c07e
merge fix
cpegeric Apr 29, 2025
5a412a2
bug fix
cpegeric Apr 29, 2025
1b9b50a
support hnswsync
cpegeric Apr 29, 2025
78d842f
add sqlexecutor
cpegeric Apr 30, 2025
fcb5ed2
txn
cpegeric May 1, 2025
64d5cb1
check rollback error and skip
cpegeric May 1, 2025
ab9d98c
cleanup
cpegeric May 1, 2025
797d78a
merge fix
cpegeric May 9, 2025
07b5f65
Merge branch 'main' into cdc_sqlexecutor
cpegeric May 14, 2025
0838cb0
rename Hnsw to VectorIndex
cpegeric May 14, 2025
28cdb7c
refactor code
cpegeric May 14, 2025
cf25e24
refactoring
cpegeric May 14, 2025
df53b4f
add remove() and contains() and load from view
cpegeric May 14, 2025
509137a
unload
cpegeric May 14, 2025
fdeb4c1
add cdc sync
cpegeric May 14, 2025
f08aa29
CdcSync
cpegeric May 14, 2025
7f791a7
error checking
cpegeric May 14, 2025
cc15bbf
add dimension as function argument
cpegeric May 14, 2025
7fd2758
load metadata
cpegeric May 14, 2025
00050c1
update
cpegeric May 14, 2025
81a964e
update
cpegeric May 14, 2025
a7c6fc6
update
cpegeric May 15, 2025
f6c64b1
update
cpegeric May 15, 2025
af57e76
Merge branch 'main' into cdc_sqlexecutor
cpegeric May 15, 2025
1bb32a0
add txn
cpegeric May 15, 2025
b435ee4
runTxn
cpegeric May 15, 2025
827a42d
destroy
cpegeric May 15, 2025
6109470
update
cpegeric May 15, 2025
7f190b9
bug fix save to file
cpegeric May 15, 2025
62aa87e
bug fix with init index capacity
cpegeric May 15, 2025
e2278a3
bug fix dirty
cpegeric May 15, 2025
6e682b9
update Len and Capacity
cpegeric May 15, 2025
ed49fd1
update
cpegeric May 16, 2025
1e5b6a8
view
cpegeric May 16, 2025
36bc11f
bug fix view
cpegeric May 16, 2025
3d70d9a
add tests
cpegeric May 22, 2025
3d94c96
Merge branch 'main' into cdc_sqlexecutor
cpegeric May 22, 2025
3967bb6
update
cpegeric May 22, 2025
6db0cb6
add tests
cpegeric May 22, 2025
d123d51
add sync test
cpegeric May 23, 2025
35ca423
update tests
cpegeric May 23, 2025
dd2ecdb
delete all tests
cpegeric May 23, 2025
5356fd9
update tests
cpegeric May 23, 2025
69dffe1
add test
cpegeric May 23, 2025
48a3f59
add test
cpegeric May 23, 2025
4a0db76
update
cpegeric May 23, 2025
03b7f3b
remove InsertMeta
cpegeric May 23, 2025
fc450b1
start with empty
cpegeric May 23, 2025
862cd46
delete 2 files
cpegeric May 23, 2025
fd6eee9
shuffle test
cpegeric May 23, 2025
8c51bbe
update shuffle
cpegeric May 23, 2025
3e5f164
create/drop index
cpegeric May 27, 2025
2f00775
remove new cdc sql syntax
cpegeric May 27, 2025
4deaa70
Merge branch 'main' into cdc_sqlexecutor_cleanup
mergify[bot] May 27, 2025
9006006
update sca
cpegeric May 27, 2025
543ad89
Merge branch 'cdc_sqlexecutor_cleanup' of github.com:cpegeric/matrixo…
cpegeric May 27, 2025
c2a30cb
fix sca test
cpegeric May 27, 2025
04f097d
check nil vector
cpegeric May 27, 2025
d21748a
fix sca
cpegeric May 27, 2025
5747b92
bug fix
cpegeric May 27, 2025
ea5e8e6
add test
cpegeric Jun 2, 2025
390e907
clear cache after cdc sync
cpegeric Jun 2, 2025
66eef84
add bvt test
cpegeric Jun 2, 2025
eddf530
fix test
cpegeric Jun 2, 2025
3f355c7
update unittest
cpegeric Jun 2, 2025
53e2f96
bug fix check channel closed
cpegeric Jun 2, 2025
89efa85
fix sca
cpegeric Jun 2, 2025
f6609de
fix sca
cpegeric Jun 2, 2025
1575bd9
fix sca
cpegeric Jun 2, 2025
94549ac
check vector dimension when cast from string
cpegeric Jun 3, 2025
e34c04f
bypass dimension check when width is max dimension
cpegeric Jun 3, 2025
1f626f3
fix bvt
cpegeric Jun 3, 2025
22eee6b
test atomicBatch
cpegeric Jun 4, 2025
ecfeab8
check errors
cpegeric Jun 4, 2025
d9a6023
test sendX
cpegeric Jun 4, 2025
ea82266
update bvt with manual pitr and cdc task
cpegeric Jun 4, 2025
6539fff
increase sleep
cpegeric Jun 4, 2025
917356b
update
cpegeric Jun 4, 2025
397f518
add tests
cpegeric Jun 5, 2025
e74e66d
fix bvt test on multi-cn env
cpegeric Jun 5, 2025
8f5f976
add more tests
cpegeric Jun 5, 2025
598d60f
cleanup and remove stderr
cpegeric Jun 5, 2025
50be8db
update and comments
cpegeric Jun 5, 2025
b9ea603
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jun 5, 2025
9144730
debug
cpegeric Jun 6, 2025
e00b89a
fix sca
cpegeric Jun 6, 2025
fe08c31
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jun 9, 2025
6da9b68
add test
cpegeric Jun 9, 2025
b381acd
add license
cpegeric Jun 9, 2025
644fbfa
performance
cpegeric Jun 9, 2025
1f7654c
update
cpegeric Jun 9, 2025
8b6975c
update
cpegeric Jun 9, 2025
b8c3c15
fix thread safe
cpegeric Jun 10, 2025
a964de2
cleanup
cpegeric Jun 10, 2025
8b962d0
never unload when insertAll
cpegeric Jun 10, 2025
dc4cbbd
merge fix
cpegeric Jun 11, 2025
3d7be05
better message
cpegeric Jun 11, 2025
ae92606
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jun 12, 2025
652783c
fix bvt -- drop pitr
cpegeric Jun 12, 2025
c0f162b
cleanup
cpegeric Jun 12, 2025
4ce4a5d
take timing
cpegeric Jun 13, 2025
ba61a0f
support values as input
cpegeric Jun 19, 2025
67feed9
Comment on composite primary key
cpegeric Jun 19, 2025
1b8735b
comments
cpegeric Jun 19, 2025
a12d40d
add sql writer
cpegeric Jun 19, 2025
acb4387
update
cpegeric Jun 20, 2025
edc33bb
update
cpegeric Jun 20, 2025
0a1f61c
update
cpegeric Jun 20, 2025
c393b7c
update
cpegeric Jun 20, 2025
dabe24b
update
cpegeric Jun 20, 2025
56bc1df
update ivfflat
cpegeric Jun 20, 2025
5e0a2bd
update ivfflat
cpegeric Jun 20, 2025
56b6a28
update
cpegeric Jun 20, 2025
04097a4
add db
cpegeric Jun 20, 2025
9210d80
reset
cpegeric Jun 20, 2025
5aa4712
empty
cpegeric Jun 20, 2025
09de3b1
hnsw sql writer
cpegeric Jun 20, 2025
5a664e6
update hnsw
cpegeric Jun 20, 2025
77037ae
template
cpegeric Jun 20, 2025
5200881
update
cpegeric Jun 20, 2025
fa50232
add test
cpegeric Jun 20, 2025
cfe6a0b
update test
cpegeric Jun 20, 2025
0b04c8f
ivf
cpegeric Jun 20, 2025
da1d2ec
update
cpegeric Jun 20, 2025
581a07d
version
cpegeric Jun 20, 2025
09dc52d
update
cpegeric Jun 20, 2025
e04b63c
update
cpegeric Jun 20, 2025
17057d7
bug fix
cpegeric Jun 20, 2025
d437339
index sinker with IndexSqlWriter
cpegeric Jun 23, 2025
5c648f3
remove comment
cpegeric Jun 23, 2025
53b6438
rename hsnw to index
cpegeric Jun 23, 2025
8069eb0
delete if vector is nil
cpegeric Jun 23, 2025
4afbe23
rename file
cpegeric Jun 23, 2025
4772c2f
support multi-indexes
cpegeric Jun 23, 2025
12429c2
cleanup
cpegeric Jun 23, 2025
b32d846
cleanup
cpegeric Jun 23, 2025
dd93d28
use constant
cpegeric Jun 23, 2025
173589a
rename file
cpegeric Jun 23, 2025
a44aacf
todo
cpegeric Jun 23, 2025
9aa078c
bvt test
cpegeric Jun 23, 2025
01fc88d
cleanup
cpegeric Jun 23, 2025
a39fa82
cleanup
cpegeric Jun 23, 2025
613544e
sca
cpegeric Jun 23, 2025
1a38d8b
add license
cpegeric Jun 23, 2025
be20232
bug fix
cpegeric Jun 23, 2025
622a888
more comments
cpegeric Jun 23, 2025
95a9a5c
delete sql
cpegeric Jun 23, 2025
4f06629
cleanup
cpegeric Jun 23, 2025
3c34919
cleanup
cpegeric Jun 23, 2025
d60145c
bug fix delete row only have 1 column pk
cpegeric Jun 24, 2025
84d945c
bug fix pre-defined column name
cpegeric Jun 24, 2025
789fa58
hardcode composite primary key column to varbinary
cpegeric Jun 24, 2025
504b616
bug fix
cpegeric Jun 24, 2025
e97612c
disable fulltext and ivfflat
cpegeric Jun 24, 2025
3e5ead4
bug fix
cpegeric Jun 24, 2025
f64f6de
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jun 24, 2025
2806c24
only enable hnsw
cpegeric Jun 24, 2025
9ba390e
add async option
cpegeric Jun 25, 2025
ef894ea
skip async with DML
cpegeric Jun 25, 2025
1de75a6
catalog.IsIndexAsync
cpegeric Jun 25, 2025
9f83f62
async
cpegeric Jun 25, 2025
4d9cca5
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 25, 2025
4e527c4
update
cpegeric Jun 25, 2025
99ce5a6
fix sca
cpegeric Jun 25, 2025
0f89628
fix merge
cpegeric Jun 25, 2025
14b0e7f
Merge branch 'main' into cdc_fulltext
cpegeric Jun 26, 2025
5143b81
Merge branch 'main' into cdc_fulltext
cpegeric Jun 27, 2025
72231a7
add cdc util
cpegeric Jun 27, 2025
7f7e096
create/delete cdc task
cpegeric Jun 27, 2025
884102e
update
cpegeric Jun 27, 2025
82b76f8
update
cpegeric Jun 27, 2025
959b527
update
cpegeric Jun 27, 2025
662fd57
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 27, 2025
c16cf90
truncate table
cpegeric Jun 27, 2025
b423bcc
truncate table
cpegeric Jun 27, 2025
20fb6e5
update
cpegeric Jun 27, 2025
5caab53
cleanup
cpegeric Jun 27, 2025
43e9116
update
cpegeric Jun 27, 2025
a4a753d
update
cpegeric Jun 27, 2025
42a9adc
hnsw disable alter reindex
cpegeric Jun 27, 2025
2ed6051
alter reindex
cpegeric Jun 27, 2025
164693e
sca
cpegeric Jun 27, 2025
7757192
bug fix
cpegeric Jun 27, 2025
38775df
bug fix
cpegeric Jun 27, 2025
852b8c2
update
cpegeric Jun 30, 2025
d7241ae
use pitr_name
cpegeric Jun 30, 2025
b6639d2
add check pitr before create
cpegeric Jun 30, 2025
49a4cef
update
cpegeric Jun 30, 2025
f09eb13
update
cpegeric Jun 30, 2025
22e91b3
update
cpegeric Jun 30, 2025
04393a1
consumer
cpegeric Jun 30, 2025
3400935
license
cpegeric Jun 30, 2025
11c74ff
update
cpegeric Jun 30, 2025
7d106fb
update
cpegeric Jun 30, 2025
d4e19e6
use transaction from DataRetriever
cpegeric Jul 1, 2025
1704875
update watermark
cpegeric Jul 1, 2025
4c4c3a8
update
cpegeric Jul 1, 2025
7b02d81
update
cpegeric Jul 1, 2025
16600d3
statement option
cpegeric Jul 1, 2025
ab06863
statement option
cpegeric Jul 1, 2025
e83058f
snapshot
cpegeric Jul 1, 2025
952b57a
run
cpegeric Jul 1, 2025
06af88a
update
cpegeric Jul 1, 2025
c72197d
move to idxcdc
cpegeric Jul 1, 2025
1d2b203
update
cpegeric Jul 1, 2025
ffa5da7
update
cpegeric Jul 1, 2025
fc4b7d2
update idxcdc
cpegeric Jul 1, 2025
3b0ca17
update
cpegeric Jul 1, 2025
fab81f1
tail use insert, snapshot use upsert
cpegeric Jul 1, 2025
268ab6e
update
cpegeric Jul 1, 2025
d06cb28
update
cpegeric Jul 1, 2025
76b2c62
mock retriever
cpegeric Jul 1, 2025
98439c8
flush at the end
cpegeric Jul 1, 2025
e0aca74
update test
cpegeric Jul 2, 2025
352655d
update
cpegeric Jul 2, 2025
2567793
add test
cpegeric Jul 2, 2025
ed0c94b
merge fix watermarkUpdater
cpegeric Jul 2, 2025
ccf384c
update
cpegeric Jul 2, 2025
9afc4df
Merge branch 'main' into cdc_fulltext
cpegeric Jul 3, 2025
0b6cfd9
add cnUUID
cpegeric Jul 3, 2025
97113c4
remove unneccessary code
cpegeric Jul 3, 2025
7259ddc
update
cpegeric Jul 3, 2025
5aa0926
api
cpegeric Jul 3, 2025
55258af
merge fix
cpegeric Jul 8, 2025
7a6ba7e
bug fix cdc
cpegeric Jul 8, 2025
a5ed284
bvt test
cpegeric Jul 8, 2025
839a494
fix drop index
cpegeric Jul 8, 2025
6ed579a
fix sca
cpegeric Jul 8, 2025
381665e
Merge branch 'main' into cdc_fulltext
cpegeric Jul 9, 2025
346f32c
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jul 9, 2025
9edbd11
bug fix thread id
cpegeric Jul 9, 2025
e744672
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jul 9, 2025
f1b1962
rename idxcdc to iscp
cpegeric Jul 22, 2025
51a0044
merge fix function id
cpegeric Jul 22, 2025
44c63ff
fix sca
cpegeric Jul 22, 2025
254f20e
fix sca
cpegeric Jul 23, 2025
cef2cac
merge fix
cpegeric Aug 18, 2025
8db5fe7
merge fix
cpegeric Aug 19, 2025
cb1f323
new index consumer
cpegeric Aug 19, 2025
16c0cd3
uncomment used code
cpegeric Aug 20, 2025
3f9d25e
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 20, 2025
718ca4b
merge fix
cpegeric Aug 22, 2025
6e02233
update
cpegeric Aug 22, 2025
19a52a3
rename
cpegeric Aug 22, 2025
5819c2d
update async bvt
cpegeric Aug 22, 2025
f1ee83b
add index consumer
cpegeric Aug 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
573 changes: 573 additions & 0 deletions pkg/cdc/hnsw_sinker.go

Large diffs are not rendered by default.

692 changes: 692 additions & 0 deletions pkg/cdc/hnsw_sinker_test.go

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions pkg/cdc/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (
"context"
"database/sql"
"fmt"
"go.uber.org/zap"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
Expand Down Expand Up @@ -53,6 +54,7 @@ var (
)

var NewSinker = func(
cnUUID string,
sinkUri UriInfo,
dbTblInfo *DbTableInfo,
watermarkUpdater IWatermarkUpdater,
Expand All @@ -68,6 +70,10 @@ var NewSinker = func(
return NewConsoleSinker(dbTblInfo, watermarkUpdater), nil
}

if sinkUri.SinkTyp == CDCSinkType_HnswSync {
return NewHnswSyncSinker(cnUUID, sinkUri, dbTblInfo, watermarkUpdater, tableDef, retryTimes, retryDuration, ar, maxSqlLength, sendSqlTimeout)
}

var (
err error
sink Sink
Expand Down Expand Up @@ -445,7 +451,8 @@ func (s *mysqlSinker) SendDummy() {
}

func (s *mysqlSinker) Error() error {
if errPtr := s.err.Load().(*error); *errPtr != nil {
if ptr := s.err.Load(); ptr != nil {
errPtr := ptr.(*error)
if moErr, ok := (*errPtr).(*moerr.Error); !ok {
return moerr.ConvertGoError(context.Background(), *errPtr)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cdc/sinker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestNewSinker(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := NewSinker(tt.args.sinkUri, tt.args.dbTblInfo, tt.args.watermarkUpdater, tt.args.tableDef, tt.args.retryTimes, tt.args.retryDuration, tt.args.ar, CDCDefaultTaskExtra_MaxSQLLen, CDCDefaultSendSqlTimeout)
got, err := NewSinker("", tt.args.sinkUri, tt.args.dbTblInfo, tt.args.watermarkUpdater, tt.args.tableDef, tt.args.retryTimes, tt.args.retryDuration, tt.args.ar, CDCDefaultTaskExtra_MaxSQLLen, CDCDefaultSendSqlTimeout)
if !tt.wantErr(t, err, fmt.Sprintf("NewSinker(%v, %v, %v, %v, %v, %v)", tt.args.sinkUri, tt.args.dbTblInfo, tt.args.watermarkUpdater, tt.args.tableDef, tt.args.retryTimes, tt.args.retryDuration)) {
return
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/cdc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ const (
)

const (
CDCSinkType_MySQL = "mysql"
CDCSinkType_MO = "matrixone"
CDCSinkType_Console = "console"
CDCSinkType_MySQL = "mysql"
CDCSinkType_MO = "matrixone"
CDCSinkType_Console = "console"
CDCSinkType_HnswSync = "hnswsync"
)

const (
Expand Down
1 change: 1 addition & 0 deletions pkg/frontend/cdc_exector.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ func (exec *CDCTaskExecutor) addExecPipelineForTable(ctx context.Context, info *

// step 2. new sinker
sinker, err := cdc.NewSinker(
exec.cnUUID,
exec.sinkUri,
info,
exec.watermarkUpdater,
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/cdc_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (opts *CDCCreateTaskOptions) ValidateAndFill(
if cdc.EnableConsoleSink && opts.SinkType == cdc.CDCSinkType_Console {
opts.UseConsole = true
}
if !opts.UseConsole && opts.SinkType != cdc.CDCSinkType_MySQL && opts.SinkType != cdc.CDCSinkType_MO {
if !opts.UseConsole && opts.SinkType != cdc.CDCSinkType_MySQL && opts.SinkType != cdc.CDCSinkType_MO && opts.SinkType != cdc.CDCSinkType_HnswSync {
err = moerr.NewInternalErrorf(ctx, "unsupported sink type: %s", req.SinkType)
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/frontend/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2891,7 +2891,7 @@ func TestCdcTask_addExecPipelineForTable(t *testing.T) {
})
defer stubGetTableDef.Reset()

stubSinker := gostub.Stub(&cdc.NewSinker, func(cdc.UriInfo, *cdc.DbTableInfo, cdc.IWatermarkUpdater,
stubSinker := gostub.Stub(&cdc.NewSinker, func(string, cdc.UriInfo, *cdc.DbTableInfo, cdc.IWatermarkUpdater,
*plan.TableDef, int, time.Duration, *cdc.ActiveRoutine, uint64, string) (cdc.Sinker, error) {
return &mockSinker{}, nil
})
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2108,6 +2108,13 @@ func (s *Scope) DropIndex(c *Compile) error {
if err != nil {
return err
}

// TODO: HNSWCDC delete the cdc task for vector and fulltext indexes.
// The names below follow what genCdcHnswIndex currently generates:
// cdc task name = __mo_index_cdc_{qry.Database}_{qry.Table}_{qry.IndexName}
// pitr name = __mo_index_pitr_{qry.Database}_{qry.Table}
// DROP PITR IF EXISTS `__mo_index_pitr_{qry.Database}_{qry.Table}`
// DROP CDC TASK __mo_index_cdc_{qry.Database}_{qry.Table}_{qry.IndexName}

return nil
}

Expand Down Expand Up @@ -2668,6 +2675,8 @@ func (s *Scope) DropTable(c *Compile) error {
}
}

// TODO: HNSWCDC delete cdc task of the vector and fulltext index here

if isTemp {
if err := dbSource.Delete(c.proc.Ctx, engine.GetTempTableName(dbName, tblName)); err != nil {
return err
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/compile/ddl_index_algo.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,5 +556,18 @@ func (s *Scope) handleVectorHnswIndex(
}
}

// TODO: HNSWCDC 4. register CDC update
sqls, err = genCdcHnswIndex(c.proc, indexDefs, qryDatabase, originalTableDef)
if err != nil {
return err
}

for _, sql := range sqls {
err = c.runSql(sql)
if err != nil {
return err
}
}

return nil
}
38 changes: 38 additions & 0 deletions pkg/sql/compile/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,3 +561,41 @@ func genBuildHnswIndex(proc *process.Process, indexDefs map[string]*plan.IndexDe

return []string{sql}, nil
}

// genCdcHnswIndex prepares the SQL that registers CDC-based synchronization
// for an HNSW index (TODO: HNSWCDC step 4, register CDC update).
//
// The metadata index definition is looked up in indexDefs under
// catalog.Hnsw_TblType_Metadata; an internal error is returned when it is
// absent. Three statements are then assembled, in this order:
//
//	DROP PITR IF EXISTS `__mo_index_pitr_${db}_${srctable}`;
//	CREATE PITR `__mo_index_pitr_${db}_${srctable}` FOR TABLE `${db}` `${srctable}` range 2 'h';
//	CREATE CDC `__mo_index_cdc_${db}_${srctable}_${indexName}` '<url>' 'hnswsync' '<url>' '${db}.${srctable}' {'Level'='table'};
//
// No DROP CDC statement is generated: IF EXISTS is not valid SQL for DROP CDC.
//
// The statements are currently discarded before returning (see the TODO at the
// end of the body), so the returned slice is always empty. proc is not used.
func genCdcHnswIndex(proc *process.Process, indexDefs map[string]*plan.IndexDef, qryDatabase string, originalTableDef *plan.TableDef) ([]string, error) {
	metaDef, found := indexDefs[catalog.Hnsw_TblType_Metadata]
	if !found {
		return nil, moerr.NewInternalErrorNoCtx("hnsw_meta index definition not found")
	}

	// Placeholder URI used for both the source and the sink of the CDC task.
	const taskURL = "mysql://root:111@127.0.0.1:6001"

	tableName := originalTableDef.Name
	pitrName := fmt.Sprintf("__mo_index_pitr_%s_%s", qryDatabase, tableName)
	cdcName := fmt.Sprintf("__mo_index_cdc_%s_%s_%s", qryDatabase, tableName, metaDef.IndexName)

	stmts := append(make([]string, 0, 3),
		// Recreate the PITR from scratch.
		fmt.Sprintf("DROP PITR IF EXISTS `%s`;", pitrName),
		fmt.Sprintf("CREATE PITR `%s` FOR TABLE `%s` `%s` range 2 'h';", pitrName, qryDatabase, tableName),
		// Create the CDC task that feeds the hnswsync sinker.
		fmt.Sprintf("CREATE CDC `%s` '%s' 'hnswsync' '%s' '%s.%s' {'Level'='table'};", cdcName, taskURL, taskURL, qryDatabase, tableName),
	)

	// TODO: HNSWCDC return stmts as-is (drop the truncation) to run the above SQLs
	return stmts[:0], nil
}
13 changes: 11 additions & 2 deletions pkg/sql/plan/function/func_cast.go
Original file line number Diff line number Diff line change
Expand Up @@ -4408,6 +4408,8 @@ func strToArray[T types.RealNumbers](
from vector.FunctionParameterWrapper[types.Varlena],
to *vector.FunctionResult[types.Varlena], length int, _ types.Type) error {

toType := to.GetType()

var i uint64
var l = uint64(length)
for i = 0; i < l; i++ {
Expand All @@ -4417,11 +4419,18 @@ func strToArray[T types.RealNumbers](
return err
}
} else {

b, err := types.StringToArrayToBytes[T](convertByteSliceToString(v))
arr, err := types.StringToArray[T](convertByteSliceToString(v))
if err != nil {
return err
}

// bypass the dimension check if width is max dimension
if int(toType.Width) != types.MaxArrayDimension && int(toType.Width) != len(arr) {
return moerr.NewArrayDefMismatchNoCtx(int(toType.Width), len(arr))
}

b := types.ArrayToBytes[T](arr)

if err = to.AppendBytes(b, false); err != nil {
return err
}
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/plan/function/func_hnsw.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright 2021 - 2022 Matrix Origin
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package function

import (
"encoding/json"

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vectorindex"
"github.com/matrixorigin/matrixone/pkg/vectorindex/hnsw"
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// hnswCdcUpdate forwards CDC payloads to hnsw.CdcSync, one call per input row.
//
// Exactly four argument vectors are required, otherwise an invalid-input error
// is returned:
//
//	ivecs[0]  database name (string)
//	ivecs[1]  table name (string)
//	ivecs[2]  dimension (int32)
//	ivecs[3]  JSON text decoded into vectorindex.VectorIndexCdc[float32]
//
// For each of the first `length` rows, a NULL in any argument or a payload that
// fails JSON decoding yields an invalid-input error; an error from hnsw.CdcSync
// is returned unchanged. Processing stops at the first error. result and
// selectList are not used by this function.
func hnswCdcUpdate(ivecs []*vector.Vector, result vector.FunctionResultWrapper, proc *process.Process, length int, selectList *FunctionSelectList) error {

	if len(ivecs) != 4 {
		return moerr.NewInvalidInput(proc.Ctx, "number of arguments != 4")
	}

	var (
		dbParam  = vector.GenerateFunctionStrParameter(ivecs[0])
		tblParam = vector.GenerateFunctionStrParameter(ivecs[1])
		dimParam = vector.GenerateFunctionFixedTypeParameter[int32](ivecs[2])
		cdcParam = vector.GenerateFunctionStrParameter(ivecs[3])
	)

	rows := uint64(length)
	for row := uint64(0); row < rows; row++ {
		dbName, dbNull := dbParam.GetStrValue(row)
		if dbNull {
			return moerr.NewInvalidInput(proc.Ctx, "dbname is null")
		}

		tblName, tblNull := tblParam.GetStrValue(row)
		if tblNull {
			return moerr.NewInvalidInput(proc.Ctx, "table name is null")
		}

		dim, dimNull := dimParam.GetValue(row)
		if dimNull {
			return moerr.NewInvalidInput(proc.Ctx, "dimension is null")
		}

		rawCdc, cdcNull := cdcParam.GetStrValue(row)
		if cdcNull {
			return moerr.NewInvalidInput(proc.Ctx, "cdc is null")
		}

		// A fresh value per row so fields from a previous payload never leak
		// into the next decode.
		var changes vectorindex.VectorIndexCdc[float32]
		if err := json.Unmarshal([]byte(rawCdc), &changes); err != nil {
			return moerr.NewInvalidInput(proc.Ctx, "cdc is not json object")
		}

		logutil.Infof("hnsw_cdc_update: START db=%s, table=%s\n", dbName, tblName)
		// hnsw sync
		if err := hnsw.CdcSync(proc, string(dbName), string(tblName), dim, &changes); err != nil {
			return err
		}
		logutil.Infof("hnsw_cdc_update: END db=%s, table=%s\n", dbName, tblName)
	}

	return nil
}
Loading
Loading