Skip to content

Commit

Permalink
Teach Vault how to register with Consul
Browse files Browse the repository at this point in the history
Vault will now register itself with Consul.  The active node can be found using `active.vault.service.consul`.  All standby vaults are available via `standby.vault.service.consul`.  All unsealed vaults are considered healthy and available via `vault.service.consul`.  Change in status and registration is event driven and should happen at the speed of a write to Consul (~network RTT + ~1x fsync(2)).

Healthy/active:

```
curl -X GET 'http://127.0.0.1:8500/v1/health/service/vault?pretty' && echo;
[
    {
        "Node": {
            "Node": "vm1",
            "Address": "127.0.0.1",
            "TaggedAddresses": {
                "wan": "127.0.0.1"
            },
            "CreateIndex": 3,
            "ModifyIndex": 20
        },
        "Service": {
            "ID": "vault:127.0.0.1:8200",
            "Service": "vault",
            "Tags": [
                "active"
            ],
            "Address": "127.0.0.1",
            "Port": 8200,
            "EnableTagOverride": false,
            "CreateIndex": 17,
            "ModifyIndex": 20
        },
        "Checks": [
            {
                "Node": "vm1",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm1",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "passing",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "",
                "ServiceID": "vault:127.0.0.1:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 19
            }
        ]
    }
]
```

Healthy/standby:

```
[snip]
        "Service": {
            "ID": "vault:127.0.0.2:8200",
            "Service": "vault",
            "Tags": [
                "standby"
            ],
            "Address": "127.0.0.2",
            "Port": 8200,
            "EnableTagOverride": false,
            "CreateIndex": 17,
            "ModifyIndex": 20
        },
        "Checks": [
            {
                "Node": "vm2",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm2",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "passing",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "",
                "ServiceID": "vault:127.0.0.2:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 19
            }
        ]
    }
]
```

Sealed:

```
        "Checks": [
            {
                "Node": "vm2",
                "CheckID": "serfHealth",
                "Name": "Serf Health Status",
                "Status": "passing",
                "Notes": "",
                "Output": "Agent alive and reachable",
                "ServiceID": "",
                "ServiceName": "",
                "CreateIndex": 3,
                "ModifyIndex": 3
            },
            {
                "Node": "vm2",
                "CheckID": "vault-sealed-check",
                "Name": "Vault Sealed Status",
                "Status": "critical",
                "Notes": "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
                "Output": "Vault Sealed",
                "ServiceID": "vault:127.0.0.2:8200",
                "ServiceName": "vault",
                "CreateIndex": 19,
                "ModifyIndex": 38
            }
        ]
```
  • Loading branch information
sean- committed Apr 26, 2016
1 parent 0d3ce59 commit c0bbeba
Show file tree
Hide file tree
Showing 4 changed files with 673 additions and 9 deletions.
6 changes: 6 additions & 0 deletions command/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@ func (c *ServerCommand) Run(args []string) int {

if envAA := os.Getenv("VAULT_ADVERTISE_ADDR"); envAA != "" {
coreConfig.AdvertiseAddr = envAA
if consulBackend, ok := (backend).(*physical.ConsulBackend); ok {
consulBackend.UpdateAdvertiseAddr(envAA)
}
}

// Attempt to detect the advertise address, if possible
Expand All @@ -220,6 +223,9 @@ func (c *ServerCommand) Run(args []string) int {
c.Ui.Error("Failed to detect advertise address.")
} else {
coreConfig.AdvertiseAddr = advertise
if consulBackend, ok := (backend).(*physical.ConsulBackend); ok {
consulBackend.UpdateAdvertiseAddr(advertise)
}
}
}

Expand Down
293 changes: 285 additions & 8 deletions physical/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,62 @@ package physical
import (
"fmt"
"io/ioutil"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"

"crypto/tls"
"crypto/x509"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/go-cleanhttp"
)

const (
// checkJitterFactor specifies the jitter factor used to stagger checks
checkJitterFactor = 16

// checkMinBuffer specifies provides a guarantee that a check will not
// be executed too close to the TTL check timeout
checkMinBuffer = 100 * time.Millisecond

// defaultCheckTimeout changes the timeout of TTL checks
defaultCheckTimeout = 5 * time.Second

// defaultCheckInterval specifies the default interval used to send
// checks
defaultCheckInterval = 4 * time.Second

// defaultServiceName is the default Consul service name used when
// advertising a Vault instance.
defaultServiceName = "vault"
)

// ConsulBackend is a physical backend that stores data at specific
// prefix within Consul. It is used for most production situations as
// it allows Vault to run on multiple machines in a highly-available manner.
type ConsulBackend struct {
path string
client *api.Client
kv *api.KV
permitPool *PermitPool
path string
client *api.Client
kv *api.KV
permitPool *PermitPool
serviceLock sync.RWMutex
service *api.AgentServiceRegistration
sealedCheck *api.AgentCheckRegistration
advertiseAddr string
consulClientConf *api.Config
serviceName string
running bool
active bool
sealed bool
checkTimeout time.Duration
checkTimer *time.Timer
}

// newConsulBackend constructs a Consul backend using the given API client
Expand All @@ -43,6 +78,28 @@ func newConsulBackend(conf map[string]string) (Backend, error) {
path = strings.TrimPrefix(path, "/")
}

// Get the service name to advertise in Consul
service, ok := conf["service"]
if !ok {
service = defaultServiceName
}

checkTimeout := defaultCheckTimeout
checkTimeoutStr, ok := conf["check_timeout"]
if ok {
d, err := time.ParseDuration(checkTimeoutStr)
if err != nil {
return nil, err
}

min, _ := lib.DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor)
if d < min {
return nil, fmt.Errorf("Consul check_timeout must be greater than %v", min)
}

checkTimeout = d
}

// Configure the client
consulConf := api.DefaultConfig()

Expand Down Expand Up @@ -84,14 +141,234 @@ func newConsulBackend(conf map[string]string) (Backend, error) {

// Setup the backend
c := &ConsulBackend{
path: path,
client: client,
kv: client.KV(),
permitPool: NewPermitPool(maxParInt),
path: path,
client: client,
kv: client.KV(),
permitPool: NewPermitPool(maxParInt),
consulClientConf: consulConf,
serviceName: service,
checkTimeout: checkTimeout,
checkTimer: time.NewTimer(checkTimeout),
}
return c, nil
}

// UpdateAdvertiseAddr provides a pre-initialization hook for updating
// Consul's advertise address.
func (c *ConsulBackend) UpdateAdvertiseAddr(addr string) error {
if c.running {
return fmt.Errorf("service registration unable to update advertise address, backend already running")
}

url, err := url.Parse(addr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse URL "%v": {{err}}`, addr), err)
}

_, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
_, err = strconv.ParseInt(portStr, 10, 0)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`updating advertise address failed to parse port "%v": {{err}}`, portStr), err)
}

c.advertiseAddr = addr
return nil
}

// serviceTags returns all of the relevant tags for Consul. Assumes
// c.serviceLock held for writing.
func serviceTags(active bool) []string {
activeTag := "standby"
if active {
activeTag = "active"
}
return []string{activeTag}
}

func (c *ConsulBackend) AdvertiseActive(active bool) error {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()

// Vault is still bootstrapping
if c.service == nil {
return nil
}

c.service.Tags = serviceTags(active)
agent := c.client.Agent()
if err := agent.ServiceRegister(c.service); err != nil {
return errwrap.Wrapf("service registration failed: {{err}}", err)
}

// Save a cached copy of the active state: no way to query Core
c.active = active

return nil
}

func (c *ConsulBackend) AdvertiseSealed(sealed bool) error {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
c.sealed = sealed

// Vault is still bootstrapping
if c.service == nil {
return nil
}

// Push a TTL check immediately to update the state
c.runCheck()

return nil
}

func (c *ConsulBackend) RunServiceDiscovery(shutdownCh ShutdownChannel) (err error) {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()

if c.running {
return fmt.Errorf("service registration routine already running")
}

url, err := url.Parse(c.advertiseAddr)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
}

host, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
}

serviceID, err := c.serviceID()
if err != nil {
return err
}

c.service = &api.AgentServiceRegistration{
ID: serviceID,
Name: c.serviceName,
Tags: serviceTags(c.active),
Port: int(port),
Address: host,
EnableTagOverride: false,
}

checkStatus := "failing"
if !c.sealed {
checkStatus = "passing"
}

c.sealedCheck = &api.AgentCheckRegistration{
ID: c.checkID(),
Name: "Vault Sealed Status",
Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server",
ServiceID: serviceID,
AgentServiceCheck: api.AgentServiceCheck{
TTL: c.checkTimeout.String(),
Status: checkStatus,
},
}

agent := c.client.Agent()
if err := agent.ServiceRegister(c.service); err != nil {
return errwrap.Wrapf("service registration failed: {{err}}", err)
}

if err := agent.CheckRegister(c.sealedCheck); err != nil {
return errwrap.Wrapf("service registration check registration failed: {{err}}", err)
}

go c.checkRunner(shutdownCh)
c.running = true

// Deregister upon shutdown
go func() {
shutdown:
for {
select {
case <-shutdownCh:
// wtb logger: log.Printf("[DEBUG]: Shutting down consul backend")
break shutdown
}
}

if err := agent.ServiceDeregister(serviceID); err != nil {
// wtb logger: log.Printf("[WARNING]: service deregistration failed: {{err}}", err)
}
c.running = false
}()

return nil
}

// checkRunner periodically runs TTL checks
func (c *ConsulBackend) checkRunner(shutdownCh ShutdownChannel) {
defer c.checkTimer.Stop()

for {
select {
case <-c.checkTimer.C:
go func() {
c.serviceLock.Lock()
defer c.serviceLock.Unlock()
c.runCheck()
}()
case <-shutdownCh:
return
}
}
}

// runCheck immediately pushes a TTL check. Assumes c.serviceLock is held
// exclusively.
func (c *ConsulBackend) runCheck() {
// Reset timer before calling run check in order to not slide the
// window of the next check.
c.checkTimer.Reset(lib.DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor))

// Run a TTL check
agent := c.client.Agent()
if !c.sealed {
agent.UpdateTTL(c.checkID(), "Vault Unsealed", api.HealthPassing)
} else {
agent.UpdateTTL(c.checkID(), "Vault Sealed", api.HealthCritical)
}
}

// checkID returns the ID used for a Consul Check. Assume at least a read
// lock is held.
func (c *ConsulBackend) checkID() string {
return "vault-sealed-check"
}

// serviceID returns the Vault ServiceID for use in Consul. Assume at least
// a read lock is held.
func (c *ConsulBackend) serviceID() (string, error) {
url, err := url.Parse(c.advertiseAddr)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse URL "%v": {{err}}`, c.advertiseAddr), err)
}

host, portStr, err := net.SplitHostPort(url.Host)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to find a host:port in advertise address "%v": {{err}}`, url.Host), err)
}
port, err := strconv.ParseInt(portStr, 10, 0)
if err != nil {
return "", errwrap.Wrapf(fmt.Sprintf(`service registration failed to parse port "%v": {{err}}`, portStr), err)
}

return fmt.Sprintf("%s:%s:%d", c.serviceName, host, int(port)), nil
}

func setupTLSConfig(conf map[string]string) (*tls.Config, error) {
serverName := strings.Split(conf["address"], ":")

Expand Down
Loading

0 comments on commit c0bbeba

Please sign in to comment.