mirror of https://github.com/loggie-io/loggie.git
Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
|
31b8a3644c | |
|
452ee1d145 | |
|
35220b1d2c | |
|
01a0a300c4 | |
|
2bcb0632e6 | |
|
51aa8c881d | |
|
8febfeda70 | |
|
e637a3ff89 | |
|
1ea5bf997e | |
|
e6c4e37820 | |
|
38894f6e48 |
10
Makefile
10
Makefile
|
@ -82,13 +82,13 @@ benchmark: ## Run benchmark
|
|||
|
||||
##@ Build
|
||||
|
||||
build: ## go build
|
||||
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
|
||||
build: ## go build, EXT_BUILD_TAGS=include_core would only build core package
|
||||
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -tags ${EXT_BUILD_TAGS} -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
|
||||
|
||||
##@ Build(without sqlite)
|
||||
|
||||
build-in-badger: ## go build without sqlite
|
||||
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
|
||||
build-in-badger: ## go build without sqlite, EXT_BUILD_TAGS=include_core would only build core package
|
||||
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger,${EXT_BUILD_TAGS} -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
|
||||
|
||||
##@ Images
|
||||
|
||||
|
@ -98,7 +98,7 @@ docker-build: ## Docker build -t ${REPO}:${TAG}, try: make docker-build REPO=<Yo
|
|||
docker-push: ## Docker push ${REPO}:${TAG}
|
||||
docker push ${REPO}:${TAG}
|
||||
|
||||
docker-multi-arch: ## Docker buildx, try: make docker-build REPO=<YourRepoHost>, ${TAG} generated by git
|
||||
docker-multi-arch: ## Docker buildx, try: make docker-multi-arch REPO=<YourRepoHost>, ${TAG} generated by git
|
||||
docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push
|
||||
|
||||
LOG_DIR ?= /tmp/log ## log directory
|
||||
|
|
|
@ -1,3 +1,157 @@
|
|||
# Release v1.5.0
|
||||
|
||||
### :star2: Features
|
||||
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
|
||||
- Added rocketmq sink (#530)
|
||||
- Added franzKafka source (#573)
|
||||
- Added kata runtime (#554)
|
||||
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
|
||||
- sink codec support printEvents (#448)
|
||||
- Added queue in LogConfig/ClusterLogConfig (#457)
|
||||
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
|
||||
- Supported `copytruncate` in file source (#571)
|
||||
- Added `genfiles` sub command (#471)
|
||||
- Added queue in LogConfig/ClusterLogConfig queue (#457)
|
||||
- Added `sortBy` field in elasticsearch source (#473)
|
||||
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
|
||||
- New `addHostMeta` interceptor (#474)
|
||||
- Added persistence driver `badger` (#475) (#584)
|
||||
- Ignore LogConfig with sidecar injection annotation (#478)
|
||||
- Added `toStr` action in transformer interceptor (#482)
|
||||
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
|
||||
- Get loggie version with api and sub command (#496) (#508)
|
||||
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
|
||||
- Upgrade `kafka-go` version (#506) (#567)
|
||||
- Added resultStatus in dev sink which can be used to simulate failure, drop (#531)
|
||||
- Pretty error when unmarshal yaml configuration failed (#539)
|
||||
- Added default topic if render kafka topic failed (#550)
|
||||
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
|
||||
- Supported multiple topics in kafka source (#548)
|
||||
- Added default index if render elasticsearch index failed (#551) (#553)
|
||||
- The default `maxOpenFds` is set to 4096 (#559)
|
||||
- Supported default `sinkRef` in kubernetes discovery (#555)
|
||||
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
|
||||
- Supported omit empty fields in Kubernetes discovery (#570)
|
||||
- Optimizes `maxbytes` interceptors (#575)
|
||||
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
|
||||
- Added `cheanUnfinished` in cleanFiles (#580)
|
||||
- Added `target` in `maxbyte` interceptor (#588)
|
||||
- Added `partionKey` in franzKafka (#562)
|
||||
- Added `highPrecision` in `rateLimit` interceptor (#525)
|
||||
- Supported build core components of Loggie (#598)
|
||||
- Added `addonMetaSchema` in file source (#599)
|
||||
- Added `timestamp` and `bodyKey` in source (#600)
|
||||
- Added `discoverNodesOnStart` and `discoverNodesInterval` in elasticsearch sink (#612)
|
||||
|
||||
### :bug: Bug Fixes
|
||||
- Fixed panic when kubeEvent Series is nil (#459)
|
||||
- Upgraded `automaxprocs` version to v1.5.1 (#488)
|
||||
- Fixed set defaults failed in `fieldsUnderKey` (#513)
|
||||
- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515)
|
||||
- Fixed grpc batch out-of-order data streams (#517)
|
||||
- Fixed large line may cause oom (#529)
|
||||
- Fixed duplicated batchSize in queue (#533)
|
||||
- Fixed sqlite locked panic (#524)
|
||||
- Fixed command can't be used in multi-arch container (#541)
|
||||
- Fixed `logger listener` may cause block (#561) (#552)
|
||||
- Fixed `sink concurrency` deepCopy failed (#563)
|
||||
- Drop events when partial error in elasticsearch sink (#572)
|
||||
- Fix convert to string type when returning byte array type in transformer interceptor (#596)
|
||||
- Invalid pipelines will not stop all the pipelines running (#602)
|
||||
- Fixed configuration npe when query pipelines (#604)
|
||||
- Optimized elasticsearch sink request buffer (#608)
|
||||
- Add pod namespace as prefix of source name when pod is matched by ClusterLogConfig (#613)
|
||||
|
||||
# Release v1.5.0-rc.0
|
||||
|
||||
### :star2: Features
|
||||
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
|
||||
- Added rocketmq sink (#530)
|
||||
- Added franzKafka source (#573)
|
||||
- Added kata runtime (#554)
|
||||
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
|
||||
- sink codec support printEvents (#448)
|
||||
- Added queue in LogConfig/ClusterLogConfig (#457)
|
||||
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
|
||||
- Supported `copytruncate` in file source (#571)
|
||||
- Added `genfiles` sub command (#471)
|
||||
- Added queue in LogConfig/ClusterLogConfig queue (#457)
|
||||
- Added `sortBy` field in elasticsearch source (#473)
|
||||
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
|
||||
- New `addHostMeta` interceptor (#474)
|
||||
- Added persistence driver `badger` (#475) (#584)
|
||||
- Ignore LogConfig with sidecar injection annotation (#478)
|
||||
- Added `toStr` action in transformer interceptor (#482)
|
||||
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
|
||||
- Get loggie version with api and sub command (#496) (#508)
|
||||
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
|
||||
- Upgrade `kafka-go` version (#506) (#567)
|
||||
- Added resultStatus in dev sink which can be used to simulate failure, drop (#531)
|
||||
- Pretty error when unmarshal yaml configuration failed (#539)
|
||||
- Added default topic if render kafka topic failed (#550)
|
||||
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
|
||||
- Supported multiple topics in kafka source (#548)
|
||||
- Added default index if render elasticsearch index failed (#551) (#553)
|
||||
- The default `maxOpenFds` is set to 4096 (#559)
|
||||
- Supported default `sinkRef` in kubernetes discovery (#555)
|
||||
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
|
||||
- Supported omit empty fields in Kubernetes discovery (#570)
|
||||
- Optimizes `maxbytes` interceptors (#575)
|
||||
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
|
||||
- Added `cheanUnfinished` in cleanFiles (#580)
|
||||
- Added `target` in `maxbyte` interceptor (#588)
|
||||
- Added `partionKey` in franzKafka (#562)
|
||||
- Added `highPrecision` in `rateLimit` interceptor (#525)
|
||||
|
||||
### :bug: Bug Fixes
|
||||
- Fixed panic when kubeEvent Series is nil (#459)
|
||||
- Upgraded `automaxprocs` version to v1.5.1 (#488)
|
||||
- Fixed set defaults failed in `fieldsUnderKey` (#513)
|
||||
- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515)
|
||||
- Fixed grpc batch out-of-order data streams (#517)
|
||||
- Fixed large line may cause oom (#529)
|
||||
- Fixed duplicated batchSize in queue (#533)
|
||||
- Fixed sqlite locked panic (#524)
|
||||
- Fixed command can't be used in multi-arch container (#541)
|
||||
- Fixed `logger listener` may cause block (#561) (#552)
|
||||
- Fixed `sink concurrency` deepCopy failed (#563)
|
||||
- Drop events when partial error in elasticsearch sink (#572)
|
||||
|
||||
# Release v1.4.0
|
||||
|
||||
### :star2: Features
|
||||
|
||||
- Added Loggie dashboard feature for easier troubleshooting (#416)
|
||||
- Enhanced log alerting function with more flexible log alert detection rules and added alertWebhook sink (#392)
|
||||
- Added sink concurrency support for automatic adaptation based on downstream delay (#376)
|
||||
- Added franzKafka sink for users who prefer the franz kafka library (#423)
|
||||
- Added elasticsearch source (#345)
|
||||
- Added zinc sink (#254)
|
||||
- Added pulsar sink (#417)
|
||||
- Added grok action to transformer interceptor (#418)
|
||||
- Added split action to transformer interceptor (#411)
|
||||
- Added jsonEncode action to transformer interceptor (#421)
|
||||
- Added fieldsFromPath configuration to source for obtaining fields from file content (#401)
|
||||
- Added fieldsRef parameter to filesource listener for obtaining key value from fields configuration and adding to metrics as label (#402)
|
||||
- In transformer interceptor, added dropIfError support to drop event if action execution fails (#409)
|
||||
- Added info listener which currently exposes loggie_info_stat metrics and displays version label (#410)
|
||||
- Added support for customized kafka sink partition key
|
||||
- Added sasl support to Kafka source (#415)
|
||||
- Added https insecureSkipVerify support to loki sink (#422)
|
||||
- Optimized file source for large files (#430)
|
||||
- Changed default value of file source maxOpenFds to 1024 (#437)
|
||||
- ContainerRuntime can now be set to none (#439)
|
||||
- Upgraded to go 1.18 (#440)
|
||||
- Optimize the configuration parameters to remove the redundancy generated by rendering
|
||||
|
||||
### :bug: Bug Fixes
|
||||
|
||||
- Added source fields to filesource listener (#402)
|
||||
- Fixed issue of transformer copy action not copying non-string body (#420)
|
||||
- Added fetching of logs file from UpperDir when rootfs collection is enabled (#414)
|
||||
- Fix pipeline restart npe (#454)
|
||||
- Fix create dir soft link job (#453)
|
||||
|
||||
# Release v1.4.0-rc.0
|
||||
|
||||
### :star2: Features
|
||||
|
|
2
go.mod
2
go.mod
|
@ -61,7 +61,6 @@ require (
|
|||
github.com/cespare/xxhash v1.1.0 // indirect
|
||||
github.com/danieljoos/wincred v1.0.2 // indirect
|
||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
||||
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
|
||||
github.com/emirpasic/gods v1.12.0 // indirect
|
||||
github.com/fatih/color v1.10.0 // indirect
|
||||
|
@ -174,6 +173,7 @@ require (
|
|||
|
||||
require (
|
||||
github.com/apache/rocketmq-client-go/v2 v2.1.1
|
||||
github.com/dustin/go-humanize v1.0.0
|
||||
github.com/elastic/go-elasticsearch/v7 v7.17.10
|
||||
github.com/goccy/go-yaml v1.11.0
|
||||
github.com/mattn/go-sqlite3 v1.11.0
|
||||
|
|
18
loggie.yml
18
loggie.yml
|
@ -14,7 +14,7 @@ loggie:
|
|||
sink: ~
|
||||
queue: ~
|
||||
pipeline: ~
|
||||
normalize: ~
|
||||
sys: ~
|
||||
|
||||
discovery:
|
||||
enabled: false
|
||||
|
@ -31,15 +31,17 @@ loggie:
|
|||
defaults:
|
||||
sink:
|
||||
type: dev
|
||||
interceptors:
|
||||
- type: schema
|
||||
name: global
|
||||
order: 700
|
||||
addMeta:
|
||||
timestamp:
|
||||
key: "@timestamp"
|
||||
sources:
|
||||
- type: file
|
||||
timestampKey: "@timestamp"
|
||||
bodyKey: "message"
|
||||
fieldsUnderRoot: true
|
||||
addonMeta: true
|
||||
addonMetaSchema:
|
||||
underRoot: true
|
||||
fields:
|
||||
filename: "${_meta.filename}"
|
||||
line: "${_meta.line}"
|
||||
watcher:
|
||||
maxOpenFds: 6000
|
||||
http:
|
||||
|
|
|
@ -130,11 +130,21 @@ func ReadPipelineConfigFromFile(path string, ignore FileIgnore) (*PipelineConfig
|
|||
for _, fn := range all {
|
||||
pipes := &PipelineConfig{}
|
||||
unpack := cfg.UnPackFromFile(fn, pipes)
|
||||
if err = unpack.Defaults().Validate().Do(); err != nil {
|
||||
log.Error("invalid pipeline configs: %v, \n%s", err, unpack.Contents())
|
||||
if err = unpack.Do(); err != nil {
|
||||
log.Error("read pipeline configs from path %s failed: %v", path, err)
|
||||
continue
|
||||
}
|
||||
pipecfgs.AddPipelines(pipes.Pipelines)
|
||||
|
||||
for _, p := range pipes.Pipelines {
|
||||
pip := p
|
||||
if err := cfg.NewUnpack(nil, &pip, nil).Defaults().Validate().Do(); err != nil {
|
||||
// ignore invalid pipeline, but continue to read other pipelines
|
||||
// invalid pipeline will check by reloader later
|
||||
log.Error("pipeline: %s configs invalid: %v", p.Name, err)
|
||||
continue
|
||||
}
|
||||
pipecfgs.AddPipelines([]pipeline.Config{pip})
|
||||
}
|
||||
}
|
||||
return pipecfgs, nil
|
||||
}
|
||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
|||
package reloader
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -49,7 +48,7 @@ func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request
|
|||
continue
|
||||
}
|
||||
|
||||
content, err := ioutil.ReadFile(m)
|
||||
content, err := os.ReadFile(m)
|
||||
if err != nil {
|
||||
log.Warn("read config error. err: %v", err)
|
||||
return
|
||||
|
|
|
@ -38,6 +38,11 @@ type Config struct {
|
|||
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
|
||||
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
|
||||
Codec *codec.Config `yaml:"codec,omitempty"`
|
||||
|
||||
TimestampKey string `yaml:"timestampKey,omitempty"`
|
||||
TimestampLocation string `yaml:"timestampLocation,omitempty"`
|
||||
TimestampLayout string `yaml:"timestampLayout,omitempty"`
|
||||
BodyKey string `yaml:"bodyKey,omitempty"`
|
||||
}
|
||||
|
||||
func (c *Config) DeepCopy() *Config {
|
||||
|
@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config {
|
|||
FieldsFromEnv: newFieldsFromEnv,
|
||||
FieldsFromPath: newFieldsFromPath,
|
||||
Codec: c.Codec.DeepCopy(),
|
||||
|
||||
TimestampKey: c.TimestampKey,
|
||||
TimestampLocation: c.TimestampLocation,
|
||||
TimestampLayout: c.TimestampLayout,
|
||||
BodyKey: c.BodyKey,
|
||||
}
|
||||
|
||||
return out
|
||||
|
@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) {
|
|||
} else {
|
||||
c.Codec.Merge(from.Codec)
|
||||
}
|
||||
|
||||
if c.TimestampKey == "" {
|
||||
c.TimestampKey = from.TimestampKey
|
||||
}
|
||||
if c.TimestampLocation == "" {
|
||||
c.TimestampLocation = from.TimestampLocation
|
||||
}
|
||||
if c.TimestampLayout == "" {
|
||||
c.TimestampLayout = from.TimestampLayout
|
||||
}
|
||||
if c.BodyKey == "" {
|
||||
c.BodyKey = from.BodyKey
|
||||
}
|
||||
}
|
||||
|
||||
func MergeSourceList(base []*Config, from []*Config) []*Config {
|
||||
|
|
|
@ -364,7 +364,7 @@ func (c *Controller) makeConfigPerSource(s *source.Config, pod *corev1.Pod, lgc
|
|||
}
|
||||
|
||||
// change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod
|
||||
filesrc.Name = helper.GenTypePodSourceName(pod.Name, status.Name, filesrc.Name)
|
||||
filesrc.Name = helper.GenTypePodSourceName(lgc.Namespace, pod.Namespace, pod.Name, status.Name, filesrc.Name)
|
||||
|
||||
// inject default pod metadata
|
||||
if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil {
|
||||
|
|
|
@ -122,7 +122,12 @@ func ToPipelineInterceptor(interceptorsRaw string, interceptorRef string, interc
|
|||
return interConfList, nil
|
||||
}
|
||||
|
||||
func GenTypePodSourceName(podName string, containerName string, sourceName string) string {
|
||||
func GenTypePodSourceName(lgcNamespace string, podNamespace string, podName string, containerName string, sourceName string) string {
|
||||
// if lgcNamespace is empty, we use podNamespace as the first part of the source name,
|
||||
// because this is the pod matched by clusterLogConfig, if the pod namespace is not added, it may cause the source to be duplicated
|
||||
if lgcNamespace == "" {
|
||||
return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
|
||||
}
|
||||
return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
|
||||
}
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ package sys
|
|||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/dustin/go-humanize"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
@ -48,8 +49,9 @@ func makeListener() eventbus.Listener {
|
|||
}
|
||||
|
||||
type sysData struct {
|
||||
MemoryRss uint64 `json:"memRss"`
|
||||
CPUPercent float64 `json:"cpuPercent"`
|
||||
MemoryRss uint64 `json:"-"`
|
||||
MemoryRssHumanize string `json:"memRss"`
|
||||
CPUPercent float64 `json:"cpuPercent"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
|
@ -122,6 +124,7 @@ func (l *Listener) getSysStat() error {
|
|||
return err
|
||||
}
|
||||
l.data.MemoryRss = mem.RSS
|
||||
l.data.MemoryRssHumanize = humanize.Bytes(mem.RSS)
|
||||
|
||||
cpuPer, err := l.proc.Percent(1 * time.Second)
|
||||
if err != nil {
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
//go:build !include_core
|
||||
|
||||
/*
|
||||
Copyright 2021 Loggie Authors
|
||||
Copyright 2023 Loggie Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
|
@ -0,0 +1,62 @@
|
|||
//go:build include_core
|
||||
|
||||
/*
|
||||
Copyright 2023 Loggie Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package include
|
||||
|
||||
import (
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/export/prometheus"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filesource"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/info"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/logalerting"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/queue"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/reload"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sink"
|
||||
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sys"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/addhostmeta"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/addk8smeta"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/limit"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert/condition"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/metric"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/retry"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/schema"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/action"
|
||||
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/condition"
|
||||
_ "github.com/loggie-io/loggie/pkg/queue/channel"
|
||||
_ "github.com/loggie-io/loggie/pkg/queue/memory"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/alertwebhook"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/codec/json"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/codec/raw"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/dev"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/elasticsearch"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/file"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/franz"
|
||||
_ "github.com/loggie-io/loggie/pkg/sink/kafka"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/codec/json"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/codec/regex"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/dev"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/elasticsearch"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/file"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/file/process"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/franz"
|
||||
_ "github.com/loggie-io/loggie/pkg/source/kafka"
|
||||
)
|
|
@ -51,5 +51,10 @@ func NewEqual(args []string) (*Equal, error) {
|
|||
}
|
||||
|
||||
func (eq *Equal) Check(e api.Event) bool {
|
||||
return eq.value == eventops.Get(e, eq.field)
|
||||
value := eventops.Get(e, eq.field)
|
||||
if byteValue, ok := value.([]byte); ok {
|
||||
value = string(byteValue)
|
||||
}
|
||||
|
||||
return eq.value == value
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/loggie-io/loggie/pkg/ops/helper"
|
||||
"github.com/loggie-io/loggie/pkg/util"
|
||||
"github.com/rivo/tview"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -234,7 +235,7 @@ func (p *LogStatusPanel) SetData() {
|
|||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
out, err := ioutil.ReadAll(resp.Body)
|
||||
out, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
@ -174,6 +174,9 @@ func diffPipes(request *http.Request) string {
|
|||
|
||||
func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config {
|
||||
result := make(map[string]pipeline.Config)
|
||||
if cfgInPath == nil {
|
||||
return result
|
||||
}
|
||||
|
||||
setResult := func(pipData pipeline.Config, srcData ...*source.Config) {
|
||||
pip, ok := result[pipData.Name]
|
||||
|
|
|
@ -18,7 +18,7 @@ package pipeline
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
timeutil "github.com/loggie-io/loggie/pkg/util/time"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -46,10 +46,9 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot"
|
||||
FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey"
|
||||
|
||||
fieldsFromPathMaxBytes = 1024
|
||||
|
||||
defaultTsLayout = "2006-01-02T15:04:05.000Z"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -1032,7 +1031,7 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
|
|||
}
|
||||
|
||||
for k, pathKey := range fieldsFromPath {
|
||||
out, err := ioutil.ReadFile(pathKey)
|
||||
out, err := os.ReadFile(pathKey)
|
||||
if err != nil {
|
||||
log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err)
|
||||
continue
|
||||
|
@ -1054,11 +1053,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
|
|||
|
||||
func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
|
||||
// add meta fields
|
||||
e.Meta().Set(event.SystemProductTimeKey, time.Now())
|
||||
now := time.Now()
|
||||
e.Meta().Set(event.SystemProductTimeKey, now)
|
||||
e.Meta().Set(event.SystemPipelineKey, p.name)
|
||||
e.Meta().Set(event.SystemSourceKey, config.Name)
|
||||
e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot)
|
||||
e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey)
|
||||
|
||||
header := e.Header()
|
||||
if header == nil {
|
||||
|
@ -1073,6 +1071,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
|
|||
|
||||
// add header source fields from file
|
||||
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)
|
||||
|
||||
// remap timestamp
|
||||
if config.TimestampKey != "" {
|
||||
layout := config.TimestampLayout
|
||||
if layout == "" {
|
||||
layout = defaultTsLayout
|
||||
}
|
||||
|
||||
// conf.Location could be "" or "UTC" or "Local"
|
||||
// default "" indicate "UTC"
|
||||
ts, err := timeutil.Format(now, config.TimestampLocation, layout)
|
||||
if err != nil {
|
||||
log.Warn("time format system product timestamp err: %+v", err)
|
||||
return
|
||||
}
|
||||
header[config.TimestampKey] = ts
|
||||
}
|
||||
|
||||
if config.BodyKey != "" {
|
||||
header[config.BodyKey] = util.ByteToStringUnsafe(e.Body())
|
||||
e.Fill(e.Meta(), header, []byte{})
|
||||
}
|
||||
}
|
||||
|
||||
func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {
|
||||
|
|
|
@ -18,6 +18,7 @@ package json
|
|||
|
||||
import (
|
||||
"github.com/loggie-io/loggie/pkg/core/log"
|
||||
"github.com/loggie-io/loggie/pkg/util"
|
||||
"time"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
@ -73,7 +74,7 @@ func (j *Json) Encode(e api.Event) ([]byte, error) {
|
|||
beatsFormat(e)
|
||||
} else if len(e.Body()) != 0 {
|
||||
// put body in header
|
||||
header[eventer.Body] = string(e.Body())
|
||||
header[eventer.Body] = util.ByteToStringUnsafe(e.Body())
|
||||
}
|
||||
|
||||
var result []byte
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
es "github.com/elastic/go-elasticsearch/v7"
|
||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/loggie-io/loggie/pkg/core/api"
|
||||
eventer "github.com/loggie-io/loggie/pkg/core/event"
|
||||
|
@ -30,7 +29,7 @@ import (
|
|||
"github.com/loggie-io/loggie/pkg/util/pattern"
|
||||
"github.com/loggie-io/loggie/pkg/util/runtime"
|
||||
"github.com/pkg/errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
@ -44,8 +43,6 @@ type ClientSet struct {
|
|||
cli *es.Client
|
||||
opType string
|
||||
|
||||
buf *bytes.Buffer
|
||||
aux []byte
|
||||
reqCount int
|
||||
|
||||
codec codec.Codec
|
||||
|
@ -54,6 +51,70 @@ type ClientSet struct {
|
|||
documentIdPattern *pattern.Pattern
|
||||
}
|
||||
|
||||
type bulkRequest struct {
|
||||
lines []line
|
||||
}
|
||||
|
||||
type line struct {
|
||||
meta []byte
|
||||
body []byte
|
||||
}
|
||||
|
||||
func (b *bulkRequest) body() []byte {
|
||||
var buf bytes.Buffer
|
||||
size := 0
|
||||
for _, l := range b.lines {
|
||||
size += len(l.meta) + len(l.body) + 1
|
||||
}
|
||||
buf.Grow(size)
|
||||
|
||||
for _, l := range b.lines {
|
||||
buf.Write(l.meta)
|
||||
buf.Write(l.body)
|
||||
buf.WriteRune('\n')
|
||||
}
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
func (b *bulkRequest) add(body []byte, action string, documentID string, index string) {
|
||||
if len(body) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
var aux []byte
|
||||
|
||||
// { "index" : { "_index" : "test", "_id" : "1" } }
|
||||
buf.WriteRune('{')
|
||||
aux = strconv.AppendQuote(aux, action)
|
||||
buf.Write(aux)
|
||||
aux = aux[:0]
|
||||
buf.WriteRune(':')
|
||||
buf.WriteRune('{')
|
||||
if documentID != "" {
|
||||
buf.WriteString(`"_id":`)
|
||||
aux = strconv.AppendQuote(aux, documentID)
|
||||
buf.Write(aux)
|
||||
aux = aux[:0]
|
||||
}
|
||||
|
||||
if index != "" {
|
||||
buf.WriteString(`"_index":`)
|
||||
aux = strconv.AppendQuote(aux, index)
|
||||
buf.Write(aux)
|
||||
}
|
||||
buf.WriteRune('}')
|
||||
buf.WriteRune('}')
|
||||
buf.WriteRune('\n')
|
||||
|
||||
l := line{
|
||||
meta: buf.Bytes(),
|
||||
body: body,
|
||||
}
|
||||
|
||||
b.lines = append(b.lines, l)
|
||||
}
|
||||
|
||||
type Client interface {
|
||||
Bulk(ctx context.Context, batch api.Batch) error
|
||||
Stop()
|
||||
|
@ -68,7 +129,7 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
|||
}
|
||||
var ca []byte
|
||||
if config.CACertPath != "" {
|
||||
caData, err := ioutil.ReadFile(config.CACertPath)
|
||||
caData, err := os.ReadFile(config.CACertPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -76,14 +137,16 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
|||
}
|
||||
|
||||
cfg := es.Config{
|
||||
Addresses: config.Hosts,
|
||||
DisableRetry: true,
|
||||
Username: config.UserName,
|
||||
Password: config.Password,
|
||||
APIKey: config.APIKey,
|
||||
ServiceToken: config.ServiceToken,
|
||||
CompressRequestBody: config.Compress,
|
||||
CACert: ca,
|
||||
Addresses: config.Hosts,
|
||||
DisableRetry: true,
|
||||
Username: config.UserName,
|
||||
Password: config.Password,
|
||||
APIKey: config.APIKey,
|
||||
ServiceToken: config.ServiceToken,
|
||||
CompressRequestBody: config.Compress,
|
||||
DiscoverNodesOnStart: config.DiscoverNodesOnStart,
|
||||
DiscoverNodesInterval: config.DiscoverNodesInterval,
|
||||
CACert: ca,
|
||||
}
|
||||
cli, err := es.NewClient(cfg)
|
||||
if err != nil {
|
||||
|
@ -94,8 +157,6 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
|||
config: config,
|
||||
cli: cli,
|
||||
opType: config.OpType,
|
||||
buf: bytes.NewBuffer(make([]byte, 0, config.SendBuffer)),
|
||||
aux: make([]byte, 0, 512),
|
||||
reqCount: 0,
|
||||
codec: cod,
|
||||
indexPattern: indexPattern,
|
||||
|
@ -109,16 +170,11 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
|||
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
||||
}
|
||||
|
||||
bulkReq := esapi.BulkRequest{}
|
||||
|
||||
if c.config.Etype != "" {
|
||||
bulkReq.DocumentType = c.config.Etype
|
||||
}
|
||||
defer func() {
|
||||
c.buf.Reset()
|
||||
c.reqCount = 0
|
||||
}()
|
||||
|
||||
req := bulkRequest{}
|
||||
for _, event := range batch.Events() {
|
||||
headerObj := runtime.NewObject(event.Header())
|
||||
|
||||
|
@ -160,19 +216,14 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
|||
}
|
||||
|
||||
c.reqCount++
|
||||
if err := c.writeMeta(c.opType, docId, idx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := c.writeBody(data); err != nil {
|
||||
return err
|
||||
}
|
||||
req.add(data, c.opType, docId, idx)
|
||||
}
|
||||
|
||||
if c.reqCount == 0 {
|
||||
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
||||
}
|
||||
|
||||
resp, err := c.cli.Bulk(bytes.NewReader(c.buf.Bytes()),
|
||||
resp, err := c.cli.Bulk(bytes.NewReader(req.body()),
|
||||
c.cli.Bulk.WithDocumentType(c.config.Etype),
|
||||
c.cli.Bulk.WithParameters(c.config.Params),
|
||||
c.cli.Bulk.WithHeader(c.config.Headers))
|
||||
|
@ -213,39 +264,3 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
|||
func (c *ClientSet) Stop() {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
// { "index" : { "_index" : "test", "_id" : "1" } }
|
||||
func (c *ClientSet) writeMeta(action string, documentID string, index string) error {
|
||||
c.buf.WriteRune('{')
|
||||
c.aux = strconv.AppendQuote(c.aux, action)
|
||||
c.buf.Write(c.aux)
|
||||
c.aux = c.aux[:0]
|
||||
c.buf.WriteRune(':')
|
||||
c.buf.WriteRune('{')
|
||||
if documentID != "" {
|
||||
c.buf.WriteString(`"_id":`)
|
||||
c.aux = strconv.AppendQuote(c.aux, documentID)
|
||||
c.buf.Write(c.aux)
|
||||
c.aux = c.aux[:0]
|
||||
}
|
||||
|
||||
if index != "" {
|
||||
c.buf.WriteString(`"_index":`)
|
||||
c.aux = strconv.AppendQuote(c.aux, index)
|
||||
c.buf.Write(c.aux)
|
||||
c.aux = c.aux[:0]
|
||||
}
|
||||
c.buf.WriteRune('}')
|
||||
c.buf.WriteRune('}')
|
||||
c.buf.WriteRune('\n')
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClientSet) writeBody(body []byte) error {
|
||||
if len(body) == 0 {
|
||||
return nil
|
||||
}
|
||||
c.buf.Write(body)
|
||||
c.buf.WriteRune('\n')
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -16,33 +16,29 @@ limitations under the License.
|
|||
|
||||
package elasticsearch
|
||||
|
||||
import "github.com/loggie-io/loggie/pkg/util/pattern"
|
||||
import (
|
||||
"github.com/loggie-io/loggie/pkg/util/pattern"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
|
||||
UserName string `yaml:"username,omitempty"`
|
||||
Password string `yaml:"password,omitempty"`
|
||||
Index string `yaml:"index,omitempty"`
|
||||
Headers map[string]string `yaml:"headers,omitempty"`
|
||||
Params map[string]string `yaml:"parameters,omitempty"`
|
||||
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
|
||||
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
|
||||
DocumentId string `yaml:"documentId,omitempty"`
|
||||
Sniff *bool `yaml:"sniff,omitempty"` // deprecated
|
||||
APIKey string `yaml:"apiKey,omitempty"`
|
||||
ServiceToken string `yaml:"serviceToken,omitempty"`
|
||||
CACertPath string `yaml:"caCertPath,omitempty"`
|
||||
Compress bool `yaml:"compress,omitempty"`
|
||||
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
|
||||
OpType string `yaml:"opType,omitempty" default:"index"`
|
||||
|
||||
SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"`
|
||||
}
|
||||
|
||||
type TLS struct {
|
||||
CAFile string `yaml:"caFile,omitempty"`
|
||||
CertFile string `yaml:"certFile,omitempty"`
|
||||
KeyFile string `yaml:"keyFile,omitempty"`
|
||||
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
|
||||
UserName string `yaml:"username,omitempty"`
|
||||
Password string `yaml:"password,omitempty"`
|
||||
Index string `yaml:"index,omitempty"`
|
||||
Headers map[string]string `yaml:"headers,omitempty"`
|
||||
Params map[string]string `yaml:"parameters,omitempty"`
|
||||
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
|
||||
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
|
||||
DocumentId string `yaml:"documentId,omitempty"`
|
||||
APIKey string `yaml:"apiKey,omitempty"`
|
||||
ServiceToken string `yaml:"serviceToken,omitempty"`
|
||||
CACertPath string `yaml:"caCertPath,omitempty"`
|
||||
Compress bool `yaml:"compress,omitempty"`
|
||||
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
|
||||
OpType string `yaml:"opType,omitempty" default:"index"`
|
||||
DiscoverNodesOnStart bool `yaml:"discoverNodesOnStart,omitempty"`
|
||||
DiscoverNodesInterval time.Duration `yaml:"discoverNodesInterval,omitempty"`
|
||||
}
|
||||
|
||||
type RenderIndexFail struct {
|
||||
|
|
|
@ -45,6 +45,7 @@ type CollectConfig struct {
|
|||
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
|
||||
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
|
||||
AddonMeta bool `yaml:"addonMeta,omitempty"`
|
||||
AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"`
|
||||
excludeFilePatterns []*regexp.Regexp
|
||||
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
||||
|
||||
|
@ -54,6 +55,12 @@ type CollectConfig struct {
|
|||
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
|
||||
}
|
||||
|
||||
type AddonMetaSchema struct {
|
||||
Fields map[string]string `yaml:"fields,omitempty"`
|
||||
FieldsUnderRoot bool `yaml:"underRoot,omitempty"`
|
||||
FieldsUnderKey string `yaml:"key,omitempty" default:"state"`
|
||||
}
|
||||
|
||||
type LineDelimiterValue struct {
|
||||
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
||||
LineType string `yaml:"type,omitempty" default:"auto"`
|
||||
|
|
|
@ -71,6 +71,19 @@ type Source struct {
|
|||
multilineProcessor *MultiProcessor
|
||||
mTask *MultiTask
|
||||
codec codec.Codec
|
||||
|
||||
addonMetaField *AddonMetaFields
|
||||
}
|
||||
|
||||
type AddonMetaFields struct {
|
||||
Pipeline string `yaml:"pipeline,omitempty"`
|
||||
Source string `yaml:"source,omitempty"`
|
||||
Filename string `yaml:"filename,omitempty"`
|
||||
Timestamp string `yaml:"timestamp,omitempty"`
|
||||
Offset string `yaml:"offset,omitempty"`
|
||||
Bytes string `yaml:"bytes,omitempty"`
|
||||
Line string `yaml:"line,omitempty"`
|
||||
Hostname string `yaml:"hostname,omitempty"`
|
||||
}
|
||||
|
||||
func (s *Source) Config() interface{} {
|
||||
|
@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error {
|
|||
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
|
||||
}
|
||||
|
||||
if s.config.CollectConfig.AddonMeta {
|
||||
s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields)
|
||||
}
|
||||
|
||||
// init reader chan size
|
||||
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds
|
||||
|
||||
|
@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) {
|
|||
s.productFunc = productFunc
|
||||
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
|
||||
if s.config.CollectConfig.AddonMeta {
|
||||
s.productFunc = addonMetaProductFunc(s.productFunc)
|
||||
s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema)
|
||||
}
|
||||
if s.config.ReaderConfig.MultiConfig.Active {
|
||||
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
|
||||
|
@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap
|
|||
}
|
||||
}
|
||||
|
||||
func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc {
|
||||
func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc {
|
||||
return func(event api.Event) api.Result {
|
||||
s, _ := event.Meta().Get(SystemStateKey)
|
||||
state := s.(*persistence.State)
|
||||
addonMeta := make(map[string]interface{})
|
||||
addonMeta["pipeline"] = state.PipelineName
|
||||
addonMeta["source"] = state.SourceName
|
||||
addonMeta["filename"] = state.Filename
|
||||
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
|
||||
addonMeta["offset"] = state.Offset
|
||||
addonMeta["bytes"] = state.ContentBytes
|
||||
addonMeta["hostname"] = global.NodeName
|
||||
|
||||
event.Header()["state"] = addonMeta
|
||||
// if fields is nil, use default config
|
||||
if fields == nil {
|
||||
addonMeta["pipeline"] = state.PipelineName
|
||||
addonMeta["source"] = state.SourceName
|
||||
addonMeta["filename"] = state.Filename
|
||||
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
|
||||
addonMeta["offset"] = state.Offset
|
||||
addonMeta["bytes"] = state.ContentBytes
|
||||
addonMeta["hostname"] = global.NodeName
|
||||
} else {
|
||||
|
||||
if fields.Pipeline != "" {
|
||||
addonMeta[fields.Pipeline] = state.PipelineName
|
||||
}
|
||||
if fields.Source != "" {
|
||||
addonMeta[fields.Source] = state.SourceName
|
||||
}
|
||||
if fields.Filename != "" {
|
||||
addonMeta[fields.Filename] = state.Filename
|
||||
}
|
||||
if fields.Timestamp != "" {
|
||||
addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout)
|
||||
}
|
||||
if fields.Offset != "" {
|
||||
addonMeta[fields.Offset] = state.Offset
|
||||
}
|
||||
if fields.Bytes != "" {
|
||||
addonMeta[fields.Bytes] = state.ContentBytes
|
||||
}
|
||||
if fields.Line != "" {
|
||||
addonMeta[fields.Line] = state.LineNumber
|
||||
}
|
||||
if fields.Hostname != "" {
|
||||
addonMeta[fields.Hostname] = global.NodeName
|
||||
}
|
||||
}
|
||||
|
||||
if schema.FieldsUnderRoot {
|
||||
for k, v := range addonMeta {
|
||||
event.Header()[k] = v
|
||||
}
|
||||
} else {
|
||||
event.Header()[schema.FieldsUnderKey] = addonMeta
|
||||
}
|
||||
|
||||
productFunc(event)
|
||||
return result.Success()
|
||||
}
|
||||
}
|
||||
|
||||
func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields {
|
||||
if len(fields) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
amf := &AddonMetaFields{}
|
||||
for k, v := range fields {
|
||||
switch v {
|
||||
case "${_meta.pipeline}":
|
||||
amf.Pipeline = k
|
||||
|
||||
case "${_meta.source}":
|
||||
amf.Source = k
|
||||
|
||||
case "${_meta.filename}":
|
||||
amf.Filename = k
|
||||
|
||||
case "${_meta.timestamp}":
|
||||
amf.Timestamp = k
|
||||
|
||||
case "${_meta.offset}":
|
||||
amf.Offset = k
|
||||
|
||||
case "${_meta.bytes}":
|
||||
amf.Bytes = k
|
||||
|
||||
case "${_meta.line}":
|
||||
amf.Line = k
|
||||
|
||||
case "${_meta.hostname}":
|
||||
amf.Hostname = k
|
||||
}
|
||||
}
|
||||
|
||||
return amf
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ type DbConfig struct {
|
|||
File string `yaml:"file,omitempty"`
|
||||
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
|
||||
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
|
||||
TableName string `yaml:"tableName,omitempty" default:"registry"`
|
||||
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted
|
||||
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
|
||||
}
|
||||
|
|
|
@ -0,0 +1,85 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/loggie-io/loggie/pkg/control"
|
||||
"github.com/loggie-io/loggie/pkg/core/cfg"
|
||||
"github.com/loggie-io/loggie/pkg/core/interceptor"
|
||||
"github.com/loggie-io/loggie/pkg/core/log"
|
||||
"github.com/loggie-io/loggie/pkg/core/queue"
|
||||
"github.com/loggie-io/loggie/pkg/eventbus"
|
||||
"github.com/loggie-io/loggie/pkg/eventbus/export/logger"
|
||||
"github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
|
||||
"github.com/loggie-io/loggie/pkg/interceptor/metric"
|
||||
"github.com/loggie-io/loggie/pkg/interceptor/retry"
|
||||
"github.com/loggie-io/loggie/pkg/pipeline"
|
||||
"github.com/loggie-io/loggie/pkg/queue/channel"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
_ "github.com/loggie-io/loggie/pkg/include"
|
||||
)
|
||||
|
||||
const pipe1 = `
|
||||
pipelines:
|
||||
- name: test
|
||||
sources:
|
||||
- type: dev
|
||||
name: test
|
||||
qps: 100
|
||||
byteSize: 10240
|
||||
eventsTotal: 10000
|
||||
sink:
|
||||
type: elasticsearch
|
||||
parallelism: 3
|
||||
hosts: ["localhost:9200"]
|
||||
index: "loggie-benchmark-${+YYYY.MM.DD}"
|
||||
`
|
||||
|
||||
func main() {
|
||||
log.InitDefaultLogger()
|
||||
pipeline.SetDefaultConfigRaw(pipeline.Config{
|
||||
Queue: &queue.Config{
|
||||
Type: channel.Type,
|
||||
},
|
||||
Interceptors: []*interceptor.Config{
|
||||
{
|
||||
Type: metric.Type,
|
||||
},
|
||||
{
|
||||
Type: maxbytes.Type,
|
||||
},
|
||||
{
|
||||
Type: retry.Type,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
eventbus.StartAndRun(eventbus.Config{
|
||||
LoggerConfig: logger.Config{
|
||||
Enabled: true,
|
||||
Period: 5 * time.Second,
|
||||
Pretty: false,
|
||||
},
|
||||
ListenerConfigs: map[string]cfg.CommonCfg{
|
||||
"sink": map[string]interface{}{
|
||||
"period": 5 * time.Second,
|
||||
},
|
||||
"sys": map[string]interface{}{
|
||||
"period": 5 * time.Second,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
pipecfgs := &control.PipelineConfig{}
|
||||
if err := cfg.UnPackFromRaw([]byte(pipe1), pipecfgs).Defaults().Validate().Do(); err != nil {
|
||||
log.Panic("pipeline configs invalid: %v", err)
|
||||
}
|
||||
|
||||
controller := control.NewController()
|
||||
controller.Start(pipecfgs)
|
||||
|
||||
if err := http.ListenAndServe(fmt.Sprintf(":9196"), nil); err != nil {
|
||||
log.Fatal("http listen and serve err: %v", err)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue