Skip to content

Commit 3eac009

Browse files
authored
feat: add glusterfs (#222)
1 parent 6d68dfc commit 3eac009

File tree

5 files changed

+109
-60
lines changed

5 files changed

+109
-60
lines changed

pkg/apiserver/common/utils.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ const (
5252
SFTP = "sftp"
5353
Mock = "mock"
5454
CFS = "cfs"
55+
Glusterfs = "glusterfs"
5556
IPDomainOrIPDomainPortPattern = "^([a-zA-Z0-9][-a-zA-Z0-9]{0,62}(\\.[a-zA-Z0-9][-a-zA-Z0-9]{0,62})+)" +
5657
"(:([1-9]|[1-9]\\d{1,3}|[1-5]\\d{4}|6[0-4]\\d{3}|65[0-4]\\d{2}|655[0-2]\\d|6553[0-5]))?$"
5758

@@ -160,6 +161,10 @@ func InformationFromURL(url string, properties map[string]string) (fileSystemTyp
160161
case CFS:
161162
serverAddress = urlSplit[ServerAddressSplit]
162163
subPath = "/" + SubPathFromUrl(urlSplit, CFSSplit)
164+
case Glusterfs:
165+
glusterfsInfo := strings.Split(urlSplit[ServerAddressSplit], ":")
166+
serverAddress = glusterfsInfo[0]
167+
subPath = glusterfsInfo[1]
163168
default:
164169
return
165170
}

pkg/apiserver/router/v1/fs.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,16 @@ func (pr *PFSRouter) AddRouter(r chi.Router) {
6666
}
6767

6868
var URLPrefix = map[string]bool{
69-
common.HDFS: true,
70-
common.Local: true,
71-
common.S3: true,
72-
common.SFTP: true,
73-
common.Mock: true,
74-
common.CFS: true,
69+
common.HDFS: true,
70+
common.Local: true,
71+
common.S3: true,
72+
common.SFTP: true,
73+
common.Mock: true,
74+
common.CFS: true,
75+
common.Glusterfs: true,
7576
}
7677

77-
const FsNameMaxLen = 8
78+
const FsNameMaxLen = 20
7879

7980
// createFileSystem the function that handle the create file system request
8081
// @Summary createFileSystem
@@ -173,7 +174,7 @@ func validateCreateFileSystem(ctx *logger.RequestContext, req *api.CreateFileSys
173174
ctx.ErrorCode = common.InvalidFileSystemURL
174175
return err
175176
}
176-
if fileSystemType == common.Mock {
177+
if fileSystemType == common.Mock || fileSystemType == common.Glusterfs {
177178
return nil
178179
}
179180
fsType, serverAddress, subPath := common.InformationFromURL(req.Url, req.Properties)
@@ -345,6 +346,9 @@ func checkFsDir(fsType, url string, properties map[string]string) error {
345346
inputIPs = strings.Split(properties[fsCommon.Endpoint], ",")
346347
subPath = "/" + strings.SplitAfterN(url, "/", 4)[3]
347348
}
349+
if len(inputIPs) == 0 {
350+
return nil
351+
}
348352
fsList, err := models.GetSimilarityAddressList(fsType, inputIPs)
349353
if err != nil {
350354
return err

pkg/fs/csiplugin/client/pfs/mount_info.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import (
2020
"fmt"
2121
"strings"
2222

23+
"paddleflow/pkg/apiserver/common"
2324
csiCommon "paddleflow/pkg/fs/utils/common"
2425
)
2526

2627
const (
28+
mountName = "mount"
2729
pfsMountCmdName = "./mount.sh"
2830
ReadOnly = "ro"
2931
ReadWrite = "rw"
@@ -44,31 +46,44 @@ type FSMountParameter struct {
4446
}
4547

4648
type MountInfo struct {
47-
Server string
48-
FSID string
49-
TargetPath string
50-
LocalPath string
51-
UsernameRoot string
52-
PasswordRoot string
53-
UID int
54-
GID int
55-
Options []string
49+
Server string
50+
FSID string
51+
TargetPath string
52+
LocalPath string
53+
UsernameRoot string
54+
PasswordRoot string
55+
ClusterID string
56+
Type string
57+
ServerAddress string
58+
SubPath string
59+
UID int
60+
GID int
61+
Options []string
5662
}
5763

5864
func (m *MountInfo) GetMountCmd() (string, []string) {
59-
cmdName := pfsMountCmdName
65+
var cmdName string
6066
var args []string
61-
62-
if len(m.Options) > 0 {
63-
mountOptions := strings.Join(m.Options[:], ",")
64-
args = append(args, fmt.Sprintf(MountOptions, mountOptions))
67+
if m.Type == common.Glusterfs {
68+
cmdName = mountName
69+
args = append(args, "-t", m.Type)
70+
if len(m.Options) != 0 {
71+
args = append(args, "-o", strings.Join(m.Options, ","))
72+
}
73+
args = append(args, strings.Join([]string{m.ServerAddress, m.SubPath}, ":"), m.LocalPath)
74+
} else {
75+
cmdName = pfsMountCmdName
76+
if len(m.Options) > 0 {
77+
mountOptions := strings.Join(m.Options[:], ",")
78+
args = append(args, fmt.Sprintf(MountOptions, mountOptions))
79+
}
80+
// set pfs-fuse arguments
81+
args = append(args, fmt.Sprintf(PFSServerOption, m.Server))
82+
args = append(args, fmt.Sprintf(FSIDOptions, m.FSID))
83+
args = append(args, fmt.Sprintf(UIDOption, m.UID))
84+
args = append(args, fmt.Sprintf(GIDOption, m.GID))
85+
args = append(args, fmt.Sprintf(MountPoint, m.LocalPath))
6586
}
66-
// set pfs-fuse arguments
67-
args = append(args, fmt.Sprintf(PFSServerOption, m.Server))
68-
args = append(args, fmt.Sprintf(FSIDOptions, m.FSID))
69-
args = append(args, fmt.Sprintf(UIDOption, m.UID))
70-
args = append(args, fmt.Sprintf(GIDOption, m.GID))
71-
args = append(args, fmt.Sprintf(MountPoint, m.LocalPath))
7287

7388
return cmdName, args
7489
}

pkg/fs/csiplugin/csidriver/node_server.go

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,8 @@ import (
2727
"google.golang.org/grpc/codes"
2828
"google.golang.org/grpc/status"
2929

30-
"paddleflow/pkg/client"
31-
"paddleflow/pkg/common/http/api"
32-
"paddleflow/pkg/fs/client/base"
3330
"paddleflow/pkg/fs/csiplugin/client/pfs"
31+
"paddleflow/pkg/fs/csiplugin/mount"
3432
"paddleflow/pkg/fs/utils/common"
3533
"paddleflow/pkg/fs/utils/io"
3634
mountUtil "paddleflow/pkg/fs/utils/mount"
@@ -76,28 +74,10 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context,
7674
fsId := volumeContext[pfsFSID]
7775
server := volumeContext[pfsServer]
7876

79-
// new fuse http client
80-
httpClient := client.NewHttpClient(server, client.DefaultTimeOut)
81-
// token
82-
login := api.LoginParams{
83-
UserName: ns.credentialInfo.usernameRoot,
84-
Password: ns.credentialInfo.passwordRoot,
85-
}
86-
loginResponse, err := api.LoginRequest(login, httpClient)
87-
if err != nil {
88-
log.Errorf("fuse login failed: %v", err)
89-
return &csi.NodePublishVolumeResponse{}, err
90-
}
91-
_, err = base.NewClient(fsId, httpClient, loginResponse.Authorization)
92-
if err != nil {
93-
log.Errorf("csi addRefOfMount: init client with fs[%s] and server[%s] failed: %v",
94-
fsId, server, err)
95-
return &csi.NodePublishVolumeResponse{}, err
96-
}
97-
9877
mountInfo := pfs.GetMountInfo(fsId, server, req.GetReadonly())
78+
mountInfo.UsernameRoot, mountInfo.PasswordRoot = ns.credentialInfo.usernameRoot, ns.credentialInfo.passwordRoot
79+
mountInfo.TargetPath = targetPath
9980
// root credentials for pfs-fuse
100-
mountInfo.UsernameRoot, mountInfo.PasswordRoot = login.UserName, login.Password
10181
pathPrefix := filepath.Dir(targetPath)
10282
mountInfo.TargetPath = targetPath
10383
if err := mountVolume(pathPrefix, mountInfo, req.GetReadonly()); err != nil {
@@ -147,6 +127,11 @@ func (ns *nodeServer) NodeExpandVolume(ctx context.Context,
147127

148128
func mountVolume(mountPathPrefix string, mountInfo pfs.MountInfo, readOnly bool) error {
149129
log.Debugf("mountVolume mountInfo:%+v, readOnly:%t", mountInfo, readOnly)
130+
_, _, err := mount.HttpClientAndToken(&mountInfo)
131+
if err != nil {
132+
log.Errorf("GetFsInfoAndClient err: %v", err)
133+
return err
134+
}
150135
// business pods use a separate source path
151136
volumeSourceMountPath := common.GetVolumeSourceMountPath(mountPathPrefix)
152137
if err := os.MkdirAll(volumeSourceMountPath, 0750); err != nil {
@@ -162,17 +147,8 @@ func mountVolume(mountPathPrefix string, mountInfo pfs.MountInfo, readOnly bool)
162147
log.Errorf("exec mount failed: [%v], output[%v]", err, string(output))
163148
return err
164149
}
165-
166-
//err := mount.MountThroughPod(mountInfo)
167-
//if err != nil {
168-
// log.Errorf("MountThroughPod err: %v", err)
169-
// return err
170-
//}
171-
172150
volumeBindMountPath := common.GetVolumeMountPath(mountPathPrefix)
173151
return bindMountVolume(volumeSourceMountPath, volumeBindMountPath, readOnly)
174-
//bindSource := mount.MountDir + "/" + mountInfo.FSID + "/storage"
175-
//return bindMountVolume(bindSource, mountInfo.TargetPath, readOnly)
176152
}
177153

178154
func bindMountVolume(sourcePath, mountPath string, readOnly bool) error {

pkg/fs/csiplugin/mount/pod.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import (
3030
v1 "k8s.io/api/core/v1"
3131
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3232

33+
"paddleflow/pkg/client"
3334
"paddleflow/pkg/common/http/api"
35+
"paddleflow/pkg/common/http/core"
3436
"paddleflow/pkg/fs/client/base"
3537
"paddleflow/pkg/fs/common"
3638
"paddleflow/pkg/fs/csiplugin/client/k8s"
@@ -274,3 +276,50 @@ func getcmd(mountInfo pfs.MountInfo, cacheConf common.FsCacheConfig) string {
274276
cmd := mkdir + pfsMountPath + mountPath + strings.Join(options, " ")
275277
return cmd
276278
}
279+
280+
func HttpClientAndToken(mountInfo *pfs.MountInfo) (*core.PFClient, string, error) {
281+
// login server
282+
httpClient := client.NewHttpClient(mountInfo.Server, client.DefaultTimeOut)
283+
login := api.LoginParams{
284+
UserName: mountInfo.UsernameRoot,
285+
Password: mountInfo.PasswordRoot,
286+
}
287+
loginResponse, err := api.LoginRequest(login, httpClient)
288+
if err != nil {
289+
log.Errorf("Mount: login failed: %v", err)
290+
return nil, "", err
291+
}
292+
token := loginResponse.Authorization
293+
// validate fs
294+
var resp *api.FsResponse
295+
if resp, err = getFs(mountInfo.FSID, httpClient, token); err != nil {
296+
log.Errorf("Mount: validate fs exist [%s] err: %v", mountInfo.FSID, err)
297+
return nil, "", err
298+
}
299+
mountInfo.Type = resp.Type
300+
mountInfo.SubPath = resp.SubPath
301+
mountInfo.ServerAddress = resp.ServerAddress
302+
return httpClient, token, nil
303+
}
304+
305+
func getFs(fsID string, httpClient *core.PFClient, token string) (*api.FsResponse, error) {
306+
userName, fsName := getFsNameAndUserNameByFsID(fsID)
307+
params := api.FsParams{
308+
FsName: fsName,
309+
UserName: userName,
310+
Token: token,
311+
}
312+
fsResp, err := api.FsRequest(params, httpClient)
313+
if err != nil {
314+
log.Errorf("fs request[%+v] failed: %v", params, err)
315+
return nil, err
316+
}
317+
return fsResp, nil
318+
}
319+
320+
func getFsNameAndUserNameByFsID(fsID string) (userName string, fsName string) {
321+
fsArray := strings.Split(fsID, "-")
322+
userName = strings.Join(fsArray[1:len(fsArray)-1], "")
323+
fsName = fsArray[len(fsArray)-1]
324+
return userName, fsName
325+
}

0 commit comments

Comments
 (0)