diff --git a/go.mod b/go.mod
index 468b32619..226716c0c 100644
--- a/go.mod
+++ b/go.mod
@@ -30,6 +30,7 @@ require (
github.com/klauspost/compress v1.17.8
github.com/minio/minio-go v6.0.14+incompatible
github.com/ozontech/insane-json v0.1.9
+ github.com/pierrec/lz4/v4 v4.1.21
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
@@ -120,7 +121,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pascaldekloe/name v1.0.1 // indirect
- github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
diff --git a/plugin/input/file/README.md b/plugin/input/file/README.md
index 90207fea5..ae229bfc6 100755
--- a/plugin/input/file/README.md
+++ b/plugin/input/file/README.md
@@ -119,6 +119,13 @@ Job maintenance `fstat` tracked files to detect if new portion of data have been
+**`remove_after`** *`cfg.Duration`* *`default=0`*
+
+After reaching EOF, how long to wait before removing the file, unless new data is written in the meantime.
+If set to `0` (the default), files are never removed.
+
+
+
**`should_watch_file_changes`** *`bool`* *`default=false`*
It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
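
For context, `remove_after` is declared below as `cfg.Duration` with `parse:"duration"`, so Go-style duration strings such as `30s` or `24h` are expected. A minimal, self-contained sketch of the intended semantics, assuming `cfg.Duration` follows `time.ParseDuration` (the snippet is illustrative and not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// remove_after is parsed as a duration; "0" (the default) disables removal.
	for _, v := range []string{"0", "30s", "24h"} {
		d, err := time.ParseDuration(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("remove_after=%q -> wait %v after EOF before removal (enabled: %t)\n", v, d, d > 0)
	}
}
```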
diff --git a/plugin/input/file/file.go b/plugin/input/file/file.go
index 9842376d7..dec756a16 100644
--- a/plugin/input/file/file.go
+++ b/plugin/input/file/file.go
@@ -152,6 +152,13 @@ type Config struct {
MaintenanceInterval cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
MaintenanceInterval_ time.Duration
+ // > @3@4@5@6
+ // >
+ // > After reaching EOF, how long to wait before removing the file, unless new data is written in the meantime.
+ // > If set to 0 (the default), files are never removed.
+ RemoveAfter cfg.Duration `json:"remove_after" default:"0" parse:"duration"` // *
+ RemoveAfter_ time.Duration
+
// > @3@4@5@6
// >
// > It turns on watching for file modifications. Turning it on cause more CPU work, but it is more probable to catch file truncation
diff --git a/plugin/input/file/file_test.go b/plugin/input/file/file_test.go
index d17ba31a2..2bf3babe4 100644
--- a/plugin/input/file/file_test.go
+++ b/plugin/input/file/file_test.go
@@ -15,6 +15,7 @@ import (
"time"
"github.com/alecthomas/units"
+ "github.com/pierrec/lz4/v4"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -86,6 +87,7 @@ func pluginConfig(opts ...string) *Config {
PersistenceMode: "async",
OffsetsOp: op,
MaintenanceInterval: "5s",
+ RemoveAfter: "0",
}
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})
@@ -172,6 +174,16 @@ func createTempFile() string {
return file.Name()
}
+func createTempLZ4File() string {
+ u := uuid.NewV4().String()
+ file, err := os.Create(path.Join(filesDir, u+".lz4"))
+ if err != nil {
+ panic(err.Error())
+ }
+
+ return file.Name()
+}
+
func createOffsetFile() string {
file, err := os.Create(path.Join(offsetsDir, offsetsFile))
if err != nil {
@@ -237,6 +249,27 @@ func addString(file string, str string, isLine bool, doSync bool) {
}
}
+func addEventsToLZ4File(file string, events []string) error {
+ outputFile, err := os.Create(file)
+ if err != nil {
+ return err
+ }
+ defer outputFile.Close() // Ensure file descriptor is closed
+
+ lz4Writer := lz4.NewWriter(outputFile)
+
+ // Join events with newlines and write them through the LZ4 writer
+ data := strings.Join(events, "\n") + "\n"
+ if _, err = lz4Writer.Write([]byte(data)); err != nil {
+ return err
+ }
+
+ // Close flushes the LZ4 frame; only then is syncing to disk meaningful
+ if err = lz4Writer.Close(); err != nil {
+ return err
+ }
+ return outputFile.Sync()
+}
+
func addLines(file string, from int, to int) int {
f, err := os.OpenFile(file, os.O_APPEND|os.O_WRONLY, perm)
if err != nil {
@@ -488,6 +521,46 @@ func TestReadContinue(t *testing.T) {
}, blockSize+blockSize-processed, "dirty")
}
+// TestReadCompressed tests reading events from an LZ4-compressed file
+func TestReadCompressed(t *testing.T) {
+ eventCount := 5
+ events := make([]string, 0)
+
+ run(&test.Case{
+ Prepare: func() {
+ for i := 0; i < eventCount; i++ {
+ events = append(events, fmt.Sprintf(`{"field":"value_%d"}`, i))
+ }
+ },
+ Act: func(p *pipeline.Pipeline) {
+ file := createTempLZ4File()
+ require.NoError(t, addEventsToLZ4File(file, events))
+ },
+ Assert: func(p *pipeline.Pipeline) {
+ assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
+ for i, s := range events {
+ assert.Equal(t, s, p.GetEventLogItem(i), "wrong event")
+ }
+ },
+ }, eventCount)
+}
+
+// TestReadCompressedEmpty tests reading an empty LZ4-compressed file
+func TestReadCompressedEmpty(t *testing.T) {
+ eventCount := 0
+
+ run(&test.Case{
+ Prepare: func() {
+ },
+ Act: func(p *pipeline.Pipeline) {
+ createTempLZ4File()
+ },
+ Assert: func(p *pipeline.Pipeline) {
+ assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
+ },
+ }, eventCount)
+}
+
// TestOffsetsSaveSimple tests if offsets saving works right in the simple case
func TestOffsetsSaveSimple(t *testing.T) {
eventCount := 5
diff --git a/plugin/input/file/offset.go b/plugin/input/file/offset.go
index 6b9403f43..63fc23b7d 100644
--- a/plugin/input/file/offset.go
+++ b/plugin/input/file/offset.go
@@ -10,6 +10,7 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xtime"
"go.uber.org/atomic"
)
@@ -24,9 +25,10 @@ type offsetDB struct {
}
type inodeOffsets struct {
- filename string
- sourceID pipeline.SourceID
- streams map[pipeline.StreamName]int64
+ filename string
+ sourceID pipeline.SourceID
+ streams map[pipeline.StreamName]int64
+ lastReadTimestamp int64
}
type (
@@ -88,6 +90,7 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
filename := ""
inodeStr := ""
sourceIDStr := ""
+ lastReadTimestampStr := ""
var err error
filename, content, err = o.parseLine(content, "- file: ")
@@ -103,6 +106,11 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
return "", fmt.Errorf("can't parse source_id: %w", err)
}
+ lastReadTimestampStr, content, err = o.parseOptionalLine(content, " last_read_timestamp: ")
+ if err != nil {
+ return "", fmt.Errorf("can't parse last_read_timestamp: %w", err)
+ }
+
sysInode, err := strconv.ParseUint(inodeStr, 10, 64)
if err != nil {
return "", fmt.Errorf("wrong offsets format, can't parse inode: %s: %w", inodeStr, err)
@@ -120,10 +128,21 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
return "", fmt.Errorf("wrong offsets format, duplicate inode %d", inode)
}
+ var lastReadTimestampVal int64
+ if lastReadTimestampStr != "" {
+ lastReadTimestampVal, err = strconv.ParseInt(lastReadTimestampStr, 10, 64)
+ if err != nil {
+ return "", fmt.Errorf("invalid timestamp format %q: %w", lastReadTimestampStr, err)
+ }
+ } else {
+ lastReadTimestampVal = xtime.GetInaccurateUnixNano()
+ }
+
offsets[fp] = &inodeOffsets{
- streams: make(map[pipeline.StreamName]int64),
- filename: filename,
- sourceID: fp,
+ streams: make(map[pipeline.StreamName]int64),
+ filename: filename,
+ sourceID: fp,
+ lastReadTimestamp: lastReadTimestampVal,
}
return o.parseStreams(content, offsets[fp].streams)
@@ -172,21 +191,54 @@ func (o *offsetDB) parseStreams(content string, streams streamsOffsets) (string,
return content, nil
}
-func (o *offsetDB) parseLine(content string, start string) (string, string, error) {
- l := len(start)
+func (o *offsetDB) parseLine(content string, prefix string) (string, string, error) {
+ if content == "" {
+ return "", "", fmt.Errorf("unexpected end of content while looking for %q", prefix)
+ }
linePos := strings.IndexByte(content, '\n')
if linePos < 0 {
- return "", "", fmt.Errorf("wrong offsets format, no nl: %q", content)
+ return "", "", fmt.Errorf("no newline found in content")
+ }
+
+ line := content[:linePos]
+ remaining := content[linePos+1:]
+
+ if len(line) < len(prefix) || line[:len(prefix)] != prefix {
+ return "", "", fmt.Errorf("expected prefix %q, got %q", prefix, safeSubstring(line, len(prefix)))
+ }
+
+ return line[len(prefix):], remaining, nil
+}
+
+func (o *offsetDB) parseOptionalLine(content string, prefix string) (string, string, error) {
+ if content == "" {
+ return "", content, nil // No content, return empty value
+ }
+
+ if len(content) >= len(prefix) && content[:len(prefix)] == prefix {
+ return o.parseLine(content, prefix)
+ }
+
+ if strings.HasPrefix(content, " streams:") || content[0] == '-' {
+ return "", content, nil
}
- line := content[0:linePos]
- content = content[linePos+1:]
- if linePos < l || line[0:l] != start {
- return "", "", fmt.Errorf("wrong offsets file format expected=%q, got=%q", start, line[0:l])
+ linePos := strings.IndexByte(content, '\n')
+ var nextLine string
+ if linePos >= 0 {
+ nextLine = content[:linePos]
+ } else {
+ nextLine = content
}
+ return "", "", fmt.Errorf("unexpected line format, expected %q or streams section, got %q", prefix, safeSubstring(nextLine, len(prefix)))
+}
- return line[l:], content, nil
+func safeSubstring(s string, length int) string {
+ if len(s) < length {
+ return s
+ }
+ return s[:length]
}
func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
@@ -234,6 +286,10 @@ func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
o.buf = strconv.AppendUint(o.buf, uint64(job.sourceID), 10)
o.buf = append(o.buf, '\n')
+ o.buf = append(o.buf, " last_read_timestamp: "...)
+ o.buf = strconv.AppendInt(o.buf, job.eofReadInfo.getUnixNanoTimestamp(), 10)
+ o.buf = append(o.buf, '\n')
+
o.buf = append(o.buf, " streams:\n"...)
for _, strOff := range job.offsets {
o.buf = append(o.buf, " "...)
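
Taken together with the `save` changes above, a single offsets entry now carries the extra field between `source_id` and `streams`, and `parseOptionalLine` tolerates its absence for files written before this change. A rough sketch of the resulting layout (indentation is illustrative; the authoritative fixture is in `offset_test.go` below):

```go
// Illustrative only: one offsets entry after this change. Old files without
// the last_read_timestamp line still parse; parseOne then falls back to
// xtime.GetInaccurateUnixNano().
const exampleOffsetsEntry = `- file: /var/log/app.log
  inode: 1
  source_id: 1234
  last_read_timestamp: 1763651665000000000
  streams:
    default: 100
`
```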
diff --git a/plugin/input/file/offset_test.go b/plugin/input/file/offset_test.go
index a45d2cf65..976af25c5 100644
--- a/plugin/input/file/offset_test.go
+++ b/plugin/input/file/offset_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@@ -23,6 +24,7 @@ func TestParseOffsets(t *testing.T) {
- file: /another/informational/name
inode: 2
source_id: 4321
+ last_read_timestamp: 1763651665000000000
streams:
stderr: 300
error:: 0
@@ -36,6 +38,7 @@ func TestParseOffsets(t *testing.T) {
assert.Equal(t, "/some/informational/name", item.filename)
assert.Equal(t, pipeline.SourceID(1234), item.sourceID)
+ assert.InDelta(t, xtime.GetInaccurateUnixNano(), item.lastReadTimestamp, 10)
offset, has := item.streams["default"]
assert.True(t, has, "stream isn't found")
@@ -54,6 +57,7 @@ func TestParseOffsets(t *testing.T) {
assert.Equal(t, "/another/informational/name", item.filename)
assert.Equal(t, pipeline.SourceID(4321), item.sourceID)
+ assert.Equal(t, int64(1763651665000000000), item.lastReadTimestamp)
offset, has = item.streams["stderr"]
assert.True(t, has, "stream isn't found")
@@ -167,3 +171,264 @@ func TestParallelOffsetsGetSet(t *testing.T) {
}
wg.Wait()
}
+
+func TestParseLine(t *testing.T) {
+ offsetDB := newOffsetDB("", "")
+
+ tests := []struct {
+ name string
+ content string
+ prefix string
+ expectedLine string
+ expectedRemain string
+ expectedErr string
+ }{
+ // happy path
+ {
+ name: "parse file line",
+ content: "- file: /some/informational/name\n inode: 1",
+ prefix: "- file: ",
+ expectedLine: "/some/informational/name",
+ expectedRemain: " inode: 1",
+ },
+ {
+ name: "parse indented file line",
+ content: " file: /some/informational/name\n inode: 1",
+ prefix: " file: ",
+ expectedLine: "/some/informational/name",
+ expectedRemain: " inode: 1",
+ },
+ {
+ name: "parse inode line",
+ content: " inode: 1\n source_id: 1234",
+ prefix: " inode: ",
+ expectedLine: "1",
+ expectedRemain: " source_id: 1234",
+ },
+ {
+ name: "parse source_id line",
+ content: " source_id: 1234\n streams:\n default: 100",
+ prefix: " source_id: ",
+ expectedLine: "1234",
+ expectedRemain: " streams:\n default: 100",
+ },
+ {
+ name: "parse streams header",
+ content: " streams:\n default: 100\n error:: 960",
+ prefix: " streams:",
+ expectedLine: "",
+ expectedRemain: " default: 100\n error:: 960",
+ },
+ {
+ name: "parse stream line with double colon",
+ content: " error:: 960\n another: 200",
+ prefix: " error:: ",
+ expectedLine: "960",
+ expectedRemain: " another: 200",
+ },
+ {
+ name: "parse stream line with single colon",
+ content: " default: 100\n error:: 960",
+ prefix: " default: ",
+ expectedLine: "100",
+ expectedRemain: " error:: 960",
+ },
+ {
+ name: "parse zero value stream",
+ content: " error:: 0\n- file: /another/file",
+ prefix: " error:: ",
+ expectedLine: "0",
+ expectedRemain: "- file: /another/file",
+ },
+ {
+ name: "parse empty stream value",
+ content: " empty:\n next: 100",
+ prefix: " empty:",
+ expectedLine: "",
+ expectedRemain: " next: 100",
+ },
+ {
+ name: "line with trailing spaces",
+ content: " inode: 1 \n source_id: 1234",
+ prefix: " inode: ",
+ expectedLine: "1 ",
+ expectedRemain: " source_id: 1234",
+ },
+
+ // Error cases
+ {
+ name: "empty content",
+ content: "",
+ prefix: " file: ",
+ expectedErr: "unexpected end of content while looking for",
+ },
+ {
+ name: "no newline",
+ content: " file: /some/path",
+ prefix: " file: ",
+ expectedErr: "no newline found in content",
+ },
+ {
+ name: "wrong prefix",
+ content: " filename: /some/path\n",
+ prefix: " file: ",
+ expectedErr: "expected prefix",
+ },
+ {
+ name: "prefix longer than line",
+ content: " fil\n",
+ prefix: " file: ",
+ expectedErr: "expected prefix",
+ },
+ {
+ name: "wrong indentation",
+ content: " file: /some/path\n", // one space instead of two
+ prefix: " file: ",
+ expectedErr: "expected prefix",
+ },
+ {
+ name: "missing colon",
+ content: " file /some/path\n",
+ prefix: " file: ",
+ expectedErr: "expected prefix",
+ },
+ {
+ name: "case mismatch",
+ content: " File: /some/path\n",
+ prefix: " file: ",
+ expectedErr: "expected prefix",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ line, remain, err := offsetDB.parseLine(tt.content, tt.prefix)
+
+ if tt.expectedErr != "" {
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), tt.expectedErr)
+ return
+ }
+
+ require.NoError(t, err)
+ assert.Equal(t, tt.expectedLine, line)
+ assert.Equal(t, tt.expectedRemain, remain)
+ })
+ }
+}
+
+func TestParseOptionalLineLastReadTimestamp(t *testing.T) {
+ offsetDB := newOffsetDB("", "")
+
+ tests := []struct {
+ name string
+ content string
+ expectedLine string
+ expectedRemain string
+ expectedErr bool
+ desc string
+ }{
+ // last_read_timestamp exists
+ {
+ name: "has last_read_timestamp before streams",
+ content: " last_read_timestamp: 1763651665000000000\n streams:\n default: 100",
+ expectedLine: "1763651665000000000",
+ expectedRemain: " streams:\n default: 100",
+ desc: "timestamp present, streams follows",
+ },
+ {
+ name: "has last_read_timestamp before new item",
+ content: " last_read_timestamp: 1763651665000000000\n- file: /another/file",
+ expectedLine: "1763651665000000000",
+ expectedRemain: "- file: /another/file",
+ desc: "timestamp present, new item follows",
+ },
+ {
+ name: "has last_read_timestamp at end",
+ content: " last_read_timestamp: 1763651665000000000\n",
+ expectedLine: "1763651665000000000",
+ expectedRemain: "",
+ desc: "timestamp present at end of content",
+ },
+ {
+ name: "has last_read_timestamp with spaces",
+ content: " last_read_timestamp: 1763651665000000000 \n streams:",
+ expectedLine: " 1763651665000000000 ",
+ expectedRemain: " streams:",
+ desc: "timestamp with extra spaces",
+ },
+ {
+ name: "has last_read_timestamp zero value",
+ content: " last_read_timestamp: 0\n streams:",
+ expectedLine: "0",
+ expectedRemain: " streams:",
+ desc: "zero timestamp value",
+ },
+
+ // last_read_timestamp does NOT exist
+ {
+ name: "no last_read_timestamp, streams follows",
+ content: " streams:\n default: 100",
+ expectedLine: "",
+ expectedRemain: " streams:\n default: 100",
+ desc: "no timestamp, streams follows",
+ },
+ {
+ name: "no last_read_timestamp, new item starts",
+ content: "- file: /another/file\n inode: 2",
+ expectedLine: "",
+ expectedRemain: "- file: /another/file\n inode: 2",
+ desc: "no timestamp, new item starts",
+ },
+ {
+ name: "no last_read_timestamp, empty content",
+ content: "",
+ expectedLine: "",
+ expectedRemain: "",
+ desc: "empty content",
+ },
+
+ // Error cases
+ {
+ name: "unexpected line before streams",
+ content: " unknown_field: value\n streams:",
+ expectedErr: true,
+ desc: "unexpected line causes error",
+ },
+ {
+ name: "wrong field name",
+ content: " lastreadtimestamp: 1763651665000000000\n streams:",
+ expectedErr: true,
+ desc: "wrong field name (no underscores)",
+ },
+ {
+ name: "case mismatch",
+ content: " Last_Read_Timestamp: 1763651665000000000\n streams:",
+ expectedErr: true,
+ desc: "wrong case in field name",
+ },
+ {
+ name: "missing colon",
+ content: " last_read_timestamp 1763651665000000000\n streams:",
+ expectedErr: true,
+ desc: "missing colon after field name",
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.desc, func(t *testing.T) {
+ line, remain, err := offsetDB.parseOptionalLine(tt.content, " last_read_timestamp: ")
+
+ if tt.expectedErr {
+ require.Error(t, err)
+ assert.Contains(t, err.Error(), "unexpected line format")
+ assert.Contains(t, err.Error(), "expected \" last_read_timestamp: \"")
+ return
+ }
+
+ require.NoError(t, err)
+ assert.Equal(t, tt.expectedLine, line, "timestamp value mismatch")
+ assert.Equal(t, tt.expectedRemain, remain, "remaining content mismatch")
+ })
+ }
+}
diff --git a/plugin/input/file/provider.go b/plugin/input/file/provider.go
index f68669a6b..ba2e4cf59 100644
--- a/plugin/input/file/provider.go
+++ b/plugin/input/file/provider.go
@@ -9,8 +9,11 @@ import (
"syscall"
"time"
+ sync_atomic "sync/atomic"
+
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
+ "github.com/ozontech/file.d/xtime"
"github.com/prometheus/client_golang/prometheus"
"github.com/rjeczalik/notify"
"go.uber.org/atomic"
@@ -60,16 +63,19 @@ type jobProvider struct {
}
type Job struct {
- file *os.File
- inode inodeID
- sourceID pipeline.SourceID // some value to distinguish jobs with same inode
- filename string
- symlink string
- curOffset int64 // offset to not call Seek() everytime
- tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
+ file *os.File
+ mimeType string
+ isCompressed bool
+ inode inodeID
+ sourceID pipeline.SourceID // some value to distinguish jobs with same inode
+ filename string
+ symlink string
+ curOffset int64 // offset to not call Seek() everytime
+ tail []byte // some data of a new line read by worker, to not seek backwards to read from line start
ignoreEventsLE uint64 // events with seq id less or equal than this should be ignored in terms offset commitment
lastEventSeq uint64
+ eofReadInfo eofInfo // timestamp and offset of the last EOF read
isVirgin bool // it should be set to false if job hits isDone=true at the first time
isDone bool
@@ -83,16 +89,54 @@ type Job struct {
mu *sync.Mutex
}
-func (j *Job) seek(offset int64, whence int, hint string) int64 {
- n, err := j.file.Seek(offset, whence)
- if err != nil {
- logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+func (j *Job) seek(offset int64, whence int, hint string) (n int64) {
+ var err error
+ if !j.isCompressed {
+ n, err = j.file.Seek(offset, whence)
+ if err != nil {
+ logger.Infof("file seek error hint=%s, name=%s, err=%s", hint, j.filename, err.Error())
+ }
+ } else {
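+ // compressed files cannot be seeked; report offset 0 so they are always read from the start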
+ n = 0
}
j.curOffset = n
return n
}
+type eofInfo struct {
+ timestamp int64
+ offset int64
+}
+
+func (e *eofInfo) setTimestamp(t time.Time) {
+ sync_atomic.StoreInt64(&e.timestamp, t.UnixNano())
+}
+
+func (e *eofInfo) getTimestamp() time.Time {
+ nanos := sync_atomic.LoadInt64(&e.timestamp)
+ return time.Unix(0, nanos)
+}
+
+// setUnixNanoTimestamp sets the timestamp in nanoseconds
+func (e *eofInfo) setUnixNanoTimestamp(nanos int64) {
+ sync_atomic.StoreInt64(&e.timestamp, nanos)
+}
+
+// getUnixNanoTimestamp returns the timestamp in nanoseconds
+func (e *eofInfo) getUnixNanoTimestamp() int64 {
+ nanos := sync_atomic.LoadInt64(&e.timestamp)
+ return nanos
+}
+
+func (e *eofInfo) setOffset(offset int64) {
+ sync_atomic.StoreInt64(&e.offset, offset)
+}
+
+func (e *eofInfo) getOffset() int64 {
+ return sync_atomic.LoadInt64(&e.offset)
+}
+
type inodeID uint64
type symlinkInfo struct {
@@ -354,6 +398,10 @@ func (jp *jobProvider) checkFileWasTruncated(job *Job, size int64) {
}
}
+func isCompressed(mimeType string) bool {
+ return mimeType == "application/x-lz4"
+}
+
func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string, symlink string) {
sourceID := sourceIDByStat(stat, symlink)
@@ -370,12 +418,16 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
}
inode := getInode(stat)
+ mimeType := getMimeType(filename)
+
job := &Job{
- file: file,
- inode: inode,
- filename: filename,
- symlink: symlink,
- sourceID: sourceID,
+ file: file,
+ isCompressed: isCompressed(mimeType),
+ mimeType: mimeType,
+ inode: inode,
+ filename: filename,
+ symlink: symlink,
+ sourceID: sourceID,
isVirgin: true,
isDone: true,
@@ -386,6 +438,8 @@ func (jp *jobProvider) addJob(file *os.File, stat os.FileInfo, filename string,
mu: &sync.Mutex{},
}
+ jp.initEofInfo(job)
+
// set curOffset
job.seek(0, io.SeekCurrent, "add job")
@@ -474,6 +528,25 @@ func (jp *jobProvider) initJobOffset(operation offsetsOp, job *Job) {
}
}
+func (jp *jobProvider) initEofInfo(job *Job) {
+ offsets, has := jp.loadedOffsets[job.sourceID]
+ if !has {
+ return
+ }
+ eofInfoFromOffsets := eofInfo{}
+ eofInfoFromOffsets.setUnixNanoTimestamp(offsets.lastReadTimestamp)
+
+ minOffset := int64(math.MaxInt64)
+ for _, offset := range offsets.streams {
+ if offset < minOffset {
+ minOffset = offset
+ }
+ }
+ if minOffset != math.MaxInt64 {
+ eofInfoFromOffsets.setOffset(minOffset)
+ }
+
+ job.eofReadInfo = eofInfoFromOffsets
+}
+
// tryResumeJob job should be already locked and it'll be unlocked.
func (jp *jobProvider) tryResumeJobAndUnlock(job *Job, filename string) {
jp.logger.Debugf("job for %d:%s resumed", job.sourceID, job.filename)
@@ -671,7 +744,7 @@ func (jp *jobProvider) maintenanceJob(job *Job) int {
offset := job.seek(0, io.SeekCurrent, "maintenance")
- if stat.Size() != offset {
+ if stat.Size() != offset && !job.isCompressed {
jp.tryResumeJobAndUnlock(job, filename)
return maintenanceResultResumed
@@ -717,6 +790,19 @@ func (jp *jobProvider) maintenanceJob(job *Job) int {
return maintenanceResultDeleted
}
+ if jp.config.RemoveAfter_ > 0 && job.eofReadInfo.getUnixNanoTimestamp() > 0 && xtime.GetInaccurateTime().Sub(job.eofReadInfo.getTimestamp()) > jp.config.RemoveAfter_ {
+ err = file.Close()
+ if err != nil {
+ jp.logger.Errorf("cannot close file %s: %s", file.Name(), err.Error())
+ }
+ jp.deleteJobAndUnlock(job)
+ err = os.Remove(file.Name())
+ if err != nil {
+ jp.logger.Errorf("cannot remove file %s: %s", file.Name(), err.Error())
+ }
+ return maintenanceResultDeleted
+ }
+
// seek to saved offset
job.file = file
job.seek(offset, io.SeekStart, "maintenance")
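
The removal branch added to `maintenanceJob` boils down to: removal is enabled, EOF has been seen, and nothing new has been read for longer than the grace period. A self-contained sketch of that predicate, using `time.Now` instead of the `xtime` helpers (the names here are illustrative, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// shouldRemove mirrors the maintenanceJob check: removal must be enabled,
// EOF must have been reached at least once, and the grace period must have passed.
func shouldRemove(removeAfter time.Duration, lastEOF time.Time) bool {
	return removeAfter > 0 && !lastEOF.IsZero() && time.Since(lastEOF) > removeAfter
}

func main() {
	fmt.Println(shouldRemove(0, time.Now().Add(-time.Hour)))               // false: removal disabled
	fmt.Println(shouldRemove(5*time.Second, time.Now()))                   // false: EOF too recent
	fmt.Println(shouldRemove(5*time.Second, time.Now().Add(-time.Minute))) // true: grace period passed
}
```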
diff --git a/plugin/input/file/provider_test.go b/plugin/input/file/provider_test.go
index 45f17383f..b8c8fc21a 100644
--- a/plugin/input/file/provider_test.go
+++ b/plugin/input/file/provider_test.go
@@ -3,12 +3,15 @@ package file
import (
"os"
"path/filepath"
+ "sync"
"testing"
+ "time"
"runtime"
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/metric"
+ "github.com/ozontech/file.d/xtime"
"github.com/prometheus/client_golang/prometheus"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/require"
@@ -181,3 +184,107 @@ func TestProviderWatcherPaths(t *testing.T) {
})
}
}
+
+func TestEOFInfo(t *testing.T) {
+ t.Run("setTimestamp and getTimestamp match", func(t *testing.T) {
+ e := &eofInfo{}
+
+ now := time.Now()
+ e.setTimestamp(now)
+
+ got := e.getTimestamp()
+ assert.Equal(t, now.UnixNano(), got.UnixNano())
+ assert.True(t, now.Equal(got))
+ })
+
+ t.Run("setUnixTimestamp and getUnixTimestamp match", func(t *testing.T) {
+ e := &eofInfo{}
+
+ expected := time.Now().UnixNano()
+ e.setUnixNanoTimestamp(expected)
+
+ got := e.getUnixNanoTimestamp()
+ assert.Equal(t, expected, got)
+ })
+
+ t.Run("setOffset and getOffset match", func(t *testing.T) {
+ e := &eofInfo{}
+
+ expected := int64(12345)
+ e.setOffset(expected)
+
+ got := e.getOffset()
+ assert.Equal(t, expected, got)
+ })
+
+ t.Run("concurrent access to timestamp", func(t *testing.T) {
+ e := &eofInfo{}
+ var wg sync.WaitGroup
+
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func(idx int) {
+ defer wg.Done()
+ ts := time.Unix(int64(idx), 0)
+ e.setTimestamp(ts)
+ _ = e.getTimestamp() // Just read it
+ }(i)
+ }
+
+ wg.Wait()
+ })
+
+ t.Run("concurrent access to offset", func(t *testing.T) {
+ e := &eofInfo{}
+ var wg sync.WaitGroup
+
+ // Start multiple goroutines writing different offsets
+ for i := 0; i < 100; i++ {
+ wg.Add(1)
+ go func(offset int64) {
+ defer wg.Done()
+ e.setOffset(offset)
+ _ = e.getOffset() // Just read it
+ }(int64(i * 100))
+ }
+
+ wg.Wait()
+ })
+
+ t.Run("timestamp and unix timestamp consistency", func(t *testing.T) {
+ e := &eofInfo{}
+
+ now := time.Now()
+ e.setTimestamp(now)
+
+ // Verify unix timestamp matches
+ unixTs := e.getUnixNanoTimestamp()
+ assert.Equal(t, now.UnixNano(), unixTs)
+
+ // Now set via unix timestamp and verify via getTimestamp
+ newUnixTs := now.Add(time.Hour).Unix()
+ e.setUnixNanoTimestamp(newUnixTs)
+
+ gotTime := e.getTimestamp()
+ assert.Equal(t, newUnixTs, gotTime.UnixNano())
+ })
+
+ t.Run("timestamp and unix timestamp consistency with xtime", func(t *testing.T) {
+ e := &eofInfo{}
+
+ now := xtime.GetInaccurateTime()
+ e.setTimestamp(now)
+
+ // Verify unix timestamp matches
+ unixTs := e.getUnixNanoTimestamp()
+ assert.Equal(t, now.UnixNano(), unixTs)
+
+ // Now set via unix timestamp and verify via getTimestamp
+ newUnixTs := xtime.GetInaccurateUnixNano()
+ e.setUnixNanoTimestamp(newUnixTs)
+
+ gotTime := e.getTimestamp()
+ assert.Equal(t, newUnixTs, gotTime.UnixNano())
+ })
+}
diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go
index c1c2df2b9..726fa859a 100644
--- a/plugin/input/file/worker.go
+++ b/plugin/input/file/worker.go
@@ -3,12 +3,18 @@ package file
import (
"bytes"
"io"
+ "math"
+ "mime"
"os"
+ "os/exec"
+ "path/filepath"
"strings"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/pipeline/metadata"
k8s_meta "github.com/ozontech/file.d/plugin/input/k8s/meta"
+ "github.com/ozontech/file.d/xtime"
+ "github.com/pierrec/lz4/v4"
"go.uber.org/zap"
)
@@ -90,20 +96,52 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
}
}
+ var reader io.Reader
+ if job.isCompressed {
+ // lz4 does not support appending, so make sure no one is still writing to the file
+ if isFileBeingWritten(file.Name()) {
+ logger.Errorf("lz4 file %s is still being written, skipping it for now", file.Name())
+ break
+ }
+ lz4Reader := lz4.NewReader(file)
+ if len(offsets) > 0 {
+ // skip already processed data, because we cannot seek in a compressed stream
+ minOffset := int64(math.MaxInt64)
+ for _, offset := range offsets {
+ if offset.Offset < minOffset {
+ minOffset = offset.Offset
+ }
+ }
+ for lastOffset+int64(readBufferSize) < minOffset {
+ n, err := lz4Reader.Read(readBuf)
+ lastOffset += int64(n)
+ if err != nil || n == 0 {
+ break // end of data or read error; the main loop below handles the rest
+ }
+ }
+ }
+ reader = lz4Reader
+ } else {
+ reader = file
+ }
+
// append the data of the old work, this happens when the event was not completely written to the file
// for example: {"level": "info", "message": "some...
// the end of the message can be added later and will be read in this iteration
accumBuf = append(accumBuf[:0], job.tail...)
for {
- n, err := file.Read(readBuf)
+ n, err := reader.Read(readBuf)
controller.IncReadOps()
// if we read to end of file it's time to check truncation etc and process next job
- if err == io.EOF || n == 0 {
+ if (!job.isCompressed && err == io.EOF) || n == 0 {
+ // for compressed files we only stop on n == 0, because the lz4 reader may return io.EOF together with n > 0
isEOFReached = true
break
}
if err != nil {
- logger.Fatalf("file %d:%s read error, %s read=%d", sourceID, sourceName, err.Error(), n)
+ if !job.isCompressed && err != io.EOF {
+ logger.Fatalf("file %d:%s read error, %s read=%d", sourceID, sourceName, err.Error(), n)
+ }
}
read := int64(n)
@@ -177,22 +215,69 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi
}
}
+var customMimeTypes = map[string]string{
+ ".lz4": "application/x-lz4",
+}
+
+func getMimeType(filename string) string {
+ ext := strings.ToLower(filepath.Ext(filename))
+
+ // Check the custom mapping first so that .lz4 detection does not depend on the host's mime database
+ if mimeType, exists := customMimeTypes[ext]; exists {
+ return mimeType
+ }
+
+ if mimeType := mime.TypeByExtension(ext); mimeType != "" {
+ return mimeType
+ }
+
+ // Default for unknown types
+ return "application/octet-stream"
+}
+
+// isFileBeingWritten reports whether some process still has the file open with write access.
+// It is a best-effort heuristic based on lsof output: the FD column (e.g. "3w", "4u")
+// carries the access mode, where 'w' means write and 'u' means read/write.
+func isFileBeingWritten(filePath string) bool {
+ cmd := exec.Command("lsof", filePath)
+ output, err := cmd.Output()
+ if err != nil {
+ return false // lsof failed or no process has the file open
+ }
+
+ lines := strings.Split(string(output), "\n")
+ for i, line := range lines {
+ if i == 0 || line == "" {
+ continue // skip the header and trailing empty lines
+ }
+ fields := strings.Fields(line)
+ if len(fields) < 4 {
+ continue
+ }
+ if strings.ContainsAny(fields[3], "wu") {
+ return true // the file is opened for writing
+ }
+ }
+
+ return false // no writers found
+}
+
func (w *worker) processEOF(file *os.File, job *Job, jobProvider *jobProvider, totalOffset int64) error {
stat, err := file.Stat()
if err != nil {
return err
}
- // files truncated from time to time, after logs from file was processed.
- // Position > stat.Size() means that data was truncated and
- // caret pointer must be moved to start of file.
- if totalOffset > stat.Size() {
- jobProvider.truncateJob(job)
+ if !job.isCompressed {
+ // files truncated from time to time, after logs from file was processed.
+ // Position > stat.Size() means that data was truncated and
+ // caret pointer must be moved to start of file.
+ if totalOffset > stat.Size() {
+ jobProvider.truncateJob(job)
+ }
}
-
// Mark job as done till new lines has appeared.
jobProvider.doneJob(job)
+ if job.eofReadInfo.getUnixNanoTimestamp() == 0 || job.eofReadInfo.getOffset() != totalOffset {
+ // store info about event of end of file
+ job.eofReadInfo.setUnixNanoTimestamp(xtime.GetInaccurateUnixNano())
+ job.eofReadInfo.setOffset(totalOffset)
+ }
+
return nil
}
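
The EOF condition in the read loop differs for compressed jobs because an `io.Reader` may return `n > 0` together with `io.EOF`, and the lz4 reader does this at the end of a frame, so the worker stops only on `n == 0` to avoid dropping the final bytes. A small self-contained sketch of that behaviour with `github.com/pierrec/lz4/v4` (payload and buffer size are arbitrary):

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pierrec/lz4/v4"
)

func main() {
	// Compress a small payload into an in-memory LZ4 frame.
	var compressed bytes.Buffer
	zw := lz4.NewWriter(&compressed)
	if _, err := zw.Write([]byte("line1\nline2\nline3\n")); err != nil {
		panic(err)
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}

	// Read it back in small chunks: per the io.Reader contract a call may return
	// n > 0 together with io.EOF, so those bytes must be processed before breaking.
	zr := lz4.NewReader(&compressed)
	buf := make([]byte, 8)
	for {
		n, err := zr.Read(buf)
		if n > 0 {
			fmt.Printf("read %d bytes: %q (err=%v)\n", n, buf[:n], err)
		}
		if err == io.EOF || n == 0 {
			break
		}
		if err != nil {
			panic(err)
		}
	}
}
```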
diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go
index f38527387..98cc73665 100644
--- a/plugin/input/file/worker_test.go
+++ b/plugin/input/file/worker_test.go
@@ -8,6 +8,7 @@ import (
"strings"
"sync"
"testing"
+ "time"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
@@ -79,6 +80,13 @@ func TestWorkerWork(t *testing.T) {
readBufferSize: 1024,
expData: "abc\n",
},
+ {
+ name: "should_ok_when_read_1_line_without_newline",
+ maxEventSize: 1024,
+ inFile: "abc",
+ readBufferSize: 1024,
+ expData: "",
+ },
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -461,3 +469,100 @@ func TestGetData(t *testing.T) {
})
}
}
+
+func TestWorkerRemoveAfter(t *testing.T) {
+ tests := []struct {
+ name string
+ removeAfter time.Duration
+ fileRemoved bool
+ fileIsChanged bool
+ }{
+ {
+ name: "is_not_set",
+ },
+ {
+ name: "remove_after",
+ removeAfter: 5 * time.Second,
+ fileRemoved: true,
+ },
+ {
+ name: "dont_remove_after_append",
+ removeAfter: 5 * time.Second,
+ fileRemoved: false,
+ fileIsChanged: true,
+ },
+ {
+ name: "dont_remove_after",
+ removeAfter: 1 * time.Hour,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ f, err := os.CreateTemp("/tmp", "worker_test")
+ require.NoError(t, err)
+ info, err := f.Stat()
+ require.NoError(t, err)
+ defer os.Remove(f.Name())
+
+ str := "abc\n"
+ _, _ = fmt.Fprint(f, str)
+ _, err = f.Seek(0, io.SeekStart)
+ require.NoError(t, err)
+ job := &Job{
+ file: f,
+ inode: getInode(info),
+ sourceID: 0,
+ filename: f.Name(),
+ symlink: "",
+ ignoreEventsLE: 0,
+ lastEventSeq: 0,
+ isVirgin: false,
+ isDone: false,
+ shouldSkip: *atomic.NewBool(false),
+ mu: &sync.Mutex{},
+ }
+
+ if tt.removeAfter > 0 {
+ job.eofReadInfo.setTimestamp(time.Now().Add(-5 * time.Minute))
+ }
+
+ if !tt.fileIsChanged {
+ job.eofReadInfo.setOffset(int64(len(str)))
+ }
+ ctl := metric.NewCtl("test", prometheus.NewRegistry())
+ metrics := newMetricCollection(
+ ctl.RegisterCounter("worker1", "help_test"),
+ ctl.RegisterCounter("worker2", "help_test"),
+ ctl.RegisterGauge("worker3", "help_test"),
+ ctl.RegisterGauge("worker4", "help_test"),
+ )
+ logger := zap.L().Sugar().With("fd")
+ jp := NewJobProvider(&Config{}, metrics, logger)
+ jp.jobsChan = make(chan *Job, 2)
+ jp.jobs = map[pipeline.SourceID]*Job{
+ 1: job,
+ }
+ jp.jobsChan <- job
+ jp.jobsChan <- nil
+
+ maxEventSize := 1024
+ readBufferSize := 1024
+
+ w := &worker{
+ maxEventSize: maxEventSize,
+ }
+ if tt.removeAfter > 0 {
+ jp.config.RemoveAfter_ = tt.removeAfter
+ }
+ inputer := inputerMock{}
+
+ w.work(&inputer, jp, readBufferSize, logger)
+ jp.maintenanceJob(job)
+ if tt.fileRemoved {
+ assert.NoFileExists(t, f.Name())
+ } else {
+ assert.FileExists(t, f.Name())
+ }
+ })
+ }
+}