Skip to content

Commit dfb97ec

Browse files
committed
add TestWorkerRemoveAfter
1 parent 5c07e7b commit dfb97ec

File tree

2 files changed

+86
-0
lines changed

2 files changed

+86
-0
lines changed

plugin/input/file/file_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func pluginConfig(opts ...string) *Config {
8787
PersistenceMode: "async",
8888
OffsetsOp: op,
8989
MaintenanceInterval: "5s",
90+
RemoveAfter: "5s",
9091
}
9192
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
9293

plugin/input/file/worker_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"sync"
1010
"testing"
11+
"time"
1112

1213
"github.com/ozontech/file.d/metric"
1314
"github.com/ozontech/file.d/pipeline"
@@ -468,3 +469,87 @@ func TestGetData(t *testing.T) {
468469
})
469470
}
470471
}
472+
473+
func TestWorkerRemoveAfter(t *testing.T) {
474+
tests := []struct {
475+
name string
476+
removeAfter time.Duration
477+
fileRemoved bool
478+
}{
479+
{
480+
name: "is_not_set",
481+
},
482+
{
483+
name: "remove_after",
484+
removeAfter: 5 * time.Second,
485+
fileRemoved: true,
486+
},
487+
{
488+
name: "dont_remove_after",
489+
removeAfter: 1 * time.Hour,
490+
},
491+
}
492+
for _, tt := range tests {
493+
t.Run(tt.name, func(t *testing.T) {
494+
f, err := os.CreateTemp("/tmp", "worker_test")
495+
require.NoError(t, err)
496+
info, err := f.Stat()
497+
require.NoError(t, err)
498+
defer os.Remove(path.Join("/tmp", info.Name()))
499+
500+
_, _ = fmt.Fprint(f, "abc\n")
501+
_, err = f.Seek(0, io.SeekStart)
502+
require.NoError(t, err)
503+
job := &Job{
504+
file: f,
505+
inode: 0,
506+
sourceID: 0,
507+
filename: "",
508+
symlink: "",
509+
ignoreEventsLE: 0,
510+
lastEventSeq: 0,
511+
isVirgin: false,
512+
isDone: false,
513+
shouldSkip: *atomic.NewBool(false),
514+
mu: &sync.Mutex{},
515+
}
516+
517+
if tt.removeAfter > 0 {
518+
job.eofTimestamp = time.Now().Add(-5 * time.Minute)
519+
}
520+
ctl := metric.NewCtl("test", prometheus.NewRegistry())
521+
metrics := newMetricCollection(
522+
ctl.RegisterCounter("worker1", "help_test"),
523+
ctl.RegisterCounter("worker2", "help_test"),
524+
ctl.RegisterGauge("worker3", "help_test"),
525+
ctl.RegisterGauge("worker4", "help_test"),
526+
)
527+
logger := zap.L().Sugar().With("fd")
528+
jp := NewJobProvider(&Config{}, metrics, logger)
529+
jp.jobsChan = make(chan *Job, 2)
530+
jp.jobs = map[pipeline.SourceID]*Job{
531+
1: job,
532+
}
533+
jp.jobsChan <- job
534+
jp.jobsChan <- nil
535+
536+
maxEventSize := 1024
537+
readBufferSize := 1024
538+
539+
w := &worker{
540+
maxEventSize: maxEventSize,
541+
}
542+
if tt.removeAfter > 0 {
543+
w.removeAfter = tt.removeAfter
544+
}
545+
inputer := inputerMock{}
546+
547+
w.work(&inputer, jp, readBufferSize, logger)
548+
if tt.fileRemoved {
549+
assert.NoFileExists(t, f.Name())
550+
} else {
551+
assert.FileExists(t, f.Name())
552+
}
553+
})
554+
}
555+
}

0 commit comments

Comments
 (0)