diff --git a/examples/dynamic/main.go b/examples/dynamic/main.go new file mode 100644 index 0000000..e71363b --- /dev/null +++ b/examples/dynamic/main.go @@ -0,0 +1,214 @@ +package main + +import ( + "errors" + "fmt" + "github.com/shiningrush/fastflow/pkg/exporter" + "log" + "net/http" + "time" + + "github.com/shiningrush/fastflow" + mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo" + "github.com/shiningrush/fastflow/pkg/entity" + "github.com/shiningrush/fastflow/pkg/entity/run" + "github.com/shiningrush/fastflow/pkg/mod" + "github.com/shiningrush/fastflow/pkg/utils/data" + mongoStore "github.com/shiningrush/fastflow/store/mongo" +) + +type ActionParam struct { + Name string + Desc string +} + +type ActionA struct { + code string +} + +func (a *ActionA) Name() string { + return fmt.Sprintf("Action-%s", a.code) +} +func (a *ActionA) RunBefore(ctx run.ExecuteContext, params interface{}) error { + input := params.(*ActionParam) + log.Println(fmt.Sprintf("%s run before, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) + time.Sleep(time.Second) + if a.code != "B" && a.code != "C" { + ctx.ShareData().Set(fmt.Sprintf("%s-key", a.code), fmt.Sprintf("%s value", a.code)) + } + return nil +} +func (a *ActionA) Run(ctx run.ExecuteContext, params interface{}) error { + input := params.(*ActionParam) + log.Println(fmt.Sprintf("%s run, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) + ctx.Trace("run start", run.TraceOpPersistAfterAction) + time.Sleep(2 * time.Second) + ctx.Trace("run end") + return nil +} +func (a *ActionA) RunAfter(ctx run.ExecuteContext, params interface{}) error { + input := params.(*ActionParam) + log.Println(fmt.Sprintf("%s run after, p.Name: %s, p.Desc: %s", a.Name(), input.Name, input.Desc)) + time.Sleep(time.Second) + return nil +} +func (a *ActionA) ParameterNew() interface{} { + return &ActionParam{} +} + +func ensureDagCreated() error { + dag := mod.Register(&entity.Dag{ + BaseInfo: entity.BaseInfo{ + ID: "test-dag", + }, + Name: "test", + Vars: entity.DagVars{ + "var": {DefaultValue: "default-var"}, + }, + Status: entity.DagStatusNormal, + Tasks: []entity.Task{ + {ID: "task1", ActionName: "Action-A", Params: map[string]interface{}{ + "Name": "task-p1", + "Desc": "{{var}}", + }, TimeoutSecs: 5}, + {ID: "task2", ActionName: "Action-B", DependOn: []string{"task1"}, Params: map[string]interface{}{ + "Name": "task-p1", + "Desc": "{{var}}", + }}, + {ID: "task3", ActionName: "Action-C", DependOn: []string{"task1"}, Params: map[string]interface{}{ + "Name": "task-p1", + "Desc": "{{var}}", + }}, + {ID: "task4", ActionName: "Action-D", DependOn: []string{"task2", "task3"}, Params: map[string]interface{}{ + "Name": "task-p1", + "Desc": "{{var}}", + }}, + }, + }) + oldDag, err := mod.GetStore().GetDag(dag.ID) + if errors.Is(err, data.ErrDataNotFound) { + if err := mod.GetStore().CreateDag(dag); err != nil { + return err + } + } + if oldDag != nil { + if err := mod.GetStore().UpdateDag(dag); err != nil { + return err + } + } + return nil +} + +//err := mod.RegisterActionCap(PrintAction{}) +// if err != nil { +// fmt.Println(err) +// return +// } +// //action := GetRegisteredAction("PrintAction") +// keeper, store := MongoInit() +// dag := entity.Dag{ +// Name: "testdag", +// Desc: "dasda", +// Vars: nil, +// Status: entity.DagStatusNormal, +// Tasks: []entity.Task{ +// entity.Task{ +// ID: "dasdq", +// Name: "ascadca", +// DependOn: nil, +// ActionName: "PrintAction", +// TimeoutSecs: 0, +// Params: nil, +// PreChecks: nil, +// }, +// }, +// } +// dag.Initial() +// actions := []run.Action{} +// for _, task := range dag.Tasks { +// action := mod.GetRegisteredAction(task.ActionName) +// actions = append(actions, action) +// } +// fastflow.RegisterAction(actions) + +func main() { + err := mod.RegisterActionCap(&ActionA{code: "A"}, &ActionA{code: "B"}, &ActionA{code: "C"}, &ActionA{code: "D"}) + if err != nil { + return + } + // init keeper + keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ + Key: "worker-1", + // if your mongo does not set user/pwd, you should remove it + ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", + Database: "mongo-demo", + Prefix: "test", + }) + if err := keeper.Init(); err != nil { + log.Fatal(fmt.Errorf("init keeper failed: %w", err)) + } + + // init store + st := mongoStore.NewStore(&mongoStore.StoreOption{ + // if your mongo does not set user/pwd, you should remove it + ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", + Database: "mongo-demo", + Prefix: "test", + }) + if err := st.Init(); err != nil { + log.Fatal(fmt.Errorf("init store failed: %w", err)) + } + + // init fastflow + if err := fastflow.Init(&fastflow.InitialOption{ + Keeper: keeper, + Store: st, + ParserWorkersCnt: 10, + ExecutorWorkerCnt: 50, + }); err != nil { + panic(fmt.Sprintf("init fastflow failed: %s", err)) + } + + // create a dag as template + if err := ensureDagCreated(); err != nil { + log.Fatalf(err.Error()) + } + // run dag interval + go runInstance() + + // listen a http endpoint to serve metrics + if err := http.ListenAndServe(":9091", exporter.HttpHandler()); err != nil { + panic(fmt.Sprintf("metrics serve failed: %s", err)) + } +} + +func runInstance() { + // wait init completed + time.Sleep(2 * time.Second) + dag, err := mod.GetStore().GetDag("test-dag") + if err != nil { + panic(err) + } + + count := uint64(0) + //for { + runVar := map[string]string{ + "var": "run-var", + } + if count%2 == 0 { + runVar = nil + } + dagIns, err := dag.Run(entity.TriggerManually, runVar) + if err != nil { + panic(err) + } + + err = mod.GetStore().CreateDagIns(dagIns) + if err != nil { + panic(err) + } + + count++ + time.Sleep(1 * time.Second) + //} +} diff --git a/pkg/mod/mod_define.go b/pkg/mod/mod_define.go index 1c56ddf..a4996df 100644 --- a/pkg/mod/mod_define.go +++ b/pkg/mod/mod_define.go @@ -1,6 +1,8 @@ package mod import ( + "errors" + "reflect" "time" "github.com/shiningrush/fastflow/pkg/entity" @@ -8,13 +10,13 @@ import ( ) var ( - ActionMap = map[string]run.Action{} - - defExc Executor - defStore Store - defKeeper Keeper - defParser Parser - defCommander Commander + ActionMap = map[string]run.Action{} + registeredActionMap = map[string]reflect.Value{} + defExc Executor + defStore Store + defKeeper Keeper + defParser Parser + defCommander Commander ) // Commander used to execute command @@ -196,3 +198,44 @@ func SetParser(e Parser) { func GetParser() Parser { return defParser } + +func GetRegisteredAction(param string) run.Action { + action, ok := registeredActionMap[param] + if !ok { + return nil + } + return action.Interface().(run.Action) +} + +func RegisterActionCap(intrs ...interface{}) error { + for _, intr := range intrs { + action := reflect.ValueOf(intr) + name, err := callName(action) + if err != nil { + return err + } + if _, ok := registeredActionMap[name]; !ok { + registeredActionMap[name] = action + } + } + return nil +} + +func callName(action reflect.Value) (string, error) { + result := action.MethodByName("Name").Call(make([]reflect.Value, 0)) + if len(result) == 0 { + return "", errors.New("Name function call err,check out if you set true return value") + } + return result[0].String(), nil +} + +func Register(d *entity.Dag) *entity.Dag { + actions := make([]run.Action, len(d.Tasks)) + for _, task := range d.Tasks { + actions = append(actions, GetRegisteredAction(task.ActionName)) + } + for i := range actions { + ActionMap[actions[i].Name()] = actions[i] + } + return d +}