mirror of https://github.com/docker/buildx.git
575 lines
12 KiB
Go
575 lines
12 KiB
Go
package dap
|
|
|
|
import (
|
|
"context"
|
|
"path/filepath"
|
|
"slices"
|
|
"sync"
|
|
|
|
"github.com/docker/buildx/build"
|
|
"github.com/docker/buildx/dap/common"
|
|
"github.com/google/go-dap"
|
|
"github.com/moby/buildkit/client/llb"
|
|
gateway "github.com/moby/buildkit/frontend/gateway/client"
|
|
"github.com/moby/buildkit/solver/errdefs"
|
|
"github.com/moby/buildkit/solver/pb"
|
|
"github.com/opencontainers/go-digest"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
type thread struct {
|
|
// Persistent data.
|
|
id int
|
|
name string
|
|
|
|
// Persistent state from the adapter.
|
|
idPool *idPool
|
|
sourceMap *sourceMap
|
|
breakpointMap *breakpointMap
|
|
variables *variableReferences
|
|
|
|
// Inputs to the evaluate call.
|
|
c gateway.Client
|
|
ref gateway.Reference
|
|
meta map[string][]byte
|
|
sourcePath string
|
|
|
|
// LLB state for the evaluate call.
|
|
def *llb.Definition
|
|
ops map[digest.Digest]*pb.Op
|
|
head digest.Digest
|
|
bps map[digest.Digest]int
|
|
|
|
// Runtime state for the evaluate call.
|
|
regions []*region
|
|
regionsByDigest map[digest.Digest]int
|
|
|
|
// Controls pause.
|
|
paused chan stepType
|
|
mu sync.Mutex
|
|
|
|
// Attributes set when a thread is paused.
|
|
rCtx *build.ResultHandle
|
|
curPos digest.Digest
|
|
stackTrace []int32
|
|
frames map[int32]*frame
|
|
}
|
|
|
|
type region struct {
|
|
// dependsOn means this thread depends on the result of another thread.
|
|
dependsOn map[int]struct{}
|
|
|
|
// digests is a set of digests associated with this thread.
|
|
digests []digest.Digest
|
|
}
|
|
|
|
// stepType tells a thread how far to advance before it considers pausing
// again.
type stepType int

const (
	// stepContinue advances until a breakpoint is reached (or to the head
	// when no breakpoints are set).
	stepContinue stepType = iota

	// stepNext advances to the next digest that has a source location.
	stepNext
)
|
|
|
|
func (t *thread) Evaluate(ctx Context, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs, cfg common.Config) error {
|
|
if err := t.init(ctx, c, ref, meta, inputs); err != nil {
|
|
return err
|
|
}
|
|
defer t.reset()
|
|
|
|
step := stepContinue
|
|
if cfg.StopOnEntry {
|
|
step = stepNext
|
|
}
|
|
|
|
for {
|
|
if step == stepContinue {
|
|
t.setBreakpoints(ctx)
|
|
}
|
|
ref, pos, err := t.seekNext(ctx, step)
|
|
|
|
event := t.needsDebug(pos, step, err)
|
|
if event.Reason == "" {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case step = <-t.pause(ctx, ref, err, event):
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case <-ctx.Done():
|
|
return context.Cause(ctx)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *thread) init(ctx Context, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs) error {
|
|
t.c = c
|
|
t.ref = ref
|
|
t.meta = meta
|
|
t.sourcePath = inputs.ContextPath
|
|
|
|
if err := t.getLLBState(ctx); err != nil {
|
|
return err
|
|
}
|
|
return t.createRegions()
|
|
}
|
|
|
|
func (t *thread) reset() {
|
|
t.c = nil
|
|
t.ref = nil
|
|
t.meta = nil
|
|
t.sourcePath = ""
|
|
t.ops = nil
|
|
}
|
|
|
|
func (t *thread) needsDebug(target digest.Digest, step stepType, err error) (e dap.StoppedEventBody) {
|
|
if err != nil {
|
|
e.Reason = "exception"
|
|
e.Description = "Encountered an error during result evaluation"
|
|
} else if step == stepNext && target != "" {
|
|
e.Reason = "step"
|
|
} else if step == stepContinue {
|
|
if id, ok := t.bps[target]; ok {
|
|
e.Reason = "breakpoint"
|
|
e.Description = "Paused on breakpoint"
|
|
e.HitBreakpointIds = []int{id}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (t *thread) pause(c Context, ref gateway.Reference, err error, event dap.StoppedEventBody) <-chan stepType {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.paused != nil {
|
|
return t.paused
|
|
}
|
|
|
|
t.paused = make(chan stepType, 1)
|
|
t.rCtx = build.NewResultHandle(c, t.c, ref, t.meta, err)
|
|
if err != nil {
|
|
var solveErr *errdefs.SolveError
|
|
if errors.As(err, &solveErr) {
|
|
if dt, err := solveErr.Op.MarshalVT(); err == nil {
|
|
t.curPos = digest.FromBytes(dt)
|
|
}
|
|
}
|
|
}
|
|
t.collectStackTrace()
|
|
|
|
event.ThreadId = t.id
|
|
c.C() <- &dap.StoppedEvent{
|
|
Event: dap.Event{Event: "stopped"},
|
|
Body: event,
|
|
}
|
|
return t.paused
|
|
}
|
|
|
|
func (t *thread) Continue() {
|
|
t.resume(stepContinue)
|
|
}
|
|
|
|
func (t *thread) Next() {
|
|
t.resume(stepNext)
|
|
}
|
|
|
|
func (t *thread) resume(step stepType) {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.paused == nil {
|
|
return
|
|
}
|
|
t.releaseState()
|
|
|
|
t.paused <- step
|
|
close(t.paused)
|
|
t.paused = nil
|
|
}
|
|
|
|
func (t *thread) isPaused() bool {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
return t.paused != nil
|
|
}
|
|
|
|
func (t *thread) StackTrace() []dap.StackFrame {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.paused == nil {
|
|
// Cannot compute stack trace when not paused.
|
|
// This should never happen, but protect ourself in
|
|
// case it does.
|
|
return []dap.StackFrame{}
|
|
}
|
|
|
|
frames := make([]dap.StackFrame, len(t.stackTrace))
|
|
for i, id := range t.stackTrace {
|
|
frames[i] = t.frames[id].StackFrame
|
|
}
|
|
return frames
|
|
}
|
|
|
|
func (t *thread) Scopes(frameID int) []dap.Scope {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
frame := t.frames[int32(frameID)]
|
|
return frame.Scopes()
|
|
}
|
|
|
|
func (t *thread) Variables(id int) []dap.Variable {
|
|
return t.variables.Get(id)
|
|
}
|
|
|
|
func (t *thread) getLLBState(ctx Context) error {
|
|
st, err := t.ref.ToState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
t.def, err = st.Marshal(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, src := range t.def.Source.Infos {
|
|
fname := filepath.Join(t.sourcePath, src.Filename)
|
|
t.sourceMap.Put(ctx, fname, src.Data)
|
|
}
|
|
|
|
t.ops = make(map[digest.Digest]*pb.Op, len(t.def.Def))
|
|
for _, dt := range t.def.Def {
|
|
dgst := digest.FromBytes(dt)
|
|
|
|
var op pb.Op
|
|
if err := op.UnmarshalVT(dt); err != nil {
|
|
return err
|
|
}
|
|
t.ops[dgst] = &op
|
|
}
|
|
|
|
t.head, err = t.def.Head()
|
|
return err
|
|
}
|
|
|
|
func (t *thread) setBreakpoints(ctx Context) {
|
|
t.bps = t.breakpointMap.Intersect(ctx, t.def.Source, t.sourcePath)
|
|
}
|
|
|
|
func (t *thread) findBacklinks() map[digest.Digest]map[digest.Digest]struct{} {
|
|
backlinks := make(map[digest.Digest]map[digest.Digest]struct{})
|
|
for dgst := range t.ops {
|
|
backlinks[dgst] = make(map[digest.Digest]struct{})
|
|
}
|
|
|
|
for dgst, op := range t.ops {
|
|
for _, inp := range op.Inputs {
|
|
if digest.Digest(inp.Digest) == t.head {
|
|
continue
|
|
}
|
|
backlinks[digest.Digest(inp.Digest)][dgst] = struct{}{}
|
|
}
|
|
}
|
|
return backlinks
|
|
}
|
|
|
|
func (t *thread) createRegions() error {
|
|
// Find the links going from inputs to their outputs.
|
|
// This isn't represented in the LLB graph but we need it to ensure
|
|
// an op only has one child and whether we are allowed to visit a node.
|
|
backlinks := t.findBacklinks()
|
|
|
|
// Create distinct regions whenever we have any branch (inputs or outputs).
|
|
t.regions = []*region{}
|
|
t.regionsByDigest = map[digest.Digest]int{}
|
|
|
|
determineRegion := func(dgst digest.Digest, children map[digest.Digest]struct{}) {
|
|
if len(children) == 1 {
|
|
var cDgst digest.Digest
|
|
for d := range children {
|
|
cDgst = d
|
|
}
|
|
childOp := t.ops[cDgst]
|
|
|
|
if len(childOp.Inputs) == 1 {
|
|
// We have one child and our child has one input so we can be merged
|
|
// into the same region as our child.
|
|
region := t.regionsByDigest[cDgst]
|
|
t.regions[region].digests = append(t.regions[region].digests, dgst)
|
|
t.regionsByDigest[dgst] = region
|
|
return
|
|
}
|
|
}
|
|
|
|
// We will require a new region for this digest because
|
|
// we weren't able to merge it in within the existing regions.
|
|
next := len(t.regions)
|
|
t.regions = append(t.regions, ®ion{
|
|
digests: []digest.Digest{dgst},
|
|
dependsOn: make(map[int]struct{}),
|
|
})
|
|
t.regionsByDigest[dgst] = next
|
|
|
|
// Mark each child as depending on this new region.
|
|
for child := range children {
|
|
region := t.regionsByDigest[child]
|
|
t.regions[region].dependsOn[next] = struct{}{}
|
|
}
|
|
}
|
|
|
|
canVisit := func(dgst digest.Digest) bool {
|
|
for dgst := range backlinks[dgst] {
|
|
if _, ok := t.regionsByDigest[dgst]; !ok {
|
|
// One of our outputs has not been categorized.
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
unvisited := []digest.Digest{t.head}
|
|
for len(unvisited) > 0 {
|
|
dgst := pop(&unvisited)
|
|
op := t.ops[dgst]
|
|
|
|
children := backlinks[dgst]
|
|
determineRegion(dgst, children)
|
|
|
|
// Determine which inputs we can now visit.
|
|
for _, inp := range op.Inputs {
|
|
indgst := digest.Digest(inp.Digest)
|
|
if canVisit(indgst) {
|
|
unvisited = append(unvisited, indgst)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reverse each of the digests so dependencies are first.
|
|
// It is currently in reverse topological order and it needs to be in
|
|
// topological order.
|
|
for _, r := range t.regions {
|
|
slices.Reverse(r.digests)
|
|
}
|
|
t.propagateRegionDependencies()
|
|
return nil
|
|
}
|
|
|
|
// propagateRegionDependencies will propagate the dependsOn attribute between
|
|
// different regions to make dependency lookups easier. If A depends on B
|
|
// and B depends on C, then A depends on C. But the algorithm before this will only
|
|
// record direct dependencies.
|
|
func (t *thread) propagateRegionDependencies() {
|
|
for _, r := range t.regions {
|
|
for {
|
|
n := len(r.dependsOn)
|
|
for i := range r.dependsOn {
|
|
for j := range t.regions[i].dependsOn {
|
|
r.dependsOn[j] = struct{}{}
|
|
}
|
|
}
|
|
|
|
if n == len(r.dependsOn) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *thread) seekNext(ctx Context, step stepType) (gateway.Reference, digest.Digest, error) {
|
|
// If we're at the end, return no digest to signal that
|
|
// we should conclude debugging.
|
|
if t.curPos == t.head {
|
|
return nil, "", nil
|
|
}
|
|
|
|
target := t.head
|
|
switch step {
|
|
case stepNext:
|
|
target = t.nextDigest(nil)
|
|
case stepContinue:
|
|
target = t.continueDigest()
|
|
}
|
|
|
|
if target == "" {
|
|
return nil, "", nil
|
|
}
|
|
return t.seek(ctx, target)
|
|
}
|
|
|
|
func (t *thread) seek(ctx Context, target digest.Digest) (gateway.Reference, digest.Digest, error) {
|
|
ref, err := t.solve(ctx, target)
|
|
if err != nil {
|
|
return ref, "", err
|
|
}
|
|
|
|
if err = ref.Evaluate(ctx); err != nil {
|
|
var solveErr *errdefs.SolveError
|
|
if errors.As(err, &solveErr) {
|
|
if dt, err := solveErr.Op.MarshalVT(); err == nil {
|
|
t.curPos = digest.FromBytes(dt)
|
|
}
|
|
} else {
|
|
t.curPos = ""
|
|
}
|
|
} else {
|
|
t.curPos = target
|
|
}
|
|
return ref, t.curPos, err
|
|
}
|
|
|
|
func (t *thread) nextDigest(fn func(digest.Digest) bool) digest.Digest {
|
|
isValid := func(dgst digest.Digest) bool {
|
|
// Skip this digest because it has no locations in the source file.
|
|
if loc, ok := t.def.Source.Locations[string(dgst)]; !ok || len(loc.Locations) == 0 {
|
|
return false
|
|
}
|
|
|
|
// If a custom function has been set for validation, use it.
|
|
return fn == nil || fn(dgst)
|
|
}
|
|
|
|
// If we have no position, automatically select the first step.
|
|
if t.curPos == "" {
|
|
r := t.regions[len(t.regions)-1]
|
|
if isValid(r.digests[0]) {
|
|
return r.digests[0]
|
|
}
|
|
|
|
// We cannot use the first position. Treat the first position as our
|
|
// current position so we can iterate.
|
|
t.curPos = r.digests[0]
|
|
}
|
|
|
|
// Look up the region associated with our current position.
|
|
// If we can't find it, just pretend we're using step continue.
|
|
region, ok := t.regionsByDigest[t.curPos]
|
|
if !ok {
|
|
return t.head
|
|
}
|
|
|
|
r := t.regions[region]
|
|
i := slices.Index(r.digests, t.curPos) + 1
|
|
|
|
for {
|
|
if i >= len(r.digests) {
|
|
if region <= 0 {
|
|
// We're at the end of our execution. Should have been caught by
|
|
// t.head == t.curPos.
|
|
return ""
|
|
}
|
|
region--
|
|
|
|
r = t.regions[region]
|
|
i = 0
|
|
continue
|
|
}
|
|
|
|
next := r.digests[i]
|
|
if !isValid(next) {
|
|
i++
|
|
continue
|
|
}
|
|
return next
|
|
}
|
|
}
|
|
|
|
func (t *thread) continueDigest() digest.Digest {
|
|
if len(t.bps) == 0 {
|
|
return t.head
|
|
}
|
|
|
|
isValid := func(dgst digest.Digest) bool {
|
|
_, ok := t.bps[dgst]
|
|
return ok
|
|
}
|
|
return t.nextDigest(isValid)
|
|
}
|
|
|
|
func (t *thread) solve(ctx context.Context, target digest.Digest) (gateway.Reference, error) {
|
|
if target == t.head {
|
|
return t.ref, nil
|
|
}
|
|
|
|
head := &pb.Op{
|
|
Inputs: []*pb.Input{{Digest: string(target)}},
|
|
}
|
|
dt, err := head.MarshalVT()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
def := t.def.ToPB()
|
|
def.Def[len(def.Def)-1] = dt
|
|
|
|
res, err := t.c.Solve(ctx, gateway.SolveRequest{
|
|
Definition: def,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return res.SingleRef()
|
|
}
|
|
|
|
func (t *thread) releaseState() {
|
|
if t.rCtx != nil {
|
|
t.rCtx.Done()
|
|
t.rCtx = nil
|
|
}
|
|
t.stackTrace = nil
|
|
t.frames = nil
|
|
}
|
|
|
|
func (t *thread) collectStackTrace() {
|
|
region := t.regionsByDigest[t.curPos]
|
|
r := t.regions[region]
|
|
|
|
digests := r.digests
|
|
if index := slices.Index(digests, t.curPos); index >= 0 {
|
|
digests = digests[:index+1]
|
|
}
|
|
|
|
t.frames = make(map[int32]*frame)
|
|
for i := len(digests) - 1; i >= 0; i-- {
|
|
dgst := digests[i]
|
|
|
|
frame := &frame{}
|
|
frame.Id = int(t.idPool.Get())
|
|
|
|
if meta, ok := t.def.Metadata[dgst]; ok {
|
|
frame.setNameFromMeta(meta)
|
|
}
|
|
if loc, ok := t.def.Source.Locations[string(dgst)]; ok {
|
|
frame.fillLocation(t.def, loc, t.sourcePath)
|
|
}
|
|
|
|
if op := t.ops[dgst]; op != nil {
|
|
frame.fillVarsFromOp(op, t.variables)
|
|
}
|
|
t.stackTrace = append(t.stackTrace, int32(frame.Id))
|
|
t.frames[int32(frame.Id)] = frame
|
|
}
|
|
}
|
|
|
|
func (t *thread) hasFrame(id int) bool {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if t.paused == nil {
|
|
return false
|
|
}
|
|
|
|
_, ok := t.frames[int32(id)]
|
|
return ok
|
|
}
|
|
|
|
// pop removes the last element of *s and returns it, shrinking the slice by
// one. It panics when the slice is empty.
func pop[S ~[]E, E any](s *S) E {
	last := len(*s) - 1
	e := (*s)[last]
	*s = (*s)[:last]
	return e
}
|