Merge pull request #3325 from jsternberg/dap-alternate-stepping

dap: refactor how step in/step out works
This commit is contained in:
Tõnis Tiigi 2025-07-24 15:08:52 -07:00 committed by GitHub
commit 4c791dce97
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 308 additions and 328 deletions

View File

@ -161,26 +161,21 @@ func (d *Adapter[C]) Next(c Context, req *dap.NextRequest, resp *dap.NextRespons
}
func (d *Adapter[C]) StepIn(c Context, req *dap.StepInRequest, resp *dap.StepInResponse) error {
var (
subReq dap.NextRequest
subResp dap.NextResponse
)
d.threadsMu.RLock()
t := d.threads[req.Arguments.ThreadId]
d.threadsMu.RUnlock()
subReq.Arguments.ThreadId = req.Arguments.ThreadId
subReq.Arguments.SingleThread = req.Arguments.SingleThread
subReq.Arguments.Granularity = req.Arguments.Granularity
return d.Next(c, &subReq, &subResp)
t.StepIn()
return nil
}
func (d *Adapter[C]) StepOut(c Context, req *dap.StepOutRequest, resp *dap.StepOutResponse) error {
var (
subReq dap.ContinueRequest
subResp dap.ContinueResponse
)
d.threadsMu.RLock()
t := d.threads[req.Arguments.ThreadId]
d.threadsMu.RUnlock()
subReq.Arguments.ThreadId = req.Arguments.ThreadId
subReq.Arguments.SingleThread = req.Arguments.SingleThread
return d.Continue(c, &subReq, &subResp)
t.StepOut()
return nil
}
func (d *Adapter[C]) SetBreakpoints(c Context, req *dap.SetBreakpointsRequest, resp *dap.SetBreakpointsResponse) error {

View File

@ -29,43 +29,74 @@ func (d *Adapter[C]) Evaluate(ctx Context, req *dap.EvaluateRequest, resp *dap.E
return nil
}
var t *thread
if req.Arguments.FrameId > 0 {
if t = d.getThreadByFrameID(req.Arguments.FrameId); t == nil {
return errors.Errorf("no thread with frame id %d", req.Arguments.FrameId)
}
} else {
if t = d.getFirstThread(); t == nil {
return errors.New("no paused thread")
}
}
cmd := d.replCommands(ctx, t, resp)
var retErr error
cmd := d.replCommands(ctx, req, resp, &retErr)
cmd.SetArgs(args)
cmd.SetErr(d.Out())
if err := cmd.Execute(); err != nil {
fmt.Fprintf(d.Out(), "ERROR: %+v\n", err)
// This error should only happen if there was something command
// related that malfunctioned as it will also print usage.
// Normal errors should set retErr from replCommands.
return err
}
return nil
return retErr
}
func (d *Adapter[C]) replCommands(ctx Context, t *thread, resp *dap.EvaluateResponse) *cobra.Command {
rootCmd := &cobra.Command{}
func (d *Adapter[C]) replCommands(ctx Context, req *dap.EvaluateRequest, resp *dap.EvaluateResponse, retErr *error) *cobra.Command {
rootCmd := &cobra.Command{
SilenceErrors: true,
}
execCmd := &cobra.Command{
Use: "exec",
RunE: func(cmd *cobra.Command, args []string) error {
if !d.supportsExec {
return errors.New("cannot exec without runInTerminal client capability")
}
return t.Exec(ctx, args, resp)
},
execCmd, execOpts := replCmd(ctx, "exec", resp, retErr, d.execCmd)
execCmd.PreRun = func(cmd *cobra.Command, args []string) {
execOpts.FrameID = req.Arguments.FrameId
}
rootCmd.AddCommand(execCmd)
return rootCmd
}
func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (retErr error) {
type execOptions struct {
FrameID int
}
func (d *Adapter[C]) execCmd(ctx Context, args []string, flags execOptions) (string, error) {
if !d.supportsExec {
return "", errors.New("cannot exec without runInTerminal client capability")
}
var t *thread
if flags.FrameID > 0 {
if t = d.getThreadByFrameID(flags.FrameID); t == nil {
return "", errors.Errorf("no thread with frame id %d", flags.FrameID)
}
} else {
if t = d.getFirstThread(); t == nil {
return "", errors.New("no paused thread")
}
}
return t.Exec(ctx, args)
}
func replCmd[Flags any, RetVal any](ctx Context, name string, resp *dap.EvaluateResponse, retErr *error, fn func(ctx Context, args []string, flags Flags) (RetVal, error)) (*cobra.Command, *Flags) {
flags := new(Flags)
return &cobra.Command{
Use: name,
Run: func(cmd *cobra.Command, args []string) {
v, err := fn(ctx, args, *flags)
if err != nil {
*retErr = err
return
}
resp.Body.Result = fmt.Sprint(v)
},
}, flags
}
func (t *thread) Exec(ctx Context, args []string) (message string, retErr error) {
if t.rCtx == nil {
return "", errors.New("no container context for exec")
}
cfg := &build.InvokeConfig{Tty: true}
if len(cfg.Entrypoint) == 0 && len(cfg.Cmd) == 0 {
cfg.Entrypoint = []string{"/bin/sh"} // launch shell by default
@ -75,7 +106,7 @@ func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (
ctr, err := build.NewContainer(ctx, t.rCtx, cfg)
if err != nil {
return err
return "", err
}
defer func() {
if retErr != nil {
@ -85,7 +116,7 @@ func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (
dir, err := os.MkdirTemp("", "buildx-dap-exec")
if err != nil {
return err
return "", err
}
defer func() {
if retErr != nil {
@ -96,7 +127,7 @@ func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (
socketPath := filepath.Join(dir, "s.sock")
l, err := net.Listen("unix", socketPath)
if err != nil {
return err
return "", err
}
go func() {
@ -121,11 +152,11 @@ func (t *thread) Exec(ctx Context, args []string, eresp *dap.EvaluateResponse) (
resp := ctx.Request(req)
if !resp.GetResponse().Success {
return errors.New(resp.GetResponse().Message)
return "", errors.New(resp.GetResponse().Message)
}
eresp.Body.Result = fmt.Sprintf("Started process attached to %s.", socketPath)
return nil
message = fmt.Sprintf("Started process attached to %s.", socketPath)
return message, nil
}
func (t *thread) runExec(l net.Listener, ctr *build.Container, cfg *build.InvokeConfig) {

View File

@ -3,7 +3,6 @@ package dap
import (
"context"
"path/filepath"
"slices"
"sync"
"github.com/docker/buildx/build"
@ -40,9 +39,11 @@ type thread struct {
head digest.Digest
bps map[digest.Digest]int
frames map[int32]*frame
framesByDigest map[digest.Digest]*frame
// Runtime state for the evaluate call.
regions []*region
regionsByDigest map[digest.Digest]int
entrypoint *step
// Controls pause.
paused chan stepType
@ -52,15 +53,6 @@ type thread struct {
rCtx *build.ResultHandle
curPos digest.Digest
stackTrace []int32
frames map[int32]*frame
}
type region struct {
// dependsOn means this thread depends on the result of another thread.
dependsOn map[int]struct{}
// digests is a set of digests associated with this thread.
digests []digest.Digest
}
type stepType int
@ -68,39 +60,47 @@ type stepType int
const (
stepContinue stepType = iota
stepNext
stepIn
stepOut
)
func (t *thread) Evaluate(ctx Context, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs, cfg common.Config) error {
if err := t.init(ctx, c, ref, meta, inputs); err != nil {
func (t *thread) Evaluate(ctx Context, c gateway.Client, headRef gateway.Reference, meta map[string][]byte, inputs build.Inputs, cfg common.Config) error {
if err := t.init(ctx, c, headRef, meta, inputs); err != nil {
return err
}
defer t.reset()
step := stepContinue
action := stepContinue
if cfg.StopOnEntry {
step = stepNext
action = stepNext
}
for {
if step == stepContinue {
t.setBreakpoints(ctx)
var (
ref gateway.Reference
next = t.entrypoint
err error
)
for next != nil {
event := t.needsDebug(next, action, err)
if event.Reason != "" {
select {
case action = <-t.pause(ctx, ref, err, next, event):
// do nothing here
case <-ctx.Done():
return context.Cause(ctx)
}
}
ref, pos, err := t.seekNext(ctx, step)
event := t.needsDebug(pos, step, err)
if event.Reason == "" {
if err != nil {
return err
}
select {
case step = <-t.pause(ctx, ref, err, event):
if err != nil {
return err
}
case <-ctx.Done():
return context.Cause(ctx)
if action == stepContinue {
t.setBreakpoints(ctx)
}
ref, next, err = t.seekNext(ctx, next, action)
}
return nil
}
func (t *thread) init(ctx Context, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs) error {
@ -112,7 +112,110 @@ func (t *thread) init(ctx Context, c gateway.Client, ref gateway.Reference, meta
if err := t.getLLBState(ctx); err != nil {
return err
}
return t.createRegions()
return t.createProgram()
}
type step struct {
// dgst holds the digest that should be resolved by this step.
// If this is empty, no digest should be resolved.
dgst digest.Digest
// in holds the next target when step in is used.
in *step
// out holds the next target when step out is used.
out *step
// next holds the next target when next is used.
next *step
// frame will hold the stack frame associated with this step.
frame *frame
}
func (t *thread) createProgram() error {
t.framesByDigest = make(map[digest.Digest]*frame)
t.frames = make(map[int32]*frame)
// Create the entrypoint by using the last node.
// We will build on top of that.
head := &step{
dgst: t.head,
frame: t.getStackFrame(t.head),
}
t.entrypoint = t.createBranch(head)
return nil
}
func (t *thread) createBranch(last *step) (first *step) {
first = last
for first.dgst != "" {
prev := &step{
// set to first temporarily until we determine
// if there are other inputs.
in: first,
// always first
next: first,
// exit point always matches the one set on first
out: first.out,
// always set to the same as next which is always first
frame: t.getStackFrame(first.dgst),
}
op := t.ops[first.dgst]
if len(op.Inputs) > 0 {
for i := len(op.Inputs) - 1; i > 0; i-- {
inp := op.Inputs[i]
// Create a pseudo-step that acts as an exit point for this
// branch. This step exists so this branch has a place to go
// after it has finished that will advance to the next
// instruction.
exit := &step{
in: prev.in,
next: prev.next,
out: prev.out,
frame: prev.frame,
}
head := &step{
dgst: digest.Digest(inp.Digest),
in: exit,
next: exit,
out: exit,
frame: t.getStackFrame(digest.Digest(inp.Digest)),
}
prev.in = t.createBranch(head)
}
// Set the digest of the parent input on the first step associated
// with this step.
prev.dgst = digest.Digest(op.Inputs[0].Digest)
}
// New first is the step we just created.
first = prev
}
return first
}
func (t *thread) getStackFrame(dgst digest.Digest) *frame {
if f := t.framesByDigest[dgst]; f != nil {
return f
}
f := &frame{
op: t.ops[dgst],
}
f.Id = int(t.idPool.Get())
if meta, ok := t.def.Metadata[dgst]; ok {
f.setNameFromMeta(meta)
}
if loc, ok := t.def.Source.Locations[string(dgst)]; ok {
f.fillLocation(t.def, loc, t.sourcePath)
}
t.frames[int32(f.Id)] = f
return f
}
func (t *thread) reset() {
@ -123,23 +226,25 @@ func (t *thread) reset() {
t.ops = nil
}
func (t *thread) needsDebug(target digest.Digest, step stepType, err error) (e dap.StoppedEventBody) {
func (t *thread) needsDebug(cur *step, step stepType, err error) (e dap.StoppedEventBody) {
if err != nil {
e.Reason = "exception"
e.Description = "Encountered an error during result evaluation"
} else if step == stepNext && target != "" {
e.Reason = "step"
} else if step == stepContinue {
if id, ok := t.bps[target]; ok {
e.Reason = "breakpoint"
e.Description = "Paused on breakpoint"
e.HitBreakpointIds = []int{id}
} else if cur != nil {
if step != stepContinue {
e.Reason = "step"
} else if next := cur.in; next != nil {
if id, ok := t.bps[next.dgst]; ok {
e.Reason = "breakpoint"
e.Description = "Paused on breakpoint"
e.HitBreakpointIds = []int{id}
}
}
}
return
}
func (t *thread) pause(c Context, ref gateway.Reference, err error, event dap.StoppedEventBody) <-chan stepType {
func (t *thread) pause(c Context, ref gateway.Reference, err error, pos *step, event dap.StoppedEventBody) <-chan stepType {
t.mu.Lock()
defer t.mu.Unlock()
@ -148,7 +253,9 @@ func (t *thread) pause(c Context, ref gateway.Reference, err error, event dap.St
}
t.paused = make(chan stepType, 1)
t.rCtx = build.NewResultHandle(c, t.c, ref, t.meta, err)
if ref != nil || err != nil {
t.rCtx = build.NewResultHandle(c, t.c, ref, t.meta, err)
}
if err != nil {
var solveErr *errdefs.SolveError
if errors.As(err, &solveErr) {
@ -157,7 +264,7 @@ func (t *thread) pause(c Context, ref gateway.Reference, err error, event dap.St
}
}
}
t.collectStackTrace()
t.collectStackTrace(pos)
event.ThreadId = t.id
c.C() <- &dap.StoppedEvent{
@ -175,6 +282,14 @@ func (t *thread) Next() {
t.resume(stepNext)
}
func (t *thread) StepIn() {
t.resume(stepIn)
}
func (t *thread) StepOut() {
t.resume(stepOut)
}
func (t *thread) resume(step stepType) {
t.mu.Lock()
defer t.mu.Unlock()
@ -261,233 +376,92 @@ func (t *thread) setBreakpoints(ctx Context) {
t.bps = t.breakpointMap.Intersect(ctx, t.def.Source, t.sourcePath)
}
func (t *thread) findBacklinks() map[digest.Digest]map[digest.Digest]struct{} {
backlinks := make(map[digest.Digest]map[digest.Digest]struct{})
for dgst := range t.ops {
backlinks[dgst] = make(map[digest.Digest]struct{})
}
for dgst, op := range t.ops {
for _, inp := range op.Inputs {
if digest.Digest(inp.Digest) == t.head {
continue
}
backlinks[digest.Digest(inp.Digest)][dgst] = struct{}{}
}
}
return backlinks
}
func (t *thread) createRegions() error {
// Find the links going from inputs to their outputs.
// This isn't represented in the LLB graph but we need it to ensure
// an op only has one child and whether we are allowed to visit a node.
backlinks := t.findBacklinks()
// Create distinct regions whenever we have any branch (inputs or outputs).
t.regions = []*region{}
t.regionsByDigest = map[digest.Digest]int{}
determineRegion := func(dgst digest.Digest, children map[digest.Digest]struct{}) {
if len(children) == 1 {
var cDgst digest.Digest
for d := range children {
cDgst = d
}
childOp := t.ops[cDgst]
if len(childOp.Inputs) == 1 {
// We have one child and our child has one input so we can be merged
// into the same region as our child.
region := t.regionsByDigest[cDgst]
t.regions[region].digests = append(t.regions[region].digests, dgst)
t.regionsByDigest[dgst] = region
return
}
}
// We will require a new region for this digest because
// we weren't able to merge it in within the existing regions.
next := len(t.regions)
t.regions = append(t.regions, &region{
digests: []digest.Digest{dgst},
dependsOn: make(map[int]struct{}),
})
t.regionsByDigest[dgst] = next
// Mark each child as depending on this new region.
for child := range children {
region := t.regionsByDigest[child]
t.regions[region].dependsOn[next] = struct{}{}
}
}
canVisit := func(dgst digest.Digest) bool {
for dgst := range backlinks[dgst] {
if _, ok := t.regionsByDigest[dgst]; !ok {
// One of our outputs has not been categorized.
return false
}
}
return true
}
unvisited := []digest.Digest{t.head}
for len(unvisited) > 0 {
dgst := pop(&unvisited)
op := t.ops[dgst]
children := backlinks[dgst]
determineRegion(dgst, children)
// Determine which inputs we can now visit.
for _, inp := range op.Inputs {
indgst := digest.Digest(inp.Digest)
if canVisit(indgst) {
unvisited = append(unvisited, indgst)
}
}
}
// Reverse each of the digests so dependencies are first.
// It is currently in reverse topological order and it needs to be in
// topological order.
for _, r := range t.regions {
slices.Reverse(r.digests)
}
t.propagateRegionDependencies()
return nil
}
// propagateRegionDependencies will propagate the dependsOn attribute between
// different regions to make dependency lookups easier. If A depends on B
// and B depends on C, then A depends on C. But the algorithm before this will only
// record direct dependencies.
func (t *thread) propagateRegionDependencies() {
for _, r := range t.regions {
for {
n := len(r.dependsOn)
for i := range r.dependsOn {
for j := range t.regions[i].dependsOn {
r.dependsOn[j] = struct{}{}
}
}
if n == len(r.dependsOn) {
break
}
}
}
}
func (t *thread) seekNext(ctx Context, step stepType) (gateway.Reference, digest.Digest, error) {
func (t *thread) seekNext(ctx Context, from *step, action stepType) (gateway.Reference, *step, error) {
// If we're at the end, return no digest to signal that
// we should conclude debugging.
if t.curPos == t.head {
return nil, "", nil
}
target := t.head
switch step {
var target *step
switch action {
case stepNext:
target = t.nextDigest(nil)
target = from.next
case stepIn:
target = from.in
case stepOut:
target = from.out
case stepContinue:
target = t.continueDigest()
}
if target == "" {
return nil, "", nil
target = t.continueDigest(from)
}
return t.seek(ctx, target)
}
func (t *thread) seek(ctx Context, target digest.Digest) (gateway.Reference, digest.Digest, error) {
ref, err := t.solve(ctx, target)
if err != nil {
return ref, "", err
func (t *thread) seek(ctx Context, target *step) (ref gateway.Reference, result *step, err error) {
if target != nil {
if target.dgst != "" {
ref, err = t.solve(ctx, target.dgst)
if err != nil {
return ref, nil, err
}
}
result = target
} else {
ref = t.ref
}
if err = ref.Evaluate(ctx); err != nil {
var solveErr *errdefs.SolveError
if errors.As(err, &solveErr) {
if dt, err := solveErr.Op.MarshalVT(); err == nil {
t.curPos = digest.FromBytes(dt)
if ref != nil {
if err = ref.Evaluate(ctx); err != nil {
// If this is not a solve error, do not return the
// reference and target step.
var solveErr *errdefs.SolveError
if errors.As(err, &solveErr) {
if dt, err := solveErr.Op.MarshalVT(); err == nil {
// Find the error digest.
errDgst := digest.FromBytes(dt)
// Iterate from the first step to find the one
// we failed on.
result = t.entrypoint
for result != nil {
next := result.in
if next != nil && next.dgst == errDgst {
break
}
result = next
}
}
} else {
return nil, nil, err
}
} else {
t.curPos = ""
}
} else {
t.curPos = target
}
return ref, t.curPos, err
return ref, result, err
}
func (t *thread) nextDigest(fn func(digest.Digest) bool) digest.Digest {
isValid := func(dgst digest.Digest) bool {
// Skip this digest because it has no locations in the source file.
if loc, ok := t.def.Source.Locations[string(dgst)]; !ok || len(loc.Locations) == 0 {
func (t *thread) continueDigest(from *step) *step {
if len(t.bps) == 0 {
return nil
}
isBreakpoint := func(dgst digest.Digest) bool {
if dgst == "" {
return false
}
// If a custom function has been set for validation, use it.
return fn == nil || fn(dgst)
}
// If we have no position, automatically select the first step.
if t.curPos == "" {
r := t.regions[len(t.regions)-1]
if isValid(r.digests[0]) {
return r.digests[0]
}
// We cannot use the first position. Treat the first position as our
// current position so we can iterate.
t.curPos = r.digests[0]
}
// Look up the region associated with our current position.
// If we can't find it, just pretend we're using step continue.
region, ok := t.regionsByDigest[t.curPos]
if !ok {
return t.head
}
r := t.regions[region]
i := slices.Index(r.digests, t.curPos) + 1
for {
if i >= len(r.digests) {
if region <= 0 {
// We're at the end of our execution. Should have been caught by
// t.head == t.curPos.
return ""
}
region--
r = t.regions[region]
i = 0
continue
}
next := r.digests[i]
if !isValid(next) {
i++
continue
}
return next
}
}
func (t *thread) continueDigest() digest.Digest {
if len(t.bps) == 0 {
return t.head
}
isValid := func(dgst digest.Digest) bool {
_, ok := t.bps[dgst]
return ok
}
return t.nextDigest(isValid)
next := func(s *step) *step {
cur := s.in
for cur != nil {
next := cur.in
if next != nil && isBreakpoint(next.dgst) {
return cur
}
cur = next
}
return nil
}
return next(from)
}
func (t *thread) solve(ctx context.Context, target digest.Digest) (gateway.Reference, error) {
@ -520,38 +494,16 @@ func (t *thread) releaseState() {
t.rCtx.Done()
t.rCtx = nil
}
t.stackTrace = nil
t.frames = nil
t.stackTrace = t.stackTrace[:0]
t.variables.Reset()
}
func (t *thread) collectStackTrace() {
region := t.regionsByDigest[t.curPos]
r := t.regions[region]
digests := r.digests
if index := slices.Index(digests, t.curPos); index >= 0 {
digests = digests[:index+1]
}
t.frames = make(map[int32]*frame)
for i := len(digests) - 1; i >= 0; i-- {
dgst := digests[i]
frame := &frame{}
frame.Id = int(t.idPool.Get())
if meta, ok := t.def.Metadata[dgst]; ok {
frame.setNameFromMeta(meta)
}
if loc, ok := t.def.Source.Locations[string(dgst)]; ok {
frame.fillLocation(t.def, loc, t.sourcePath)
}
if op := t.ops[dgst]; op != nil {
frame.fillVarsFromOp(op, t.variables)
}
func (t *thread) collectStackTrace(pos *step) {
for pos != nil {
frame := pos.frame
frame.ExportVars(t.variables)
t.stackTrace = append(t.stackTrace, int32(frame.Id))
t.frames[int32(frame.Id)] = frame
pos = pos.out
}
}
@ -566,9 +518,3 @@ func (t *thread) hasFrame(id int) bool {
_, ok := t.frames[int32(id)]
return ok
}
func pop[S ~[]E, E any](s *S) E {
e := (*s)[len(*s)-1]
*s = (*s)[:len(*s)-1]
return e
}

View File

@ -15,6 +15,7 @@ import (
type frame struct {
dap.StackFrame
op *pb.Op
scopes []dap.Scope
}
@ -44,6 +45,10 @@ func (f *frame) fillLocation(def *llb.Definition, loc *pb.Locations, ws string)
}
}
func (f *frame) ExportVars(refs *variableReferences) {
f.fillVarsFromOp(f.op, refs)
}
func (f *frame) fillVarsFromOp(op *pb.Op, refs *variableReferences) {
f.scopes = []dap.Scope{
{
@ -155,6 +160,9 @@ func execOpVars(exec *pb.ExecOp, refs *variableReferences) dap.Variable {
}
func (f *frame) Scopes() []dap.Scope {
if f.scopes == nil {
return []dap.Scope{}
}
return f.scopes
}