Skip to content

Commit

Permalink
Updates for making stuff work.
Browse files Browse the repository at this point in the history
  • Loading branch information
bradhe committed Aug 18, 2015
1 parent 7940e45 commit 52e695e
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
3 changes: 2 additions & 1 deletion kafka_stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package epee

import (
"fmt"
"github.com/Shopify/sarama"
"log"
"sync"
Expand Down Expand Up @@ -76,7 +77,7 @@ func (ks *kafkaStreamImpl) Consume(topic string, partition int, offset int64) (*
}
}

ch := make(chan Message, 0)
ch := make(chan *Message, 0)
consumer := newStreamConsumer(ch, partitionConsumer)

// We have to acquire the lock to modify the map.
Expand Down
7 changes: 4 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

const (
// Number of seconds to wait between flush checks.
DefaultMonitorTimeout = 10 * time.Second
DefaultMonitorTimeout = 5 * time.Second
)

func offsetPath(clientID string, topic string, partition int) string {
Expand Down Expand Up @@ -63,7 +63,7 @@ func (q *Stream) dispatch(proc StreamProcessor, t reflect.Type, message *Message
return proc.Process(message.Offset, obj)
}

func (q *Stream) runConsumer(topic string, partition int, src <-chan Message, proc StreamProcessor) {
func (q *Stream) runConsumer(topic string, partition int, src <-chan *Message, proc StreamProcessor) {
for message := range src {
t, ok := q.types[message.Topic]

Expand All @@ -72,7 +72,7 @@ func (q *Stream) runConsumer(topic string, partition int, src <-chan Message, pr
log.Panicf("Failed to find registerd type for topic %s", message.Topic)
}

q.dispatch(proc, t, &message)
q.dispatch(proc, t, message)
}
}

Expand Down Expand Up @@ -155,6 +155,7 @@ func (q *Stream) flushAll() {
} else {
// This is kind of hacky...but it generates the correct path based on
// the key in the proxies hash. Le sigh.
log.Printf("INFO: Flushing successful. Setting %s to %d", keyPath(q.clientID, key), proxy.LastOffset())
q.zk.Set(keyPath(q.clientID, key), proxy.LastOffset())
}
}
Expand Down
9 changes: 4 additions & 5 deletions stream_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"github.com/Shopify/sarama"
"log"
"sync"
"time"
)

type streamConsumer struct {
Expand All @@ -18,7 +17,7 @@ type streamConsumer struct {
partitionConsumer sarama.PartitionConsumer

// The channel to deliver messages to
dst chan Message
dst chan *Message
}

func (sc *streamConsumer) run() {
Expand All @@ -41,7 +40,7 @@ func (sc *streamConsumer) run() {
case message := <-messages:
// We'll instantiate one of our own messages and keep it 'round incase we
// want to keep goin'.
sc.dst <- Message{
sc.dst <- &Message{
Offset: message.Offset,
Value: message.Value,
Topic: message.Topic,
Expand All @@ -57,7 +56,7 @@ func (sc *streamConsumer) run() {
}
}

func (sc *streamConsumer) Messages() <-chan Message {
func (sc *streamConsumer) Messages() <-chan *Message {
return sc.dst
}

Expand All @@ -74,7 +73,7 @@ func (sc *streamConsumer) Close() {
sc.wg.Wait()
}

func newStreamConsumer(ch chan Message, partitionConsumer sarama.PartitionConsumer) *streamConsumer {
func newStreamConsumer(ch chan *Message, partitionConsumer sarama.PartitionConsumer) *streamConsumer {
sc := new(streamConsumer)
sc.dst = ch
sc.partitionConsumer = partitionConsumer
Expand Down

0 comments on commit 52e695e

Please sign in to comment.