mirror of https://github.com/docker/buildx.git
				
				
				
			
		
			
				
	
	
		
			144 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Go
		
	
	
	
| package session
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"net"
 | |
| 	"strings"
 | |
| 
 | |
| 	"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
 | |
| 	"github.com/moby/buildkit/identity"
 | |
| 	opentracing "github.com/opentracing/opentracing-go"
 | |
| 	"github.com/pkg/errors"
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/health"
 | |
| 	"google.golang.org/grpc/health/grpc_health_v1"
 | |
| )
 | |
| 
 | |
| const (
 | |
| 	headerSessionID        = "X-Docker-Expose-Session-Uuid"
 | |
| 	headerSessionName      = "X-Docker-Expose-Session-Name"
 | |
| 	headerSessionSharedKey = "X-Docker-Expose-Session-Sharedkey"
 | |
| 	headerSessionMethod    = "X-Docker-Expose-Session-Grpc-Method"
 | |
| )
 | |
| 
 | |
| // Dialer returns a connection that can be used by the session
 | |
| type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
 | |
| 
 | |
| // Attachable defines a feature that can be exposed on a session
 | |
| type Attachable interface {
 | |
| 	Register(*grpc.Server)
 | |
| }
 | |
| 
 | |
| // Session is a long running connection between client and a daemon
 | |
| type Session struct {
 | |
| 	id         string
 | |
| 	name       string
 | |
| 	sharedKey  string
 | |
| 	ctx        context.Context
 | |
| 	cancelCtx  func()
 | |
| 	done       chan struct{}
 | |
| 	grpcServer *grpc.Server
 | |
| 	conn       net.Conn
 | |
| }
 | |
| 
 | |
| // NewSession returns a new long running session
 | |
| func NewSession(ctx context.Context, name, sharedKey string) (*Session, error) {
 | |
| 	id := identity.NewID()
 | |
| 
 | |
| 	serverOpts := []grpc.ServerOption{}
 | |
| 	if span := opentracing.SpanFromContext(ctx); span != nil {
 | |
| 		tracer := span.Tracer()
 | |
| 		serverOpts = []grpc.ServerOption{
 | |
| 			grpc.StreamInterceptor(otgrpc.OpenTracingStreamServerInterceptor(span.Tracer(), traceFilter())),
 | |
| 			grpc.UnaryInterceptor(otgrpc.OpenTracingServerInterceptor(tracer, traceFilter())),
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	s := &Session{
 | |
| 		id:         id,
 | |
| 		name:       name,
 | |
| 		sharedKey:  sharedKey,
 | |
| 		grpcServer: grpc.NewServer(serverOpts...),
 | |
| 	}
 | |
| 
 | |
| 	grpc_health_v1.RegisterHealthServer(s.grpcServer, health.NewServer())
 | |
| 
 | |
| 	return s, nil
 | |
| }
 | |
| 
 | |
| // Allow enables a given service to be reachable through the grpc session
 | |
| func (s *Session) Allow(a Attachable) {
 | |
| 	a.Register(s.grpcServer)
 | |
| }
 | |
| 
 | |
| // ID returns unique identifier for the session
 | |
| func (s *Session) ID() string {
 | |
| 	return s.id
 | |
| }
 | |
| 
 | |
| // Run activates the session
 | |
| func (s *Session) Run(ctx context.Context, dialer Dialer) error {
 | |
| 	ctx, cancel := context.WithCancel(ctx)
 | |
| 	s.cancelCtx = cancel
 | |
| 	s.done = make(chan struct{})
 | |
| 
 | |
| 	defer cancel()
 | |
| 	defer close(s.done)
 | |
| 
 | |
| 	meta := make(map[string][]string)
 | |
| 	meta[headerSessionID] = []string{s.id}
 | |
| 	meta[headerSessionName] = []string{s.name}
 | |
| 	meta[headerSessionSharedKey] = []string{s.sharedKey}
 | |
| 
 | |
| 	for name, svc := range s.grpcServer.GetServiceInfo() {
 | |
| 		for _, method := range svc.Methods {
 | |
| 			meta[headerSessionMethod] = append(meta[headerSessionMethod], MethodURL(name, method.Name))
 | |
| 		}
 | |
| 	}
 | |
| 	conn, err := dialer(ctx, "h2c", meta)
 | |
| 	if err != nil {
 | |
| 		return errors.Wrap(err, "failed to dial gRPC")
 | |
| 	}
 | |
| 	s.conn = conn
 | |
| 	serve(ctx, s.grpcServer, conn)
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // Close closes the session
 | |
| func (s *Session) Close() error {
 | |
| 	if s.cancelCtx != nil && s.done != nil {
 | |
| 		if s.conn != nil {
 | |
| 			s.conn.Close()
 | |
| 		}
 | |
| 		s.grpcServer.Stop()
 | |
| 		<-s.done
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (s *Session) context() context.Context {
 | |
| 	return s.ctx
 | |
| }
 | |
| 
 | |
| func (s *Session) closed() bool {
 | |
| 	select {
 | |
| 	case <-s.context().Done():
 | |
| 		return true
 | |
| 	default:
 | |
| 		return false
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // MethodURL returns a gRPC method URL for service and method name
 | |
| func MethodURL(s, m string) string {
 | |
| 	return "/" + s + "/" + m
 | |
| }
 | |
| 
 | |
| func traceFilter() otgrpc.Option {
 | |
| 	return otgrpc.IncludingSpans(func(parentSpanCtx opentracing.SpanContext,
 | |
| 		method string,
 | |
| 		req, resp interface{}) bool {
 | |
| 		return !strings.HasSuffix(method, "Health/Check")
 | |
| 	})
 | |
| }
 |