Skip to content

Commit 1c15481

Browse files
committed
add TestWorkerRemoveAfter
1 parent 5c07e7b commit 1c15481

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed

plugin/input/file/file_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func pluginConfig(opts ...string) *Config {
8787
PersistenceMode: "async",
8888
OffsetsOp: op,
8989
MaintenanceInterval: "5s",
90+
RemoveAfter: "5s",
9091
}
9192
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
9293

plugin/input/file/worker_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"strings"
99
"sync"
1010
"testing"
11+
"time"
1112

1213
"github.com/ozontech/file.d/metric"
1314
"github.com/ozontech/file.d/pipeline"
@@ -468,3 +469,88 @@ func TestGetData(t *testing.T) {
468469
})
469470
}
470471
}
472+
473+
func TestWorkerRemoveAfter(t *testing.T) {
474+
475+
tests := []struct {
476+
name string
477+
removeAfter time.Duration
478+
fileRemoved bool
479+
}{
480+
{
481+
name: "is_not_set",
482+
},
483+
{
484+
name: "remove_after",
485+
removeAfter: 5 * time.Second,
486+
fileRemoved: true,
487+
},
488+
{
489+
name: "dont_remove_after",
490+
removeAfter: 1 * time.Hour,
491+
},
492+
}
493+
for _, tt := range tests {
494+
t.Run(tt.name, func(t *testing.T) {
495+
f, err := os.CreateTemp("/tmp", "worker_test")
496+
require.NoError(t, err)
497+
info, err := f.Stat()
498+
require.NoError(t, err)
499+
defer os.Remove(path.Join("/tmp", info.Name()))
500+
501+
_, _ = fmt.Fprint(f, "abc\n")
502+
_, err = f.Seek(0, io.SeekStart)
503+
require.NoError(t, err)
504+
job := &Job{
505+
file: f,
506+
inode: 0,
507+
sourceID: 0,
508+
filename: "",
509+
symlink: "",
510+
ignoreEventsLE: 0,
511+
lastEventSeq: 0,
512+
isVirgin: false,
513+
isDone: false,
514+
shouldSkip: *atomic.NewBool(false),
515+
mu: &sync.Mutex{},
516+
}
517+
518+
if tt.removeAfter > 0 {
519+
job.eofTimestamp = time.Now().Add(-5 * time.Minute)
520+
}
521+
ctl := metric.NewCtl("test", prometheus.NewRegistry())
522+
metrics := newMetricCollection(
523+
ctl.RegisterCounter("worker1", "help_test"),
524+
ctl.RegisterCounter("worker2", "help_test"),
525+
ctl.RegisterGauge("worker3", "help_test"),
526+
ctl.RegisterGauge("worker4", "help_test"),
527+
)
528+
logger := zap.L().Sugar().With("fd")
529+
jp := NewJobProvider(&Config{}, metrics, logger)
530+
jp.jobsChan = make(chan *Job, 2)
531+
jp.jobs = map[pipeline.SourceID]*Job{
532+
1: job,
533+
}
534+
jp.jobsChan <- job
535+
jp.jobsChan <- nil
536+
537+
maxEventSize := 1024
538+
readBufferSize := 1024
539+
540+
w := &worker{
541+
maxEventSize: maxEventSize,
542+
}
543+
if tt.removeAfter > 0 {
544+
w.removeAfter = tt.removeAfter
545+
}
546+
inputer := inputerMock{}
547+
548+
w.work(&inputer, jp, readBufferSize, logger)
549+
if tt.fileRemoved {
550+
assert.NoFileExists(t, f.Name())
551+
} else {
552+
assert.FileExists(t, f.Name())
553+
}
554+
})
555+
}
556+
}

0 commit comments

Comments
 (0)