package syslog import ( "bufio" "crypto/tls" "errors" "net" "strings" "sync" "time" "gopkg.in/mcuadros/go-syslog.v2/format" ) var ( RFC3164 = &format.RFC3164{} // RFC3164: http://www.ietf.org/rfc/rfc3164.txt RFC5424 = &format.RFC5424{} // RFC5424: http://www.ietf.org/rfc/rfc5424.txt RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt - octet counting variant Automatic = &format.Automatic{} // Automatically identify the format ) const ( datagramChannelBufferSize = 10 datagramReadBufferSize = 64 * 1024 ) // A function type which gets the TLS peer name from the connection. Can return // ok=false to terminate the connection type TlsPeerNameFunc func(tlsConn *tls.Conn) (tlsPeer string, ok bool) type Server struct { listeners []net.Listener connections []net.PacketConn wait sync.WaitGroup doneTcp chan bool datagramChannelSize int datagramChannel chan DatagramMessage format format.Format handler Handler lastError error readTimeoutMilliseconds int64 tlsPeerNameFunc TlsPeerNameFunc datagramPool sync.Pool } //NewServer returns a new Server func NewServer() *Server { return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{ New: func() interface{} { return make([]byte, 65536) }, }, datagramChannelSize: datagramChannelBufferSize, } } //Sets the syslog format (RFC3164 or RFC5424 or RFC6587) func (s *Server) SetFormat(f format.Format) { s.format = f } //Sets the handler, this handler with receive every syslog entry func (s *Server) SetHandler(handler Handler) { s.handler = handler } //Sets the connection timeout for TCP connections, in milliseconds func (s *Server) SetTimeout(millseconds int64) { s.readTimeoutMilliseconds = millseconds } // Set the function that extracts a TLS peer name from the TLS connection func (s *Server) SetTlsPeerNameFunc(tlsPeerNameFunc TlsPeerNameFunc) { s.tlsPeerNameFunc = tlsPeerNameFunc } func (s *Server) SetDatagramChannelSize(size int) { s.datagramChannelSize = size } // Default TLS peer name function - returns the CN of the certificate func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) { state := tlsConn.ConnectionState() if len(state.PeerCertificates) <= 0 { return "", false } cn := state.PeerCertificates[0].Subject.CommonName return cn, true } //Configure the server for listen on an UDP addr func (s *Server) ListenUDP(addr string) error { udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return err } connection, err := net.ListenUDP("udp", udpAddr) if err != nil { return err } connection.SetReadBuffer(datagramReadBufferSize) s.connections = append(s.connections, connection) return nil } //Configure the server for listen on an unix socket func (s *Server) ListenUnixgram(addr string) error { unixAddr, err := net.ResolveUnixAddr("unixgram", addr) if err != nil { return err } connection, err := net.ListenUnixgram("unixgram", unixAddr) if err != nil { return err } connection.SetReadBuffer(datagramReadBufferSize) s.connections = append(s.connections, connection) return nil } //Configure the server for listen on a TCP addr func (s *Server) ListenTCP(addr string) error { tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return err } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { return err } s.doneTcp = make(chan bool) s.listeners = append(s.listeners, listener) return nil } //Configure the server for listen on a TCP addr for TLS func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error { listener, err := tls.Listen("tcp", addr, config) if err != nil { return err } s.doneTcp = make(chan bool) s.listeners = append(s.listeners, listener) return nil } //Starts the server, all the go routines goes to live func (s *Server) Boot() error { if s.format == nil { return errors.New("please set a valid format") } if s.handler == nil { return errors.New("please set a valid handler") } for _, listener := range s.listeners { s.goAcceptConnection(listener) } if len(s.connections) > 0 { s.goParseDatagrams() } for _, connection := range s.connections { s.goReceiveDatagrams(connection) } return nil } func (s *Server) goAcceptConnection(listener net.Listener) { s.wait.Add(1) go func(listener net.Listener) { loop: for { select { case <-s.doneTcp: break loop default: } connection, err := listener.Accept() if err != nil { continue } s.goScanConnection(connection) } s.wait.Done() }(listener) } func (s *Server) goScanConnection(connection net.Conn) { scanner := bufio.NewScanner(connection) if sf := s.format.GetSplitFunc(); sf != nil { scanner.Split(sf) } remoteAddr := connection.RemoteAddr() var client string if remoteAddr != nil { client = remoteAddr.String() } tlsPeer := "" if tlsConn, ok := connection.(*tls.Conn); ok { // Handshake now so we get the TLS peer information if err := tlsConn.Handshake(); err != nil { connection.Close() return } if s.tlsPeerNameFunc != nil { var ok bool tlsPeer, ok = s.tlsPeerNameFunc(tlsConn) if !ok { connection.Close() return } } } var scanCloser *ScanCloser scanCloser = &ScanCloser{scanner, connection} s.wait.Add(1) go s.scan(scanCloser, client, tlsPeer) } func (s *Server) scan(scanCloser *ScanCloser, client string, tlsPeer string) { loop: for { select { case <-s.doneTcp: break loop default: } if s.readTimeoutMilliseconds > 0 { scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond)) } if scanCloser.Scan() { s.parser([]byte(scanCloser.Text()), client, tlsPeer) } else { break loop } } scanCloser.closer.Close() s.wait.Done() } func (s *Server) parser(line []byte, client string, tlsPeer string) { parser := s.format.GetParser(line) err := parser.Parse() if err != nil { s.lastError = err } logParts := parser.Dump() logParts["client"] = client if logParts["hostname"] == "" && (s.format == RFC3164 || s.format == Automatic) { if i := strings.Index(client, ":"); i > 1 { logParts["hostname"] = client[:i] } else { logParts["hostname"] = client } } logParts["tls_peer"] = tlsPeer s.handler.Handle(logParts, int64(len(line)), err) } //Returns the last error func (s *Server) GetLastError() error { return s.lastError } //Kill the server func (s *Server) Kill() error { for _, connection := range s.connections { err := connection.Close() if err != nil { return err } } for _, listener := range s.listeners { err := listener.Close() if err != nil { return err } } // Only need to close channel once to broadcast to all waiting if s.doneTcp != nil { close(s.doneTcp) } if s.datagramChannel != nil { close(s.datagramChannel) } return nil } //Waits until the server stops func (s *Server) Wait() { s.wait.Wait() } type TimeoutCloser interface { Close() error SetReadDeadline(t time.Time) error } type ScanCloser struct { *bufio.Scanner closer TimeoutCloser } type DatagramMessage struct { message []byte client string } func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) { s.wait.Add(1) go func() { defer s.wait.Done() for { buf := s.datagramPool.Get().([]byte) n, addr, err := packetconn.ReadFrom(buf) if err == nil { // Ignore trailing control characters and NULs for ; (n > 0) && (buf[n-1] < 32); n-- { } if n > 0 { var address string if addr != nil { address = addr.String() } s.datagramChannel <- DatagramMessage{buf[:n], address} } } else { // there has been an error. Either the server has been killed // or may be getting a transitory error due to (e.g.) the // interface being shutdown in which case sleep() to avoid busy wait. opError, ok := err.(*net.OpError) if (ok) && !opError.Temporary() && !opError.Timeout() { return } time.Sleep(10 * time.Millisecond) } } }() } func (s *Server) goParseDatagrams() { s.datagramChannel = make(chan DatagramMessage, s.datagramChannelSize) s.wait.Add(1) go func() { defer s.wait.Done() for { select { case msg, ok := (<-s.datagramChannel): if !ok { return } if sf := s.format.GetSplitFunc(); sf != nil { if _, token, err := sf(msg.message, true); err == nil { s.parser(token, msg.client, "") } } else { s.parser(msg.message, msg.client, "") } s.datagramPool.Put(msg.message[:cap(msg.message)]) } } }() }