Skip to content

Commit c6431e9

Browse files
authored
Added recipe for YDB Go SDK BulkUpsert Apache Arrow (#26108)
1 parent 93c3757 commit c6431e9

File tree

2 files changed

+167
-0
lines changed

2 files changed

+167
-0
lines changed

ydb/docs/en/core/recipes/ydb-sdk/bulk-upsert.md

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,89 @@ Below are code examples showing the {{ ydb-short-name }} SDK built-in tools for
138138

139139
{% endcut %}
140140

141+
{% cut "Bulk upsert `Apache Arrow` data" %}
142+
143+
In the following example, the [arrow package](https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow) is used to prepare the data.
144+
145+
```go
146+
package main

import (
	"bytes"
	"context"
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
)

// main builds a two-row Apache Arrow record batch, serializes its schema and
// its data into separate IPC payloads, and sends both to YDB via BulkUpsert.
func main() {
	ctx := context.Background()

	// Connection string and access token are read from the environment.
	db, err := ydb.Open(ctx,
		os.Getenv("YDB_CONNECTION_STRING"),
		ydb.WithAccessTokenCredentials(os.Getenv("YDB_TOKEN")),
	)
	if err != nil {
		panic(err)
	}
	defer db.Close(ctx) // cleanup resources

	mem := memory.NewGoAllocator()

	// Columns of the batch: "id" (Int64) and "val" (String).
	// NOTE(review): presumably these must match the columns of the target
	// table - confirm against the table definition.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "val", Type: arrow.BinaryTypes.String},
	}, nil)

	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()

	// Fill both columns; a nil validity slice means no value is null.
	b.Field(0).(*array.Int64Builder).AppendValues(
		[]int64{123, 234}, nil)

	b.Field(1).(*array.StringBuilder).AppendValues(
		[]string{"data1", "data2"}, nil)

	rec := b.NewRecordBatch()
	defer rec.Release()

	// The schema and the record batch are serialized as two separate IPC
	// payloads because BulkUpsertDataArrow takes them as separate byte slices.
	schemaPayload := ipc.GetSchemaPayload(rec.Schema(), mem)
	defer schemaPayload.Release()

	dataPayload, err := ipc.GetRecordBatchPayload(rec)
	if err != nil {
		panic(err)
	}
	defer dataPayload.Release()

	var schemaBuf bytes.Buffer
	_, err = schemaPayload.WritePayload(&schemaBuf)
	if err != nil {
		panic(err)
	}

	var dataBuf bytes.Buffer
	_, err = dataPayload.WritePayload(&dataBuf)
	if err != nil {
		panic(err)
	}

	err = db.Table().BulkUpsert(ctx, "/local/bulk_upsert_example", table.BulkUpsertDataArrow(
		dataBuf.Bytes(),
		table.WithArrowSchema(schemaBuf.Bytes()), // schema is required
	))
	if err != nil {
		fmt.Printf("unexpected error: %v", err)
	}
}
220+
```
221+
222+
{% endcut %}
223+
141224
- Go (database/sql)
142225

143226
The implementation of {{ ydb-short-name }} `database/sql` doesn't support bulk nontransactional upsert of data.

ydb/docs/ru/core/recipes/ydb-sdk/bulk-upsert.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,90 @@
137137

138138
{% endcut %}
139139

140+
{% cut "Пакетная вставка `Apache Arrow`" %}
141+
142+
В следующем примере для подготовки данных используется пакет [arrow](https://pkg.go.dev/github.com/apache/arrow-go/v18/arrow).
143+
144+
```go
145+
package main

import (
	"bytes"
	"context"
	"fmt"
	"os"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/ipc"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/ydb-platform/ydb-go-sdk/v3"
	"github.com/ydb-platform/ydb-go-sdk/v3/table"
)

// main builds a two-row Apache Arrow record batch, serializes its schema and
// its data into separate IPC payloads, and sends both to YDB via BulkUpsert.
func main() {
	ctx := context.Background()

	// Connection string and access token are read from the environment.
	db, err := ydb.Open(ctx,
		os.Getenv("YDB_CONNECTION_STRING"),
		ydb.WithAccessTokenCredentials(os.Getenv("YDB_TOKEN")),
	)
	if err != nil {
		panic(err)
	}
	defer db.Close(ctx) // cleanup resources

	mem := memory.NewGoAllocator()

	// Columns of the batch: "id" (Int64) and "val" (String).
	// NOTE(review): presumably these must match the columns of the target
	// table - confirm against the table definition.
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
		{Name: "val", Type: arrow.BinaryTypes.String},
	}, nil)

	b := array.NewRecordBuilder(mem, schema)
	defer b.Release()

	// Fill both columns; a nil validity slice means no value is null.
	b.Field(0).(*array.Int64Builder).AppendValues(
		[]int64{123, 234}, nil)

	b.Field(1).(*array.StringBuilder).AppendValues(
		[]string{"data1", "data2"}, nil)

	rec := b.NewRecordBatch()
	defer rec.Release()

	// The schema and the record batch are serialized as two separate IPC
	// payloads because BulkUpsertDataArrow takes them as separate byte slices.
	schemaPayload := ipc.GetSchemaPayload(rec.Schema(), mem)
	defer schemaPayload.Release()

	dataPayload, err := ipc.GetRecordBatchPayload(rec)
	if err != nil {
		panic(err)
	}
	defer dataPayload.Release()

	var schemaBuf bytes.Buffer
	_, err = schemaPayload.WritePayload(&schemaBuf)
	if err != nil {
		panic(err)
	}

	var dataBuf bytes.Buffer
	_, err = dataPayload.WritePayload(&dataBuf)
	if err != nil {
		panic(err)
	}

	err = db.Table().BulkUpsert(ctx, "/local/bulk_upsert_example", table.BulkUpsertDataArrow(
		dataBuf.Bytes(),
		table.WithArrowSchema(schemaBuf.Bytes()), // schema is required
	))
	if err != nil {
		fmt.Printf("unexpected error: %v", err)
	}
}
219+
220+
```
221+
222+
{% endcut %}
223+
140224
- Go (database/sql)
141225

142226
Реализация `database/sql` драйвера для {{ ydb-short-name }} не поддерживает нетранзакционную пакетную вставку данных.

0 commit comments

Comments
 (0)