forked from mozilla-services/heka
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request mozilla-services#1947 from Solinea/dev
added support for 'replace_dot_with' flag in ES encoders
- Loading branch information
Showing
5 changed files
with
41 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
# Tanguy Leroux ([email protected]) | ||
# Rob Miller ([email protected]) | ||
# Xavier Lange ([email protected]) | ||
# John Staford ([email protected]) | ||
# | ||
# ***** END LICENSE BLOCK *****/ | ||
|
||
|
@@ -88,12 +89,17 @@ func writeQuotedString(b *bytes.Buffer, str string) { | |
|
||
} | ||
|
||
func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool) { | ||
func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool, replaceDotsWith string) { | ||
if !first { | ||
b.WriteString(`,`) | ||
} | ||
|
||
writeQuotedString(b, f.GetName()) | ||
if replaceDotsWith != "." { | ||
writeQuotedString(b, strings.Replace(f.GetName(), ".", replaceDotsWith, -1)) | ||
} else { | ||
writeQuotedString(b, f.GetName()) | ||
} | ||
|
||
b.WriteString(`:`) | ||
|
||
switch f.GetValueType() { | ||
|
@@ -238,6 +244,7 @@ type ESJsonEncoder struct { | |
fieldMappings *ESFieldMappings | ||
dynamicFields []string | ||
usesDynamicFields bool | ||
replaceDotsWith string | ||
} | ||
|
||
// Heka fields to ElasticSearch mapping | ||
|
@@ -275,6 +282,8 @@ type ESJsonEncoderConfig struct { | |
// Dynamic fields to be included. Non-empty value raises an error if | ||
// 'DynamicFields' is not in Fields []string property. | ||
DynamicFields []string `toml:"dynamic_fields"` | ||
// Replace dot (".") characters in JSON field names with a substitute string. | ||
ReplaceDotsWith string `toml:"replace_dots_with"` | ||
} | ||
|
||
func (e *ESJsonEncoder) ConfigStruct() interface{} { | ||
|
@@ -295,6 +304,7 @@ func (e *ESJsonEncoder) ConfigStruct() interface{} { | |
Pid: "Pid", | ||
Hostname: "Hostname", | ||
}, | ||
ReplaceDotsWith: ".", | ||
} | ||
|
||
config.Fields = fieldChoices[:] | ||
|
@@ -307,6 +317,7 @@ func (e *ESJsonEncoder) Init(config interface{}) (err error) { | |
e.fields = conf.Fields | ||
e.timestampFormat = conf.Timestamp | ||
e.rawBytesFields = conf.RawBytesFields | ||
e.replaceDotsWith = conf.ReplaceDotsWith | ||
e.coord = &ElasticSearchCoordinates{ | ||
Index: conf.Index, | ||
Type: conf.TypeName, | ||
|
@@ -392,7 +403,7 @@ func (e *ESJsonEncoder) Encode(pack *PipelinePack) (output []byte, err error) { | |
} | ||
} | ||
} | ||
writeField(first, &buf, field, raw) | ||
writeField(first, &buf, field, raw, e.replaceDotsWith) | ||
} | ||
} | ||
default: | ||
|
@@ -416,6 +427,7 @@ type ESLogstashV0Encoder struct { | |
coord *ElasticSearchCoordinates | ||
dynamicFields []string | ||
useMessageType bool | ||
replaceDotsWith string | ||
} | ||
|
||
type ESLogstashV0EncoderConfig struct { | ||
|
@@ -440,6 +452,8 @@ type ESLogstashV0EncoderConfig struct { | |
// Dynamic fields to be included. Non-empty value raises an error if | ||
// 'DynamicFields' is not in Fields []string property. | ||
DynamicFields []string `toml:"dynamic_fields"` | ||
// Replace dot (".") characters in JSON field names with a substitute string. | ||
ReplaceDotsWith string `toml:"replace_dots_with"` | ||
} | ||
|
||
func (e *ESLogstashV0Encoder) ConfigStruct() interface{} { | ||
|
@@ -451,6 +465,7 @@ func (e *ESLogstashV0Encoder) ConfigStruct() interface{} { | |
UseMessageType: false, | ||
ESIndexFromTimestamp: false, | ||
Id: "", | ||
ReplaceDotsWith: ".", | ||
} | ||
|
||
config.Fields = fieldChoices[:] | ||
|
@@ -464,6 +479,7 @@ func (e *ESLogstashV0Encoder) Init(config interface{}) (err error) { | |
e.fields = conf.Fields | ||
e.timestampFormat = conf.Timestamp | ||
e.useMessageType = conf.UseMessageType | ||
e.replaceDotsWith = conf.ReplaceDotsWith | ||
e.coord = &ElasticSearchCoordinates{ | ||
Index: conf.Index, | ||
Type: conf.TypeName, | ||
|
@@ -563,7 +579,7 @@ func (e *ESLogstashV0Encoder) Encode(pack *PipelinePack) (output []byte, err err | |
} | ||
} | ||
} | ||
writeField(firstfield, &buf, field, raw) | ||
writeField(firstfield, &buf, field, raw, e.replaceDotsWith) | ||
firstfield = false | ||
} | ||
} | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
# | ||
# Contributor(s): | ||
# Tanguy Leroux ([email protected]) | ||
# John Stanford ([email protected]) | ||
# | ||
# ***** END LICENSE BLOCK *****/ | ||
|
||
|
@@ -74,6 +75,8 @@ func getTestMessageWithFunnyFields() *message.Message { | |
field12.AddValue("jkl;") | ||
field12.AddValue("push") | ||
field12.AddValue("pull") | ||
field13 := message.NewFieldInit("test.dotted.field.name.string", message.Field_STRING, "") | ||
field13.AddValue("{\"asdf\":123}") | ||
|
||
msg := &message.Message{} | ||
msg.SetType("TEST") | ||
|
@@ -102,6 +105,7 @@ func getTestMessageWithFunnyFields() *message.Message { | |
msg.AddField(field10) | ||
msg.AddField(field11) | ||
msg.AddField(field12) | ||
msg.AddField(field13) | ||
|
||
return msg | ||
} | ||
|
@@ -164,7 +168,9 @@ func ESEncodersSpec(c gs.Context) { | |
"test_raw_field_bytes", | ||
"test_raw_field_string_array", | ||
"test_raw_field_bytes_array", | ||
"test.dotted.field.name.string", | ||
} | ||
config.ReplaceDotsWith = "_" | ||
|
||
c.Specify("Should properly encode a message", func() { | ||
err := encoder.Init(config) | ||
|
@@ -237,6 +243,7 @@ func ESEncodersSpec(c gs.Context) { | |
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) | ||
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0) | ||
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) | ||
c.Expect(decoded["@fields"].(map[string]interface{})["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0) | ||
}) | ||
|
||
c.Specify("encodes w/ a different timestamp format", func() { | ||
|
@@ -292,7 +299,7 @@ func ESEncodersSpec(c gs.Context) { | |
c.Expect(len(decoded), gs.Equals, 10) | ||
fieldsValInterface := decoded["@fields"] | ||
fieldsVal := fieldsValInterface.(map[string]interface{}) | ||
c.Expect(len(fieldsVal), gs.Equals, 13) | ||
c.Expect(len(fieldsVal), gs.Equals, 14) | ||
}) | ||
}) | ||
}) | ||
|
@@ -305,7 +312,9 @@ func ESEncodersSpec(c gs.Context) { | |
"test_raw_field_bytes", | ||
"test_raw_field_string_array", | ||
"test_raw_field_bytes_array", | ||
"test.dotted.field.name.string", | ||
} | ||
config.ReplaceDotsWith = "_" | ||
|
||
c.Specify("Should properly encode a message", func() { | ||
err := encoder.Init(config) | ||
|
@@ -377,6 +386,7 @@ func ESEncodersSpec(c gs.Context) { | |
c.Expect(decoded["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) | ||
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0) | ||
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0) | ||
c.Expect(decoded["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0) | ||
}) | ||
|
||
c.Specify("Produces valid JSON when DynamicFields is first configured field and message has no fields", func() { | ||
|
@@ -498,7 +508,7 @@ func ESEncodersSpec(c gs.Context) { | |
decoded := make(map[string]interface{}) | ||
err = json.Unmarshal([]byte(lines[1]), &decoded) | ||
c.Assume(err, gs.IsNil) | ||
c.Expect(len(decoded), gs.Equals, 22) // 9 base fields and 13 dynamic fields. | ||
c.Expect(len(decoded), gs.Equals, 23) // 9 base fields and 14 dynamic fields. | ||
}) | ||
|
||
c.Specify("when dynamic_fields is specified", func() { | ||
|