diff --git a/api/manager/docs.go b/api/manager/docs.go index 22b50625d..5835394b7 100644 --- a/api/manager/docs.go +++ b/api/manager/docs.go @@ -886,6 +886,47 @@ const docTemplate = `{ "description": "Internal Server Error" } } + }, + "post": { + "description": "Create by json config", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job" + ], + "summary": "Create Job", + "parameters": [ + { + "description": "Job", + "name": "Job", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.CreateJobRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_models.Job" + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } } }, "/api/v1/jobs/{id}": { @@ -1436,6 +1477,152 @@ const docTemplate = `{ } } }, + "/api/v1/persistent-caches": { + "get": { + "description": "Get PersistentCaches", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Get PersistentCaches", + "parameters": [ + { + "type": "integer", + "default": 0, + "description": "current page", + "name": "page", + "in": "query", + "required": true + }, + { + "maximum": 50, + "minimum": 2, + "type": "integer", + "default": 10, + "description": "return max item count, default 10, max 50", + "name": "per_page", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse" + } + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, + "/api/v1/persistent-caches/{scheduler_cluster_id}/{task_id}": { + "get": { + "description": "Get PersistentCache by id", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Get PersistentCache", + "parameters": [ + { + "type": "string", + "description": "scheduler cluster id", + "name": "scheduler_cluster_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "task id", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse" + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + }, + "delete": { + "description": "Destroy PersistentCache by id", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Destroy PersistentCache", + "parameters": [ + { + "type": "string", + "description": "scheduler cluster id", + "name": "scheduler_cluster_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "task id", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK" + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, "/api/v1/personal-access-tokens": { "get": { "description": "Get PersonalAccessTokens", @@ -3908,6 +4095,9 @@ const docTemplate = `{ } }, "definitions": { + "bitset.BitSet": { + "type": "object" + }, "d7y_io_dragonfly_v2_manager_models.Application": { "type": "object", "properties": { @@ -5012,6 +5202,66 @@ const docTemplate = `{ } } }, + "d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse": { + "type": "object", + "properties": { + "application": { + "description": "Application of persistent cache task.", + "type": "string" + }, + "content_length": { + "description": "ContentLength is persistent cache task total content length.", + "type": "integer" + }, + "created_at": { + "description": "CreatedAt is persistent cache task create time.", + "type": "string" + }, + "persistent_cache_peers": { + "description": "PersistentCachePeers is the list of persistent cache peers.", + "type": "array", + "items": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeer" + } + }, + "persistent_replica_count": { + "description": "PersistentReplicaCount is replica count of the persistent cache task.", + "type": "integer" + }, + "piece_length": { + "description": "PieceLength is persistent cache task piece length.", + "type": "integer" + }, + "state": { + "description": "State is persistent cache task state.", + "type": "string" + }, + "tag": { + "description": "Tag is used to distinguish different persistent cache tasks.", + "type": "string" + }, + "task_id": { + "description": "TaskID is task id.", + "type": "string" + }, + "total_piece_count": { + "description": "TotalPieceCount is total piece count.", + "type": "integer" + }, + "ttl": { + "description": "TTL is persistent cache task time to live.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "updated_at": { + "description": "UpdatedAt is persistent cache task update time.", + "type": "string" + } + } + }, "d7y_io_dragonfly_v2_manager_types.GetV1PreheatResponse": { "type": "object", "properties": { @@ -5029,6 +5279,339 @@ const docTemplate = `{ } } }, + "d7y_io_dragonfly_v2_manager_types.PersistentCachePeer": { + "type": "object", + "properties": { + "block_parents": { + "description": "BlockParents is bad parents ids.", + "type": "array", + "items": { + "type": "string" + } + }, + "cost": { + "description": "Cost is the cost of downloading.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "created_at": { + "description": "CreatedAt is persistent cache peer create time.", + "type": "string" + }, + "finished_pieces": { + "description": "FinishedPieces is finished pieces bitset.", + "allOf": [ + { + "$ref": "#/definitions/bitset.BitSet" + } + ] + }, + "host": { + "description": "Host is the peer host.", + "allOf": [ + { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost" + } + ] + }, + "id": { + "description": "ID is persistent cache peer id.", + "type": "string" + }, + "persistent": { + "description": "Persistent is whether the peer is persistent.", + "type": "boolean" + }, + "state": { + "description": "State is persistent cache peer state.", + "type": "string" + }, + "updated_at": { + "description": "UpdatedAt is persistent cache peer update time.", + "type": "string" + } + } + }, + "d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost": { + "type": "object", + "properties": { + "announce_interval": { + "description": "AnnounceInterval is the interval between host announces to scheduler.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "build": { + "description": "Build contains build information.", + "type": "object", + "properties": { + "git_commit": { + "description": "GitCommit is git commit.", + "type": "string" + }, + "git_version": { + "description": "GitVersion is git version.", + "type": "string" + }, + "go_version": { + "description": "GoVersion is go version.", + "type": "string" + }, + "platform": { + "description": "Platform is build platform.", + "type": "string" + } + } + }, + "cpu": { + "description": "CPU contains cpu information.", + "type": "object", + "properties": { + "logical_count": { + "description": "LogicalCount is cpu logical count.", + "type": "integer" + }, + "percent": { + "description": "Percent is cpu usage percent.", + "type": "number" + }, + "physical_count": { + "description": "PhysicalCount is cpu physical count.", + "type": "integer" + }, + "process_percent": { + "description": "ProcessPercent is process cpu usage percent.", + "type": "number" + }, + "times": { + "description": "Times contains cpu times information.", + "type": "object", + "properties": { + "guest": { + "description": "Guest is guest cpu time.", + "type": "number" + }, + "guest_nice": { + "description": "GuestNice is guest nice cpu time.", + "type": "number" + }, + "idle": { + "description": "Idle is idle cpu time.", + "type": "number" + }, + "iowait": { + "description": "Iowait is iowait cpu time.", + "type": "number" + }, + "irq": { + "description": "Irq is irq cpu time.", + "type": "number" + }, + "nice": { + "description": "Nice is nice cpu time.", + "type": "number" + }, + "softirq": { + "description": "Softirq is softirq cpu time.", + "type": "number" + }, + "steal": { + "description": "Steal is steal cpu time.", + "type": "number" + }, + "system": { + "description": "System is system cpu time.", + "type": "number" + }, + "user": { + "description": "User is user cpu time.", + "type": "number" + } + } + } + } + }, + "created_at": { + "description": "CreatedAt is host create time.", + "type": "string" + }, + "disable_shared": { + "description": "DisableShared is whether the host is disabled for shared with other peers.", + "type": "boolean" + }, + "disk": { + "description": "Disk contains disk information.", + "type": "object", + "properties": { + "free": { + "description": "Free is free disk space.", + "type": "integer" + }, + "inodes_free": { + "description": "InodesFree is free inodes.", + "type": "integer" + }, + "inodes_total": { + "description": "InodesTotal is total inodes.", + "type": "integer" + }, + "inodes_used": { + "description": "InodesUsed is used inodes.", + "type": "integer" + }, + "inodes_used_percent": { + "description": "InodesUsedPercent is inodes usage percent.", + "type": "number" + }, + "read_bandwidth": { + "description": "ReadBandwidth is read bandwidth.", + "type": "integer" + }, + "total": { + "description": "Total is total disk space.", + "type": "integer" + }, + "used": { + "description": "Used is used disk space.", + "type": "integer" + }, + "used_percent": { + "description": "UsedPercent is disk usage percent.", + "type": "number" + }, + "write_bandwidth": { + "description": "WriteBandwidth is write bandwidth.", + "type": "integer" + } + } + }, + "download_port": { + "description": "DownloadPort is piece downloading port.", + "type": "integer" + }, + "hostname": { + "description": "Hostname is host name.", + "type": "string" + }, + "id": { + "description": "ID is host id.", + "type": "string" + }, + "ip": { + "description": "IP is host ip.", + "type": "string" + }, + "kernel_version": { + "description": "KernelVersion is host kernel version.", + "type": "string" + }, + "memory": { + "description": "Memory contains memory information.", + "type": "object", + "properties": { + "available": { + "description": "Available is available memory.", + "type": "integer" + }, + "free": { + "description": "Free is free memory.", + "type": "integer" + }, + "process_used_percent": { + "description": "ProcessUsedPercent is process memory usage percent.", + "type": "number" + }, + "total": { + "description": "Total is total memory.", + "type": "integer" + }, + "used": { + "description": "Used is used memory.", + "type": "integer" + }, + "used_percent": { + "description": "UsedPercent is memory usage percent.", + "type": "number" + } + } + }, + "network": { + "description": "Network contains network information.", + "type": "object", + "properties": { + "download_rate": { + "description": "DownloadRate is download rate.", + "type": "integer" + }, + "download_rate_limit": { + "description": "DownloadRateLimit is download rate limit.", + "type": "integer" + }, + "idc": { + "description": "IDC is network idc.", + "type": "string" + }, + "location": { + "description": "Location is network location.", + "type": "string" + }, + "tcp_connection_count": { + "description": "TCPConnectionCount is tcp connection count.", + "type": "integer" + }, + "upload_rate": { + "description": "UploadRate is upload rate.", + "type": "integer" + }, + "upload_rate_limit": { + "description": "UploadRateLimit is upload rate limit.", + "type": "integer" + }, + "upload_tcp_connection_count": { + "description": "UploadTCPConnectionCount is upload tcp connection count.", + "type": "integer" + } + } + }, + "os": { + "description": "OS is host OS.", + "type": "string" + }, + "platform": { + "description": "Platform is host platform.", + "type": "string" + }, + "platform_family": { + "description": "PlatformFamily is host platform family.", + "type": "string" + }, + "platform_version": { + "description": "PlatformVersion is host platform version.", + "type": "string" + }, + "port": { + "description": "Port is grpc service port.", + "type": "integer" + }, + "scheduler_cluster_id": { + "description": "SchedulerClusterID is the scheduler cluster id matched by scopes.", + "type": "integer" + }, + "type": { + "description": "Type is host type.", + "type": "string" + }, + "updated_at": { + "description": "UpdatedAt is host update time.", + "type": "string" + } + } + }, "d7y_io_dragonfly_v2_manager_types.PriorityConfig": { "type": "object", "required": [ @@ -5488,6 +6071,29 @@ const docTemplate = `{ "type": "string" } } + }, + "time.Duration": { + "type": "integer", + "enum": [ + -9223372036854775808, + 9223372036854775807, + 1, + 1000, + 1000000, + 1000000000, + 60000000000, + 3600000000000 + ], + "x-enum-varnames": [ + "minDuration", + "maxDuration", + "Nanosecond", + "Microsecond", + "Millisecond", + "Second", + "Minute", + "Hour" + ] } }, "tags": [ diff --git a/api/manager/swagger.json b/api/manager/swagger.json index c8087b537..689c67cb2 100644 --- a/api/manager/swagger.json +++ b/api/manager/swagger.json @@ -880,6 +880,47 @@ "description": "Internal Server Error" } } + }, + "post": { + "description": "Create by json config", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "Job" + ], + "summary": "Create Job", + "parameters": [ + { + "description": "Job", + "name": "Job", + "in": "body", + "required": true, + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.CreateJobRequest" + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_models.Job" + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } } }, "/api/v1/jobs/{id}": { @@ -1430,6 +1471,152 @@ } } }, + "/api/v1/persistent-caches": { + "get": { + "description": "Get PersistentCaches", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Get PersistentCaches", + "parameters": [ + { + "type": "integer", + "default": 0, + "description": "current page", + "name": "page", + "in": "query", + "required": true + }, + { + "maximum": 50, + "minimum": 2, + "type": "integer", + "default": 10, + "description": "return max item count, default 10, max 50", + "name": "per_page", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse" + } + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, + "/api/v1/persistent-caches/{scheduler_cluster_id}/{task_id}": { + "get": { + "description": "Get PersistentCache by id", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Get PersistentCache", + "parameters": [ + { + "type": "string", + "description": "scheduler cluster id", + "name": "scheduler_cluster_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "task id", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse" + } + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + }, + "delete": { + "description": "Destroy PersistentCache by id", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "PersistentCache" + ], + "summary": "Destroy PersistentCache", + "parameters": [ + { + "type": "string", + "description": "scheduler cluster id", + "name": "scheduler_cluster_id", + "in": "path", + "required": true + }, + { + "type": "string", + "description": "task id", + "name": "task_id", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "OK" + }, + "400": { + "description": "Bad Request" + }, + "404": { + "description": "Not Found" + }, + "500": { + "description": "Internal Server Error" + } + } + } + }, "/api/v1/personal-access-tokens": { "get": { "description": "Get PersonalAccessTokens", @@ -3902,6 +4089,9 @@ } }, "definitions": { + "bitset.BitSet": { + "type": "object" + }, "d7y_io_dragonfly_v2_manager_models.Application": { "type": "object", "properties": { @@ -5006,6 +5196,66 @@ } } }, + "d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse": { + "type": "object", + "properties": { + "application": { + "description": "Application of persistent cache task.", + "type": "string" + }, + "content_length": { + "description": "ContentLength is persistent cache task total content length.", + "type": "integer" + }, + "created_at": { + "description": "CreatedAt is persistent cache task create time.", + "type": "string" + }, + "persistent_cache_peers": { + "description": "PersistentCachePeers is the list of persistent cache peers.", + "type": "array", + "items": { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeer" + } + }, + "persistent_replica_count": { + "description": "PersistentReplicaCount is replica count of the persistent cache task.", + "type": "integer" + }, + "piece_length": { + "description": "PieceLength is persistent cache task piece length.", + "type": "integer" + }, + "state": { + "description": "State is persistent cache task state.", + "type": "string" + }, + "tag": { + "description": "Tag is used to distinguish different persistent cache tasks.", + "type": "string" + }, + "task_id": { + "description": "TaskID is task id.", + "type": "string" + }, + "total_piece_count": { + "description": "TotalPieceCount is total piece count.", + "type": "integer" + }, + "ttl": { + "description": "TTL is persistent cache task time to live.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "updated_at": { + "description": "UpdatedAt is persistent cache task update time.", + "type": "string" + } + } + }, "d7y_io_dragonfly_v2_manager_types.GetV1PreheatResponse": { "type": "object", "properties": { @@ -5023,6 +5273,339 @@ } } }, + "d7y_io_dragonfly_v2_manager_types.PersistentCachePeer": { + "type": "object", + "properties": { + "block_parents": { + "description": "BlockParents is bad parents ids.", + "type": "array", + "items": { + "type": "string" + } + }, + "cost": { + "description": "Cost is the cost of downloading.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "created_at": { + "description": "CreatedAt is persistent cache peer create time.", + "type": "string" + }, + "finished_pieces": { + "description": "FinishedPieces is finished pieces bitset.", + "allOf": [ + { + "$ref": "#/definitions/bitset.BitSet" + } + ] + }, + "host": { + "description": "Host is the peer host.", + "allOf": [ + { + "$ref": "#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost" + } + ] + }, + "id": { + "description": "ID is persistent cache peer id.", + "type": "string" + }, + "persistent": { + "description": "Persistent is whether the peer is persistent.", + "type": "boolean" + }, + "state": { + "description": "State is persistent cache peer state.", + "type": "string" + }, + "updated_at": { + "description": "UpdatedAt is persistent cache peer update time.", + "type": "string" + } + } + }, + "d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost": { + "type": "object", + "properties": { + "announce_interval": { + "description": "AnnounceInterval is the interval between host announces to scheduler.", + "allOf": [ + { + "$ref": "#/definitions/time.Duration" + } + ] + }, + "build": { + "description": "Build contains build information.", + "type": "object", + "properties": { + "git_commit": { + "description": "GitCommit is git commit.", + "type": "string" + }, + "git_version": { + "description": "GitVersion is git version.", + "type": "string" + }, + "go_version": { + "description": "GoVersion is go version.", + "type": "string" + }, + "platform": { + "description": "Platform is build platform.", + "type": "string" + } + } + }, + "cpu": { + "description": "CPU contains cpu information.", + "type": "object", + "properties": { + "logical_count": { + "description": "LogicalCount is cpu logical count.", + "type": "integer" + }, + "percent": { + "description": "Percent is cpu usage percent.", + "type": "number" + }, + "physical_count": { + "description": "PhysicalCount is cpu physical count.", + "type": "integer" + }, + "process_percent": { + "description": "ProcessPercent is process cpu usage percent.", + "type": "number" + }, + "times": { + "description": "Times contains cpu times information.", + "type": "object", + "properties": { + "guest": { + "description": "Guest is guest cpu time.", + "type": "number" + }, + "guest_nice": { + "description": "GuestNice is guest nice cpu time.", + "type": "number" + }, + "idle": { + "description": "Idle is idle cpu time.", + "type": "number" + }, + "iowait": { + "description": "Iowait is iowait cpu time.", + "type": "number" + }, + "irq": { + "description": "Irq is irq cpu time.", + "type": "number" + }, + "nice": { + "description": "Nice is nice cpu time.", + "type": "number" + }, + "softirq": { + "description": "Softirq is softirq cpu time.", + "type": "number" + }, + "steal": { + "description": "Steal is steal cpu time.", + "type": "number" + }, + "system": { + "description": "System is system cpu time.", + "type": "number" + }, + "user": { + "description": "User is user cpu time.", + "type": "number" + } + } + } + } + }, + "created_at": { + "description": "CreatedAt is host create time.", + "type": "string" + }, + "disable_shared": { + "description": "DisableShared is whether the host is disabled for shared with other peers.", + "type": "boolean" + }, + "disk": { + "description": "Disk contains disk information.", + "type": "object", + "properties": { + "free": { + "description": "Free is free disk space.", + "type": "integer" + }, + "inodes_free": { + "description": "InodesFree is free inodes.", + "type": "integer" + }, + "inodes_total": { + "description": "InodesTotal is total inodes.", + "type": "integer" + }, + "inodes_used": { + "description": "InodesUsed is used inodes.", + "type": "integer" + }, + "inodes_used_percent": { + "description": "InodesUsedPercent is inodes usage percent.", + "type": "number" + }, + "read_bandwidth": { + "description": "ReadBandwidth is read bandwidth.", + "type": "integer" + }, + "total": { + "description": "Total is total disk space.", + "type": "integer" + }, + "used": { + "description": "Used is used disk space.", + "type": "integer" + }, + "used_percent": { + "description": "UsedPercent is disk usage percent.", + "type": "number" + }, + "write_bandwidth": { + "description": "WriteBandwidth is write bandwidth.", + "type": "integer" + } + } + }, + "download_port": { + "description": "DownloadPort is piece downloading port.", + "type": "integer" + }, + "hostname": { + "description": "Hostname is host name.", + "type": "string" + }, + "id": { + "description": "ID is host id.", + "type": "string" + }, + "ip": { + "description": "IP is host ip.", + "type": "string" + }, + "kernel_version": { + "description": "KernelVersion is host kernel version.", + "type": "string" + }, + "memory": { + "description": "Memory contains memory information.", + "type": "object", + "properties": { + "available": { + "description": "Available is available memory.", + "type": "integer" + }, + "free": { + "description": "Free is free memory.", + "type": "integer" + }, + "process_used_percent": { + "description": "ProcessUsedPercent is process memory usage percent.", + "type": "number" + }, + "total": { + "description": "Total is total memory.", + "type": "integer" + }, + "used": { + "description": "Used is used memory.", + "type": "integer" + }, + "used_percent": { + "description": "UsedPercent is memory usage percent.", + "type": "number" + } + } + }, + "network": { + "description": "Network contains network information.", + "type": "object", + "properties": { + "download_rate": { + "description": "DownloadRate is download rate.", + "type": "integer" + }, + "download_rate_limit": { + "description": "DownloadRateLimit is download rate limit.", + "type": "integer" + }, + "idc": { + "description": "IDC is network idc.", + "type": "string" + }, + "location": { + "description": "Location is network location.", + "type": "string" + }, + "tcp_connection_count": { + "description": "TCPConnectionCount is tcp connection count.", + "type": "integer" + }, + "upload_rate": { + "description": "UploadRate is upload rate.", + "type": "integer" + }, + "upload_rate_limit": { + "description": "UploadRateLimit is upload rate limit.", + "type": "integer" + }, + "upload_tcp_connection_count": { + "description": "UploadTCPConnectionCount is upload tcp connection count.", + "type": "integer" + } + } + }, + "os": { + "description": "OS is host OS.", + "type": "string" + }, + "platform": { + "description": "Platform is host platform.", + "type": "string" + }, + "platform_family": { + "description": "PlatformFamily is host platform family.", + "type": "string" + }, + "platform_version": { + "description": "PlatformVersion is host platform version.", + "type": "string" + }, + "port": { + "description": "Port is grpc service port.", + "type": "integer" + }, + "scheduler_cluster_id": { + "description": "SchedulerClusterID is the scheduler cluster id matched by scopes.", + "type": "integer" + }, + "type": { + "description": "Type is host type.", + "type": "string" + }, + "updated_at": { + "description": "UpdatedAt is host update time.", + "type": "string" + } + } + }, "d7y_io_dragonfly_v2_manager_types.PriorityConfig": { "type": "object", "required": [ @@ -5482,6 +6065,29 @@ "type": "string" } } + }, + "time.Duration": { + "type": "integer", + "enum": [ + -9223372036854775808, + 9223372036854775807, + 1, + 1000, + 1000000, + 1000000000, + 60000000000, + 3600000000000 + ], + "x-enum-varnames": [ + "minDuration", + "maxDuration", + "Nanosecond", + "Microsecond", + "Millisecond", + "Second", + "Minute", + "Hour" + ] } }, "tags": [ diff --git a/api/manager/swagger.yaml b/api/manager/swagger.yaml index ecc776b52..c2b347c89 100644 --- a/api/manager/swagger.yaml +++ b/api/manager/swagger.yaml @@ -1,5 +1,7 @@ basePath: / definitions: + bitset.BitSet: + type: object d7y_io_dragonfly_v2_manager_models.Application: properties: bio: @@ -743,6 +745,49 @@ definitions: updated_at: type: string type: object + d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse: + properties: + application: + description: Application of persistent cache task. + type: string + content_length: + description: ContentLength is persistent cache task total content length. + type: integer + created_at: + description: CreatedAt is persistent cache task create time. + type: string + persistent_cache_peers: + description: PersistentCachePeers is the list of persistent cache peers. + items: + $ref: '#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeer' + type: array + persistent_replica_count: + description: PersistentReplicaCount is replica count of the persistent cache + task. + type: integer + piece_length: + description: PieceLength is persistent cache task piece length. + type: integer + state: + description: State is persistent cache task state. + type: string + tag: + description: Tag is used to distinguish different persistent cache tasks. + type: string + task_id: + description: TaskID is task id. + type: string + total_piece_count: + description: TotalPieceCount is total piece count. + type: integer + ttl: + allOf: + - $ref: '#/definitions/time.Duration' + description: TTL is persistent cache task time to live. + updated_at: + description: UpdatedAt is persistent cache task update time. + type: string + type: object d7y_io_dragonfly_v2_manager_types.GetV1PreheatResponse: properties: finishTime: @@ -754,6 +799,244 @@ definitions: status: type: string type: object + d7y_io_dragonfly_v2_manager_types.PersistentCachePeer: + properties: + block_parents: + description: BlockParents is bad parents ids. + items: + type: string + type: array + cost: + allOf: + - $ref: '#/definitions/time.Duration' + description: Cost is the cost of downloading. + created_at: + description: CreatedAt is persistent cache peer create time. + type: string + finished_pieces: + allOf: + - $ref: '#/definitions/bitset.BitSet' + description: FinishedPieces is finished pieces bitset. + host: + allOf: + - $ref: '#/definitions/d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost' + description: Host is the peer host. + id: + description: ID is persistent cache peer id. + type: string + persistent: + description: Persistent is whether the peer is persistent. + type: boolean + state: + description: State is persistent cache peer state. + type: string + updated_at: + description: UpdatedAt is persistent cache peer update time. + type: string + type: object + d7y_io_dragonfly_v2_manager_types.PersistentCachePeerHost: + properties: + announce_interval: + allOf: + - $ref: '#/definitions/time.Duration' + description: AnnounceInterval is the interval between host announces to scheduler. + build: + description: Build contains build information. + properties: + git_commit: + description: GitCommit is git commit. + type: string + git_version: + description: GitVersion is git version. + type: string + go_version: + description: GoVersion is go version. + type: string + platform: + description: Platform is build platform. + type: string + type: object + cpu: + description: CPU contains cpu information. + properties: + logical_count: + description: LogicalCount is cpu logical count. + type: integer + percent: + description: Percent is cpu usage percent. + type: number + physical_count: + description: PhysicalCount is cpu physical count. + type: integer + process_percent: + description: ProcessPercent is process cpu usage percent. + type: number + times: + description: Times contains cpu times information. + properties: + guest: + description: Guest is guest cpu time. + type: number + guest_nice: + description: GuestNice is guest nice cpu time. + type: number + idle: + description: Idle is idle cpu time. + type: number + iowait: + description: Iowait is iowait cpu time. + type: number + irq: + description: Irq is irq cpu time. + type: number + nice: + description: Nice is nice cpu time. + type: number + softirq: + description: Softirq is softirq cpu time. + type: number + steal: + description: Steal is steal cpu time. + type: number + system: + description: System is system cpu time. + type: number + user: + description: User is user cpu time. + type: number + type: object + type: object + created_at: + description: CreatedAt is host create time. + type: string + disable_shared: + description: DisableShared is whether the host is disabled for shared with + other peers. + type: boolean + disk: + description: Disk contains disk information. + properties: + free: + description: Free is free disk space. + type: integer + inodes_free: + description: InodesFree is free inodes. + type: integer + inodes_total: + description: InodesTotal is total inodes. + type: integer + inodes_used: + description: InodesUsed is used inodes. + type: integer + inodes_used_percent: + description: InodesUsedPercent is inodes usage percent. + type: number + read_bandwidth: + description: ReadBandwidth is read bandwidth. + type: integer + total: + description: Total is total disk space. + type: integer + used: + description: Used is used disk space. + type: integer + used_percent: + description: UsedPercent is disk usage percent. + type: number + write_bandwidth: + description: WriteBandwidth is write bandwidth. + type: integer + type: object + download_port: + description: DownloadPort is piece downloading port. + type: integer + hostname: + description: Hostname is host name. + type: string + id: + description: ID is host id. + type: string + ip: + description: IP is host ip. + type: string + kernel_version: + description: KernelVersion is host kernel version. + type: string + memory: + description: Memory contains memory information. + properties: + available: + description: Available is available memory. + type: integer + free: + description: Free is free memory. + type: integer + process_used_percent: + description: ProcessUsedPercent is process memory usage percent. + type: number + total: + description: Total is total memory. + type: integer + used: + description: Used is used memory. + type: integer + used_percent: + description: UsedPercent is memory usage percent. + type: number + type: object + network: + description: Network contains network information. + properties: + download_rate: + description: DownloadRate is download rate. + type: integer + download_rate_limit: + description: DownloadRateLimit is download rate limit. + type: integer + idc: + description: IDC is network idc. + type: string + location: + description: Location is network location. + type: string + tcp_connection_count: + description: TCPConnectionCount is tcp connection count. + type: integer + upload_rate: + description: UploadRate is upload rate. + type: integer + upload_rate_limit: + description: UploadRateLimit is upload rate limit. + type: integer + upload_tcp_connection_count: + description: UploadTCPConnectionCount is upload tcp connection count. + type: integer + type: object + os: + description: OS is host OS. + type: string + platform: + description: Platform is host platform. + type: string + platform_family: + description: PlatformFamily is host platform family. + type: string + platform_version: + description: PlatformVersion is host platform version. + type: string + port: + description: Port is grpc service port. + type: integer + scheduler_cluster_id: + description: SchedulerClusterID is the scheduler cluster id matched by scopes. + type: integer + type: + description: Type is host type. + type: string + updated_at: + description: UpdatedAt is host update time. + type: string + type: object d7y_io_dragonfly_v2_manager_types.PriorityConfig: properties: urls: @@ -1065,6 +1348,26 @@ definitions: description: Name is bucket name. type: string type: object + time.Duration: + enum: + - -9223372036854775808 + - 9223372036854775807 + - 1 + - 1000 + - 1000000 + - 1000000000 + - 60000000000 + - 3600000000000 + type: integer + x-enum-varnames: + - minDuration + - maxDuration + - Nanosecond + - Microsecond + - Millisecond + - Second + - Minute + - Hour host: localhost:8080 info: contact: @@ -1656,6 +1959,33 @@ paths: summary: Get Jobs tags: - Job + post: + consumes: + - application/json + description: Create by json config + parameters: + - description: Job + in: body + name: Job + required: true + schema: + $ref: '#/definitions/d7y_io_dragonfly_v2_manager_types.CreateJobRequest' + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/d7y_io_dragonfly_v2_manager_models.Job' + "400": + description: Bad Request + "404": + description: Not Found + "500": + description: Internal Server Error + summary: Create Job + tags: + - Job /api/v1/jobs/{id}: delete: consumes: @@ -2022,6 +2352,105 @@ paths: summary: Get Permissions tags: - Permission + /api/v1/persistent-caches: + get: + consumes: + - application/json + description: Get PersistentCaches + parameters: + - default: 0 + description: current page + in: query + name: page + required: true + type: integer + - default: 10 + description: return max item count, default 10, max 50 + in: query + maximum: 50 + minimum: 2 + name: per_page + required: true + type: integer + produces: + - application/json + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse' + type: array + "400": + description: Bad Request + "404": + description: Not Found + "500": + description: Internal Server Error + summary: Get PersistentCaches + tags: + - PersistentCache + /api/v1/persistent-caches/{scheduler_cluster_id}/{task_id}: + delete: + consumes: + - application/json + description: Destroy PersistentCache by id + parameters: + - description: scheduler cluster id + in: path + name: scheduler_cluster_id + required: true + type: string + - description: task id + in: path + name: task_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + "400": + description: Bad Request + "404": + description: Not Found + "500": + description: Internal Server Error + summary: Destroy PersistentCache + tags: + - PersistentCache + get: + consumes: + - application/json + description: Get PersistentCache by id + parameters: + - description: scheduler cluster id + in: path + name: scheduler_cluster_id + required: true + type: string + - description: task id + in: path + name: task_id + required: true + type: string + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/d7y_io_dragonfly_v2_manager_types.GetPersistentCacheResponse' + "400": + description: Bad Request + "404": + description: Not Found + "500": + description: Internal Server Error + summary: Get PersistentCache + tags: + - PersistentCache /api/v1/personal-access-tokens: get: consumes: diff --git a/manager/handlers/job.go b/manager/handlers/job.go index 1938bf150..72b0706e0 100644 --- a/manager/handlers/job.go +++ b/manager/handlers/job.go @@ -38,6 +38,7 @@ import ( // @Failure 400 // @Failure 404 // @Failure 500 +// @Router /api/v1/jobs [post] // @Router /oapi/v1/jobs [post] func (h *Handlers) CreateJob(ctx *gin.Context) { var json types.CreateJobRequest diff --git a/manager/handlers/persistent_cache.go b/manager/handlers/persistent_cache.go new file mode 100644 index 000000000..f4a313208 --- /dev/null +++ b/manager/handlers/persistent_cache.go @@ -0,0 +1,110 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package handlers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + + "d7y.io/dragonfly/v2/manager/types" +) + +// @Summary Destroy PersistentCache +// @Description Destroy PersistentCache by id +// @Tags PersistentCache +// @Accept json +// @Produce json +// @Param scheduler_cluster_id path string true "scheduler cluster id" +// @Param task_id path string true "task id" +// @Success 200 +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /api/v1/persistent-caches/{scheduler_cluster_id}/{task_id} [delete] +func (h *Handlers) DestroyPersistentCache(ctx *gin.Context) { + var params types.PersistentCacheParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + if err := h.service.DestroyPersistentCache(ctx.Request.Context(), params); err != nil { + ctx.Error(err) // nolint: errcheck + return + } + + ctx.Status(http.StatusOK) +} + +// @Summary Get PersistentCache +// @Description Get PersistentCache by id +// @Tags PersistentCache +// @Accept json +// @Produce json +// @Param scheduler_cluster_id path string true "scheduler cluster id" +// @Param task_id path string true "task id" +// @Success 200 {object} types.GetPersistentCacheResponse +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /api/v1/persistent-caches/{scheduler_cluster_id}/{task_id} [get] +func (h *Handlers) GetPersistentCache(ctx *gin.Context) { + var params types.PersistentCacheParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + persistentCache, err := h.service.GetPersistentCache(ctx.Request.Context(), params) + if err != nil { + ctx.Error(err) // nolint: errcheck + return + } + + ctx.JSON(http.StatusOK, persistentCache) +} + +// @Summary Get PersistentCaches +// @Description Get PersistentCaches +// @Tags PersistentCache +// @Accept json +// @Produce json +// @Param page query int true "current page" default(0) +// @Param per_page query int true "return max item count, default 10, max 50" default(10) minimum(2) maximum(50) +// @Success 200 {object} []types.GetPersistentCacheResponse +// @Failure 400 +// @Failure 404 +// @Failure 500 +// @Router /api/v1/persistent-caches [get] +func (h *Handlers) GetPersistentCaches(ctx *gin.Context) { + var query types.GetPersistentCachesQuery + if err := ctx.ShouldBindQuery(&query); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + h.setPaginationDefault(&query.Page, &query.PerPage) + persistentCaches, count, err := h.service.GetPersistentCaches(ctx.Request.Context(), query) + if err != nil { + ctx.Error(err) // nolint: errcheck + return + } + + h.setPaginationLinkHeader(ctx, query.Page, query.PerPage, int(count)) + ctx.JSON(http.StatusOK, persistentCaches) +} diff --git a/manager/handlers/persistent_cache_test.go b/manager/handlers/persistent_cache_test.go new file mode 100644 index 000000000..eaae0395f --- /dev/null +++ b/manager/handlers/persistent_cache_test.go @@ -0,0 +1,183 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package handlers + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/gin-gonic/gin" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "d7y.io/dragonfly/v2/manager/service/mocks" + "d7y.io/dragonfly/v2/manager/types" +) + +var ( + mockPersistentCacheParams = types.PersistentCacheParams{ + SchedulerClusterID: 1, + TaskID: "task-1", + } + mockGetPersistentCacheResponse = &types.GetPersistentCacheResponse{ + TaskID: "task-1", + State: "SUCCESS", + } + mockGetPersistentCachesResponse = []types.GetPersistentCacheResponse{ + { + TaskID: "task-1", + State: "SUCCESS", + }, + } +) + +func mockPersistentCacheRouter(h *Handlers) *gin.Engine { + r := gin.Default() + r.DELETE("/persistent-caches/:scheduler_cluster_id/:task_id", h.DestroyPersistentCache) + r.GET("/persistent-caches/:scheduler_cluster_id/:task_id", h.GetPersistentCache) + r.GET("/persistent-caches", h.GetPersistentCaches) + return r +} + +func TestHandlers_DestroyPersistentCache(t *testing.T) { + tests := []struct { + name string + req *http.Request + mock func(ms *mocks.MockServiceMockRecorder) + expect func(t *testing.T, w *httptest.ResponseRecorder) + }{ + { + name: "unprocessable entity", + req: httptest.NewRequest(http.MethodDelete, "/persistent-caches/invalid/task-1", nil), + mock: func(ms *mocks.MockServiceMockRecorder) {}, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusUnprocessableEntity, w.Code) + }, + }, + { + name: "success", + req: httptest.NewRequest(http.MethodDelete, "/persistent-caches/1/task-1", nil), + mock: func(ms *mocks.MockServiceMockRecorder) { + ms.DestroyPersistentCache(gomock.Any(), gomock.Eq(mockPersistentCacheParams)).Return(nil).Times(1) + }, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusOK, w.Code) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + svc := mocks.NewMockService(ctl) + w := httptest.NewRecorder() + h := New(svc) + mockRouter := mockPersistentCacheRouter(h) + + tc.mock(svc.EXPECT()) + mockRouter.ServeHTTP(w, tc.req) + tc.expect(t, w) + }) + } +} + +func TestHandlers_GetPersistentCache(t *testing.T) { + tests := []struct { + name string + req *http.Request + mock func(ms *mocks.MockServiceMockRecorder) + expect func(t *testing.T, w *httptest.ResponseRecorder) + }{ + { + name: "unprocessable entity", + req: httptest.NewRequest(http.MethodGet, "/persistent-caches/invalid/task-1", nil), + mock: func(ms *mocks.MockServiceMockRecorder) {}, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusUnprocessableEntity, w.Code) + }, + }, + { + name: "success", + req: httptest.NewRequest(http.MethodGet, "/persistent-caches/1/task-1", nil), + mock: func(ms *mocks.MockServiceMockRecorder) { + ms.GetPersistentCache(gomock.Any(), gomock.Eq(mockPersistentCacheParams)).Return(mockGetPersistentCacheResponse, nil).Times(1) + }, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusOK, w.Code) + assert.Contains(w.Body.String(), `"task_id":"task-1"`) + assert.Contains(w.Body.String(), `"state":"SUCCESS"`) + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + svc := mocks.NewMockService(ctl) + w := httptest.NewRecorder() + h := New(svc) + mockRouter := mockPersistentCacheRouter(h) + + tc.mock(svc.EXPECT()) + mockRouter.ServeHTTP(w, tc.req) + tc.expect(t, w) + }) + } +} + +func TestHandlers_GetPersistentCaches(t *testing.T) { + tests := []struct { + name string + req *http.Request + mock func(ms *mocks.MockServiceMockRecorder) + expect func(t *testing.T, w *httptest.ResponseRecorder) + }{ + { + name: "success", + req: httptest.NewRequest(http.MethodGet, "/persistent-caches?page=1&per_page=10", nil), + mock: func(ms *mocks.MockServiceMockRecorder) { + ms.GetPersistentCaches(gomock.Any(), gomock.Any()).Return(mockGetPersistentCachesResponse, int64(1), nil).Times(1) + }, + expect: func(t *testing.T, w *httptest.ResponseRecorder) { + assert := assert.New(t) + assert.Equal(http.StatusOK, w.Code) + assert.Contains(w.Body.String(), `"task_id":"task-1"`) + assert.Contains(w.Body.String(), `"state":"SUCCESS"`) + assert.Contains(w.Header().Get("Link"), "page=1") + }, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + svc := mocks.NewMockService(ctl) + w := httptest.NewRecorder() + h := New(svc) + mockRouter := mockPersistentCacheRouter(h) + + tc.mock(svc.EXPECT()) + mockRouter.ServeHTTP(w, tc.req) + tc.expect(t, w) + }) + } +} diff --git a/manager/router/router.go b/manager/router/router.go index 2a3a9715c..a55b76abd 100644 --- a/manager/router/router.go +++ b/manager/router/router.go @@ -233,6 +233,12 @@ func Init(cfg *config.Config, logDir string, service service.Service, database * pat.GET(":id", h.GetPersonalAccessToken) pat.GET("", h.GetPersonalAccessTokens) + // Persistent Cache. + pc := apiv1.Group("/persistent-caches", jwt.MiddlewareFunc(), rbac) + pc.DELETE(":id", h.DestroyPersistentCache) + pc.GET(":id", h.GetPersistentCache) + pc.GET("", h.GetPersistentCaches) + // Open API router. oapiv1 := r.Group("/oapi/v1") diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index fd654b94a..fac7d19e7 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -497,6 +497,20 @@ func (mr *MockServiceMockRecorder) DestroyPeer(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyPeer", reflect.TypeOf((*MockService)(nil).DestroyPeer), arg0, arg1) } +// DestroyPersistentCache mocks base method. +func (m *MockService) DestroyPersistentCache(arg0 context.Context, arg1 types.PersistentCacheParams) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DestroyPersistentCache", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DestroyPersistentCache indicates an expected call of DestroyPersistentCache. +func (mr *MockServiceMockRecorder) DestroyPersistentCache(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DestroyPersistentCache", reflect.TypeOf((*MockService)(nil).DestroyPersistentCache), arg0, arg1) +} + // DestroyPersonalAccessToken mocks base method. func (m *MockService) DestroyPersonalAccessToken(arg0 context.Context, arg1 uint) error { m.ctrl.T.Helper() @@ -812,6 +826,37 @@ func (mr *MockServiceMockRecorder) GetPermissions(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPermissions", reflect.TypeOf((*MockService)(nil).GetPermissions), arg0, arg1) } +// GetPersistentCache mocks base method. +func (m *MockService) GetPersistentCache(arg0 context.Context, arg1 types.PersistentCacheParams) (*types.GetPersistentCacheResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPersistentCache", arg0, arg1) + ret0, _ := ret[0].(*types.GetPersistentCacheResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPersistentCache indicates an expected call of GetPersistentCache. +func (mr *MockServiceMockRecorder) GetPersistentCache(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPersistentCache", reflect.TypeOf((*MockService)(nil).GetPersistentCache), arg0, arg1) +} + +// GetPersistentCaches mocks base method. +func (m *MockService) GetPersistentCaches(arg0 context.Context, arg1 types.GetPersistentCachesQuery) ([]types.GetPersistentCacheResponse, int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPersistentCaches", arg0, arg1) + ret0, _ := ret[0].([]types.GetPersistentCacheResponse) + ret1, _ := ret[1].(int64) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// GetPersistentCaches indicates an expected call of GetPersistentCaches. +func (mr *MockServiceMockRecorder) GetPersistentCaches(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPersistentCaches", reflect.TypeOf((*MockService)(nil).GetPersistentCaches), arg0, arg1) +} + // GetPersonalAccessToken mocks base method. func (m *MockService) GetPersonalAccessToken(arg0 context.Context, arg1 uint) (*models.PersonalAccessToken, error) { m.ctrl.T.Helper() diff --git a/manager/service/persistent_cache.go b/manager/service/persistent_cache.go new file mode 100644 index 000000000..58761ada1 --- /dev/null +++ b/manager/service/persistent_cache.go @@ -0,0 +1,712 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/bits-and-blooms/bitset" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/manager/types" + pkgredis "d7y.io/dragonfly/v2/pkg/redis" +) + +// DestroyPersistentCache deletes a persistent cache task from Redis based on query parameters. +func (s *service) DestroyPersistentCache(ctx context.Context, params types.PersistentCacheParams) error { + taskKey := pkgredis.MakePersistentCacheTaskKeyInScheduler(params.SchedulerClusterID, params.TaskID) + return s.rdb.Del(ctx, taskKey).Err() +} + +// GetPersistentCache retrieves a persistent cache task from Redis based on query parameters. +func (s *service) GetPersistentCache(ctx context.Context, params types.PersistentCacheParams) (*types.GetPersistentCacheResponse, error) { + // Get task data from Redis. + taskKey := pkgredis.MakePersistentCacheTaskKeyInScheduler(params.SchedulerClusterID, params.TaskID) + + rawTask, err := s.rdb.HGetAll(ctx, taskKey).Result() + if err != nil { + logger.Warnf("getting task %s failed from redis: %v", taskKey, err) + return nil, err + } + + if len(rawTask) == 0 { + logger.Warnf("task %s not found in redis", taskKey) + return nil, errors.New("task not found") + } + + // Parse task data. + response, err := s.parseTaskData(ctx, rawTask) + if err != nil { + logger.Warnf("parse task %s data failed: %v", taskKey, err) + return nil, err + } + + // Load peers for this task. + peers, err := s.loadPeersForTask(ctx, taskKey, params.SchedulerClusterID) + if err != nil { + logger.Warnf("load peers for task %s failed: %v", taskKey, err) + } + + response.PersistentCachePeers = peers + + return &response, nil +} + +// GetPersistentCaches retrieves persistent cache tasks from Redis based on query parameters. +func (s *service) GetPersistentCaches(ctx context.Context, q types.GetPersistentCachesQuery) ([]types.GetPersistentCacheResponse, int64, error) { + var responses []types.GetPersistentCacheResponse + + // Get all scheduler cluster IDs if none specified. + schedulerClusterIDs := q.SchedulerClusterIDs + if len(schedulerClusterIDs) == 0 { + // Get all available scheduler cluster IDs. + var cursor uint64 + for { + // Scan all keys with prefix, for example: + // "scheduler:scheduler-clusters:1". + prefix := fmt.Sprintf("%s:", pkgredis.MakeNamespaceKeyInScheduler(pkgredis.SchedulerClustersNamespace)) + keys, cursor, err := s.rdb.Scan(ctx, cursor, fmt.Sprintf("%s*", prefix), 10).Result() + if err != nil { + logger.Errorf("scan scheduler clusterIDs failed: %v", err) + return nil, 0, err + } + + for _, key := range keys { + // If context is done, return error. + if err := ctx.Err(); err != nil { + return nil, 0, err + } + + // Remove prefix from key. + suffix := strings.TrimPrefix(key, prefix) + if suffix == "" { + logger.Error("invalid key") + continue + } + + // Extract scheduler clusterID from suffix. + // If suffix does not contain ":", it is a scheduler clusterID. + if !strings.ContainsRune(suffix, ':') { + schedulerClusterID, err := strconv.ParseUint(suffix, 10, 32) + if err != nil { + logger.Errorf("parse scheduler clusterID failed: %v", err) + continue + } + schedulerClusterIDs = append(schedulerClusterIDs, uint(schedulerClusterID)) + } + } + + if cursor == 0 { + break + } + } + + if len(schedulerClusterIDs) == 0 { + return nil, 0, errors.New("no scheduler cluster found") + } + } + + // Collect all task IDs from all specified clusters. + var allTaskKeys []string + for _, schedulerClusterID := range schedulerClusterIDs { + // Get all task keys in the cluster. + var cursor uint64 + for { + var ( + taskKeys []string + err error + ) + + // For example, if {prefix} is "scheduler:scheduler-clusters:1:persistent-cache-tasks:", keys could be: + // "{prefix}{taskID}:persistent-cache-peers", "{prefix}{taskID}:persistent-peers" and "{prefix}{taskID}". + // Scan all keys with prefix. + prefix := fmt.Sprintf("%s:", pkgredis.MakePersistentCacheTasksInScheduler(schedulerClusterID)) + taskKeys, cursor, err = s.rdb.Scan(ctx, cursor, fmt.Sprintf("%s*", prefix), 10).Result() + if err != nil { + logger.Error("scan tasks failed") + continue + } + + taskIDs := make(map[string]struct{}) + for _, taskKey := range taskKeys { + // If context is done, return error. + if err := ctx.Err(); err != nil { + continue + } + + // Remove prefix from task key. + suffix := strings.TrimPrefix(taskKey, prefix) + if suffix == "" { + logger.Error("invalid task key") + continue + } + + // suffix is a non-empty string like: + // "{taskID}:persistent-cache-peers", "{taskID}:persistent-peers" and "{taskID}". + // Extract taskID from suffix and avoid duplicate taskID. + taskID := strings.Split(suffix, ":")[0] + if _, ok := taskIDs[taskID]; ok { + continue + } + taskIDs[taskID] = struct{}{} + + allTaskKeys = append(allTaskKeys, taskKey) + } + + if cursor == 0 { + break + } + } + } + + // Calculate total count and pagination. + totalCount := int64(len(allTaskKeys)) + startIndex := (q.Page - 1) * q.PerPage + endIndex := startIndex + q.PerPage + + if startIndex >= int(totalCount) { + return []types.GetPersistentCacheResponse{}, totalCount, nil + } + + if endIndex > int(totalCount) { + endIndex = int(totalCount) + } + + // Get paginated task IDs and keys. + taskKeys := allTaskKeys[startIndex:endIndex] + + // Process each task. + for _, taskKey := range taskKeys { + schedulerClusterID, err := pkgredis.ExtractSchedulerClusterIDFromPersistentCacheTaskKey(taskKey) + if err != nil { + logger.Warnf("extract scheduler cluster ID from persistent cache task key %s failed: %v", taskKey, err) + continue + } + + // Get task data from Redis. + rawTask, err := s.rdb.HGetAll(ctx, taskKey).Result() + if err != nil { + logger.Warnf("getting task %s failed from redis: %v", taskKey, err) + continue + } + + if len(rawTask) == 0 { + logger.Warnf("task %s not found in redis", taskKey) + continue + } + + // Parse task data. + response, err := s.parseTaskData(ctx, rawTask) + if err != nil { + logger.Warnf("parse task %s data failed: %v", taskKey, err) + continue + } + + // Load peers for this task. + peers, err := s.loadPeersForTask(ctx, taskKey, schedulerClusterID) + if err != nil { + logger.Warnf("load peers for task %s failed: %v", taskKey, err) + } + + response.PersistentCachePeers = peers + responses = append(responses, response) + } + + return responses, totalCount, nil +} + +// parseTaskData parses raw task data from Redis and returns a GetPersistentCacheResponse. +func (s *service) parseTaskData(ctx context.Context, rawTask map[string]string) (types.GetPersistentCacheResponse, error) { + var response types.GetPersistentCacheResponse + + // Set task ID. + response.TaskID = rawTask["id"] + + // Parse PersistentReplicaCount. + if persistentReplicaCount, err := strconv.ParseUint(rawTask["persistent_replica_count"], 10, 64); err == nil { + response.PersistentReplicaCount = persistentReplicaCount + } else { + logger.Warnf("parsing persistent replica count failed: %v", err) + } + + // Set Tag and Application. + response.Tag = rawTask["tag"] + response.Application = rawTask["application"] + + // Parse PieceLength. + if pieceLength, err := strconv.ParseUint(rawTask["piece_length"], 10, 64); err == nil { + response.PieceLength = pieceLength + } else { + logger.Warnf("parsing piece length failed: %v", err) + } + + // Parse ContentLength. + if contentLength, err := strconv.ParseUint(rawTask["content_length"], 10, 64); err == nil { + response.ContentLength = contentLength + } else { + logger.Warnf("parsing content length failed: %v", err) + } + + // Parse TotalPieceCount. + if totalPieceCount, err := strconv.ParseUint(rawTask["total_piece_count"], 10, 32); err == nil { + response.TotalPieceCount = uint32(totalPieceCount) + } else { + logger.Warnf("parsing total piece count failed: %v", err) + } + + // Set State. + response.State = rawTask["state"] + + // Parse TTL. + if ttl, err := strconv.ParseInt(rawTask["ttl"], 10, 64); err == nil { + response.TTL = time.Duration(ttl) + } else { + logger.Warnf("parsing ttl failed: %v", err) + } + + // Parse CreatedAt. + if createdAt, err := time.Parse(time.RFC3339, rawTask["created_at"]); err == nil { + response.CreatedAt = createdAt + } else { + logger.Warnf("parsing created at failed: %v", err) + } + + // Parse UpdatedAt. + if updatedAt, err := time.Parse(time.RFC3339, rawTask["updated_at"]); err == nil { + response.UpdatedAt = updatedAt + } else { + logger.Warnf("parsing updated at failed: %v", err) + } + + return response, nil +} + +// loadPeersForTask loads peers for a specific task from Redis. +func (s *service) loadPeersForTask(ctx context.Context, taskID string, schedulerClusterID uint) ([]types.PersistentCachePeer, error) { + var peers []types.PersistentCachePeer + + // Get peer IDs for the task. + peerIDs, err := s.rdb.SMembers(ctx, pkgredis.MakePersistentCachePeersOfPersistentCacheTaskInScheduler(schedulerClusterID, taskID)).Result() + if err != nil { + return nil, err + } + + // Process each peer. + for _, peerID := range peerIDs { + // Get peer data from Redis. + rawPeer, err := s.rdb.HGetAll(ctx, pkgredis.MakePersistentCachePeerKeyInScheduler(schedulerClusterID, peerID)).Result() + if err != nil { + logger.Warnf("getting peer %s failed from redis: %v", peerID, err) + continue + } + + if len(rawPeer) == 0 { + logger.Warnf("peer %s not found in redis", peerID) + continue + } + + // Parse peer data. + peer, err := s.parsePeerData(ctx, rawPeer, schedulerClusterID) + if err != nil { + logger.Warnf("parse peer %s data failed: %v", peerID, err) + continue + } + + peers = append(peers, peer) + } + + return peers, nil +} + +// parsePeerData parses raw peer data from Redis and returns a PersistentCachePeer. +func (s *service) parsePeerData(ctx context.Context, rawPeer map[string]string, schedulerClusterID uint) (types.PersistentCachePeer, error) { + var peer types.PersistentCachePeer + + // Set ID. + peer.ID = rawPeer["id"] + + // Parse Persistent. + if persistent, err := strconv.ParseBool(rawPeer["persistent"]); err == nil { + peer.Persistent = persistent + } else { + logger.Warnf("parsing persistent failed: %v", err) + } + + // Parse FinishedPieces. + finishedPieces := &bitset.BitSet{} + if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err == nil { + peer.FinishedPieces = finishedPieces + } else { + logger.Warnf("unmarshal finished pieces failed: %v", err) + } + + // Set State. + peer.State = rawPeer["state"] + + // Parse BlockParents. + blockParents := []string{} + if err := json.Unmarshal([]byte(rawPeer["block_parents"]), &blockParents); err == nil { + peer.BlockParents = blockParents + } else { + logger.Warnf("unmarshal block parents failed: %v", err) + } + + // Parse Cost. + if cost, err := strconv.ParseInt(rawPeer["cost"], 10, 64); err == nil { + peer.Cost = time.Duration(cost) + } else { + logger.Warnf("parsing cost failed: %v", err) + } + + // Parse CreatedAt. + if createdAt, err := time.Parse(time.RFC3339, rawPeer["created_at"]); err == nil { + peer.CreatedAt = createdAt + } else { + logger.Warnf("parsing created at failed: %v", err) + } + + // Parse UpdatedAt. + if updatedAt, err := time.Parse(time.RFC3339, rawPeer["updated_at"]); err == nil { + peer.UpdatedAt = updatedAt + } else { + logger.Warnf("parsing updated at failed: %v", err) + } + + // Load host data if host_id is available. + if hostID, ok := rawPeer["host_id"]; ok && hostID != "" { + host, err := s.loadHostData(ctx, hostID, schedulerClusterID) + if err != nil { + logger.Warnf("load host %s data failed: %v", hostID, err) + } else { + peer.Host = host + } + } + + return peer, nil +} + +// loadHostData loads host data from Redis. +func (s *service) loadHostData(ctx context.Context, hostID string, schedulerClusterID uint) (types.PersistentCachePeerHost, error) { + var host types.PersistentCachePeerHost + + // Get host data from Redis. + rawHost, err := s.rdb.HGetAll(ctx, pkgredis.MakePersistentCacheHostKeyInScheduler(schedulerClusterID, hostID)).Result() + if err != nil { + return host, err + } + + if len(rawHost) == 0 { + return host, nil + } + + // Set basic host information. + host.ID = rawHost["id"] + host.Type = rawHost["type"] + host.Hostname = rawHost["hostname"] + host.IP = rawHost["ip"] + host.OS = rawHost["os"] + host.Platform = rawHost["platform"] + host.PlatformFamily = rawHost["platform_family"] + host.PlatformVersion = rawHost["platform_version"] + host.KernelVersion = rawHost["kernel_version"] + + // Parse integer fields. + if port, err := strconv.ParseInt(rawHost["port"], 10, 32); err == nil { + host.Port = int32(port) + } else { + logger.Warnf("parsing port failed: %v", err) + } + + if downloadPort, err := strconv.ParseInt(rawHost["download_port"], 10, 32); err == nil { + host.DownloadPort = int32(downloadPort) + } else { + logger.Warnf("parsing download port failed: %v", err) + } + + // Parse boolean fields. + if disableShared, err := strconv.ParseBool(rawHost["disable_shared"]); err == nil { + host.DisableShared = disableShared + } else { + logger.Warnf("parsing disable shared failed: %v", err) + } + + // Parse CPU information. + if cpuLogicalCount, err := strconv.ParseUint(rawHost["cpu_logical_count"], 10, 32); err == nil { + host.CPU.LogicalCount = uint32(cpuLogicalCount) + } else { + logger.Warnf("parsing cpu logical count failed: %v", err) + } + + if cpuPhysicalCount, err := strconv.ParseUint(rawHost["cpu_physical_count"], 10, 32); err == nil { + host.CPU.PhysicalCount = uint32(cpuPhysicalCount) + } else { + logger.Warnf("parsing cpu physical count failed: %v", err) + } + + if cpuPercent, err := strconv.ParseFloat(rawHost["cpu_percent"], 64); err == nil { + host.CPU.Percent = cpuPercent + } else { + logger.Warnf("parsing cpu percent failed: %v", err) + } + + if cpuProcessPercent, err := strconv.ParseFloat(rawHost["cpu_process_percent"], 64); err == nil { + host.CPU.ProcessPercent = cpuProcessPercent + } else { + logger.Warnf("parsing cpu process percent failed: %v", err) + } + + // Parse CPU Times. + if cpuTimesUser, err := strconv.ParseFloat(rawHost["cpu_times_user"], 64); err == nil { + host.CPU.Times.User = cpuTimesUser + } else { + logger.Warnf("parsing cpu times user failed: %v", err) + } + + if cpuTimesSystem, err := strconv.ParseFloat(rawHost["cpu_times_system"], 64); err == nil { + host.CPU.Times.System = cpuTimesSystem + } else { + logger.Warnf("parsing cpu times system failed: %v", err) + } + + if cpuTimesIdle, err := strconv.ParseFloat(rawHost["cpu_times_idle"], 64); err == nil { + host.CPU.Times.Idle = cpuTimesIdle + } else { + logger.Warnf("parsing cpu times idle failed: %v", err) + } + + if cpuTimesNice, err := strconv.ParseFloat(rawHost["cpu_times_nice"], 64); err == nil { + host.CPU.Times.Nice = cpuTimesNice + } else { + logger.Warnf("parsing cpu times nice failed: %v", err) + } + + if cpuTimesIowait, err := strconv.ParseFloat(rawHost["cpu_times_iowait"], 64); err == nil { + host.CPU.Times.Iowait = cpuTimesIowait + } else { + logger.Warnf("parsing cpu times iowait failed: %v", err) + } + + if cpuTimesIrq, err := strconv.ParseFloat(rawHost["cpu_times_irq"], 64); err == nil { + host.CPU.Times.Irq = cpuTimesIrq + } else { + logger.Warnf("parsing cpu times irq failed: %v", err) + } + + if cpuTimesSoftirq, err := strconv.ParseFloat(rawHost["cpu_times_softirq"], 64); err == nil { + host.CPU.Times.Softirq = cpuTimesSoftirq + } else { + logger.Warnf("parsing cpu times softirq failed: %v", err) + } + + if cpuTimesSteal, err := strconv.ParseFloat(rawHost["cpu_times_steal"], 64); err == nil { + host.CPU.Times.Steal = cpuTimesSteal + } else { + logger.Warnf("parsing cpu times steal failed: %v", err) + } + + if cpuTimesGuest, err := strconv.ParseFloat(rawHost["cpu_times_guest"], 64); err == nil { + host.CPU.Times.Guest = cpuTimesGuest + } else { + logger.Warnf("parsing cpu times guest failed: %v", err) + } + + if cpuTimesGuestNice, err := strconv.ParseFloat(rawHost["cpu_times_guest_nice"], 64); err == nil { + host.CPU.Times.GuestNice = cpuTimesGuestNice + } else { + logger.Warnf("parsing cpu times guest nice failed: %v", err) + } + + // Parse Memory information. + if memoryTotal, err := strconv.ParseUint(rawHost["memory_total"], 10, 64); err == nil { + host.Memory.Total = memoryTotal + } else { + logger.Warnf("parsing memory total failed: %v", err) + } + + if memoryAvailable, err := strconv.ParseUint(rawHost["memory_available"], 10, 64); err == nil { + host.Memory.Available = memoryAvailable + } else { + logger.Warnf("parsing memory available failed: %v", err) + } + + if memoryUsed, err := strconv.ParseUint(rawHost["memory_used"], 10, 64); err == nil { + host.Memory.Used = memoryUsed + } else { + logger.Warnf("parsing memory used failed: %v", err) + } + + if memoryUsedPercent, err := strconv.ParseFloat(rawHost["memory_used_percent"], 64); err == nil { + host.Memory.UsedPercent = memoryUsedPercent + } else { + logger.Warnf("parsing memory used percent failed: %v", err) + } + + if memoryProcessUsedPercent, err := strconv.ParseFloat(rawHost["memory_process_used_percent"], 64); err == nil { + host.Memory.ProcessUsedPercent = memoryProcessUsedPercent + } else { + logger.Warnf("parsing memory process used percent failed: %v", err) + } + + if memoryFree, err := strconv.ParseUint(rawHost["memory_free"], 10, 64); err == nil { + host.Memory.Free = memoryFree + } else { + logger.Warnf("parsing memory free failed: %v", err) + } + + // Parse Network information. + if tcpConnectionCount, err := strconv.ParseUint(rawHost["tcp_connection_count"], 10, 32); err == nil { + host.Network.TCPConnectionCount = uint32(tcpConnectionCount) + } else { + logger.Warnf("parsing tcp connection count failed: %v", err) + } + + if uploadTCPConnectionCount, err := strconv.ParseUint(rawHost["upload_tcp_connection_count"], 10, 32); err == nil { + host.Network.UploadTCPConnectionCount = uint32(uploadTCPConnectionCount) + } else { + logger.Warnf("parsing upload tcp connection count failed: %v", err) + } + + host.Network.Location = rawHost["location"] + host.Network.IDC = rawHost["idc"] + + if downloadRate, err := strconv.ParseUint(rawHost["download_rate"], 10, 64); err == nil { + host.Network.DownloadRate = downloadRate + } else { + logger.Warnf("parsing download rate failed: %v", err) + } + + if downloadRateLimit, err := strconv.ParseUint(rawHost["download_rate_limit"], 10, 64); err == nil { + host.Network.DownloadRateLimit = downloadRateLimit + } else { + logger.Warnf("parsing download rate limit failed: %v", err) + } + + if uploadRate, err := strconv.ParseUint(rawHost["upload_rate"], 10, 64); err == nil { + host.Network.UploadRate = uploadRate + } else { + logger.Warnf("parsing upload rate failed: %v", err) + } + + if uploadRateLimit, err := strconv.ParseUint(rawHost["upload_rate_limit"], 10, 64); err == nil { + host.Network.UploadRateLimit = uploadRateLimit + } else { + logger.Warnf("parsing upload rate limit failed: %v", err) + } + + // Parse Disk information. + if diskTotal, err := strconv.ParseUint(rawHost["disk_total"], 10, 64); err == nil { + host.Disk.Total = diskTotal + } else { + logger.Warnf("parsing disk total failed: %v", err) + } + + if diskFree, err := strconv.ParseUint(rawHost["disk_free"], 10, 64); err == nil { + host.Disk.Free = diskFree + } else { + logger.Warnf("parsing disk free failed: %v", err) + } + + if diskUsed, err := strconv.ParseUint(rawHost["disk_used"], 10, 64); err == nil { + host.Disk.Used = diskUsed + } else { + logger.Warnf("parsing disk used failed: %v", err) + } + + if diskUsedPercent, err := strconv.ParseFloat(rawHost["disk_used_percent"], 64); err == nil { + host.Disk.UsedPercent = diskUsedPercent + } else { + logger.Warnf("parsing disk used percent failed: %v", err) + } + + if diskInodesTotal, err := strconv.ParseUint(rawHost["disk_inodes_total"], 10, 64); err == nil { + host.Disk.InodesTotal = diskInodesTotal + } else { + logger.Warnf("parsing disk inodes total failed: %v", err) + } + + if diskInodesUsed, err := strconv.ParseUint(rawHost["disk_inodes_used"], 10, 64); err == nil { + host.Disk.InodesUsed = diskInodesUsed + } else { + logger.Warnf("parsing disk inodes used failed: %v", err) + } + + if diskInodesFree, err := strconv.ParseUint(rawHost["disk_inodes_free"], 10, 64); err == nil { + host.Disk.InodesFree = diskInodesFree + } else { + logger.Warnf("parsing disk inodes free failed: %v", err) + } + + if diskInodesUsedPercent, err := strconv.ParseFloat(rawHost["disk_inodes_used_percent"], 64); err == nil { + host.Disk.InodesUsedPercent = diskInodesUsedPercent + } else { + logger.Warnf("parsing disk inodes used percent failed: %v", err) + } + + if diskWriteBandwidth, err := strconv.ParseUint(rawHost["disk_write_bandwidth"], 10, 64); err == nil { + host.Disk.WriteBandwidth = diskWriteBandwidth + } else { + logger.Warnf("parsing disk write bandwidth failed: %v", err) + } + + if diskReadBandwidth, err := strconv.ParseUint(rawHost["disk_read_bandwidth"], 10, 64); err == nil { + host.Disk.ReadBandwidth = diskReadBandwidth + } else { + logger.Warnf("parsing disk read bandwidth failed: %v", err) + } + + // Parse Build information. + host.Build.GitVersion = rawHost["build_git_version"] + host.Build.GitCommit = rawHost["build_git_commit"] + host.Build.GoVersion = rawHost["build_go_version"] + host.Build.Platform = rawHost["build_platform"] + + // Parse SchedulerClusterID. + if schedulerClusterID, err := strconv.ParseUint(rawHost["scheduler_cluster_id"], 10, 64); err == nil { + host.SchedulerClusterID = schedulerClusterID + } else { + logger.Warnf("parsing scheduler cluster id failed: %v", err) + } + + // Parse AnnounceInterval. + if announceInterval, err := strconv.ParseInt(rawHost["announce_interval"], 10, 64); err == nil { + host.AnnounceInterval = time.Duration(announceInterval) + } else { + logger.Warnf("parsing announce interval failed: %v", err) + } + + // Parse CreatedAt. + if createdAt, err := time.Parse(time.RFC3339, rawHost["created_at"]); err == nil { + host.CreatedAt = createdAt + } else { + logger.Warnf("parsing created at failed: %v", err) + } + + // Parse UpdatedAt. + if updatedAt, err := time.Parse(time.RFC3339, rawHost["updated_at"]); err == nil { + host.UpdatedAt = updatedAt + } else { + logger.Warnf("parsing updated at failed: %v", err) + } + + return host, nil +} diff --git a/manager/service/service.go b/manager/service/service.go index 1650bab1e..b3fbcd0ff 100644 --- a/manager/service/service.go +++ b/manager/service/service.go @@ -137,6 +137,10 @@ type Service interface { UpdatePersonalAccessToken(context.Context, uint, types.UpdatePersonalAccessTokenRequest) (*models.PersonalAccessToken, error) GetPersonalAccessToken(context.Context, uint) (*models.PersonalAccessToken, error) GetPersonalAccessTokens(context.Context, types.GetPersonalAccessTokensQuery) ([]models.PersonalAccessToken, int64, error) + + DestroyPersistentCache(context.Context, types.PersistentCacheParams) error + GetPersistentCache(context.Context, types.PersistentCacheParams) (*types.GetPersistentCacheResponse, error) + GetPersistentCaches(context.Context, types.GetPersistentCachesQuery) ([]types.GetPersistentCacheResponse, int64, error) } type service struct { diff --git a/manager/types/job.go b/manager/types/job.go index fa982b738..0940e1a7e 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -155,7 +155,7 @@ type CreateSyncPeersJobRequest struct { // UserID is the user id of the job. UserID uint `json:"user_id" binding:"omitempty"` - // SeedPeerClusterIDs is the seed peer cluster ids of the job. + // SchedulerClusterIDs is the scheduler cluster ids of the job. SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } diff --git a/manager/types/persistent_cache.go b/manager/types/persistent_cache.go new file mode 100644 index 000000000..cd870ad66 --- /dev/null +++ b/manager/types/persistent_cache.go @@ -0,0 +1,303 @@ +/* + * Copyright 2025 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package types + +import ( + "time" + + "github.com/bits-and-blooms/bitset" +) + +type PersistentCacheParams struct { + // SchedulerClusterID is the scheduler cluster id of the persistent cache. + SchedulerClusterID uint `uri:"scheduler_cluster_id" binding:"required"` + + // TaskID is the task id of the persistent cache. + TaskID string `uri:"task_id" binding:"required"` +} + +type GetPersistentCachesQuery struct { + // SchedulerClusterIDs is the scheduler cluster ids of the persistent cache. + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + + // Page is the page number of the persistent cache list. + Page int `form:"page" binding:"omitempty,gte=1"` + + // PerPage is the item count per page of the persistent cache list. + PerPage int `form:"per_page" binding:"omitempty,gte=1,lte=10000000"` +} + +type GetPersistentCacheResponse struct { + // TaskID is task id. + TaskID string `json:"task_id" binding:"omitempty"` + + // PersistentReplicaCount is replica count of the persistent cache task. + PersistentReplicaCount uint64 `json:"persistent_replica_count" binding:"omitempty"` + + // Tag is used to distinguish different persistent cache tasks. + Tag string `json:"tag" binding:"omitempty"` + + // Application of persistent cache task. + Application string `json:"application" binding:"omitempty"` + + // PieceLength is persistent cache task piece length. + PieceLength uint64 `json:"piece_length" binding:"omitempty"` + + // ContentLength is persistent cache task total content length. + ContentLength uint64 `json:"content_length" binding:"omitempty"` + + // TotalPieceCount is total piece count. + TotalPieceCount uint32 `json:"total_piece_count" binding:"omitempty"` + + // State is persistent cache task state. + State string `json:"state" binding:"omitempty"` + + // TTL is persistent cache task time to live. + TTL time.Duration `json:"ttl" binding:"omitempty"` + + // CreatedAt is persistent cache task create time. + CreatedAt time.Time `json:"created_at" binding:"omitempty"` + + // UpdatedAt is persistent cache task update time. + UpdatedAt time.Time `json:"updated_at" binding:"omitempty"` + + // PersistentCachePeers is the list of persistent cache peers. + PersistentCachePeers []PersistentCachePeer `json:"persistent_cache_peers" binding:"omitempty"` +} + +type PersistentCachePeer struct { + // ID is persistent cache peer id. + ID string `json:"id" binding:"omitempty"` + + // Persistent is whether the peer is persistent. + Persistent bool `json:"persistent" binding:"omitempty"` + + // FinishedPieces is finished pieces bitset. + FinishedPieces *bitset.BitSet `json:"finished_pieces" binding:"omitempty"` + + // State is persistent cache peer state. + State string `json:"state" binding:"omitempty"` + + // BlockParents is bad parents ids. + BlockParents []string `json:"block_parents" binding:"omitempty"` + + // Cost is the cost of downloading. + Cost time.Duration `json:"cost" binding:"omitempty"` + + // CreatedAt is persistent cache peer create time. + CreatedAt time.Time `json:"created_at" binding:"omitempty"` + + // UpdatedAt is persistent cache peer update time. + UpdatedAt time.Time `json:"updated_at" binding:"omitempty"` + + // Host is the peer host. + Host PersistentCachePeerHost `json:"host" binding:"omitempty"` +} + +type PersistentCachePeerHost struct { + // ID is host id. + ID string `json:"id" binding:"omitempty"` + + // Type is host type. + Type string `json:"type" binding:"omitempty"` + + // Hostname is host name. + Hostname string `json:"hostname" binding:"omitempty"` + + // IP is host ip. + IP string `json:"ip" binding:"omitempty"` + + // Port is grpc service port. + Port int32 `json:"port" binding:"omitempty"` + + // DownloadPort is piece downloading port. + DownloadPort int32 `json:"download_port" binding:"omitempty"` + + // DisableShared is whether the host is disabled for shared with other peers. + DisableShared bool `json:"disable_shared" binding:"omitempty"` + + // OS is host OS. + OS string `json:"os" binding:"omitempty"` + + // Platform is host platform. + Platform string `json:"platform" binding:"omitempty"` + + // PlatformFamily is host platform family. + PlatformFamily string `json:"platform_family" binding:"omitempty"` + + // PlatformVersion is host platform version. + PlatformVersion string `json:"platform_version" binding:"omitempty"` + + // KernelVersion is host kernel version. + KernelVersion string `json:"kernel_version" binding:"omitempty"` + + // CPU contains cpu information. + CPU struct { + // LogicalCount is cpu logical count. + LogicalCount uint32 `json:"logical_count" binding:"omitempty"` + + // PhysicalCount is cpu physical count. + PhysicalCount uint32 `json:"physical_count" binding:"omitempty"` + + // Percent is cpu usage percent. + Percent float64 `json:"percent" binding:"omitempty"` + + // ProcessPercent is process cpu usage percent. + ProcessPercent float64 `json:"process_percent" binding:"omitempty"` + + // Times contains cpu times information. + Times struct { + // User is user cpu time. + User float64 `json:"user" binding:"omitempty"` + + // System is system cpu time. + System float64 `json:"system" binding:"omitempty"` + + // Idle is idle cpu time. + Idle float64 `json:"idle" binding:"omitempty"` + + // Nice is nice cpu time. + Nice float64 `json:"nice" binding:"omitempty"` + + // Iowait is iowait cpu time. + Iowait float64 `json:"iowait" binding:"omitempty"` + + // Irq is irq cpu time. + Irq float64 `json:"irq" binding:"omitempty"` + + // Softirq is softirq cpu time. + Softirq float64 `json:"softirq" binding:"omitempty"` + + // Steal is steal cpu time. + Steal float64 `json:"steal" binding:"omitempty"` + + // Guest is guest cpu time. + Guest float64 `json:"guest" binding:"omitempty"` + + // GuestNice is guest nice cpu time. + GuestNice float64 `json:"guest_nice" binding:"omitempty"` + } `json:"times" binding:"omitempty"` + } `json:"cpu" binding:"omitempty"` + + // Memory contains memory information. + Memory struct { + // Total is total memory. + Total uint64 `json:"total" binding:"omitempty"` + + // Available is available memory. + Available uint64 `json:"available" binding:"omitempty"` + + // Used is used memory. + Used uint64 `json:"used" binding:"omitempty"` + + // UsedPercent is memory usage percent. + UsedPercent float64 `json:"used_percent" binding:"omitempty"` + + // ProcessUsedPercent is process memory usage percent. + ProcessUsedPercent float64 `json:"process_used_percent" binding:"omitempty"` + + // Free is free memory. + Free uint64 `json:"free" binding:"omitempty"` + } `json:"memory" binding:"omitempty"` + + // Network contains network information. + Network struct { + // TCPConnectionCount is tcp connection count. + TCPConnectionCount uint32 `json:"tcp_connection_count" binding:"omitempty"` + + // UploadTCPConnectionCount is upload tcp connection count. + UploadTCPConnectionCount uint32 `json:"upload_tcp_connection_count" binding:"omitempty"` + + // Location is network location. + Location string `json:"location" binding:"omitempty"` + + // IDC is network idc. + IDC string `json:"idc" binding:"omitempty"` + + // DownloadRate is download rate. + DownloadRate uint64 `json:"download_rate" binding:"omitempty"` + + // DownloadRateLimit is download rate limit. + DownloadRateLimit uint64 `json:"download_rate_limit" binding:"omitempty"` + + // UploadRate is upload rate. + UploadRate uint64 `json:"upload_rate" binding:"omitempty"` + + // UploadRateLimit is upload rate limit. + UploadRateLimit uint64 `json:"upload_rate_limit" binding:"omitempty"` + } `json:"network" binding:"omitempty"` + + // Disk contains disk information. + Disk struct { + // Total is total disk space. + Total uint64 `json:"total" binding:"omitempty"` + + // Free is free disk space. + Free uint64 `json:"free" binding:"omitempty"` + + // Used is used disk space. + Used uint64 `json:"used" binding:"omitempty"` + + // UsedPercent is disk usage percent. + UsedPercent float64 `json:"used_percent" binding:"omitempty"` + + // InodesTotal is total inodes. + InodesTotal uint64 `json:"inodes_total" binding:"omitempty"` + + // InodesUsed is used inodes. + InodesUsed uint64 `json:"inodes_used" binding:"omitempty"` + + // InodesFree is free inodes. + InodesFree uint64 `json:"inodes_free" binding:"omitempty"` + + // InodesUsedPercent is inodes usage percent. + InodesUsedPercent float64 `json:"inodes_used_percent" binding:"omitempty"` + + // WriteBandwidth is write bandwidth. + WriteBandwidth uint64 `json:"write_bandwidth" binding:"omitempty"` + + // ReadBandwidth is read bandwidth. + ReadBandwidth uint64 `json:"read_bandwidth" binding:"omitempty"` + } `json:"disk" binding:"omitempty"` + + // Build contains build information. + Build struct { + // GitVersion is git version. + GitVersion string `json:"git_version" binding:"omitempty"` + + // GitCommit is git commit. + GitCommit string `json:"git_commit" binding:"omitempty"` + + // GoVersion is go version. + GoVersion string `json:"go_version" binding:"omitempty"` + + // Platform is build platform. + Platform string `json:"platform" binding:"omitempty"` + } `json:"build" binding:"omitempty"` + + // SchedulerClusterID is the scheduler cluster id matched by scopes. + SchedulerClusterID uint64 `json:"scheduler_cluster_id" binding:"omitempty"` + + // AnnounceInterval is the interval between host announces to scheduler. + AnnounceInterval time.Duration `json:"announce_interval" binding:"omitempty"` + + // CreatedAt is host create time. + CreatedAt time.Time `json:"created_at" binding:"omitempty"` + + // UpdatedAt is host update time. + UpdatedAt time.Time `json:"updated_at" binding:"omitempty"` +} diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index b16fa546b..cc07c3360 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -19,6 +19,8 @@ package redis import ( "context" "fmt" + "strconv" + "strings" "github.com/redis/go-redis/v9" @@ -147,7 +149,7 @@ func MakeKeyInScheduler(namespace, id string) string { return fmt.Sprintf("%s:%s", MakeNamespaceKeyInScheduler(namespace), id) } -// MakeSchedulerClusterKeyInManager make scheduler cluster key in manager. +// MakeSchedulerClusterKeyInManager make scheduler cluster key in scheduler. func MakePersistentCacheTaskKeyInScheduler(schedulerClusterID uint, taskID string) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s", schedulerClusterID, PersistentCacheTasksNamespace, taskID)) } @@ -191,3 +193,19 @@ func MakePersistentCacheHostsInScheduler(schedulerClusterID uint) string { func MakePersistentCachePeersOfPersistentCacheHostInScheduler(schedulerClusterID uint, hostID string) string { return MakeKeyInScheduler(SchedulerClustersNamespace, fmt.Sprintf("%d:%s:%s:%s", schedulerClusterID, PersistentCacheHostsNamespace, hostID, PersistentCachePeersNamespace)) } + +// ExtractSchedulerClusterIDFromPersistentCacheTaskKey extracts the scheduler cluster ID from a persistent cache task key. +func ExtractSchedulerClusterIDFromPersistentCacheTaskKey(key string) (uint, error) { + parts := strings.Split(key, ":") + if len(parts) != 5 { + return 0, fmt.Errorf("invalid persistent cache task key: %s", key) + } + + // For example, if the key is "scheduler:scheduler-clusters:1:persistent-cache-tasks:123456789", the scheduler cluster ID is 1. + clusterID, err := strconv.ParseUint(parts[2], 10, 32) + if err != nil { + return 0, fmt.Errorf("invalid persistent cache task key: %s", key) + } + + return uint(clusterID), nil +}