Implement distributed scheduler building block (#562)

* feat: add jobs/scheduling api (with validation override)

Signed-off-by: mikeee <hey@mike.ee>

* chore: fix deps

Signed-off-by: mikeee <hey@mike.ee>

* fix: use cli fix

Signed-off-by: mikeee <hey@mike.ee>

* fix: ci artifact path set for cli build

Signed-off-by: mikeee <hey@mike.ee>

* chore: remove sidecar step

Signed-off-by: mikeee <hey@mike.ee>

* chore: revert changes to other examples

Signed-off-by: mikeee <hey@mike.ee>

---------

Signed-off-by: mikeee <hey@mike.ee>
Signed-off-by: Mike Nguyen <hey@mike.ee>
This commit is contained in:
Mike Nguyen 2024-07-17 17:04:45 +01:00 committed by GitHub
parent 01c0f3154f
commit 967570515b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
19 changed files with 488 additions and 33 deletions

View File

@ -23,6 +23,8 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
with:
submodules: recursive
- name: Setup
uses: actions/setup-go@v5

View File

@ -39,10 +39,12 @@ jobs:
outputs:
DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }}
DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }}
DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }}
GITHUB_SHA: ${{ steps.outputs.outputs.GITHUB_SHA }}
steps:
- name: Parse repository_dispatch payload
if: github.event_name == 'repository_dispatch'
@ -79,9 +81,6 @@ jobs:
echo "DAPR_CLI_VER=$CLI_VERSION" >> $GITHUB_ENV
echo "Found $CLI_VERSION"
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
- name: Checkout Dapr CLI repo to override dapr command.
uses: actions/checkout@v4
if: env.DAPR_CLI_REF != ''
@ -105,17 +104,19 @@ jobs:
cd cli
make
mkdir -p $HOME/artifacts/$GITHUB_SHA/
sudo cp dist/linux_amd64/release/dapr $HOME/artifacts/$GITHUB_SHA/dapr
sudo cp dist/linux_amd64/release/dapr ~/artifacts/$GITHUB_SHA/dapr
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
echo "DAPR_CLI_REF=$DAPR_CLI_REF" >> $GITHUB_ENV
- name: Build daprd and placement with referenced commit.
- name: Build dapr
if: env.DAPR_REF != ''
run: |
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
cd dapr_runtime
make
mkdir -p $HOME/artifacts/$GITHUB_SHA/
cp dist/linux_amd64/release/daprd $HOME/artifacts/$GITHUB_SHA/daprd
cp dist/linux_amd64/release/placement $HOME/artifacts/$GITHUB_SHA/placement
echo "artifactPath=~/artifacts/$GITHUB_SHA/" >> $GITHUB_ENV
cp ./dist/linux_amd64/release/* ~/artifacts/$GITHUB_SHA/
- name: Upload dapr-artifacts
uses: actions/upload-artifact@v4
@ -132,10 +133,12 @@ jobs:
run: |
echo "DAPR_INSTALL_URL=$DAPR_INSTALL_URL"
echo "DAPR_CLI_VER=$DAPR_CLI_VER" >> "$GITHUB_OUTPUT"
echo "DAPR_CLI_REF=$DAPR_CLI_REF" >> "$GITHUB_OUTPUT"
echo "DAPR_RUNTIME_VER=$DAPR_RUNTIME_VER" >> "$GITHUB_OUTPUT"
echo "CHECKOUT_REPO=$CHECKOUT_REPO" >> "$GITHUB_OUTPUT"
echo "CHECKOUT_REF=$CHECKOUT_REF" >> "$GITHUB_OUTPUT"
echo "DAPR_REF=$DAPR_REF" >> "$GITHUB_OUTPUT"
echo "GITHUB_SHA=$GITHUB_SHA" >> "$GITHUB_OUTPUT"
validate-example:
needs: setup
@ -148,10 +151,11 @@ jobs:
DAPR_INSTALL_URL: ${{ needs.setup.outputs.DAPR_INSTALL_URL }}
DAPR_CLI_VER: ${{ needs.setup.outputs.DAPR_CLI_VER }}
DAPR_RUNTIME_VER: 1.14.0-rc.2
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_REF: ${{ github.event.inputs.daprdapr_commit }}
DAPR_CLI_REF: ${{ needs.setup.outputs.DAPR_CLI_REF }}
DAPR_REF: ${{ needs.setup.outputs.DAPR_REF }}
CHECKOUT_REPO: ${{ needs.setup.outputs.CHECKOUT_REPO }}
CHECKOUT_REF: ${{ needs.setup.outputs.CHECKOUT_REF }}
GITHUB_SHA: ${{ needs.setup.outputs.GITHUB_SHA }}
strategy:
fail-fast: false
@ -161,6 +165,7 @@ jobs:
"actor",
"configuration",
"crypto",
"dist-scheduler",
"grpc-service",
"hello-world",
"pubsub",
@ -187,7 +192,11 @@ jobs:
uses: actions/download-artifact@v4
with:
name: dapr-artifacts
path: $HOME/artifacts/$GITHUB_SHA/
path: ~/artifacts/${{ env.GITHUB_SHA }}/
- name: Display artifacts downloaded
if: env.DAPR_CLI_REF != '' || env.DAPR_REF != ''
run: ls ~/artifacts/$GITHUB_SHA/
- name: Set up Go
id: setup-go
@ -196,6 +205,7 @@ jobs:
go-version-file: "go.mod"
- name: Set up Dapr CLI
if: env.DAPR_CLI_VER != ''
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
- name: Override dapr cli with referenced commit.
@ -208,13 +218,15 @@ jobs:
dapr uninstall --all
dapr init --runtime-version ${{ env.DAPR_RUNTIME_VER }}
- name: Override daprd and placement service with referenced commit.
- name: Print scheduler logs
run: |
docker logs dapr_scheduler
- name: Override daprd with referenced commit.
if: env.DAPR_REF != ''
run: |
mkdir -p $HOME/.dapr/bin/
cp $HOME/artifacts/$GITHUB_SHA/daprd $HOME/.dapr/bin/daprd
docker stop dapr_placement
$HOME/artifacts/$GITHUB_SHA/placement --healthz-port 9091 &
- name: Set up Python ${{ env.PYTHON_VER }}
uses: actions/setup-python@v5

View File

@ -250,6 +250,15 @@ type Client interface {
// RaiseEventWorkflowBeta1 raises an event for a workflow.
RaiseEventWorkflowBeta1(ctx context.Context, req *RaiseEventWorkflowRequest) error
// ScheduleJobAlpha1 creates and schedules a job.
ScheduleJobAlpha1(ctx context.Context, req *Job) error
// GetJobAlpha1 returns a scheduled job.
GetJobAlpha1(ctx context.Context, name string) (*Job, error)
// DeleteJobAlpha1 deletes a scheduled job.
DeleteJobAlpha1(ctx context.Context, name string) error
// GrpcClient returns the base grpc client if grpc is used and nil otherwise
GrpcClient() pb.DaprClient

View File

@ -557,6 +557,33 @@ func (s *testDaprServer) RaiseEventWorkflowBeta1(ctx context.Context, in *pb.Rai
return &emptypb.Empty{}, nil
}
func (s *testDaprServer) ScheduleJobAlpha1(ctx context.Context, in *pb.ScheduleJobRequest) (*pb.ScheduleJobResponse, error) {
return &pb.ScheduleJobResponse{}, nil
}
func (s *testDaprServer) GetJobAlpha1(ctx context.Context, in *pb.GetJobRequest) (*pb.GetJobResponse, error) {
var (
schedule = "@every 10s"
dueTime = "10s"
repeats uint32 = 4
ttl = "10s"
)
return &pb.GetJobResponse{
Job: &pb.Job{
Name: "name",
Schedule: &schedule,
Repeats: &repeats,
DueTime: &dueTime,
Ttl: &ttl,
Data: nil,
},
}, nil
}
func (s *testDaprServer) DeleteJobAlpha1(ctx context.Context, in *pb.DeleteJobRequest) (*pb.DeleteJobResponse, error) {
return &pb.DeleteJobResponse{}, nil
}
func TestGrpcClient(t *testing.T) {
protoClient := pb.NewDaprClient(nil)
client := &GRPCClient{protoClient: protoClient}

91
client/scheduling.go Normal file
View File

@ -0,0 +1,91 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"log"
"google.golang.org/protobuf/types/known/anypb"
pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
)
type Job struct {
Name string
Schedule string
Repeats uint32 // Optional
DueTime string // Optional
TTL string // Optional
Data *anypb.Any
}
// ScheduleJobAlpha1 raises and schedules a job.
func (c *GRPCClient) ScheduleJobAlpha1(ctx context.Context, job *Job) error {
// TODO: Assert job fields are defined: Name, Schedule, Data
jobRequest := &pb.Job{
Name: job.Name,
Schedule: &job.Schedule,
Data: job.Data,
}
if job.Schedule != "" {
jobRequest.Schedule = &job.Schedule
}
if job.Repeats != 0 {
jobRequest.Repeats = &job.Repeats
}
if job.DueTime != "" {
jobRequest.DueTime = &job.DueTime
}
if job.TTL != "" {
jobRequest.Ttl = &job.TTL
}
_, err := c.protoClient.ScheduleJobAlpha1(ctx, &pb.ScheduleJobRequest{
Job: jobRequest,
})
return err
}
// GetJobAlpha1 retrieves a scheduled job.
func (c *GRPCClient) GetJobAlpha1(ctx context.Context, name string) (*Job, error) {
// TODO: Name validation
resp, err := c.protoClient.GetJobAlpha1(ctx, &pb.GetJobRequest{
Name: name,
})
log.Println(resp)
if err != nil {
return nil, err
}
return &Job{
Name: resp.GetJob().GetName(),
Schedule: resp.GetJob().GetSchedule(),
Repeats: resp.GetJob().GetRepeats(),
DueTime: resp.GetJob().GetDueTime(),
TTL: resp.GetJob().GetTtl(),
Data: resp.GetJob().GetData(),
}, nil
}
// DeleteJobAlpha1 deletes a scheduled job.
func (c *GRPCClient) DeleteJobAlpha1(ctx context.Context, name string) error {
// TODO: Name validation
_, err := c.protoClient.DeleteJobAlpha1(ctx, &pb.DeleteJobRequest{
Name: name,
})
return err
}

58
client/scheduling_test.go Normal file
View File

@ -0,0 +1,58 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/anypb"
)
func TestSchedulingAlpha1(t *testing.T) {
ctx := context.Background()
t.Run("schedule job - valid", func(t *testing.T) {
err := testClient.ScheduleJobAlpha1(ctx, &Job{
Name: "test",
Schedule: "test",
Data: &anypb.Any{},
})
require.NoError(t, err)
})
t.Run("get job - valid", func(t *testing.T) {
expected := &Job{
Name: "name",
Schedule: "@every 10s",
Repeats: 4,
DueTime: "10s",
TTL: "10s",
Data: nil,
}
resp, err := testClient.GetJobAlpha1(ctx, "name")
require.NoError(t, err)
assert.Equal(t, expected, resp)
})
t.Run("delete job - valid", func(t *testing.T) {
err := testClient.DeleteJobAlpha1(ctx, "name")
require.NoError(t, err)
})
}

View File

@ -0,0 +1,43 @@
# Dapr Distributed Scheduler Example with go-sdk
## Steps
### Prepare
- Dapr installed (v1.14 or higher)
### Run Distributed Scheduling Example
<!-- STEP
name: Run Distributed Scheduling Example
output_match_mode: substring
expected_stdout_lines:
- 'Scheduler stream connected'
- 'schedulejob - success'
- 'job 0 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'job 1 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'job 2 received'
- 'extracted payload: {db-backup {my-prod-db /backup-dir}}'
- 'getjob - resp: &{prod-db-backup @every 1s 10 value:"{\"task\":\"db-backup\",\"metadata\":{\"db_name\":\"my-prod-db\",\"backup_location\":\"/backup-dir\"}}"}'
- 'deletejob - success'
background: true
sleep: 30
-->
```bash
dapr run --app-id=distributed-scheduler \
--metrics-port=9091 \
--scheduler-host-address=localhost:50006 \
--dapr-grpc-port 50001 \
--app-port 50070 \
--app-protocol grpc \
--log-level debug \
go run ./main.go
```
<!-- END_STEP -->

View File

@ -0,0 +1,11 @@
package api
type Metadata struct {
DBName string `json:"db_name"`
BackupLocation string `json:"backup_location"`
}
type DBBackup struct {
Task string `json:"task"`
Metadata Metadata `json:"metadata"`
}

View File

@ -0,0 +1,114 @@
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"time"
"google.golang.org/protobuf/types/known/anypb"
daprc "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/examples/dist-scheduler/api"
"github.com/dapr/go-sdk/service/common"
daprs "github.com/dapr/go-sdk/service/grpc"
)
func main() {
server, err := daprs.NewService(":50070")
if err != nil {
log.Fatalf("failed to start the server: %v", err)
}
if err = server.AddJobEventHandler("prod-db-backup", prodDBBackupHandler); err != nil {
log.Fatalf("failed to register job event handler: %v", err)
}
log.Println("starting server")
go func() {
if err = server.Start(); err != nil {
log.Fatalf("failed to start server: %v", err)
}
}()
// Brief intermission to allow for the server to initialize.
time.Sleep(10 * time.Second)
ctx := context.Background()
jobData, err := json.Marshal(&api.DBBackup{
Task: "db-backup",
Metadata: api.Metadata{
DBName: "my-prod-db",
BackupLocation: "/backup-dir",
},
},
)
if err != nil {
panic(err)
}
job := daprc.Job{
Name: "prod-db-backup",
Schedule: "@every 1s",
Repeats: 10,
Data: &anypb.Any{
Value: jobData,
},
}
// create the client
client, err := daprc.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
err = client.ScheduleJobAlpha1(ctx, &job)
if err != nil {
panic(err)
}
fmt.Println("schedulejob - success")
time.Sleep(3 * time.Second)
resp, err := client.GetJobAlpha1(ctx, "prod-db-backup")
if err != nil {
panic(err)
}
fmt.Printf("getjob - resp: %v\n", resp) // parse
err = client.DeleteJobAlpha1(ctx, "prod-db-backup")
if err != nil {
fmt.Printf("job deletion error: %v\n", err)
} else {
fmt.Println("deletejob - success")
}
if err = server.Stop(); err != nil {
log.Fatalf("failed to stop server: %v\n", err)
}
}
var jobCount = 0
func prodDBBackupHandler(ctx context.Context, job *common.JobEvent) error {
var jobData common.Job
if err := json.Unmarshal(job.Data, &jobData); err != nil {
return fmt.Errorf("failed to unmarshal job: %v", err)
}
decodedPayload, err := base64.StdEncoding.DecodeString(jobData.Value)
if err != nil {
return fmt.Errorf("failed to decode job payload: %v", err)
}
var jobPayload api.DBBackup
if err := json.Unmarshal(decodedPayload, &jobPayload); err != nil {
return fmt.Errorf("failed to unmarshal payload: %v", err)
}
fmt.Printf("job %d received:\n type: %v \n typeurl: %v\n value: %v\n extracted payload: %v\n", jobCount, job.JobType, jobData.TypeURL, jobData.Value, jobPayload)
jobCount++
return nil
}

View File

@ -1,6 +1,6 @@
module github.com/dapr/go-sdk/examples
go 1.22.4
go 1.22.5
replace github.com/dapr/go-sdk => ../
@ -10,14 +10,15 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/google/uuid v1.6.0
google.golang.org/grpc v1.65.0
google.golang.org/grpc/examples v0.0.0-20240529235625-24e902437554
google.golang.org/grpc/examples v0.0.0-20240516203910-e22436abb809
google.golang.org/protobuf v1.34.2
)
require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dapr/dapr v1.14.0-rc.1 // indirect
github.com/dapr/dapr v1.14.0-rc.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
@ -34,6 +35,5 @@ require (
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@ -7,8 +7,8 @@ github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.1 h1:4P376+PIU66hMtLz5TiF41IJ6Lh5FNY1DiwaNNYZv/8=
github.com/dapr/dapr v1.14.0-rc.1/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -77,8 +77,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/grpc/examples v0.0.0-20240529235625-24e902437554 h1:w4+xIPcgnCPx2fCAcl+9XZ4wA7tpurRvJ+KBSgAig4A=
google.golang.org/grpc/examples v0.0.0-20240529235625-24e902437554/go.mod h1:zVJi9R2NecTook3ihvrayAmkYWKtOfRrvL+2eRtKdMk=
google.golang.org/grpc/examples v0.0.0-20240516203910-e22436abb809 h1:f96Rv5C5Y2CWlbKK6KhKDdyFgGOjPHPEMsdyaxE9k0c=
google.golang.org/grpc/examples v0.0.0-20240516203910-e22436abb809/go.mod h1:uaPEAc5V00jjG3DPhGFLXGT290RUV3+aNQigs1W50/8=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

6
go.mod
View File

@ -1,11 +1,9 @@
module github.com/dapr/go-sdk
go 1.22.4
toolchain go1.22.5
go 1.22.5
require (
github.com/dapr/dapr v1.14.0-rc.1
github.com/dapr/dapr v1.14.0-rc.2
github.com/go-chi/chi/v5 v5.1.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.6.0

4
go.sum
View File

@ -1,8 +1,8 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/dapr/dapr v1.14.0-rc.1 h1:4P376+PIU66hMtLz5TiF41IJ6Lh5FNY1DiwaNNYZv/8=
github.com/dapr/dapr v1.14.0-rc.1/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/dapr/dapr v1.14.0-rc.2 h1:wuXninZLTyokeztCinVIVAc9mpVYJS8QyxecPCLdlY8=
github.com/dapr/dapr v1.14.0-rc.2/go.mod h1:uZMuD9K7y+LKSsQUoSAvv1Yn8Cim9X/9ZQ9XuTobyP8=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=

View File

@ -42,6 +42,9 @@ type Service interface {
RegisterActorImplFactory(f actor.Factory, opts ...config.Option)
// RegisterActorImplFactoryContext Register a new actor to actor runtime of go sdk
RegisterActorImplFactoryContext(f actor.FactoryContext, opts ...config.Option)
// AddJobEventHandler appends a provided job event handler with its name to
// the service.
AddJobEventHandler(name string, fn JobEventHandler) error
// Start starts service.
Start() error
// Stop stops the previously started service.
@ -54,5 +57,6 @@ type (
ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
TopicEventHandler func(ctx context.Context, e *TopicEvent) (retry bool, err error)
BindingInvocationHandler func(ctx context.Context, in *BindingEvent) (out []byte, err error)
JobEventHandler func(ctx context.Context, in *JobEvent) error
HealthCheckHandler func(context.Context) error
)

View File

@ -120,3 +120,13 @@ const (
type SubscriptionResponse struct {
Status SubscriptionResponseStatus `json:"status"`
}
type JobEvent struct {
JobType string `json:"job_type"`
Data []byte `json:"data"`
}
type Job struct {
TypeURL string `json:"type_url"`
Value string `json:"value"`
}

View File

@ -0,0 +1,50 @@
package grpc
import (
"context"
"errors"
"fmt"
"strings"
runtimepb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/go-sdk/service/common"
)
// AddJobEventHandler registers a job handler
func (s *Server) AddJobEventHandler(name string, fn common.JobEventHandler) error {
if name == "" {
return errors.New("job event name cannot be empty")
}
if fn == nil {
return errors.New("job event handler not supplied")
}
s.jobEventHandlers[name] = fn
return nil
}
// OnJobEvent is invoked by the sidecar following a scheduled job registered in
// the scheduler
func (s *Server) OnJobEventAlpha1(ctx context.Context, in *runtimepb.JobEventRequest) (*runtimepb.JobEventResponse, error) {
// parse the job type from the method or name
jobType, found := strings.CutPrefix(in.GetMethod(), "job/")
if !found {
if in.GetName() == "" {
return &runtimepb.JobEventResponse{}, errors.New("unsupported invocation")
}
jobType = in.GetName()
}
if fn, ok := s.jobEventHandlers[jobType]; ok {
e := &common.JobEvent{
JobType: jobType,
Data: in.GetData().GetValue(),
}
if err := fn(ctx, e); err != nil {
return nil, fmt.Errorf("error executing %s binding: %w", in.GetName(), err)
}
return &runtimepb.JobEventResponse{}, nil
}
return &runtimepb.JobEventResponse{}, errors.New("job event handler not found")
}

View File

@ -55,11 +55,12 @@ func NewServiceWithGrpcServer(lis net.Listener, server *grpc.Server) common.Serv
func newService(lis net.Listener, grpcServer *grpc.Server, opts ...grpc.ServerOption) *Server {
s := &Server{
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
jobEventHandlers: make(map[string]common.JobEventHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}
if grpcServer == nil {
@ -67,6 +68,7 @@ func newService(lis net.Listener, grpcServer *grpc.Server, opts ...grpc.ServerOp
}
pb.RegisterAppCallbackServer(grpcServer, s)
pb.RegisterAppCallbackAlphaServer(grpcServer, s)
pb.RegisterAppCallbackHealthCheckServer(grpcServer, s)
s.grpcServer = grpcServer
@ -81,6 +83,7 @@ type Server struct {
invokeHandlers map[string]common.ServiceInvocationHandler
topicRegistrar internal.TopicRegistrar
bindingHandlers map[string]common.BindingInvocationHandler
jobEventHandlers map[string]common.JobEventHandler
healthCheckHandler common.HealthCheckHandler
authToken string
grpcServer *grpc.Server

View File

@ -169,3 +169,7 @@ func getCustomMetadataFromContext(ctx context.Context) map[string]string {
}
return md
}
func (s *Server) OnBulkTopicEventAlpha1(ctx context.Context, in *runtimev1pb.TopicEventBulkRequest) (*runtimev1pb.TopicEventBulkResponse, error) {
panic("This API callback is not supported.")
}

View File

@ -0,0 +1,19 @@
package http
import (
"errors"
"fmt"
"github.com/dapr/go-sdk/service/common"
)
func (s *Server) AddJobEventHandler(name string, fn common.JobEventHandler) error {
if name == "" {
return fmt.Errorf("job event name required")
}
if fn == nil {
return fmt.Errorf("job event handler required")
}
return errors.New("handling http scheduling requests has not been implemented in this sdk")
}