receiver/zipkin: consolidate server starting to receiver

Moved the server starting logic to the receiver and updated
New to
  New(address string) (*ZipkinReceiver, error)
instead of
  New(spansink.Sink) (*ZipkinReceiver, error)

This change ensures that after invoking New all the user
has to do is then just invoke:
   zr.StartTraceReception(context.Context, spansink.Sink) error

and then for cleanup
    defer zr.StopTraceReception(context.Context)

Fixes #212
This commit is contained in:
Emmanuel T Odeke 2018-11-19 12:12:41 -07:00
parent 38c9550146
commit 7f5fc67287
No known key found for this signature in database
GPG Key ID: 410DDC670CF9BFD7
5 changed files with 91 additions and 55 deletions

View File

@ -178,25 +178,16 @@ func runJaegerReceiver(collectorThriftPort, collectorHTTPPort int, sr receiver.T
}
func runZipkinReceiver(addr string, sr receiver.TraceReceiverSink) (doneFn func() error, err error) {
zi, err := zipkinreceiver.New(sr)
zi, err := zipkinreceiver.New(addr)
if err != nil {
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
}
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("Cannot bind Zipkin receiver to address %q: %v", addr, err)
if err := zi.StartTraceReception(context.Background(), sr); err != nil {
return nil, fmt.Errorf("Cannot start Zipkin receiver with address %q: %v", addr, err)
}
doneFn = func() error {
return zi.StopTraceReception(context.Background())
}
mux := http.NewServeMux()
mux.Handle(zipkinRoute, zi)
go func() {
fullAddr := addr + zipkinRoute
log.Printf("Running the Zipkin receiver at %q", fullAddr)
if err := http.Serve(ln, mux); err != nil {
log.Fatalf("Failed to serve the Zipkin receiver: %v", err)
}
}()
doneFn = ln.Close
return doneFn, nil
}

View File

@ -16,6 +16,7 @@ package exporterparser_test
import (
"bytes"
"context"
"io"
"io/ioutil"
"net/http"
@ -67,14 +68,18 @@ exporters:
t.Errorf("Number of trace exporters: Got %d Want %d", g, w)
}
zexp := exporter.MultiTraceExporters(tes...)
// Run the Zipkin receiver to "receive spans upload from a client application"
zi, err := zipkinreceiver.New(zexp)
zi, err := zipkinreceiver.New(":0")
if err != nil {
t.Fatalf("Failed to create a new Zipkin receiver: %v", err)
}
zexp := exporter.MultiTraceExporters(tes...)
if err := zi.StartTraceReception(context.Background(), zexp); err != nil {
t.Fatalf("Failed to start trace reception: %v", err)
}
defer zi.StopTraceReception(context.Background())
// Let the receiver receive "uploaded Zipkin spans from a Java client application"
req, _ := http.NewRequest("POST", "https://tld.org/", strings.NewReader(zipkinSpansJSONJavaLibrary))
responseWriter := httptest.NewRecorder()

View File

@ -17,9 +17,8 @@
package zipkinreceiver
import (
"context"
"fmt"
"net"
"net/http"
"strconv"
"github.com/spf13/viper"
@ -37,30 +36,21 @@ func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (
return nil, err
}
// TODO: (@pjanotti) when Zipkin implementation of StartTraceReceiver is working, change this code (this temporarily).
ss := processor.WrapWithSpanSink("zipkin", spanProc)
addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10)
zi, err := zr.New(ss)
zi, err := zr.New(addr)
if err != nil {
return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err)
}
ss := processor.WrapWithSpanSink("zipkin", spanProc)
ln, err := net.Listen("tcp", addr)
if err != nil {
return nil, fmt.Errorf("Cannot bind Zipkin receiver to address %q: %v", addr, err)
if err := zi.StartTraceReception(context.Background(), ss); err != nil {
return nil, fmt.Errorf("Cannot start Zipkin receiver to address %q: %v", addr, err)
}
mux := http.NewServeMux()
mux.Handle("/api/v2/spans", zi)
go func() {
if err := http.Serve(ln, mux); err != nil {
logger.Fatal("Failed to serve the Zipkin receiver: %v", zap.Error(err))
}
}()
logger.Info("Zipkin receiver is running.", zap.Int("port", rOpts.Port))
doneFn := func() {
ln.Close()
zi.StopTraceReception(context.Background())
}
return doneFn, nil
}

View File

@ -27,6 +27,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"go.opencensus.io/trace"
@ -43,24 +44,72 @@ import (
// ZipkinReceiver type is used to handle spans received in the Zipkin format.
type ZipkinReceiver struct {
// mu protects the fields of this struct
mu sync.Mutex
// addr is the address onto which the HTTP server will be bound
addr string
spanSink receiver.TraceReceiverSink
startOnce sync.Once
stopOnce sync.Once
server *http.Server
}
var _ receiver.TraceReceiver = (*ZipkinReceiver)(nil)
var _ http.Handler = (*ZipkinReceiver)(nil)
// New creates a new zipkingreceiver.ZipkinReceiver reference.
func New(sr receiver.TraceReceiverSink) (*ZipkinReceiver, error) {
return &ZipkinReceiver{spanSink: sr}, nil
// New creates a new zipkinreceiver.ZipkinReceiver reference.
func New(address string) (*ZipkinReceiver, error) {
zr := &ZipkinReceiver{addr: address}
return zr, nil
}
// StartTraceReception tells the receiver to start its processing.
func (zi *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink receiver.TraceReceiverSink) error {
zi.spanSink = spanSink
return nil
const defaultAddress = ":9411"
func (zr *ZipkinReceiver) address() string {
addr := zr.addr
if addr == "" {
addr = defaultAddress
}
return addr
}
func (zi *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
var (
errAlreadyStarted = errors.New("already started")
errAlreadyStopped = errors.New("already stopped")
)
// StartTraceReception spins up the receiver's HTTP server and makes the receiver start its processing.
func (zr *ZipkinReceiver) StartTraceReception(ctx context.Context, spanSink receiver.TraceReceiverSink) error {
zr.mu.Lock()
defer zr.mu.Unlock()
var err = errAlreadyStarted
zr.startOnce.Do(func() {
ln, lerr := net.Listen("tcp", zr.address())
if lerr != nil {
err = lerr
return
}
server := &http.Server{Handler: zr}
go func() {
_ = server.Serve(ln)
}()
zr.spanSink = spanSink
zr.server = server
err = nil
})
return err
}
func (zr *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Header) (reqs []*agenttracepb.ExportTraceServiceRequest, err error) {
var zipkinSpans []*zipkinmodel.SpanModel
// This flag's reference is from:
@ -74,7 +123,7 @@ func (zi *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Head
zipkinSpans, err = zipkinproto.ParseSpans(blob, debugWasSet)
default: // By default, we'll assume using JSON
zipkinSpans, err = zi.deserializeFromJSON(blob, debugWasSet)
zipkinSpans, err = zr.deserializeFromJSON(blob, debugWasSet)
}
if err != nil {
@ -116,7 +165,7 @@ func (zi *ZipkinReceiver) parseAndConvertToTraceSpans(blob []byte, hdr http.Head
return reqs, nil
}
func (zi *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) {
func (zr *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool) (zs []*zipkinmodel.SpanModel, err error) {
if err = json.Unmarshal(jsonBlob, &zs); err != nil {
return nil, err
}
@ -124,9 +173,14 @@ func (zi *ZipkinReceiver) deserializeFromJSON(jsonBlob []byte, debugWasSet bool)
}
// StopTraceReception tells the receiver that should stop reception,
// giving it a chance to perform any necessary clean-up.
func (zi *ZipkinReceiver) StopTraceReception(ctx context.Context) error {
return nil
// giving it a chance to perform any necessary clean-up and shutting down
// its HTTP server.
func (zr *ZipkinReceiver) StopTraceReception(ctx context.Context) error {
var err = errAlreadyStopped
zr.stopOnce.Do(func() {
err = zr.server.Close()
})
return err
}
// processBodyIfNecessary checks the "Content-Encoding" HTTP header and if
@ -167,7 +221,7 @@ func zlibUncompressedbody(r io.Reader) io.Reader {
// The ZipkinReceiver receives spans from endpoint /api/v2 as JSON,
// unmarshals them and sends them along to the spansink.Sink.
func (zi *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (zr *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Trace this method
ctx, span := trace.StartSpan(context.Background(), "ZipkinReceiver.Export")
defer span.End()
@ -182,7 +236,7 @@ func (zi *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_ = c.Close()
}
_ = r.Body.Close()
ereqs, err := zi.parseAndConvertToTraceSpans(slurp, r.Header)
ereqs, err := zr.parseAndConvertToTraceSpans(slurp, r.Header)
if err != nil {
span.SetStatus(trace.Status{
Code: trace.StatusCodeInvalidArgument,
@ -195,7 +249,7 @@ func (zi *ZipkinReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) {
spansMetricsFn := internal.NewReceivedSpansRecorderStreaming(ctx, "zipkin")
// Now translate them into tracepb.Span
for _, ereq := range ereqs {
zi.spanSink.ReceiveSpans(ctx, ereq.Node, ereq.Spans...)
zr.spanSink.ReceiveSpans(ctx, ereq.Node, ereq.Spans...)
// We MUST unconditionally record metrics from this reception.
spansMetricsFn(ereq.Node, ereq.Spans)
}

View File

@ -156,11 +156,7 @@ func TestConversionRoundtrip(t *testing.T) {
}
}]`)
sink := new(noopSink)
zi, err := New(sink)
if err != nil {
t.Fatalf("Failed to create the Zipkin receiver: %v", err)
}
zi := &ZipkinReceiver{spanSink: new(noopSink)}
ereqs, err := zi.parseAndConvertToTraceSpans(receiverInputJSON, nil)
if err != nil {
t.Fatalf("Failed to parse and convert receiver JSON: %v", err)