buildx/dap/adapter.go

668 lines
14 KiB
Go

package dap
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"path"
"path/filepath"
"slices"
"sync"
"sync/atomic"
"github.com/docker/buildx/build"
"github.com/docker/buildx/dap/common"
"github.com/google/go-dap"
gateway "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/solver/pb"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
// Adapter is the debug adapter for a build. It owns the DAP server, the
// channels used to sequence the initialize/launch/configurationDone handshake,
// and the thread, breakpoint and source tables served to the client.
type Adapter[C LaunchConfig] struct {
// srv is the DAP server created in New with the handlers from dapHandler.
srv *Server
// eg tracks the serve goroutine started by Start; Stop resets it to nil.
eg *errgroup.Group
// cfg is the common configuration taken from the launch config in Start.
cfg common.Config
// initialized is closed when the initialize request is handled.
initialized chan struct{}
// started carries the outcome of the launch request to Start.
started chan launchResponse[C]
// configuration is signalled and then closed by ConfigurationDone.
configuration chan struct{}
// supportsExec records the client's SupportsRunInTerminalRequest capability.
supportsExec bool
// evaluateReqCh hands evaluate requests to the launch routine.
evaluateReqCh chan *evaluateRequest
// threads maps thread ids to registered threads; guarded by threadsMu.
threads map[int]*thread
threadsMu sync.RWMutex
// nextThreadID is the id given to the next thread; updated under threadsMu.
nextThreadID int
// breakpointMap, sourceMap and idPool are shared with every thread created
// by newThread.
breakpointMap *breakpointMap
sourceMap sourceMap
idPool *idPool
}
// New builds an Adapter with its signalling channels, thread table,
// breakpoint map and id pool allocated, then attaches a Server that
// dispatches to the adapter's DAP handlers.
func New[C LaunchConfig]() *Adapter[C] {
	adapter := new(Adapter[C])

	// Handshake channels; started is buffered so Launch never blocks on it.
	adapter.initialized = make(chan struct{})
	adapter.started = make(chan launchResponse[C], 1)
	adapter.configuration = make(chan struct{})
	adapter.evaluateReqCh = make(chan *evaluateRequest)

	// Thread ids start at 1.
	adapter.threads = make(map[int]*thread)
	adapter.nextThreadID = 1

	adapter.breakpointMap = newBreakpointMap()
	adapter.idPool = new(idPool)

	adapter.srv = NewServer(adapter.dapHandler())
	return adapter
}
// Start serves the DAP connection in the background and blocks until the
// client has sent both the initialize and the launch request. It returns the
// launch configuration decoded by Launch, or the error Launch reported.
//
// Start also returns early with the cancellation cause when ctx is cancelled
// or the serve goroutine fails before the handshake completes; previously it
// would block forever in that situation. The serve goroutine keeps running
// until Stop is called.
func (d *Adapter[C]) Start(ctx context.Context, conn Conn) (C, error) {
	var zero C

	// egCtx is cancelled when ctx is cancelled or when Serve returns an error.
	eg, egCtx := errgroup.WithContext(ctx)
	d.eg = eg
	d.eg.Go(func() error {
		return d.srv.Serve(ctx, conn)
	})

	// Wait for the initialize request.
	select {
	case <-d.initialized:
	case <-egCtx.Done():
		return zero, context.Cause(egCtx)
	}

	// Wait for the launch request.
	var (
		resp launchResponse[C]
		ok   bool
	)
	select {
	case resp, ok = <-d.started:
	case <-egCtx.Done():
		return zero, context.Cause(egCtx)
	}
	if !ok {
		// The channel was closed without a response being delivered.
		resp.Error = context.Canceled
	}

	d.cfg = resp.Config.GetConfig()
	return resp.Config, resp.Error
}
// Stop queues a "terminated" event for the client, stops the server and waits
// for the serve goroutine started by Start. It returns that goroutine's error.
// Calling Stop on an adapter that is not running is a no-op.
func (d *Adapter[C]) Stop() error {
	if d.eg == nil {
		return nil
	}
	// Forget the group once the wait below has finished.
	defer func() { d.eg = nil }()

	terminated := &dap.TerminatedEvent{
		Event: dap.Event{
			Event: "terminated",
		},
	}
	d.srv.Go(func(c Context) {
		c.C() <- terminated
		// TODO: detect exit code from threads
		// c.C() <- &dap.ExitedEvent{
		// 	Event: dap.Event{
		// 		Event: "exited",
		// 	},
		// 	Body: dap.ExitedEventBody{
		// 		ExitCode: exitCode,
		// 	},
		// }
	})
	d.srv.Stop()

	return d.eg.Wait()
}
// Initialize handles the DAP initialize request. It unblocks Start's wait on
// the initialized channel, records the client capabilities the adapter cares
// about, and advertises the adapter's own capabilities in the response.
func (d *Adapter[C]) Initialize(c Context, req *dap.InitializeRequest, resp *dap.InitializeResponse) error {
	close(d.initialized)

	// Client capabilities.
	client := &req.Arguments
	d.supportsExec = client.SupportsRunInTerminalRequest

	// Adapter capabilities.
	caps := &resp.Body
	caps.SupportsConfigurationDoneRequest = true
	return nil
}
// launchResponse is the value Launch sends to Start over the started channel.
type launchResponse[C any] struct {
// Config is the decoded launch configuration; left at its zero value when
// decoding failed.
Config C
// Error is the decoding error, or nil on success.
Error error
}
// Launch handles the DAP launch request. It decodes the request arguments into
// the launch configuration type C and reports the outcome to Start through the
// started channel, which is closed on return. On success the launch routine is
// started before the configuration is published.
func (d *Adapter[C]) Launch(c Context, req *dap.LaunchRequest, resp *dap.LaunchResponse) error {
	defer close(d.started)

	var (
		cfg     C
		outcome launchResponse[C]
	)
	err := json.Unmarshal(req.Arguments, &cfg)
	if err != nil {
		// Report only the error; Config stays at its zero value.
		outcome.Error = err
	} else {
		d.start(c)
		outcome.Config = cfg
	}

	d.started <- outcome
	return err
}
// Disconnect handles the DAP disconnect request by closing evaluateReqCh,
// which makes the launch loop return on its next receive from that channel.
//
// NOTE(review): evaluateRef sends on this same channel, so a send that races
// with this close, or a second disconnect request, would panic. Confirm that
// neither can happen.
func (d *Adapter[C]) Disconnect(c Context, req *dap.DisconnectRequest, resp *dap.DisconnectResponse) error {
close(d.evaluateReqCh)
return nil
}
// start schedules the launch routine on the given context. Whether the routine
// could actually be started is not reported to the caller.
func (d *Adapter[C]) start(c Context) {
	_ = c.Go(d.launch)
}
// Continue handles the DAP continue request by resuming the requested thread.
// It returns an error when no thread with the given id is registered, instead
// of calling a method on a nil thread.
func (d *Adapter[C]) Continue(c Context, req *dap.ContinueRequest, resp *dap.ContinueResponse) error {
	d.threadsMu.RLock()
	t := d.threads[req.Arguments.ThreadId]
	d.threadsMu.RUnlock()

	if t == nil {
		return errors.Errorf("no such thread: %d", req.Arguments.ThreadId)
	}

	t.Continue()
	return nil
}
// Next handles the DAP next request by stepping the requested thread. It
// returns an error when no thread with the given id is registered, instead of
// calling a method on a nil thread.
func (d *Adapter[C]) Next(c Context, req *dap.NextRequest, resp *dap.NextResponse) error {
	d.threadsMu.RLock()
	t := d.threads[req.Arguments.ThreadId]
	d.threadsMu.RUnlock()

	if t == nil {
		return errors.Errorf("no such thread: %d", req.Arguments.ThreadId)
	}

	t.Next()
	return nil
}
// StepIn handles the DAP stepIn request by forwarding it to Next: the thread
// id, single-thread flag and granularity are copied into a next request.
func (d *Adapter[C]) StepIn(c Context, req *dap.StepInRequest, resp *dap.StepInResponse) error {
	in := &req.Arguments

	next := new(dap.NextRequest)
	next.Arguments.ThreadId = in.ThreadId
	next.Arguments.SingleThread = in.SingleThread
	next.Arguments.Granularity = in.Granularity

	// The next response is discarded; only the error is propagated.
	return d.Next(c, next, new(dap.NextResponse))
}
// StepOut handles the DAP stepOut request by forwarding it to Continue: the
// thread id and single-thread flag are copied into a continue request.
func (d *Adapter[C]) StepOut(c Context, req *dap.StepOutRequest, resp *dap.StepOutResponse) error {
	in := &req.Arguments

	cont := new(dap.ContinueRequest)
	cont.Arguments.ThreadId = in.ThreadId
	cont.Arguments.SingleThread = in.SingleThread

	// The continue response is discarded; only the error is propagated.
	return d.Continue(c, cont, new(dap.ContinueResponse))
}
// SetBreakpoints handles the DAP setBreakpoints request. The breakpoints for
// the source path in the request are replaced in the breakpoint map, and the
// resulting breakpoints are returned in the response.
func (d *Adapter[C]) SetBreakpoints(c Context, req *dap.SetBreakpointsRequest, resp *dap.SetBreakpointsResponse) error {
	args := &req.Arguments
	resp.Body.Breakpoints = d.breakpointMap.Set(args.Source.Path, args.Breakpoints)
	return nil
}
// ConfigurationDone handles the DAP configurationDone request. It hands the
// configuration signal to the launch routine and then closes the channel.
//
// The hand-off is an unbuffered send, so it waits for the launch routine to
// receive. If the request context is cancelled first (for example because the
// launch routine has already exited), the cancellation cause is returned
// instead of blocking forever.
func (d *Adapter[C]) ConfigurationDone(c Context, req *dap.ConfigurationDoneRequest, resp *dap.ConfigurationDoneResponse) error {
	select {
	case d.configuration <- struct{}{}:
	case <-c.Done():
		return context.Cause(c)
	}
	close(d.configuration)
	return nil
}
// launch is the adapter's main routine. It announces "initialized" to the
// client, waits for the configuration signal, and then serves evaluate
// requests until the context is done or evaluateReqCh is closed. Each request
// gets its own thread, evaluated in a routine started on the context; the
// result is delivered on the request's errCh, which is then closed.
func (d *Adapter[C]) launch(c Context) {
	// Send initialized event.
	c.C() <- &dap.InitializedEvent{Event: dap.Event{Event: "initialized"}}

	// Wait for configuration.
	select {
	case <-d.configuration:
		// TODO: actual configuration
	case <-c.Done():
		return
	}

	for {
		var req *evaluateRequest
		select {
		case <-c.Done():
			return
		case next, open := <-d.evaluateReqCh:
			if !open {
				return
			}
			req = next
		}

		th := d.newThread(c, req.name)
		run := func(c Context) {
			// Deferred calls run in reverse order: errCh is closed first,
			// then the thread is removed.
			defer d.deleteThread(c, th)
			defer close(req.errCh)
			req.errCh <- th.Evaluate(c, req.c, req.ref, req.meta, req.inputs, d.cfg)
		}
		if c.Go(run) {
			continue
		}

		// The routine could not be started; report cancellation instead.
		req.errCh <- context.Canceled
		close(req.errCh)
	}
}
// newThread creates a thread with the given name, registers it under the next
// free thread id and sends a "started" thread event on the context's channel.
// The new thread shares the adapter's source map, breakpoint map and id pool.
func (d *Adapter[C]) newThread(ctx Context, name string) (t *thread) {
	t = &thread{
		name:          name,
		sourceMap:     &d.sourceMap,
		breakpointMap: d.breakpointMap,
		idPool:        d.idPool,
		variables:     newVariableReferences(),
	}

	// Assign the id and register the thread under the same lock.
	d.threadsMu.Lock()
	t.id = d.nextThreadID
	d.nextThreadID++
	d.threads[t.id] = t
	d.threadsMu.Unlock()

	started := &dap.ThreadEvent{
		Event: dap.Event{Event: "thread"},
		Body: dap.ThreadEventBody{
			Reason:   "started",
			ThreadId: t.id,
		},
	}
	ctx.C() <- started
	return t
}
// getThread returns the thread registered under id, or nil when there is none.
// It only reads the thread table, so it takes the read lock like the other
// lookups rather than the exclusive lock.
func (d *Adapter[C]) getThread(id int) (t *thread) {
	d.threadsMu.RLock()
	defer d.threadsMu.RUnlock()
	return d.threads[id]
}
// getFirstThread returns the paused thread with the lowest id, or nil when no
// registered thread is paused. It only reads the thread table, so it takes the
// read lock rather than the exclusive lock.
func (d *Adapter[C]) getFirstThread() (t *thread) {
	d.threadsMu.RLock()
	defer d.threadsMu.RUnlock()

	for _, candidate := range d.threads {
		if !candidate.isPaused() {
			continue
		}
		if t == nil || candidate.id < t.id {
			t = candidate
		}
	}
	return t
}
// deleteThread removes t from the thread table, resetting the variable
// references of the registered entry first, and then sends an "exited" thread
// event on the context's channel. The event is sent whether or not the thread
// was still registered.
func (d *Adapter[C]) deleteThread(ctx Context, t *thread) {
	d.threadsMu.Lock()
	registered := d.threads[t.id]
	if registered != nil && registered.variables != nil {
		registered.variables.Reset()
	}
	delete(d.threads, t.id)
	d.threadsMu.Unlock()

	exited := &dap.ThreadEvent{
		Event: dap.Event{Event: "thread"},
		Body: dap.ThreadEventBody{
			Reason:   "exited",
			ThreadId: t.id,
		},
	}
	ctx.C() <- exited
}
// getThreadByFrameID returns a registered thread that reports owning the stack
// frame with the given id, or nil when no thread does.
func (d *Adapter[C]) getThreadByFrameID(id int) (t *thread) {
	d.threadsMu.RLock()
	defer d.threadsMu.RUnlock()

	for _, candidate := range d.threads {
		if candidate.hasFrame(id) {
			t = candidate
			break
		}
	}
	return t
}
// evaluateRequest is the message evaluateRef sends to the launch routine to
// have one reference evaluated on its own thread.
type evaluateRequest struct {
// name is the name given to the thread created for this request.
name string
// c, ref, meta and inputs are passed through to the thread's Evaluate call.
c gateway.Client
ref gateway.Reference
meta map[string][]byte
inputs build.Inputs
// errCh receives the evaluation result and is closed by the launch routine.
errCh chan<- error
}
// EvaluateResult evaluates every reference in res concurrently and waits for
// all of them. The single Ref, when set, is evaluated under name; each entry
// of Refs is evaluated under "name (key)". The first non-nil error is
// returned.
func (d *Adapter[C]) EvaluateResult(ctx context.Context, name string, c gateway.Client, res *gateway.Result, inputs build.Inputs) error {
	group, _ := errgroup.WithContext(ctx)

	// spawn evaluates one reference in the group under the given thread name.
	spawn := func(threadName string, ref gateway.Reference) {
		group.Go(func() error {
			return d.evaluateRef(ctx, threadName, c, ref, res.Metadata, inputs)
		})
	}

	if res.Ref != nil {
		spawn(name, res.Ref)
	}
	for key, ref := range res.Refs {
		spawn(fmt.Sprintf("%s (%s)", name, key), ref)
	}
	return group.Wait()
}
// evaluateRef hands a single reference to the launch routine and waits for its
// result. The request is sent over evaluateReqCh so the evaluation runs in the
// context of the server. If ctx is done while sending or waiting, the context's
// cause is returned instead.
func (d *Adapter[C]) evaluateRef(ctx context.Context, name string, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs) error {
	// Buffered so the launch routine can deliver the result even if this call
	// has already given up.
	result := make(chan error, 1)

	request := &evaluateRequest{
		name:   name,
		c:      c,
		ref:    ref,
		meta:   meta,
		inputs: inputs,
		errCh:  result,
	}

	// Submit the request.
	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	case d.evaluateReqCh <- request:
	}

	// Wait for the response.
	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	case err := <-result:
		return err
	}
}
// Threads handles the DAP threads request by listing every registered thread.
// The list is sorted by thread id so that repeated requests return a stable
// order; map iteration alone would shuffle it. The result is always a non-nil
// slice.
func (d *Adapter[C]) Threads(c Context, req *dap.ThreadsRequest, resp *dap.ThreadsResponse) error {
	d.threadsMu.RLock()
	defer d.threadsMu.RUnlock()

	threads := make([]dap.Thread, 0, len(d.threads))
	for _, t := range d.threads {
		threads = append(threads, dap.Thread{
			Id:   t.id,
			Name: t.name,
		})
	}
	slices.SortFunc(threads, func(a, b dap.Thread) int {
		return a.Id - b.Id
	})

	resp.Body.Threads = threads
	return nil
}
// StackTrace handles the DAP stackTrace request by returning the stack frames
// of the requested thread. It fails when the thread id is not registered.
func (d *Adapter[C]) StackTrace(c Context, req *dap.StackTraceRequest, resp *dap.StackTraceResponse) error {
	threadID := req.Arguments.ThreadId
	if t := d.getThread(threadID); t != nil {
		resp.Body.StackFrames = t.StackTrace()
		return nil
	}
	return errors.Errorf("no such thread: %d", threadID)
}
// Scopes handles the DAP scopes request for a stack frame. The owning thread
// is found by frame id, and each scope's variables reference is combined with
// the thread id shifted into the bits above the low 24, so that Variables can
// find the thread again.
func (d *Adapter[C]) Scopes(c Context, req *dap.ScopesRequest, resp *dap.ScopesResponse) error {
	frameID := req.Arguments.FrameId
	t := d.getThreadByFrameID(frameID)
	if t == nil {
		return errors.Errorf("no such frame id: %d", frameID)
	}

	scopes := t.Scopes(frameID)
	threadBits := t.id << 24
	for i := range scopes {
		scopes[i].VariablesReference |= threadBits
	}
	resp.Body.Scopes = scopes
	return nil
}
// Variables handles the DAP variables request. The incoming reference is split
// into a thread id (the bits above the low 24) and a thread-local reference
// (the low 24 bits). Non-zero references in the returned variables are tagged
// with the same thread id.
func (d *Adapter[C]) Variables(c Context, req *dap.VariablesRequest, resp *dap.VariablesResponse) error {
	const (
		threadShift = 24
		localMask   = (1 << threadShift) - 1
	)

	combined := req.Arguments.VariablesReference
	tid := combined >> threadShift
	t := d.getThread(tid)
	if t == nil {
		return errors.Errorf("no such thread: %d", tid)
	}

	vars := t.Variables(combined & localMask)
	for i := range vars {
		// A zero reference is left as is.
		if vars[i].VariablesReference > 0 {
			vars[i].VariablesReference |= tid << threadShift
		}
	}
	resp.Body.Variables = vars
	return nil
}
// Source handles the DAP source request by returning the content stored in
// the source map for the requested path.
//
// The source field is optional in the request arguments, so a request without
// it is rejected with an error instead of dereferencing a nil pointer. An
// error is also returned when the path is not present in the source map.
func (d *Adapter[C]) Source(c Context, req *dap.SourceRequest, resp *dap.SourceResponse) error {
	src := req.Arguments.Source
	if src == nil {
		return errors.New("source request has no source")
	}

	fname := src.Path
	dt, ok := d.sourceMap.Get(fname)
	if !ok {
		return errors.Errorf("file not found: %s", fname)
	}

	resp.Body.Content = string(dt)
	return nil
}
// evaluate is the build handler's Evaluate callback. It runs EvaluateResult in
// a routine started on the server and waits for its outcome. It returns
// context.Canceled when the routine cannot be started, and the cause of ctx
// when ctx is done before a result arrives.
func (d *Adapter[C]) evaluate(ctx context.Context, name string, c gateway.Client, res *gateway.Result, opt build.Options) error {
	// Buffered so the routine can finish even if the caller stopped waiting.
	outcome := make(chan error, 1)

	run := func(srvCtx Context) {
		defer close(outcome)
		// The evaluation runs under the server's context, not the caller's.
		outcome <- d.EvaluateResult(srvCtx, name, c, res, opt.Inputs)
	}
	if !d.srv.Go(run) {
		return context.Canceled
	}

	select {
	case <-ctx.Done():
		return context.Cause(ctx)
	case err := <-outcome:
		return err
	}
}
// Handler returns the build handler whose Evaluate callback is served by this
// adapter.
func (d *Adapter[C]) Handler() build.Handler {
	var h build.Handler
	h.Evaluate = d.evaluate
	return h
}
// dapHandler returns the table that maps each supported DAP request to the
// adapter method that serves it.
func (d *Adapter[C]) dapHandler() Handler {
	var h Handler

	// Session lifecycle.
	h.Initialize = d.Initialize
	h.Launch = d.Launch

	// Execution control.
	h.Continue = d.Continue
	h.Next = d.Next
	h.StepIn = d.StepIn
	h.StepOut = d.StepOut

	// Configuration and shutdown.
	h.SetBreakpoints = d.SetBreakpoints
	h.ConfigurationDone = d.ConfigurationDone
	h.Disconnect = d.Disconnect

	// Inspection.
	h.Threads = d.Threads
	h.StackTrace = d.StackTrace
	h.Scopes = d.Scopes
	h.Variables = d.Variables
	h.Evaluate = d.Evaluate
	h.Source = d.Source

	return h
}
// Out returns a writer that forwards everything written to it to the client as
// DAP output events.
func (d *Adapter[C]) Out() io.Writer {
	w := new(adapterWriter[C])
	w.Adapter = d
	return w
}
// adapterWriter is the io.Writer returned by Adapter.Out. It embeds the
// adapter so Write can reach the server and the initialized channel.
type adapterWriter[C LaunchConfig] struct {
*Adapter[C]
}
// Write forwards p to the client as a DAP "output" event in the "stdout"
// category. The event is sent from a routine started on the server, after the
// initialize request has been handled.
//
// On success it returns len(p), as the io.Writer contract requires; returning
// 0 with a nil error makes callers such as io.Copy fail with a short write.
// It returns io.ErrClosedPipe when the server can no longer start routines.
func (d *adapterWriter[C]) Write(p []byte) (n int, err error) {
	// Copy the payload up front: the callback may run after Write has
	// returned, and a writer must not hold on to p past that point.
	output := string(p)

	started := d.srv.Go(func(c Context) {
		<-d.initialized

		c.C() <- &dap.OutputEvent{
			Event: dap.Event{Event: "output"},
			Body: dap.OutputEventBody{
				Category: "stdout",
				Output:   output,
			},
		}
	})
	if !started {
		return 0, io.ErrClosedPipe
	}
	return len(p), nil
}
// idPool hands out int64 identifiers from an atomic counter. The zero value is
// ready to use and the first identifier returned is 1.
type idPool struct {
	next atomic.Int64
}

// Get returns the next identifier; every call returns a value one greater than
// the previous call.
func (p *idPool) Get() int64 {
	id := p.next.Add(1)
	return id
}

// Put accepts an identifier back but currently does nothing with it;
// identifiers are never reused.
func (p *idPool) Put(x int64) {
	// noop
	_ = x
}
// sourceMap stores source file contents by file name. The zero value is ready
// to use.
type sourceMap struct {
// m maps a file name (string) to its content ([]byte).
m sync.Map
}
// Put stores dt as the content of fname and notifies the client through a
// "loadedSource" event: reason "new" when the file was not known before, and
// reason "changed" when the stored content differed from dt. Nothing is sent
// when the content is unchanged.
//
// The replacement uses Swap rather than CompareAndSwap: the stored values are
// byte slices, which are not comparable, so CompareAndSwap would panic the
// first time a file's content actually changed.
func (s *sourceMap) Put(c Context, fname string, dt []byte) {
	prev, loaded := s.m.Swap(fname, dt)

	reason := "new"
	if loaded {
		if bytes.Equal(prev.([]byte), dt) {
			// Nothing to do.
			return
		}
		reason = "changed"
	}

	c.C() <- &dap.LoadedSourceEvent{
		Event: dap.Event{Event: "loadedSource"},
		Body: dap.LoadedSourceEventBody{
			Reason: reason,
			Source: dap.Source{
				Name: path.Base(fname),
				Path: fname,
			},
		},
	}
}
// Get returns the content stored for fname. The boolean is false when nothing
// has been stored under that name.
func (s *sourceMap) Get(fname string) ([]byte, bool) {
	if stored, found := s.m.Load(fname); found {
		return stored.([]byte), true
	}
	return nil, false
}
// breakpointMap holds the breakpoints requested by the client, grouped by
// source file path.
type breakpointMap struct {
// byPath maps a source path to its breakpoints; guarded by mu.
byPath map[string][]dap.Breakpoint
mu sync.RWMutex
// nextID is the counter used to assign ids to new breakpoints.
nextID atomic.Int64
}
// newBreakpointMap returns an empty breakpoint map with its per-path table
// allocated.
func newBreakpointMap() *breakpointMap {
	bm := new(breakpointMap)
	bm.byPath = make(map[string][]dap.Breakpoint)
	return bm
}
// Set replaces the breakpoints of fname with the ones described by sbps and
// returns the resulting list, one entry per requested breakpoint and in the
// same order.
//
// A requested breakpoint that falls inside the range of a breakpoint already
// stored for the file reuses that breakpoint, including its id and verified
// state. Any other request creates a new, unverified breakpoint with a fresh
// id.
//
// The returned slice is never nil, so an empty request serializes as an empty
// JSON array rather than null.
func (b *breakpointMap) Set(fname string, sbps []dap.SourceBreakpoint) (breakpoints []dap.Breakpoint) {
	b.mu.Lock()
	defer b.mu.Unlock()

	prev := b.byPath[fname]
	breakpoints = make([]dap.Breakpoint, 0, len(sbps))
	for _, sbp := range sbps {
		// Look for an existing breakpoint whose range contains the request.
		index := slices.IndexFunc(prev, func(e dap.Breakpoint) bool {
			return sbp.Line >= e.Line && sbp.Line <= e.EndLine && sbp.Column >= e.Column && sbp.Column <= e.EndColumn
		})

		var bp dap.Breakpoint
		if index >= 0 {
			bp = prev[index]
		} else {
			bp = dap.Breakpoint{
				Id:        int(b.nextID.Add(1)),
				Line:      sbp.Line,
				EndLine:   sbp.Line,
				Column:    sbp.Column,
				EndColumn: sbp.Column,
			}
		}
		breakpoints = append(breakpoints, bp)
	}

	b.byPath[fname] = breakpoints
	return breakpoints
}
// Intersect matches the stored breakpoints against every location entry in
// src. File names from src are joined onto ws before being looked up. The
// result maps each digest whose locations hit a breakpoint to that
// breakpoint's id; digests without a hit are left out.
func (b *breakpointMap) Intersect(ctx Context, src *pb.Source, ws string) map[digest.Digest]int {
	// Exclusive lock: intersect may update stored breakpoints.
	b.mu.Lock()
	defer b.mu.Unlock()

	hits := map[digest.Digest]int{}
	for key, locs := range src.Locations {
		id := b.intersect(ctx, src, locs, ws)
		if id <= 0 {
			continue
		}
		hits[digest.Digest(key)] = id
	}
	return hits
}
// intersect returns the id of the first stored breakpoint that lies within the
// first range of one of the given locations, or 0 when there is none.
//
// The first time a breakpoint is hit it is marked verified, its position is
// widened to the matching range, a "changed" breakpoint event is sent on the
// context's channel, and the updated breakpoint is stored back.
//
// The caller is expected to hold b.mu, as Intersect does.
func (b *breakpointMap) intersect(ctx Context, src *pb.Source, locs *pb.Locations, ws string) int {
	// within reports whether the breakpoint's span lies inside the range.
	within := func(r *pb.Range, bp *dap.Breakpoint) bool {
		return r.Start.Line <= int32(bp.Line) &&
			r.Start.Character <= int32(bp.Column) &&
			r.End.Line >= int32(bp.EndLine) &&
			r.End.Character >= int32(bp.EndColumn)
	}

	for _, loc := range locs.Locations {
		if len(loc.Ranges) == 0 {
			continue
		}

		// Only the first range of a location is considered.
		rng := loc.Ranges[0]
		fname := filepath.Join(ws, src.Infos[loc.SourceIndex].Filename)

		// A file without breakpoints yields an empty slice and no iterations.
		bps := b.byPath[fname]
		for i := range bps {
			bp := bps[i]
			if !within(rng, &bp) {
				continue
			}
			if bp.Verified {
				return bp.Id
			}

			bp.Line = int(rng.Start.Line)
			bp.EndLine = int(rng.End.Line)
			bp.Column = int(rng.Start.Character)
			bp.EndColumn = int(rng.End.Character)
			bp.Verified = true

			ctx.C() <- &dap.BreakpointEvent{
				Event: dap.Event{Event: "breakpoint"},
				Body: dap.BreakpointEventBody{
					Reason:     "changed",
					Breakpoint: bp,
				},
			}
			bps[i] = bp
			return bp.Id
		}
	}
	return 0
}