(re-)Implement parallelism in deploy

This requires the input to be in the correct order (children first), but that was already an implied constraint.

In my testing, this takes the deploy block of our `.test/test.sh --deploy` test run from ~5s down to ~2.5s.

When we originally introduced this, it collided with some other aging infrastructure in a bad way, kicking over everything, and had to be reverted.  We've since changed that and this should be safe to re-introduce.
This commit is contained in:
Tianon Gravi 2025-01-23 11:09:07 -08:00
parent 4f22d05eff
commit 8b4e0f376a
5 changed files with 196 additions and 50 deletions

View File

@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"maps"
"github.com/docker-library/meta-scripts/registry"
@ -46,7 +47,14 @@ type inputNormalized struct {
Data []byte `json:"data"`
CopyFrom *registry.Reference `json:"copyFrom"`
Do func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) `json:"-"`
// if CopyFrom is nil and Type is manifest, this will be set (used by "do")
MediaType string `json:"mediaType,omitempty"`
}
// clone returns a copy of normal that is safe to hand to a concurrent worker.
// The value receiver already shallow-copies every field; Lookup is the only
// reference-typed field we mutate concurrently, so it is the only one that
// needs an explicit deep copy 😇
func (normal inputNormalized) clone() inputNormalized {
	copied := normal
	copied.Lookup = maps.Clone(copied.Lookup)
	return copied
}
func normalizeInputRefs(deployType deployType, rawRefs []string) ([]registry.Reference, ociregistry.Digest, error) {
@ -222,6 +230,7 @@ func NormalizeInput(raw inputRaw) (inputNormalized, error) {
normal.Lookup[d] = ref
}
// front-load some validation / data extraction for "normal.do" to work
switch normal.Type {
case typeManifest:
if normal.CopyFrom == nil {
@ -240,27 +249,12 @@ func NormalizeInput(raw inputRaw) (inputNormalized, error) {
// and our logic for pushing children needs to know the mediaType (see the GHSAs referenced above)
return normal, fmt.Errorf("%s: pushing manifest but missing 'mediaType'", debugId)
}
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.EnsureManifest(ctx, dstRef, normal.Data, mediaTypeHaver.MediaType, normal.Lookup)
}
} else {
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.CopyManifest(ctx, *normal.CopyFrom, dstRef, normal.Lookup)
}
normal.MediaType = mediaTypeHaver.MediaType
}
case typeBlob:
if normal.CopyFrom == nil {
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.EnsureBlob(ctx, dstRef, int64(len(normal.Data)), bytes.NewReader(normal.Data))
}
} else {
if normal.CopyFrom.Digest == "" {
return normal, fmt.Errorf("%s: blobs are always by-digest, and thus need a digest: %s", debugId, normal.CopyFrom)
}
normal.Do = func(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
return registry.CopyBlob(ctx, *normal.CopyFrom, dstRef)
}
if normal.CopyFrom != nil && normal.CopyFrom.Digest == "" {
return normal, fmt.Errorf("%s: blobs are always by-digest, and thus need a digest: %s", debugId, normal.CopyFrom)
}
default:
@ -270,3 +264,27 @@ func NormalizeInput(raw inputRaw) (inputNormalized, error) {
return normal, nil
}
// do pushes (or copies) this normalized input to dstRef and returns the
// resulting descriptor.
//
// WARNING: many of these codepaths will end up writing to "normal.Lookup",
// which because it's a map is passed by reference, so this method is *not*
// safe for concurrent invocation on a single "normal" object! see
// "normal.clone" (above)
func (normal inputNormalized) do(ctx context.Context, dstRef registry.Reference) (ociregistry.Descriptor, error) {
	switch normal.Type {
	case typeManifest:
		if normal.CopyFrom != nil {
			// copy an existing manifest from another repo/registry
			return registry.CopyManifest(ctx, *normal.CopyFrom, dstRef, normal.Lookup)
		}
		// push raw/embedded manifest data
		// TODO panic on bad data, like MediaType being empty?
		return registry.EnsureManifest(ctx, dstRef, normal.Data, normal.MediaType, normal.Lookup)
	case typeBlob:
		if normal.CopyFrom != nil {
			// copy an existing blob from another repo/registry
			return registry.CopyBlob(ctx, *normal.CopyFrom, dstRef)
		}
		// push raw/embedded blob data
		return registry.EnsureBlob(ctx, dstRef, int64(len(normal.Data)), bytes.NewReader(normal.Data))
	default:
		// panic instead of error because this should've already been handled/normalized above (so this is a coding error, not a runtime error)
		panic("unknown type: " + string(normal.Type))
	}
}

View File

@ -281,7 +281,7 @@ func TestNormalizeInput(t *testing.T) {
"refs": [ "localhost:5000/example:test" ],
"data": {"mediaType": "application/vnd.oci.image.index.v1+json"}
}`,
`{"type":"manifest","refs":["localhost:5000/example:test@sha256:0ae6b7b9d0bc73ee36c1adef005deb431e94cf009c6a947718b31da3d668032d"],"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0=","copyFrom":null}`,
`{"type":"manifest","refs":["localhost:5000/example:test@sha256:0ae6b7b9d0bc73ee36c1adef005deb431e94cf009c6a947718b31da3d668032d"],"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0=","copyFrom":null,"mediaType":"application/vnd.oci.image.index.v1+json"}`,
},
{
"manifest raw",
@ -290,7 +290,7 @@ func TestNormalizeInput(t *testing.T) {
"refs": [ "localhost:5000/example" ],
"data": "eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0="
}`,
`{"type":"manifest","refs":["localhost:5000/example@sha256:0ae6b7b9d0bc73ee36c1adef005deb431e94cf009c6a947718b31da3d668032d"],"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0=","copyFrom":null}`,
`{"type":"manifest","refs":["localhost:5000/example@sha256:0ae6b7b9d0bc73ee36c1adef005deb431e94cf009c6a947718b31da3d668032d"],"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIn0=","copyFrom":null,"mediaType":"application/vnd.oci.image.index.v1+json"}`,
},
{
@ -301,7 +301,7 @@ func TestNormalizeInput(t *testing.T) {
"lookup": { "sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d": "tianon/true" },
"data": {"mediaType": "application/vnd.oci.image.index.v1+json","manifests":[{"mediaType":"application/vnd.oci.image.manifest.v1+json","digest":"sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d","size":1165}],"schemaVersion":2}
}`,
`{"type":"manifest","refs":["localhost:5000/example:test@sha256:0cb474919526d040392883b84e5babb65a149cc605b89b117781ab94e88a5e86"],"lookup":{"sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d":"tianon/true"},"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIiwibWFuaWZlc3RzIjpbeyJtZWRpYVR5cGUiOiJhcHBsaWNhdGlvbi92bmQub2NpLmltYWdlLm1hbmlmZXN0LnYxK2pzb24iLCJkaWdlc3QiOiJzaGEyNTY6OWVmNDJmMWQ2MDJmYjQyM2ZhZDkzNWFhYzFjYWEwY2ZkYmNlMWFkN2VkY2U2NGQwODBhNGViN2IxM2Y3Y2Q5ZCIsInNpemUiOjExNjV9XSwic2NoZW1hVmVyc2lvbiI6Mn0=","copyFrom":null}`,
`{"type":"manifest","refs":["localhost:5000/example:test@sha256:0cb474919526d040392883b84e5babb65a149cc605b89b117781ab94e88a5e86"],"lookup":{"sha256:9ef42f1d602fb423fad935aac1caa0cfdbce1ad7edce64d080a4eb7b13f7cd9d":"tianon/true"},"data":"eyJtZWRpYVR5cGUiOiAiYXBwbGljYXRpb24vdm5kLm9jaS5pbWFnZS5pbmRleC52MStqc29uIiwibWFuaWZlc3RzIjpbeyJtZWRpYVR5cGUiOiJhcHBsaWNhdGlvbi92bmQub2NpLmltYWdlLm1hbmlmZXN0LnYxK2pzb24iLCJkaWdlc3QiOiJzaGEyNTY6OWVmNDJmMWQ2MDJmYjQyM2ZhZDkzNWFhYzFjYWEwY2ZkYmNlMWFkN2VkY2U2NGQwODBhNGViN2IxM2Y3Y2Q5ZCIsInNpemUiOjExNjV9XSwic2NoZW1hVmVyc2lvbiI6Mn0=","copyFrom":null,"mediaType":"application/vnd.oci.image.index.v1+json"}`,
},
{
"image",
@ -311,7 +311,7 @@ func TestNormalizeInput(t *testing.T) {
"lookup": { "": "tianon/true" },
"data": {"schemaVersion":2,"mediaType":"application/vnd.docker.distribution.manifest.v2+json","config":{"mediaType":"application/vnd.docker.container.image.v1+json","size":1471,"digest":"sha256:690912094c0165c489f874c72cee4ba208c28992c0699fa6e10d8cc59f93fec9"},"layers":[{"mediaType":"application/vnd.docker.image.rootfs.diff.tar.gzip","size":129,"digest":"sha256:4c74d744397d4bcbd3079d9c82a87b80d43da376313772978134d1288f20518c"}]}
}`,
`{"type":"manifest","refs":["localhost:5000/example@sha256:1c70f9d471b83100c45d5a218d45bbf7e073e11ea5043758a020379a7c78f878"],"lookup":{"":"tianon/true"},"data":"eyJzY2hlbWFWZXJzaW9uIjoyLCJtZWRpYVR5cGUiOiJhcHBsaWNhdGlvbi92bmQuZG9ja2VyLmRpc3RyaWJ1dGlvbi5tYW5pZmVzdC52Mitqc29uIiwiY29uZmlnIjp7Im1lZGlhVHlwZSI6ImFwcGxpY2F0aW9uL3ZuZC5kb2NrZXIuY29udGFpbmVyLmltYWdlLnYxK2pzb24iLCJzaXplIjoxNDcxLCJkaWdlc3QiOiJzaGEyNTY6NjkwOTEyMDk0YzAxNjVjNDg5Zjg3NGM3MmNlZTRiYTIwOGMyODk5MmMwNjk5ZmE2ZTEwZDhjYzU5ZjkzZmVjOSJ9LCJsYXllcnMiOlt7Im1lZGlhVHlwZSI6ImFwcGxpY2F0aW9uL3ZuZC5kb2NrZXIuaW1hZ2Uucm9vdGZzLmRpZmYudGFyLmd6aXAiLCJzaXplIjoxMjksImRpZ2VzdCI6InNoYTI1Njo0Yzc0ZDc0NDM5N2Q0YmNiZDMwNzlkOWM4MmE4N2I4MGQ0M2RhMzc2MzEzNzcyOTc4MTM0ZDEyODhmMjA1MThjIn1dfQ==","copyFrom":null}`,
`{"type":"manifest","refs":["localhost:5000/example@sha256:1c70f9d471b83100c45d5a218d45bbf7e073e11ea5043758a020379a7c78f878"],"lookup":{"":"tianon/true"},"data":"eyJzY2hlbWFWZXJzaW9uIjoyLCJtZWRpYVR5cGUiOiJhcHBsaWNhdGlvbi92bmQuZG9ja2VyLmRpc3RyaWJ1dGlvbi5tYW5pZmVzdC52Mitqc29uIiwiY29uZmlnIjp7Im1lZGlhVHlwZSI6ImFwcGxpY2F0aW9uL3ZuZC5kb2NrZXIuY29udGFpbmVyLmltYWdlLnYxK2pzb24iLCJzaXplIjoxNDcxLCJkaWdlc3QiOiJzaGEyNTY6NjkwOTEyMDk0YzAxNjVjNDg5Zjg3NGM3MmNlZTRiYTIwOGMyODk5MmMwNjk5ZmE2ZTEwZDhjYzU5ZjkzZmVjOSJ9LCJsYXllcnMiOlt7Im1lZGlhVHlwZSI6ImFwcGxpY2F0aW9uL3ZuZC5kb2NrZXIuaW1hZ2Uucm9vdGZzLmRpZmYudGFyLmd6aXAiLCJzaXplIjoxMjksImRpZ2VzdCI6InNoYTI1Njo0Yzc0ZDc0NDM5N2Q0YmNiZDMwNzlkOWM4MmE4N2I4MGQ0M2RhMzc2MzEzNzcyOTc4MTM0ZDEyODhmMjA1MThjIn1dfQ==","copyFrom":null,"mediaType":"application/vnd.docker.distribution.manifest.v2+json"}`,
},
{

View File

@ -7,6 +7,11 @@ import (
"os"
"os/exec"
"os/signal"
"sync"
"github.com/docker-library/meta-scripts/registry"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func main() {
@ -32,6 +37,11 @@ func main() {
panic(err)
}
// a set of RWMutex objects for synchronizing the pushing of "child" objects before their parents later in the list of documents
// for every RWMutex, it will be *write*-locked during push, and *read*-locked during reading (which means we won't limit the parallelization of multiple parents after a given child is pushed, but we will stop parents from being pushed before their children)
childMutexes := sync.Map{}
wg := sync.WaitGroup{}
dec := json.NewDecoder(stdout)
for dec.More() {
var raw inputRaw
@ -48,26 +58,128 @@ func main() {
}
refsDigest := normal.Refs[0].Digest
if normal.CopyFrom == nil {
fmt.Printf("Pushing %s %s:\n", raw.Type, refsDigest)
var logSuffix string = " (" + string(raw.Type) + ") "
if normal.CopyFrom != nil {
// normal copy (one repo/registry to another)
logSuffix = " 🤝" + logSuffix + normal.CopyFrom.String()
// "localhost:32774/test 🤝 (manifest) tianon/test@sha256:4077658bc7e39f02f81d1682fe49f66b3db2c420813e43f5db0c53046167c12f"
} else {
fmt.Printf("Copying %s %s:\n", raw.Type, *normal.CopyFrom)
// push (raw/embedded blob or manifest data)
logSuffix = " 🦾" + logSuffix + string(refsDigest)
// "localhost:32774/test 🦾 (blob) sha256:1a51828d59323e0e02522c45652b6a7a44a032b464b06d574f067d2358b0e9f1"
}
startedPrefix := "❔ "
successPrefix := "✅ "
failurePrefix := "❌ "
// locks are per-digest, but refs might be 20 tags on the same digest, so we need to get one write lock per repo@digest and release it when the first tag completes, and every other tag needs a read lock
seenRefs := map[string]bool{}
for _, ref := range normal.Refs {
fmt.Printf(" - %s", ref.StringWithKnownDigest(refsDigest))
desc, err := normal.Do(ctx, ref)
if err != nil {
fmt.Fprintf(os.Stderr, " -- ERROR: %v\n", err)
os.Exit(1)
return
}
if ref.Digest == "" && refsDigest == "" {
fmt.Printf("@%s", desc.Digest)
}
fmt.Println()
}
ref := ref // https://github.com/golang/go/issues/60078
fmt.Println()
necessaryReadLockRefs := []registry.Reference{}
// before parallelization, collect the pushing "child" mutex we need to lock for writing right away (but only for the first entry)
var mutex *sync.RWMutex
if ref.Digest != "" {
lockRef := ref
lockRef.Tag = ""
lockRefStr := lockRef.String()
if seenRefs[lockRefStr] {
// if we've already seen this specific ref for this input, we need a read lock, not a write lock (since they're per-repo@digest)
necessaryReadLockRefs = append(necessaryReadLockRefs, lockRef)
} else {
seenRefs[lockRefStr] = true
lock, _ := childMutexes.LoadOrStore(lockRefStr, &sync.RWMutex{})
mutex = lock.(*sync.RWMutex)
// if we have a "child" mutex, lock it immediately so we don't create a race between inputs
mutex.Lock() // (this gets unlocked in the goroutine below)
// this is sane to lock here because interdependent inputs are required to be in-order (children first), so if this hangs it's 100% a bug in the input order
}
}
// make a (deep) copy of "normal" so that we can use it in a goroutine ("normal.do" is not safe for concurrent invocation)
normal := normal.clone()
wg.Add(1)
go func() {
defer wg.Done()
if mutex != nil {
defer mutex.Unlock()
}
// before we start this job (parallelized), if it's a raw data job we need to parse the raw data and see if any of the "children" are objects we're still in the process of pushing (from a previously parallel job)
if len(normal.Data) > 2 { // needs to at least be bigger than "{}" for us to care (anything else either doesn't have data or can't have children)
// explicitly ignoring errors because this might not actually be JSON (or even a manifest at all!); this is best-effort
// TODO optimize this by checking whether normal.Data matches "^\s*{.+}\s*$" first so we have some assurance it might work before we go further?
manifestChildren, _ := registry.ParseManifestChildren(normal.Data)
childDescs := []ocispec.Descriptor{}
childDescs = append(childDescs, manifestChildren.Manifests...)
if manifestChildren.Config != nil {
childDescs = append(childDescs, *manifestChildren.Config)
}
childDescs = append(childDescs, manifestChildren.Layers...)
for _, childDesc := range childDescs {
childRef := ref
childRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, childRef)
// these read locks are cheap, so let's be aggressive with our "lookup" refs too
if lookupRef, ok := normal.Lookup[childDesc.Digest]; ok {
lookupRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
}
if fallbackRef, ok := normal.Lookup[""]; ok {
fallbackRef.Digest = childDesc.Digest
necessaryReadLockRefs = append(necessaryReadLockRefs, fallbackRef)
}
}
}
// we don't *know* that all the lookup references are children, but if any of them have an explicit digest, let's treat them as potential children too (which is fair, because they *are* explicit potential references that it's sane to make sure exist)
for digest, lookupRef := range normal.Lookup {
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
if digest != lookupRef.Digest {
lookupRef.Digest = digest
necessaryReadLockRefs = append(necessaryReadLockRefs, lookupRef)
}
}
// if we're going to do a copy, we need to *also* include the artifact we're copying in our list
if normal.CopyFrom != nil {
necessaryReadLockRefs = append(necessaryReadLockRefs, *normal.CopyFrom)
}
// ok, we've built up a list, let's start grabbing (ro) mutexes
seenChildren := map[string]bool{}
for _, lockRef := range necessaryReadLockRefs {
lockRef.Tag = ""
if lockRef.Digest == "" {
continue
}
lockRefStr := lockRef.String()
if seenChildren[lockRefStr] {
continue
}
seenChildren[lockRefStr] = true
lock, _ := childMutexes.LoadOrStore(lockRefStr, &sync.RWMutex{})
lock.(*sync.RWMutex).RLock()
defer lock.(*sync.RWMutex).RUnlock()
}
logText := ref.StringWithKnownDigest(refsDigest) + logSuffix
fmt.Println(startedPrefix + logText)
desc, err := normal.do(ctx, ref)
if err != nil {
fmt.Fprintf(os.Stderr, "%s%s -- ERROR: %v\n", failurePrefix, logText, err)
panic(err) // TODO exit in a more clean way (we can't use "os.Exit" because that causes *more* errors 😭)
}
if ref.Digest == "" && refsDigest == "" {
logText += "@" + string(desc.Digest)
}
fmt.Println(successPrefix + logText)
}()
}
}
wg.Wait()
}

View File

@ -0,0 +1,25 @@
package registry
import (
"encoding/json"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// ManifestChildren collects every *potential* child descriptor a manifest can
// reference: child manifests (from an index), plus config and layers (from an
// image manifest).
type ManifestChildren struct {
	// *technically* this should be two separate structs chosen based on mediaType (https://github.com/opencontainers/distribution-spec/security/advisories/GHSA-mc8v-mgrf-8f4m), but that makes the code a lot more annoying when we're just collecting a list of potential children we need to copy over for the parent object to push successfully

	// intentional subset of https://github.com/opencontainers/image-spec/blob/v1.1.0/specs-go/v1/index.go#L21 to minimize parsing
	Manifests []ocispec.Descriptor `json:"manifests"`

	// intentional subset of https://github.com/opencontainers/image-spec/blob/v1.1.0/specs-go/v1/manifest.go#L20 to minimize parsing
	Config *ocispec.Descriptor `json:"config"` // have to turn this into a pointer so we can recognize when it's not set easier / more correctly
	Layers []ocispec.Descriptor `json:"layers"`
}
// ParseManifestChildren opportunistically parses the given manifest bytes for
// any *potential* child objects; it will return JSON parsing errors for
// non-JSON input (callers doing best-effort scanning may choose to ignore
// them).
func ParseManifestChildren(manifest []byte) (ManifestChildren, error) {
	var children ManifestChildren
	if err := json.Unmarshal(manifest, &children); err != nil {
		return children, err
	}
	return children, nil
}

View File

@ -74,17 +74,8 @@ func EnsureManifest(ctx context.Context, ref Reference, manifest json.RawMessage
errors.Is(err, ociregistry.ErrBlobUnknown) ||
(errors.As(err, &httpErr) && httpErr.StatusCode() >= 400 && httpErr.StatusCode() <= 499) {
// this probably means we need to push some child manifests and/or mount missing blobs (and then retry the manifest push)
var manifestChildren struct {
// *technically* this should be two separate structs chosen based on mediaType (https://github.com/opencontainers/distribution-spec/security/advisories/GHSA-mc8v-mgrf-8f4m), but that makes the code a lot more annoying when we're just collecting a list of potential children we need to copy over for the parent object to push successfully
// intentional subset of https://github.com/opencontainers/image-spec/blob/v1.1.0/specs-go/v1/index.go#L21 to minimize parsing
Manifests []ocispec.Descriptor `json:"manifests"`
// intentional subset of https://github.com/opencontainers/image-spec/blob/v1.1.0/specs-go/v1/manifest.go#L20 to minimize parsing
Config *ocispec.Descriptor `json:"config"` // have to turn this into a pointer so we can recognize when it's not set easier / more correctly
Layers []ocispec.Descriptor `json:"layers"`
}
if err := json.Unmarshal(manifest, &manifestChildren); err != nil {
manifestChildren, err := ParseManifestChildren(manifest)
if err != nil {
return desc, fmt.Errorf("%s: failed parsing manifest JSON: %w", ref, err)
}