Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use 'journalctl` for getting logs #98

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions internal/provider/journalctl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package provider

import (
"bufio"
"errors"
"fmt"
"io"
"os/exec"
"strings"
"time"

nodeapi "github.com/virtual-kubelet/virtual-kubelet/node/api"
)

const journalctl = "journalctl"

func journalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (io.ReadCloser, func() error, error) {
fnlog := log.
WithField("podNamespace", namespace).
WithField("podName", name).
WithField("containerName", container)

fnlog.Infof("calling for container logs with options %+v", logOpts)
cancel := func() error { return nil } // initialize as noop

unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator)

// Handle all the options.
args := []string{"-u", unitName, "--no-hostname"} // only works with -o short-xxx options.
if logOpts.Tail > 0 {
args = append(args, "-n")
args = append(args, fmt.Sprintf("%d", logOpts.Tail))
}
if logOpts.Follow {
args = append(args, "-f")
}
if !logOpts.Timestamps {
args = append(args, "-o")
args = append(args, "cat")
} else {
args = append(args, "-o")
args = append(args, "short-full") // this is _not_ the default Go timestamp output
}
if logOpts.SinceSeconds > 0 {
args = append(args, "-S")
args = append(args, fmt.Sprintf("-%ds", logOpts.SinceSeconds))
}
if !logOpts.SinceTime.IsZero() {
args = append(args, "-S")
args = append(args, logOpts.SinceTime.Format(time.RFC3339))
}
// Previous might not be possible to implement
// TODO(pires,miek) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538
// LimitBytes - unsure (maybe a io.CopyBuffer?)

fnlog.Debugf("getting container logs via: %q %v", journalctl, args)
cmd := exec.Command(journalctl, args...)
p, err := cmd.StdoutPipe()
if err != nil {
return nil, cancel, err
}

if err := cmd.Start(); err != nil {
return nil, cancel, err
}

cancel = func() error {
go func() {
if err := cmd.Wait(); err != nil {
fnlog.Debugf("wait for %q failed: %s", journalctl, err)
}
}()
return cmd.Process.Kill()
}

return p, cancel, nil
}

var ErrExpired = errors.New("timeout expired")

// journalFollow synchronously follows the io.Reader, writing each new journal entry to writer. The
// follow will continue until a single time.Time is received on the until channel (or it's closed).
func journalFollow(until <-chan time.Time, reader io.Reader, writer io.Writer) error {
scanner := bufio.NewScanner(reader)
bufch := make(chan []byte)
errch := make(chan error)

go func() {
for scanner.Scan() {
if err := scanner.Err(); err != nil {
errch <- err
return
}
bufch <- scanner.Bytes()
}
// When the context is Done() the 'until' channel is closed, this kicks in the defers in the GetContainerLogsHandler method.
// this cleans up the journalctl, and closes all file descripters. Scan() then stops with an error (before any reads,
// hence the above if err .. .isn't triggered). In the end this go-routine exits.
// the error here is "read |0: file already closed".
}()

for {
select {
case <-until:
return ErrExpired

case err := <-errch:
return err

case buf := <-bufch:
if _, err := writer.Write(buf); err != nil {
return err
}
if _, err := io.WriteString(writer, "\n"); err != nil {
return err
}
}
}
}
50 changes: 22 additions & 28 deletions internal/provider/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/gorilla/mux"
"github.com/pkg/errors"
"github.com/virtual-kubelet/virtual-kubelet/errdefs"
Expand All @@ -33,48 +32,43 @@ func (p *p) GetContainerLogsHandler(w http.ResponseWriter, r *http.Request) {

r.Header.Set("Transfer-Encoding", "chunked")

// Retrieve the actual systemd journal reader given:
// * it implements io.ReadCloser, and
// * exposes other functionality, like follow mode.
logsReader, err := p.getJournalReader(namespace, pod, container, opts)
logsReader, cancel, err := journalReader(namespace, pod, container, opts)
if err != nil {
return errors.Wrap(err, "failed to get systemd journal logs reader")
}
defer logsReader.Close()
defer cancel()

// ResponseWriter must be flushed after each write.
if _, ok := w.(writeFlusher); !ok {
log.Warn("HTTP response writer does not support flushes")
}
fw := flushOnWrite(w)

if !opts.Follow {
io.Copy(fw, logsReader)
return nil
}

// If in follow mode, follow until interrupted.
if opts.Follow {
untilTime := make(chan time.Time, 1)
errChan := make(chan error, 1)

go func(w io.Writer, errChan chan error) {
err := logsReader.Follow(untilTime, w)
if err != nil && err != sdjournal.ErrExpired {
err = errors.Wrap(err, "failed to follow systemd journal logs")
}
errChan <- err
}(fw, errChan)

// Stop following logs if request context is completed.
select {
case err := <-errChan:
return err
case <-r.Context().Done():
close(untilTime)
untilTime := make(chan time.Time, 1)
errChan := make(chan error, 1)

go func(w io.Writer, errChan chan error) {
err := journalFollow(untilTime, logsReader, w)
if err != nil && err != ErrExpired {
err = errors.Wrap(err, "failed to follow systemd journal logs")
}
return nil
errChan <- err
}(fw, errChan)

// Otherwise, just pipe the journal reader.
} else {
io.Copy(fw, logsReader)
// Stop following logs if request context is completed.
select {
case err := <-errChan:
return err
case <-r.Context().Done():
close(untilTime)
}

return nil
})(w, r)
}
Expand Down
56 changes: 0 additions & 56 deletions internal/provider/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import (
"fmt"
"os"
"strings"
"time"

"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/virtual-kubelet/systemk/internal/ospkg"
"github.com/virtual-kubelet/systemk/internal/unit"
Expand Down Expand Up @@ -290,60 +288,6 @@ func (p *p) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.P
return &pod.Status, nil
}

// getJournalReader returns the actual journal reader.
// This is useful when an io.ReadCloser is not enough, eg we need Follow().
//
// TODO(pires) show logs from the current Pod alone https://github.com/virtual-kubelet/systemk/issues/5#issuecomment-765278538
func (p *p) getJournalReader(namespace, name, container string, logOpts nodeapi.ContainerLogOpts) (*sdjournal.JournalReader, error) {
fnlog := log.
WithField("podNamespace", namespace).
WithField("podName", name).
WithField("containerName", container)

fnlog.Infof("calling for container logs with options %+v", logOpts)

unitName := strings.Join([]string{unitPrefix(namespace, name), container, "service"}, separator)
journalConfig := sdjournal.JournalReaderConfig{
Matches: []sdjournal.Match{
{
// Filter by unit.
Field: sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT,
Value: unitName,
},
},
}
if logOpts.SinceSeconds > 0 {
// Since duration must be negative so we get logs from the past.
journalConfig.Since = -time.Second * time.Duration(logOpts.SinceSeconds)
}
// By default, SinceTime is "0001-01-01 00:00:00 +0000 UTC".
if !logOpts.SinceTime.IsZero() {
journalConfig.Since = time.Since(logOpts.SinceTime)
}
if logOpts.Tail > 0 {
journalConfig.NumFromTail = uint64(logOpts.Tail)
}
// By default, timestamps are present in journal entries.
// Kubernetes defaults to not having timestamps, so we adapt.
if !logOpts.Timestamps {
journalConfig.Formatter = func(entry *sdjournal.JournalEntry) (string, error) {
msg, ok := entry.Fields[sdjournal.SD_JOURNAL_FIELD_MESSAGE]
if !ok {
return "", fmt.Errorf("no %q field present in journal entry", sdjournal.SD_JOURNAL_FIELD_MESSAGE)
}

return fmt.Sprintf("%s\n", msg), nil
}
}

journalReader, err := sdjournal.NewJournalReader(journalConfig)
if err != nil {
fnlog.Error("failed to retrieve logs from journald, for unit %q", unitName, err)
}

return journalReader, err
}

// UpdatePod is a noop,
func (p *p) UpdatePod(ctx context.Context, pod *corev1.Pod) error {
log.
Expand Down