mirror of https://github.com/knative/func.git
150 lines
3.4 KiB
Go
150 lines
3.4 KiB
Go
package knative
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"knative.dev/func/pkg/k8s"
|
|
)
|
|
|
|
// GetKServiceLogs will get logs of Knative service.
|
|
//
|
|
// It will do so by gathering logs of user-container of all affiliated pods.
|
|
// In addition, filtering on image can be done so only logs for given image are logged.
|
|
// The image must be the digest format since pods of Knative service use it.
|
|
//
|
|
// This function runs as long as the passed context is active (i.e. it is required cancel the context to stop log gathering).
|
|
func GetKServiceLogs(ctx context.Context, namespace, kServiceName, image string, since *time.Time, out io.Writer) error {
|
|
client, namespace, err := k8s.NewClientAndResolvedNamespace(namespace)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create k8s client: %w", err)
|
|
}
|
|
|
|
pods := client.CoreV1().Pods(namespace)
|
|
|
|
podListOpts := metav1.ListOptions{
|
|
Watch: true,
|
|
LabelSelector: fmt.Sprintf("serving.knative.dev/service=%s", kServiceName),
|
|
}
|
|
|
|
w, err := pods.Watch(ctx, podListOpts)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot create watch: %w", err)
|
|
}
|
|
defer w.Stop()
|
|
|
|
beingProcessed := make(map[string]bool)
|
|
var beingProcessedMu sync.Mutex
|
|
|
|
copyLogs := func(pod corev1.Pod) error {
|
|
defer func() {
|
|
beingProcessedMu.Lock()
|
|
delete(beingProcessed, pod.Name)
|
|
beingProcessedMu.Unlock()
|
|
}()
|
|
podLogOpts := corev1.PodLogOptions{
|
|
Container: "user-container",
|
|
Follow: true,
|
|
}
|
|
if since != nil {
|
|
sinceTime := metav1.NewTime(*since)
|
|
podLogOpts.SinceTime = &sinceTime
|
|
}
|
|
req := client.CoreV1().Pods(namespace).GetLogs(pod.Name, &podLogOpts)
|
|
|
|
r, e := req.Stream(ctx)
|
|
if e != nil {
|
|
return fmt.Errorf("cannot get stream: %w", e)
|
|
}
|
|
defer r.Close()
|
|
_, e = io.Copy(out, r)
|
|
if e != nil {
|
|
return fmt.Errorf("error copying logs: %w", e)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
mayReadLogs := func(pod corev1.Pod) bool {
|
|
for _, status := range pod.Status.ContainerStatuses {
|
|
if status.Name == "user-container" {
|
|
return status.State.Running != nil || status.State.Terminated != nil
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
getImage := func(pod corev1.Pod) string {
|
|
for _, ctr := range pod.Spec.Containers {
|
|
if ctr.Name == "user-container" {
|
|
return ctr.Image
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
var eg errgroup.Group
|
|
|
|
for event := range w.ResultChan() {
|
|
if event.Type == watch.Modified || event.Type == watch.Added {
|
|
pod := *event.Object.(*corev1.Pod)
|
|
|
|
beingProcessedMu.Lock()
|
|
_, loggingAlready := beingProcessed[pod.Name]
|
|
beingProcessedMu.Unlock()
|
|
|
|
if !loggingAlready && (image == "" || image == getImage(pod)) && mayReadLogs(pod) {
|
|
|
|
beingProcessedMu.Lock()
|
|
beingProcessed[pod.Name] = true
|
|
beingProcessedMu.Unlock()
|
|
|
|
eg.Go(func() error { return copyLogs(pod) })
|
|
}
|
|
}
|
|
}
|
|
|
|
err = eg.Wait()
|
|
if err != nil {
|
|
return fmt.Errorf("error while gathering logs: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
type SynchronizedBuffer struct {
|
|
b bytes.Buffer
|
|
mu sync.Mutex
|
|
}
|
|
|
|
func (b *SynchronizedBuffer) String() string {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.b.String()
|
|
}
|
|
|
|
func (b *SynchronizedBuffer) Write(p []byte) (n int, err error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.b.Write(p)
|
|
}
|
|
|
|
func (b *SynchronizedBuffer) Read(p []byte) (n int, err error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.b.Read(p)
|
|
}
|
|
|
|
func (b *SynchronizedBuffer) Reset() {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
b.b.Reset()
|
|
}
|