feat: dfdaemon add object storage rest api (#1390)

* feat: dfdaemon add object storage rest api

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-06-16 14:23:09 +08:00
parent 4889a37886
commit 171c0d54ee
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
12 changed files with 268 additions and 27 deletions

View File

@ -4,7 +4,7 @@ run:
linters-settings:
gocyclo:
min-complexity: 44
min-complexity: 50
gci:
sections:
- standard

View File

@ -58,13 +58,14 @@ type DaemonOption struct {
DataDir string `mapstructure:"dataDir" yaml:"dataDir"`
KeepStorage bool `mapstructure:"keepStorage" yaml:"keepStorage"`
Scheduler SchedulerOption `mapstructure:"scheduler" yaml:"scheduler"`
Host HostOption `mapstructure:"host" yaml:"host"`
Download DownloadOption `mapstructure:"download" yaml:"download"`
Proxy *ProxyOption `mapstructure:"proxy" yaml:"proxy"`
Upload UploadOption `mapstructure:"upload" yaml:"upload"`
Storage StorageOption `mapstructure:"storage" yaml:"storage"`
Health *HealthOption `mapstructure:"health" yaml:"health"`
Scheduler SchedulerOption `mapstructure:"scheduler" yaml:"scheduler"`
Host HostOption `mapstructure:"host" yaml:"host"`
Download DownloadOption `mapstructure:"download" yaml:"download"`
Proxy *ProxyOption `mapstructure:"proxy" yaml:"proxy"`
Upload UploadOption `mapstructure:"upload" yaml:"upload"`
ObjectStorage ObjectStorageOption `mapstructure:"objectStorage" yaml:"objectStorage"`
Storage StorageOption `mapstructure:"storage" yaml:"storage"`
Health *HealthOption `mapstructure:"health" yaml:"health"`
// TODO WIP, did not use
Reload ReloadOption `mapstructure:"reloadOption" yaml:"reloadOption"`
}
@ -352,6 +353,10 @@ type UploadOption struct {
RateLimit clientutil.RateLimit `mapstructure:"rateLimit" yaml:"rateLimit"`
}
type ObjectStorageOption struct {
ListenOption `yaml:",inline" mapstructure:",squash"`
}
type ListenOption struct {
Security SecurityOption `mapstructure:"security" yaml:"security"`
TCPListen *TCPListenOption `mapstructure:"tcpListen,omitempty" yaml:"tcpListen,omitempty"`

View File

@ -118,6 +118,21 @@ var peerHostConfig = DaemonOption{
},
},
},
ObjectStorage: ObjectStorageOption{
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
TLSVerify: true,
},
TCPListen: &TCPListenOption{
Listen: net.IPv4zero.String(),
PortRange: TCPListenPortRange{
Start: 8081,
End: 8081,
},
},
},
},
Proxy: &ProxyOption{
ListenOption: ListenOption{
Security: SecurityOption{

View File

@ -117,6 +117,21 @@ var peerHostConfig = DaemonOption{
},
},
},
ObjectStorage: ObjectStorageOption{
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
TLSVerify: true,
},
TCPListen: &TCPListenOption{
Listen: net.IPv4zero.String(),
PortRange: TCPListenPortRange{
Start: 8081,
End: 8081,
},
},
},
},
Proxy: &ProxyOption{
ListenOption: ListenOption{
Security: SecurityOption{

View File

@ -326,6 +326,24 @@ func TestPeerHostOption_Load(t *testing.T) {
},
},
},
ObjectStorage: ObjectStorageOption{
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,
CACert: "caCert",
Cert: "cert",
Key: "key",
TLSVerify: true,
},
TCPListen: &TCPListenOption{
Listen: "0.0.0.0",
PortRange: TCPListenPortRange{
Start: 8081,
End: 0,
},
},
},
},
Storage: StorageOption{
DataPath: "/tmp/storage/data",
TaskExpireTime: clientutil.Duration{

View File

@ -70,6 +70,17 @@ upload:
listen: 0.0.0.0
port: 65002
objectStorage:
security:
insecure: true
caCert: caCert
cert: cert
key: key
tlsVerify: true
tcpListen:
listen: 0.0.0.0
port: 8081
storage:
dataPath: /tmp/storage/data
taskExpireTime: 3m0s

View File

@ -41,6 +41,7 @@ import (
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/gc"
"d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/objectstorage"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/proxy"
"d7y.io/dragonfly/v2/client/daemon/rpcserver"
@ -79,6 +80,7 @@ type clientDaemon struct {
RPCManager rpcserver.Server
UploadManager upload.Manager
ObjectStorage objectstorage.ObjectStorage
ProxyManager proxy.Manager
StorageManager storage.Manager
GCManager gc.Manager
@ -234,6 +236,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
PieceManager: pieceManager,
ProxyManager: proxyManager,
UploadManager: uploadManager,
ObjectStorage: objectstorage.New(dynconfig, peerTaskManager, storageManager),
StorageManager: storageManager,
GCManager: gc.NewManager(opt.GCInterval.Duration),
dynconfig: dynconfig,
@ -388,6 +391,16 @@ func (cd *clientDaemon) Serve() error {
}
cd.schedPeerHost.DownPort = int32(uploadPort)
// prepare object storage service listen
if cd.Option.ObjectStorage.TCPListen == nil {
return errors.New("object storage tcp listen option is empty")
}
objectStorageListener, _, err := cd.prepareTCPListener(cd.Option.ObjectStorage.ListenOption, true)
if err != nil {
logger.Errorf("failed to listen for object storage service: %v", err)
return err
}
g := errgroup.Group{}
// serve download grpc service
g.Go(func() error {
@ -469,6 +482,19 @@ func (cd *clientDaemon) Serve() error {
return nil
})
// serve object storage service
g.Go(func() error {
defer objectStorageListener.Close()
logger.Infof("serve object storage service at %s://%s", objectStorageListener.Addr().Network(), objectStorageListener.Addr().String())
if err := cd.ObjectStorage.Serve(objectStorageListener); err != nil && err != http.ErrServerClosed {
logger.Errorf("failed to serve for object storage service: %v", err)
return err
} else if err == http.ErrServerClosed {
logger.Infof("object storage service closed")
}
return nil
})
// enable seed peer mode
if cd.managerClient != nil && cd.Option.Scheduler.Manager.SeedPeer.Enable {
logger.Info("announce to manager")
@ -585,6 +611,10 @@ func (cd *clientDaemon) Stop() {
logger.Errorf("upload manager stop failed %s", err)
}
if err := cd.ObjectStorage.Stop(); err != nil {
logger.Errorf("object storage stop failed %s", err)
}
if cd.ProxyManager.IsEnabled() {
if err := cd.ProxyManager.Stop(); err != nil {
logger.Errorf("proxy manager stop failed %s", err)

View File

@ -0,0 +1,116 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package objectstorage
import (
"context"
"net"
"net/http"
"github.com/gin-gonic/gin"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/storage"
)
// ObjectStorage is the interface used for object storage server.
type ObjectStorage interface {
// Started object storage server.
Serve(lis net.Listener) error
// Stop object storage server.
Stop() error
}
// objectStorage provides object storage function.
type objectStorage struct {
*http.Server
dynconfig config.Dynconfig
peeTaskManager peer.TaskManager
storageManager storage.Manager
}
// New returns a new ObjectStorage instence.
func New(dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager) ObjectStorage {
o := &objectStorage{
dynconfig: dynconfig,
peeTaskManager: peerTaskManager,
storageManager: storageManager,
}
router := o.initRouter()
o.Server = &http.Server{
Handler: router,
}
return o
}
// Started object storage server.
func (o *objectStorage) Serve(lis net.Listener) error {
return o.Server.Serve(lis)
}
// Stop object storage server.
func (o *objectStorage) Stop() error {
return o.Server.Shutdown(context.Background())
}
// Initialize router of gin.
func (o *objectStorage) initRouter() *gin.Engine {
r := gin.Default()
// Health Check.
r.GET("/healthy", o.getHealth)
// Buckets
b := r.Group("/buckets")
b.GET(":id/objects/*object_key", o.getObject)
b.POST(":id/objects", o.createObject)
return r
}
// getHealth uses to check server health.
func (o *objectStorage) getHealth(ctx *gin.Context) {
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
}
// getObject uses to download object data.
func (o *objectStorage) getObject(ctx *gin.Context) {
var params GetObjectParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
}
// createObject uses to upload object data.
func (o *objectStorage) createObject(ctx *gin.Context) {
var params CreateObjectParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
var form CreateObjectRequset
if err := ctx.ShouldBind(&form); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
}

View File

@ -0,0 +1,33 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package objectstorage
import "mime/multipart"
type GetObjectParams struct {
ID string `uri:"id" binding:"required"`
ObjectKey string `uri:"object_key" binding:"required"`
}
type CreateObjectParams struct {
ID string `uri:"id" binding:"required"`
}
type CreateObjectRequset struct {
Key string `form:"key" binding:"required"`
File *multipart.FileHeader `form:"file" binding:"required"`
}

1
go.mod
View File

@ -36,7 +36,6 @@ require (
github.com/google/go-cmp v0.5.8
github.com/google/go-github v17.0.0+incompatible
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/jarcoal/httpmock v1.0.8

1
go.sum
View File

@ -545,7 +545,6 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1 h1:miw7JPhV+b/lAHSXz4qd/nN9jRiAFV5FwjeKyCS8BvQ=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=

View File

@ -94,27 +94,27 @@ func Init(cfg *config.Config, logDir string, service service.Service, enforcer *
// User
u := apiv1.Group("/users")
u.PATCH(":id", jwt.MiddlewareFunc(), rbac, h.UpdateUser)
u.GET("/:id", jwt.MiddlewareFunc(), rbac, h.GetUser)
u.GET(":id", jwt.MiddlewareFunc(), rbac, h.GetUser)
u.GET("", jwt.MiddlewareFunc(), rbac, h.GetUsers)
u.POST("/signin", jwt.LoginHandler)
u.POST("/signout", jwt.LogoutHandler)
u.POST("/signup", h.SignUp)
u.GET("/signin/:name", h.OauthSignin)
u.GET("/signin/:name/callback", h.OauthSigninCallback(jwt))
u.POST("/refresh_token", jwt.RefreshHandler)
u.POST("/:id/reset_password", h.ResetPassword)
u.GET("/:id/roles", jwt.MiddlewareFunc(), rbac, h.GetRolesForUser)
u.PUT("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.AddRoleToUser)
u.DELETE("/:id/roles/:role", jwt.MiddlewareFunc(), rbac, h.DeleteRoleForUser)
u.POST("signin", jwt.LoginHandler)
u.POST("signout", jwt.LogoutHandler)
u.POST("signup", h.SignUp)
u.GET("signin/:name", h.OauthSignin)
u.GET("signin/:name/callback", h.OauthSigninCallback(jwt))
u.POST("refresh_token", jwt.RefreshHandler)
u.POST(":id/reset_password", h.ResetPassword)
u.GET(":id/roles", jwt.MiddlewareFunc(), rbac, h.GetRolesForUser)
u.PUT(":id/roles/:role", jwt.MiddlewareFunc(), rbac, h.AddRoleToUser)
u.DELETE(":id/roles/:role", jwt.MiddlewareFunc(), rbac, h.DeleteRoleForUser)
// Role
re := apiv1.Group("/roles", jwt.MiddlewareFunc(), rbac)
re.POST("", h.CreateRole)
re.DELETE("/:role", h.DestroyRole)
re.GET("/:role", h.GetRole)
re.DELETE(":role", h.DestroyRole)
re.GET(":role", h.GetRole)
re.GET("", h.GetRoles)
re.POST("/:role/permissions", h.AddPermissionForRole)
re.DELETE("/:role/permissions", h.DeletePermissionForRole)
re.POST(":role/permissions", h.AddPermissionForRole)
re.DELETE(":role/permissions", h.DeletePermissionForRole)
// Permission
pm := apiv1.Group("/permissions", jwt.MiddlewareFunc(), rbac)
@ -219,8 +219,8 @@ func Init(cfg *config.Config, logDir string, service service.Service, enforcer *
job.GET("", h.GetJobs)
// Compatible with the V1 preheat.
pv1 := r.Group("preheats")
r.GET("/_ping", h.GetHealth)
pv1 := r.Group("/preheats")
r.GET("_ping", h.GetHealth)
pv1.POST("", h.CreateV1Preheat)
pv1.GET(":id", h.GetV1Preheat)