2 changes: 1 addition & 1 deletion go.mod
@@ -30,6 +30,7 @@ require (
github.com/klauspost/compress v1.17.8
github.com/minio/minio-go v6.0.14+incompatible
github.com/ozontech/insane-json v0.1.9
github.com/pierrec/lz4/v4 v4.1.21
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0
@@ -120,7 +121,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/runtime-spec v1.0.2 // indirect
github.com/pascaldekloe/name v1.0.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
7 changes: 7 additions & 0 deletions plugin/input/file/README.md
@@ -119,6 +119,13 @@ Job maintenance `fstat` tracked files to detect if new portion of data have been

<br>

**`remove_after`** *`cfg.Duration`* *`default=0`*

After reaching EOF, how long to wait before removing the file, unless new data is written in the meantime.
If not set (the default), files are never removed.

<br>
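For example, a pipeline that deletes log files ten seconds after they have been fully read might look like this (a minimal sketch; `watching_dir`, `offsets_file`, and the stdout output are the usual options assumed here, only `remove_after` comes from this change):

```yaml
pipelines:
  example:
    input:
      type: file
      watching_dir: /var/log/
      offsets_file: /data/offsets.yaml
      remove_after: 10s
    output:
      type: stdout
```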

**`should_watch_file_changes`** *`bool`* *`default=false`*

It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely to catch file truncation
7 changes: 7 additions & 0 deletions plugin/input/file/file.go
@@ -152,6 +152,13 @@ type Config struct {
MaintenanceInterval cfg.Duration `json:"maintenance_interval" default:"10s" parse:"duration"` // *
MaintenanceInterval_ time.Duration

// > @3@4@5@6
// >
// > After reaching EOF, how long to wait before removing the file, unless new data is written in the meantime.
// > If not set (the default), files are never removed.
RemoveAfter cfg.Duration `json:"remove_after" default:"0" parse:"duration"` // *
RemoveAfter_ time.Duration

// > @3@4@5@6
// >
// > It turns on watching for file modifications. Turning it on causes more CPU work, but makes it more likely to catch file truncation
73 changes: 73 additions & 0 deletions plugin/input/file/file_test.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/alecthomas/units"
"github.com/pierrec/lz4/v4"
uuid "github.com/satori/go.uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -86,6 +87,7 @@ func pluginConfig(opts ...string) *Config {
PersistenceMode: "async",
OffsetsOp: op,
MaintenanceInterval: "5s",
RemoveAfter: "0",
}
test.NewConfig(config, map[string]int{"gomaxprocs": runtime.GOMAXPROCS(0)})

@@ -172,6 +174,16 @@ func createTempFile() string {
return file.Name()
}

func createTempLZ4File() string {
	u := uuid.NewV4().String()
	file, err := os.Create(path.Join(filesDir, u+".lz4"))
	if err != nil {
		panic(err.Error())
	}

	return file.Name()
}

func createOffsetFile() string {
file, err := os.Create(path.Join(offsetsDir, offsetsFile))
if err != nil {
@@ -237,6 +249,27 @@ func addString(file string, str string, isLine bool, doSync bool) {
}
}

func addEventsToLZ4File(file string, events []string) error {
	outputFile, err := os.Create(file)
	if err != nil {
		return err
	}
	defer outputFile.Close() // ensure the file descriptor is closed

	lz4Writer := lz4.NewWriter(outputFile)

	// join events with newlines and write
	data := strings.Join(events, "\n") + "\n"
	if _, err = lz4Writer.Write([]byte(data)); err != nil {
		return err
	}

	// close the LZ4 writer first so the frame is fully flushed,
	// then sync the compressed data to disk
	if err = lz4Writer.Close(); err != nil {
		return err
	}

	return outputFile.Sync()
}
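
// For reference, reading such a file back with the same library is symmetric:
// lz4.NewReader wraps any io.Reader and transparently decompresses the frame
// written by lz4.NewWriter. A standalone sketch, not part of this PR;
// readLZ4File is a hypothetical helper (it needs an extra "io" import).
func readLZ4File(file string) ([]string, error) {
	f, err := os.Open(file)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// decompress the whole frame in one read
	data, err := io.ReadAll(lz4.NewReader(f))
	if err != nil {
		return nil, err
	}

	return strings.Split(strings.TrimSuffix(string(data), "\n"), "\n"), nil
}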

func addLines(file string, from int, to int) int {
f, err := os.OpenFile(file, os.O_APPEND|os.O_WRONLY, perm)
if err != nil {
@@ -488,6 +521,46 @@ func TestReadContinue(t *testing.T) {
}, blockSize+blockSize-processed, "dirty")
}

// TestReadCompressed tests reading events from an LZ4-compressed file
func TestReadCompressed(t *testing.T) {
	eventCount := 5
	events := make([]string, 0)

	run(&test.Case{
		Prepare: func() {
			for i := 0; i < eventCount; i++ {
				events = append(events, fmt.Sprintf(`{"field":"value_%d"}`, i))
			}
		},
		Act: func(p *pipeline.Pipeline) {
			file := createTempLZ4File()
			require.NoError(t, addEventsToLZ4File(file, events))
		},
		Assert: func(p *pipeline.Pipeline) {
			assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
			for i, s := range events {
				assert.Equal(t, s, p.GetEventLogItem(i), "wrong event")
			}
		},
	}, eventCount)
}

// TestReadCompressedEmpty tests that an empty LZ4-compressed file produces no events
func TestReadCompressedEmpty(t *testing.T) {
	eventCount := 0

	run(&test.Case{
		Prepare: func() {},
		Act: func(p *pipeline.Pipeline) {
			createTempLZ4File()
		},
		Assert: func(p *pipeline.Pipeline) {
			assert.Equal(t, eventCount, p.GetEventsTotal(), "wrong event count")
		},
	}, eventCount)
}

// TestOffsetsSaveSimple tests if offsets saving works right in the simple case
func TestOffsetsSaveSimple(t *testing.T) {
eventCount := 5
84 changes: 70 additions & 14 deletions plugin/input/file/offset.go
@@ -10,6 +10,7 @@ import (

"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/xtime"
"go.uber.org/atomic"
)

@@ -24,9 +25,10 @@ type offsetDB struct {
}

type inodeOffsets struct {
	filename          string
	sourceID          pipeline.SourceID
	streams           map[pipeline.StreamName]int64
	lastReadTimestamp int64
}

type (
@@ -88,6 +90,7 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
filename := ""
inodeStr := ""
sourceIDStr := ""
lastReadTimestampStr := ""
var err error

filename, content, err = o.parseLine(content, "- file: ")
@@ -103,6 +106,11 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
return "", fmt.Errorf("can't parse source_id: %w", err)
}

lastReadTimestampStr, content, err = o.parseOptionalLine(content, " last_read_timestamp: ")
if err != nil {
return "", fmt.Errorf("can't parse last_read_timestamp: %w", err)
}

sysInode, err := strconv.ParseUint(inodeStr, 10, 64)
if err != nil {
return "", fmt.Errorf("wrong offsets format, can't parse inode: %s: %w", inodeStr, err)
@@ -120,10 +128,21 @@ func (o *offsetDB) parseOne(content string, offsets fpOffsets) (string, error) {
return "", fmt.Errorf("wrong offsets format, duplicate inode %d", inode)
}

var lastReadTimestampVal int64
if lastReadTimestampStr != "" {
lastReadTimestampVal, err = strconv.ParseInt(lastReadTimestampStr, 10, 64)
if err != nil {
return "", fmt.Errorf("invalid timestamp format %q: %w", lastReadTimestampStr, err)
}
} else {
lastReadTimestampVal = xtime.GetInaccurateUnixNano()
}

offsets[fp] = &inodeOffsets{
	streams:           make(map[pipeline.StreamName]int64),
	filename:          filename,
	sourceID:          fp,
	lastReadTimestamp: lastReadTimestampVal,
}

return o.parseStreams(content, offsets[fp].streams)
@@ -172,21 +191,54 @@ func (o *offsetDB) parseStreams(content string, streams streamsOffsets) (string,
return content, nil
}

func (o *offsetDB) parseLine(content string, prefix string) (string, string, error) {
	if content == "" {
		return "", "", fmt.Errorf("unexpected end of content while looking for %q", prefix)
	}

	linePos := strings.IndexByte(content, '\n')
	if linePos < 0 {
		return "", "", fmt.Errorf("no newline found in content")
	}

	line := content[:linePos]
	remaining := content[linePos+1:]

	if len(line) < len(prefix) || line[:len(prefix)] != prefix {
		return "", "", fmt.Errorf("expected prefix %q, got %q", prefix, safeSubstring(line, len(prefix)))
	}

	return line[len(prefix):], remaining, nil
}

func (o *offsetDB) parseOptionalLine(content string, prefix string) (string, string, error) {
	if content == "" {
		return "", content, nil // no content, return an empty value
	}

	if len(content) >= len(prefix) && content[:len(prefix)] == prefix {
		return o.parseLine(content, prefix)
	}

	if strings.HasPrefix(content, " streams:") || content[0] == '-' {
		return "", content, nil
	}

	linePos := strings.IndexByte(content, '\n')
	var nextLine string
	if linePos >= 0 {
		nextLine = content[:linePos]
	} else {
		nextLine = content
	}
	return "", "", fmt.Errorf("unexpected line format, expected %q or streams section, got %q", prefix, safeSubstring(nextLine, len(prefix)))
}

func safeSubstring(s string, length int) string {
	if len(s) < length {
		return s
	}
	return s[:length]
}
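
// For illustration, an offsets entry with the new optional field might look
// roughly like this (a hypothetical layout inferred from parseOne and save;
// the inode and source id line spellings are assumptions, and
// last_read_timestamp falls back to the current time when absent):
//
//	- file: /var/log/app.log
//	  inode: 16778341
//	  source_id: 3086588076
//	  last_read_timestamp: 1718105400000000000
//	  streams:
//	    stdout: 4096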

func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
@@ -234,6 +286,10 @@ func (o *offsetDB) save(jobs map[pipeline.SourceID]*Job, mu *sync.RWMutex) {
o.buf = strconv.AppendUint(o.buf, uint64(job.sourceID), 10)
o.buf = append(o.buf, '\n')

o.buf = append(o.buf, " last_read_timestamp: "...)
o.buf = strconv.AppendInt(o.buf, job.eofReadInfo.getUnixNanoTimestamp(), 10)
o.buf = append(o.buf, '\n')

o.buf = append(o.buf, " streams:\n"...)
for _, strOff := range job.offsets {
o.buf = append(o.buf, " "...)