Python client for Consul.io

Documentation

Read the Docs

Example

import consul

c = consul.Consul()

# poll a key for updates
index = None
while True:
    index, data = c.kv.get('foo', index=index)
    print(data['Value'])

# in another process
c.kv.put('foo', 'bar')

Installation

pip install python-consul

Status

There are still a few API endpoints to add before all features available in Consul v0.4.1 are exposed. If you need an endpoint that’s not in the documentation, just open an issue and I’ll try to add it straight away.

Clients

This library is designed to be easily adapted for a number of clients, particularly asynchronous clients. The following clients are currently supported.

Standard

This is a standard blocking Python client. It isn’t particularly useful for creating server components, but it does serve as a base. It makes use of the requests library for HTTP requests.

>>> import consul

>>> c = consul.Consul()

>>> c.kv.put('foo', 'bar')
True

>>> index, data = c.kv.get('foo')
>>> data['Value']
'bar'

# this will block until there's an update or a timeout
>>> index, data = c.kv.get('foo', index=index)

Vanilla

An asynchronous Vanilla plugin based on this library is available at: https://github.com/cablehead/vanilla.consul

gevent

The terribly awful thing about gevent is that anything that uses the socket library from the Python standard lib, including the requests library, can be made non-blocking via monkey patching. This means the standard python-consul client will just work asynchronously with gevent.

Tornado

There is a Tornado client which makes use of gen.coroutine. The API for this client is identical to the standard python-consul client except that you need to yield the result of each API call. This client is available in consul.tornado.

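import tornado.gen
import tornado.ioloop

import consul.tornado

# assumption: ``loop`` is the current Tornado IOLoop
loop = tornado.ioloop.IOLoop.current()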

class Config(object):
    def __init__(self):
        self.foo = None
        loop.add_callback(self.watch)

    @tornado.gen.coroutine
    def watch(self):
        c = consul.tornado.Consul()

        # asynchronously poll for updates
        index = None
        while True:
            index, data = yield c.kv.get('foo', index=index)
            self.foo = data['Value']

asyncio

There is an asyncio (using aiohttp) client which works with Python 3.4 and makes use of asyncio.coroutine. The API for this client is identical to the standard python-consul client except that you need to yield from the result of each API call. This client is available in consul.aio.

import asyncio
import consul.aio


consul_port = 8500  # the default port; change this to match your agent
loop = asyncio.get_event_loop()

@asyncio.coroutine
def go():

    # always better to pass ``loop`` explicitly, but this
    # is not mandatory, you can rely on the global event loop
    c = consul.aio.Consul(port=consul_port, loop=loop)

    # set value, same as default api but with ``yield from``
    response = yield from c.kv.put(b'foo', b'bar')
    assert response is True

    # get value
    index, data = yield from c.kv.get(b'foo')
    assert data['Value'] == b'bar'

    # delete value
    response = yield from c.kv.delete(b'foo2')
    assert response is True

loop.run_until_complete(go())

Wanted

Adaptors for Twisted and a thread pool based adaptor.

Tools

Handy tools built on python-consul.

ianitor

ianitor is a doorkeeper for your services discovered using Consul. It can automatically register new services through the Consul API and manage TTL health checks.

Example Uses

ACLs

import consul

# master_token is a *management* token, for example the *acl_master_token*
# you started the Consul server with
master = consul.Consul(token=master_token)

master.kv.put('foo', 'bar')
master.kv.put('private/foo', 'bar')

rules = """
    key "" {
        policy = "read"
    }
    key "private/" {
        policy = "deny"
    }
"""
token = master.acl.create(rules=rules)

client = consul.Consul(token=token)

client.kv.get('foo')          # OK
client.kv.put('foo', 'bar2')  # raises ACLPermissionDenied

client.kv.get('private/foo')  # returns None, as though the key doesn't
                              # exist - slightly unintuitive
client.kv.put('private/foo', 'bar2')  # raises ACLPermissionDenied

API Documentation

class consul.Consul(host='127.0.0.1', port=8500, token=None, scheme='http', consistency='default', dc=None)

token is an optional ACL token. If supplied it will be used by default for all requests made with this client session. It’s still possible to override this token by passing a token explicitly for a request.

consistency sets the consistency mode to use by default for all reads that support the consistency option. It’s still possible to override this by passing consistency explicitly for a given request. consistency can be either ‘default’, ‘consistent’ or ‘stale’.

dc is the datacenter that this agent will communicate with. By default the datacenter of the host is used.

consul.kv

class Consul.KV

The KV endpoint is used to expose a simple key/value store. This can be used to store service configurations or other metadata in a simple way.

get(key, index=None, recurse=False, token=None, consistency=None, dc=None)

Returns a tuple of (index, value[s])

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

token is an optional ACL token to apply to this request.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

The value returned is for the specified key, or if recurse is True a list of values for all keys with the given prefix is returned.

Each value looks like this:

{
    "CreateIndex": 100,
    "ModifyIndex": 200,
    "LockIndex": 200,
    "Key": "foo",
    "Flags": 0,
    "Value": "bar",
    "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
}

Note, if the requested key does not exist, (index, None) is returned. It’s then possible to long poll on the index to wait for the key to be created.
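The long poll on a missing key can be sketched as follows. This is only an illustration: wait_for_key and its max_polls safety limit are hypothetical names introduced here, and client is assumed to be a consul.Consul() instance (or anything with the same kv.get signature).

```python
def wait_for_key(client, key, max_polls=None):
    """Poll until `key` exists, then return (index, data).

    `max_polls` is a hypothetical safety limit; None means poll forever.
    If the limit is reached, (last_index, None) is returned.
    """
    index = None
    polls = 0
    while max_polls is None or polls < max_polls:
        # The first call passes index=None. Later calls pass the last
        # index seen, so they block until an update or a timeout.
        index, data = client.kv.get(key, index=index)
        if data is not None:
            return index, data
        polls += 1
    return index, None
```

With a real agent this would be called as wait_for_key(consul.Consul(), 'foo').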

put(key, value, cas=None, flags=None, acquire=None, release=None, token=None, dc=None)

Sets key to the given value.

value can either be None (useful for marking a key as a directory) or any string type, including binary data (e.g. a msgpack’d data structure).

The optional cas parameter is used to turn the PUT into a Check-And-Set operation. This is very useful as it allows clients to build more complex synchronization primitives on top. If the index is 0, then Consul will only put the key if it does not already exist. If the index is non-zero, then the key is only set if the index matches the ModifyIndex of that key.

Optional flags can be set. This can be used to specify an unsigned value between 0 and 2^64-1.

acquire is an optional session_id. If supplied, a lock acquisition will be attempted.

release is an optional session_id. If supplied, a lock release will be attempted.

token is an optional ACL token to apply to this request. If the token’s policy is not allowed to write to this key an ACLPermissionDenied exception will be raised.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

The return value is simply either True or False. If False is returned, then the update has not taken place.
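As an illustration of the cas parameter and the True/False return value, here is a minimal sketch of an optimistic counter. increment and max_attempts are hypothetical names, client is assumed to be a consul.Consul() instance, and the stored value is assumed to be a decimal integer string.

```python
def increment(client, key, max_attempts=10):
    """Add 1 to the integer stored at `key` using Check-And-Set."""
    for _ in range(max_attempts):
        index, data = client.kv.get(key)
        if data is None:
            # cas=0 means: only write if the key does not exist yet
            current, cas = 0, 0
        else:
            # otherwise the write only succeeds if ModifyIndex is unchanged
            current, cas = int(data['Value']), data['ModifyIndex']
        new_value = current + 1
        # put returns False when another writer changed the key first,
        # in which case we re-read and try again
        if client.kv.put(key, str(new_value), cas=cas):
            return new_value
    raise RuntimeError('gave up after %d attempts' % max_attempts)
```

Retrying on False rather than locking keeps the common, uncontended case to one read and one write.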

delete(key, recurse=None, token=None, dc=None)

Deletes a single key or if recurse is True, all keys sharing a prefix.

token is an optional ACL token to apply to this request. If the token’s policy is not allowed to delete this key an ACLPermissionDenied exception will be raised.

dc is the optional datacenter that you wish to communicate with. If None is provided, defaults to the agent’s datacenter.

consul.agent

class Consul.Agent

The Agent endpoints are used to interact with a local Consul agent. Usually, services and checks are registered with an agent, which then takes on the burden of registering with the Catalog and performing anti-entropy to recover from outages.

self()

Returns configuration of the local agent and member information.

services()

Returns all the services that are registered with the local agent. These services were either provided through configuration files, or added dynamically using the HTTP API. It is important to note that the services known by the agent may be different than those reported by the Catalog. This is usually due to changes being made while there is no leader elected. The agent performs active anti-entropy, so in most situations everything will be in sync within a few seconds.

checks()

Returns all the checks that are registered with the local agent. These checks were either provided through configuration files, or added dynamically using the HTTP API. Similar to services, the checks known by the agent may be different than those reported by the Catalog. This is usually due to changes being made while there is no leader elected. The agent performs active anti-entropy, so in most situations everything will be in sync within a few seconds.

members(wan=False)

Returns all the members that this agent currently sees. This may vary by agent; use the nodes API of Catalog to retrieve a cluster-wide consistent view of members.

For agents running in server mode, setting wan to True returns the list of WAN members instead of the LAN members which is default.

class Consul.Agent.Service

register(name, service_id=None, port=None, tags=None, script=None, interval=None, ttl=None)

Add a new service to the local agent. There is more documentation on services here.

name is the name of the service.

If the optional service_id is not provided it is set to name. You cannot have duplicate service_id entries per agent, so it may be necessary to provide one.

An optional health check can be created for this service. The health check is defined either by script and interval together, or by ttl, but not both.

deregister(service_id)

Used to remove a service from the local agent. The agent will take care of deregistering the service with the Catalog. If there is an associated check, that is also deregistered.
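A sketch of pairing register and deregister so that a service is always removed on shutdown. The registered helper and its service_id naming scheme are hypothetical, client is assumed to be a consul.Consul() instance, and the ttl value is assumed to be a duration string such as '10s'.

```python
from contextlib import contextmanager


@contextmanager
def registered(client, name, port, ttl='10s', tags=None):
    """Register a service with a TTL check and deregister it on exit."""
    # hypothetical naming scheme, so several instances of the same
    # service can coexist on one agent
    service_id = '%s-%d' % (name, port)
    client.agent.service.register(
        name, service_id=service_id, port=port, tags=tags, ttl=ttl)
    try:
        yield service_id
    finally:
        # also runs when the body raises, so no stale entry is left behind
        client.agent.service.deregister(service_id)
```

Typical use would be a with registered(c, 'web', 8080): block wrapped around the server's main loop.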

class Consul.Agent.Check

register(name, check_id=None, script=None, interval=None, ttl=None, notes=None)

Register a new check with the local agent. More documentation on checks can be found in the Consul documentation.

name is the name of the check.

If the optional check_id is not provided it is set to name. check_id must be unique for this agent.

There are two ways to define a check, either with a script and an interval or with a ttl timeout. If a script is supplied then the interval is expected and the ttl should be absent. For a ttl check, the script and interval should not be supplied.

notes is not used by Consul, and is meant to be human readable.

Returns True on success.

deregister(check_id)

Remove a check from the local agent.

ttl_pass(check_id, notes=None)

Mark a ttl based check as passing. Optional notes can be attached to describe the status of the check.

ttl_fail(check_id, notes=None)

Mark a ttl based check as failing. Optional notes can be attached to describe why the check is failing. The status of the check will be set to critical and the ttl clock will be reset.

ttl_warn(check_id, notes=None)

Mark a ttl based check as warning. Optional notes can be attached to describe the warning. The status of the check will be set to warn and the ttl clock will be reset.
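A ttl check only stays healthy while something keeps reporting to it. A minimal heartbeat sketch follows; heartbeat, probe and beats are hypothetical names, and client is assumed to be a consul.Consul() instance with a ttl check already registered under check_id.

```python
import time


def heartbeat(client, check_id, probe, beats, interval=5.0):
    """Report the result of `probe()` to a ttl check `beats` times."""
    for _ in range(beats):
        try:
            healthy, notes = bool(probe()), None
        except Exception as exc:
            # a probe that raises counts as a failure, with the reason attached
            healthy, notes = False, str(exc)
        if healthy:
            client.agent.check.ttl_pass(check_id)
        else:
            client.agent.check.ttl_fail(check_id, notes=notes)
        time.sleep(interval)
```

The interval should be comfortably shorter than the check's ttl, otherwise the check can expire between two beats.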

consul.catalog

class Consul.Catalog

register(node, address, service=None, check=None, dc=None)

A low level mechanism for directly registering or updating entries in the catalog. It is usually recommended to use agent.service.register and agent.check.register, as they are simpler and perform anti-entropy.

node is the name of the node to register.

address is the IP address of the node.

service is an optional service to register. If supplied, this is a dict:

{
    "Service": "redis",
    "ID": "redis1",
    "Tags": [
        "master",
        "v1"
    ],
    "Port": 8000
}

where

Service is required and is the name of the service

ID is optional, and will be set to Service if not provided. Note ID must be unique for the given node.

Tags and Port are optional.

check is an optional check to register. If supplied, this is a dict:

{
    "Node": "foobar",
    "CheckID": "service:redis1",
    "Name": "Redis health check",
    "Notes": "Script based health check",
    "Status": "passing",
    "ServiceID": "redis1"
}

dc is the datacenter of the node and defaults to this agent’s datacenter.

This manipulates the health check entry, but does not set up a script or TTL to actually update the status. The full documentation is available in the Consul documentation.

Returns True on success.
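One use for this low level call is adding a node that runs no agent of its own. The sketch below builds the service dict described above; register_external is a hypothetical helper and client is assumed to be a consul.Consul() instance.

```python
def register_external(client, node, address, name, port, tags=None):
    """Register a service on `node` directly in the catalog."""
    # Service is the only required field; ID defaults to Service
    service = {'Service': name, 'Port': port}
    if tags:
        service['Tags'] = list(tags)
    return client.catalog.register(node, address, service=service)
```

Entries created this way are not kept in sync by any agent, so they need to be removed with catalog.deregister when no longer valid.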

deregister(node, service_id=None, check_id=None, dc=None)

A low level mechanism for directly removing entries in the catalog. It is usually recommended to use the agent APIs, as they are simpler and perform anti-entropy.

node and dc specify which node on which datacenter to remove. If service_id and check_id are not provided, all associated services and checks are deleted. Otherwise only one of service_id and check_id should be provided and only that service or check will be removed.

Returns True on success.

datacenters()

Returns all the datacenters that are known by the Consul server.

nodes(index=None, consistency=None, dc=None)

Returns a tuple of (index, nodes) of all nodes known about in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "Node": "baz",
        "Address": "10.1.10.11"
    },
    {
        "Node": "foobar",
        "Address": "10.1.10.12"
    }
])

services(index=None, consistency=None, dc=None)

Returns a tuple of (index, services) of all services known about in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, {
    "consul": [],
    "redis": [],
    "postgresql": [
        "master",
        "slave"
    ]
})

The main keys are the service names and the list provides all the known tags for a given service.
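Because the result maps service names to tag lists, finding every service that carries a given tag is a small dictionary scan. services_with_tag is a hypothetical helper, and the data is the sample response shown above.

```python
def services_with_tag(services, tag):
    """Return the sorted names of all services that list `tag`."""
    return sorted(name for name, tags in services.items() if tag in tags)


# the second element of the (index, services) tuple shown above
services = {
    'consul': [],
    'redis': [],
    'postgresql': ['master', 'slave'],
}

tagged = services_with_tag(services, 'master')
```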

node(node, index=None, consistency=None, dc=None)

Returns a tuple of (index, services) of all services provided by node.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

dc is the datacenter of the node and defaults to this agent’s datacenter.

The response looks like this:

(index, {
    "Node": {
        "Node": "foobar",
        "Address": "10.1.10.12"
    },
    "Services": {
        "consul": {
            "ID": "consul",
            "Service": "consul",
            "Tags": null,
            "Port": 8300
        },
        "redis": {
            "ID": "redis",
            "Service": "redis",
            "Tags": [
                "v1"
            ],
            "Port": 8000
        }
    }
})

service(service, index=None, tag=None, consistency=None, dc=None)

Returns a tuple of (index, nodes) of the nodes providing service in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

If tag is provided, the list of nodes returned will be filtered by that tag.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "Node": "foobar",
        "Address": "10.1.10.12",
        "ServiceID": "redis",
        "ServiceName": "redis",
        "ServiceTags": null,
        "ServicePort": 8000
    }
])
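A common next step is turning this response into a list of addresses to connect to. A minimal sketch, using the sample response above; endpoints is a hypothetical helper.

```python
def endpoints(nodes):
    """Turn the nodes of a catalog.service response into 'host:port' strings."""
    return ['%s:%d' % (n['Address'], n['ServicePort']) for n in nodes]


# the second element of the (index, nodes) tuple shown above
nodes = [
    {
        'Node': 'foobar',
        'Address': '10.1.10.12',
        'ServiceID': 'redis',
        'ServiceName': 'redis',
        'ServiceTags': None,
        'ServicePort': 8000,
    }
]

addresses = endpoints(nodes)
```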

consul.health

class Consul.Health

service(service, index=None, passing=None)

Returns a tuple of (index, nodes)

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

nodes are the nodes providing the given service.

Calling with passing set to True will filter results to only those nodes whose checks are currently passing.
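Combining index with passing gives a simple way to follow the set of healthy nodes over time. The sketch below is only an illustration: watch_service, on_change and rounds are hypothetical names, and client is assumed to be a consul.Consul() instance.

```python
def watch_service(client, service, on_change, rounds):
    """Call `on_change(nodes)` whenever the healthy nodes of `service` change."""
    index = None
    for _ in range(rounds):
        # blocks until there is an update or a timeout once index is set
        new_index, nodes = client.health.service(
            service, index=index, passing=True)
        # a timeout returns the same index again, so only react to changes
        if new_index != index:
            on_change(nodes)
        index = new_index
```

A long running process would loop forever instead of for a fixed number of rounds; the limit here just keeps the sketch easy to exercise.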

class Consul.Health.Check

ttl_pass(check_id)

Mark a local TTL check as passing.

consul.session

class Consul.Session

create(name=None, node=None, checks=None, lock_delay=15, dc=None)

Creates a new session. More documentation on sessions is available in the Consul documentation.

name is an optional human readable name for the session.

node is the node to create the session on. If not provided, the current agent’s node will be used.

checks is a list of checks to associate with the session. If not provided, it defaults to the serfHealth check.

lock_delay is an integer of seconds.

By default the session will be created in the current datacenter but an optional dc can be provided.

Returns the string session_id for the session.

destroy(session_id, dc=None)

Destroys the session session_id.

Returns True on success.
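Sessions are what the acquire and release parameters of kv.put operate on. A minimal sketch of a lock built from session.create, kv.put and session.destroy; the lock helper and its owner argument are hypothetical, and client is assumed to be a consul.Consul() instance.

```python
from contextlib import contextmanager


@contextmanager
def lock(client, key, owner='worker'):
    """Try to hold `key` for the duration of the block.

    Yields True if the lock was acquired and False otherwise, so the
    caller decides what to do when someone else holds it.
    """
    session_id = client.session.create(name=owner)
    try:
        acquired = client.kv.put(key, owner, acquire=session_id)
        try:
            yield acquired
        finally:
            if acquired:
                client.kv.put(key, owner, release=session_id)
    finally:
        # always clean up the session, even if the body raised
        client.session.destroy(session_id)
```

This sketch makes a single attempt; waiting for the lock could be layered on top by long polling the key with kv.get.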

list(index=None, consistency=None, dc=None)

Returns a tuple of (index, sessions) of all active sessions in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

The response looks like this:

(index, [
    {
        "LockDelay": 1.5e+10,
        "Checks": [
            "serfHealth"
        ],
        "Node": "foobar",
        "ID": "adf4238a-882b-9ddc-4a9d-5b6758e4159e",
        "CreateIndex": 1086449
    },
    ...
])
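Note that LockDelay in the response above is much larger than the lock_delay passed to create. It appears to be reported in nanoseconds, which matches the default of 15 seconds; treat that unit as an assumption. lock_delay_seconds is a hypothetical helper for converting it back.

```python
def lock_delay_seconds(session):
    """Convert a session's LockDelay (assumed nanoseconds) to seconds."""
    return session['LockDelay'] / 1e9


# the first session from the sample response above
session = {
    'LockDelay': 1.5e+10,
    'Checks': ['serfHealth'],
    'Node': 'foobar',
    'ID': 'adf4238a-882b-9ddc-4a9d-5b6758e4159e',
    'CreateIndex': 1086449,
}

delay = lock_delay_seconds(session)
```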
node(node, index=None, consistency=None, dc=None)

Returns a tuple of (index, sessions) as per session.list, but filters the sessions returned to only those active for node.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

info(session_id, index=None, consistency=None, dc=None)

Returns a tuple of (index, session) for the session session_id in the dc datacenter. dc defaults to the current datacenter of this agent.

index is the current Consul index, suitable for making subsequent calls to wait for changes since this query was last run.

consistency can be either ‘default’, ‘consistent’ or ‘stale’. If not specified, consistency will be the consistency level this client was configured with.

consul.acl

class Consul.ACL

list(token=None)

Lists all the active ACL tokens. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

info(acl_id, token=None)

Returns the token information for acl_id.

create(name=None, type='client', rules=None, token=None)

Creates a new ACL token. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

name is an optional name for this token.

type is either ‘management’ or ‘client’. A management token is effectively like a root user, and has the ability to perform any action including creating, modifying, and deleting ACLs. A client token can only perform actions as permitted by rules.

rules is an optional HCL string for this ACL Token Rule Specification.

Rules look like this:

# Default all keys to read-only
key "" {
  policy = "read"
}
key "foo/" {
  policy = "write"
}
key "foo/private/" {
  # Deny access to the private dir
  policy = "deny"
}

Returns the string acl_id for the new token.
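Rules in this form can be generated rather than written by hand. A minimal sketch, assuming only the three policies used in the example above; build_rules is a hypothetical helper, not part of python-consul.

```python
def build_rules(policies):
    """Render a {key_prefix: policy} mapping as an HCL rules string."""
    blocks = []
    for prefix, policy in sorted(policies.items()):
        if policy not in ('read', 'write', 'deny'):
            raise ValueError('unknown policy: %r' % (policy,))
        blocks.append('key "%s" {\n  policy = "%s"\n}' % (prefix, policy))
    return '\n'.join(blocks)


rules = build_rules({'': 'read', 'foo/': 'write', 'foo/private/': 'deny'})
```

The resulting string would then be passed as the rules argument of acl.create.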

update(acl_id, name=None, type=None, rules=None, token=None)

Updates the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

name is an optional name for this token.

type is either ‘management’ or ‘client’. A management token is effectively like a root user, and has the ability to perform any action including creating, modifying, and deleting ACLs. A client token can only perform actions as permitted by rules.

rules is an optional HCL string for this ACL Token Rule Specification.

Returns the string acl_id of this token on success.

clone(acl_id, token=None)

Clones the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

Returns the string of the newly created acl_id.

destroy(acl_id, token=None)

Destroys the ACL token acl_id. This is a privileged endpoint, and requires a management token. token will override this client’s default token. An ACLPermissionDenied exception will be raised if a management token is not used.

Returns True on success.