Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,5 @@ require (
github.com/ziutek/mymysql v1.5.4 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
)

replace go.viam.com/api => ../api
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1511,8 +1511,6 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
go.viam.com/api v0.1.450 h1:AiWmIJeFlr0TnrtTg2KVEd7JJVgp+aALUxdMAlRVTrc=
go.viam.com/api v0.1.450/go.mod h1:gwJriv6EVWe97uFzzzWjzP3NPfpCrKtRAdWtYglUpqs=
go.viam.com/test v1.2.4 h1:JYgZhsuGAQ8sL9jWkziAXN9VJJiKbjoi9BsO33TW3ug=
go.viam.com/test v1.2.4/go.mod h1:zI2xzosHdqXAJ/kFqcN+OIF78kQuTV2nIhGZ8EzvaJI=
go.viam.com/utils v0.1.147 h1:1z2mo/8GVR1jRRckr4qC8FLTps5kpI9fkCeHL4P1yw4=
Expand Down
14 changes: 14 additions & 0 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ func (b *builtIn) Sync(ctx context.Context, extra map[string]interface{}) error
return b.sync.Sync(ctx, extra)
}

func (b *builtIn) UploadRawDataToDataset(ctx context.Context,
image []byte,
datasetIDs []string,
tags []string,
extra map[string]interface{},
) error {
b.logger.Info("UploadImageToDataset START")
defer b.logger.Info("UploadImageToDataset END")
b.mu.Lock()
defer b.mu.Unlock()
b.sync.UploadImageToDataset(ctx, image, datasetIDs, tags)
return nil
}

// Reconfigure updates the data manager service when the config has changed.
// At time of writing Reconfigure only returns an error in one of the following unrecoverable error cases:
// 1. There is some static (aka compile time) error which we currently are only able to detected at runtime:
Expand Down
26 changes: 22 additions & 4 deletions services/datamanager/builtin/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/benbjohnson/clock"
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/multierr"
v1 "go.viam.com/api/app/datasync/v1"
Expand Down Expand Up @@ -362,10 +363,27 @@ func (s *Sync) syncFile(config Config, filePath string) {
if data.IsDataCaptureFile(f) {
s.syncDataCaptureFile(f, config.CaptureDir, s.logger)
} else {
s.syncArbitraryFile(f, config.Tags, config.FileLastModifiedMillis, s.logger)
s.syncArbitraryFile(f, config.Tags, []string{}, config.FileLastModifiedMillis, s.logger)
}
}

func (s *Sync) UploadImageToDataset(ctx context.Context, image []byte, datasetIDs, tags []string) error {
filename := uuid.NewString()
err := os.WriteFile(filename, image, os.ModeAppend)
if err != nil {
s.logger.Errorw("error writing file", "err", err)
return err
}
f, err := os.Open(filename)
if err != nil {
s.logger.Errorw("error reading file", "err", err)
return err
}
// TODO: Make this async.
go s.syncArbitraryFile(f, tags, datasetIDs, int(time.Now().UnixMilli()), s.logger)
return nil
}

func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging.Logger) {
captureFile, err := data.ReadCaptureFile(f)
// if you can't read the capture file's metadata field, close & move it to the failed directory
Expand Down Expand Up @@ -434,10 +452,10 @@ func (s *Sync) syncDataCaptureFile(f *os.File, captureDir string, logger logging
}
}

func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMillis int, logger logging.Logger) {
func (s *Sync) syncArbitraryFile(f *os.File, tags, datasetIDs []string, fileLastModifiedMillis int, logger logging.Logger) {
retry := newExponentialRetry(s.configCtx, s.clock, s.logger, f.Name(), func(ctx context.Context) (uint64, error) {
errMetadata := fmt.Sprintf("error uploading arbitrary file %s", f.Name())
bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, fileLastModifiedMillis, s.clock, logger)
bytesUploaded, err := uploadArbitraryFile(ctx, f, s.cloudConn, tags, datasetIDs, fileLastModifiedMillis, s.clock, logger)
if err != nil {
return 0, errors.Wrap(err, errMetadata)
}
Expand All @@ -448,7 +466,7 @@ func (s *Sync) syncArbitraryFile(f *os.File, tags []string, fileLastModifiedMill
bytesUploaded, err := retry.run()
if err != nil {
if closeErr := f.Close(); closeErr != nil {
logger.Error(errors.Wrap(closeErr, "error closing data capture file").Error())
logger.Error(errors.Wrap(closeErr, "error closing arbitrary file").Error())
}

// if we stopped due to a cancelled context,
Expand Down
22 changes: 13 additions & 9 deletions services/datamanager/builtin/sync/upload_arbitrary_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ var (
errFileModifiedTooRecently = errors.New("file modified too recently")
)

// uploadArbitraryFile uploads files which were not writted by the builtin datamanager's data capture package.
// UploadArbitraryFile uploads files which were not writted by the builtin datamanager's data capture package.
// They are frequently files written by 3rd party programs such as images, videos, logs, written to
// the capture directory or a subdirectory or to additional sync paths (or their sub directories).
// Note: the bytes size returned is the size of the input file. It only returns a non 0 value in the success case.
func uploadArbitraryFile(
ctx context.Context,
f *os.File,
conn cloudConn,
tags []string,
tags, datasetIDs []string,
fileLastModifiedMillis int,
clock clock.Clock,
logger logging.Logger,
Expand Down Expand Up @@ -78,15 +78,19 @@ func uploadArbitraryFile(

// Send metadata FileUploadRequest.
logger.Debugf("datasync.FileUpload request sending metadata for arbitrary file: %s", path)
md := &v1.UploadMetadata{
PartId: conn.partID,
Type: v1.DataType_DATA_TYPE_FILE,
FileName: path,
FileExtension: filepath.Ext(f.Name()),
Tags: tags,
}
if len(datasetIDs) > 0 {
md.DatasetIds = datasetIDs
}
if err := stream.Send(&v1.FileUploadRequest{
UploadPacket: &v1.FileUploadRequest_Metadata{
Metadata: &v1.UploadMetadata{
PartId: conn.partID,
Type: v1.DataType_DATA_TYPE_FILE,
FileName: path,
FileExtension: filepath.Ext(f.Name()),
Tags: tags,
},
Metadata: md,
},
}); err != nil {
return 0, errors.Wrap(err, "FileUpload failed sending metadata")
Expand Down
65 changes: 65 additions & 0 deletions services/datamanager/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
package datamanager

import (
"bytes"
"context"
"errors"
"image"
"image/jpeg"
"image/png"

datasyncpb "go.viam.com/api/app/datasync/v1"
pb "go.viam.com/api/service/datamanager/v1"
"go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -53,6 +59,65 @@ func (c *client) Sync(ctx context.Context, extra map[string]interface{}) error {
return nil
}

func (c *client) UploadRawDataToDataset(ctx context.Context, image []byte, datasetIDs, tags []string, extra map[string]interface{}) error {
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return err
}
_, err = c.client.UploadImageToDataset(ctx, &pb.UploadImageToDatasetRequest{
Image: image,
DatasetIds: datasetIDs,
Tags: tags,
Extra: ext,
})
if err != nil {
return err
}
return nil
}

func (c *client) UploadImageToDataset(
ctx context.Context,
image image.Image,
datasetIDs, tags []string,
mimeType datasyncpb.MimeType,
extra map[string]interface{},
) error {
ext, err := protoutils.StructToStructPb(extra)
if err != nil {
return err
}

var imgBytes []byte
var buf bytes.Buffer
switch mimeType {
case datasyncpb.MimeType_MIME_TYPE_IMAGE_JPEG:
err := jpeg.Encode(&buf, image, nil)
if err != nil {
return err
}
imgBytes = buf.Bytes()
case datasyncpb.MimeType_MIME_TYPE_IMAGE_PNG:
err := png.Encode(&buf, image)
if err != nil {
return err
}
imgBytes = buf.Bytes()
default:
return errors.New("mime type must png or jpeg")
}
_, err = c.client.UploadImageToDataset(ctx, &pb.UploadImageToDatasetRequest{
Image: imgBytes,
DatasetIds: datasetIDs,
Tags: tags,
Extra: ext,
})
if err != nil {
return err
}
return nil
}

func (c *client) DoCommand(ctx context.Context, cmd map[string]interface{}) (map[string]interface{}, error) {
return rprotoutils.DoFromResourceClient(ctx, c.client, c.name, cmd)
}
6 changes: 6 additions & 0 deletions services/datamanager/data_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ package datamanager
import (
"context"
"encoding/json"
"image"
"reflect"
"slices"

datasyncpb "go.viam.com/api/app/datasync/v1"
servicepb "go.viam.com/api/service/datamanager/v1"

"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -50,6 +52,10 @@ type Service interface {
resource.Resource
// Sync will sync data stored on the machine to the cloud.
Sync(ctx context.Context, extra map[string]interface{}) error
UploadRawDataToDataset(ctx context.Context, image []byte, datasetIDs, tags []string,
extra map[string]interface{}) error
UploadImageToDataset(ctx context.Context, image image.Image, dataseteIDs, tags []string,
mimeType datasyncpb.MimeType, extra map[string]interface{}) error
}

// SubtypeName is the name of the type of service.
Expand Down
14 changes: 14 additions & 0 deletions services/datamanager/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ func (server *serviceServer) Sync(ctx context.Context, req *pb.SyncRequest) (*pb
return &pb.SyncResponse{}, nil
}

func (server *serviceServer) UploadImageToDataset(
ctx context.Context,
req *pb.UploadImageToDatasetRequest,
) (*pb.UploadImageToDatasetResponse, error) {
svc, err := server.coll.Resource(req.Name)
if err != nil {
return nil, err
}
if err := svc.UploadRawDataToDataset(ctx, req.Image, req.DatasetIds, req.Tags, req.Extra.AsMap()); err != nil {
return nil, err
}
return &pb.UploadImageToDatasetResponse{}, nil
}

// DoCommand receives arbitrary commands.
func (server *serviceServer) DoCommand(ctx context.Context,
req *commonpb.DoCommandRequest,
Expand Down
Loading