Python client for Consul.io

Documentation

Read the Docs

Example

import consul

c = consul.Consul()

# poll a key for updates
index = None
while True:
    index, data = c.kv.get('foo', index=index)
    print(data['Value'])

# in another process
c.kv.put('foo', 'bar')

Installation

pip install python-consul

Status

There are still a few API endpoints to add before all features available in Consul v0.5.0 are exposed. If you need an endpoint that’s not in the documentation, just open an issue and I’ll try to add it straight away.

Clients

This library is designed to be easily adapted for a number of clients, particularly asynchronous clients. The following clients are currently supported.

Standard

This is a standard blocking Python client. It isn’t particularly useful for creating server components, but it does serve as a base. It makes use of the requests library for HTTP requests.

>>> import consul

>>> c = consul.Consul()

>>> c.kv.put('foo', 'bar')
True

>>> index, data = c.kv.get('foo')
>>> data['Value']
'bar'

# this will block until there's an update or a timeout
>>> index, data = c.kv.get('foo', index=index)

Vanilla

An asynchronous Vanilla plugin based on this library is available at: https://github.com/cablehead/vanilla.consul

gevent

The terribly awful thing about gevent is that anything that uses the socket library from the Python standard library, including the requests library, can be made non-blocking via monkey patching. This means the standard python-consul client will just work asynchronously with gevent.

Tornado

There is a Tornado client which makes use of gen.coroutine. The API for this client is identical to the standard python-consul client except that you need to yield the result of each API call. This client is available in consul.tornado.

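import tornado.gen
import tornado.ioloop

import consul.tornado

# the IOLoop that will run the watcher
loop = tornado.ioloop.IOLoop.current()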

class Config(object):
    def __init__(self):
        self.foo = None
        loop.add_callback(self.watch)

    @tornado.gen.coroutine
    def watch(self):
        c = consul.tornado.Consul()

        # asynchronously poll for updates
        index = None
        while True:
            index, data = yield c.kv.get('foo', index=index)
            self.foo = data['Value']

asyncio

There is an asyncio (using aiohttp) client which works with Python 3.4 and makes use of asyncio.coroutine. The API for this client is identical to the standard python-consul client except that you need to yield from the result of each API call. This client is available in consul.aio.

import asyncio
import consul.aio


consul_port = 8500  # assumption: the default Consul HTTP port
loop = asyncio.get_event_loop()

@asyncio.coroutine
def go():

    # it is always better to pass ``loop`` explicitly, but this
    # is not mandatory; you can rely on the global event loop
    c = consul.aio.Consul(port=consul_port, loop=loop)

    # set value, same as default api but with ``yield from``
    response = yield from c.kv.put(b'foo', b'bar')
    assert response is True

    # get value
    index, data = yield from c.kv.get(b'foo')
    assert data['Value'] == b'bar'

    # delete value
    response = yield from c.kv.delete(b'foo2')
    assert response is True

loop.run_until_complete(go())

Wanted

Adaptors for Twisted and a thread pool based adaptor.

Tools

Handy tools built on python-consul.

ianitor

ianitor is a doorkeeper for your services discovered using Consul. It can automatically register new services through the Consul API and manage TTL health checks.

Example Uses

ACLs

import consul

# master_token is a *management* token, for example the *acl_master_token*
# you started the Consul server with
master = consul.Consul(token=master_token)

master.kv.put('foo', 'bar')
master.kv.put('private/foo', 'bar')

rules = """
    key "" {
        policy = "read"
    }
    key "private/" {
        policy = "deny"
    }
"""
token = master.acl.create(rules=rules)

client = consul.Consul(token=token)

client.kv.get('foo')          # OK
client.kv.put('foo', 'bar2')  # raises ACLPermissionDenied

client.kv.get('private/foo')  # returns None, as though the key doesn't
                              # exist - slightly unintuitive
client.kv.put('private/foo', 'bar2')  # raises ACLPermissionDenied

API Documentation

Check

class consul.Check

There are three different kinds of checks: script, http and ttl.

Check.script

classmethod Check.script(script, interval)

Run script every interval (e.g. “10s”) to perform a health check.

Check.http

classmethod Check.http(url, interval, timeout=None)

Perform an HTTP GET against url every interval (e.g. “10s”) as a health check, with an optional timeout.

Check.ttl

classmethod Check.ttl(ttl)

Set check to be marked as critical after ttl (e.g. “10s”) unless the check is periodically marked as passing.

Consul

class consul.Consul(host='127.0.0.1', port=8500, token=None, scheme='http', consistency='default', dc=None)

token is an optional ACL token. If supplied it will be used by default for all requests made with this client session. It’s still possible to override this token by passing a token explicitly for a request.

consistency sets the consistency mode to use by default for all reads that support the consistency option. It’s still possible to override this by passing consistency explicitly for a given request. consistency can be either ‘default’, ‘consistent’ or ‘stale’.

dc is the datacenter that this agent will communicate with. By default the datacenter of the host is used.

Consul.kv

class Consul.KV

The KV endpoint is used to expose a simple key/value store. This can be used to store service configurations or other metadata in a simple way.

get(key, index=None, recurse=False, wait=None, token=None, consistency=None, keys=False, separator=None, dc=None)

Returns a tuple of (index, value[s])

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

wait is the maximum duration to wait (e.g. ‘10ms’) to retrieve a given index. This parameter is only applied if index is also specified. The wait time by default is 10 minutes.

token is an optional ACL token to apply to this request.

keys is a boolean which, if True, says to return a flat list of keys without values or other metadata. separator can be used with keys to list keys only up to a given separator character.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

The value returned is for the specified key, or if recurse is True a list of values for all keys with the given prefix is returned.

Each value looks like this:

{
    "CreateIndex": 100,
    "ModifyIndex": 200,
    "LockIndex": 200,
    "Key": "foo",
    "Flags": 0,
    "Value": "bar",
    "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
}
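
With recurse set to True the second element of the tuple is a list of such values, which is often easier to consume as a mapping. A minimal sketch, assuming client is a consul.Consul() instance (or any object with the same kv.get method); kv_tree is a hypothetical helper and not part of python-consul:

```python
def kv_tree(client, prefix):
    """Return {key: value} for every key under prefix (hypothetical helper)."""
    # recurse=True asks for all keys sharing the prefix; the second tuple
    # element is None when nothing matches.
    index, values = client.kv.get(prefix, recurse=True)
    if values is None:
        return {}
    return {item["Key"]: item["Value"] for item in values}
```

In real use this would be called as kv_tree(consul.Consul(), 'config/').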

Note, if the requested key does not exist, (index, None) is returned. It’s then possible to long poll on the index for when the key is created.
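
Long polling for a key that does not exist yet could be sketched as follows. This is an illustration only: wait_for_key, its wait default and its max_polls limit are assumptions made for the example, and client stands for a consul.Consul() instance.

```python
def wait_for_key(client, key, wait="30s", max_polls=10):
    """Block until key exists, then return its value dict (hypothetical helper).

    Returns None if the key has not appeared after max_polls queries.
    """
    index = None
    for _ in range(max_polls):
        # The first call has no index and returns immediately; later calls
        # pass the last index and block until a change or until wait elapses.
        index, data = client.kv.get(key, index=index, wait=wait)
        if data is not None:
            return data
    return None
```

Bounding the number of polls keeps the helper from blocking forever when the key is never created.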

put(key, value, cas=None, flags=None, acquire=None, release=None, token=None, dc=None)

Sets key to the given value.

value can either be None (useful for marking a key as a directory) or any string type, including binary data (e.g. a msgpack’d data structure)

The optional cas parameter is used to turn the PUT into a Check-And-Set operation. This is very useful as it allows clients to build more complex synchronization primitives on top. If the index is 0, then Consul will only put the key if it does not already exist. If the index is non-zero, then the key is only set if the index matches the ModifyIndex of that key.

An optional flags value can be set. This can be used to specify an unsigned value between 0 and 2^64-1.

acquire is an optional session_id. If supplied, a lock acquisition will be attempted.

release is an optional session_id. If supplied, a lock release will be attempted.

token is an optional ACL token to apply to this request. If the token’s policy is not allowed to write to this key an ACLPermissionDenied exception will be raised.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

The return value is simply either True or False. If False is returned, then the update has not taken place.
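
As an illustration of Check-And-Set, the read-modify-write loop below increments a counter without losing concurrent updates. It is a sketch under assumptions: increment and max_attempts are hypothetical names, client is a consul.Consul() instance, and the stored value is assumed to be a decimal integer.

```python
def increment(client, key, max_attempts=10):
    """Atomically add 1 to the integer stored at key (hypothetical helper)."""
    for _ in range(max_attempts):
        index, data = client.kv.get(key)
        if data is None:
            # cas=0 means "only create the key if it does not exist yet"
            current, cas = 0, 0
        else:
            current, cas = int(data["Value"]), data["ModifyIndex"]
        new_value = current + 1
        # put returns False when another writer changed the key first,
        # in which case we re-read and try again
        if client.kv.put(key, str(new_value), cas=cas):
            return new_value
    raise RuntimeError("gave up after %d conflicting updates" % max_attempts)
```

Retrying on False is what makes the update safe: a failed put means the value read is stale, so it has to be fetched again before recomputing.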

delete(key, recurse=None, cas=None, token=None, dc=None)

Deletes a single key or if recurse is True, all keys sharing a prefix.

cas is an optional flag used to turn the DELETE into a Check-And-Set operation. This is very useful as a building block for more complex synchronization primitives. Unlike PUT, the index must be greater than 0 for Consul to take any action: a 0 index will not delete the key. If the index is non-zero, the key is only deleted if the index matches the ModifyIndex of that key.

token is an optional ACL token to apply to this request. If the token’s policy is not allowed to delete this key an ACLPermissionDenied exception will be raised.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.
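
A conditional delete might look like the sketch below. delete_if_unchanged is a hypothetical helper, client is assumed to be a consul.Consul() instance, and the result of delete is passed through unchanged rather than interpreted.

```python
def delete_if_unchanged(client, key):
    """Delete key only if it is not modified between the read and the delete."""
    index, data = client.kv.get(key)
    if data is None:
        return False  # nothing to delete
    # A non-zero cas must match the key's ModifyIndex for the delete to happen
    return client.kv.delete(key, cas=data["ModifyIndex"])
```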

Consul.agent

class Consul.Agent

The Agent endpoints are used to interact with a local Consul agent. Usually, services and checks are registered with an agent, which then takes on the burden of registering with the Catalog and performing anti-entropy to recover from outages.

self()

Returns configuration of the local agent and member information.

services()

Returns all the services that are registered with the local agent. These services were either provided through configuration files, or added dynamically using the HTTP API. It is important to note that the services known by the agent may be different than those reported by the Catalog. This is usually due to changes being made while there is no leader elected. The agent performs active anti-entropy, so in most situations everything will be in sync within a few seconds.

checks()

Returns all the checks that are registered with the local agent. These checks were either provided through configuration files, or added dynamically using the HTTP API. Similar to services, the checks known by the agent may be different than those reported by the Catalog. This is usually due to changes being made while there is no leader elected. The agent performs active anti-entropy, so in most situations everything will be in sync within a few seconds.

members(wan=False)

Returns all the members that this agent currently sees. This may vary by agent; use the nodes API of Catalog to retrieve a cluster-wide consistent view of members.

For agents running in server mode, setting wan to True returns the list of WAN members instead of the LAN members which is default.

maintenance(enable, reason=None)

The node maintenance endpoint can place the agent into “maintenance mode”.

enable is either ‘true’ or ‘false’. ‘true’ enables maintenance mode, ‘false’ disables maintenance mode.

reason is an optional string. This is simply to aid human operators.

class Consul.Agent.Service

register(name, service_id=None, address=None, port=None, tags=None, check=None, script=None, interval=None, ttl=None, http=None, timeout=None)

Add a new service to the local agent. There is more documentation on services here.

name is the name of the service.

If the optional service_id is not provided it is set to name. You cannot have duplicate service_id entries per agent, so it may be necessary to provide one.

address will default to the address of the agent if not provided.

An optional health check can be created for this service by passing check, which is one of Check.script, Check.http, or Check.ttl.

The script, interval, ttl, http, and timeout arguments are deprecated. Use check instead.

deregister(service_id)

Used to remove a service from the local agent. The agent will take care of deregistering the service with the Catalog. If there is an associated check, that is also deregistered.
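
Registration and deregistration pair up naturally in a context manager, so the service disappears from the agent even if the program fails. A minimal sketch, assuming client is a consul.Consul() instance; registered is a hypothetical helper name:

```python
from contextlib import contextmanager


@contextmanager
def registered(client, name, port, service_id=None, check=None):
    """Keep a service registered for the duration of a with-block."""
    service_id = service_id or name  # mirrors the documented default
    client.agent.service.register(
        name, service_id=service_id, port=port, check=check)
    try:
        yield service_id
    finally:
        # Deregistering also removes any check associated with the service
        client.agent.service.deregister(service_id)
```

Typical use would be a with registered(c, 'web', 8080): block wrapped around the server's main loop.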

maintenance(service_id, enable, reason=None)

The service maintenance endpoint allows placing a given service into “maintenance mode”.

service_id is the id of the service that is to be targeted for maintenance.

enable is either ‘true’ or ‘false’. ‘true’ enables maintenance mode, ‘false’ disables maintenance mode.

reason is an optional string. This is simply to aid human operators.

class Consul.Agent.Check

register(name, check=None, check_id=None, notes=None, service_id=None, script=None, interval=None, ttl=None, http=None, timeout=None)

Register a new check with the local agent. More documentation on checks can be found here.

name is the name of the check.

check is one of Check.script, Check.http, or Check.ttl and is required.

If the optional check_id is not provided it is set to name. check_id must be unique for this agent.

notes is not used by Consul, and is meant to be human readable.

Optionally, a service_id can be specified to associate a registered check with an existing service.

The script, interval, ttl, http, and timeout arguments are deprecated. Use check instead.

Returns True on success.

deregister(check_id)

Remove a check from the local agent.

ttl_pass(check_id, notes=None)

Mark a ttl based check as passing. Optional notes can be attached to describe the status of the check.

ttl_fail(check_id, notes=None)

Mark a ttl based check as failing. Optional notes can be attached to describe why the check is failing. The status of the check will be set to critical and the ttl clock will be reset.

ttl_warn(check_id, notes=None)

Mark a ttl based check with warning. Optional notes can be attached to describe the warning. The status of the check will be set to warn and the ttl clock will be reset.
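
The three ttl_* calls can be driven from a single probe function that a service runs periodically. The sketch below is one possible mapping, not a prescribed one: report and probe are hypothetical names, client is assumed to be a consul.Consul() instance, and treating a False result as a warning and an exception as a failure is a design choice made for this example.

```python
def report(client, check_id, probe):
    """Run probe() and push its outcome to a TTL check (hypothetical helper).

    probe is any callable returning (ok, notes); an exception counts as failure.
    """
    checks = client.agent.check
    try:
        ok, notes = probe()
    except Exception as exc:
        checks.ttl_fail(check_id, notes=str(exc))
        return "critical"
    if ok:
        checks.ttl_pass(check_id, notes=notes)
        return "passing"
    checks.ttl_warn(check_id, notes=notes)
    return "warning"
```

report has to be called more often than the check's ttl, otherwise Consul marks the check as critical on its own.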

Consul.catalog

class Consul.Catalog

register(node, address, service=None, check=None, dc=None)

A low level mechanism for directly registering or updating entries in the catalog. It is usually recommended to use agent.service.register and agent.check.register, as they are simpler and perform anti-entropy.

node is the name of the node to register.

address is the ip of the node.

service is an optional service to register. If supplied, this is a dict:

{
    "Service": "redis",
    "ID": "redis1",
    "Tags": [
        "master",
        "v1"
    ],
    "Port": 8000
}

where

Service is required and is the name of the service

ID is optional, and will be set to Service if not provided. Note ID must be unique for the given node.

Tags and Port are optional.

check is an optional check to register. If supplied, this is a dict:

{
    "Node": "foobar",
    "CheckID": "service:redis1",
    "Name": "Redis health check",
    "Notes": "Script based health check",
    "Status": "passing",
    "ServiceID": "redis1"
}

dc is the datacenter of the node and defaults to this agent’s datacenter.

This manipulates the health check entry, but does not set up a script or TTL to actually update the status. The full documentation is here.

Returns True on success.
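
Building the service dict by hand is error-prone, so a small wrapper can help when registering something that runs no agent of its own, such as an external database. A sketch under assumptions: register_external is a hypothetical helper and client is a consul.Consul() instance.

```python
def register_external(client, node, address, name, port,
                      tags=None, service_id=None):
    """Register a service directly in the catalog (hypothetical helper)."""
    service = {"Service": name, "Port": port}
    if service_id is not None:
        service["ID"] = service_id  # otherwise Consul falls back to Service
    if tags:
        service["Tags"] = list(tags)
    return client.catalog.register(node, address, service=service)
```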

deregister(node, service_id=None, check_id=None, dc=None)

A low level mechanism for directly removing entries in the catalog. It is usually recommended to use the agent APIs, as they are simpler and perform anti-entropy.

node and dc specify which node on which datacenter to remove. If service_id and check_id are not provided, all associated services and checks are deleted. Otherwise only one of service_id and check_id should be provided and only that service or check will be removed.

Returns True on success.

datacenters()

Returns all the datacenters that are known by the Consul server.

nodes(index=None, consistency=None, dc=None)

Returns a tuple of (index, nodes) of all nodes known about in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "Node": "baz",
        "Address": "10.1.10.11"
    },
    {
        "Node": "foobar",
        "Address": "10.1.10.12"
    }
])

services(index=None, consistency=None, dc=None)

Returns a tuple of (index, services) of all services known about in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, {
    "consul": [],
    "redis": [],
    "postgresql": [
        "master",
        "slave"
    ]
})

The main keys are the service names and the list provides all the known tags for a given service.
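
Because the response maps service names to tags, finding every service that carries a given tag is a short comprehension. A sketch, assuming client is a consul.Consul() instance; services_with_tag is a hypothetical helper:

```python
def services_with_tag(client, tag):
    """Return the sorted names of all services advertising tag."""
    index, services = client.catalog.services()
    # "tags or []" guards against a service whose tag list is missing
    return sorted(name for name, tags in services.items()
                  if tag in (tags or []))
```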

node(node, index=None, consistency=None, dc=None)

Returns a tuple of (index, services) of all services provided by node.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

dc is the datacenter of the node and defaults to this agent’s datacenter.

The response looks like this:

(index, {
    "Node": {
        "Node": "foobar",
        "Address": "10.1.10.12"
    },
    "Services": {
        "consul": {
            "ID": "consul",
            "Service": "consul",
            "Tags": null,
            "Port": 8300
        },
        "redis": {
            "ID": "redis",
            "Service": "redis",
            "Tags": [
                "v1"
            ],
            "Port": 8000
        }
    }
})

service(service, index=None, tag=None, consistency=None, dc=None)

Returns a tuple of (index, nodes) of the nodes providing service in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

If tag is provided, the list of nodes returned will be filtered by that tag.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "Node": "foobar",
        "Address": "10.1.10.12",
        "ServiceID": "redis",
        "ServiceName": "redis",
        "ServiceTags": null,
        "ServicePort": 8000
    }
])
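
For simple service discovery the response can be reduced to a list of (address, port) pairs. A sketch, assuming client is a consul.Consul() instance; endpoints is a hypothetical helper:

```python
def endpoints(client, service, tag=None):
    """Return [(address, port), ...] for every node providing service."""
    index, nodes = client.catalog.service(service, tag=tag)
    return [(node["Address"], node["ServicePort"]) for node in nodes]
```

Note that the catalog lists every registered instance regardless of health.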

Consul.health

class Consul.Health

service(service, index=None, passing=None, tag=None)

Returns a tuple of (index, nodes)

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

nodes are the nodes providing the given service.

Calling with passing set to True will filter results to only those nodes whose checks are currently passing.

Calling with tag will filter the results by tag.
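
A client-side load balancer can combine passing=True with a random choice. The sketch below relies on an assumption that is not spelled out in this document: that each returned entry follows the Consul HTTP API shape, with a "Node" dict holding "Address" and a "Service" dict holding "Port". pick_healthy and its choose parameter are hypothetical names, and client is a consul.Consul() instance.

```python
import random


def pick_healthy(client, service, tag=None, choose=random.choice):
    """Return (address, port) of one passing instance, or None if none pass."""
    index, nodes = client.health.service(service, passing=True, tag=tag)
    if not nodes:
        return None
    entry = choose(nodes)
    # Assumed entry shape: {"Node": {...}, "Service": {...}, "Checks": [...]}
    return entry["Node"]["Address"], entry["Service"]["Port"]
```

Passing choose explicitly makes the selection strategy replaceable, for example with a deterministic function in tests.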

state(name, index=None)

Returns a tuple of (index, nodes)

name is a supported state. From the Consul docs:

The supported states are any, unknown, passing, warning, or critical. The any state is a wildcard that can be used to return all checks.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

nodes are the checks currently in the given state (each entry includes the node the check belongs to).

node(node, index=None)

Returns a tuple of (index, checks)

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

checks are the health checks registered for the given node.

Consul.session

class Consul.Session

create(name=None, node=None, checks=None, lock_delay=15, behavior='release', ttl=None, dc=None)

Creates a new session. There is more documentation for sessions here.

name is an optional human readable name for the session.

node is the node to create the session on. If not provided, the current agent’s node will be used.

checks is a list of checks to associate with the session. If not provided, it defaults to the serfHealth check. It is highly recommended that, if you override this list, you include the default serfHealth.

lock_delay is an integer number of seconds.

behavior can be set to either ‘release’ or ‘delete’. This controls the behavior when a session is invalidated. By default, this is ‘release’, causing any locks that are held to be released. Changing this to ‘delete’ causes any locks that are held to be deleted. ‘delete’ is useful for creating ephemeral key/value entries.

When ttl is provided, the session is invalidated if it is not renewed before the TTL expires. If specified, it is an integer number of seconds. Currently it must be between 10 and 3600 seconds.

By default the session will be created in the current datacenter but an optional dc can be provided.

Returns the string session_id for the session.

destroy(session_id, dc=None)

Destroys the session session_id.

Returns True on success.
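
Sessions are mostly useful together with the acquire argument of kv.put. The sketch below wraps create, acquire and destroy in a context manager. It is an illustration under assumptions: held_lock is a hypothetical helper, client is a consul.Consul() instance, and the 15 second ttl is an arbitrary value inside the documented 10 to 3600 range.

```python
from contextlib import contextmanager


@contextmanager
def held_lock(client, key, value="", ttl=15):
    """Yield True if the lock on key was acquired, False otherwise."""
    # behavior='delete' removes the key when the session is invalidated,
    # so a crashed holder does not leave a stale entry behind.
    session_id = client.session.create(behavior="delete", ttl=ttl)
    try:
        yield client.kv.put(key, value, acquire=session_id)
    finally:
        # Destroying the session gives up any lock it still holds
        client.session.destroy(session_id)
```

The body of the with-block should check the yielded flag before doing any protected work, and work that may outlast the ttl needs the session to be renewed in the meantime.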

list(index=None, consistency=None, dc=None)

Returns a tuple of (index, sessions) of all active sessions in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "LockDelay": 1.5e+10,
        "Checks": [
            "serfHealth"
        ],
        "Node": "foobar",
        "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
        "CreateIndex": 1086449
    },
    ...
])

node(node, index=None, consistency=None, dc=None)

Returns a tuple of (index, sessions) as per session.list, but filters the sessions returned to only those active for node.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

info(session_id, index=None, consistency=None, dc=None)

Returns a tuple of (index, session) for the session session_id in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

renew(session_id, dc=None)

This is used with sessions that have a TTL, and it extends the expiration by the TTL.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

Returns the session.
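
A session with a TTL has to be renewed before it expires, which usually means a background loop. A minimal sketch, assuming client is a consul.Consul() instance; keep_alive is a hypothetical helper and renewing at half the TTL is a choice made for this example, not a requirement of Consul.

```python
import threading


def keep_alive(client, session_id, ttl_seconds, stop):
    """Renew session_id until stop (a threading.Event) is set.

    Returns the number of renewals performed.
    """
    renewals = 0
    # Renew at half the TTL so one slow request does not let the session lapse.
    # Event.wait returns True once stop is set, which ends the loop.
    while not stop.wait(ttl_seconds / 2.0):
        client.session.renew(session_id)
        renewals += 1
    return renewals
```

This would typically run in its own thread, for example threading.Thread(target=keep_alive, args=(c, session_id, 15, stop)), with stop.set() called on shutdown.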

Consul.acl

class Consul.ACL

list(token=None)

Lists all the active ACL tokens. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

info(acl_id, token=None)

Returns the token information for acl_id.

create(name=None, type='client', rules=None, token=None)

Creates a new ACL token. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

name is an optional name for this token.

type is either ‘management’ or ‘client’. A management token is effectively like a root user, and has the ability to perform any action including creating, modifying, and deleting ACLs. A client token can only perform actions as permitted by rules.

rules is an optional HCL string for this ACL Token Rule Specification.

Rules look like this:

# Default all keys to read-only
key "" {
  policy = "read"
}
key "foo/" {
  policy = "write"
}
key "foo/private/" {
  # Deny access to the private dir
  policy = "deny"
}

Returns the string acl_id for the new token.
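
Rules strings in the format shown above can be generated from a plain mapping instead of being written by hand. The sketch below is an illustration only: render_rules is a hypothetical helper, and the set of accepted policies is limited to the three that appear in this document.

```python
def render_rules(policies):
    """Build a rules string from {key_prefix: policy} (hypothetical helper)."""
    allowed = ("read", "write", "deny")
    blocks = []
    for prefix in sorted(policies):
        policy = policies[prefix]
        if policy not in allowed:
            raise ValueError(
                "unknown policy %r for prefix %r" % (policy, prefix))
        blocks.append('key "%s" {\n  policy = "%s"\n}' % (prefix, policy))
    return "\n".join(blocks)
```

The result can be passed as the rules argument of acl.create, for example master.acl.create(rules=render_rules({'': 'read', 'private/': 'deny'})).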

update(acl_id, name=None, type=None, rules=None, token=None)

Updates the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

name is an optional name for this token.

type is either ‘management’ or ‘client’. A management token is effectively like a root user, and has the ability to perform any action including creating, modifying, and deleting ACLs. A client token can only perform actions as permitted by rules.

rules is an optional HCL string for this ACL Token Rule Specification.

Returns the string acl_id of this token on success.

clone(acl_id, token=None)

Clones the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

Returns the string of the newly created acl_id.

destroy(acl_id, token=None)

Destroys the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

Returns True on success.

Consul.event

class Consul.Event

The event command provides a mechanism to fire a custom user event to an entire datacenter. These events are opaque to Consul, but they can be used to build scripting infrastructure to do automated deploys, restart services, or perform any other orchestration action.

Unlike most Consul data, which is replicated using consensus, event data is purely peer-to-peer over gossip.

This means it is not persisted and does not have a total ordering. In practice, this means you cannot rely on the order of message delivery. An advantage however is that events can still be used even in the absence of server nodes or during an outage.

fire(name, body='', node=None, service=None, tag=None)

Sends an event to Consul’s gossip protocol.

name is the Consul-opaque name of the event. This can be filtered on in calls to list, below.

body is the Consul-opaque body to be delivered with the event. From the Consul documentation:

The underlying gossip also sets limits on the size of a user event message. It is hard to give an exact number, as it depends on various parameters of the event, but the payload should be kept very small (< 100 bytes). Specifying too large of an event will return an error.

node, service, and tag are regular expressions which remote agents will filter against to determine if they should store the event.

list(name=None)

Returns a tuple of (index, events).

Note: Since Consul’s event protocol uses gossip, there is no ordering, and instead index maps to the newest event that matches the query.

name is the type of events to list; if None, all available events are listed.

Consul agents only buffer the most recent entries. The current buffer size is 256, but this value could change in the future.

Each event looks like this:

{
    "ID": "b54fe110-7af5-cafc-d1fb-afc8ba432b1c",
    "Name": "deploy",
    "Payload": "1609030",
    "NodeFilter": "",
    "ServiceFilter": "",
    "TagFilter": "",
    "Version": 1,
    "LTime": 19
}
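
Since agents only buffer recent events and delivery order is not guaranteed, a consumer has to remember which events it has already handled. One way to do that is sketched below, assuming client is a consul.Consul() instance; new_events is a hypothetical helper, and sorting by LTime (the event's logical clock value) is a best-effort ordering chosen for this example.

```python
def new_events(client, name, seen_ids):
    """Return events called name whose ID is not yet in the set seen_ids."""
    index, events = client.event.list(name=name)
    fresh = [event for event in events if event["ID"] not in seen_ids]
    # Gossip gives no delivery order, so sort by the logical time instead
    fresh.sort(key=lambda event: event["LTime"])
    # Remember what was returned so a second call does not repeat it
    seen_ids.update(event["ID"] for event in fresh)
    return fresh
```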

Consul.status

class Consul.Status

The Status endpoints are used to get information about the status of the Consul cluster.

leader()

This endpoint is used to get the Raft leader for the datacenter in which the agent is running.

peers()

This endpoint retrieves the Raft peers for the datacenter in which the agent is running.
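
The two status calls can be combined into a quick health summary of the Raft cluster. This sketch assumes that leader() returns an address string that is empty when no leader is elected and that peers() returns a list of address strings; raft_summary is a hypothetical helper and client is a consul.Consul() instance.

```python
def raft_summary(client):
    """Summarise leader and peer information (hypothetical helper)."""
    leader = client.status.leader()
    peers = client.status.peers()
    return {
        "leader": leader,
        "has_leader": bool(leader),
        "peer_count": len(peers),
        # Raft needs a majority of the peers to agree before a write commits
        "quorum": len(peers) // 2 + 1,
    }
```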