Skip to content

Commit 85618f0

Browse files
authored
Merge pull request #12 from cosmo0920/use-logrus-logger
Use logrus logger instead of fmt
2 parents bd98b5a + 2fcaa53 commit 85618f0

File tree

854 files changed

+385002
-27
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

854 files changed

+385002
-27
lines changed

Gopkg.lock

Lines changed: 74 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

out_s3.go

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@ import "github.com/aws/aws-sdk-go/aws/awserr"
77
import "github.com/aws/aws-sdk-go/aws/session"
88
import "github.com/aws/aws-sdk-go/service/s3"
99
import "github.com/aws/aws-sdk-go/service/s3/s3manager"
10+
import log "github.com/sirupsen/logrus"
1011
import "github.com/prometheus/common/version"
1112

1213
import (
1314
"C"
1415
"bytes"
1516
"compress/gzip"
16-
"fmt"
1717
"os"
1818
"path/filepath"
1919
"strconv"
@@ -23,12 +23,19 @@ import (
2323
)
2424

2525
var plugin GoOutputPlugin = &fluentPlugin{}
26+
var logger *log.Logger
27+
28+
func init() {
29+
logLevel, _ := log.ParseLevel("info")
30+
logger = newLogger(logLevel)
31+
}
2632

2733
type s3operator struct {
2834
bucket string
2935
prefix string
3036
uploader *s3manager.Uploader
3137
compressFormat format
38+
logger *log.Logger
3239
}
3340

3441
type GoOutputPlugin interface {
@@ -130,15 +137,19 @@ func ensureBucket(session *session.Session, bucket, region *string) (bool, error
130137
}
131138
}
132139

133-
_, err := svc.CreateBucket(input)
140+
result, err := svc.CreateBucket(input)
141+
logger.Tracef("CreateBucket request result is: %s, err: %s", result, err)
134142
if err != nil {
135143
if aerr, ok := err.(awserr.Error); ok {
136144
switch aerr.Code() {
137145
case s3.ErrCodeBucketAlreadyExists:
146+
logger.Tracef("Bucket(%s) is already exists.", *bucket)
138147
return true, nil
139148
case s3.ErrCodeBucketAlreadyOwnedByYou:
149+
logger.Tracef("Bucket(%s) is already owned by you.", *bucket)
140150
return true, nil
141151
default:
152+
logger.Tracef("CreateBucket is failed with: %s", aerr.Error())
142153
return false, aerr
143154
}
144155
} else {
@@ -148,6 +159,12 @@ func ensureBucket(session *session.Session, bucket, region *string) (bool, error
148159
return true, nil
149160
}
150161

162+
func newLogger(logLevel log.Level) *log.Logger {
163+
logger := log.New()
164+
logger.Level = logLevel
165+
return logger
166+
}
167+
151168
func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3operator, error) {
152169
// Example to retrieve an optional configuration parameter
153170
credential := plugin.PluginConfigKey(ctx, "Credential")
@@ -159,21 +176,24 @@ func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3operator, error) {
159176
compress := plugin.PluginConfigKey(ctx, "Compress")
160177
endpoint := plugin.PluginConfigKey(ctx, "Endpoint")
161178
autoCreateBucket := plugin.PluginConfigKey(ctx, "AutoCreateBucket")
179+
logLevel := plugin.PluginConfigKey(ctx, "LogLevel")
162180

163-
config, err := getS3Config(accessKeyID, secretAccessKey, credential, s3prefix, bucket, region, compress, endpoint, autoCreateBucket)
181+
config, err := getS3Config(accessKeyID, secretAccessKey, credential, s3prefix, bucket, region, compress, endpoint, autoCreateBucket, logLevel)
164182
if err != nil {
165183
return nil, err
166184
}
167-
fmt.Printf("[flb-go %d] Starting fluent-bit-go-s3: %s\n", operatorID, version.Info())
168-
fmt.Printf("[flb-go %d] plugin credential parameter = '%s'\n", operatorID, credential)
169-
fmt.Printf("[flb-go %d] plugin accessKeyID parameter = '%s'\n", operatorID, accessKeyID)
170-
fmt.Printf("[flb-go %d] plugin secretAccessKey parameter = '%s'\n", operatorID, secretAccessKey)
171-
fmt.Printf("[flb-go %d] plugin bucket parameter = '%s'\n", operatorID, bucket)
172-
fmt.Printf("[flb-go %d] plugin s3prefix parameter = '%s'\n", operatorID, s3prefix)
173-
fmt.Printf("[flb-go %d] plugin region parameter = '%s'\n", operatorID, region)
174-
fmt.Printf("[flb-go %d] plugin compress parameter = '%s'\n", operatorID, compress)
175-
fmt.Printf("[flb-go %d] plugin endpoint parameter = '%s'\n", operatorID, endpoint)
176-
fmt.Printf("[flb-go %d] plugin autoCreateBucket parameter = '%s'\n", operatorID, autoCreateBucket)
185+
logger := newLogger(config.logLevel)
186+
187+
logger.Infof("[flb-go %d] Starting fluent-bit-go-s3: %v\n", operatorID, version.Info())
188+
logger.Infof("[flb-go %d] plugin credential parameter = '%s'\n", operatorID, credential)
189+
logger.Infof("[flb-go %d] plugin accessKeyID parameter = '%s'\n", operatorID, accessKeyID)
190+
logger.Infof("[flb-go %d] plugin secretAccessKey parameter = '%s'\n", operatorID, secretAccessKey)
191+
logger.Infof("[flb-go %d] plugin bucket parameter = '%s'\n", operatorID, bucket)
192+
logger.Infof("[flb-go %d] plugin s3prefix parameter = '%s'\n", operatorID, s3prefix)
193+
logger.Infof("[flb-go %d] plugin region parameter = '%s'\n", operatorID, region)
194+
logger.Infof("[flb-go %d] plugin compress parameter = '%s'\n", operatorID, compress)
195+
logger.Infof("[flb-go %d] plugin endpoint parameter = '%s'\n", operatorID, endpoint)
196+
logger.Infof("[flb-go %d] plugin autoCreateBucket parameter = '%s'\n", operatorID, autoCreateBucket)
177197

178198
cfg := aws.Config{
179199
Credentials: config.credentials,
@@ -202,6 +222,7 @@ func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3operator, error) {
202222
prefix: *config.s3prefix,
203223
uploader: uploader,
204224
compressFormat: config.compress,
225+
logger: logger,
205226
}
206227

207228
return s3operator, nil
@@ -210,7 +231,7 @@ func newS3Output(ctx unsafe.Pointer, operatorID int) (*s3operator, error) {
210231

211232
func addS3Output(ctx unsafe.Pointer) error {
212233
operatorID := len(s3operators)
213-
fmt.Printf("[s3operator] id = %q\n", operatorID)
234+
logger.Infof("[s3operator] id = %d\n", operatorID)
214235
// Set the context to point to any Go variable
215236
output.FLBPluginSetContext(ctx, operatorID)
216237
operator, err := newS3Output(ctx, operatorID)
@@ -263,7 +284,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
263284

264285
line, err := createJSON(record)
265286
if err != nil {
266-
fmt.Printf("error creating message for S3: %v\n", err)
287+
s3operator.logger.Warnf("error creating message for S3: %v\n", err)
267288
continue
268289
}
269290
lines += line + "\n"
@@ -272,7 +293,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
272293
objectKey := GenerateObjectKey(s3operator, time.Now())
273294
err := plugin.Put(s3operator, objectKey, time.Now(), lines)
274295
if err != nil {
275-
fmt.Printf("error sending message for S3: %v\n", err)
296+
s3operator.logger.Warnf("error sending message for S3: %v\n", err)
276297
return output.FLB_RETRY
277298
}
278299

out_s3_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ type testFluentPlugin struct {
162162
compress string
163163
endpoint string
164164
autoCreateBucket string
165+
logLevel string
165166
records []testrecord
166167
position int
167168
events []*events
@@ -187,6 +188,8 @@ func (p *testFluentPlugin) PluginConfigKey(ctx unsafe.Pointer, key string) strin
187188
return p.endpoint
188189
case "AutoCreateBucket":
189190
return p.autoCreateBucket
191+
case "LogLevel":
192+
return p.logLevel
190193
}
191194
return "unknown-" + key
192195
}
@@ -246,7 +249,7 @@ func (c *testS3Credential) GetCredentials(accessID, secretkey, credential string
246249

247250
func TestPluginInitializationWithStaticCredentials(t *testing.T) {
248251
s3Creds = &testS3Credential{}
249-
_, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "", "false")
252+
_, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "", "false", "info")
250253
if err != nil {
251254
t.Fatalf("failed test %#v", err)
252255
}
@@ -259,14 +262,15 @@ func TestPluginInitializationWithStaticCredentials(t *testing.T) {
259262
compress: "",
260263
endpoint: "",
261264
autoCreateBucket: "false",
265+
logLevel: "info",
262266
}
263267
res := FLBPluginInit(unsafe.Pointer(&plugin))
264268
assert.Equal(t, output.FLB_OK, res)
265269
}
266270

267271
func TestPluginInitializationWithSharedCredentials(t *testing.T) {
268272
s3Creds = &testS3Credential{}
269-
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "", "false")
273+
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "", "false", "info")
270274
if err != nil {
271275
t.Fatalf("failed test %#v", err)
272276
}
@@ -278,6 +282,7 @@ func TestPluginInitializationWithSharedCredentials(t *testing.T) {
278282
compress: "",
279283
endpoint: "",
280284
autoCreateBucket: "false",
285+
logLevel: "info",
281286
}
282287
res := FLBPluginInit(unsafe.Pointer(&plugin))
283288
assert.Equal(t, output.FLB_OK, res)

s3.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package main
22

33
import "github.com/aws/aws-sdk-go/aws"
44
import "github.com/aws/aws-sdk-go/aws/credentials"
5+
import log "github.com/sirupsen/logrus"
56

67
import (
78
"fmt"
@@ -23,6 +24,7 @@ type s3Config struct {
2324
region *string
2425
compress format
2526
endpoint string
27+
logLevel log.Level
2628
autoCreateBucket bool
2729
}
2830

@@ -62,7 +64,7 @@ func (c *s3PluginConfig) GetCredentials(accessKeyID, secretKey, credential strin
6264
return nil, fmt.Errorf("Failed to create credentials")
6365
}
6466

65-
func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, compress, endpoint, autoCreateBucket string) (*s3Config, error) {
67+
func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, compress, endpoint, autoCreateBucket, logLevel string) (*s3Config, error) {
6668
conf := &s3Config{}
6769
creds, err := s3Creds.GetCredentials(accessID, secretKey, credential)
6870
if err != nil {
@@ -106,5 +108,14 @@ func getS3Config(accessID, secretKey, credential, s3prefix, bucket, region, comp
106108
conf.autoCreateBucket = isAutoCreateBucket
107109
}
108110

111+
if logLevel == "" {
112+
logLevel = "info"
113+
}
114+
var level log.Level
115+
if level, err = log.ParseLevel(logLevel); err != nil {
116+
return nil, fmt.Errorf("invalid log level: %v", logLevel)
117+
}
118+
conf.logLevel = level
119+
109120
return conf, nil
110121
}

s3_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func TestGetS3ConfigStaticCredentials(t *testing.T) {
11-
conf, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "", "")
11+
conf, err := getS3Config("exampleaccessID", "examplesecretkey", "", "exampleprefix", "examplebucket", "exampleregion", "", "", "", "")
1212
if err != nil {
1313
t.Fatalf("failed test %#v", err)
1414
}
@@ -23,7 +23,7 @@ func TestGetS3ConfigStaticCredentials(t *testing.T) {
2323

2424
func TestGetS3ConfigSharedCredentials(t *testing.T) {
2525
s3Creds = &testS3Credential{}
26-
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "", "")
26+
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "", "", "", "")
2727
if err != nil {
2828
t.Fatalf("failed test %#v", err)
2929
}
@@ -38,7 +38,7 @@ func TestGetS3ConfigSharedCredentials(t *testing.T) {
3838

3939
func TestGetS3ConfigCompression(t *testing.T) {
4040
s3Creds = &testS3Credential{}
41-
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "", "")
41+
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "", "", "")
4242
if err != nil {
4343
t.Fatalf("failed test %#v", err)
4444
}
@@ -53,7 +53,7 @@ func TestGetS3ConfigCompression(t *testing.T) {
5353

5454
func TestGetS3ConfigEndpoint(t *testing.T) {
5555
s3Creds = &testS3Credential{}
56-
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "http://localhost:9000", "false")
56+
conf, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "http://localhost:9000", "false", "")
5757
if err != nil {
5858
t.Fatalf("failed test %#v", err)
5959
}
@@ -69,7 +69,7 @@ func TestGetS3ConfigEndpoint(t *testing.T) {
6969

7070
func TestGetS3ConfigInvalidEndpoint(t *testing.T) {
7171
s3Creds = &testS3Credential{}
72-
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "https://your-bucketname.s3.amazonaws.com", "false")
72+
_, err := getS3Config("", "", "examplecredentials", "exampleprefix", "examplebucket", "exampleregion", "gzip", "https://your-bucketname.s3.amazonaws.com", "false", "")
7373
if err != nil {
7474
expected := errors.New("Endpoint is not supported for AWS S3. This parameter is intended for S3 compatible services. Use Region instead.")
7575
assert.Equal(t, expected, err)

0 commit comments

Comments
 (0)