Skip to content

Feat: add glusterfs #255

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/fs/client/meta/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,13 +650,14 @@ func newUFS(fsMeta common.FSMeta) (ufslib.UnderFileStorage, error) {
properties[k] = v
}
properties[common.SubPath] = fsMeta.SubPath
properties[common.Type] = fsMeta.UfsType

switch fsMeta.UfsType {
case common.HDFSType:
properties[common.NameNodeAddress] = fsMeta.ServerAddress
case common.HDFSWithKerberosType:
properties[common.NameNodeAddress] = fsMeta.ServerAddress
case common.SFTPType, common.CFSType:
case common.SFTPType, common.CFSType, common.GlusterfsType:
properties[common.Address] = fsMeta.ServerAddress
}
return ufslib.NewUFS(fsMeta.UfsType, properties)
Expand Down
104 changes: 61 additions & 43 deletions pkg/fs/client/ufs/cfs.go → pkg/fs/client/ufs/local_mount.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve.
Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@ package ufs

import (
"errors"
"fmt"
"io"
"io/ioutil"
"os"
Expand All @@ -36,85 +37,86 @@ import (
)

const (
mountPath = ".tmp_cfs"
cfsMountParam = "minorversion=1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport"
localMountPath = ".tmp_local_mount"
cfsMountParam = "minorversion=1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2,noresvport"
)

type cfsFileSystem struct {
addr string // host:
type localMount struct {
addr string
subpath string
localPath string
mountType string
}

func (fs *cfsFileSystem) String() string {
return common.CFSType
func (fs *localMount) String() string {
return fs.mountType
}

func (fs *cfsFileSystem) GetPath(relPath string) string {
func (fs *localMount) GetPath(relPath string) string {
return filepath.Join(fs.localPath, relPath)
}

func (fs *cfsFileSystem) Chmod(name string, mode uint32) error {
func (fs *localMount) Chmod(name string, mode uint32) error {
return os.Chmod(fs.GetPath(name), os.FileMode(mode))
}

func (fs *cfsFileSystem) Chown(name string, uid uint32, gid uint32) error {
func (fs *localMount) Chown(name string, uid uint32, gid uint32) error {
return os.Chown(fs.GetPath(name), int(uid), int(gid))
}

func (fs *cfsFileSystem) Utimens(name string, atime *time.Time, mtime *time.Time) error {
func (fs *localMount) Utimens(name string, atime *time.Time, mtime *time.Time) error {
return os.Chtimes(fs.GetPath(name), *atime, *mtime)
}

func (fs *cfsFileSystem) Truncate(name string, size uint64) error {
func (fs *localMount) Truncate(name string, size uint64) error {
return os.Truncate(fs.GetPath(name), int64(size))
}

func (fs *cfsFileSystem) Access(name string, mode, callerUid, callerGid uint32) error {
func (fs *localMount) Access(name string, mode, callerUid, callerGid uint32) error {
return syscall.ENOSYS
}

func (fs *cfsFileSystem) Link(oldName string, newName string) error {
func (fs *localMount) Link(oldName string, newName string) error {
return os.Link(fs.GetPath(oldName), fs.GetPath(newName))
}

func (fs *cfsFileSystem) Mkdir(name string, mode uint32) error {
func (fs *localMount) Mkdir(name string, mode uint32) error {
return os.Mkdir(fs.GetPath(name), os.FileMode(mode))
}

func (fs *cfsFileSystem) Mknod(name string, mode uint32, dev uint32) error {
func (fs *localMount) Mknod(name string, mode uint32, dev uint32) error {
return syscall.ENOSYS
}

func (fs *cfsFileSystem) Rename(oldName string, newName string) error {
func (fs *localMount) Rename(oldName string, newName string) error {
return os.Rename(fs.GetPath(oldName), fs.GetPath(newName))
}

func (fs *cfsFileSystem) Rmdir(name string) error {
func (fs *localMount) Rmdir(name string) error {
return syscall.Rmdir(fs.GetPath(name))
}

func (fs *cfsFileSystem) Unlink(name string) error {
func (fs *localMount) Unlink(name string) error {
return syscall.Unlink(fs.GetPath(name))
}

func (fs *cfsFileSystem) GetXAttr(name string, attribute string) (data []byte, err error) {
func (fs *localMount) GetXAttr(name string, attribute string) (data []byte, err error) {
return nil, syscall.ENOSYS
}

func (fs *cfsFileSystem) ListXAttr(name string) (attributes []string, err error) {
func (fs *localMount) ListXAttr(name string) (attributes []string, err error) {
return nil, syscall.ENOSYS
}

func (fs *cfsFileSystem) RemoveXAttr(name string, attr string) error {
func (fs *localMount) RemoveXAttr(name string, attr string) error {
return syscall.ENOSYS
}

func (fs *cfsFileSystem) SetXAttr(name string, attr string, data []byte, flags int) error {
func (fs *localMount) SetXAttr(name string, attr string, data []byte, flags int) error {
return syscall.ENOSYS
}

func (fs *cfsFileSystem) Open(name string, flags uint32) (fd base.FileHandle, err error) {
func (fs *localMount) Open(name string, flags uint32) (fd base.FileHandle, err error) {
flags = flags &^ syscall.O_APPEND
f, err := os.OpenFile(fs.GetPath(name), int(flags), 0)
if err != nil {
Expand All @@ -123,14 +125,14 @@ func (fs *cfsFileSystem) Open(name string, flags uint32) (fd base.FileHandle, er
return nodefs.NewLoopbackFile(f), nil
}

func (fs *cfsFileSystem) Create(name string, flags uint32, mode uint32) (fd base.FileHandle, err error) {
func (fs *localMount) Create(name string, flags uint32, mode uint32) (fd base.FileHandle, err error) {
flags = flags &^ syscall.O_APPEND
f, err := os.OpenFile(fs.GetPath(name), int(flags)|os.O_CREATE, os.FileMode(mode))
return nodefs.NewLoopbackFile(f), err
}

// Directory handling
func (fs *cfsFileSystem) ReadDir(name string) (stream []base.DirEntry, err error) {
func (fs *localMount) ReadDir(name string) (stream []base.DirEntry, err error) {
entries, err := os.ReadDir(fs.GetPath(name))
if err != nil {
return nil, err
Expand All @@ -157,16 +159,16 @@ func (fs *cfsFileSystem) ReadDir(name string) (stream []base.DirEntry, err error
}

// Symlinks.
func (fs *cfsFileSystem) Symlink(value string, linkName string) error {
func (fs *localMount) Symlink(value string, linkName string) error {
return os.Symlink(value, fs.GetPath(linkName))
}

func (fs *cfsFileSystem) Readlink(name string) (string, error) {
func (fs *localMount) Readlink(name string) (string, error) {
f, err := os.Readlink(fs.GetPath(name))
return f, err
}

func (fs *cfsFileSystem) StatFs(name string) *base.StatfsOut {
func (fs *localMount) StatFs(name string) *base.StatfsOut {
s := syscall.Statfs_t{}
err := syscall.Statfs(fs.GetPath(name), &s)
if err != nil {
Expand All @@ -178,7 +180,7 @@ func (fs *cfsFileSystem) StatFs(name string) *base.StatfsOut {
return out
}

func (fs *cfsFileSystem) Get(name string, flags uint32, off, limit int64) (io.ReadCloser, error) {
func (fs *localMount) Get(name string, flags uint32, off, limit int64) (io.ReadCloser, error) {
flags = flags &^ syscall.O_APPEND
reader, err := os.OpenFile(fs.GetPath(name), int(flags), 0)
if err != nil {
Expand All @@ -196,37 +198,52 @@ func (fs *cfsFileSystem) Get(name string, flags uint32, off, limit int64) (io.Re
return reader, err
}

func (fs *cfsFileSystem) Put(name string, reader io.Reader) error {
func (fs *localMount) Put(name string, reader io.Reader) error {
return nil
}

func NewCfsFileSystem(properties map[string]interface{}) (UnderFileStorage, error) {
addr := properties[common.Address].(string)
subpath := properties[common.SubPath].(string)
os.MkdirAll(mountPath, 0755)
localPath, err := ioutil.TempDir(mountPath, "*")
func NewLocalMountFileSystem(properties map[string]interface{}) (UnderFileStorage, error) {
mountType := properties[common.Type].(string)
if mountType == "" {
return nil, errors.New("mount type empty")
}
var args []string
var sourcePath string
os.MkdirAll(localMountPath, 0755)
localPath, err := ioutil.TempDir(localMountPath, "*")
if err != nil {
log.Errorf("create temp dir all path[%s] failed: %v", localPath, err)
return nil, err
}
addr := properties[common.Address].(string)
subpath := properties[common.SubPath].(string)

switch mountType {
case common.GlusterfsType:
sourcePath = addr + ":" + subpath
args = []string{"-t", "glusterfs"}
case common.CFSType:
sourcePath = filepath.Join(addr, subpath) + "/"
args = []string{"-t", "nfs4", "-o", cfsMountParam}
default:
return nil, fmt.Errorf("type[%s] is not exist", mountType)
}

// todo use cfs client
sourcePath := filepath.Join(addr, subpath) + "/"
args := []string{"-t", "nfs4", "-o", cfsMountParam}
output, err := mount.ExecMount(sourcePath, localPath, args)
if err != nil {
log.Errorf("exec cfs mount cmd failed: %v, output[%s]", err, string(output))
log.Errorf("exec %s mount cmd failed: %v, output[%s]", mountType, err, string(output))
os.Remove(localPath)
return nil, errors.New(string(output))
}

fs := &cfsFileSystem{
fs := &localMount{
addr: addr,
subpath: subpath,
localPath: localPath,
mountType: mountType,
}

runtime.SetFinalizer(fs, func(fs *cfsFileSystem) {
runtime.SetFinalizer(fs, func(fs *localMount) {
err = mount.ForceUnmount(localPath)
if err != nil {
log.Debugf("force unmount mountPoint[%s] failed: %v", localPath, err)
Expand All @@ -241,5 +258,6 @@ func NewCfsFileSystem(properties map[string]interface{}) (UnderFileStorage, erro
}

func init() {
RegisterUFS(common.CFSType, NewCfsFileSystem)
RegisterUFS(common.GlusterfsType, NewLocalMountFileSystem)
RegisterUFS(common.CFSType, NewLocalFileSystem)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve.
Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@ import (
"paddleflow/pkg/fs/client/utils"
)

func (fs *cfsFileSystem) GetAttr(name string) (*base.FileInfo, error) {
func (fs *localMount) GetAttr(name string) (*base.FileInfo, error) {
path := fs.GetPath(name)
var err error = nil
st := syscall.Stat_t{}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserve.
Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserve.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -23,7 +23,7 @@ import (
"paddleflow/pkg/fs/client/utils"
)

func (fs *cfsFileSystem) GetAttr(name string) (*base.FileInfo, error) {
func (fs *localMount) GetAttr(name string) (*base.FileInfo, error) {
path := fs.GetPath(name)
var err error = nil
st := syscall.Stat_t{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"github.com/stretchr/testify/assert"
)

func TestCFS(t *testing.T) {
func TestLocalMount(t *testing.T) {
os.MkdirAll("./mock", 0755)
fs := &cfsFileSystem{
fs := &localMount{
localPath: "./mock",
}
fs.Mkdir("data1", 0755)
Expand Down
2 changes: 2 additions & 0 deletions pkg/fs/common/fs_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
SFTPType = "sftp"
MockType = "mock"
CFSType = "cfs"
GlusterfsType = "glusterfs"

// common
Owner = "owner"
Expand All @@ -32,6 +33,7 @@ const (
// local properties and root path
RootKey = "root"
SubPath = "subpath"
Type = "type"

// HDFS properties
NameNodeAddress = "dfs.namenode.address"
Expand Down