Skip to content

Commit 1563e75

Browse files
committed
feat: background task pool support
1 parent 7c3f376 commit 1563e75

File tree

5 files changed

+110
-0
lines changed

5 files changed

+110
-0
lines changed

internal/app/cli/cli.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/ka1i/cli/internal/pkg/datasource"
1313
"github.com/ka1i/cli/internal/pkg/handlers"
1414
"github.com/ka1i/cli/internal/pkg/system/prepare"
15+
"github.com/ka1i/cli/internal/pkg/task"
1516
"github.com/ka1i/cli/pkg/logger"
1617
"github.com/ka1i/cli/pkg/utils"
1718
)
@@ -44,6 +45,9 @@ func Execute() error {
4445
// system initialize
4546
prepare.Configure()
4647

48+
// service background task
49+
go task.New.Start()
50+
4751
// configure web server
4852
engine := gin.New()
4953
handlers.SetHandlers(engine)

internal/pkg/config/model.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ type Options struct {
99

1010
Mysql `yaml:"mysql"` // 数据库配置
1111
Redis `yaml:"redis"` // 数据库配置
12+
13+
Task `yaml:"task"` // 后台任务配置
1214
}
1315

1416
// app env config
@@ -43,3 +45,7 @@ type Redis struct {
4345
Passwd string `yaml:"passwd"` // 密码
4446
MasterName string `yaml:"mastername"` // 选项
4547
}
48+
49+
type Task struct {
50+
Interval time.Duration `yaml:"interval"` // 间隔
51+
}

internal/pkg/task/init.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package task
2+
3+
import (
4+
"runtime"
5+
)
6+
7+
var (
8+
waitCount int = 0
9+
10+
waitLatestTaskDone = make(chan bool)
11+
12+
goroutineMaxProcess = make(chan bool, runtime.NumCPU()*100)
13+
)

internal/pkg/task/task.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package task
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/ka1i/cli/internal/pkg/config"
8+
"github.com/ka1i/cli/pkg/logger"
9+
"go.uber.org/zap"
10+
)
11+
12+
var New = getTask()
13+
14+
func getTask() *taskbBase {
15+
return &taskbBase{}
16+
}
17+
18+
type taskbBase struct {
19+
surveillance time.Duration
20+
}
21+
22+
func (t *taskbBase) init() {
23+
time.Sleep(time.Second * 1)
24+
interval := config.Cfg.Get().Interval
25+
if interval == 0 {
26+
logger.Error("Please check background task interval, Use defaults interval: 1min")
27+
interval = time.Second * 60
28+
}
29+
t.surveillance = interval / 2
30+
31+
logger.Info("background task", zap.String("interval", interval.String()), zap.String("surveillance", t.surveillance.String()))
32+
}
33+
34+
func (t *taskbBase) Start() {
35+
36+
t.init()
37+
38+
var wg sync.WaitGroup
39+
surveillanceTicker := time.NewTicker(t.surveillance)
40+
41+
var waitLatestTask bool = false
42+
for {
43+
select {
44+
case <-surveillanceTicker.C:
45+
// surveillance task
46+
if waitLatestTask { // check result wait
47+
waitCount++
48+
if waitCount == 1 {
49+
go func() {
50+
wg.Wait()
51+
waitLatestTaskDone <- true
52+
}()
53+
}
54+
} else { // start surveillance group by
55+
wg.Add(1)
56+
go surveillance(&wg)
57+
waitLatestTask = true
58+
}
59+
case <-waitLatestTaskDone:
60+
if waitCount > 1 {
61+
logger.Warnf("注意,请调整任务间隔,建议调整间隔最小为: %v~%v\n", t.surveillance*time.Duration(waitCount-1), t.surveillance*time.Duration(waitCount+1))
62+
}
63+
// analyse surveillance task result
64+
waitCount = 0
65+
waitLatestTask = false
66+
}
67+
}
68+
}

internal/pkg/task/worker.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package task
2+
3+
import (
4+
"sync"
5+
6+
"github.com/ka1i/cli/pkg/logger"
7+
)
8+
9+
func surveillance(wg *sync.WaitGroup) {
10+
defer wg.Done()
11+
12+
wg.Add(1)
13+
goroutineMaxProcess <- true
14+
go func() {
15+
defer wg.Done()
16+
defer func() { <-goroutineMaxProcess }()
17+
logger.Info("new task running...")
18+
}()
19+
}

0 commit comments

Comments
 (0)