This commit is contained in:
Shen Yang 2025-05-22 10:06:23 -07:00 committed by GitHub
commit f67255f3dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 65 additions and 6 deletions

View File

@ -117,14 +117,15 @@ func (i *Influx) Operations() []bindings.OperationKind {
func (i *Influx) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case bindings.CreateOperation:
var jsonPoint map[string]interface{}
var jsonPoint InfluxPoint
err := json.Unmarshal(req.Data, &jsonPoint)
if err != nil {
return nil, ErrInvalidRequestData
}
line := fmt.Sprintf("%s,%s %s", jsonPoint["measurement"], jsonPoint["tags"], jsonPoint["values"])
line := jsonPoint.GetLine()
if line == "" {
return nil, ErrInvalidRequestData
}
// write the point
err = i.writeAPI.WriteRecord(ctx, line)
if err != nil {

View File

@ -0,0 +1,58 @@
package influx
import (
"strconv"
)
type InfluxPoint struct {
Line *string `json:"line,omitempty"`
Measurement *string `json:"measurement,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Fields map[string]interface{} `json:"fields,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
}
func (p *InfluxPoint) GetLine() string {
if p.Line != nil && *p.Line != "" {
return *p.Line
}
// If Line is nil, we can construct it from the other fields
if p.Measurement != nil {
line := *p.Measurement
if p.Tags != nil {
for k, v := range p.Tags {
line += "," + k + "=" + v
}
}
if p.Fields != nil {
line += " "
for k, v := range p.Fields {
switch value := v.(type) {
case string:
line += k + "=\"" + value + "\","
case int64:
line += k + "=" + strconv.FormatInt(value, 10) + ","
case int32:
line += k + "=" + strconv.FormatInt(int64(value), 10) + ","
case int:
line += k + "=" + strconv.Itoa(value) + ","
case float32:
line += k + "=" + strconv.FormatFloat(float64(value), 'f', -1, 32) + ","
case float64:
line += k + "=" + strconv.FormatFloat(value, 'f', -1, 64) + ","
case bool:
line += k + "=" + strconv.FormatBool(value) + ","
default:
// Handle unsupported types gracefully
line += k + "=null,"
}
}
line = line[:len(line)-1] // Remove the trailing comma
}
if p.Timestamp != nil {
line += " " + strconv.FormatInt(*p.Timestamp, 10)
}
return line
}
return ""
}

View File

@ -77,7 +77,7 @@ func TestInflux_Invoke_BindingCreateOperation(t *testing.T) {
err error
}{resp: nil, err: ErrInvalidRequestData}},
{"invoke valid request metadata", &bindings.InvokeRequest{
Data: []byte(`{"measurement":"a", "tags":"a", "values":"a"}`),
Data: []byte(`{"measurement":"a", "tags":{"a":"a"}, "fields":{"a":1}}`),
Operation: bindings.CreateOperation,
}, struct {
resp *bindings.InvokeResponse
@ -89,7 +89,7 @@ func TestInflux_Invoke_BindingCreateOperation(t *testing.T) {
defer ctrl.Finish()
w := NewMockWriteAPIBlocking(ctrl)
w.EXPECT().WriteRecord(gomock.Eq(t.Context()), gomock.Eq("a,a a")).Return(nil)
w.EXPECT().WriteRecord(gomock.Eq(t.Context()), gomock.Eq("a,a=a a=1")).Return(nil)
influx := &Influx{
writeAPI: w,
}