-
Notifications
You must be signed in to change notification settings - Fork 94
feature: dag support mysql store #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Wenne
wants to merge
71
commits into
ShiningRush:master
Choose a base branch
from
Wenne:develop
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
71 commits
Select commit
Hold shift + click to select a range
2daf860
feature: dag support mysql store
Wenne 00eb356
feature: dag support mysql store
Wenne c537dc4
feature: dag support mysql store
Wenne 3d50058
fixut: change some entity define
Wenne 313d67a
fixut: change some entity define
Wenne 46870df
fixut: change some entity define
Wenne e8f27c6
add ut
Wenne 22f9e57
Merge remote-tracking branch 'origin/master' into develop
Wenne a2db360
add ut
Wenne c805b01
add ut
Wenne 918e173
add ut
Wenne aa4b00f
add ut
Wenne 1058399
add ut
Wenne aadb81b
add ut
Wenne 13cd343
add ut
Wenne 96a5393
add ut
Wenne 3a87805
add ut
Wenne 173bd9d
add ut
Wenne 1b79b0a
add ut
Wenne 4ab0a53
add ut
Wenne d40fb6a
add ut
Wenne 596a69c
add ut
Wenne 01a6a52
add ut
Wenne 0532f64
add ut
Wenne ac08e8f
add ut
Wenne 410297b
add ut
Wenne 43d926b
add ut
Wenne 607939f
add ut
Wenne 36b2046
add ut
Wenne bf62e7c
add ut
Wenne 933fa2a
add ut
Wenne 2be5aef
add ut
Wenne 58ca0f0
add ut
Wenne ae79e05
add ut
Wenne 0f420dc
update promhttp
Wenne ac83bb4
rand dispatcher
Wenne e80f79e
collect failed dag ins id
Wenne cf2b570
Merge remote-tracking branch 'origin/master' into develop
Wenne 0cf22ac
merge master
Wenne b9fcfc5
fix comments
Wenne d213eeb
fix comments
Wenne 84b1e47
fix comments
Wenne 38bb8c2
fix keeper bugs
Wenne 7fb3a13
fix keeper bugs
Wenne 039f2bb
fix keeper bugs
Wenne c53efb1
fix keeper bugs
Wenne cc26e27
fix keeper bugs
Wenne 06cfc5c
fix keeper bugs
Wenne 463c220
fix keeper bugs
Wenne aa0ced7
fix keeper bugs
Wenne adf448f
fix keeper bugs
Wenne 33f2892
fix keeper bugs
Wenne 92a6922
fix keeper bugs
Wenne e41753c
fix keeper bugs
Wenne 32d4256
fix keeper bugs
Wenne 4a6f35d
fix keeper bugs
Wenne 2feaf7c
fix keeper bugs
Wenne c2a12f8
add id generators to resolve id duplication
Wenne 57874a4
add id generators to resolve id duplication
Wenne 3fb9c47
add id generators to resolve id duplication
Wenne a9f71d1
add tags
Wenne 9c28b60
add tags
Wenne 5b5e853
add dag filter in ListDagInstanceWithoutFilterTags function
qiffang a3e689c
Merge pull request #3 from qiffang/add-dia-filter-in-listdaginstances
Wenne 04784d1
add collectors
Wenne b2017dc
Merge remote-tracking branch 'origin/tidb-cloud' into tidb-cloud
Wenne 1c79fc5
add collectors
Wenne 6f00392
add collectors
Wenne 9035f39
add-cancel-for-daginstance
Wenne e828d08
add-cancel-for-daginstance
Wenne 8221432
Merge pull request #5 from Wenne/tidb-cloud
Wenne File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,197 @@ | ||
| package main | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "log" | ||
| "net/http" | ||
| "time" | ||
|
|
||
| "github.com/go-sql-driver/mysql" | ||
| "github.com/shiningrush/fastflow" | ||
| mysqlKeeper "github.com/shiningrush/fastflow/keeper/mysql" | ||
| "github.com/shiningrush/fastflow/pkg/entity" | ||
| "github.com/shiningrush/fastflow/pkg/entity/run" | ||
| "github.com/shiningrush/fastflow/pkg/exporter" | ||
| "github.com/shiningrush/fastflow/pkg/mod" | ||
| "github.com/shiningrush/fastflow/pkg/utils/data" | ||
| mysqlStore "github.com/shiningrush/fastflow/store/mysql" | ||
| ) | ||
|
|
||
| type ActionParam struct { | ||
| Name string | ||
| Desc string | ||
| } | ||
|
|
||
| type ActionA struct { | ||
| code string | ||
| } | ||
|
|
||
| func (a *ActionA) Name() string { | ||
| return fmt.Sprintf("Action-%s", a.code) | ||
| } | ||
| func (a *ActionA) RunBefore(ctx run.ExecuteContext, params interface{}) error { | ||
| input := params.(*ActionParam) | ||
| log.Println(fmt.Sprintf("%s run before, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) | ||
| time.Sleep(time.Second) | ||
| if a.code != "B" && a.code != "C" { | ||
| ctx.ShareData().Set(fmt.Sprintf("%s-key", a.code), fmt.Sprintf("%s value", a.code)) | ||
| } | ||
| return nil | ||
| } | ||
| func (a *ActionA) Run(ctx run.ExecuteContext, params interface{}) error { | ||
| input := params.(*ActionParam) | ||
| log.Println(fmt.Sprintf("%s run, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) | ||
| ctx.Trace("run start", run.TraceOpPersistAfterAction) | ||
| time.Sleep(2 * time.Second) | ||
| ctx.Trace("run end") | ||
| return nil | ||
| } | ||
| func (a *ActionA) RunAfter(ctx run.ExecuteContext, params interface{}) error { | ||
| input := params.(*ActionParam) | ||
| log.Println(fmt.Sprintf("%s run after, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) | ||
| time.Sleep(time.Second) | ||
| return nil | ||
| } | ||
| func (a *ActionA) ParameterNew() interface{} { | ||
| return &ActionParam{} | ||
| } | ||
|
|
||
| func ensureDagCreated() error { | ||
| dag := &entity.Dag{ | ||
| BaseInfo: entity.BaseInfo{ | ||
| ID: "test-dag", | ||
| }, | ||
| Name: "test", | ||
| Vars: entity.DagVars{ | ||
| "var": {DefaultValue: "default-var"}, | ||
| }, | ||
| Status: entity.DagStatusNormal, | ||
| Tasks: []entity.Task{ | ||
| {ID: "task1", ActionName: "Action-A", Params: map[string]interface{}{ | ||
| "Name": "task-p1", | ||
| "Desc": "{{var}}", | ||
| }, TimeoutSecs: 5}, | ||
| {ID: "task2", ActionName: "Action-B", DependOn: []string{"task1"}, Params: map[string]interface{}{ | ||
| "Name": "task-p1", | ||
| "Desc": "{{var}}", | ||
| }}, | ||
| {ID: "task3", ActionName: "Action-C", DependOn: []string{"task1"}, Params: map[string]interface{}{ | ||
| "Name": "task-p1", | ||
| "Desc": "{{var}}", | ||
| }}, | ||
| {ID: "task4", ActionName: "Action-D", DependOn: []string{"task2", "task3"}, Params: map[string]interface{}{ | ||
| "Name": "task-p1", | ||
| "Desc": "{{var}}", | ||
| }}, | ||
| }, | ||
| } | ||
| oldDag, err := mod.GetStore().GetDag(dag.ID) | ||
| if errors.Is(err, data.ErrDataNotFound) { | ||
| if err := mod.GetStore().CreateDag(dag); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| if oldDag != nil { | ||
| if err := mod.GetStore().UpdateDag(dag); err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func main() { | ||
| // init action | ||
| fastflow.RegisterAction([]run.Action{ | ||
| &ActionA{code: "A"}, | ||
| &ActionA{code: "B"}, | ||
| &ActionA{code: "C"}, | ||
| &ActionA{code: "D"}, | ||
| }) | ||
| // init keeper | ||
| keeper := mysqlKeeper.NewKeeper(&mysqlKeeper.KeeperOption{ | ||
| Key: "worker-1", | ||
| MySQLConfig: &mysql.Config{ | ||
| Addr: "127.0.0.1:55000", | ||
| User: "root", | ||
| Passwd: "mysqlpw", | ||
| DBName: "fastflow", | ||
| }, | ||
| MigrationSwitch: true, | ||
| }) | ||
| if err := keeper.Init(); err != nil { | ||
| log.Fatal(fmt.Errorf("init keeper failed: %w", err)) | ||
| return | ||
| } | ||
|
|
||
| // init store | ||
| st := mysqlStore.NewStore(&mysqlStore.StoreOption{ | ||
| MySQLConfig: &mysql.Config{ | ||
| Addr: "127.0.0.1:55000", | ||
| User: "root", | ||
| Passwd: "mysqlpw", | ||
| DBName: "fastflow", | ||
| }, | ||
| MigrationSwitch: true, | ||
| }) | ||
| if err := st.Init(); err != nil { | ||
| log.Fatal(fmt.Errorf("init store failed: %w", err)) | ||
| return | ||
| } | ||
|
|
||
| // init fastflow | ||
| if err := fastflow.Init(&fastflow.InitialOption{ | ||
| Keeper: keeper, | ||
| Store: st, | ||
| ParserWorkersCnt: 10, | ||
| ExecutorWorkerCnt: 50, | ||
| }); err != nil { | ||
| panic(fmt.Sprintf("init fastflow failed: %s", err)) | ||
| } | ||
|
|
||
| // create a dag as template | ||
| if err := ensureDagCreated(); err != nil { | ||
| log.Fatalf(err.Error()) | ||
| return | ||
| } | ||
| // run dag interval | ||
| go runInstance() | ||
|
|
||
| // listen a http endpoint to serve metrics | ||
| if err := http.ListenAndServe(":9090", exporter.HttpHandler()); err != nil { | ||
| panic(fmt.Sprintf("metrics serve failed: %s", err)) | ||
| } | ||
| } | ||
|
|
||
| func runInstance() { | ||
| // wait init completed | ||
| time.Sleep(2 * time.Second) | ||
| dag, err := mod.GetStore().GetDag("test-dag") | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
|
|
||
| count := uint64(0) | ||
| for { | ||
| runVar := map[string]string{ | ||
| "var": "run-var", | ||
| } | ||
| if count%2 == 0 { | ||
| runVar = nil | ||
| } | ||
| dagIns, err := dag.Run(entity.TriggerManually, runVar) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
|
|
||
| dagIns.Tags = entity.NewDagInstanceTags(map[string]string{"testKey": "testValue", "testKey2": "testValue2", "testKey3": "testValue3"}) | ||
|
|
||
| err = mod.GetStore().CreateDagIns(dagIns) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
|
|
||
| count++ | ||
| time.Sleep(1 * time.Second) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,16 +1,50 @@ | ||
| module github.com/shiningrush/fastflow | ||
|
|
||
| go 1.14 | ||
| go 1.20 | ||
|
|
||
| require ( | ||
| github.com/go-sql-driver/mysql v1.7.0 | ||
| github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e | ||
| github.com/golang/mock v1.6.0 | ||
| github.com/mitchellh/mapstructure v1.1.2 | ||
| github.com/prometheus/client_golang v1.14.0 | ||
| github.com/shiningrush/goevent v0.1.0 | ||
| github.com/shiningrush/goext v0.2.4-0.20230805045150-8b8c5748342b | ||
| github.com/sony/sonyflake v1.0.0 | ||
| github.com/spaolacci/murmur3 v1.1.0 | ||
| github.com/stretchr/testify v1.6.1 | ||
| github.com/stretchr/testify v1.7.0 | ||
| go.mongodb.org/mongo-driver v1.5.4 | ||
| gopkg.in/yaml.v3 v3.0.0 | ||
| gorm.io/driver/mysql v1.5.0 | ||
| gorm.io/gorm v1.25.1 | ||
| ) | ||
|
|
||
| require ( | ||
| github.com/aws/aws-sdk-go v1.34.28 // indirect | ||
| github.com/beorn7/perks v1.0.1 // indirect | ||
| github.com/cespare/xxhash/v2 v2.1.2 // indirect | ||
| github.com/davecgh/go-spew v1.1.1 // indirect | ||
| github.com/go-stack/stack v1.8.0 // indirect | ||
| github.com/golang/protobuf v1.5.2 // indirect | ||
| github.com/golang/snappy v0.0.1 // indirect | ||
| github.com/jinzhu/inflection v1.0.0 // indirect | ||
| github.com/jinzhu/now v1.1.5 // indirect | ||
| github.com/jmespath/go-jmespath v0.4.0 // indirect | ||
| github.com/klauspost/compress v1.9.5 // indirect | ||
| github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect | ||
| github.com/pkg/errors v0.9.1 // indirect | ||
| github.com/pmezard/go-difflib v1.0.0 // indirect | ||
| github.com/prometheus/client_model v0.3.0 // indirect | ||
| github.com/prometheus/common v0.37.0 // indirect | ||
| github.com/prometheus/procfs v0.8.0 // indirect | ||
| github.com/stretchr/objx v0.1.1 // indirect | ||
| github.com/xdg-go/pbkdf2 v1.0.0 // indirect | ||
| github.com/xdg-go/scram v1.0.2 // indirect | ||
| github.com/xdg-go/stringprep v1.0.2 // indirect | ||
| github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect | ||
| golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect | ||
| golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect | ||
| golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect | ||
| golang.org/x/text v0.3.7 // indirect | ||
| google.golang.org/protobuf v1.28.1 // indirect | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里如果要做交叉编译的话,建议分离成不同的命令,基本的build应该是编译出当前平台的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,这个是给 mac M系列机器用的。我单独领出来一下