Compare commits

...

11 Commits
main ... v1.5.0

23 changed files with 618 additions and 135 deletions

View File

@ -82,13 +82,13 @@ benchmark: ## Run benchmark
##@ Build
build: ## go build
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
build: ## go build, EXT_BUILD_TAGS=include_core would only build core package
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -tags ${EXT_BUILD_TAGS} -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
##@ Build(without sqlite)
build-in-badger: ## go build without sqlite
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
build-in-badger: ## go build without sqlite, EXT_BUILD_TAGS=include_core would only build core package
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger,${EXT_BUILD_TAGS} -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
##@ Images
@ -98,7 +98,7 @@ docker-build: ## Docker build -t ${REPO}:${TAG}, try: make docker-build REPO=<Yo
docker-push: ## Docker push ${REPO}:${TAG}
docker push ${REPO}:${TAG}
docker-multi-arch: ## Docker buildx, try: make docker-build REPO=<YourRepoHost>, ${TAG} generated by git
docker-multi-arch: ## Docker buildx, try: make docker-multi-arch REPO=<YourRepoHost>, ${TAG} generated by git
docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push
LOG_DIR ?= /tmp/log ## log directory

View File

@ -1,3 +1,157 @@
# Release v1.5.0
### :star2: Features
- [breaking]: The `db` configuration of the `file source` has been moved to the global [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If you are upgrading from an earlier version to v1.5, check whether `db` is configured for any `file source`. If it is not, you can ignore this change; the defaults remain compatible.
- Added rocketmq sink (#530)
- Added franzKafka source (#573)
- Added kata runtime (#554)
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
- Supported `printEvents` in sink codec (#448)
- Added queue in LogConfig/ClusterLogConfig (#457)
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
- Supported `copytruncate` in file source (#571)
- Added `genfiles` sub command (#471)
- Added `sortBy` field in elasticsearch source (#473)
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
- New `addHostMeta` interceptor (#474)
- Added persistence driver `badger` (#475) (#584)
- Ignore LogConfig with sidecar injection annotation (#478)
- Added `toStr` action in transformer interceptor (#482)
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
- Get loggie version with api and sub command (#496) (#508)
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
- Upgraded `kafka-go` version (#506) (#567)
- Added `resultStatus` in dev sink, which can be used to simulate failure or drop (#531)
- Prettified the error output when unmarshaling the yaml configuration fails (#539)
- Added a default topic used when rendering the kafka topic fails (#550)
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
- Supported multiple topics in kafka source (#548)
- Added a default index used when rendering the elasticsearch index fails (#551) (#553)
- The default `maxOpenFds` is set to 4096 (#559)
- Supported default `sinkRef` in kubernetes discovery (#555)
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
- Supported omitting empty fields in Kubernetes discovery (#570)
- Optimized the `maxbytes` interceptor (#575)
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
- Added `cheanUnfinished` in cleanFiles (#580)
- Added `target` in the `maxbytes` interceptor (#588)
- Added `partionKey` in franzKafka (#562)
- Added `highPrecision` in `rateLimit` interceptor (#525)
- Supported building only the core components of Loggie (#598)
- Added `addonMetaSchema` in file source (#599)
- Added `timestamp` and `bodyKey` in source (#600)
- Added `discoverNodesOnStart` and `discoverNodesInterval` in elasticsearch sink (#612)
### :bug: Bug Fixes
- Fixed panic when kubeEvent Series is nil (#459)
- Upgraded `automaxprocs` version to v1.5.1 (#488)
- Fixed failure to set defaults under `fieldsUnderKey` (#513)
- Fixed condition parsing failure when the condition contains ERROR in the transformer interceptor (#514) (#515)
- Fixed grpc batch out-of-order data streams (#517)
- Fixed potential OOM caused by very large lines (#529)
- Fixed duplicated batchSize in queue (#533)
- Fixed a panic when sqlite is locked (#524)
- Fixed commands that could not be used in the multi-arch container (#541)
- Fixed potential blocking caused by the `logger listener` (#561) (#552)
- Fixed `sink concurrency` deepCopy failure (#563)
- Dropped events on partial errors in the elasticsearch sink (#572)
- Fixed conversion to string when a byte array is returned in the transformer interceptor (#596)
- Invalid pipelines no longer stop the other running pipelines (#602)
- Fixed a configuration NPE when querying pipelines (#604)
- Optimized the elasticsearch sink request buffer (#608)
- Added the pod namespace as a prefix of the source name when a pod is matched by a ClusterLogConfig (#613)
# Release v1.5.0-rc.0
### :star2: Features
- [breaking]: The `db` configuration of the `file source` has been moved to the global [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If you are upgrading from an earlier version to v1.5, check whether `db` is configured for any `file source`. If it is not, you can ignore this change; the defaults remain compatible.
- Added rocketmq sink (#530)
- Added franzKafka source (#573)
- Added kata runtime (#554)
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
- Supported `printEvents` in sink codec (#448)
- Added queue in LogConfig/ClusterLogConfig (#457)
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
- Supported `copytruncate` in file source (#571)
- Added `genfiles` sub command (#471)
- Added `sortBy` field in elasticsearch source (#473)
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
- New `addHostMeta` interceptor (#474)
- Added persistence driver `badger` (#475) (#584)
- Ignore LogConfig with sidecar injection annotation (#478)
- Added `toStr` action in transformer interceptor (#482)
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
- Get loggie version with api and sub command (#496) (#508)
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
- Upgraded `kafka-go` version (#506) (#567)
- Added `resultStatus` in dev sink, which can be used to simulate failure or drop (#531)
- Prettified the error output when unmarshaling the yaml configuration fails (#539)
- Added a default topic used when rendering the kafka topic fails (#550)
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
- Supported multiple topics in kafka source (#548)
- Added a default index used when rendering the elasticsearch index fails (#551) (#553)
- The default `maxOpenFds` is set to 4096 (#559)
- Supported default `sinkRef` in kubernetes discovery (#555)
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
- Supported omitting empty fields in Kubernetes discovery (#570)
- Optimized the `maxbytes` interceptor (#575)
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
- Added `cheanUnfinished` in cleanFiles (#580)
- Added `target` in the `maxbytes` interceptor (#588)
- Added `partionKey` in franzKafka (#562)
- Added `highPrecision` in `rateLimit` interceptor (#525)
### :bug: Bug Fixes
- Fixed panic when kubeEvent Series is nil (#459)
- Upgraded `automaxprocs` version to v1.5.1 (#488)
- Fixed failure to set defaults under `fieldsUnderKey` (#513)
- Fixed condition parsing failure when the condition contains ERROR in the transformer interceptor (#514) (#515)
- Fixed grpc batch out-of-order data streams (#517)
- Fixed potential OOM caused by very large lines (#529)
- Fixed duplicated batchSize in queue (#533)
- Fixed a panic when sqlite is locked (#524)
- Fixed commands that could not be used in the multi-arch container (#541)
- Fixed potential blocking caused by the `logger listener` (#561) (#552)
- Fixed `sink concurrency` deepCopy failure (#563)
- Dropped events on partial errors in the elasticsearch sink (#572)
# Release v1.4.0
### :star2: Features
- Added Loggie dashboard feature for easier troubleshooting (#416)
- Enhanced log alerting function with more flexible log alert detection rules and added alertWebhook sink (#392)
- Added sink concurrency support for automatic adaptation based on downstream delay (#376)
- Added franzKafka sink for users who prefer the franz kafka library (#423)
- Added elasticsearch source (#345)
- Added zinc sink (#254)
- Added pulsar sink (#417)
- Added grok action to transformer interceptor (#418)
- Added split action to transformer interceptor (#411)
- Added jsonEncode action to transformer interceptor (#421)
- Added fieldsFromPath configuration to source for obtaining fields from file content (#401)
- Added fieldsRef parameter to the filesource listener for obtaining key values from the fields configuration and adding them to metrics as labels (#402)
- In transformer interceptor, added dropIfError support to drop event if action execution fails (#409)
- Added info listener which currently exposes loggie_info_stat metrics and displays version label (#410)
- Added support for customized kafka sink partition key
- Added sasl support to Kafka source (#415)
- Added https insecureSkipVerify support to loki sink (#422)
- Optimized file source for large files (#430)
- Changed default value of file source maxOpenFds to 1024 (#437)
- ContainerRuntime can now be set to none (#439)
- Upgraded to go 1.18 (#440)
- Optimized configuration parameters to remove the redundancy generated by rendering
### :bug: Bug Fixes
- Added source fields to filesource listener (#402)
- Fixed issue of transformer copy action not copying non-string body (#420)
- Added fetching of logs file from UpperDir when rootfs collection is enabled (#414)
- Fixed pipeline restart NPE (#454)
- Fixed job creation for directory soft links (#453)
# Release v1.4.0-rc.0
### :star2: Features

2
go.mod
View File

@ -61,7 +61,6 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/danieljoos/wincred v1.0.2 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
github.com/emirpasic/gods v1.12.0 // indirect
github.com/fatih/color v1.10.0 // indirect
@ -174,6 +173,7 @@ require (
require (
github.com/apache/rocketmq-client-go/v2 v2.1.1
github.com/dustin/go-humanize v1.0.0
github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/goccy/go-yaml v1.11.0
github.com/mattn/go-sqlite3 v1.11.0

View File

@ -14,7 +14,7 @@ loggie:
sink: ~
queue: ~
pipeline: ~
normalize: ~
sys: ~
discovery:
enabled: false
@ -31,15 +31,17 @@ loggie:
defaults:
sink:
type: dev
interceptors:
- type: schema
name: global
order: 700
addMeta:
timestamp:
key: "@timestamp"
sources:
- type: file
timestampKey: "@timestamp"
bodyKey: "message"
fieldsUnderRoot: true
addonMeta: true
addonMetaSchema:
underRoot: true
fields:
filename: "${_meta.filename}"
line: "${_meta.line}"
watcher:
maxOpenFds: 6000
http:

View File

@ -130,11 +130,21 @@ func ReadPipelineConfigFromFile(path string, ignore FileIgnore) (*PipelineConfig
for _, fn := range all {
pipes := &PipelineConfig{}
unpack := cfg.UnPackFromFile(fn, pipes)
if err = unpack.Defaults().Validate().Do(); err != nil {
log.Error("invalid pipeline configs: %v, \n%s", err, unpack.Contents())
if err = unpack.Do(); err != nil {
log.Error("read pipeline configs from path %s failed: %v", path, err)
continue
}
pipecfgs.AddPipelines(pipes.Pipelines)
for _, p := range pipes.Pipelines {
pip := p
if err := cfg.NewUnpack(nil, &pip, nil).Defaults().Validate().Do(); err != nil {
// ignore invalid pipeline, but continue to read other pipelines
// invalid pipeline will check by reloader later
log.Error("pipeline: %s configs invalid: %v", p.Name, err)
continue
}
pipecfgs.AddPipelines([]pipeline.Config{pip})
}
}
return pipecfgs, nil
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package reloader
import (
"io/ioutil"
"net/http"
"os"
"path/filepath"
@ -49,7 +48,7 @@ func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request
continue
}
content, err := ioutil.ReadFile(m)
content, err := os.ReadFile(m)
if err != nil {
log.Warn("read config error. err: %v", err)
return

View File

@ -38,6 +38,11 @@ type Config struct {
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
Codec *codec.Config `yaml:"codec,omitempty"`
TimestampKey string `yaml:"timestampKey,omitempty"`
TimestampLocation string `yaml:"timestampLocation,omitempty"`
TimestampLayout string `yaml:"timestampLayout,omitempty"`
BodyKey string `yaml:"bodyKey,omitempty"`
}
func (c *Config) DeepCopy() *Config {
@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config {
FieldsFromEnv: newFieldsFromEnv,
FieldsFromPath: newFieldsFromPath,
Codec: c.Codec.DeepCopy(),
TimestampKey: c.TimestampKey,
TimestampLocation: c.TimestampLocation,
TimestampLayout: c.TimestampLayout,
BodyKey: c.BodyKey,
}
return out
@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) {
} else {
c.Codec.Merge(from.Codec)
}
if c.TimestampKey == "" {
c.TimestampKey = from.TimestampKey
}
if c.TimestampLocation == "" {
c.TimestampLocation = from.TimestampLocation
}
if c.TimestampLayout == "" {
c.TimestampLayout = from.TimestampLayout
}
if c.BodyKey == "" {
c.BodyKey = from.BodyKey
}
}
func MergeSourceList(base []*Config, from []*Config) []*Config {

View File

@ -364,7 +364,7 @@ func (c *Controller) makeConfigPerSource(s *source.Config, pod *corev1.Pod, lgc
}
// change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod
filesrc.Name = helper.GenTypePodSourceName(pod.Name, status.Name, filesrc.Name)
filesrc.Name = helper.GenTypePodSourceName(lgc.Namespace, pod.Namespace, pod.Name, status.Name, filesrc.Name)
// inject default pod metadata
if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil {

View File

@ -122,7 +122,12 @@ func ToPipelineInterceptor(interceptorsRaw string, interceptorRef string, interc
return interConfList, nil
}
func GenTypePodSourceName(podName string, containerName string, sourceName string) string {
func GenTypePodSourceName(lgcNamespace string, podNamespace string, podName string, containerName string, sourceName string) string {
// if lgcNamespace is empty, we use podNamespace as the first part of the source name,
// because this is the pod matched by clusterLogConfig, if the pod namespace is not added, it may cause the source to be duplicated
if lgcNamespace == "" {
return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
}
return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
}
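For illustration, a minimal runnable sketch of the names this yields (pod, container and source names are hypothetical):

package main

import "fmt"

// mirror of GenTypePodSourceName above, for illustration only
func genTypePodSourceName(lgcNamespace, podNamespace, podName, containerName, sourceName string) string {
	if lgcNamespace == "" {
		// matched by a ClusterLogConfig: prefix the pod namespace to avoid duplicated source names
		return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
	}
	return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
}

func main() {
	fmt.Println(genTypePodSourceName("", "prod", "nginx-7d9f", "nginx", "access"))     // prod/nginx-7d9f/nginx/access
	fmt.Println(genTypePodSourceName("prod", "prod", "nginx-7d9f", "nginx", "access")) // nginx-7d9f/nginx/access
}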

View File

@ -19,6 +19,7 @@ package sys
import (
"encoding/json"
"fmt"
"github.com/dustin/go-humanize"
"os"
"strconv"
"time"
@ -48,8 +49,9 @@ func makeListener() eventbus.Listener {
}
type sysData struct {
MemoryRss uint64 `json:"memRss"`
CPUPercent float64 `json:"cpuPercent"`
MemoryRss uint64 `json:"-"`
MemoryRssHumanize string `json:"memRss"`
CPUPercent float64 `json:"cpuPercent"`
}
type Config struct {
@ -122,6 +124,7 @@ func (l *Listener) getSysStat() error {
return err
}
l.data.MemoryRss = mem.RSS
l.data.MemoryRssHumanize = humanize.Bytes(mem.RSS)
cpuPer, err := l.proc.Percent(1 * time.Second)
if err != nil {

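For illustration, a tiny runnable sketch of the humanized value now exported as memRss (the RSS figure is hypothetical):

package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// the sys listener now reports memRss as a human-readable string instead of raw bytes
	fmt.Println(humanize.Bytes(82854982)) // 83 MB
}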
View File

@ -1,5 +1,7 @@
//go:build !include_core
/*
Copyright 2021 Loggie Authors
Copyright 2023 Loggie Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.

62
pkg/include/core.go Normal file
View File

@ -0,0 +1,62 @@
//go:build include_core
/*
Copyright 2023 Loggie Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package include
import (
_ "github.com/loggie-io/loggie/pkg/eventbus/export/prometheus"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filesource"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/info"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/logalerting"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/queue"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/reload"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sink"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sys"
_ "github.com/loggie-io/loggie/pkg/interceptor/addhostmeta"
_ "github.com/loggie-io/loggie/pkg/interceptor/addk8smeta"
_ "github.com/loggie-io/loggie/pkg/interceptor/limit"
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert"
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert/condition"
_ "github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
_ "github.com/loggie-io/loggie/pkg/interceptor/metric"
_ "github.com/loggie-io/loggie/pkg/interceptor/retry"
_ "github.com/loggie-io/loggie/pkg/interceptor/schema"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/action"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/condition"
_ "github.com/loggie-io/loggie/pkg/queue/channel"
_ "github.com/loggie-io/loggie/pkg/queue/memory"
_ "github.com/loggie-io/loggie/pkg/sink/alertwebhook"
_ "github.com/loggie-io/loggie/pkg/sink/codec/json"
_ "github.com/loggie-io/loggie/pkg/sink/codec/raw"
_ "github.com/loggie-io/loggie/pkg/sink/dev"
_ "github.com/loggie-io/loggie/pkg/sink/elasticsearch"
_ "github.com/loggie-io/loggie/pkg/sink/file"
_ "github.com/loggie-io/loggie/pkg/sink/franz"
_ "github.com/loggie-io/loggie/pkg/sink/kafka"
_ "github.com/loggie-io/loggie/pkg/source/codec/json"
_ "github.com/loggie-io/loggie/pkg/source/codec/regex"
_ "github.com/loggie-io/loggie/pkg/source/dev"
_ "github.com/loggie-io/loggie/pkg/source/elasticsearch"
_ "github.com/loggie-io/loggie/pkg/source/file"
_ "github.com/loggie-io/loggie/pkg/source/file/process"
_ "github.com/loggie-io/loggie/pkg/source/franz"
_ "github.com/loggie-io/loggie/pkg/source/kafka"
)

View File

@ -51,5 +51,10 @@ func NewEqual(args []string) (*Equal, error) {
}
func (eq *Equal) Check(e api.Event) bool {
return eq.value == eventops.Get(e, eq.field)
value := eventops.Get(e, eq.field)
if byteValue, ok := value.([]byte); ok {
value = string(byteValue)
}
return eq.value == value
}
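For illustration, a minimal runnable sketch of why the conversion matters when the extracted field value is a byte slice:

package main

import "fmt"

func main() {
	want := interface{}("ERROR")
	got := interface{}([]byte("ERROR")) // e.g. a value sliced out of the raw event body

	fmt.Println(want == got) // false: a []byte never compares equal to a string

	if b, ok := got.([]byte); ok {
		got = string(b)
	}
	fmt.Println(want == got) // true once the bytes are converted
}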

View File

@ -25,6 +25,7 @@ import (
"github.com/loggie-io/loggie/pkg/ops/helper"
"github.com/loggie-io/loggie/pkg/util"
"github.com/rivo/tview"
"io"
"io/ioutil"
"net/http"
"net/url"
@ -234,7 +235,7 @@ func (p *LogStatusPanel) SetData() {
}
defer resp.Body.Close()
out, err := ioutil.ReadAll(resp.Body)
out, err := io.ReadAll(resp.Body)
if err != nil {
return
}

View File

@ -174,6 +174,9 @@ func diffPipes(request *http.Request) string {
func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config {
result := make(map[string]pipeline.Config)
if cfgInPath == nil {
return result
}
setResult := func(pipData pipeline.Config, srcData ...*source.Config) {
pip, ok := result[pipData.Name]

View File

@ -18,7 +18,7 @@ package pipeline
import (
"fmt"
"io/ioutil"
timeutil "github.com/loggie-io/loggie/pkg/util/time"
"os"
"strconv"
"strings"
@ -46,10 +46,9 @@ import (
)
const (
FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot"
FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey"
fieldsFromPathMaxBytes = 1024
defaultTsLayout = "2006-01-02T15:04:05.000Z"
)
var (
@ -1032,7 +1031,7 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
}
for k, pathKey := range fieldsFromPath {
out, err := ioutil.ReadFile(pathKey)
out, err := os.ReadFile(pathKey)
if err != nil {
log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err)
continue
@ -1054,11 +1053,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
// add meta fields
e.Meta().Set(event.SystemProductTimeKey, time.Now())
now := time.Now()
e.Meta().Set(event.SystemProductTimeKey, now)
e.Meta().Set(event.SystemPipelineKey, p.name)
e.Meta().Set(event.SystemSourceKey, config.Name)
e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot)
e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey)
header := e.Header()
if header == nil {
@ -1073,6 +1071,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
// add header source fields from file
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)
// remap timestamp
if config.TimestampKey != "" {
layout := config.TimestampLayout
if layout == "" {
layout = defaultTsLayout
}
// conf.Location could be "" or "UTC" or "Local"
// default "" indicate "UTC"
ts, err := timeutil.Format(now, config.TimestampLocation, layout)
if err != nil {
log.Warn("time format system product timestamp err: %+v", err)
return
}
header[config.TimestampKey] = ts
}
if config.BodyKey != "" {
header[config.BodyKey] = util.ByteToStringUnsafe(e.Body())
e.Fill(e.Meta(), header, []byte{})
}
}
func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {

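For illustration, a simplified runnable sketch of the remapping above, using a plain map in place of api.Event and the keys from the loggie.yml shown earlier in this diff (timestampKey "@timestamp", bodyKey "message"):

package main

import (
	"fmt"
	"time"
)

func main() {
	body := []byte("some raw log line") // hypothetical event body
	header := map[string]interface{}{}

	// defaultTsLayout, rendered in UTC when the location is left empty
	header["@timestamp"] = time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
	// the real code uses util.ByteToStringUnsafe and then clears the body via e.Fill
	header["message"] = string(body)

	fmt.Println(header)
}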
View File

@ -18,6 +18,7 @@ package json
import (
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/util"
"time"
jsoniter "github.com/json-iterator/go"
@ -73,7 +74,7 @@ func (j *Json) Encode(e api.Event) ([]byte, error) {
beatsFormat(e)
} else if len(e.Body()) != 0 {
// put body in header
header[eventer.Body] = string(e.Body())
header[eventer.Body] = util.ByteToStringUnsafe(e.Body())
}
var result []byte

View File

@ -21,7 +21,6 @@ import (
"context"
"fmt"
es "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
jsoniter "github.com/json-iterator/go"
"github.com/loggie-io/loggie/pkg/core/api"
eventer "github.com/loggie-io/loggie/pkg/core/event"
@ -30,7 +29,7 @@ import (
"github.com/loggie-io/loggie/pkg/util/pattern"
"github.com/loggie-io/loggie/pkg/util/runtime"
"github.com/pkg/errors"
"io/ioutil"
"os"
"strconv"
"strings"
)
@ -44,8 +43,6 @@ type ClientSet struct {
cli *es.Client
opType string
buf *bytes.Buffer
aux []byte
reqCount int
codec codec.Codec
@ -54,6 +51,70 @@ type ClientSet struct {
documentIdPattern *pattern.Pattern
}
type bulkRequest struct {
lines []line
}
type line struct {
meta []byte
body []byte
}
func (b *bulkRequest) body() []byte {
var buf bytes.Buffer
size := 0
for _, l := range b.lines {
size += len(l.meta) + len(l.body) + 1
}
buf.Grow(size)
for _, l := range b.lines {
buf.Write(l.meta)
buf.Write(l.body)
buf.WriteRune('\n')
}
return buf.Bytes()
}
func (b *bulkRequest) add(body []byte, action string, documentID string, index string) {
if len(body) == 0 {
return
}
var buf bytes.Buffer
var aux []byte
// { "index" : { "_index" : "test", "_id" : "1" } }
buf.WriteRune('{')
aux = strconv.AppendQuote(aux, action)
buf.Write(aux)
aux = aux[:0]
buf.WriteRune(':')
buf.WriteRune('{')
if documentID != "" {
buf.WriteString(`"_id":`)
aux = strconv.AppendQuote(aux, documentID)
buf.Write(aux)
aux = aux[:0]
}
if index != "" {
buf.WriteString(`"_index":`)
aux = strconv.AppendQuote(aux, index)
buf.Write(aux)
}
buf.WriteRune('}')
buf.WriteRune('}')
buf.WriteRune('\n')
l := line{
meta: buf.Bytes(),
body: body,
}
b.lines = append(b.lines, l)
}
type Client interface {
Bulk(ctx context.Context, batch api.Batch) error
Stop()
@ -68,7 +129,7 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
}
var ca []byte
if config.CACertPath != "" {
caData, err := ioutil.ReadFile(config.CACertPath)
caData, err := os.ReadFile(config.CACertPath)
if err != nil {
return nil, err
}
@ -76,14 +137,16 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
}
cfg := es.Config{
Addresses: config.Hosts,
DisableRetry: true,
Username: config.UserName,
Password: config.Password,
APIKey: config.APIKey,
ServiceToken: config.ServiceToken,
CompressRequestBody: config.Compress,
CACert: ca,
Addresses: config.Hosts,
DisableRetry: true,
Username: config.UserName,
Password: config.Password,
APIKey: config.APIKey,
ServiceToken: config.ServiceToken,
CompressRequestBody: config.Compress,
DiscoverNodesOnStart: config.DiscoverNodesOnStart,
DiscoverNodesInterval: config.DiscoverNodesInterval,
CACert: ca,
}
cli, err := es.NewClient(cfg)
if err != nil {
@ -94,8 +157,6 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
config: config,
cli: cli,
opType: config.OpType,
buf: bytes.NewBuffer(make([]byte, 0, config.SendBuffer)),
aux: make([]byte, 0, 512),
reqCount: 0,
codec: cod,
indexPattern: indexPattern,
@ -109,16 +170,11 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
}
bulkReq := esapi.BulkRequest{}
if c.config.Etype != "" {
bulkReq.DocumentType = c.config.Etype
}
defer func() {
c.buf.Reset()
c.reqCount = 0
}()
req := bulkRequest{}
for _, event := range batch.Events() {
headerObj := runtime.NewObject(event.Header())
@ -160,19 +216,14 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
}
c.reqCount++
if err := c.writeMeta(c.opType, docId, idx); err != nil {
return err
}
if err := c.writeBody(data); err != nil {
return err
}
req.add(data, c.opType, docId, idx)
}
if c.reqCount == 0 {
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
}
resp, err := c.cli.Bulk(bytes.NewReader(c.buf.Bytes()),
resp, err := c.cli.Bulk(bytes.NewReader(req.body()),
c.cli.Bulk.WithDocumentType(c.config.Etype),
c.cli.Bulk.WithParameters(c.config.Params),
c.cli.Bulk.WithHeader(c.config.Headers))
@ -213,39 +264,3 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
func (c *ClientSet) Stop() {
// Do nothing
}
// { "index" : { "_index" : "test", "_id" : "1" } }
func (c *ClientSet) writeMeta(action string, documentID string, index string) error {
c.buf.WriteRune('{')
c.aux = strconv.AppendQuote(c.aux, action)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
c.buf.WriteRune(':')
c.buf.WriteRune('{')
if documentID != "" {
c.buf.WriteString(`"_id":`)
c.aux = strconv.AppendQuote(c.aux, documentID)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
}
if index != "" {
c.buf.WriteString(`"_index":`)
c.aux = strconv.AppendQuote(c.aux, index)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
}
c.buf.WriteRune('}')
c.buf.WriteRune('}')
c.buf.WriteRune('\n')
return nil
}
func (c *ClientSet) writeBody(body []byte) error {
if len(body) == 0 {
return nil
}
c.buf.Write(body)
c.buf.WriteRune('\n')
return nil
}
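For illustration, a short sketch (hypothetical, written as if inside the same package) of the payload assembled by the new bulkRequest that replaces writeMeta/writeBody above:

func exampleBulkPayload() {
	req := bulkRequest{}
	// opType "index", no explicit documentId, index name already rendered
	req.add([]byte(`{"body":"hello"}`), "index", "", "loggie-2023.08.01")
	fmt.Printf("%s", req.body())
	// {"index":{"_index":"loggie-2023.08.01"}}
	// {"body":"hello"}
}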

View File

@ -16,33 +16,29 @@ limitations under the License.
package elasticsearch
import "github.com/loggie-io/loggie/pkg/util/pattern"
import (
"github.com/loggie-io/loggie/pkg/util/pattern"
"time"
)
type Config struct {
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
UserName string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
Index string `yaml:"index,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
Params map[string]string `yaml:"parameters,omitempty"`
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
DocumentId string `yaml:"documentId,omitempty"`
Sniff *bool `yaml:"sniff,omitempty"` // deprecated
APIKey string `yaml:"apiKey,omitempty"`
ServiceToken string `yaml:"serviceToken,omitempty"`
CACertPath string `yaml:"caCertPath,omitempty"`
Compress bool `yaml:"compress,omitempty"`
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
OpType string `yaml:"opType,omitempty" default:"index"`
SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"`
}
type TLS struct {
CAFile string `yaml:"caFile,omitempty"`
CertFile string `yaml:"certFile,omitempty"`
KeyFile string `yaml:"keyFile,omitempty"`
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
UserName string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
Index string `yaml:"index,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"`
Params map[string]string `yaml:"parameters,omitempty"`
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
DocumentId string `yaml:"documentId,omitempty"`
APIKey string `yaml:"apiKey,omitempty"`
ServiceToken string `yaml:"serviceToken,omitempty"`
CACertPath string `yaml:"caCertPath,omitempty"`
Compress bool `yaml:"compress,omitempty"`
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
OpType string `yaml:"opType,omitempty" default:"index"`
DiscoverNodesOnStart bool `yaml:"discoverNodesOnStart,omitempty"`
DiscoverNodesInterval time.Duration `yaml:"discoverNodesInterval,omitempty"`
}
type RenderIndexFail struct {

View File

@ -45,6 +45,7 @@ type CollectConfig struct {
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
AddonMeta bool `yaml:"addonMeta,omitempty"`
AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"`
excludeFilePatterns []*regexp.Regexp
Charset string `yaml:"charset,omitempty" default:"utf-8"`
@ -54,6 +55,12 @@ type CollectConfig struct {
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
}
type AddonMetaSchema struct {
Fields map[string]string `yaml:"fields,omitempty"`
FieldsUnderRoot bool `yaml:"underRoot,omitempty"`
FieldsUnderKey string `yaml:"key,omitempty" default:"state"`
}
type LineDelimiterValue struct {
Charset string `yaml:"charset,omitempty" default:"utf-8"`
LineType string `yaml:"type,omitempty" default:"auto"`

View File

@ -71,6 +71,19 @@ type Source struct {
multilineProcessor *MultiProcessor
mTask *MultiTask
codec codec.Codec
addonMetaField *AddonMetaFields
}
type AddonMetaFields struct {
Pipeline string `yaml:"pipeline,omitempty"`
Source string `yaml:"source,omitempty"`
Filename string `yaml:"filename,omitempty"`
Timestamp string `yaml:"timestamp,omitempty"`
Offset string `yaml:"offset,omitempty"`
Bytes string `yaml:"bytes,omitempty"`
Line string `yaml:"line,omitempty"`
Hostname string `yaml:"hostname,omitempty"`
}
func (s *Source) Config() interface{} {
@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error {
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
}
if s.config.CollectConfig.AddonMeta {
s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields)
}
// init reader chan size
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds
@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) {
s.productFunc = productFunc
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
if s.config.CollectConfig.AddonMeta {
s.productFunc = addonMetaProductFunc(s.productFunc)
s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema)
}
if s.config.ReaderConfig.MultiConfig.Active {
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap
}
}
func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc {
func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc {
return func(event api.Event) api.Result {
s, _ := event.Meta().Get(SystemStateKey)
state := s.(*persistence.State)
addonMeta := make(map[string]interface{})
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName
event.Header()["state"] = addonMeta
// if fields is nil, use default config
if fields == nil {
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName
} else {
if fields.Pipeline != "" {
addonMeta[fields.Pipeline] = state.PipelineName
}
if fields.Source != "" {
addonMeta[fields.Source] = state.SourceName
}
if fields.Filename != "" {
addonMeta[fields.Filename] = state.Filename
}
if fields.Timestamp != "" {
addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout)
}
if fields.Offset != "" {
addonMeta[fields.Offset] = state.Offset
}
if fields.Bytes != "" {
addonMeta[fields.Bytes] = state.ContentBytes
}
if fields.Line != "" {
addonMeta[fields.Line] = state.LineNumber
}
if fields.Hostname != "" {
addonMeta[fields.Hostname] = global.NodeName
}
}
if schema.FieldsUnderRoot {
for k, v := range addonMeta {
event.Header()[k] = v
}
} else {
event.Header()[schema.FieldsUnderKey] = addonMeta
}
productFunc(event)
return result.Success()
}
}
func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields {
if len(fields) == 0 {
return nil
}
amf := &AddonMetaFields{}
for k, v := range fields {
switch v {
case "${_meta.pipeline}":
amf.Pipeline = k
case "${_meta.source}":
amf.Source = k
case "${_meta.filename}":
amf.Filename = k
case "${_meta.timestamp}":
amf.Timestamp = k
case "${_meta.offset}":
amf.Offset = k
case "${_meta.bytes}":
amf.Bytes = k
case "${_meta.line}":
amf.Line = k
case "${_meta.hostname}":
amf.Hostname = k
}
}
return amf
}
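For illustration, a simplified runnable sketch of the renaming driven by addonMetaSchema.fields, using the mapping from the loggie.yml shown earlier in this diff (state values are hypothetical):

package main

import "fmt"

func main() {
	// output header key -> meta placeholder, as configured under addonMetaSchema.fields
	fields := map[string]string{
		"filename": "${_meta.filename}",
		"line":     "${_meta.line}",
	}
	// hypothetical collect state for one log line
	state := map[string]interface{}{
		"${_meta.filename}": "/var/log/app.log",
		"${_meta.line}":     int64(42),
	}

	header := map[string]interface{}{}
	for key, placeholder := range fields {
		header[key] = state[placeholder] // with underRoot: true these land at the header root
	}
	fmt.Println(header) // map[filename:/var/log/app.log line:42]
}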

View File

@ -34,7 +34,6 @@ type DbConfig struct {
File string `yaml:"file,omitempty"`
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
TableName string `yaml:"tableName,omitempty" default:"registry"`
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
}

View File

@ -0,0 +1,85 @@
package main
import (
"fmt"
"github.com/loggie-io/loggie/pkg/control"
"github.com/loggie-io/loggie/pkg/core/cfg"
"github.com/loggie-io/loggie/pkg/core/interceptor"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/core/queue"
"github.com/loggie-io/loggie/pkg/eventbus"
"github.com/loggie-io/loggie/pkg/eventbus/export/logger"
"github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
"github.com/loggie-io/loggie/pkg/interceptor/metric"
"github.com/loggie-io/loggie/pkg/interceptor/retry"
"github.com/loggie-io/loggie/pkg/pipeline"
"github.com/loggie-io/loggie/pkg/queue/channel"
"net/http"
"time"
_ "github.com/loggie-io/loggie/pkg/include"
)
const pipe1 = `
pipelines:
- name: test
sources:
- type: dev
name: test
qps: 100
byteSize: 10240
eventsTotal: 10000
sink:
type: elasticsearch
parallelism: 3
hosts: ["localhost:9200"]
index: "loggie-benchmark-${+YYYY.MM.DD}"
`
func main() {
log.InitDefaultLogger()
pipeline.SetDefaultConfigRaw(pipeline.Config{
Queue: &queue.Config{
Type: channel.Type,
},
Interceptors: []*interceptor.Config{
{
Type: metric.Type,
},
{
Type: maxbytes.Type,
},
{
Type: retry.Type,
},
},
})
eventbus.StartAndRun(eventbus.Config{
LoggerConfig: logger.Config{
Enabled: true,
Period: 5 * time.Second,
Pretty: false,
},
ListenerConfigs: map[string]cfg.CommonCfg{
"sink": map[string]interface{}{
"period": 5 * time.Second,
},
"sys": map[string]interface{}{
"period": 5 * time.Second,
},
},
})
pipecfgs := &control.PipelineConfig{}
if err := cfg.UnPackFromRaw([]byte(pipe1), pipecfgs).Defaults().Validate().Do(); err != nil {
log.Panic("pipeline configs invalid: %v", err)
}
controller := control.NewController()
controller.Start(pipecfgs)
if err := http.ListenAndServe(fmt.Sprintf(":9196"), nil); err != nil {
log.Fatal("http listen and serve err: %v", err)
}
}