mirror of https://github.com/docker/buildx.git
668 lines
14 KiB
Go
668 lines
14 KiB
Go
package dap
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"path/filepath"
|
|
"slices"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/docker/buildx/build"
|
|
"github.com/docker/buildx/dap/common"
|
|
"github.com/google/go-dap"
|
|
gateway "github.com/moby/buildkit/frontend/gateway/client"
|
|
"github.com/moby/buildkit/solver/pb"
|
|
"github.com/opencontainers/go-digest"
|
|
"github.com/pkg/errors"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
type Adapter[C LaunchConfig] struct {
|
|
srv *Server
|
|
eg *errgroup.Group
|
|
cfg common.Config
|
|
|
|
initialized chan struct{}
|
|
started chan launchResponse[C]
|
|
configuration chan struct{}
|
|
supportsExec bool
|
|
|
|
evaluateReqCh chan *evaluateRequest
|
|
|
|
threads map[int]*thread
|
|
threadsMu sync.RWMutex
|
|
nextThreadID int
|
|
|
|
breakpointMap *breakpointMap
|
|
sourceMap sourceMap
|
|
idPool *idPool
|
|
}
|
|
|
|
func New[C LaunchConfig]() *Adapter[C] {
|
|
d := &Adapter[C]{
|
|
initialized: make(chan struct{}),
|
|
started: make(chan launchResponse[C], 1),
|
|
configuration: make(chan struct{}),
|
|
evaluateReqCh: make(chan *evaluateRequest),
|
|
threads: make(map[int]*thread),
|
|
nextThreadID: 1,
|
|
breakpointMap: newBreakpointMap(),
|
|
idPool: new(idPool),
|
|
}
|
|
d.srv = NewServer(d.dapHandler())
|
|
return d
|
|
}
|
|
|
|
func (d *Adapter[C]) Start(ctx context.Context, conn Conn) (C, error) {
|
|
d.eg, _ = errgroup.WithContext(ctx)
|
|
d.eg.Go(func() error {
|
|
return d.srv.Serve(ctx, conn)
|
|
})
|
|
|
|
<-d.initialized
|
|
|
|
resp, ok := <-d.started
|
|
if !ok {
|
|
resp.Error = context.Canceled
|
|
}
|
|
d.cfg = resp.Config.GetConfig()
|
|
return resp.Config, resp.Error
|
|
}
|
|
|
|
func (d *Adapter[C]) Stop() error {
|
|
if d.eg == nil {
|
|
return nil
|
|
}
|
|
|
|
d.srv.Go(func(c Context) {
|
|
c.C() <- &dap.TerminatedEvent{
|
|
Event: dap.Event{
|
|
Event: "terminated",
|
|
},
|
|
}
|
|
// TODO: detect exit code from threads
|
|
// c.C() <- &dap.ExitedEvent{
|
|
// Event: dap.Event{
|
|
// Event: "exited",
|
|
// },
|
|
// Body: dap.ExitedEventBody{
|
|
// ExitCode: exitCode,
|
|
// },
|
|
// }
|
|
})
|
|
d.srv.Stop()
|
|
|
|
err := d.eg.Wait()
|
|
d.eg = nil
|
|
return err
|
|
}
|
|
|
|
func (d *Adapter[C]) Initialize(c Context, req *dap.InitializeRequest, resp *dap.InitializeResponse) error {
|
|
close(d.initialized)
|
|
|
|
// Set parameters based on passed client capabilities.
|
|
d.supportsExec = req.Arguments.SupportsRunInTerminalRequest
|
|
|
|
// Set capabilities.
|
|
resp.Body.SupportsConfigurationDoneRequest = true
|
|
return nil
|
|
}
|
|
|
|
type launchResponse[C any] struct {
|
|
Config C
|
|
Error error
|
|
}
|
|
|
|
func (d *Adapter[C]) Launch(c Context, req *dap.LaunchRequest, resp *dap.LaunchResponse) error {
|
|
defer close(d.started)
|
|
|
|
var cfg C
|
|
if err := json.Unmarshal(req.Arguments, &cfg); err != nil {
|
|
d.started <- launchResponse[C]{Error: err}
|
|
return err
|
|
}
|
|
|
|
d.start(c)
|
|
|
|
d.started <- launchResponse[C]{Config: cfg}
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) Disconnect(c Context, req *dap.DisconnectRequest, resp *dap.DisconnectResponse) error {
|
|
close(d.evaluateReqCh)
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) start(c Context) {
|
|
c.Go(d.launch)
|
|
}
|
|
|
|
func (d *Adapter[C]) Continue(c Context, req *dap.ContinueRequest, resp *dap.ContinueResponse) error {
|
|
d.threadsMu.RLock()
|
|
t := d.threads[req.Arguments.ThreadId]
|
|
d.threadsMu.RUnlock()
|
|
|
|
t.Continue()
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) Next(c Context, req *dap.NextRequest, resp *dap.NextResponse) error {
|
|
d.threadsMu.RLock()
|
|
t := d.threads[req.Arguments.ThreadId]
|
|
d.threadsMu.RUnlock()
|
|
|
|
t.Next()
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) StepIn(c Context, req *dap.StepInRequest, resp *dap.StepInResponse) error {
|
|
var (
|
|
subReq dap.NextRequest
|
|
subResp dap.NextResponse
|
|
)
|
|
|
|
subReq.Arguments.ThreadId = req.Arguments.ThreadId
|
|
subReq.Arguments.SingleThread = req.Arguments.SingleThread
|
|
subReq.Arguments.Granularity = req.Arguments.Granularity
|
|
return d.Next(c, &subReq, &subResp)
|
|
}
|
|
|
|
func (d *Adapter[C]) StepOut(c Context, req *dap.StepOutRequest, resp *dap.StepOutResponse) error {
|
|
var (
|
|
subReq dap.ContinueRequest
|
|
subResp dap.ContinueResponse
|
|
)
|
|
|
|
subReq.Arguments.ThreadId = req.Arguments.ThreadId
|
|
subReq.Arguments.SingleThread = req.Arguments.SingleThread
|
|
return d.Continue(c, &subReq, &subResp)
|
|
}
|
|
|
|
func (d *Adapter[C]) SetBreakpoints(c Context, req *dap.SetBreakpointsRequest, resp *dap.SetBreakpointsResponse) error {
|
|
resp.Body.Breakpoints = d.breakpointMap.Set(req.Arguments.Source.Path, req.Arguments.Breakpoints)
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) ConfigurationDone(c Context, req *dap.ConfigurationDoneRequest, resp *dap.ConfigurationDoneResponse) error {
|
|
d.configuration <- struct{}{}
|
|
close(d.configuration)
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) launch(c Context) {
|
|
// Send initialized event.
|
|
c.C() <- &dap.InitializedEvent{
|
|
Event: dap.Event{
|
|
Event: "initialized",
|
|
},
|
|
}
|
|
|
|
// Wait for configuration.
|
|
select {
|
|
case <-c.Done():
|
|
return
|
|
case <-d.configuration:
|
|
// TODO: actual configuration
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-c.Done():
|
|
return
|
|
case req, ok := <-d.evaluateReqCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
t := d.newThread(c, req.name)
|
|
started := c.Go(func(c Context) {
|
|
defer d.deleteThread(c, t)
|
|
defer close(req.errCh)
|
|
req.errCh <- t.Evaluate(c, req.c, req.ref, req.meta, req.inputs, d.cfg)
|
|
})
|
|
|
|
if !started {
|
|
req.errCh <- context.Canceled
|
|
close(req.errCh)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[C]) newThread(ctx Context, name string) (t *thread) {
|
|
d.threadsMu.Lock()
|
|
id := d.nextThreadID
|
|
t = &thread{
|
|
id: id,
|
|
name: name,
|
|
sourceMap: &d.sourceMap,
|
|
breakpointMap: d.breakpointMap,
|
|
idPool: d.idPool,
|
|
variables: newVariableReferences(),
|
|
}
|
|
d.threads[t.id] = t
|
|
d.nextThreadID++
|
|
d.threadsMu.Unlock()
|
|
|
|
ctx.C() <- &dap.ThreadEvent{
|
|
Event: dap.Event{Event: "thread"},
|
|
Body: dap.ThreadEventBody{
|
|
Reason: "started",
|
|
ThreadId: t.id,
|
|
},
|
|
}
|
|
return t
|
|
}
|
|
|
|
func (d *Adapter[C]) getThread(id int) (t *thread) {
|
|
d.threadsMu.Lock()
|
|
t = d.threads[id]
|
|
d.threadsMu.Unlock()
|
|
return t
|
|
}
|
|
|
|
func (d *Adapter[C]) getFirstThread() (t *thread) {
|
|
d.threadsMu.Lock()
|
|
defer d.threadsMu.Unlock()
|
|
|
|
for _, thread := range d.threads {
|
|
if thread.isPaused() {
|
|
if t == nil || thread.id < t.id {
|
|
t = thread
|
|
}
|
|
}
|
|
}
|
|
return t
|
|
}
|
|
|
|
func (d *Adapter[C]) deleteThread(ctx Context, t *thread) {
|
|
d.threadsMu.Lock()
|
|
if t := d.threads[t.id]; t != nil {
|
|
if t.variables != nil {
|
|
t.variables.Reset()
|
|
}
|
|
}
|
|
delete(d.threads, t.id)
|
|
d.threadsMu.Unlock()
|
|
|
|
ctx.C() <- &dap.ThreadEvent{
|
|
Event: dap.Event{Event: "thread"},
|
|
Body: dap.ThreadEventBody{
|
|
Reason: "exited",
|
|
ThreadId: t.id,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[T]) getThreadByFrameID(id int) (t *thread) {
|
|
d.threadsMu.RLock()
|
|
defer d.threadsMu.RUnlock()
|
|
|
|
for _, t := range d.threads {
|
|
if t.hasFrame(id) {
|
|
return t
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type evaluateRequest struct {
|
|
name string
|
|
c gateway.Client
|
|
ref gateway.Reference
|
|
meta map[string][]byte
|
|
inputs build.Inputs
|
|
errCh chan<- error
|
|
}
|
|
|
|
func (d *Adapter[C]) EvaluateResult(ctx context.Context, name string, c gateway.Client, res *gateway.Result, inputs build.Inputs) error {
|
|
eg, _ := errgroup.WithContext(ctx)
|
|
if res.Ref != nil {
|
|
eg.Go(func() error {
|
|
return d.evaluateRef(ctx, name, c, res.Ref, res.Metadata, inputs)
|
|
})
|
|
}
|
|
|
|
for k, ref := range res.Refs {
|
|
refName := fmt.Sprintf("%s (%s)", name, k)
|
|
eg.Go(func() error {
|
|
return d.evaluateRef(ctx, refName, c, ref, res.Metadata, inputs)
|
|
})
|
|
}
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (d *Adapter[C]) evaluateRef(ctx context.Context, name string, c gateway.Client, ref gateway.Reference, meta map[string][]byte, inputs build.Inputs) error {
|
|
errCh := make(chan error, 1)
|
|
|
|
// Send a solve request to the launch routine
|
|
// which will perform the solve in the context of the server.
|
|
ereq := &evaluateRequest{
|
|
name: name,
|
|
c: c,
|
|
ref: ref,
|
|
meta: meta,
|
|
inputs: inputs,
|
|
errCh: errCh,
|
|
}
|
|
select {
|
|
case d.evaluateReqCh <- ereq:
|
|
case <-ctx.Done():
|
|
return context.Cause(ctx)
|
|
}
|
|
|
|
// Wait for the response.
|
|
select {
|
|
case err := <-errCh:
|
|
return err
|
|
case <-ctx.Done():
|
|
return context.Cause(ctx)
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[C]) Threads(c Context, req *dap.ThreadsRequest, resp *dap.ThreadsResponse) error {
|
|
d.threadsMu.RLock()
|
|
defer d.threadsMu.RUnlock()
|
|
|
|
resp.Body.Threads = []dap.Thread{}
|
|
for _, t := range d.threads {
|
|
resp.Body.Threads = append(resp.Body.Threads, dap.Thread{
|
|
Id: t.id,
|
|
Name: t.name,
|
|
})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) StackTrace(c Context, req *dap.StackTraceRequest, resp *dap.StackTraceResponse) error {
|
|
t := d.getThread(req.Arguments.ThreadId)
|
|
if t == nil {
|
|
return errors.Errorf("no such thread: %d", req.Arguments.ThreadId)
|
|
}
|
|
|
|
resp.Body.StackFrames = t.StackTrace()
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) Scopes(c Context, req *dap.ScopesRequest, resp *dap.ScopesResponse) error {
|
|
t := d.getThreadByFrameID(req.Arguments.FrameId)
|
|
if t == nil {
|
|
return errors.Errorf("no such frame id: %d", req.Arguments.FrameId)
|
|
}
|
|
|
|
resp.Body.Scopes = t.Scopes(req.Arguments.FrameId)
|
|
for i, s := range resp.Body.Scopes {
|
|
resp.Body.Scopes[i].VariablesReference = (t.id << 24) | s.VariablesReference
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) Variables(c Context, req *dap.VariablesRequest, resp *dap.VariablesResponse) error {
|
|
tid := req.Arguments.VariablesReference >> 24
|
|
|
|
t := d.getThread(tid)
|
|
if t == nil {
|
|
return errors.Errorf("no such thread: %d", tid)
|
|
}
|
|
|
|
varRef := req.Arguments.VariablesReference & ((1 << 24) - 1)
|
|
resp.Body.Variables = t.Variables(varRef)
|
|
for i, ref := range resp.Body.Variables {
|
|
if ref.VariablesReference > 0 {
|
|
resp.Body.Variables[i].VariablesReference = (tid << 24) | ref.VariablesReference
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) Source(c Context, req *dap.SourceRequest, resp *dap.SourceResponse) error {
|
|
fname := req.Arguments.Source.Path
|
|
|
|
dt, ok := d.sourceMap.Get(fname)
|
|
if !ok {
|
|
return errors.Errorf("file not found: %s", fname)
|
|
}
|
|
|
|
resp.Body.Content = string(dt)
|
|
return nil
|
|
}
|
|
|
|
func (d *Adapter[C]) evaluate(ctx context.Context, name string, c gateway.Client, res *gateway.Result, opt build.Options) error {
|
|
errCh := make(chan error, 1)
|
|
|
|
started := d.srv.Go(func(ctx Context) {
|
|
defer close(errCh)
|
|
errCh <- d.EvaluateResult(ctx, name, c, res, opt.Inputs)
|
|
})
|
|
if !started {
|
|
return context.Canceled
|
|
}
|
|
|
|
select {
|
|
case err := <-errCh:
|
|
return err
|
|
case <-ctx.Done():
|
|
return context.Cause(ctx)
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[C]) Handler() build.Handler {
|
|
return build.Handler{
|
|
Evaluate: d.evaluate,
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[C]) dapHandler() Handler {
|
|
return Handler{
|
|
Initialize: d.Initialize,
|
|
Launch: d.Launch,
|
|
Continue: d.Continue,
|
|
Next: d.Next,
|
|
StepIn: d.StepIn,
|
|
StepOut: d.StepOut,
|
|
SetBreakpoints: d.SetBreakpoints,
|
|
ConfigurationDone: d.ConfigurationDone,
|
|
Disconnect: d.Disconnect,
|
|
Threads: d.Threads,
|
|
StackTrace: d.StackTrace,
|
|
Scopes: d.Scopes,
|
|
Variables: d.Variables,
|
|
Evaluate: d.Evaluate,
|
|
Source: d.Source,
|
|
}
|
|
}
|
|
|
|
func (d *Adapter[C]) Out() io.Writer {
|
|
return &adapterWriter[C]{d}
|
|
}
|
|
|
|
type adapterWriter[C LaunchConfig] struct {
|
|
*Adapter[C]
|
|
}
|
|
|
|
func (d *adapterWriter[C]) Write(p []byte) (n int, err error) {
|
|
started := d.srv.Go(func(c Context) {
|
|
<-d.initialized
|
|
|
|
c.C() <- &dap.OutputEvent{
|
|
Event: dap.Event{Event: "output"},
|
|
Body: dap.OutputEventBody{
|
|
Category: "stdout",
|
|
Output: string(p),
|
|
},
|
|
}
|
|
})
|
|
|
|
if !started {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
return n, nil
|
|
}
|
|
|
|
type idPool struct {
|
|
next atomic.Int64
|
|
}
|
|
|
|
func (p *idPool) Get() int64 {
|
|
return p.next.Add(1)
|
|
}
|
|
|
|
func (p *idPool) Put(x int64) {
|
|
// noop
|
|
}
|
|
|
|
type sourceMap struct {
|
|
m sync.Map
|
|
}
|
|
|
|
func (s *sourceMap) Put(c Context, fname string, dt []byte) {
|
|
for {
|
|
old, loaded := s.m.LoadOrStore(fname, dt)
|
|
if !loaded {
|
|
c.C() <- &dap.LoadedSourceEvent{
|
|
Event: dap.Event{Event: "loadedSource"},
|
|
Body: dap.LoadedSourceEventBody{
|
|
Reason: "new",
|
|
Source: dap.Source{
|
|
Name: path.Base(fname),
|
|
Path: fname,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
if bytes.Equal(old.([]byte), dt) {
|
|
// Nothing to do.
|
|
return
|
|
}
|
|
|
|
if s.m.CompareAndSwap(fname, old, dt) {
|
|
c.C() <- &dap.LoadedSourceEvent{
|
|
Event: dap.Event{Event: "loadedSource"},
|
|
Body: dap.LoadedSourceEventBody{
|
|
Reason: "changed",
|
|
Source: dap.Source{
|
|
Name: path.Base(fname),
|
|
Path: fname,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *sourceMap) Get(fname string) ([]byte, bool) {
|
|
v, ok := s.m.Load(fname)
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
return v.([]byte), true
|
|
}
|
|
|
|
type breakpointMap struct {
|
|
byPath map[string][]dap.Breakpoint
|
|
mu sync.RWMutex
|
|
|
|
nextID atomic.Int64
|
|
}
|
|
|
|
func newBreakpointMap() *breakpointMap {
|
|
return &breakpointMap{
|
|
byPath: make(map[string][]dap.Breakpoint),
|
|
}
|
|
}
|
|
|
|
func (b *breakpointMap) Set(fname string, sbps []dap.SourceBreakpoint) (breakpoints []dap.Breakpoint) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
prev := b.byPath[fname]
|
|
for _, sbp := range sbps {
|
|
index := slices.IndexFunc(prev, func(e dap.Breakpoint) bool {
|
|
return sbp.Line >= e.Line && sbp.Line <= e.EndLine && sbp.Column >= e.Column && sbp.Column <= e.EndColumn
|
|
})
|
|
|
|
var bp dap.Breakpoint
|
|
if index >= 0 {
|
|
bp = prev[index]
|
|
} else {
|
|
bp = dap.Breakpoint{
|
|
Id: int(b.nextID.Add(1)),
|
|
Line: sbp.Line,
|
|
EndLine: sbp.Line,
|
|
Column: sbp.Column,
|
|
EndColumn: sbp.Column,
|
|
}
|
|
}
|
|
breakpoints = append(breakpoints, bp)
|
|
}
|
|
b.byPath[fname] = breakpoints
|
|
return breakpoints
|
|
}
|
|
|
|
func (b *breakpointMap) Intersect(ctx Context, src *pb.Source, ws string) map[digest.Digest]int {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
digests := make(map[digest.Digest]int)
|
|
|
|
for dgst, locs := range src.Locations {
|
|
if id := b.intersect(ctx, src, locs, ws); id > 0 {
|
|
digests[digest.Digest(dgst)] = id
|
|
}
|
|
}
|
|
return digests
|
|
}
|
|
|
|
func (b *breakpointMap) intersect(ctx Context, src *pb.Source, locs *pb.Locations, ws string) int {
|
|
overlaps := func(r *pb.Range, bp *dap.Breakpoint) bool {
|
|
return r.Start.Line <= int32(bp.Line) && r.Start.Character <= int32(bp.Column) && r.End.Line >= int32(bp.EndLine) && r.End.Character >= int32(bp.EndColumn)
|
|
}
|
|
|
|
for _, loc := range locs.Locations {
|
|
if len(loc.Ranges) == 0 {
|
|
continue
|
|
}
|
|
r := loc.Ranges[0]
|
|
|
|
info := src.Infos[loc.SourceIndex]
|
|
fname := filepath.Join(ws, info.Filename)
|
|
|
|
bps := b.byPath[fname]
|
|
if len(bps) == 0 {
|
|
// No breakpoints for this file.
|
|
continue
|
|
}
|
|
|
|
for i, bp := range bps {
|
|
if !overlaps(r, &bp) {
|
|
continue
|
|
}
|
|
|
|
if !bp.Verified {
|
|
bp.Line = int(r.Start.Line)
|
|
bp.EndLine = int(r.End.Line)
|
|
bp.Column = int(r.Start.Character)
|
|
bp.EndColumn = int(r.End.Character)
|
|
bp.Verified = true
|
|
|
|
ctx.C() <- &dap.BreakpointEvent{
|
|
Event: dap.Event{Event: "breakpoint"},
|
|
Body: dap.BreakpointEventBody{
|
|
Reason: "changed",
|
|
Breakpoint: bp,
|
|
},
|
|
}
|
|
bps[i] = bp
|
|
}
|
|
return bp.Id
|
|
}
|
|
}
|
|
return 0
|
|
}
|