Skip to content

Commit

Permalink
added support for 'replace_dots_with' flag in ES encoders
Browse files Browse the repository at this point in the history
  • Loading branch information
jxstanford committed Jun 10, 2016
1 parent d56f86c commit a6d2fa4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
38 changes: 34 additions & 4 deletions plugins/elasticsearch/encoders.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# Tanguy Leroux ([email protected])
# Rob Miller ([email protected])
# Xavier Lange ([email protected])
# John Stanford ([email protected])
#
# ***** END LICENSE BLOCK *****/

Expand Down Expand Up @@ -41,6 +42,20 @@ func writeUTF16Escape(b *bytes.Buffer, c rune) {
b.WriteByte(lowerhex[c&0xF])
}

// replaceDots returns a copy of str in which every '.' character has been
// replaced by sub. All other bytes of str are passed through untouched.
//
// The scan is done byte-wise rather than rune-wise on purpose: '.' is a
// single-byte ASCII character that can never occur inside a multi-byte UTF-8
// sequence, and ranging over runes would turn every invalid UTF-8 byte into
// U+FFFD, silently altering parts of the name that contain no dots at all.
//
// If str contains no dots it is returned as is, without allocating.
func replaceDots(str string, sub string) (cname string) {
	var buf bytes.Buffer
	start := 0 // index of the first byte not yet copied into buf
	for i := 0; i < len(str); i++ {
		if str[i] != '.' {
			continue
		}
		buf.WriteString(str[start:i])
		buf.WriteString(sub)
		start = i + 1
	}
	if start == 0 {
		// start only advances past a dot, so zero means none were found.
		return str
	}
	buf.WriteString(str[start:])
	return buf.String()
}

// Go json encoder will blow up on invalid utf8 so we use this custom json
// encoder. Also, go json encoder generates these funny \U escapes which I
// don't think are valid json.
Expand Down Expand Up @@ -88,12 +103,17 @@ func writeQuotedString(b *bytes.Buffer, str string) {

}

func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool) {
func writeField(first bool, b *bytes.Buffer, f *message.Field, raw bool, replaceDotsWith string) {
if !first {
b.WriteString(`,`)
}

writeQuotedString(b, f.GetName())
if replaceDotsWith != "." {
writeQuotedString(b, replaceDots(f.GetName(), replaceDotsWith))
} else {
writeQuotedString(b, f.GetName())
}

b.WriteString(`:`)

switch f.GetValueType() {
Expand Down Expand Up @@ -238,6 +258,7 @@ type ESJsonEncoder struct {
fieldMappings *ESFieldMappings
dynamicFields []string
usesDynamicFields bool
replaceDotsWith string
}

// Heka fields to ElasticSearch mapping
Expand Down Expand Up @@ -275,6 +296,8 @@ type ESJsonEncoderConfig struct {
// Dynamic fields to be included. Non-empty value raises an error if
// 'DynamicFields' is not in Fields []string property.
DynamicFields []string `toml:"dynamic_fields"`
// Replace dot (".") characters in JSON field names with a substitute string.
ReplaceDotsWith string `toml:"replace_dots_with"`
}

func (e *ESJsonEncoder) ConfigStruct() interface{} {
Expand All @@ -295,6 +318,7 @@ func (e *ESJsonEncoder) ConfigStruct() interface{} {
Pid: "Pid",
Hostname: "Hostname",
},
ReplaceDotsWith: ".",
}

config.Fields = fieldChoices[:]
Expand All @@ -307,6 +331,7 @@ func (e *ESJsonEncoder) Init(config interface{}) (err error) {
e.fields = conf.Fields
e.timestampFormat = conf.Timestamp
e.rawBytesFields = conf.RawBytesFields
e.replaceDotsWith = conf.ReplaceDotsWith
e.coord = &ElasticSearchCoordinates{
Index: conf.Index,
Type: conf.TypeName,
Expand Down Expand Up @@ -392,7 +417,7 @@ func (e *ESJsonEncoder) Encode(pack *PipelinePack) (output []byte, err error) {
}
}
}
writeField(first, &buf, field, raw)
writeField(first, &buf, field, raw, e.replaceDotsWith)
}
}
default:
Expand All @@ -416,6 +441,7 @@ type ESLogstashV0Encoder struct {
coord *ElasticSearchCoordinates
dynamicFields []string
useMessageType bool
replaceDotsWith string
}

type ESLogstashV0EncoderConfig struct {
Expand All @@ -440,6 +466,8 @@ type ESLogstashV0EncoderConfig struct {
// Dynamic fields to be included. Non-empty value raises an error if
// 'DynamicFields' is not in Fields []string property.
DynamicFields []string `toml:"dynamic_fields"`
// Replace dot (".") characters in JSON field names with a substitute string.
ReplaceDotsWith string `toml:"replace_dots_with"`
}

func (e *ESLogstashV0Encoder) ConfigStruct() interface{} {
Expand All @@ -451,6 +479,7 @@ func (e *ESLogstashV0Encoder) ConfigStruct() interface{} {
UseMessageType: false,
ESIndexFromTimestamp: false,
Id: "",
ReplaceDotsWith: ".",
}

config.Fields = fieldChoices[:]
Expand All @@ -464,6 +493,7 @@ func (e *ESLogstashV0Encoder) Init(config interface{}) (err error) {
e.fields = conf.Fields
e.timestampFormat = conf.Timestamp
e.useMessageType = conf.UseMessageType
e.replaceDotsWith = conf.ReplaceDotsWith
e.coord = &ElasticSearchCoordinates{
Index: conf.Index,
Type: conf.TypeName,
Expand Down Expand Up @@ -563,7 +593,7 @@ func (e *ESLogstashV0Encoder) Encode(pack *PipelinePack) (output []byte, err err
}
}
}
writeField(firstfield, &buf, field, raw)
writeField(firstfield, &buf, field, raw, e.replaceDotsWith)
firstfield = false
}
}
Expand Down
14 changes: 12 additions & 2 deletions plugins/elasticsearch/encoders_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#
# Contributor(s):
# Tanguy Leroux ([email protected])
# John Stanford ([email protected])
#
# ***** END LICENSE BLOCK *****/

Expand Down Expand Up @@ -74,6 +75,8 @@ func getTestMessageWithFunnyFields() *message.Message {
field12.AddValue("jkl;")
field12.AddValue("push")
field12.AddValue("pull")
field13 := message.NewFieldInit("test.dotted.field.name.string", message.Field_STRING, "")
field13.AddValue("{\"asdf\":123}")

msg := &message.Message{}
msg.SetType("TEST")
Expand Down Expand Up @@ -102,6 +105,7 @@ func getTestMessageWithFunnyFields() *message.Message {
msg.AddField(field10)
msg.AddField(field11)
msg.AddField(field12)
msg.AddField(field13)

return msg
}
Expand Down Expand Up @@ -164,7 +168,9 @@ func ESEncodersSpec(c gs.Context) {
"test_raw_field_bytes",
"test_raw_field_string_array",
"test_raw_field_bytes_array",
"test.dotted.field.name.string",
}
config.ReplaceDotsWith = "_"

c.Specify("Should properly encode a message", func() {
err := encoder.Init(config)
Expand Down Expand Up @@ -237,6 +243,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["@fields"].(map[string]interface{})["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0)
})

c.Specify("encodes w/ a different timestamp format", func() {
Expand Down Expand Up @@ -292,7 +299,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(len(decoded), gs.Equals, 10)
fieldsValInterface := decoded["@fields"]
fieldsVal := fieldsValInterface.(map[string]interface{})
c.Expect(len(fieldsVal), gs.Equals, 13)
c.Expect(len(fieldsVal), gs.Equals, 14)
})
})
})
Expand All @@ -305,7 +312,9 @@ func ESEncodersSpec(c gs.Context) {
"test_raw_field_bytes",
"test_raw_field_string_array",
"test_raw_field_bytes_array",
"test.dotted.field.name.string",
}
config.ReplaceDotsWith = "_"

c.Specify("Should properly encode a message", func() {
err := encoder.Init(config)
Expand Down Expand Up @@ -377,6 +386,7 @@ func ESEncodersSpec(c gs.Context) {
c.Expect(decoded["test_raw_field_string_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[0].(map[string]interface{})["asdf"], gs.Equals, 123.0)
c.Expect(decoded["test_raw_field_bytes_array"].([]interface{})[1].(map[string]interface{})["jkl;"], gs.Equals, 123.0)
c.Expect(decoded["test_dotted_field_name_string"].(map[string]interface{})["asdf"], gs.Equals, 123.0)
})

c.Specify("Produces valid JSON when DynamicFields is first configured field and message has no fields", func() {
Expand Down Expand Up @@ -498,7 +508,7 @@ func ESEncodersSpec(c gs.Context) {
decoded := make(map[string]interface{})
err = json.Unmarshal([]byte(lines[1]), &decoded)
c.Assume(err, gs.IsNil)
c.Expect(len(decoded), gs.Equals, 22) // 9 base fields and 13 dynamic fields.
c.Expect(len(decoded), gs.Equals, 23) // 9 base fields and 14 dynamic fields.
})

c.Specify("when dynamic_fields is specified", func() {
Expand Down

0 comments on commit a6d2fa4

Please sign in to comment.