mirror of https://github.com/loggie-io/loggie.git
Compare commits
11 Commits
Author | SHA1 | Date |
---|---|---|
|
31b8a3644c | |
|
452ee1d145 | |
|
35220b1d2c | |
|
01a0a300c4 | |
|
2bcb0632e6 | |
|
51aa8c881d | |
|
8febfeda70 | |
|
e637a3ff89 | |
|
1ea5bf997e | |
|
e6c4e37820 | |
|
38894f6e48 |
10
Makefile
10
Makefile
|
@ -82,13 +82,13 @@ benchmark: ## Run benchmark
|
||||||
|
|
||||||
##@ Build
|
##@ Build
|
||||||
|
|
||||||
build: ## go build
|
build: ## go build, EXT_BUILD_TAGS=include_core would only build core package
|
||||||
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
|
CGO_ENABLED=1 GOOS=${GOOS} GOARCH=${GOARCH} go build -tags ${EXT_BUILD_TAGS} -mod=vendor -a ${extra_flags} -o loggie cmd/loggie/main.go
|
||||||
|
|
||||||
##@ Build(without sqlite)
|
##@ Build(without sqlite)
|
||||||
|
|
||||||
build-in-badger: ## go build without sqlite
|
build-in-badger: ## go build without sqlite, EXT_BUILD_TAGS=include_core would only build core package
|
||||||
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
|
GOOS=${GOOS} GOARCH=${GOARCH} go build -tags driver_badger,${EXT_BUILD_TAGS} -mod=vendor -a -ldflags '-X github.com/loggie-io/loggie/pkg/core/global._VERSION_=${TAG} -X github.com/loggie-io/loggie/pkg/util/persistence._DRIVER_=badger -s -w' -o loggie cmd/loggie/main.go
|
||||||
|
|
||||||
##@ Images
|
##@ Images
|
||||||
|
|
||||||
|
@ -98,7 +98,7 @@ docker-build: ## Docker build -t ${REPO}:${TAG}, try: make docker-build REPO=<Yo
|
||||||
docker-push: ## Docker push ${REPO}:${TAG}
|
docker-push: ## Docker push ${REPO}:${TAG}
|
||||||
docker push ${REPO}:${TAG}
|
docker push ${REPO}:${TAG}
|
||||||
|
|
||||||
docker-multi-arch: ## Docker buildx, try: make docker-build REPO=<YourRepoHost>, ${TAG} generated by git
|
docker-multi-arch: ## Docker buildx, try: make docker-multi-arch REPO=<YourRepoHost>, ${TAG} generated by git
|
||||||
docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push
|
docker buildx build --platform linux/amd64,linux/arm64 -t ${REPO}:${TAG} . --push
|
||||||
|
|
||||||
LOG_DIR ?= /tmp/log ## log directory
|
LOG_DIR ?= /tmp/log ## log directory
|
||||||
|
|
|
@ -1,3 +1,157 @@
|
||||||
|
# Release v1.5.0
|
||||||
|
|
||||||
|
### :star2: Features
|
||||||
|
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
|
||||||
|
- Added rocketmq sink (#530)
|
||||||
|
- Added franzKafka source (#573)
|
||||||
|
- Added kata runtime (#554)
|
||||||
|
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
|
||||||
|
- sink codec support printEvents (#448)
|
||||||
|
- Added queue in LogConfig/ClusterLogConfig (#457)
|
||||||
|
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
|
||||||
|
- Supported `copytruncate` in file source (#571)
|
||||||
|
- Added `genfiles` sub command (#471)
|
||||||
|
- Added queue in LogConfig/ClusterLogConfig queue (#457)
|
||||||
|
- Added `sortBy` field in elasticsearch source (#473)
|
||||||
|
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
|
||||||
|
- New `addHostMeta` interceptor (#474)
|
||||||
|
- Added persistence driver `badger` (#475) (#584)
|
||||||
|
- Ignore LogConfig with sidecar injection annotation (#478)
|
||||||
|
- Added `toStr` action in transformer interceptor (#482)
|
||||||
|
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
|
||||||
|
- Get loggie version with api and sub command (#496) (#508)
|
||||||
|
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
|
||||||
|
- Upgrade `kafka-go` version (#506) (#567)
|
||||||
|
- Added resultStatus in dev sink which can be used to simulate failure, drop (#531)
|
||||||
|
- Pretty error when unmarshal yaml configuration failed (#539)
|
||||||
|
- Added default topic if render kafka topic failed (#550)
|
||||||
|
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
|
||||||
|
- Supported multiple topics in kafka source (#548)
|
||||||
|
- Added default index if render elasticsearch index failed (#551) (#553)
|
||||||
|
- The default `maxOpenFds` is set to 4096 (#559)
|
||||||
|
- Supported default `sinkRef` in kubernetes discovery (#555)
|
||||||
|
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
|
||||||
|
- Supported omit empty fields in Kubernetes discovery (#570)
|
||||||
|
- Optimizes `maxbytes` interceptors (#575)
|
||||||
|
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
|
||||||
|
- Added `cheanUnfinished` in cleanFiles (#580)
|
||||||
|
- Added `target` in `maxbyte` interceptor (#588)
|
||||||
|
- Added `partionKey` in franzKafka (#562)
|
||||||
|
- Added `highPrecision` in `rateLimit` interceptor (#525)
|
||||||
|
- Supported build core components of Loggie (#598)
|
||||||
|
- Added `addonMetaSchema` in file source (#599)
|
||||||
|
- Added `timestamp` and `bodyKey` in source (#600)
|
||||||
|
- Added `discoverNodesOnStart` and `discoverNodesInterval` in elasticsearch sink (#612)
|
||||||
|
|
||||||
|
### :bug: Bug Fixes
|
||||||
|
- Fixed panic when kubeEvent Series is nil (#459)
|
||||||
|
- Upgraded `automaxprocs` version to v1.5.1 (#488)
|
||||||
|
- Fixed set defaults failed in `fieldsUnderKey` (#513)
|
||||||
|
- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515)
|
||||||
|
- Fixed grpc batch out-of-order data streams (#517)
|
||||||
|
- Fixed large line may cause oom (#529)
|
||||||
|
- Fixed duplicated batchSize in queue (#533)
|
||||||
|
- Fixed sqlite locked panic (#524)
|
||||||
|
- Fixed command can't be used in multi-arch container (#541)
|
||||||
|
- Fixed `logger listener` may cause block (#561) (#552)
|
||||||
|
- Fixed `sink concurrency` deepCopy failed (#563)
|
||||||
|
- Drop events when partial error in elasticsearch sink (#572)
|
||||||
|
- Fix convert to string type when returning byte array type in transformer interceptor (#596)
|
||||||
|
- Invalid pipelines will not stop all the pipelines running (#602)
|
||||||
|
- Fixed configuration npe when query pipelines (#604)
|
||||||
|
- Optimized elasticsearch sink request buffer (#608)
|
||||||
|
- Add pod namespace as prefix of source name when pod is matched by ClusterLogConfig (#613)
|
||||||
|
|
||||||
|
# Release v1.5.0-rc.0
|
||||||
|
|
||||||
|
### :star2: Features
|
||||||
|
- [breaking]: The `db` in `file source` is moved to the [`loggie.yml`](https://loggie-io.github.io/docs/main/reference/global/db/). If upgrading from an earlier version to v1.5, be sure to check whether `db` has been configured for `file source`. If it is not configured, you can just ignore it, and the default value will remain compatible.
|
||||||
|
- Added rocketmq sink (#530)
|
||||||
|
- Added franzKafka source (#573)
|
||||||
|
- Added kata runtime (#554)
|
||||||
|
- `typePodFields`/`typeNodeFields` is supported in LogConfig/ClusterLogConfig (#450)
|
||||||
|
- sink codec support printEvents (#448)
|
||||||
|
- Added queue in LogConfig/ClusterLogConfig (#457)
|
||||||
|
- Changed `olivere/elastic` to the official elasticsearch go client (#581)
|
||||||
|
- Supported `copytruncate` in file source (#571)
|
||||||
|
- Added `genfiles` sub command (#471)
|
||||||
|
- Added queue in LogConfig/ClusterLogConfig queue (#457)
|
||||||
|
- Added `sortBy` field in elasticsearch source (#473)
|
||||||
|
- Added host VM mode with Kubernetes as the configuration center (#449) (#489)
|
||||||
|
- New `addHostMeta` interceptor (#474)
|
||||||
|
- Added persistence driver `badger` (#475) (#584)
|
||||||
|
- Ignore LogConfig with sidecar injection annotation (#478)
|
||||||
|
- Added `toStr` action in transformer interceptor (#482)
|
||||||
|
- You can mount the root directory of a node to the Loggie container without mounting additional Loggie volumes (#460)
|
||||||
|
- Get loggie version with api and sub command (#496) (#508)
|
||||||
|
- Added the `worker` and the `clientId` in Kafka source (#506) (#507)
|
||||||
|
- Upgrade `kafka-go` version (#506) (#567)
|
||||||
|
- Added resultStatus in dev sink which can be used to simulate failure, drop (#531)
|
||||||
|
- Pretty error when unmarshal yaml configuration failed (#539)
|
||||||
|
- Added default topic if render kafka topic failed (#550)
|
||||||
|
- Added `ignoreUnknownTopicOrPartition` in kafka sink (#560)
|
||||||
|
- Supported multiple topics in kafka source (#548)
|
||||||
|
- Added default index if render elasticsearch index failed (#551) (#553)
|
||||||
|
- The default `maxOpenFds` is set to 4096 (#559)
|
||||||
|
- Supported default `sinkRef` in kubernetes discovery (#555)
|
||||||
|
- Added `${_k8s.clusterlogconfig}` in `typePodFields` (#569)
|
||||||
|
- Supported omit empty fields in Kubernetes discovery (#570)
|
||||||
|
- Optimizes `maxbytes` interceptors (#575)
|
||||||
|
- Moved `readFromTail`, `cleanFiles`, `fdHoldTimeoutWhenInactive`, `fdHoldTimeoutWhenRemove` from watcher to outer layer in `file source` (#579) (#585)
|
||||||
|
- Added `cheanUnfinished` in cleanFiles (#580)
|
||||||
|
- Added `target` in `maxbyte` interceptor (#588)
|
||||||
|
- Added `partionKey` in franzKafka (#562)
|
||||||
|
- Added `highPrecision` in `rateLimit` interceptor (#525)
|
||||||
|
|
||||||
|
### :bug: Bug Fixes
|
||||||
|
- Fixed panic when kubeEvent Series is nil (#459)
|
||||||
|
- Upgraded `automaxprocs` version to v1.5.1 (#488)
|
||||||
|
- Fixed set defaults failed in `fieldsUnderKey` (#513)
|
||||||
|
- Fixed parse condition failed when contain ERROR in transformer interceptor (#514) (#515)
|
||||||
|
- Fixed grpc batch out-of-order data streams (#517)
|
||||||
|
- Fixed large line may cause oom (#529)
|
||||||
|
- Fixed duplicated batchSize in queue (#533)
|
||||||
|
- Fixed sqlite locked panic (#524)
|
||||||
|
- Fixed command can't be used in multi-arch container (#541)
|
||||||
|
- Fixed `logger listener` may cause block (#561) (#552)
|
||||||
|
- Fixed `sink concurrency` deepCopy failed (#563)
|
||||||
|
- Drop events when partial error in elasticsearch sink (#572)
|
||||||
|
|
||||||
|
# Release v1.4.0
|
||||||
|
|
||||||
|
### :star2: Features
|
||||||
|
|
||||||
|
- Added Loggie dashboard feature for easier troubleshooting (#416)
|
||||||
|
- Enhanced log alerting function with more flexible log alert detection rules and added alertWebhook sink (#392)
|
||||||
|
- Added sink concurrency support for automatic adaptation based on downstream delay (#376)
|
||||||
|
- Added franzKafka sink for users who prefer the franz kafka library (#423)
|
||||||
|
- Added elasticsearch source (#345)
|
||||||
|
- Added zinc sink (#254)
|
||||||
|
- Added pulsar sink (#417)
|
||||||
|
- Added grok action to transformer interceptor (#418)
|
||||||
|
- Added split action to transformer interceptor (#411)
|
||||||
|
- Added jsonEncode action to transformer interceptor (#421)
|
||||||
|
- Added fieldsFromPath configuration to source for obtaining fields from file content (#401)
|
||||||
|
- Added fieldsRef parameter to filesource listener for obtaining key value from fields configuration and adding to metrics as label (#402)
|
||||||
|
- In transformer interceptor, added dropIfError support to drop event if action execution fails (#409)
|
||||||
|
- Added info listener which currently exposes loggie_info_stat metrics and displays version label (#410)
|
||||||
|
- Added support for customized kafka sink partition key
|
||||||
|
- Added sasl support to Kafka source (#415)
|
||||||
|
- Added https insecureSkipVerify support to loki sink (#422)
|
||||||
|
- Optimized file source for large files (#430)
|
||||||
|
- Changed default value of file source maxOpenFds to 1024 (#437)
|
||||||
|
- ContainerRuntime can now be set to none (#439)
|
||||||
|
- Upgraded to go 1.18 (#440)
|
||||||
|
- Optimize the configuration parameters to remove the redundancy generated by rendering
|
||||||
|
|
||||||
|
### :bug: Bug Fixes
|
||||||
|
|
||||||
|
- Added source fields to filesource listener (#402)
|
||||||
|
- Fixed issue of transformer copy action not copying non-string body (#420)
|
||||||
|
- Added fetching of logs file from UpperDir when rootfs collection is enabled (#414)
|
||||||
|
- Fix pipeline restart npe (#454)
|
||||||
|
- Fix create dir soft link job (#453)
|
||||||
|
|
||||||
# Release v1.4.0-rc.0
|
# Release v1.4.0-rc.0
|
||||||
|
|
||||||
### :star2: Features
|
### :star2: Features
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -61,7 +61,6 @@ require (
|
||||||
github.com/cespare/xxhash v1.1.0 // indirect
|
github.com/cespare/xxhash v1.1.0 // indirect
|
||||||
github.com/danieljoos/wincred v1.0.2 // indirect
|
github.com/danieljoos/wincred v1.0.2 // indirect
|
||||||
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
github.com/dgraph-io/ristretto v0.1.1 // indirect
|
||||||
github.com/dustin/go-humanize v1.0.0 // indirect
|
|
||||||
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
|
github.com/dvsekhvalnov/jose2go v0.0.0-20200901110807-248326c1351b // indirect
|
||||||
github.com/emirpasic/gods v1.12.0 // indirect
|
github.com/emirpasic/gods v1.12.0 // indirect
|
||||||
github.com/fatih/color v1.10.0 // indirect
|
github.com/fatih/color v1.10.0 // indirect
|
||||||
|
@ -174,6 +173,7 @@ require (
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/apache/rocketmq-client-go/v2 v2.1.1
|
github.com/apache/rocketmq-client-go/v2 v2.1.1
|
||||||
|
github.com/dustin/go-humanize v1.0.0
|
||||||
github.com/elastic/go-elasticsearch/v7 v7.17.10
|
github.com/elastic/go-elasticsearch/v7 v7.17.10
|
||||||
github.com/goccy/go-yaml v1.11.0
|
github.com/goccy/go-yaml v1.11.0
|
||||||
github.com/mattn/go-sqlite3 v1.11.0
|
github.com/mattn/go-sqlite3 v1.11.0
|
||||||
|
|
18
loggie.yml
18
loggie.yml
|
@ -14,7 +14,7 @@ loggie:
|
||||||
sink: ~
|
sink: ~
|
||||||
queue: ~
|
queue: ~
|
||||||
pipeline: ~
|
pipeline: ~
|
||||||
normalize: ~
|
sys: ~
|
||||||
|
|
||||||
discovery:
|
discovery:
|
||||||
enabled: false
|
enabled: false
|
||||||
|
@ -31,15 +31,17 @@ loggie:
|
||||||
defaults:
|
defaults:
|
||||||
sink:
|
sink:
|
||||||
type: dev
|
type: dev
|
||||||
interceptors:
|
|
||||||
- type: schema
|
|
||||||
name: global
|
|
||||||
order: 700
|
|
||||||
addMeta:
|
|
||||||
timestamp:
|
|
||||||
key: "@timestamp"
|
|
||||||
sources:
|
sources:
|
||||||
- type: file
|
- type: file
|
||||||
|
timestampKey: "@timestamp"
|
||||||
|
bodyKey: "message"
|
||||||
|
fieldsUnderRoot: true
|
||||||
|
addonMeta: true
|
||||||
|
addonMetaSchema:
|
||||||
|
underRoot: true
|
||||||
|
fields:
|
||||||
|
filename: "${_meta.filename}"
|
||||||
|
line: "${_meta.line}"
|
||||||
watcher:
|
watcher:
|
||||||
maxOpenFds: 6000
|
maxOpenFds: 6000
|
||||||
http:
|
http:
|
||||||
|
|
|
@ -130,11 +130,21 @@ func ReadPipelineConfigFromFile(path string, ignore FileIgnore) (*PipelineConfig
|
||||||
for _, fn := range all {
|
for _, fn := range all {
|
||||||
pipes := &PipelineConfig{}
|
pipes := &PipelineConfig{}
|
||||||
unpack := cfg.UnPackFromFile(fn, pipes)
|
unpack := cfg.UnPackFromFile(fn, pipes)
|
||||||
if err = unpack.Defaults().Validate().Do(); err != nil {
|
if err = unpack.Do(); err != nil {
|
||||||
log.Error("invalid pipeline configs: %v, \n%s", err, unpack.Contents())
|
log.Error("read pipeline configs from path %s failed: %v", path, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pipecfgs.AddPipelines(pipes.Pipelines)
|
|
||||||
|
for _, p := range pipes.Pipelines {
|
||||||
|
pip := p
|
||||||
|
if err := cfg.NewUnpack(nil, &pip, nil).Defaults().Validate().Do(); err != nil {
|
||||||
|
// ignore invalid pipeline, but continue to read other pipelines
|
||||||
|
// invalid pipeline will check by reloader later
|
||||||
|
log.Error("pipeline: %s configs invalid: %v", p.Name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
pipecfgs.AddPipelines([]pipeline.Config{pip})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return pipecfgs, nil
|
return pipecfgs, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||||
package reloader
|
package reloader
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -49,7 +48,7 @@ func (r *reloader) readPipelineConfigHandler(writer http.ResponseWriter, request
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
content, err := ioutil.ReadFile(m)
|
content, err := os.ReadFile(m)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("read config error. err: %v", err)
|
log.Warn("read config error. err: %v", err)
|
||||||
return
|
return
|
||||||
|
|
|
@ -38,6 +38,11 @@ type Config struct {
|
||||||
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
|
FieldsFromEnv map[string]string `yaml:"fieldsFromEnv,omitempty"`
|
||||||
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
|
FieldsFromPath map[string]string `yaml:"fieldsFromPath,omitempty"`
|
||||||
Codec *codec.Config `yaml:"codec,omitempty"`
|
Codec *codec.Config `yaml:"codec,omitempty"`
|
||||||
|
|
||||||
|
TimestampKey string `yaml:"timestampKey,omitempty"`
|
||||||
|
TimestampLocation string `yaml:"timestampLocation,omitempty"`
|
||||||
|
TimestampLayout string `yaml:"timestampLayout,omitempty"`
|
||||||
|
BodyKey string `yaml:"bodyKey,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Config) DeepCopy() *Config {
|
func (c *Config) DeepCopy() *Config {
|
||||||
|
@ -82,6 +87,11 @@ func (c *Config) DeepCopy() *Config {
|
||||||
FieldsFromEnv: newFieldsFromEnv,
|
FieldsFromEnv: newFieldsFromEnv,
|
||||||
FieldsFromPath: newFieldsFromPath,
|
FieldsFromPath: newFieldsFromPath,
|
||||||
Codec: c.Codec.DeepCopy(),
|
Codec: c.Codec.DeepCopy(),
|
||||||
|
|
||||||
|
TimestampKey: c.TimestampKey,
|
||||||
|
TimestampLocation: c.TimestampLocation,
|
||||||
|
TimestampLayout: c.TimestampLayout,
|
||||||
|
BodyKey: c.BodyKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
return out
|
return out
|
||||||
|
@ -155,6 +165,19 @@ func (c *Config) Merge(from *Config) {
|
||||||
} else {
|
} else {
|
||||||
c.Codec.Merge(from.Codec)
|
c.Codec.Merge(from.Codec)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if c.TimestampKey == "" {
|
||||||
|
c.TimestampKey = from.TimestampKey
|
||||||
|
}
|
||||||
|
if c.TimestampLocation == "" {
|
||||||
|
c.TimestampLocation = from.TimestampLocation
|
||||||
|
}
|
||||||
|
if c.TimestampLayout == "" {
|
||||||
|
c.TimestampLayout = from.TimestampLayout
|
||||||
|
}
|
||||||
|
if c.BodyKey == "" {
|
||||||
|
c.BodyKey = from.BodyKey
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func MergeSourceList(base []*Config, from []*Config) []*Config {
|
func MergeSourceList(base []*Config, from []*Config) []*Config {
|
||||||
|
|
|
@ -364,7 +364,7 @@ func (c *Controller) makeConfigPerSource(s *source.Config, pod *corev1.Pod, lgc
|
||||||
}
|
}
|
||||||
|
|
||||||
// change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod
|
// change the source name, add pod.Name-containerName as prefix, since there maybe multiple containers in pod
|
||||||
filesrc.Name = helper.GenTypePodSourceName(pod.Name, status.Name, filesrc.Name)
|
filesrc.Name = helper.GenTypePodSourceName(lgc.Namespace, pod.Namespace, pod.Name, status.Name, filesrc.Name)
|
||||||
|
|
||||||
// inject default pod metadata
|
// inject default pod metadata
|
||||||
if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil {
|
if err := c.injectTypePodFields(c.config.DynamicContainerLog, filesrc, extra, pod, lgc, status.Name); err != nil {
|
||||||
|
|
|
@ -122,7 +122,12 @@ func ToPipelineInterceptor(interceptorsRaw string, interceptorRef string, interc
|
||||||
return interConfList, nil
|
return interConfList, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GenTypePodSourceName(podName string, containerName string, sourceName string) string {
|
func GenTypePodSourceName(lgcNamespace string, podNamespace string, podName string, containerName string, sourceName string) string {
|
||||||
|
// if lgcNamespace is empty, we use podNamespace as the first part of the source name,
|
||||||
|
// because this is the pod matched by clusterLogConfig, if the pod namespace is not added, it may cause the source to be duplicated
|
||||||
|
if lgcNamespace == "" {
|
||||||
|
return fmt.Sprintf("%s/%s/%s/%s", podNamespace, podName, containerName, sourceName)
|
||||||
|
}
|
||||||
return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
|
return fmt.Sprintf("%s/%s/%s", podName, containerName, sourceName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ package sys
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/dustin/go-humanize"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
@ -48,8 +49,9 @@ func makeListener() eventbus.Listener {
|
||||||
}
|
}
|
||||||
|
|
||||||
type sysData struct {
|
type sysData struct {
|
||||||
MemoryRss uint64 `json:"memRss"`
|
MemoryRss uint64 `json:"-"`
|
||||||
CPUPercent float64 `json:"cpuPercent"`
|
MemoryRssHumanize string `json:"memRss"`
|
||||||
|
CPUPercent float64 `json:"cpuPercent"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
@ -122,6 +124,7 @@ func (l *Listener) getSysStat() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
l.data.MemoryRss = mem.RSS
|
l.data.MemoryRss = mem.RSS
|
||||||
|
l.data.MemoryRssHumanize = humanize.Bytes(mem.RSS)
|
||||||
|
|
||||||
cpuPer, err := l.proc.Percent(1 * time.Second)
|
cpuPer, err := l.proc.Percent(1 * time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
|
//go:build !include_core
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Copyright 2021 Loggie Authors
|
Copyright 2023 Loggie Authors
|
||||||
|
|
||||||
Licensed under the Apache License, Version 2.0 (the "License");
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
you may not use this file except in compliance with the License.
|
you may not use this file except in compliance with the License.
|
|
@ -0,0 +1,62 @@
|
||||||
|
//go:build include_core
|
||||||
|
|
||||||
|
/*
|
||||||
|
Copyright 2023 Loggie Authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package include
|
||||||
|
|
||||||
|
import (
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/export/prometheus"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filesource"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/filewatcher"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/info"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/logalerting"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/pipeline"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/queue"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/reload"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sink"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/eventbus/listener/sys"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/addhostmeta"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/addk8smeta"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/limit"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/logalert/condition"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/metric"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/retry"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/schema"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/action"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/interceptor/transformer/condition"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/queue/channel"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/queue/memory"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/alertwebhook"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/codec/json"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/codec/raw"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/dev"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/elasticsearch"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/file"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/franz"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/sink/kafka"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/codec/json"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/codec/regex"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/dev"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/elasticsearch"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/file"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/file/process"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/franz"
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/source/kafka"
|
||||||
|
)
|
|
@ -51,5 +51,10 @@ func NewEqual(args []string) (*Equal, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (eq *Equal) Check(e api.Event) bool {
|
func (eq *Equal) Check(e api.Event) bool {
|
||||||
return eq.value == eventops.Get(e, eq.field)
|
value := eventops.Get(e, eq.field)
|
||||||
|
if byteValue, ok := value.([]byte); ok {
|
||||||
|
value = string(byteValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
return eq.value == value
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"github.com/loggie-io/loggie/pkg/ops/helper"
|
"github.com/loggie-io/loggie/pkg/ops/helper"
|
||||||
"github.com/loggie-io/loggie/pkg/util"
|
"github.com/loggie-io/loggie/pkg/util"
|
||||||
"github.com/rivo/tview"
|
"github.com/rivo/tview"
|
||||||
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -234,7 +235,7 @@ func (p *LogStatusPanel) SetData() {
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
out, err := ioutil.ReadAll(resp.Body)
|
out, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,6 +174,9 @@ func diffPipes(request *http.Request) string {
|
||||||
|
|
||||||
func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config {
|
func queryPipelineConfig(cfgInPath *control.PipelineConfig, pipelineQuery string, sourceQuery string) map[string]pipeline.Config {
|
||||||
result := make(map[string]pipeline.Config)
|
result := make(map[string]pipeline.Config)
|
||||||
|
if cfgInPath == nil {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
setResult := func(pipData pipeline.Config, srcData ...*source.Config) {
|
setResult := func(pipData pipeline.Config, srcData ...*source.Config) {
|
||||||
pip, ok := result[pipData.Name]
|
pip, ok := result[pipData.Name]
|
||||||
|
|
|
@ -18,7 +18,7 @@ package pipeline
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
timeutil "github.com/loggie-io/loggie/pkg/util/time"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -46,10 +46,9 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
FieldsUnderRoot = event.PrivateKeyPrefix + "FieldsUnderRoot"
|
|
||||||
FieldsUnderKey = event.PrivateKeyPrefix + "FieldsUnderKey"
|
|
||||||
|
|
||||||
fieldsFromPathMaxBytes = 1024
|
fieldsFromPathMaxBytes = 1024
|
||||||
|
|
||||||
|
defaultTsLayout = "2006-01-02T15:04:05.000Z"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1032,7 +1031,7 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for k, pathKey := range fieldsFromPath {
|
for k, pathKey := range fieldsFromPath {
|
||||||
out, err := ioutil.ReadFile(pathKey)
|
out, err := os.ReadFile(pathKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err)
|
log.Error("init fieldsFromPath %s failed, read file %s err: %v", k, pathKey, err)
|
||||||
continue
|
continue
|
||||||
|
@ -1054,11 +1053,10 @@ func (p *Pipeline) initFieldsFromPath(fieldsFromPath map[string]string) {
|
||||||
|
|
||||||
func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
|
func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
|
||||||
// add meta fields
|
// add meta fields
|
||||||
e.Meta().Set(event.SystemProductTimeKey, time.Now())
|
now := time.Now()
|
||||||
|
e.Meta().Set(event.SystemProductTimeKey, now)
|
||||||
e.Meta().Set(event.SystemPipelineKey, p.name)
|
e.Meta().Set(event.SystemPipelineKey, p.name)
|
||||||
e.Meta().Set(event.SystemSourceKey, config.Name)
|
e.Meta().Set(event.SystemSourceKey, config.Name)
|
||||||
e.Meta().Set(FieldsUnderRoot, config.FieldsUnderRoot)
|
|
||||||
e.Meta().Set(FieldsUnderKey, config.FieldsUnderKey)
|
|
||||||
|
|
||||||
header := e.Header()
|
header := e.Header()
|
||||||
if header == nil {
|
if header == nil {
|
||||||
|
@ -1073,6 +1071,28 @@ func (p *Pipeline) fillEventMetaAndHeader(e api.Event, config source.Config) {
|
||||||
|
|
||||||
// add header source fields from file
|
// add header source fields from file
|
||||||
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)
|
AddSourceFields(header, p.pathMap, config.FieldsUnderRoot, config.FieldsUnderKey)
|
||||||
|
|
||||||
|
// remap timestamp
|
||||||
|
if config.TimestampKey != "" {
|
||||||
|
layout := config.TimestampLayout
|
||||||
|
if layout == "" {
|
||||||
|
layout = defaultTsLayout
|
||||||
|
}
|
||||||
|
|
||||||
|
// conf.Location could be "" or "UTC" or "Local"
|
||||||
|
// default "" indicate "UTC"
|
||||||
|
ts, err := timeutil.Format(now, config.TimestampLocation, layout)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("time format system product timestamp err: %+v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
header[config.TimestampKey] = ts
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.BodyKey != "" {
|
||||||
|
header[config.BodyKey] = util.ByteToStringUnsafe(e.Body())
|
||||||
|
e.Fill(e.Meta(), header, []byte{})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {
|
func AddSourceFields(header map[string]interface{}, fields map[string]interface{}, underRoot bool, fieldsKey string) {
|
||||||
|
|
|
@ -18,6 +18,7 @@ package json
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/loggie-io/loggie/pkg/core/log"
|
"github.com/loggie-io/loggie/pkg/core/log"
|
||||||
|
"github.com/loggie-io/loggie/pkg/util"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
@ -73,7 +74,7 @@ func (j *Json) Encode(e api.Event) ([]byte, error) {
|
||||||
beatsFormat(e)
|
beatsFormat(e)
|
||||||
} else if len(e.Body()) != 0 {
|
} else if len(e.Body()) != 0 {
|
||||||
// put body in header
|
// put body in header
|
||||||
header[eventer.Body] = string(e.Body())
|
header[eventer.Body] = util.ByteToStringUnsafe(e.Body())
|
||||||
}
|
}
|
||||||
|
|
||||||
var result []byte
|
var result []byte
|
||||||
|
|
|
@ -21,7 +21,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
es "github.com/elastic/go-elasticsearch/v7"
|
es "github.com/elastic/go-elasticsearch/v7"
|
||||||
"github.com/elastic/go-elasticsearch/v7/esapi"
|
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
"github.com/loggie-io/loggie/pkg/core/api"
|
"github.com/loggie-io/loggie/pkg/core/api"
|
||||||
eventer "github.com/loggie-io/loggie/pkg/core/event"
|
eventer "github.com/loggie-io/loggie/pkg/core/event"
|
||||||
|
@ -30,7 +29,7 @@ import (
|
||||||
"github.com/loggie-io/loggie/pkg/util/pattern"
|
"github.com/loggie-io/loggie/pkg/util/pattern"
|
||||||
"github.com/loggie-io/loggie/pkg/util/runtime"
|
"github.com/loggie-io/loggie/pkg/util/runtime"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"io/ioutil"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
@ -44,8 +43,6 @@ type ClientSet struct {
|
||||||
cli *es.Client
|
cli *es.Client
|
||||||
opType string
|
opType string
|
||||||
|
|
||||||
buf *bytes.Buffer
|
|
||||||
aux []byte
|
|
||||||
reqCount int
|
reqCount int
|
||||||
|
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
|
@ -54,6 +51,70 @@ type ClientSet struct {
|
||||||
documentIdPattern *pattern.Pattern
|
documentIdPattern *pattern.Pattern
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bulkRequest struct {
|
||||||
|
lines []line
|
||||||
|
}
|
||||||
|
|
||||||
|
type line struct {
|
||||||
|
meta []byte
|
||||||
|
body []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bulkRequest) body() []byte {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
size := 0
|
||||||
|
for _, l := range b.lines {
|
||||||
|
size += len(l.meta) + len(l.body) + 1
|
||||||
|
}
|
||||||
|
buf.Grow(size)
|
||||||
|
|
||||||
|
for _, l := range b.lines {
|
||||||
|
buf.Write(l.meta)
|
||||||
|
buf.Write(l.body)
|
||||||
|
buf.WriteRune('\n')
|
||||||
|
}
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *bulkRequest) add(body []byte, action string, documentID string, index string) {
|
||||||
|
if len(body) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var buf bytes.Buffer
|
||||||
|
var aux []byte
|
||||||
|
|
||||||
|
// { "index" : { "_index" : "test", "_id" : "1" } }
|
||||||
|
buf.WriteRune('{')
|
||||||
|
aux = strconv.AppendQuote(aux, action)
|
||||||
|
buf.Write(aux)
|
||||||
|
aux = aux[:0]
|
||||||
|
buf.WriteRune(':')
|
||||||
|
buf.WriteRune('{')
|
||||||
|
if documentID != "" {
|
||||||
|
buf.WriteString(`"_id":`)
|
||||||
|
aux = strconv.AppendQuote(aux, documentID)
|
||||||
|
buf.Write(aux)
|
||||||
|
aux = aux[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
if index != "" {
|
||||||
|
buf.WriteString(`"_index":`)
|
||||||
|
aux = strconv.AppendQuote(aux, index)
|
||||||
|
buf.Write(aux)
|
||||||
|
}
|
||||||
|
buf.WriteRune('}')
|
||||||
|
buf.WriteRune('}')
|
||||||
|
buf.WriteRune('\n')
|
||||||
|
|
||||||
|
l := line{
|
||||||
|
meta: buf.Bytes(),
|
||||||
|
body: body,
|
||||||
|
}
|
||||||
|
|
||||||
|
b.lines = append(b.lines, l)
|
||||||
|
}
|
||||||
|
|
||||||
type Client interface {
|
type Client interface {
|
||||||
Bulk(ctx context.Context, batch api.Batch) error
|
Bulk(ctx context.Context, batch api.Batch) error
|
||||||
Stop()
|
Stop()
|
||||||
|
@ -68,7 +129,7 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
||||||
}
|
}
|
||||||
var ca []byte
|
var ca []byte
|
||||||
if config.CACertPath != "" {
|
if config.CACertPath != "" {
|
||||||
caData, err := ioutil.ReadFile(config.CACertPath)
|
caData, err := os.ReadFile(config.CACertPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -76,14 +137,16 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg := es.Config{
|
cfg := es.Config{
|
||||||
Addresses: config.Hosts,
|
Addresses: config.Hosts,
|
||||||
DisableRetry: true,
|
DisableRetry: true,
|
||||||
Username: config.UserName,
|
Username: config.UserName,
|
||||||
Password: config.Password,
|
Password: config.Password,
|
||||||
APIKey: config.APIKey,
|
APIKey: config.APIKey,
|
||||||
ServiceToken: config.ServiceToken,
|
ServiceToken: config.ServiceToken,
|
||||||
CompressRequestBody: config.Compress,
|
CompressRequestBody: config.Compress,
|
||||||
CACert: ca,
|
DiscoverNodesOnStart: config.DiscoverNodesOnStart,
|
||||||
|
DiscoverNodesInterval: config.DiscoverNodesInterval,
|
||||||
|
CACert: ca,
|
||||||
}
|
}
|
||||||
cli, err := es.NewClient(cfg)
|
cli, err := es.NewClient(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -94,8 +157,6 @@ func NewClient(config *Config, cod codec.Codec, indexPattern *pattern.Pattern, d
|
||||||
config: config,
|
config: config,
|
||||||
cli: cli,
|
cli: cli,
|
||||||
opType: config.OpType,
|
opType: config.OpType,
|
||||||
buf: bytes.NewBuffer(make([]byte, 0, config.SendBuffer)),
|
|
||||||
aux: make([]byte, 0, 512),
|
|
||||||
reqCount: 0,
|
reqCount: 0,
|
||||||
codec: cod,
|
codec: cod,
|
||||||
indexPattern: indexPattern,
|
indexPattern: indexPattern,
|
||||||
|
@ -109,16 +170,11 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
||||||
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
||||||
}
|
}
|
||||||
|
|
||||||
bulkReq := esapi.BulkRequest{}
|
|
||||||
|
|
||||||
if c.config.Etype != "" {
|
|
||||||
bulkReq.DocumentType = c.config.Etype
|
|
||||||
}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
c.buf.Reset()
|
|
||||||
c.reqCount = 0
|
c.reqCount = 0
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
req := bulkRequest{}
|
||||||
for _, event := range batch.Events() {
|
for _, event := range batch.Events() {
|
||||||
headerObj := runtime.NewObject(event.Header())
|
headerObj := runtime.NewObject(event.Header())
|
||||||
|
|
||||||
|
@ -160,19 +216,14 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
c.reqCount++
|
c.reqCount++
|
||||||
if err := c.writeMeta(c.opType, docId, idx); err != nil {
|
req.add(data, c.opType, docId, idx)
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := c.writeBody(data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.reqCount == 0 {
|
if c.reqCount == 0 {
|
||||||
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
return errors.WithMessagef(eventer.ErrorDropEvent, "request to elasticsearch bulk is null")
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.cli.Bulk(bytes.NewReader(c.buf.Bytes()),
|
resp, err := c.cli.Bulk(bytes.NewReader(req.body()),
|
||||||
c.cli.Bulk.WithDocumentType(c.config.Etype),
|
c.cli.Bulk.WithDocumentType(c.config.Etype),
|
||||||
c.cli.Bulk.WithParameters(c.config.Params),
|
c.cli.Bulk.WithParameters(c.config.Params),
|
||||||
c.cli.Bulk.WithHeader(c.config.Headers))
|
c.cli.Bulk.WithHeader(c.config.Headers))
|
||||||
|
@ -213,39 +264,3 @@ func (c *ClientSet) Bulk(ctx context.Context, batch api.Batch) error {
|
||||||
func (c *ClientSet) Stop() {
|
func (c *ClientSet) Stop() {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
// { "index" : { "_index" : "test", "_id" : "1" } }
|
|
||||||
func (c *ClientSet) writeMeta(action string, documentID string, index string) error {
|
|
||||||
c.buf.WriteRune('{')
|
|
||||||
c.aux = strconv.AppendQuote(c.aux, action)
|
|
||||||
c.buf.Write(c.aux)
|
|
||||||
c.aux = c.aux[:0]
|
|
||||||
c.buf.WriteRune(':')
|
|
||||||
c.buf.WriteRune('{')
|
|
||||||
if documentID != "" {
|
|
||||||
c.buf.WriteString(`"_id":`)
|
|
||||||
c.aux = strconv.AppendQuote(c.aux, documentID)
|
|
||||||
c.buf.Write(c.aux)
|
|
||||||
c.aux = c.aux[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
if index != "" {
|
|
||||||
c.buf.WriteString(`"_index":`)
|
|
||||||
c.aux = strconv.AppendQuote(c.aux, index)
|
|
||||||
c.buf.Write(c.aux)
|
|
||||||
c.aux = c.aux[:0]
|
|
||||||
}
|
|
||||||
c.buf.WriteRune('}')
|
|
||||||
c.buf.WriteRune('}')
|
|
||||||
c.buf.WriteRune('\n')
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *ClientSet) writeBody(body []byte) error {
|
|
||||||
if len(body) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
c.buf.Write(body)
|
|
||||||
c.buf.WriteRune('\n')
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,33 +16,29 @@ limitations under the License.
|
||||||
|
|
||||||
package elasticsearch
|
package elasticsearch
|
||||||
|
|
||||||
import "github.com/loggie-io/loggie/pkg/util/pattern"
|
import (
|
||||||
|
"github.com/loggie-io/loggie/pkg/util/pattern"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
|
Hosts []string `yaml:"hosts,omitempty" validate:"required"`
|
||||||
UserName string `yaml:"username,omitempty"`
|
UserName string `yaml:"username,omitempty"`
|
||||||
Password string `yaml:"password,omitempty"`
|
Password string `yaml:"password,omitempty"`
|
||||||
Index string `yaml:"index,omitempty"`
|
Index string `yaml:"index,omitempty"`
|
||||||
Headers map[string]string `yaml:"headers,omitempty"`
|
Headers map[string]string `yaml:"headers,omitempty"`
|
||||||
Params map[string]string `yaml:"parameters,omitempty"`
|
Params map[string]string `yaml:"parameters,omitempty"`
|
||||||
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
|
IfRenderIndexFailed RenderIndexFail `yaml:"ifRenderIndexFailed,omitempty"`
|
||||||
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
|
Etype string `yaml:"etype,omitempty"` // elasticsearch type, for v5.* backward compatibility
|
||||||
DocumentId string `yaml:"documentId,omitempty"`
|
DocumentId string `yaml:"documentId,omitempty"`
|
||||||
Sniff *bool `yaml:"sniff,omitempty"` // deprecated
|
APIKey string `yaml:"apiKey,omitempty"`
|
||||||
APIKey string `yaml:"apiKey,omitempty"`
|
ServiceToken string `yaml:"serviceToken,omitempty"`
|
||||||
ServiceToken string `yaml:"serviceToken,omitempty"`
|
CACertPath string `yaml:"caCertPath,omitempty"`
|
||||||
CACertPath string `yaml:"caCertPath,omitempty"`
|
Compress bool `yaml:"compress,omitempty"`
|
||||||
Compress bool `yaml:"compress,omitempty"`
|
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
|
||||||
Gzip *bool `yaml:"gzip,omitempty"` // deprecated, use compress above
|
OpType string `yaml:"opType,omitempty" default:"index"`
|
||||||
OpType string `yaml:"opType,omitempty" default:"index"`
|
DiscoverNodesOnStart bool `yaml:"discoverNodesOnStart,omitempty"`
|
||||||
|
DiscoverNodesInterval time.Duration `yaml:"discoverNodesInterval,omitempty"`
|
||||||
SendBuffer int `yaml:"sendBufferBytes,omitempty" default:"131072" validate:"gte=0"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TLS struct {
|
|
||||||
CAFile string `yaml:"caFile,omitempty"`
|
|
||||||
CertFile string `yaml:"certFile,omitempty"`
|
|
||||||
KeyFile string `yaml:"keyFile,omitempty"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RenderIndexFail struct {
|
type RenderIndexFail struct {
|
||||||
|
|
|
@ -45,6 +45,7 @@ type CollectConfig struct {
|
||||||
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
|
RereadTruncated bool `yaml:"rereadTruncated,omitempty" default:"true"` // Read from the beginning when the file is truncated
|
||||||
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
|
FirstNBytesForIdentifier int `yaml:"firstNBytesForIdentifier,omitempty" default:"128" validate:"gte=10"` // If the file size is smaller than `firstNBytesForIdentifier`, it will not be collected
|
||||||
AddonMeta bool `yaml:"addonMeta,omitempty"`
|
AddonMeta bool `yaml:"addonMeta,omitempty"`
|
||||||
|
AddonMetaSchema AddonMetaSchema `yaml:"addonMetaSchema,omitempty"`
|
||||||
excludeFilePatterns []*regexp.Regexp
|
excludeFilePatterns []*regexp.Regexp
|
||||||
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
||||||
|
|
||||||
|
@ -54,6 +55,12 @@ type CollectConfig struct {
|
||||||
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
|
FdHoldTimeoutWhenRemove time.Duration `yaml:"fdHoldTimeoutWhenRemove,omitempty" default:"5m"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type AddonMetaSchema struct {
|
||||||
|
Fields map[string]string `yaml:"fields,omitempty"`
|
||||||
|
FieldsUnderRoot bool `yaml:"underRoot,omitempty"`
|
||||||
|
FieldsUnderKey string `yaml:"key,omitempty" default:"state"`
|
||||||
|
}
|
||||||
|
|
||||||
type LineDelimiterValue struct {
|
type LineDelimiterValue struct {
|
||||||
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
Charset string `yaml:"charset,omitempty" default:"utf-8"`
|
||||||
LineType string `yaml:"type,omitempty" default:"auto"`
|
LineType string `yaml:"type,omitempty" default:"auto"`
|
||||||
|
|
|
@ -71,6 +71,19 @@ type Source struct {
|
||||||
multilineProcessor *MultiProcessor
|
multilineProcessor *MultiProcessor
|
||||||
mTask *MultiTask
|
mTask *MultiTask
|
||||||
codec codec.Codec
|
codec codec.Codec
|
||||||
|
|
||||||
|
addonMetaField *AddonMetaFields
|
||||||
|
}
|
||||||
|
|
||||||
|
type AddonMetaFields struct {
|
||||||
|
Pipeline string `yaml:"pipeline,omitempty"`
|
||||||
|
Source string `yaml:"source,omitempty"`
|
||||||
|
Filename string `yaml:"filename,omitempty"`
|
||||||
|
Timestamp string `yaml:"timestamp,omitempty"`
|
||||||
|
Offset string `yaml:"offset,omitempty"`
|
||||||
|
Bytes string `yaml:"bytes,omitempty"`
|
||||||
|
Line string `yaml:"line,omitempty"`
|
||||||
|
Hostname string `yaml:"hostname,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Source) Config() interface{} {
|
func (s *Source) Config() interface{} {
|
||||||
|
@ -109,6 +122,10 @@ func (s *Source) Init(context api.Context) error {
|
||||||
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
|
s.config.ReaderConfig.MultiConfig.Timeout = 2 * inactiveTimeout
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.config.CollectConfig.AddonMeta {
|
||||||
|
s.addonMetaField = addonMetaFieldsConvert(s.config.CollectConfig.AddonMetaSchema.Fields)
|
||||||
|
}
|
||||||
|
|
||||||
// init reader chan size
|
// init reader chan size
|
||||||
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds
|
s.config.ReaderConfig.readChanSize = s.config.WatchConfig.MaxOpenFds
|
||||||
|
|
||||||
|
@ -186,7 +203,7 @@ func (s *Source) ProductLoop(productFunc api.ProductFunc) {
|
||||||
s.productFunc = productFunc
|
s.productFunc = productFunc
|
||||||
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
|
s.productFunc = jobFieldsProductFunc(s.productFunc, s.rawSourceConfig)
|
||||||
if s.config.CollectConfig.AddonMeta {
|
if s.config.CollectConfig.AddonMeta {
|
||||||
s.productFunc = addonMetaProductFunc(s.productFunc)
|
s.productFunc = addonMetaProductFunc(s.productFunc, s.addonMetaField, s.config.CollectConfig.AddonMetaSchema)
|
||||||
}
|
}
|
||||||
if s.config.ReaderConfig.MultiConfig.Active {
|
if s.config.ReaderConfig.MultiConfig.Active {
|
||||||
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
|
s.mTask = NewMultiTask(s.epoch, s.name, s.config.ReaderConfig.MultiConfig, s.eventPool, s.productFunc)
|
||||||
|
@ -238,21 +255,95 @@ func jobFieldsProductFunc(productFunc api.ProductFunc, srcCfg *source.Config) ap
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func addonMetaProductFunc(productFunc api.ProductFunc) api.ProductFunc {
|
func addonMetaProductFunc(productFunc api.ProductFunc, fields *AddonMetaFields, schema AddonMetaSchema) api.ProductFunc {
|
||||||
return func(event api.Event) api.Result {
|
return func(event api.Event) api.Result {
|
||||||
s, _ := event.Meta().Get(SystemStateKey)
|
s, _ := event.Meta().Get(SystemStateKey)
|
||||||
state := s.(*persistence.State)
|
state := s.(*persistence.State)
|
||||||
addonMeta := make(map[string]interface{})
|
addonMeta := make(map[string]interface{})
|
||||||
addonMeta["pipeline"] = state.PipelineName
|
|
||||||
addonMeta["source"] = state.SourceName
|
|
||||||
addonMeta["filename"] = state.Filename
|
|
||||||
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
|
|
||||||
addonMeta["offset"] = state.Offset
|
|
||||||
addonMeta["bytes"] = state.ContentBytes
|
|
||||||
addonMeta["hostname"] = global.NodeName
|
|
||||||
|
|
||||||
event.Header()["state"] = addonMeta
|
// if fields is nil, use default config
|
||||||
|
if fields == nil {
|
||||||
|
addonMeta["pipeline"] = state.PipelineName
|
||||||
|
addonMeta["source"] = state.SourceName
|
||||||
|
addonMeta["filename"] = state.Filename
|
||||||
|
addonMeta["timestamp"] = state.CollectTime.Local().Format(tsLayout)
|
||||||
|
addonMeta["offset"] = state.Offset
|
||||||
|
addonMeta["bytes"] = state.ContentBytes
|
||||||
|
addonMeta["hostname"] = global.NodeName
|
||||||
|
} else {
|
||||||
|
|
||||||
|
if fields.Pipeline != "" {
|
||||||
|
addonMeta[fields.Pipeline] = state.PipelineName
|
||||||
|
}
|
||||||
|
if fields.Source != "" {
|
||||||
|
addonMeta[fields.Source] = state.SourceName
|
||||||
|
}
|
||||||
|
if fields.Filename != "" {
|
||||||
|
addonMeta[fields.Filename] = state.Filename
|
||||||
|
}
|
||||||
|
if fields.Timestamp != "" {
|
||||||
|
addonMeta[fields.Timestamp] = state.CollectTime.Local().Format(tsLayout)
|
||||||
|
}
|
||||||
|
if fields.Offset != "" {
|
||||||
|
addonMeta[fields.Offset] = state.Offset
|
||||||
|
}
|
||||||
|
if fields.Bytes != "" {
|
||||||
|
addonMeta[fields.Bytes] = state.ContentBytes
|
||||||
|
}
|
||||||
|
if fields.Line != "" {
|
||||||
|
addonMeta[fields.Line] = state.LineNumber
|
||||||
|
}
|
||||||
|
if fields.Hostname != "" {
|
||||||
|
addonMeta[fields.Hostname] = global.NodeName
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if schema.FieldsUnderRoot {
|
||||||
|
for k, v := range addonMeta {
|
||||||
|
event.Header()[k] = v
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
event.Header()[schema.FieldsUnderKey] = addonMeta
|
||||||
|
}
|
||||||
|
|
||||||
productFunc(event)
|
productFunc(event)
|
||||||
return result.Success()
|
return result.Success()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addonMetaFieldsConvert(fields map[string]string) *AddonMetaFields {
|
||||||
|
if len(fields) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
amf := &AddonMetaFields{}
|
||||||
|
for k, v := range fields {
|
||||||
|
switch v {
|
||||||
|
case "${_meta.pipeline}":
|
||||||
|
amf.Pipeline = k
|
||||||
|
|
||||||
|
case "${_meta.source}":
|
||||||
|
amf.Source = k
|
||||||
|
|
||||||
|
case "${_meta.filename}":
|
||||||
|
amf.Filename = k
|
||||||
|
|
||||||
|
case "${_meta.timestamp}":
|
||||||
|
amf.Timestamp = k
|
||||||
|
|
||||||
|
case "${_meta.offset}":
|
||||||
|
amf.Offset = k
|
||||||
|
|
||||||
|
case "${_meta.bytes}":
|
||||||
|
amf.Bytes = k
|
||||||
|
|
||||||
|
case "${_meta.line}":
|
||||||
|
amf.Line = k
|
||||||
|
|
||||||
|
case "${_meta.hostname}":
|
||||||
|
amf.Hostname = k
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return amf
|
||||||
|
}
|
||||||
|
|
|
@ -34,7 +34,6 @@ type DbConfig struct {
|
||||||
File string `yaml:"file,omitempty"`
|
File string `yaml:"file,omitempty"`
|
||||||
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
|
FlushTimeout time.Duration `yaml:"flushTimeout,omitempty" default:"2s"`
|
||||||
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
|
BufferSize int `yaml:"bufferSize,omitempty" default:"2048"`
|
||||||
TableName string `yaml:"tableName,omitempty" default:"registry"`
|
|
||||||
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted
|
CleanInactiveTimeout time.Duration `yaml:"cleanInactiveTimeout,omitempty" default:"504h"` // default records not updated in 21 days will be deleted
|
||||||
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
|
CleanScanInterval time.Duration `yaml:"cleanScanInterval,omitempty" default:"1h"`
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/loggie-io/loggie/pkg/control"
|
||||||
|
"github.com/loggie-io/loggie/pkg/core/cfg"
|
||||||
|
"github.com/loggie-io/loggie/pkg/core/interceptor"
|
||||||
|
"github.com/loggie-io/loggie/pkg/core/log"
|
||||||
|
"github.com/loggie-io/loggie/pkg/core/queue"
|
||||||
|
"github.com/loggie-io/loggie/pkg/eventbus"
|
||||||
|
"github.com/loggie-io/loggie/pkg/eventbus/export/logger"
|
||||||
|
"github.com/loggie-io/loggie/pkg/interceptor/maxbytes"
|
||||||
|
"github.com/loggie-io/loggie/pkg/interceptor/metric"
|
||||||
|
"github.com/loggie-io/loggie/pkg/interceptor/retry"
|
||||||
|
"github.com/loggie-io/loggie/pkg/pipeline"
|
||||||
|
"github.com/loggie-io/loggie/pkg/queue/channel"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
_ "github.com/loggie-io/loggie/pkg/include"
|
||||||
|
)
|
||||||
|
|
||||||
|
const pipe1 = `
|
||||||
|
pipelines:
|
||||||
|
- name: test
|
||||||
|
sources:
|
||||||
|
- type: dev
|
||||||
|
name: test
|
||||||
|
qps: 100
|
||||||
|
byteSize: 10240
|
||||||
|
eventsTotal: 10000
|
||||||
|
sink:
|
||||||
|
type: elasticsearch
|
||||||
|
parallelism: 3
|
||||||
|
hosts: ["localhost:9200"]
|
||||||
|
index: "loggie-benchmark-${+YYYY.MM.DD}"
|
||||||
|
`
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
log.InitDefaultLogger()
|
||||||
|
pipeline.SetDefaultConfigRaw(pipeline.Config{
|
||||||
|
Queue: &queue.Config{
|
||||||
|
Type: channel.Type,
|
||||||
|
},
|
||||||
|
Interceptors: []*interceptor.Config{
|
||||||
|
{
|
||||||
|
Type: metric.Type,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: maxbytes.Type,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Type: retry.Type,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
eventbus.StartAndRun(eventbus.Config{
|
||||||
|
LoggerConfig: logger.Config{
|
||||||
|
Enabled: true,
|
||||||
|
Period: 5 * time.Second,
|
||||||
|
Pretty: false,
|
||||||
|
},
|
||||||
|
ListenerConfigs: map[string]cfg.CommonCfg{
|
||||||
|
"sink": map[string]interface{}{
|
||||||
|
"period": 5 * time.Second,
|
||||||
|
},
|
||||||
|
"sys": map[string]interface{}{
|
||||||
|
"period": 5 * time.Second,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
pipecfgs := &control.PipelineConfig{}
|
||||||
|
if err := cfg.UnPackFromRaw([]byte(pipe1), pipecfgs).Defaults().Validate().Do(); err != nil {
|
||||||
|
log.Panic("pipeline configs invalid: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
controller := control.NewController()
|
||||||
|
controller.Start(pipecfgs)
|
||||||
|
|
||||||
|
if err := http.ListenAndServe(fmt.Sprintf(":9196"), nil); err != nil {
|
||||||
|
log.Fatal("http listen and serve err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue