Compare commits


11 Commits
main ... v1.5.0

23 changed files with 618 additions and 135 deletions

View File

@ -82,13 +82,13 @@ benchmark: ## Run benchmark
##@ Build ##@ Build
build: ## go build build: ## go build, EXT_BUILD_TAGS=include_core would only build core package
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -tags ${EXT_BUILD_TAGS} -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
##@ Build(without sqlite) ##@ Build(without sqlite)
build-in-badger: ## go build without sqlite build-in-badger: ## go build without sqlite, EXT_BUILD_TAGS=include_core would only build core package
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger,${EXT_BUILD_TAGS} -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
##@ Images ##@ Images
@ -98,7 +98,7 @@ docker-build: ## Docker build -t ${REPO}:${TAG}, try: make docker-build REPO=<Yo
docker-push: ## Docker push ${REPO}:${TAG} docker-push: ## Docker push ${REPO}:${TAG}
docker push ${REPO}:${TAG} docker push ${REPO}:${TAG}
docker-multi-arch: ## Docker buildx, try: make docker-build REPO=<YourRepoHost>, ${TAG} generated by git docker-multi-arch: ## Docker buildx, try: make docker-multi-arch REPO=<YourRepoHost>, ${TAG} generated by git
docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push
LOG_DIR ?= /tmp/log ## log directory LOG_DIR ?= /tmp/log ## log directory
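The new `EXT_BUILD_TAGS` variable is appended to `go build -tags`, so a core-only binary can be produced without editing the Makefile. A minimal usage sketch, assuming the `include_core` tag wired in above (the tag that guards the new `pkg/include/core.go` later in this diff):

```sh
# Build only the core components (uses the include_core build tag added above)
make build EXT_BUILD_TAGS=include_core

# Same idea for the badger-backed build; the tag is appended after driver_badger
make build-in-badger EXT_BUILD_TAGS=include_core
```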

View File

@ -1,3 +1,157 @@
# Release v1.5.0
### :star2: Features
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
- Added rocketmq sink (#530)
- Added franzKafka source (#573)
- Added kata runtime (#554)
- `typePodFields`/`typeNodeFields` are supported in LogConfig/ClusterLogConfig (#450)
- Sink codec supports `printEvents` (#448)
- Added queue in LogConfig/ClusterLogConfig (#457)
- Switched from `olivere/elastic` to the official Elasticsearch Go client (#581)
- Supported `copytruncate` in file source (#571)
- Added `genfiles` sub command (#471)
- Added `sortBy` field in elasticsearch source (#473)
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
- New `addHostMeta` interceptor (#474)
- Added persistence driver `badger` (#475) (#584)
- Ignore LogConfig with sidecar injection annotation (#478)
- Added `toStr` action in transformer interceptor (#482)
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
- Get the Loggie version via the API and a sub command (#496) (#508)
- Added `worker` and `clientId` to the Kafka source (#506) (#507)
- Upgraded the `kafka-go` version (#506) (#567)
- Added `resultStatus` to the dev sink, which can be used to simulate failure or drop (#531)
- Pretty-print the error when unmarshalling the YAML configuration fails (#539)
- Added a default topic used when rendering the Kafka topic fails (#550)
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
- Supported multiple topics in the Kafka source (#548)
- Added a default index used when rendering the elasticsearch index fails (#551) (#553)
- The default `maxOpenFds` is set to 4096 (#559)
- Supported a default `sinkRef` in Kubernetes discovery (#555)
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
- Supported omitting empty fields in Kubernetes discovery (#570)
- Optimized the `maxbytes` interceptor (#575)
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive` and `fdHoldTimeoutWhenRemove` from the watcher to the outer layer in the `file source` (#579) (#585)
- Added `cheanUnfinished` in cleanFiles (#580)
- Added `target` in the `maxbytes` interceptor (#588)
- Added `partionKey` in franzKafka (#562)
- Added `highPrecision` in `rateLimit` interceptor (#525)
- Supported building only the core components of Loggie (#598)
- Added `addonMetaSchema` in file source (#599)
- Added `timestamp` and `bodyKey` in source (#600)
- Added `discoverNodesOnStart` and `discoverNodesInterval` in elasticsearch sink (#612)
### :bug: Bug Fixes
- Fixed panic when kubeEvent Series is nil (#459)
- Upgraded `automaxprocs` version to v1.5.1 (#488)
- Fixed setting defaults failing with `fieldsUnderKey` (#513)
- Fixed condition parsing failing when the condition contains ERROR in the transformer interceptor (#514) (#515)
- Fixed out-of-order data streams in grpc batches (#517)
- Fixed possible OOM caused by very large lines (#529)
- Fixed duplicated batchSize in queue (#533)
- Fixed panic caused by sqlite lock (#524)
- Fixed commands that could not be used in the multi-arch container (#541)
- Fixed possible blocking caused by the `logger listener` (#561) (#552)
- Fixed `sink concurrency` deepCopy failure (#563)
- Drop events on partial errors in the elasticsearch sink (#572)
- Fixed conversion to string when a byte array is returned in the transformer interceptor (#596)
- Invalid pipelines no longer stop the other pipelines from running (#602)
- Fixed a configuration NPE when querying pipelines (#604)
- Optimized the elasticsearch sink request buffer (#608)
- Added the pod namespace as a prefix of the source name when a pod is matched by a ClusterLogConfig (#613)
# Release v1.5.0-rc.0
### :star2: Features
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
- Added rocketmq sink (#530)
- Added franzKafka source (#573)
- Added kata runtime (#554)
- `typePodFields`/`typeNodeFields` are supported in LogConfig/ClusterLogConfig (#450)
- Sink codec supports `printEvents` (#448)
- Added queue in LogConfig/ClusterLogConfig (#457)
- Switched from `olivere/elastic` to the official Elasticsearch Go client (#581)
- Supported `copytruncate` in file source (#571)
- Added `genfiles` sub command (#471)
- Added `sortBy` field in elasticsearch source (#473)
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
- New `addHostMeta` interceptor (#474)
- Added persistence driver `badger` (#475) (#584)
- Ignore LogConfig with sidecar injection annotation (#478)
- Added `toStr` action in transformer interceptor (#482)
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
- Get the Loggie version via the API and a sub command (#496) (#508)
- Added `worker` and `clientId` to the Kafka source (#506) (#507)
- Upgraded the `kafka-go` version (#506) (#567)
- Added `resultStatus` to the dev sink, which can be used to simulate failure or drop (#531)
- Pretty-print the error when unmarshalling the YAML configuration fails (#539)
- Added a default topic used when rendering the Kafka topic fails (#550)
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
- Supported multiple topics in the Kafka source (#548)
- Added a default index used when rendering the elasticsearch index fails (#551) (#553)
- The default `maxOpenFds` is set to 4096 (#559)
- Supported a default `sinkRef` in Kubernetes discovery (#555)
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
- Supported omitting empty fields in Kubernetes discovery (#570)
- Optimized the `maxbytes` interceptor (#575)
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive` and `fdHoldTimeoutWhenRemove` from the watcher to the outer layer in the `file source` (#579) (#585)
- Added `cheanUnfinished` in cleanFiles (#580)
- Added `target` in the `maxbytes` interceptor (#588)
- Added `partionKey` in franzKafka (#562)
- Added `highPrecision` in `rateLimit` interceptor (#525)
### :bug: Bug Fixes
- Fixed panic when kubeEvent Series is nil (#459)
- Upgraded `automaxprocs` version to v1.5.1 (#488)
- Fixed setting defaults failing with `fieldsUnderKey` (#513)
- Fixed condition parsing failing when the condition contains ERROR in the transformer interceptor (#514) (#515)
- Fixed out-of-order data streams in grpc batches (#517)
- Fixed possible OOM caused by very large lines (#529)
- Fixed duplicated batchSize in queue (#533)
- Fixed panic caused by sqlite lock (#524)
- Fixed commands that could not be used in the multi-arch container (#541)
- Fixed possible blocking caused by the `logger listener` (#561) (#552)
- Fixed `sink concurrency` deepCopy failure (#563)
- Drop events on partial errors in the elasticsearch sink (#572)
# Release v1.4.0
### :star2: Features
- Added Loggie dashboard feature for easier troubleshooting (#416)
- Enhanced log alerting function with more flexible log alert detection rules and added alertWebhook sink (#392)
- Added sink concurrency support for automatic adaptation based on downstream delay (#376)
- Added franzKafka sink for users who prefer the franz kafka library (#423)
- Added elasticsearch source (#345)
- Added zinc sink (#254)
- Added pulsar sink (#417)
- Added grok action to transformer interceptor (#418)
- Added split action to transformer interceptor (#411)
- Added jsonEncode action to transformer interceptor (#421)
- Added fieldsFromPath configuration to source for obtaining fields from file content (#401)
- Added fieldsRef parameter to filesource listener for obtaining key value from fields configuration and adding to metrics as label (#402)
- In transformer interceptor, added dropIfError support to drop event if action execution fails (#409)
- Added info listener which currently exposes loggie_info_stat metrics and displays version label (#410)
- Added support for customized kafka sink partition key
- Added sasl support to Kafka source (#415)
- Added https insecureSkipVerify support to loki sink (#422)
- Optimized file source for large files (#430)
- Changed default value of file source maxOpenFds to 1024 (#437)
- ContainerRuntime can now be set to none (#439)
- Upgraded to go 1.18 (#440)
- Optimized the configuration parameters to remove redundancy generated by rendering
### :bug: Bug Fixes
- Added source fields to filesource listener (#402)
- Fixed issue of transformer copy action not copying non-string body (#420)
- Added fetching of log files from UpperDir when rootfs collection is enabled (#414)
- Fixed pipeline restart NPE (#454)
- Fixed create dir soft link job (#453)
# Release v1.4.0-rc.0 # Release v1.4.0-rc.0
### :star2: Features ### :star2: Features

go.mod (2 changed lines)
View File

@ -61,7 +61,6 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect github.com/cespare/xxhash v1.1.0 // indirect
github.com/danieljoos/wincred v1.0.2 // indirect github.com/danieljoos/wincred v1.0.2 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
github.com/emirpasic/gods v1.12.0 // indirect github.com/emirpasic/gods v1.12.0 // indirect
github.com/fatih/color v1.10.0 // indirect github.com/fatih/color v1.10.0 // indirect
@ -174,6 +173,7 @@ require (
require ( require (
github.com/apache/rocketmq-client-go/v2 v2.1.1 github.com/apache/rocketmq-client-go/v2 v2.1.1
github.com/dustin/go-humanize v1.0.0
github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-elasticsearch/v7 v7.17.10
github.com/goccy/go-yaml v1.11.0 github.com/goccy/go-yaml v1.11.0
github.com/mattn/go-sqlite3 v1.11.0 github.com/mattn/go-sqlite3 v1.11.0

View File

@ -14,7 +14,7 @@ loggie:
sink: ~ sink: ~
queue: ~ queue: ~
pipeline: ~ pipeline: ~
normalize: ~ sys: ~
discovery: discovery:
enabled: false enabled: false
@ -31,15 +31,17 @@ loggie:
defaults: defaults:
sink: sink:
type: dev type: dev
interceptors:
- type: schema
name: global
order: 700
addMeta:
timestamp:
key: "@timestamp"
sources: sources:
- type: file - type: file
timestampKey: "@timestamp"
bodyKey: "message"
fieldsUnderRoot: true
addonMeta: true
addonMetaSchema:
underRoot: true
fields:
filename: "${_meta.filename}"
line: "${_meta.line}"
watcher: watcher:
maxOpenFds: 6000 maxOpenFds: 6000
http: http:
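With these defaults, the file source writes the product timestamp under `@timestamp`, moves the raw body under `message`, and flattens the configured addon metadata to the root of the event header (see the `fillEventMetaAndHeader` and `addonMetaProductFunc` changes later in this diff). A hedged sketch of a resulting event; the values are illustrative:

```json
{
  "@timestamp": "2023-08-01T12:00:00.000Z",
  "message": "the raw log line",
  "filename": "/var/log/app/app.log",
  "line": 42
}
```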

View File

@ -130,11 +130,21 @@ func ReadPipelineConfigFromFile(path string, ignore FileIgnore) (*PipelineConfig
for _, fn := range all { for _, fn := range all {
pipes := &PipelineConfig{} pipes := &PipelineConfig{}
unpack := cfg.UnPackFromFile(fn, pipes) unpack := cfg.UnPackFromFile(fn, pipes)
if err = unpack.Defaults().Validate().Do(); err != nil { if err = unpack.Do(); err != nil {
log.Error("invalid pipeline configs: %v, \n%s", err, unpack.Contents()) log.Error("read pipeline configs from path %s failed: %v", path, err)
continue continue
} }
pipecfgs.AddPipelines(pipes.Pipelines)
for _, p := range pipes.Pipelines {
pip := p
if err := cfg.NewUnpack(nil, &pip, nil).Defaults().Validate().Do(); err != nil {
// ignore invalid pipeline, but continue to read other pipelines
// invalid pipelines will be checked by the reloader later
log.Error("pipeline: %s configs invalid: %v", p.Name, err)
continue
}
pipecfgs.AddPipelines([]pipeline.Config{pip})
}
} }
return pipecfgs, nil return pipecfgs, nil
} }

View File

@ -17,7 +17,6 @@ limitations under the License.
package reloader package reloader
import ( import (
"io/ioutil"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@ -49,7 +48,7 @@ func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request
continue continue
} }
content, err := ioutil.ReadFile(m) content, err := os.ReadFile(m)
if err != nil { if err != nil {
log.Warn("read config error. err: %v", err) log.Warn("read config error. err: %v", err)
return return

View File

@ -38,6 +38,11 @@ type Config struct {
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"` FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"` FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
Codec *codec.Config `yaml:"codec,omitempty"` Codec *codec.Config `yaml:"codec,omitempty"`
TimestampKey string `yaml:"timestampKey,omitempty"`
TimestampLocation string `yaml:"timestampLocation,omitempty"`
TimestampLayout string `yaml:"timestampLayout,omitempty"`
BodyKey string `yaml:"bodyKey,omitempty"`
} }
func (c *Config) DeepCopy() *Config { func (c *Config) DeepCopy() *Config {
@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config {
FieldsFromEnv: newFieldsFromEnv, FieldsFromEnv: newFieldsFromEnv,
FieldsFromPath: newFieldsFromPath, FieldsFromPath: newFieldsFromPath,
Codec: c.Codec.DeepCopy(), Codec: c.Codec.DeepCopy(),
TimestampKey: c.TimestampKey,
TimestampLocation: c.TimestampLocation,
TimestampLayout: c.TimestampLayout,
BodyKey: c.BodyKey,
} }
return out return out
@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) {
} else { } else {
c.Codec.Merge(from.Codec) c.Codec.Merge(from.Codec)
} }
if c.TimestampKey == "" {
c.TimestampKey = from.TimestampKey
}
if c.TimestampLocation == "" {
c.TimestampLocation = from.TimestampLocation
}
if c.TimestampLayout == "" {
c.TimestampLayout = from.TimestampLayout
}
if c.BodyKey == "" {
c.BodyKey = from.BodyKey
}
} }
func MergeSourceList(base []*Config, from []*Config) []*Config { func MergeSourceList(base []*Config, from []*Config) []*Config {

View File

@ -364,7 +364,7 @@ func (c *Controller) makeConfigPerSource(s *source.Config, pod *corev1.Pod, lgc
} }
// change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod // change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod
filesrc.Name = helper.GenTypePodSourceName(pod.Name, status.Name, filesrc.Name) filesrc.Name = helper.GenTypePodSourceName(lgc.Namespace, pod.Namespace, pod.Name, status.Name, filesrc.Name)
// inject default pod metadata // inject default pod metadata
if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil { if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil {

View File

@ -122,7 +122,12 @@ func ToPipelineInterceptor(interceptorsRaw string, interceptorRef string, interc
return interConfList, nil return interConfList, nil
} }
func GenTypePodSourceName(podName string, containerName string, sourceName string) string { func GenTypePodSourceName(lgcNamespace string, podNamespace string, podName string, containerName string, sourceName string) string {
// if lgcNamespace is empty, we use podNamespace as the first part of the source name,
// because the pod was matched by a clusterLogConfig; without the pod namespace, source names could be duplicated
if lgcNamespace == "" {
return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
}
return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName) return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
} }
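A small, self-contained sketch of the naming change (the function below mirrors the diffed helper rather than importing it, and the namespace/pod/container names are made up):

```go
package main

import "fmt"

// genTypePodSourceName mirrors the logic added above: when the LogConfig namespace is
// empty, the pod was matched by a ClusterLogConfig, so the pod namespace is prepended
// to keep generated source names unique across namespaces.
func genTypePodSourceName(lgcNamespace, podNamespace, podName, containerName, sourceName string) string {
	if lgcNamespace == "" {
		return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
	}
	return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
}

func main() {
	fmt.Println(genTypePodSourceName("", "prod", "nginx-abc", "nginx", "access"))     // prod/nginx-abc/nginx/access
	fmt.Println(genTypePodSourceName("prod", "prod", "nginx-abc", "nginx", "access")) // nginx-abc/nginx/access
}
```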

View File

@ -19,6 +19,7 @@ package sys
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/dustin/go-humanize"
"os" "os"
"strconv" "strconv"
"time" "time"
@ -48,8 +49,9 @@ func makeListener() eventbus.Listener {
} }
type sysData struct { type sysData struct {
MemoryRss uint64 `json:"memRss"` MemoryRss uint64 `json:"-"`
CPUPercent float64 `json:"cpuPercent"` MemoryRssHumanize string `json:"memRss"`
CPUPercent float64 `json:"cpuPercent"`
} }
type Config struct { type Config struct {
@ -122,6 +124,7 @@ func (l *Listener) getSysStat() error {
return err return err
} }
l.data.MemoryRss = mem.RSS l.data.MemoryRss = mem.RSS
l.data.MemoryRssHumanize = humanize.Bytes(mem.RSS)
cpuPer, err := l.proc.Percent(1 * time.Second) cpuPer, err := l.proc.Percent(1 * time.Second)
if err != nil { if err != nil {
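For reference, `humanize.Bytes` renders the raw RSS value as a short SI string, which is what the `sys` listener now exposes under `memRss`; a minimal sketch:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// An RSS of ~82.8 MB is reported as a human-readable string instead of a raw byte count.
	fmt.Println(humanize.Bytes(82854982)) // "83 MB"
}
```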

View File

@ -1,5 +1,7 @@
//go:build !include_core
/* /*
Copyright 2021 Loggie Authors Copyright 2023 Loggie Authors
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.

pkg/include/core.go (new file, 62 lines)
View File

@ -0,0 +1,62 @@
//go:build include_core
/*
Copyright 2023 Loggie Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package include
import (
_ "github.com/loggie-io/loggie/pkg/eventbus/export/prometheus"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filesource"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/info"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/logalerting"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/queue"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/reload"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sink"
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sys"
_ "github.com/loggie-io/loggie/pkg/interceptor/addhostmeta"
_ "github.com/loggie-io/loggie/pkg/interceptor/addk8smeta"
_ "github.com/loggie-io/loggie/pkg/interceptor/limit"
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert"
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert/condition"
_ "github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
_ "github.com/loggie-io/loggie/pkg/interceptor/metric"
_ "github.com/loggie-io/loggie/pkg/interceptor/retry"
_ "github.com/loggie-io/loggie/pkg/interceptor/schema"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/action"
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/condition"
_ "github.com/loggie-io/loggie/pkg/queue/channel"
_ "github.com/loggie-io/loggie/pkg/queue/memory"
_ "github.com/loggie-io/loggie/pkg/sink/alertwebhook"
_ "github.com/loggie-io/loggie/pkg/sink/codec/json"
_ "github.com/loggie-io/loggie/pkg/sink/codec/raw"
_ "github.com/loggie-io/loggie/pkg/sink/dev"
_ "github.com/loggie-io/loggie/pkg/sink/elasticsearch"
_ "github.com/loggie-io/loggie/pkg/sink/file"
_ "github.com/loggie-io/loggie/pkg/sink/franz"
_ "github.com/loggie-io/loggie/pkg/sink/kafka"
_ "github.com/loggie-io/loggie/pkg/source/codec/json"
_ "github.com/loggie-io/loggie/pkg/source/codec/regex"
_ "github.com/loggie-io/loggie/pkg/source/dev"
_ "github.com/loggie-io/loggie/pkg/source/elasticsearch"
_ "github.com/loggie-io/loggie/pkg/source/file"
_ "github.com/loggie-io/loggie/pkg/source/file/process"
_ "github.com/loggie-io/loggie/pkg/source/franz"
_ "github.com/loggie-io/loggie/pkg/source/kafka"
)

View File

@ -51,5 +51,10 @@ func NewEqual(args []string) (*Equal, error) {
} }
func (eq *Equal) Check(e api.Event) bool { func (eq *Equal) Check(e api.Event) bool {
return eq.value == eventops.Get(e, eq.field) value := eventops.Get(e, eq.field)
if byteValue, ok := value.([]byte); ok {
value = string(byteValue)
}
return eq.value == value
} }

View File

@ -25,6 +25,7 @@ import (
"github.com/loggie-io/loggie/pkg/ops/helper" "github.com/loggie-io/loggie/pkg/ops/helper"
"github.com/loggie-io/loggie/pkg/util" "github.com/loggie-io/loggie/pkg/util"
"github.com/rivo/tview" "github.com/rivo/tview"
"io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
@ -234,7 +235,7 @@ func (p *LogStatusPanel) SetData() {
} }
defer resp.Body.Close() defer resp.Body.Close()
out, err := ioutil.ReadAll(resp.Body) out, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return return
} }

View File

@ -174,6 +174,9 @@ func diffPipes(request *http.Request) string {
func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config { func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config {
result := make(map[string]pipeline.Config) result := make(map[string]pipeline.Config)
if cfgInPath == nil {
return result
}
setResult := func(pipData pipeline.Config, srcData ...*source.Config) { setResult := func(pipData pipeline.Config, srcData ...*source.Config) {
pip, ok := result[pipData.Name] pip, ok := result[pipData.Name]

View File

@ -18,7 +18,7 @@ package pipeline
import ( import (
"fmt" "fmt"
"io/ioutil" timeutil "github.com/loggie-io/loggie/pkg/util/time"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -46,10 +46,9 @@ import (
) )
const ( const (
FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot"
FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey"
fieldsFromPathMaxBytes = 1024 fieldsFromPathMaxBytes = 1024
defaultTsLayout = "2006-01-02T15:04:05.000Z"
) )
var ( var (
@ -1032,7 +1031,7 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
} }
for k, pathKey := range fieldsFromPath { for k, pathKey := range fieldsFromPath {
out, err := ioutil.ReadFile(pathKey) out, err := os.ReadFile(pathKey)
if err != nil { if err != nil {
log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err) log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err)
continue continue
@ -1054,11 +1053,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) { func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
// add meta fields // add meta fields
e.Meta().Set(event.SystemProductTimeKey, time.Now()) now := time.Now()
e.Meta().Set(event.SystemProductTimeKey, now)
e.Meta().Set(event.SystemPipelineKey, p.name) e.Meta().Set(event.SystemPipelineKey, p.name)
e.Meta().Set(event.SystemSourceKey, config.Name) e.Meta().Set(event.SystemSourceKey, config.Name)
e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot)
e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey)
header := e.Header() header := e.Header()
if header == nil { if header == nil {
@ -1073,6 +1071,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
// add header source fields from file // add header source fields from file
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey) AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)
// remap timestamp
if config.TimestampKey != "" {
layout := config.TimestampLayout
if layout == "" {
layout = defaultTsLayout
}
// conf.Location could be "" or "UTC" or "Local"
// default "" indicate "UTC"
ts, err := timeutil.Format(now, config.TimestampLocation, layout)
if err != nil {
log.Warn("time format system product timestamp err: %+v", err)
return
}
header[config.TimestampKey] = ts
}
if config.BodyKey != "" {
header[config.BodyKey] = util.ByteToStringUnsafe(e.Body())
e.Fill(e.Meta(), header, []byte{})
}
} }
func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) { func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {
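A hedged configuration sketch for the timestamp/body remapping above (the key names come from the source `Config` fields added earlier in this diff; the path and values are illustrative):

```yaml
sources:
  - type: file
    name: access
    paths:
      - /var/log/app/*.log
    timestampKey: "@timestamp"                   # write the product time into header["@timestamp"]
    timestampLayout: "2006-01-02T15:04:05.000Z"  # Go time layout; this is the default when empty
    timestampLocation: "UTC"                     # "", "UTC" or "Local"; "" behaves like "UTC"
    bodyKey: "message"                           # move the raw body into header["message"]
```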

View File

@ -18,6 +18,7 @@ package json
import ( import (
"github.com/loggie-io/loggie/pkg/core/log" "github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/util"
"time" "time"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
@ -73,7 +74,7 @@ func (j *Json) Encode(e api.Event) ([]byte, error) {
beatsFormat(e) beatsFormat(e)
} else if len(e.Body()) != 0 { } else if len(e.Body()) != 0 {
// put body in header // put body in header
header[eventer.Body] = string(e.Body()) header[eventer.Body] = util.ByteToStringUnsafe(e.Body())
} }
var result []byte var result []byte

View File

@ -21,7 +21,6 @@ import (
"context" "context"
"fmt" "fmt"
es "github.com/elastic/go-elasticsearch/v7" es "github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"github.com/loggie-io/loggie/pkg/core/api" "github.com/loggie-io/loggie/pkg/core/api"
eventer "github.com/loggie-io/loggie/pkg/core/event" eventer "github.com/loggie-io/loggie/pkg/core/event"
@ -30,7 +29,7 @@ import (
"github.com/loggie-io/loggie/pkg/util/pattern" "github.com/loggie-io/loggie/pkg/util/pattern"
"github.com/loggie-io/loggie/pkg/util/runtime" "github.com/loggie-io/loggie/pkg/util/runtime"
"github.com/pkg/errors" "github.com/pkg/errors"
"io/ioutil" "os"
"strconv" "strconv"
"strings" "strings"
) )
@ -44,8 +43,6 @@ type ClientSet struct {
cli *es.Client cli *es.Client
opType string opType string
buf *bytes.Buffer
aux []byte
reqCount int reqCount int
codec codec.Codec codec codec.Codec
@ -54,6 +51,70 @@ type ClientSet struct {
documentIdPattern *pattern.Pattern documentIdPattern *pattern.Pattern
} }
type bulkRequest struct {
lines []line
}
type line struct {
meta []byte
body []byte
}
func (b *bulkRequest) body() []byte {
var buf bytes.Buffer
size := 0
for _, l := range b.lines {
size += len(l.meta) + len(l.body) + 1
}
buf.Grow(size)
for _, l := range b.lines {
buf.Write(l.meta)
buf.Write(l.body)
buf.WriteRune('\n')
}
return buf.Bytes()
}
func (b *bulkRequest) add(body []byte, action string, documentID string, index string) {
if len(body) == 0 {
return
}
var buf bytes.Buffer
var aux []byte
// { "index" : { "_index" : "test", "_id" : "1" } }
buf.WriteRune('{')
aux = strconv.AppendQuote(aux, action)
buf.Write(aux)
aux = aux[:0]
buf.WriteRune(':')
buf.WriteRune('{')
if documentID != "" {
buf.WriteString(`"_id":`)
aux = strconv.AppendQuote(aux, documentID)
buf.Write(aux)
aux = aux[:0]
}
if index != "" {
buf.WriteString(`"_index":`)
aux = strconv.AppendQuote(aux, index)
buf.Write(aux)
}
buf.WriteRune('}')
buf.WriteRune('}')
buf.WriteRune('\n')
l := line{
meta: buf.Bytes(),
body: body,
}
b.lines = append(b.lines, l)
}
type Client interface { type Client interface {
Bulk(ctx context.Context, batch api.Batch) error Bulk(ctx context.Context, batch api.Batch) error
Stop() Stop()
@ -68,7 +129,7 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
} }
var ca []byte var ca []byte
if config.CACertPath != "" { if config.CACertPath != "" {
caData, err := ioutil.ReadFile(config.CACertPath) caData, err := os.ReadFile(config.CACertPath)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -76,14 +137,16 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
} }
cfg := es.Config{ cfg := es.Config{
Addresses: config.Hosts, Addresses: config.Hosts,
DisableRetry: true, DisableRetry: true,
Username: config.UserName, Username: config.UserName,
Password: config.Password, Password: config.Password,
APIKey: config.APIKey, APIKey: config.APIKey,
ServiceToken: config.ServiceToken, ServiceToken: config.ServiceToken,
CompressRequestBody: config.Compress, CompressRequestBody: config.Compress,
CACert: ca, DiscoverNodesOnStart: config.DiscoverNodesOnStart,
DiscoverNodesInterval: config.DiscoverNodesInterval,
CACert: ca,
} }
cli, err := es.NewClient(cfg) cli, err := es.NewClient(cfg)
if err != nil { if err != nil {
@ -94,8 +157,6 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
config: config, config: config,
cli: cli, cli: cli,
opType: config.OpType, opType: config.OpType,
buf: bytes.NewBuffer(make([]byte, 0, config.SendBuffer)),
aux: make([]byte, 0, 512),
reqCount: 0, reqCount: 0,
codec: cod, codec: cod,
indexPattern: indexPattern, indexPattern: indexPattern,
@ -109,16 +170,11 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null") return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
} }
bulkReq := esapi.BulkRequest{}
if c.config.Etype != "" {
bulkReq.DocumentType = c.config.Etype
}
defer func() { defer func() {
c.buf.Reset()
c.reqCount = 0 c.reqCount = 0
}() }()
req := bulkRequest{}
for _, event := range batch.Events() { for _, event := range batch.Events() {
headerObj := runtime.NewObject(event.Header()) headerObj := runtime.NewObject(event.Header())
@ -160,19 +216,14 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
} }
c.reqCount++ c.reqCount++
if err := c.writeMeta(c.opType, docId, idx); err != nil { req.add(data, c.opType, docId, idx)
return err
}
if err := c.writeBody(data); err != nil {
return err
}
} }
if c.reqCount == 0 { if c.reqCount == 0 {
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null") return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
} }
resp, err := c.cli.Bulk(bytes.NewReader(c.buf.Bytes()), resp, err := c.cli.Bulk(bytes.NewReader(req.body()),
c.cli.Bulk.WithDocumentType(c.config.Etype), c.cli.Bulk.WithDocumentType(c.config.Etype),
c.cli.Bulk.WithParameters(c.config.Params), c.cli.Bulk.WithParameters(c.config.Params),
c.cli.Bulk.WithHeader(c.config.Headers)) c.cli.Bulk.WithHeader(c.config.Headers))
@ -213,39 +264,3 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
func (c *ClientSet) Stop() { func (c *ClientSet) Stop() {
// Do nothing // Do nothing
} }
// { "index" : { "_index" : "test", "_id" : "1" } }
func (c *ClientSet) writeMeta(action string, documentID string, index string) error {
c.buf.WriteRune('{')
c.aux = strconv.AppendQuote(c.aux, action)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
c.buf.WriteRune(':')
c.buf.WriteRune('{')
if documentID != "" {
c.buf.WriteString(`"_id":`)
c.aux = strconv.AppendQuote(c.aux, documentID)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
}
if index != "" {
c.buf.WriteString(`"_index":`)
c.aux = strconv.AppendQuote(c.aux, index)
c.buf.Write(c.aux)
c.aux = c.aux[:0]
}
c.buf.WriteRune('}')
c.buf.WriteRune('}')
c.buf.WriteRune('\n')
return nil
}
func (c *ClientSet) writeBody(body []byte) error {
if len(body) == 0 {
return nil
}
c.buf.Write(body)
c.buf.WriteRune('\n')
return nil
}
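Given the `bulkRequest` type added above, the request buffer it assembles is plain NDJSON: one action/metadata line followed by the document source. A rough usage sketch (fragment only; the index name and document are illustrative, and the document id is left empty as it usually is):

```go
req := bulkRequest{}
req.add([]byte(`{"message":"hello"}`), "index", "", "loggie-2023.08.01")

fmt.Printf("%s", req.body())
// Output:
// {"index":{"_index":"loggie-2023.08.01"}}
// {"message":"hello"}
```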

View File

@ -16,33 +16,29 @@ limitations under the License.
package elasticsearch package elasticsearch
import "github.com/loggie-io/loggie/pkg/util/pattern" import (
"github.com/loggie-io/loggie/pkg/util/pattern"
"time"
)
type Config struct { type Config struct {
Hosts []string `yaml:"hosts,omitempty" validate:"required"` Hosts []string `yaml:"hosts,omitempty" validate:"required"`
UserName string `yaml:"username,omitempty"` UserName string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"` Password string `yaml:"password,omitempty"`
Index string `yaml:"index,omitempty"` Index string `yaml:"index,omitempty"`
Headers map[string]string `yaml:"headers,omitempty"` Headers map[string]string `yaml:"headers,omitempty"`
Params map[string]string `yaml:"parameters,omitempty"` Params map[string]string `yaml:"parameters,omitempty"`
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"` IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
DocumentId string `yaml:"documentId,omitempty"` DocumentId string `yaml:"documentId,omitempty"`
Sniff *bool `yaml:"sniff,omitempty"` // deprecated APIKey string `yaml:"apiKey,omitempty"`
APIKey string `yaml:"apiKey,omitempty"` ServiceToken string `yaml:"serviceToken,omitempty"`
ServiceToken string `yaml:"serviceToken,omitempty"` CACertPath string `yaml:"caCertPath,omitempty"`
CACertPath string `yaml:"caCertPath,omitempty"` Compress bool `yaml:"compress,omitempty"`
Compress bool `yaml:"compress,omitempty"` Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above OpType string `yaml:"opType,omitempty" default:"index"`
OpType string `yaml:"opType,omitempty" default:"index"` DiscoverNodesOnStart bool `yaml:"discoverNodesOnStart,omitempty"`
DiscoverNodesInterval time.Duration `yaml:"discoverNodesInterval,omitempty"`
SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"`
}
type TLS struct {
CAFile string `yaml:"caFile,omitempty"`
CertFile string `yaml:"certFile,omitempty"`
KeyFile string `yaml:"keyFile,omitempty"`
} }
type RenderIndexFail struct { type RenderIndexFail struct {
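A hedged sink configuration sketch using the two new options (the hosts and interval are illustrative; the key names come from the `Config` struct above):

```yaml
sink:
  type: elasticsearch
  hosts: ["es-node1:9200", "es-node2:9200"]
  index: "loggie-${+YYYY.MM.DD}"
  discoverNodesOnStart: true    # ask the cluster for its node list when the client starts
  discoverNodesInterval: 5m     # periodically re-discover nodes; leave unset to disable
```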

View File

@ -45,6 +45,7 @@ type CollectConfig struct {
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
AddonMeta bool `yaml:"addonMeta,omitempty"` AddonMeta bool `yaml:"addonMeta,omitempty"`
AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"`
excludeFilePatterns []*regexp.Regexp excludeFilePatterns []*regexp.Regexp
Charset string `yaml:"charset,omitempty" default:"utf-8"` Charset string `yaml:"charset,omitempty" default:"utf-8"`
@ -54,6 +55,12 @@ type CollectConfig struct {
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"` FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
} }
type AddonMetaSchema struct {
Fields map[string]string `yaml:"fields,omitempty"`
FieldsUnderRoot bool `yaml:"underRoot,omitempty"`
FieldsUnderKey string `yaml:"key,omitempty" default:"state"`
}
type LineDelimiterValue struct { type LineDelimiterValue struct {
Charset string `yaml:"charset,omitempty" default:"utf-8"` Charset string `yaml:"charset,omitempty" default:"utf-8"`
LineType string `yaml:"type,omitempty" default:"auto"` LineType string `yaml:"type,omitempty" default:"auto"`

View File

@ -71,6 +71,19 @@ type Source struct {
multilineProcessor *MultiProcessor multilineProcessor *MultiProcessor
mTask *MultiTask mTask *MultiTask
codec codec.Codec codec codec.Codec
addonMetaField *AddonMetaFields
}
type AddonMetaFields struct {
Pipeline string `yaml:"pipeline,omitempty"`
Source string `yaml:"source,omitempty"`
Filename string `yaml:"filename,omitempty"`
Timestamp string `yaml:"timestamp,omitempty"`
Offset string `yaml:"offset,omitempty"`
Bytes string `yaml:"bytes,omitempty"`
Line string `yaml:"line,omitempty"`
Hostname string `yaml:"hostname,omitempty"`
} }
func (s *Source) Config() interface{} { func (s *Source) Config() interface{} {
@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error {
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
} }
if s.config.CollectConfig.AddonMeta {
s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields)
}
// init reader chan size // init reader chan size
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds
@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) {
s.productFunc = productFunc s.productFunc = productFunc
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig) s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
if s.config.CollectConfig.AddonMeta { if s.config.CollectConfig.AddonMeta {
s.productFunc = addonMetaProductFunc(s.productFunc) s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema)
} }
if s.config.ReaderConfig.MultiConfig.Active { if s.config.ReaderConfig.MultiConfig.Active {
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc) s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap
} }
} }
func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc { func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc {
return func(event api.Event) api.Result { return func(event api.Event) api.Result {
s, _ := event.Meta().Get(SystemStateKey) s, _ := event.Meta().Get(SystemStateKey)
state := s.(*persistence.State) state := s.(*persistence.State)
addonMeta := make(map[string]interface{}) addonMeta := make(map[string]interface{})
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName
event.Header()["state"] = addonMeta // if fields is nil, use default config
if fields == nil {
addonMeta["pipeline"] = state.PipelineName
addonMeta["source"] = state.SourceName
addonMeta["filename"] = state.Filename
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
addonMeta["offset"] = state.Offset
addonMeta["bytes"] = state.ContentBytes
addonMeta["hostname"] = global.NodeName
} else {
if fields.Pipeline != "" {
addonMeta[fields.Pipeline] = state.PipelineName
}
if fields.Source != "" {
addonMeta[fields.Source] = state.SourceName
}
if fields.Filename != "" {
addonMeta[fields.Filename] = state.Filename
}
if fields.Timestamp != "" {
addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout)
}
if fields.Offset != "" {
addonMeta[fields.Offset] = state.Offset
}
if fields.Bytes != "" {
addonMeta[fields.Bytes] = state.ContentBytes
}
if fields.Line != "" {
addonMeta[fields.Line] = state.LineNumber
}
if fields.Hostname != "" {
addonMeta[fields.Hostname] = global.NodeName
}
}
if schema.FieldsUnderRoot {
for k, v := range addonMeta {
event.Header()[k] = v
}
} else {
event.Header()[schema.FieldsUnderKey] = addonMeta
}
productFunc(event) productFunc(event)
return result.Success() return result.Success()
} }
} }
func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields {
if len(fields) == 0 {
return nil
}
amf := &AddonMetaFields{}
for k, v := range fields {
switch v {
case "${_meta.pipeline}":
amf.Pipeline = k
case "${_meta.source}":
amf.Source = k
case "${_meta.filename}":
amf.Filename = k
case "${_meta.timestamp}":
amf.Timestamp = k
case "${_meta.offset}":
amf.Offset = k
case "${_meta.bytes}":
amf.Bytes = k
case "${_meta.line}":
amf.Line = k
case "${_meta.hostname}":
amf.Hostname = k
}
}
return amf
}
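From the configuration side, each entry under `addonMetaSchema.fields` renames one piece of addon metadata, and only the `${_meta.*}` placeholders handled by `addonMetaFieldsConvert` above are recognized. A hedged sketch; the keys on the left are illustrative:

```yaml
sources:
  - type: file
    addonMeta: true
    addonMetaSchema:
      underRoot: false
      key: "fileState"            # used only when underRoot is false; defaults to "state"
      fields:
        file: "${_meta.filename}"
        lineNumber: "${_meta.line}"
        collectTime: "${_meta.timestamp}"
        offset: "${_meta.offset}"
        host: "${_meta.hostname}"
```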

View File

@ -34,7 +34,6 @@ type DbConfig struct {
File string `yaml:"file,omitempty"` File string `yaml:"file,omitempty"`
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"` FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"` BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
TableName string `yaml:"tableName,omitempty" default:"registry"`
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"` CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
} }
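This ties in with the breaking change in the release notes: `db` now lives in the global `loggie.yml` instead of the `file source`. A hedged sketch of the new location, assuming it sits under the top-level `loggie:` block as the linked reference suggests (the values shown are the struct defaults, except `file`, which is illustrative):

```yaml
loggie:
  db:
    file: ./data/loggie.db        # registry file path (illustrative)
    flushTimeout: 2s
    bufferSize: 2048
    cleanInactiveTimeout: 504h    # records not updated for 21 days are deleted
    cleanScanInterval: 1h
```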

View File

@ -0,0 +1,85 @@
package main
import (
"fmt"
"github.com/loggie-io/loggie/pkg/control"
"github.com/loggie-io/loggie/pkg/core/cfg"
"github.com/loggie-io/loggie/pkg/core/interceptor"
"github.com/loggie-io/loggie/pkg/core/log"
"github.com/loggie-io/loggie/pkg/core/queue"
"github.com/loggie-io/loggie/pkg/eventbus"
"github.com/loggie-io/loggie/pkg/eventbus/export/logger"
"github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
"github.com/loggie-io/loggie/pkg/interceptor/metric"
"github.com/loggie-io/loggie/pkg/interceptor/retry"
"github.com/loggie-io/loggie/pkg/pipeline"
"github.com/loggie-io/loggie/pkg/queue/channel"
"net/http"
"time"
_ "github.com/loggie-io/loggie/pkg/include"
)
const pipe1 = `
pipelines:
- name: test
sources:
- type: dev
name: test
qps: 100
byteSize: 10240
eventsTotal: 10000
sink:
type: elasticsearch
parallelism: 3
hosts: ["localhost:9200"]
index: "loggie-benchmark-${+YYYY.MM.DD}"
`
func main() {
log.InitDefaultLogger()
pipeline.SetDefaultConfigRaw(pipeline.Config{
Queue: &queue.Config{
Type: channel.Type,
},
Interceptors: []*interceptor.Config{
{
Type: metric.Type,
},
{
Type: maxbytes.Type,
},
{
Type: retry.Type,
},
},
})
eventbus.StartAndRun(eventbus.Config{
LoggerConfig: logger.Config{
Enabled: true,
Period: 5 * time.Second,
Pretty: false,
},
ListenerConfigs: map[string]cfg.CommonCfg{
"sink": map[string]interface{}{
"period": 5 * time.Second,
},
"sys": map[string]interface{}{
"period": 5 * time.Second,
},
},
})
pipecfgs := &control.PipelineConfig{}
if err := cfg.UnPackFromRaw([]byte(pipe1), pipecfgs).Defaults().Validate().Do(); err != nil {
log.Panic("pipeline configs invalid: %v", err)
}
controller := control.NewController()
controller.Start(pipecfgs)
if err := http.ListenAndServe(fmt.Sprintf(":9196"), nil); err != nil {
log.Fatal("http listen and serve err: %v", err)
}
}