From a24377f576ffb350a84a6975101925fa60dc507c Mon Sep 17 00:00:00 2001 From: Tahiya Salam Date: Fri, 5 Jul 2024 15:07:37 -0400 Subject: [PATCH 1/5] WIP - Download dataset file --- cli/app.go | 25 +++++++++ cli/data.go | 4 +- cli/dataset.go | 139 +++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 161 insertions(+), 7 deletions(-) diff --git a/cli/app.go b/cli/app.go index 5060da95632..ecc6d11b5bc 100644 --- a/cli/app.go +++ b/cli/app.go @@ -822,6 +822,31 @@ var app = &cli.App{ }, Action: DatasetCreateAction, }, + { + Name: "download", + Usage: "download data from a dataset", + UsageText: createUsageText("dataset download", + []string{datasetFlagDatasetID, datasetFlagName}, false), + Flags: []cli.Flag{ + &cli.PathFlag{ + Name: dataFlagDestination, + Required: true, + Usage: "output directory for downloaded data", + }, + &cli.StringFlag{ + Name: datasetFlagDatasetID, + Required: true, + Usage: "dataset ID of the dataset to be downloaded", + }, + &cli.BoolFlag{ + Name: datasetFlagIncludeJSONLines, + Required: false, + Usage: "option to include JSON Lines files for local testing", + Value: false, + }, + }, + Action: DatasetDownloadAction, + }, }, }, { diff --git a/cli/data.go b/cli/data.go index 567129acf95..ebe3187cbd2 100644 --- a/cli/data.go +++ b/cli/data.go @@ -68,7 +68,7 @@ func (c *viamClient) dataExportAction(cCtx *cli.Context) error { switch cCtx.String(dataFlagDataType) { case dataTypeBinary: - if err := c.binaryData(cCtx.Path(dataFlagDestination), filter, cCtx.Uint(dataFlagParallelDownloads)); err != nil { + if err := c.binaryData(cCtx.Path(dataFlagDestination), filter, cCtx.Uint(dataFlagParallelDownloads), false); err != nil { return err } case dataTypeTabular: @@ -264,7 +264,7 @@ func createDataFilter(c *cli.Context) (*datapb.Filter, error) { } // BinaryData downloads binary data matching filter to dst. -func (c *viamClient) binaryData(dst string, filter *datapb.Filter, parallelDownloads uint) error { +func (c *viamClient) binaryData(dst string, filter *datapb.Filter, parallelDownloads uint, includeJSONL bool) error { if err := c.ensureLoggedIn(); err != nil { return err } diff --git a/cli/dataset.go b/cli/dataset.go index 2ce64c3b134..06e0c145a6f 100644 --- a/cli/dataset.go +++ b/cli/dataset.go @@ -2,18 +2,25 @@ package cli import ( "context" + "encoding/json" + "os" + "path/filepath" "github.com/pkg/errors" "github.com/urfave/cli/v2" + "go.uber.org/multierr" + datapb "go.viam.com/api/app/data/v1" + v1 "go.viam.com/api/app/data/v1" datasetpb "go.viam.com/api/app/dataset/v1" ) const ( - datasetFlagName = "name" - datasetFlagDatasetID = "dataset-id" - datasetFlagDatasetIDs = "dataset-ids" - dataFlagLocationID = "location-id" - dataFlagFileIDs = "file-ids" + datasetFlagName = "name" + datasetFlagDatasetID = "dataset-id" + datasetFlagDatasetIDs = "dataset-ids" + dataFlagLocationID = "location-id" + dataFlagFileIDs = "file-ids" + datasetFlagIncludeJSONLines = "jsonl" ) // DatasetCreateAction is the corresponding action for 'dataset create'. @@ -150,3 +157,125 @@ func (c *viamClient) deleteDataset(datasetID string) error { printf(c.c.App.Writer, "Dataset with ID %s deleted", datasetID) return nil } + +// DatasetDeleteAction is the corresponding action for 'dataset rename'. +func DatasetDownloadAction(c *cli.Context) error { + client, err := newViamClient(c) + if err != nil { + return err + } + if err := client.downloadDataset(c.Path(dataFlagDestination), c.String(datasetFlagDatasetID), c.Bool(datasetFlagIncludeJSONLines)); err != nil { + return err + } + return nil +} + +// downloadDataset downloads a dataset with the specified ID. +func (c *viamClient) downloadDataset(dst, datasetID string, includeJSONLines bool) error { + if err := c.ensureLoggedIn(); err != nil { + return err + } + + var datasetFile *os.File + if includeJSONLines { + //nolint:gosec + datasetPath := filepath.Join(dst, "dataset.jsonl") + if err := os.MkdirAll(filepath.Dir(datasetPath), 0o700); err != nil { + return errors.Wrapf(err, "could not create dataset directory %s", filepath.Dir(datasetPath)) + } + //nolint:gosec + datasetFile, err := os.Create(datasetPath) + if err != nil { + return err + } + defer func() { + if err := datasetFile.Close(); err != nil { + Errorf(c.c.App.ErrWriter, "failed to close dataset file %q", datasetFile.Name()) + } + }() + } + + return c.performActionOnBinaryDataFromFilter( + func(id *datapb.BinaryID) error { + downloadErr := downloadBinary(c.c.Context, c.dataClient, dst, id, c.authFlow.httpClient, c.conf.Auth) + var datasetErr error + if includeJSONLines { + datasetErr = binaryDataToJSONLines(c.c.Context, c.dataClient, datasetFile, id) + } + return multierr.Combine(downloadErr, datasetErr) + }, + &datapb.Filter{ + DatasetId: datasetID, + }, 100, + func(i int32) { + printf(c.c.App.Writer, "Downloaded %d files", i) + }, + ) +} + +// Annotation holds the label associated with the image. +type Annotation struct { + AnnotationLabel string `json:"annotation_label"` +} + +// ObjectDetection defines the format of the data in jsonlines for object detection. +type ObjectDetection struct { + ImageGCSURI string `json:"image_gcs_uri"` + BBoxAnnotations []BBoxAnnotation `json:"bounding_box_annotations"` +} + +// ImageMetadata defines the format of the data in jsonlines for custom training. +type ImageMetadata struct { + ImagePath string `json:"image_path"` + ClassificationAnnotations []Annotation `json:"classification_annotations"` + BBoxAnnotations []*datapb.BoundingBox `json:"bounding_box_annotations"` +} + +// BBoxAnnotation holds the information associated with each bounding box. +type BBoxAnnotation struct { + AnnotationLabel string `json:"annotation_label"` + XMinNormalized float64 `json:"x_min_normalized"` + XMaxNormalized float64 `json:"x_max_normalized"` + YMinNormalized float64 `json:"y_min_normalized"` + YMaxNormalized float64 `json:"y_max_normalized"` +} + +func binaryDataToJSONLines(ctx context.Context, client v1.DataServiceClient, file *os.File, + id *datapb.BinaryID, +) error { + resp, err := client.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{ + BinaryIds: []*datapb.BinaryID{id}, + IncludeBinary: false, + }) + + data := resp.GetData() + if len(data) != 1 { + return errors.Errorf("expected a single response, received %d", len(data)) + } + datum := data[0] + + // Make JSONLines file + var jsonl interface{} + + annotations := []Annotation{} + for _, tag := range datum.GetMetadata().GetCaptureMetadata().GetTags() { + annotations = append(annotations, Annotation{AnnotationLabel: tag}) + } + jsonl = ImageMetadata{ + ImagePath: filenameForDownload(datum.GetMetadata()), + ClassificationAnnotations: annotations, + BBoxAnnotations: datum.GetMetadata().GetAnnotations().GetBboxes(), + } + + line, err := json.Marshal(jsonl) + if err != nil { + return errors.Wrap(err, "error formatting JSON") + } + line = append(line, "\n"...) + _, err = file.Write(line) + if err != nil { + return errors.Wrap(err, "error writing to file") + } + + return nil +} From 3fc61c36d9564dc85b5d908c3ebfaaa526c0a27c Mon Sep 17 00:00:00 2001 From: Tahiya Salam Date: Mon, 8 Jul 2024 09:51:18 -0400 Subject: [PATCH 2/5] Add CLI command for downloading dataset --- cli/app.go | 6 +++++ cli/data.go | 4 +-- cli/dataset.go | 67 ++++++++++++++++++++++++++++++++------------------ 3 files changed, 51 insertions(+), 26 deletions(-) diff --git a/cli/app.go b/cli/app.go index ecc6d11b5bc..d957ef766c2 100644 --- a/cli/app.go +++ b/cli/app.go @@ -844,6 +844,12 @@ var app = &cli.App{ Usage: "option to include JSON Lines files for local testing", Value: false, }, + &cli.UintFlag{ + Name: dataFlagParallelDownloads, + Required: false, + Usage: "number of download requests to make in parallel", + Value: 100, + }, }, Action: DatasetDownloadAction, }, diff --git a/cli/data.go b/cli/data.go index ebe3187cbd2..567129acf95 100644 --- a/cli/data.go +++ b/cli/data.go @@ -68,7 +68,7 @@ func (c *viamClient) dataExportAction(cCtx *cli.Context) error { switch cCtx.String(dataFlagDataType) { case dataTypeBinary: - if err := c.binaryData(cCtx.Path(dataFlagDestination), filter, cCtx.Uint(dataFlagParallelDownloads), false); err != nil { + if err := c.binaryData(cCtx.Path(dataFlagDestination), filter, cCtx.Uint(dataFlagParallelDownloads)); err != nil { return err } case dataTypeTabular: @@ -264,7 +264,7 @@ func createDataFilter(c *cli.Context) (*datapb.Filter, error) { } // BinaryData downloads binary data matching filter to dst. -func (c *viamClient) binaryData(dst string, filter *datapb.Filter, parallelDownloads uint, includeJSONL bool) error { +func (c *viamClient) binaryData(dst string, filter *datapb.Filter, parallelDownloads uint) error { if err := c.ensureLoggedIn(); err != nil { return err } diff --git a/cli/dataset.go b/cli/dataset.go index 06e0c145a6f..ac9d1bb1cef 100644 --- a/cli/dataset.go +++ b/cli/dataset.go @@ -10,7 +10,6 @@ import ( "github.com/urfave/cli/v2" "go.uber.org/multierr" datapb "go.viam.com/api/app/data/v1" - v1 "go.viam.com/api/app/data/v1" datasetpb "go.viam.com/api/app/dataset/v1" ) @@ -132,7 +131,7 @@ func (c *viamClient) listDatasetByOrg(orgID string) error { return nil } -// DatasetDeleteAction is the corresponding action for 'dataset rename'. +// DatasetDeleteAction is the corresponding action for 'dataset delete'. func DatasetDeleteAction(c *cli.Context) error { client, err := newViamClient(c) if err != nil { @@ -158,33 +157,34 @@ func (c *viamClient) deleteDataset(datasetID string) error { return nil } -// DatasetDeleteAction is the corresponding action for 'dataset rename'. +// DatasetDownloadAction is the corresponding action for 'dataset download'. func DatasetDownloadAction(c *cli.Context) error { client, err := newViamClient(c) if err != nil { return err } - if err := client.downloadDataset(c.Path(dataFlagDestination), c.String(datasetFlagDatasetID), c.Bool(datasetFlagIncludeJSONLines)); err != nil { + if err := client.downloadDataset(c.Path(dataFlagDestination), c.String(datasetFlagDatasetID), + c.Bool(datasetFlagIncludeJSONLines), c.Uint(dataFlagParallelDownloads)); err != nil { return err } return nil } // downloadDataset downloads a dataset with the specified ID. -func (c *viamClient) downloadDataset(dst, datasetID string, includeJSONLines bool) error { +func (c *viamClient) downloadDataset(dst, datasetID string, includeJSONLines bool, parallelDownloads uint) error { if err := c.ensureLoggedIn(); err != nil { return err } var datasetFile *os.File + var err error if includeJSONLines { - //nolint:gosec datasetPath := filepath.Join(dst, "dataset.jsonl") if err := os.MkdirAll(filepath.Dir(datasetPath), 0o700); err != nil { return errors.Wrapf(err, "could not create dataset directory %s", filepath.Dir(datasetPath)) } //nolint:gosec - datasetFile, err := os.Create(datasetPath) + datasetFile, err = os.Create(datasetPath) if err != nil { return err } @@ -206,7 +206,7 @@ func (c *viamClient) downloadDataset(dst, datasetID string, includeJSONLines boo }, &datapb.Filter{ DatasetId: datasetID, - }, 100, + }, parallelDownloads, func(i int32) { printf(c.c.App.Writer, "Downloaded %d files", i) }, @@ -218,17 +218,11 @@ type Annotation struct { AnnotationLabel string `json:"annotation_label"` } -// ObjectDetection defines the format of the data in jsonlines for object detection. -type ObjectDetection struct { - ImageGCSURI string `json:"image_gcs_uri"` - BBoxAnnotations []BBoxAnnotation `json:"bounding_box_annotations"` -} - // ImageMetadata defines the format of the data in jsonlines for custom training. type ImageMetadata struct { - ImagePath string `json:"image_path"` - ClassificationAnnotations []Annotation `json:"classification_annotations"` - BBoxAnnotations []*datapb.BoundingBox `json:"bounding_box_annotations"` + ImagePath string `json:"image_path"` + ClassificationAnnotations []Annotation `json:"classification_annotations"` + BBoxAnnotations []BBoxAnnotation `json:"bounding_box_annotations"` } // BBoxAnnotation holds the information associated with each bounding box. @@ -240,13 +234,23 @@ type BBoxAnnotation struct { YMaxNormalized float64 `json:"y_max_normalized"` } -func binaryDataToJSONLines(ctx context.Context, client v1.DataServiceClient, file *os.File, +func binaryDataToJSONLines(ctx context.Context, client datapb.DataServiceClient, file *os.File, id *datapb.BinaryID, ) error { - resp, err := client.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{ - BinaryIds: []*datapb.BinaryID{id}, - IncludeBinary: false, - }) + var resp *datapb.BinaryDataByIDsResponse + var err error + for count := 0; count < maxRetryCount; count++ { + resp, err = client.BinaryDataByIDs(ctx, &datapb.BinaryDataByIDsRequest{ + BinaryIds: []*datapb.BinaryID{id}, + IncludeBinary: false, + }) + if err == nil { + break + } + } + if err != nil { + return errors.Wrapf(err, serverErrorMessage) + } data := resp.GetData() if len(data) != 1 { @@ -254,17 +258,18 @@ func binaryDataToJSONLines(ctx context.Context, client v1.DataServiceClient, fil } datum := data[0] - // Make JSONLines file + // Make JSONLines var jsonl interface{} annotations := []Annotation{} for _, tag := range datum.GetMetadata().GetCaptureMetadata().GetTags() { annotations = append(annotations, Annotation{AnnotationLabel: tag}) } + bboxAnnotations := convertBoundingBoxes(datum.GetMetadata().GetAnnotations().GetBboxes()) jsonl = ImageMetadata{ ImagePath: filenameForDownload(datum.GetMetadata()), ClassificationAnnotations: annotations, - BBoxAnnotations: datum.GetMetadata().GetAnnotations().GetBboxes(), + BBoxAnnotations: bboxAnnotations, } line, err := json.Marshal(jsonl) @@ -279,3 +284,17 @@ func binaryDataToJSONLines(ctx context.Context, client v1.DataServiceClient, fil return nil } + +func convertBoundingBoxes(protoBBoxes []*datapb.BoundingBox) []BBoxAnnotation { + bboxes := make([]BBoxAnnotation, len(protoBBoxes)) + for i, box := range protoBBoxes { + bboxes[i] = BBoxAnnotation{ + AnnotationLabel: box.GetLabel(), + XMinNormalized: box.GetXMinNormalized(), + XMaxNormalized: box.GetXMaxNormalized(), + YMinNormalized: box.GetYMinNormalized(), + YMaxNormalized: box.GetYMaxNormalized(), + } + } + return bboxes +} From b3cdc0ba3a58117e306fc41164a6b5703ed49909 Mon Sep 17 00:00:00 2001 From: Etai Shuchatowitz Date: Thu, 26 Jun 2025 17:41:46 -0400 Subject: [PATCH 3/5] first draft --- go.mod | 2 ++ go.sum | 2 -- services/datamanager/builtin/builtin.go | 14 +++++++++++ services/datamanager/builtin/sync/sync.go | 25 ++++++++++++++++--- .../builtin/sync/upload_arbitrary_file.go | 22 +++++++++------- services/datamanager/client.go | 17 +++++++++++++ services/datamanager/data_manager.go | 2 ++ services/datamanager/server.go | 14 +++++++++++ 8 files changed, 83 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 91f73817856..64ea37918c0 100644 --- a/go.mod +++ b/go.mod @@ -443,3 +443,5 @@ require ( github.com/ziutek/mymysql v1.5.4 // indirect golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e ) + +replace go.viam.com/api => ../api diff --git a/go.sum b/go.sum index 8217e54c531..c1547b1e5eb 100644 --- a/go.sum +++ b/go.sum @@ -1511,8 +1511,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= -go.viam.com/api v0.1.450 h1:AiWmIJeFlr0TnrtTg2KVEd7JJVgp+aALUxdMAlRVTrc= -go.viam.com/api v0.1.450/go.mod h1:gwJriv6EVWe97uFzzzWjzP3NPfpCrKtRAdWtYglUpqs= go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug= go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI= go.viam.com/utils v0.1.147 h1:1z2mo/8GVR1jRRckr4qC8FLTps5kpI9fkCeHL4P1yw4= diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index fd77a9ac5b1..4bd6d2cfc8d 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -154,6 +154,20 @@ func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error return b.sync.Sync(ctx, extra) } +func (b *builtIn) UploadImageToDataset(ctx context.Context, + image []byte, + datasetIDs []string, + tags []string, + extra map[string]interface{}, +) error { + b.logger.Info("UploadImageToDataset START") + defer b.logger.Info("UploadImageToDataset END") + b.mu.Lock() + defer b.mu.Unlock() + b.sync.UploadImageToDataset(ctx, image, datasetIDs, tags) + return nil +} + // Reconfigure updates the data manager service when the config has changed. // At time of writing Reconfigure only returns an error in one of the following unrecoverable error cases: // 1. There is some static (aka compile time) error which we currently are only able to detected at runtime: diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index 084cedded90..c80ffc8b14c 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -12,6 +12,7 @@ import ( "time" "github.com/benbjohnson/clock" + "github.com/google/uuid" "github.com/pkg/errors" "go.uber.org/multierr" v1 "go.viam.com/api/app/datasync/v1" @@ -362,10 +363,26 @@ func (s *Sync) syncFile(config Config, filePath string) { if data.IsDataCaptureFile(f) { s.syncDataCaptureFile(f, config.CaptureDir, s.logger) } else { - s.syncArbitraryFile(f, config.Tags, config.FileLastModifiedMillis, s.logger) + s.syncArbitraryFile(f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger) } } +func (s *Sync) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string) error { + filename := uuid.NewString() + err := os.WriteFile(filename, image, os.ModeAppend) + if err != nil { + s.logger.Errorw("error writing file", "err", err) + return err + } + f, err := os.Open(filename) + if err != nil { + s.logger.Errorw("error reading file", "err", err) + return err + } + s.syncArbitraryFile(f, tags, datasetIDs, 0, s.logger) + return nil +} + func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) { captureFile, err := data.ReadCaptureFile(f) // if you can't read the capture file's metadata field, close & move it to the failed directory @@ -434,10 +451,10 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging } } -func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMillis int, logger logging.Logger) { +func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger) { retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) { errMetadata := fmt.Sprintf("error uploading arbitrary file %s", f.Name()) - bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, fileLastModifiedMillis, s.clock, logger) + bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger) if err != nil { return 0, errors.Wrap(err, errMetadata) } @@ -448,7 +465,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMill bytesUploaded, err := retry.run() if err != nil { if closeErr := f.Close(); closeErr != nil { - logger.Error(errors.Wrap(closeErr, "error closing data capture file").Error()) + logger.Error(errors.Wrap(closeErr, "error closing arbitrary file").Error()) } // if we stopped due to a cancelled context, diff --git a/services/datamanager/builtin/sync/upload_arbitrary_file.go b/services/datamanager/builtin/sync/upload_arbitrary_file.go index 782748b6a3c..915833efcbc 100644 --- a/services/datamanager/builtin/sync/upload_arbitrary_file.go +++ b/services/datamanager/builtin/sync/upload_arbitrary_file.go @@ -22,7 +22,7 @@ var ( errFileModifiedTooRecently = errors.New("file modified too recently") ) -// uploadArbitraryFile uploads files which were not writted by the builtin datamanager's data capture package. +// UploadArbitraryFile uploads files which were not writted by the builtin datamanager's data capture package. // They are frequently files written by 3rd party programs such as images, videos, logs, written to // the capture directory or a subdirectory or to additional sync paths (or their sub directories). // Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case. @@ -30,7 +30,7 @@ func uploadArbitraryFile( ctx context.Context, f *os.File, conn cloudConn, - tags []string, + tags, datasetIDs []string, fileLastModifiedMillis int, clock clock.Clock, logger logging.Logger, @@ -78,15 +78,19 @@ func uploadArbitraryFile( // Send metadata FileUploadRequest. logger.Debugf("datasync.FileUpload request sending metadata for arbitrary file: %s", path) + md := &v1.UploadMetadata{ + PartId: conn.partID, + Type: v1.DataType_DATA_TYPE_FILE, + FileName: path, + FileExtension: filepath.Ext(f.Name()), + Tags: tags, + } + if len(datasetIDs) > 0 { + md.DatasetIds = datasetIDs + } if err := stream.Send(&v1.FileUploadRequest{ UploadPacket: &v1.FileUploadRequest_Metadata{ - Metadata: &v1.UploadMetadata{ - PartId: conn.partID, - Type: v1.DataType_DATA_TYPE_FILE, - FileName: path, - FileExtension: filepath.Ext(f.Name()), - Tags: tags, - }, + Metadata: md, }, }); err != nil { return 0, errors.Wrap(err, "FileUpload failed sending metadata") diff --git a/services/datamanager/client.go b/services/datamanager/client.go index b640544b919..25a2aa9365b 100644 --- a/services/datamanager/client.go +++ b/services/datamanager/client.go @@ -53,6 +53,23 @@ func (c *client) Sync(ctx context.Context, extra map[string]interface{}) error { return nil } +func (c *client) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error { + ext, err := protoutils.StructToStructPb(extra) + if err != nil { + return err + } + _, err = c.client.UploadImageToDataset(ctx, &pb.UploadImageToDatasetRequest{ + Image: image, + DatasetIds: datasetIDs, + Tags: tags, + Extra: ext, + }) + if err != nil { + return err + } + return nil +} + func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) } diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index f1845d07a13..766200bc6a4 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -50,6 +50,8 @@ type Service interface { resource.Resource // Sync will sync data stored on the machine to the cloud. Sync(ctx context.Context, extra map[string]interface{}) error + UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, + extra map[string]interface{}) error } // SubtypeName is the name of the type of service. diff --git a/services/datamanager/server.go b/services/datamanager/server.go index f58f495f3a0..07f8b268199 100644 --- a/services/datamanager/server.go +++ b/services/datamanager/server.go @@ -34,6 +34,20 @@ func (server *serviceServer) Sync(ctx context.Context, req *pb.SyncRequest) (*pb return &pb.SyncResponse{}, nil } +func (server *serviceServer) UploadImageToDataset( + ctx context.Context, + req *pb.UploadImageToDatasetRequest, +) (*pb.UploadImageToDatasetResponse, error) { + svc, err := server.coll.Resource(req.Name) + if err != nil { + return nil, err + } + if err := svc.UploadImageToDataset(ctx, req.Image, req.DatasetIds, req.Tags, req.Extra.AsMap()); err != nil { + return nil, err + } + return &pb.UploadImageToDatasetResponse{}, nil +} + // DoCommand receives arbitrary commands. func (server *serviceServer) DoCommand(ctx context.Context, req *commonpb.DoCommandRequest, From 2a54edf0e52288f6059d112351093e9f9060cb95 Mon Sep 17 00:00:00 2001 From: Etai Shuchatowitz Date: Thu, 26 Jun 2025 17:42:57 -0400 Subject: [PATCH 4/5] merge --- cli/app.go | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/cli/app.go b/cli/app.go index 7a74f2af740..31cbdbaa4b0 100644 --- a/cli/app.go +++ b/cli/app.go @@ -1587,37 +1587,6 @@ var app = &cli.App{ }, }, }, - { - Name: "download", - Usage: "download data from a dataset", - UsageText: createUsageText("dataset download", - []string{datasetFlagDatasetID, datasetFlagName}, false), - Flags: []cli.Flag{ - &cli.PathFlag{ - Name: dataFlagDestination, - Required: true, - Usage: "output directory for downloaded data", - }, - &cli.StringFlag{ - Name: datasetFlagDatasetID, - Required: true, - Usage: "dataset ID of the dataset to be downloaded", - }, - &cli.BoolFlag{ - Name: datasetFlagIncludeJSONLines, - Required: false, - Usage: "option to include JSON Lines files for local testing", - Value: false, - }, - &cli.UintFlag{ - Name: dataFlagParallelDownloads, - Required: false, - Usage: "number of download requests to make in parallel", - Value: 100, - }, - }, - Action: DatasetDownloadAction, - }, }, }, { From eeaa2d9b9a50f013abb153b08164b77d8dfd85eb Mon Sep 17 00:00:00 2001 From: Etai Shuchatowitz Date: Fri, 11 Jul 2025 15:45:55 -0400 Subject: [PATCH 5/5] two endpoints in sdk --- services/datamanager/builtin/builtin.go | 2 +- services/datamanager/builtin/sync/sync.go | 3 +- services/datamanager/client.go | 50 ++++++++++++++++++++++- services/datamanager/data_manager.go | 6 ++- services/datamanager/server.go | 2 +- 5 files changed, 58 insertions(+), 5 deletions(-) diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index 4bd6d2cfc8d..45bd6605970 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -154,7 +154,7 @@ func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error return b.sync.Sync(ctx, extra) } -func (b *builtIn) UploadImageToDataset(ctx context.Context, +func (b *builtIn) UploadRawDataToDataset(ctx context.Context, image []byte, datasetIDs []string, tags []string, diff --git a/services/datamanager/builtin/sync/sync.go b/services/datamanager/builtin/sync/sync.go index c80ffc8b14c..6b0e0139fe3 100644 --- a/services/datamanager/builtin/sync/sync.go +++ b/services/datamanager/builtin/sync/sync.go @@ -379,7 +379,8 @@ func (s *Sync) UploadImageToDataset(ctx context.Context, image []byte, datasetID s.logger.Errorw("error reading file", "err", err) return err } - s.syncArbitraryFile(f, tags, datasetIDs, 0, s.logger) + // TODO: Make this async. + go s.syncArbitraryFile(f, tags, datasetIDs, int(time.Now().UnixMilli()), s.logger) return nil } diff --git a/services/datamanager/client.go b/services/datamanager/client.go index 25a2aa9365b..8bb76d8ce91 100644 --- a/services/datamanager/client.go +++ b/services/datamanager/client.go @@ -2,8 +2,14 @@ package datamanager import ( + "bytes" "context" + "errors" + "image" + "image/jpeg" + "image/png" + datasyncpb "go.viam.com/api/app/datasync/v1" pb "go.viam.com/api/service/datamanager/v1" "go.viam.com/utils/protoutils" "go.viam.com/utils/rpc" @@ -53,7 +59,7 @@ func (c *client) Sync(ctx context.Context, extra map[string]interface{}) error { return nil } -func (c *client) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error { +func (c *client) UploadRawDataToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error { ext, err := protoutils.StructToStructPb(extra) if err != nil { return err @@ -70,6 +76,48 @@ func (c *client) UploadImageToDataset(ctx context.Context, image []byte, dataset return nil } +func (c *client) UploadImageToDataset( + ctx context.Context, + image image.Image, + datasetIDs, tags []string, + mimeType datasyncpb.MimeType, + extra map[string]interface{}, +) error { + ext, err := protoutils.StructToStructPb(extra) + if err != nil { + return err + } + + var imgBytes []byte + var buf bytes.Buffer + switch mimeType { + case datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG: + err := jpeg.Encode(&buf, image, nil) + if err != nil { + return err + } + imgBytes = buf.Bytes() + case datasyncpb.MimeType_MIME_TYPE_IMAGE_PNG: + err := png.Encode(&buf, image) + if err != nil { + return err + } + imgBytes = buf.Bytes() + default: + return errors.New("mime type must png or jpeg") + } + _, err = c.client.UploadImageToDataset(ctx, &pb.UploadImageToDatasetRequest{ + Image: imgBytes, + DatasetIds: datasetIDs, + Tags: tags, + Extra: ext, + }) + if err != nil { + return err + } + return nil +} + func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) { return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd) } diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index 766200bc6a4..3032ab5c2e4 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -7,9 +7,11 @@ package datamanager import ( "context" "encoding/json" + "image" "reflect" "slices" + datasyncpb "go.viam.com/api/app/datasync/v1" servicepb "go.viam.com/api/service/datamanager/v1" "go.viam.com/rdk/resource" @@ -50,8 +52,10 @@ type Service interface { resource.Resource // Sync will sync data stored on the machine to the cloud. Sync(ctx context.Context, extra map[string]interface{}) error - UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, + UploadRawDataToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error + UploadImageToDataset(ctx context.Context, image image.Image, dataseteIDs, tags []string, + mimeType datasyncpb.MimeType, extra map[string]interface{}) error } // SubtypeName is the name of the type of service. diff --git a/services/datamanager/server.go b/services/datamanager/server.go index 07f8b268199..40a2af6e0f3 100644 --- a/services/datamanager/server.go +++ b/services/datamanager/server.go @@ -42,7 +42,7 @@ func (server *serviceServer) UploadImageToDataset( if err != nil { return nil, err } - if err := svc.UploadImageToDataset(ctx, req.Image, req.DatasetIds, req.Tags, req.Extra.AsMap()); err != nil { + if err := svc.UploadRawDataToDataset(ctx, req.Image, req.DatasetIds, req.Tags, req.Extra.AsMap()); err != nil { return nil, err } return &pb.UploadImageToDatasetResponse{}, nil