feat: remove object storage message (#244)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-01-02 12:40:45 +08:00 committed by GitHub
parent 55a239ecc0
commit e759b2e4c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 161 additions and 2557 deletions

View File

@ -1,6 +1,6 @@
[package]
name = "dragonfly-api"
version = "2.0.72"
version = "2.0.73"
authors = ["Gaius <gaius.qi@gmail.com>"]
edition = "2021"
license = "Apache-2.0"

View File

@ -671,8 +671,6 @@ type Host struct {
Build *Build `protobuf:"bytes,16,opt,name=build,proto3,oneof" json:"build,omitempty"`
// ID of the cluster to which the host belongs.
SchedulerClusterId uint64 `protobuf:"varint,17,opt,name=scheduler_cluster_id,json=schedulerClusterId,proto3" json:"scheduler_cluster_id,omitempty"`
// Port of object storage server.
ObjectStoragePort int32 `protobuf:"varint,18,opt,name=object_storage_port,json=objectStoragePort,proto3" json:"object_storage_port,omitempty"`
}
func (x *Host) Reset() {
@ -826,13 +824,6 @@ func (x *Host) GetSchedulerClusterId() uint64 {
return 0
}
func (x *Host) GetObjectStoragePort() int32 {
if x != nil {
return x.ObjectStoragePort
}
return 0
}
// CPU Stat.
type CPU struct {
state protoimpl.MessageState
@ -1867,7 +1858,7 @@ var file_pkg_apis_common_v2_common_proto_rawDesc = []byte{
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01,
0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x67, 0x65, 0x73, 0x74, 0x42, 0x06, 0x0a, 0x04, 0x5f,
0x74, 0x61, 0x67, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x61, 0x70, 0x70, 0x6c, 0x69, 0x63, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x22, 0xfb, 0x05, 0x0a, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x02,
0x69, 0x6f, 0x6e, 0x22, 0xbb, 0x05, 0x0a, 0x04, 0x48, 0x6f, 0x73, 0x74, 0x12, 0x17, 0x0a, 0x02,
0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x72, 0x02, 0x10,
0x01, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0d, 0x42, 0x07, 0xfa, 0x42, 0x04, 0x2a, 0x02, 0x18, 0x03, 0x52, 0x04, 0x74, 0x79,
@ -1907,11 +1898,7 @@ var file_pkg_apis_common_v2_common_proto_rawDesc = []byte{
0x64, 0x48, 0x04, 0x52, 0x05, 0x62, 0x75, 0x69, 0x6c, 0x64, 0x88, 0x01, 0x01, 0x12, 0x30, 0x0a,
0x14, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x5f, 0x63, 0x6c, 0x75, 0x73, 0x74,
0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x11, 0x20, 0x01, 0x28, 0x04, 0x52, 0x12, 0x73, 0x63, 0x68,
0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12,
0x3e, 0x0a, 0x13, 0x6f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67,
0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x12, 0x20, 0x01, 0x28, 0x05, 0x42, 0x0e, 0xfa, 0x42,
0x0b, 0x1a, 0x09, 0x10, 0xff, 0xff, 0x03, 0x28, 0x80, 0x08, 0x40, 0x01, 0x52, 0x11, 0x6f, 0x62,
0x6a, 0x65, 0x63, 0x74, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x42,
0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x42,
0x06, 0x0a, 0x04, 0x5f, 0x63, 0x70, 0x75, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x6d, 0x65, 0x6d, 0x6f,
0x72, 0x79, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x6e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x42, 0x07,
0x0a, 0x05, 0x5f, 0x64, 0x69, 0x73, 0x6b, 0x42, 0x08, 0x0a, 0x06, 0x5f, 0x62, 0x75, 0x69, 0x6c,

View File

@ -737,21 +737,6 @@ func (m *Host) validate(all bool) error {
// no validation rules for SchedulerClusterId
if m.GetObjectStoragePort() != 0 {
if val := m.GetObjectStoragePort(); val < 1024 || val >= 65535 {
err := HostValidationError{
field: "ObjectStoragePort",
reason: "value must be inside range [1024, 65535)",
}
if !all {
return err
}
errors = append(errors, err)
}
}
if m.Cpu != nil {
if all {

View File

@ -208,8 +208,6 @@ message Host {
optional Build build = 16;
// ID of the cluster to which the host belongs.
uint64 scheduler_cluster_id = 17;
// Port of object storage server.
int32 object_storage_port = 18 [(validate.rules).int32 = {gte: 1024, lt: 65535, ignore_empty: true}];
}
// CPU Stat.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -28,8 +28,10 @@ option go_package = "d7y.io/api/v2/pkg/apis/manager/v2;manager";
enum SourceType {
// Scheduler service.
SCHEDULER_SOURCE = 0;
// Peer service.
PEER_SOURCE = 1;
// SeedPeer service.
SEED_PEER_SOURCE = 2;
}
@ -72,8 +74,6 @@ message SeedPeer {
SeedPeerCluster seed_peer_cluster = 11;
// Schedulers included in seed peer.
repeated Scheduler schedulers = 12;
// Seed peer object storage port.
int32 object_storage_port = 13;
}
// GetSeedPeerRequest represents request of GetSeedPeer.
@ -128,8 +128,6 @@ message UpdateSeedPeerRequest {
int32 download_port = 8 [(validate.rules).int32 = {gte: 1024, lt: 65535}];
// ID of the cluster to which the seed peer belongs.
uint64 seed_peer_cluster_id = 9 [(validate.rules).uint64 = {gte: 1}];
// Seed peer object storage port.
int32 object_storage_port = 10 [(validate.rules).int32 = {gte: 1024, lt: 65535, ignore_empty: true}];
}
// DeleteSeedPeerRequest represents request of DeleteSeedPeer.
@ -240,61 +238,6 @@ message ListSchedulersResponse {
repeated Scheduler schedulers = 1;
}
// ObjectStorage represents config of object storage.
message ObjectStorage {
// name is object storage name of type, it can be s3, oss or obs.
string name = 1 [(validate.rules).string = {min_len: 1, max_len: 1024}];
// Region is storage region.
string region = 2 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
// Endpoint is datacenter endpoint.
string endpoint = 3 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
// AccessKey is access key ID.
string access_key = 4 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
// SecretKey is access key secret.
string secret_key = 5 [(validate.rules).string = {min_len: 1, max_len: 1024, ignore_empty: true}];
// S3ForcePathStyle sets force path style for s3, true by default.
// Set this to `true` to force the request to use path-style addressing,
// i.e., `http://s3.amazonaws.com/BUCKET/KEY`. By default, the S3 client
// will use virtual hosted bucket addressing when possible
// (`http://BUCKET.s3.amazonaws.com/KEY`).
// Refer to https://github.com/aws/aws-sdk-go/blob/main/aws/config.go#L118.
bool s3_force_path_style = 6;
// Scheme is the scheme of the http client.
string scheme = 7 [(validate.rules).string = {in: ["http", "https"]}];
}
// GetObjectStorageRequest represents request of GetObjectStorage.
message GetObjectStorageRequest {
// Request source type.
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
// Source service hostname.
string hostname = 2 [(validate.rules).string.hostname = true];
// Source service ip.
string ip = 3 [(validate.rules).string.ip = true];
}
// Bucket represents config of bucket.
message Bucket {
// Bucket name.
string name = 1 [(validate.rules).string = {min_len: 1, max_len: 1024}];
}
// ListSchedulersRequest represents request of ListBuckets.
message ListBucketsRequest {
// Request source type.
SourceType source_type = 1 [(validate.rules).enum.defined_only = true];
// Source service hostname.
string hostname = 2 [(validate.rules).string.hostname = true];
// Source service ip.
string ip = 3 [(validate.rules).string.ip = true];
}
// ListBucketsResponse represents response of ListBuckets.
message ListBucketsResponse {
// Bucket configs.
repeated Bucket buckets = 1;
}
// URLPriority represents config of url priority.
message URLPriority {
// URL regex.
@ -363,21 +306,6 @@ message CreateMLPRequest {
double mae = 3 [(validate.rules).double = {gte: 0}];
}
// CreateModelRequest represents request of CreateModel.
message CreateModelRequest {
// Scheduler hostname.
string hostname = 1 [(validate.rules).string.min_len = 1];
// Scheduler ip.
string ip = 2 [(validate.rules).string.ip = true];
oneof request {
option (validate.required) = true;
CreateGNNRequest create_gnn_request = 3;
CreateMLPRequest create_mlp_request = 4;
}
}
// KeepAliveRequest represents request of KeepAlive.
message KeepAliveRequest {
// Request source type.
@ -413,18 +341,9 @@ service Manager {
// List acitve schedulers configuration.
rpc ListSchedulers(ListSchedulersRequest)returns(ListSchedulersResponse);
// Get ObjectStorage configuration.
rpc GetObjectStorage(GetObjectStorageRequest) returns(ObjectStorage);
// List buckets configuration.
rpc ListBuckets(ListBucketsRequest)returns(ListBucketsResponse);
// List applications configuration.
rpc ListApplications(ListApplicationsRequest)returns(ListApplicationsResponse);
// Create model and update data of model to object storage.
rpc CreateModel(CreateModelRequest)returns(google.protobuf.Empty);
// KeepAlive with manager.
rpc KeepAlive(stream KeepAliveRequest)returns(google.protobuf.Empty);
}

View File

@ -37,14 +37,8 @@ type ManagerClient interface {
UpdateScheduler(ctx context.Context, in *UpdateSchedulerRequest, opts ...grpc.CallOption) (*Scheduler, error)
// List acitve schedulers configuration.
ListSchedulers(ctx context.Context, in *ListSchedulersRequest, opts ...grpc.CallOption) (*ListSchedulersResponse, error)
// Get ObjectStorage configuration.
GetObjectStorage(ctx context.Context, in *GetObjectStorageRequest, opts ...grpc.CallOption) (*ObjectStorage, error)
// List buckets configuration.
ListBuckets(ctx context.Context, in *ListBucketsRequest, opts ...grpc.CallOption) (*ListBucketsResponse, error)
// List applications configuration.
ListApplications(ctx context.Context, in *ListApplicationsRequest, opts ...grpc.CallOption) (*ListApplicationsResponse, error)
// Create model and update data of model to object storage.
CreateModel(ctx context.Context, in *CreateModelRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// KeepAlive with manager.
KeepAlive(ctx context.Context, opts ...grpc.CallOption) (Manager_KeepAliveClient, error)
}
@ -120,24 +114,6 @@ func (c *managerClient) ListSchedulers(ctx context.Context, in *ListSchedulersRe
return out, nil
}
func (c *managerClient) GetObjectStorage(ctx context.Context, in *GetObjectStorageRequest, opts ...grpc.CallOption) (*ObjectStorage, error) {
out := new(ObjectStorage)
err := c.cc.Invoke(ctx, "/manager.v2.Manager/GetObjectStorage", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerClient) ListBuckets(ctx context.Context, in *ListBucketsRequest, opts ...grpc.CallOption) (*ListBucketsResponse, error) {
out := new(ListBucketsResponse)
err := c.cc.Invoke(ctx, "/manager.v2.Manager/ListBuckets", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerClient) ListApplications(ctx context.Context, in *ListApplicationsRequest, opts ...grpc.CallOption) (*ListApplicationsResponse, error) {
out := new(ListApplicationsResponse)
err := c.cc.Invoke(ctx, "/manager.v2.Manager/ListApplications", in, out, opts...)
@ -147,15 +123,6 @@ func (c *managerClient) ListApplications(ctx context.Context, in *ListApplicatio
return out, nil
}
func (c *managerClient) CreateModel(ctx context.Context, in *CreateModelRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/manager.v2.Manager/CreateModel", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managerClient) KeepAlive(ctx context.Context, opts ...grpc.CallOption) (Manager_KeepAliveClient, error) {
stream, err := c.cc.NewStream(ctx, &Manager_ServiceDesc.Streams[0], "/manager.v2.Manager/KeepAlive", opts...)
if err != nil {
@ -208,14 +175,8 @@ type ManagerServer interface {
UpdateScheduler(context.Context, *UpdateSchedulerRequest) (*Scheduler, error)
// List acitve schedulers configuration.
ListSchedulers(context.Context, *ListSchedulersRequest) (*ListSchedulersResponse, error)
// Get ObjectStorage configuration.
GetObjectStorage(context.Context, *GetObjectStorageRequest) (*ObjectStorage, error)
// List buckets configuration.
ListBuckets(context.Context, *ListBucketsRequest) (*ListBucketsResponse, error)
// List applications configuration.
ListApplications(context.Context, *ListApplicationsRequest) (*ListApplicationsResponse, error)
// Create model and update data of model to object storage.
CreateModel(context.Context, *CreateModelRequest) (*emptypb.Empty, error)
// KeepAlive with manager.
KeepAlive(Manager_KeepAliveServer) error
}
@ -245,18 +206,9 @@ func (UnimplementedManagerServer) UpdateScheduler(context.Context, *UpdateSchedu
func (UnimplementedManagerServer) ListSchedulers(context.Context, *ListSchedulersRequest) (*ListSchedulersResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListSchedulers not implemented")
}
func (UnimplementedManagerServer) GetObjectStorage(context.Context, *GetObjectStorageRequest) (*ObjectStorage, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetObjectStorage not implemented")
}
func (UnimplementedManagerServer) ListBuckets(context.Context, *ListBucketsRequest) (*ListBucketsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListBuckets not implemented")
}
func (UnimplementedManagerServer) ListApplications(context.Context, *ListApplicationsRequest) (*ListApplicationsResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ListApplications not implemented")
}
func (UnimplementedManagerServer) CreateModel(context.Context, *CreateModelRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method CreateModel not implemented")
}
func (UnimplementedManagerServer) KeepAlive(Manager_KeepAliveServer) error {
return status.Errorf(codes.Unimplemented, "method KeepAlive not implemented")
}
@ -398,42 +350,6 @@ func _Manager_ListSchedulers_Handler(srv interface{}, ctx context.Context, dec f
return interceptor(ctx, in, info, handler)
}
func _Manager_GetObjectStorage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetObjectStorageRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServer).GetObjectStorage(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager.v2.Manager/GetObjectStorage",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServer).GetObjectStorage(ctx, req.(*GetObjectStorageRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Manager_ListBuckets_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListBucketsRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServer).ListBuckets(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager.v2.Manager/ListBuckets",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServer).ListBuckets(ctx, req.(*ListBucketsRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Manager_ListApplications_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListApplicationsRequest)
if err := dec(in); err != nil {
@ -452,24 +368,6 @@ func _Manager_ListApplications_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _Manager_CreateModel_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(CreateModelRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagerServer).CreateModel(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/manager.v2.Manager/CreateModel",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagerServer).CreateModel(ctx, req.(*CreateModelRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Manager_KeepAlive_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ManagerServer).KeepAlive(&managerKeepAliveServer{stream})
}
@ -531,22 +429,10 @@ var Manager_ServiceDesc = grpc.ServiceDesc{
MethodName: "ListSchedulers",
Handler: _Manager_ListSchedulers_Handler,
},
{
MethodName: "GetObjectStorage",
Handler: _Manager_GetObjectStorage_Handler,
},
{
MethodName: "ListBuckets",
Handler: _Manager_ListBuckets_Handler,
},
{
MethodName: "ListApplications",
Handler: _Manager_ListApplications_Handler,
},
{
MethodName: "CreateModel",
Handler: _Manager_CreateModel_Handler,
},
},
Streams: []grpc.StreamDesc{
{

View File

@ -42,26 +42,6 @@ func (m *MockManagerClient) EXPECT() *MockManagerClientMockRecorder {
return m.recorder
}
// CreateModel mocks base method.
func (m *MockManagerClient) CreateModel(ctx context.Context, in *manager.CreateModelRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "CreateModel", varargs...)
ret0, _ := ret[0].(*emptypb.Empty)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateModel indicates an expected call of CreateModel.
func (mr *MockManagerClientMockRecorder) CreateModel(ctx, in any, opts ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateModel", reflect.TypeOf((*MockManagerClient)(nil).CreateModel), varargs...)
}
// DeleteSeedPeer mocks base method.
func (m *MockManagerClient) DeleteSeedPeer(ctx context.Context, in *manager.DeleteSeedPeerRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
m.ctrl.T.Helper()
@ -82,26 +62,6 @@ func (mr *MockManagerClientMockRecorder) DeleteSeedPeer(ctx, in any, opts ...any
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSeedPeer", reflect.TypeOf((*MockManagerClient)(nil).DeleteSeedPeer), varargs...)
}
// GetObjectStorage mocks base method.
func (m *MockManagerClient) GetObjectStorage(ctx context.Context, in *manager.GetObjectStorageRequest, opts ...grpc.CallOption) (*manager.ObjectStorage, error) {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "GetObjectStorage", varargs...)
ret0, _ := ret[0].(*manager.ObjectStorage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetObjectStorage indicates an expected call of GetObjectStorage.
func (mr *MockManagerClientMockRecorder) GetObjectStorage(ctx, in any, opts ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectStorage", reflect.TypeOf((*MockManagerClient)(nil).GetObjectStorage), varargs...)
}
// GetScheduler mocks base method.
func (m *MockManagerClient) GetScheduler(ctx context.Context, in *manager.GetSchedulerRequest, opts ...grpc.CallOption) (*manager.Scheduler, error) {
m.ctrl.T.Helper()
@ -182,26 +142,6 @@ func (mr *MockManagerClientMockRecorder) ListApplications(ctx, in any, opts ...a
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListApplications", reflect.TypeOf((*MockManagerClient)(nil).ListApplications), varargs...)
}
// ListBuckets mocks base method.
func (m *MockManagerClient) ListBuckets(ctx context.Context, in *manager.ListBucketsRequest, opts ...grpc.CallOption) (*manager.ListBucketsResponse, error) {
m.ctrl.T.Helper()
varargs := []any{ctx, in}
for _, a := range opts {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "ListBuckets", varargs...)
ret0, _ := ret[0].(*manager.ListBucketsResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListBuckets indicates an expected call of ListBuckets.
func (mr *MockManagerClientMockRecorder) ListBuckets(ctx, in any, opts ...any) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]any{ctx, in}, opts...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListBuckets", reflect.TypeOf((*MockManagerClient)(nil).ListBuckets), varargs...)
}
// ListSchedulers mocks base method.
func (m *MockManagerClient) ListSchedulers(ctx context.Context, in *manager.ListSchedulersRequest, opts ...grpc.CallOption) (*manager.ListSchedulersResponse, error) {
m.ctrl.T.Helper()
@ -442,21 +382,6 @@ func (m *MockManagerServer) EXPECT() *MockManagerServerMockRecorder {
return m.recorder
}
// CreateModel mocks base method.
func (m *MockManagerServer) CreateModel(arg0 context.Context, arg1 *manager.CreateModelRequest) (*emptypb.Empty, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateModel", arg0, arg1)
ret0, _ := ret[0].(*emptypb.Empty)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// CreateModel indicates an expected call of CreateModel.
func (mr *MockManagerServerMockRecorder) CreateModel(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateModel", reflect.TypeOf((*MockManagerServer)(nil).CreateModel), arg0, arg1)
}
// DeleteSeedPeer mocks base method.
func (m *MockManagerServer) DeleteSeedPeer(arg0 context.Context, arg1 *manager.DeleteSeedPeerRequest) (*emptypb.Empty, error) {
m.ctrl.T.Helper()
@ -472,21 +397,6 @@ func (mr *MockManagerServerMockRecorder) DeleteSeedPeer(arg0, arg1 any) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteSeedPeer", reflect.TypeOf((*MockManagerServer)(nil).DeleteSeedPeer), arg0, arg1)
}
// GetObjectStorage mocks base method.
func (m *MockManagerServer) GetObjectStorage(arg0 context.Context, arg1 *manager.GetObjectStorageRequest) (*manager.ObjectStorage, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetObjectStorage", arg0, arg1)
ret0, _ := ret[0].(*manager.ObjectStorage)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// GetObjectStorage indicates an expected call of GetObjectStorage.
func (mr *MockManagerServerMockRecorder) GetObjectStorage(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectStorage", reflect.TypeOf((*MockManagerServer)(nil).GetObjectStorage), arg0, arg1)
}
// GetScheduler mocks base method.
func (m *MockManagerServer) GetScheduler(arg0 context.Context, arg1 *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
m.ctrl.T.Helper()
@ -546,21 +456,6 @@ func (mr *MockManagerServerMockRecorder) ListApplications(arg0, arg1 any) *gomoc
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListApplications", reflect.TypeOf((*MockManagerServer)(nil).ListApplications), arg0, arg1)
}
// ListBuckets mocks base method.
func (m *MockManagerServer) ListBuckets(arg0 context.Context, arg1 *manager.ListBucketsRequest) (*manager.ListBucketsResponse, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListBuckets", arg0, arg1)
ret0, _ := ret[0].(*manager.ListBucketsResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListBuckets indicates an expected call of ListBuckets.
func (mr *MockManagerServerMockRecorder) ListBuckets(arg0, arg1 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListBuckets", reflect.TypeOf((*MockManagerServer)(nil).ListBuckets), arg0, arg1)
}
// ListSchedulers mocks base method.
func (m *MockManagerServer) ListSchedulers(arg0 context.Context, arg1 *manager.ListSchedulersRequest) (*manager.ListSchedulersResponse, error) {
m.ctrl.T.Helper()

View File

@ -202,8 +202,6 @@ message Host {
optional Build build = 16;
// ID of the cluster to which the host belongs.
uint64 scheduler_cluster_id = 17;
// Port of object storage server.
int32 object_storage_port = 18;
}
// CPU Stat.

View File

@ -25,8 +25,10 @@ import "google/protobuf/empty.proto";
enum SourceType {
// Scheduler service.
SCHEDULER_SOURCE = 0;
// Peer service.
PEER_SOURCE = 1;
// SeedPeer service.
SEED_PEER_SOURCE = 2;
}
@ -69,8 +71,6 @@ message SeedPeer {
SeedPeerCluster seed_peer_cluster = 11;
// Schedulers included in seed peer.
repeated Scheduler schedulers = 12;
// Seed peer object storage port.
int32 object_storage_port = 13;
}
// GetSeedPeerRequest represents request of GetSeedPeer.
@ -125,8 +125,6 @@ message UpdateSeedPeerRequest {
int32 download_port = 8;
// ID of the cluster to which the seed peer belongs.
uint64 seed_peer_cluster_id = 9;
// Seed peer object storage port.
int32 object_storage_port = 10;
}
// DeleteSeedPeerRequest represents request of DeleteSeedPeer.
@ -237,61 +235,6 @@ message ListSchedulersResponse {
repeated Scheduler schedulers = 1;
}
// ObjectStorage represents config of object storage.
message ObjectStorage {
// name is object storage name of type, it can be s3, oss or obs.
string name = 1;
// Region is storage region.
string region = 2;
// Endpoint is datacenter endpoint.
string endpoint = 3;
// AccessKey is access key ID.
string access_key = 4;
// SecretKey is access key secret.
string secret_key = 5;
// S3ForcePathStyle sets force path style for s3, true by default.
// Set this to `true` to force the request to use path-style addressing,
// i.e., `http://s3.amazonaws.com/BUCKET/KEY`. By default, the S3 client
// will use virtual hosted bucket addressing when possible
// (`http://BUCKET.s3.amazonaws.com/KEY`).
// Refer to https://github.com/aws/aws-sdk-go/blob/main/aws/config.go#L118.
bool s3_force_path_style = 6;
// Scheme is the scheme of the http client.
string scheme = 7;
}
// GetObjectStorageRequest represents request of GetObjectStorage.
message GetObjectStorageRequest {
// Request source type.
SourceType source_type = 1;
// Source service hostname.
string hostname = 2;
// Source service ip.
string ip = 3;
}
// Bucket represents config of bucket.
message Bucket {
// Bucket name.
string name = 1;
}
// ListSchedulersRequest represents request of ListBuckets.
message ListBucketsRequest {
// Request source type.
SourceType source_type = 1;
// Source service hostname.
string hostname = 2;
// Source service ip.
string ip = 3;
}
// ListBucketsResponse represents response of ListBuckets.
message ListBucketsResponse {
// Bucket configs.
repeated Bucket buckets = 1;
}
// URLPriority represents config of url priority.
message URLPriority {
// URL regex.
@ -360,19 +303,6 @@ message CreateMLPRequest {
double mae = 3;
}
// CreateModelRequest represents request of CreateModel.
message CreateModelRequest {
// Scheduler hostname.
string hostname = 1;
// Scheduler ip.
string ip = 2;
oneof request {
CreateGNNRequest create_gnn_request = 3;
CreateMLPRequest create_mlp_request = 4;
}
}
// KeepAliveRequest represents request of KeepAlive.
message KeepAliveRequest {
// Request source type.
@ -408,18 +338,9 @@ service Manager {
// List acitve schedulers configuration.
rpc ListSchedulers(ListSchedulersRequest)returns(ListSchedulersResponse);
// Get ObjectStorage configuration.
rpc GetObjectStorage(GetObjectStorageRequest) returns(ObjectStorage);
// List buckets configuration.
rpc ListBuckets(ListBucketsRequest)returns(ListBucketsResponse);
// List applications configuration.
rpc ListApplications(ListApplicationsRequest)returns(ListApplicationsResponse);
// Create model and update data of model to object storage.
rpc CreateModel(CreateModelRequest)returns(google.protobuf.Empty);
// KeepAlive with manager.
rpc KeepAlive(stream KeepAliveRequest)returns(google.protobuf.Empty);
}

View File

@ -156,9 +156,6 @@ pub struct Host {
/// ID of the cluster to which the host belongs.
#[prost(uint64, tag = "17")]
pub scheduler_cluster_id: u64,
/// Port of object storage server.
#[prost(int32, tag = "18")]
pub object_storage_port: i32,
}
/// CPU Stat.
#[derive(serde::Serialize, serde::Deserialize)]

Binary file not shown.

View File

@ -57,9 +57,6 @@ pub struct SeedPeer {
/// Schedulers included in seed peer.
#[prost(message, repeated, tag = "12")]
pub schedulers: ::prost::alloc::vec::Vec<Scheduler>,
/// Seed peer object storage port.
#[prost(int32, tag = "13")]
pub object_storage_port: i32,
}
/// GetSeedPeerRequest represents request of GetSeedPeer.
#[derive(serde::Serialize, serde::Deserialize)]
@ -141,9 +138,6 @@ pub struct UpdateSeedPeerRequest {
/// ID of the cluster to which the seed peer belongs.
#[prost(uint64, tag = "9")]
pub seed_peer_cluster_id: u64,
/// Seed peer object storage port.
#[prost(int32, tag = "10")]
pub object_storage_port: i32,
}
/// DeleteSeedPeerRequest represents request of DeleteSeedPeer.
#[derive(serde::Serialize, serde::Deserialize)]
@ -307,86 +301,6 @@ pub struct ListSchedulersResponse {
#[prost(message, repeated, tag = "1")]
pub schedulers: ::prost::alloc::vec::Vec<Scheduler>,
}
/// ObjectStorage represents config of object storage.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ObjectStorage {
/// name is object storage name of type, it can be s3, oss or obs.
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
/// Region is storage region.
#[prost(string, tag = "2")]
pub region: ::prost::alloc::string::String,
/// Endpoint is datacenter endpoint.
#[prost(string, tag = "3")]
pub endpoint: ::prost::alloc::string::String,
/// AccessKey is access key ID.
#[prost(string, tag = "4")]
pub access_key: ::prost::alloc::string::String,
/// SecretKey is access key secret.
#[prost(string, tag = "5")]
pub secret_key: ::prost::alloc::string::String,
/// S3ForcePathStyle sets force path style for s3, true by default.
/// Set this to `true` to force the request to use path-style addressing,
/// i.e., `<http://s3.amazonaws.com/BUCKET/KEY`.> By default, the S3 client
/// will use virtual hosted bucket addressing when possible
/// (`<http://BUCKET.s3.amazonaws.com/KEY`>).
/// Refer to <https://github.com/aws/aws-sdk-go/blob/main/aws/config.go#L118.>
#[prost(bool, tag = "6")]
pub s3_force_path_style: bool,
/// Scheme is the scheme of the http client.
#[prost(string, tag = "7")]
pub scheme: ::prost::alloc::string::String,
}
/// GetObjectStorageRequest represents request of GetObjectStorage.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetObjectStorageRequest {
/// Request source type.
#[prost(enumeration = "SourceType", tag = "1")]
pub source_type: i32,
/// Source service hostname.
#[prost(string, tag = "2")]
pub hostname: ::prost::alloc::string::String,
/// Source service ip.
#[prost(string, tag = "3")]
pub ip: ::prost::alloc::string::String,
}
/// Bucket represents config of bucket.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Bucket {
/// Bucket name.
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
}
/// ListSchedulersRequest represents request of ListBuckets.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListBucketsRequest {
/// Request source type.
#[prost(enumeration = "SourceType", tag = "1")]
pub source_type: i32,
/// Source service hostname.
#[prost(string, tag = "2")]
pub hostname: ::prost::alloc::string::String,
/// Source service ip.
#[prost(string, tag = "3")]
pub ip: ::prost::alloc::string::String,
}
/// ListBucketsResponse represents response of ListBuckets.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ListBucketsResponse {
/// Bucket configs.
#[prost(message, repeated, tag = "1")]
pub buckets: ::prost::alloc::vec::Vec<Bucket>,
}
/// URLPriority represents config of url priority.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
@ -489,32 +403,6 @@ pub struct CreateMlpRequest {
#[prost(double, tag = "3")]
pub mae: f64,
}
/// CreateModelRequest represents request of CreateModel.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateModelRequest {
/// Scheduler hostname.
#[prost(string, tag = "1")]
pub hostname: ::prost::alloc::string::String,
/// Scheduler ip.
#[prost(string, tag = "2")]
pub ip: ::prost::alloc::string::String,
#[prost(oneof = "create_model_request::Request", tags = "3, 4")]
pub request: ::core::option::Option<create_model_request::Request>,
}
/// Nested message and enum types in `CreateModelRequest`.
pub mod create_model_request {
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Oneof)]
pub enum Request {
#[prost(message, tag = "3")]
CreateGnnRequest(super::CreateGnnRequest),
#[prost(message, tag = "4")]
CreateMlpRequest(super::CreateMlpRequest),
}
}
/// KeepAliveRequest represents request of KeepAlive.
#[derive(serde::Serialize, serde::Deserialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
@ -820,55 +708,6 @@ pub mod manager_client {
.insert(GrpcMethod::new("manager.v2.Manager", "ListSchedulers"));
self.inner.unary(req, path, codec).await
}
/// Get ObjectStorage configuration.
pub async fn get_object_storage(
&mut self,
request: impl tonic::IntoRequest<super::GetObjectStorageRequest>,
) -> std::result::Result<tonic::Response<super::ObjectStorage>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/manager.v2.Manager/GetObjectStorage",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("manager.v2.Manager", "GetObjectStorage"));
self.inner.unary(req, path, codec).await
}
/// List buckets configuration.
pub async fn list_buckets(
&mut self,
request: impl tonic::IntoRequest<super::ListBucketsRequest>,
) -> std::result::Result<
tonic::Response<super::ListBucketsResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/manager.v2.Manager/ListBuckets",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("manager.v2.Manager", "ListBuckets"));
self.inner.unary(req, path, codec).await
}
/// List applications configuration.
pub async fn list_applications(
&mut self,
@ -895,29 +734,6 @@ pub mod manager_client {
.insert(GrpcMethod::new("manager.v2.Manager", "ListApplications"));
self.inner.unary(req, path, codec).await
}
/// Create model and update data of model to object storage.
pub async fn create_model(
&mut self,
request: impl tonic::IntoRequest<super::CreateModelRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/manager.v2.Manager/CreateModel",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("manager.v2.Manager", "CreateModel"));
self.inner.unary(req, path, codec).await
}
/// KeepAlive with manager.
pub async fn keep_alive(
&mut self,
@ -991,19 +807,6 @@ pub mod manager_server {
tonic::Response<super::ListSchedulersResponse>,
tonic::Status,
>;
/// Get ObjectStorage configuration.
async fn get_object_storage(
&self,
request: tonic::Request<super::GetObjectStorageRequest>,
) -> std::result::Result<tonic::Response<super::ObjectStorage>, tonic::Status>;
/// List buckets configuration.
async fn list_buckets(
&self,
request: tonic::Request<super::ListBucketsRequest>,
) -> std::result::Result<
tonic::Response<super::ListBucketsResponse>,
tonic::Status,
>;
/// List applications configuration.
async fn list_applications(
&self,
@ -1012,11 +815,6 @@ pub mod manager_server {
tonic::Response<super::ListApplicationsResponse>,
tonic::Status,
>;
/// Create model and update data of model to object storage.
async fn create_model(
&self,
request: tonic::Request<super::CreateModelRequest>,
) -> std::result::Result<tonic::Response<()>, tonic::Status>;
/// KeepAlive with manager.
async fn keep_alive(
&self,
@ -1425,98 +1223,6 @@ pub mod manager_server {
};
Box::pin(fut)
}
"/manager.v2.Manager/GetObjectStorage" => {
#[allow(non_camel_case_types)]
struct GetObjectStorageSvc<T: Manager>(pub Arc<T>);
impl<
T: Manager,
> tonic::server::UnaryService<super::GetObjectStorageRequest>
for GetObjectStorageSvc<T> {
type Response = super::ObjectStorage;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::GetObjectStorageRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).get_object_storage(request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = GetObjectStorageSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/manager.v2.Manager/ListBuckets" => {
#[allow(non_camel_case_types)]
struct ListBucketsSvc<T: Manager>(pub Arc<T>);
impl<
T: Manager,
> tonic::server::UnaryService<super::ListBucketsRequest>
for ListBucketsSvc<T> {
type Response = super::ListBucketsResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ListBucketsRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).list_buckets(request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = ListBucketsSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/manager.v2.Manager/ListApplications" => {
#[allow(non_camel_case_types)]
struct ListApplicationsSvc<T: Manager>(pub Arc<T>);
@ -1563,52 +1269,6 @@ pub mod manager_server {
};
Box::pin(fut)
}
"/manager.v2.Manager/CreateModel" => {
#[allow(non_camel_case_types)]
struct CreateModelSvc<T: Manager>(pub Arc<T>);
impl<
T: Manager,
> tonic::server::UnaryService<super::CreateModelRequest>
for CreateModelSvc<T> {
type Response = ();
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::CreateModelRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).create_model(request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let inner = inner.0;
let method = CreateModelSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
"/manager.v2.Manager/KeepAlive" => {
#[allow(non_camel_case_types)]
struct KeepAliveSvc<T: Manager>(pub Arc<T>);