feat: update resource director (#2243)
* feat: update resource director * chore: add digest check for oras Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
parent
65bf673d2d
commit
94a7c7639f
|
|
@ -249,11 +249,14 @@ func newDownRequest(cfg *config.DfgetConfig, hdr map[string]string) *dfdaemonv1.
|
||||||
|
|
||||||
_url, err := url.Parse(cfg.URL)
|
_url, err := url.Parse(cfg.URL)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
inj, ok := source.ShouldInjectAuthInfo(_url.Scheme)
|
director, ok := source.HasDirector(_url.Scheme)
|
||||||
if ok {
|
if ok {
|
||||||
err = inj.Inject(_url, request.UrlMeta)
|
err = director.Direct(_url, request.UrlMeta)
|
||||||
if err != nil {
|
if err == nil {
|
||||||
logger.Errorf("inject auth info error: %s", err)
|
// write back new url
|
||||||
|
request.Url = _url.String()
|
||||||
|
} else {
|
||||||
|
logger.Errorf("direct resource error: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,7 @@
|
||||||
package orasprotocol
|
package orasprotocol
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -39,10 +40,12 @@ const (
|
||||||
scheme = "oras"
|
scheme = "oras"
|
||||||
configFilePath = "/.singularity/docker-config.json"
|
configFilePath = "/.singularity/docker-config.json"
|
||||||
|
|
||||||
authHeader = "X-Dragonfly-Oras-Authorization"
|
authHeader = "X-Dragonfly-Oras-Authorization"
|
||||||
|
tokenHeader = "X-Dragonfly-Oras-Token"
|
||||||
|
blobDigest = "digest"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ source.ResourceClient = (*orasSourceClient)(nil)
|
var client *orasSourceClient
|
||||||
|
|
||||||
type Blob struct {
|
type Blob struct {
|
||||||
Digest string `json:"digest"`
|
Digest string `json:"digest"`
|
||||||
|
|
@ -53,9 +56,13 @@ type Manifest struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
client = &orasSourceClient{
|
||||||
|
httpClient: http.DefaultClient,
|
||||||
|
}
|
||||||
|
|
||||||
source.RegisterBuilder(scheme,
|
source.RegisterBuilder(scheme,
|
||||||
source.NewPlainResourceClientBuilder(Builder),
|
source.NewPlainResourceClientBuilder(Builder),
|
||||||
source.WithAuthInfoInjector(source.NewPlainAuthInfoInjector(AuthInfoInjector)))
|
source.WithDirector(source.NewPlainDirector(Director)))
|
||||||
}
|
}
|
||||||
|
|
||||||
func Builder(optionYaml []byte) (source.ResourceClient, source.RequestAdapter, []source.Hook, error) {
|
func Builder(optionYaml []byte) (source.ResourceClient, source.RequestAdapter, []source.Hook, error) {
|
||||||
|
|
@ -64,19 +71,53 @@ func Builder(optionYaml []byte) (source.ResourceClient, source.RequestAdapter, [
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
client := &orasSourceClient{
|
client.httpClient = httpClient
|
||||||
httpClient: httpClient,
|
|
||||||
}
|
|
||||||
return client, client.adaptor, nil, nil
|
return client, client.adaptor, nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func AuthInfoInjector(_url *url.URL, urlMeta *commonv1.UrlMeta) error {
|
func Director(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error {
|
||||||
auth, err := fetchAuthInfo(_url.Host, false)
|
// 1. fetch auth info from local user
|
||||||
|
auth, err := fetchAuthInfo(rawURL.Host, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
var authHdr string
|
||||||
|
if auth != "" {
|
||||||
|
authHdr = "Basic " + auth
|
||||||
|
}
|
||||||
|
|
||||||
|
path, tag, err := parseURL(rawURL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
urlMeta.Header[authHeader] = "Basic " + auth
|
ctx := context.TODO()
|
||||||
|
host := rawURL.Host
|
||||||
|
|
||||||
|
// 2. fetch token with auth info
|
||||||
|
tokenFetchURL := formatTokenURL(host, path)
|
||||||
|
token, err := client.fetchTokenWithHeader(ctx, authHdr, tokenFetchURL)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. fetch manifest digest, normal is sha256
|
||||||
|
digest, err := client.fetchManifest(ctx, host, token, path, tag)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. update unique blob digest in url
|
||||||
|
values := rawURL.Query()
|
||||||
|
values.Set(blobDigest, digest)
|
||||||
|
rawURL.RawQuery = values.Encode()
|
||||||
|
|
||||||
|
// 5. update digest for peer data check
|
||||||
|
urlMeta.Digest = digest
|
||||||
|
|
||||||
|
// 6. update token in header
|
||||||
|
urlMeta.Header[tokenHeader] = token
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -102,22 +143,40 @@ func (client *orasSourceClient) IsExpired(request *source.Request, info *source.
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *orasSourceClient) Download(request *source.Request) (*source.Response, error) {
|
func (client *orasSourceClient) Download(request *source.Request) (*source.Response, error) {
|
||||||
path, tag, err := parseURL(request, request.URL.Path)
|
ctx := request.Context()
|
||||||
|
host := request.URL.Host
|
||||||
|
|
||||||
|
path, tag, err := parseURL(request.URL.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
token, err := client.fetchToken(request, path)
|
var (
|
||||||
|
digest string
|
||||||
|
token string
|
||||||
|
)
|
||||||
|
|
||||||
|
// if there is blob sha256 and token, just goto fetch image
|
||||||
|
if digestQuery, ok1 := request.URL.Query()[blobDigest]; ok1 && len(digestQuery) > 0 {
|
||||||
|
if tokenHdr, ok2 := request.Header[tokenHeader]; ok2 && len(tokenHdr) > 0 {
|
||||||
|
digest = digestQuery[0]
|
||||||
|
token = tokenHdr[0]
|
||||||
|
goto fetch
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
token, err = client.fetchToken(request, path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sha256, err := client.fetchManifest(request, token, path, tag)
|
digest, err = client.fetchManifest(ctx, host, token, path, tag)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
imageFetchResponse, err := client.fetchImage(request, token, sha256, path, tag)
|
fetch:
|
||||||
|
imageFetchResponse, err := client.fetchImage(ctx, host, token, path, digest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -154,53 +213,61 @@ func fetchAuthInfo(host string, skipCheckExist bool) (string, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *orasSourceClient) fetchToken(request *source.Request, path string) (string, error) {
|
func (client *orasSourceClient) fetchToken(request *source.Request, path string) (string, error) {
|
||||||
var response *http.Response
|
tokenFetchURL := formatTokenURL(request.URL.Host, path)
|
||||||
var err error
|
|
||||||
tokenFetchURL := fmt.Sprintf("https://%s/service/token/?scope=repository:%s:pull&service=harbor-registry", request.URL.Host, path)
|
var authHeaderVal string
|
||||||
if authHeaderVal := request.Header.Get(authHeader); authHeaderVal != "" {
|
|
||||||
|
if authHeaderVal = request.Header.Get(authHeader); authHeaderVal != "" {
|
||||||
// remove the internal auth header
|
// remove the internal auth header
|
||||||
request.Header.Del(authHeader)
|
request.Header.Del(authHeader)
|
||||||
response, err = client.doRequest(request, jsonAcceptHeader, authHeaderVal, tokenFetchURL)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
} else if fileExists(os.Getenv("HOME") + configFilePath) {
|
} else if fileExists(os.Getenv("HOME") + configFilePath) {
|
||||||
var auth string
|
auth, err := fetchAuthInfo(request.URL.Host, true)
|
||||||
auth, err = fetchAuthInfo(request.URL.Host, true)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
authHeaderVal = "Basic " + auth
|
authHeaderVal = "Basic " + auth
|
||||||
response, err = client.doRequest(request, jsonAcceptHeader, authHeaderVal, tokenFetchURL)
|
}
|
||||||
if err != nil {
|
|
||||||
return "", err
|
token, err := client.fetchTokenWithHeader(request.Context(), authHeaderVal, tokenFetchURL)
|
||||||
}
|
if err != nil {
|
||||||
} else {
|
return "", err
|
||||||
response, err = client.doRequest(request, "", "", tokenFetchURL)
|
}
|
||||||
if err != nil {
|
logger.Info(fmt.Sprintf("fetching token for %s successfully", request.URL))
|
||||||
return "", err
|
return token, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (client *orasSourceClient) fetchTokenWithHeader(ctx context.Context, authHeaderVal, tokenFetchURL string) (string, error) {
|
||||||
|
// FIXME always jsonAcceptHeader ?
|
||||||
|
var acceptHeader string
|
||||||
|
if authHeaderVal != "" {
|
||||||
|
acceptHeader = jsonAcceptHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := client.doRequest(ctx, acceptHeader, authHeaderVal, tokenFetchURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
}
|
}
|
||||||
token, err := io.ReadAll(response.Body)
|
token, err := io.ReadAll(response.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
|
|
||||||
var tokenData map[string]interface{}
|
var tokenData map[string]interface{}
|
||||||
if err = json.Unmarshal(token, &tokenData); err != nil {
|
if err = json.Unmarshal(token, &tokenData); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
logger.Info(fmt.Sprintf("fetching token for %s successful", request.URL))
|
|
||||||
tokenVal := fmt.Sprintf("%v", tokenData["token"])
|
tokenVal := fmt.Sprintf("%v", tokenData["token"])
|
||||||
return tokenVal, nil
|
return tokenVal, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *orasSourceClient) fetchManifest(request *source.Request, accessToken, path, tag string) (string, error) {
|
func (client *orasSourceClient) fetchManifest(ctx context.Context, host, accessToken, path, tag string) (string, error) {
|
||||||
var sha string
|
var sha string
|
||||||
var blobLayers Manifest
|
var blobLayers Manifest
|
||||||
manifestFetchURL := fmt.Sprintf("https://%s/v2/%s/manifests/%s", request.URL.Host, path, tag)
|
manifestFetchURL := fmt.Sprintf("https://%s/v2/%s/manifests/%s", host, path, tag)
|
||||||
authHeaderVal := "Bearer " + accessToken
|
authHeaderVal := "Bearer " + accessToken
|
||||||
resp, err := client.doRequest(request, ociAcceptHeader, authHeaderVal, manifestFetchURL)
|
resp, err := client.doRequest(ctx, ociAcceptHeader, authHeaderVal, manifestFetchURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
|
@ -209,32 +276,32 @@ func (client *orasSourceClient) fetchManifest(request *source.Request, accessTok
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(manifest, &blobLayers); err != nil {
|
if err = json.Unmarshal(manifest, &blobLayers); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
for _, value := range blobLayers.Layers {
|
for _, value := range blobLayers.Layers {
|
||||||
sha = value.Digest
|
sha = value.Digest
|
||||||
}
|
}
|
||||||
if sha != "" {
|
if sha != "" {
|
||||||
logger.Info(fmt.Sprintf("fetching manifests for %s successful", request.URL))
|
logger.Info(fmt.Sprintf("fetching manifests for %s/%s:%s successfully", host, path, tag))
|
||||||
return sha, nil
|
return sha, nil
|
||||||
}
|
}
|
||||||
return "", errors.New("manifest is empty")
|
return "", errors.New("manifest is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *orasSourceClient) fetchImage(request *source.Request, token, sha256, path, tag string) (*source.Response, error) {
|
func (client *orasSourceClient) fetchImage(ctx context.Context, host, token, path, sha256 string) (*source.Response, error) {
|
||||||
imageFetchURL := fmt.Sprintf("https://%s/v2/%s/blobs/%s", request.URL.Host, path, sha256)
|
imageFetchURL := fmt.Sprintf("https://%s/v2/%s/blobs/%s", host, path, sha256)
|
||||||
authHeaderVal := "Bearer " + token
|
authHeaderVal := "Bearer " + token
|
||||||
resp, err := client.doRequest(request, ociAcceptHeader, authHeaderVal, imageFetchURL)
|
resp, err := client.doRequest(ctx, ociAcceptHeader, authHeaderVal, imageFetchURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.New("failed to fetch image")
|
return nil, errors.New("failed to fetch image")
|
||||||
}
|
}
|
||||||
logger.Info(fmt.Sprintf("Fetched %s image successfully", request.URL))
|
logger.Info(fmt.Sprintf("Fetched image %s/%s with digest %s successfully", host, path, sha256))
|
||||||
return source.NewResponse(resp.Body), nil
|
return source.NewResponse(resp.Body), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (client *orasSourceClient) doRequest(request *source.Request, acceptHeaderVal, authHeaderVal, url string) (*http.Response, error) {
|
func (client *orasSourceClient) doRequest(ctx context.Context, acceptHeaderVal, authHeaderVal, url string) (*http.Response, error) {
|
||||||
req, err := http.NewRequestWithContext(request.Context(), http.MethodGet, url, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
@ -257,7 +324,7 @@ func fileExists(filepath string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseURL(request *source.Request, urlPath string) (string, string, error) {
|
func parseURL(urlPath string) (string, string, error) {
|
||||||
parseURLPattern, err := regexp.Compile("(.*):(.*)")
|
parseURLPattern, err := regexp.Compile("(.*):(.*)")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", "", err
|
return "", "", err
|
||||||
|
|
@ -274,3 +341,7 @@ func parseURL(request *source.Request, urlPath string) (string, string, error) {
|
||||||
func (client *orasSourceClient) GetLastModified(request *source.Request) (int64, error) {
|
func (client *orasSourceClient) GetLastModified(request *source.Request) (int64, error) {
|
||||||
panic("implement me")
|
panic("implement me")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func formatTokenURL(host, path string) string {
|
||||||
|
return fmt.Sprintf("https://%s/service/token/?scope=repository:%s:pull&service=harbor-registry", host, path)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ import (
|
||||||
var (
|
var (
|
||||||
resourceClientBuilder = map[string]ResourceClientBuilder{}
|
resourceClientBuilder = map[string]ResourceClientBuilder{}
|
||||||
resourceClientOptions = map[string]interface{}{}
|
resourceClientOptions = map[string]interface{}{}
|
||||||
resourceAuthInjector = map[string]AuthInfoInjector{}
|
resourceDirector = map[string]Director{}
|
||||||
)
|
)
|
||||||
|
|
||||||
// ResourceClientBuilder is used to build resource client with custom option
|
// ResourceClientBuilder is used to build resource client with custom option
|
||||||
|
|
@ -37,9 +37,11 @@ type ResourceClientBuilder interface {
|
||||||
Build(optionYaml []byte) (resourceClient ResourceClient, adaptor RequestAdapter, hooks []Hook, err error)
|
Build(optionYaml []byte) (resourceClient ResourceClient, adaptor RequestAdapter, hooks []Hook, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// AuthInfoInjector will inject auth information for target url and metadata, eg: fetch docker config for different users
|
// Director will handle request with some actions, like:
|
||||||
type AuthInfoInjector interface {
|
// 1. inject auth information for target url and metadata, eg: fetch docker config for different users
|
||||||
Inject(_url *url.URL, urlMeta *commonv1.UrlMeta) error
|
// 2. rewrite a common request into an unique request, eg: oras://harbor/user:latest to oras://harbor/user:lastest?digest=sha256:12345
|
||||||
|
type Director interface {
|
||||||
|
Direct(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterOption is used for extra options when registering, like mark target scheme protocol should inject auth information
|
// RegisterOption is used for extra options when registering, like mark target scheme protocol should inject auth information
|
||||||
|
|
@ -57,15 +59,15 @@ func RegisterBuilder(scheme string, builder ResourceClientBuilder, opts ...Regis
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithAuthInfoInjector(inj AuthInfoInjector) RegisterOption {
|
func WithDirector(director Director) RegisterOption {
|
||||||
return func(scheme string) {
|
return func(scheme string) {
|
||||||
resourceAuthInjector[scheme] = inj
|
resourceDirector[scheme] = director
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ShouldInjectAuthInfo(scheme string) (AuthInfoInjector, bool) {
|
func HasDirector(scheme string) (Director, bool) {
|
||||||
inj, ok := resourceAuthInjector[scheme]
|
director, ok := resourceDirector[scheme]
|
||||||
return inj, ok
|
return director, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func UnRegisterBuilder(scheme string) {
|
func UnRegisterBuilder(scheme string) {
|
||||||
|
|
@ -116,15 +118,15 @@ func NewPlainResourceClientBuilder(
|
||||||
return &plainResourceClientBuilder{build: build}
|
return &plainResourceClientBuilder{build: build}
|
||||||
}
|
}
|
||||||
|
|
||||||
type plainAuthInfoInjector struct {
|
type plainDirector struct {
|
||||||
inject func(url *url.URL, urlMeta *commonv1.UrlMeta) error
|
direct func(url *url.URL, urlMeta *commonv1.UrlMeta) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *plainAuthInfoInjector) Inject(_url *url.URL, urlMeta *commonv1.UrlMeta) error {
|
func (a *plainDirector) Direct(rawURL *url.URL, urlMeta *commonv1.UrlMeta) error {
|
||||||
return a.inject(_url, urlMeta)
|
return a.direct(rawURL, urlMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPlainAuthInfoInjector(
|
func NewPlainDirector(
|
||||||
inject func(url *url.URL, urlMeta *commonv1.UrlMeta) error) AuthInfoInjector {
|
direct func(url *url.URL, urlMeta *commonv1.UrlMeta) error) Director {
|
||||||
return &plainAuthInfoInjector{inject: inject}
|
return &plainDirector{direct: direct}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue