API Documentation

Hub

class vanilla.core.Hub

A Vanilla Hub is a handle to a self contained world of interwoven coroutines. It includes an event loop which is responsibile for scheduling which green thread should have context. Unlike most asynchronous libraries this Hub is explicit and must be passed to coroutines that need to interact with it. This is particularly nice for testing, as it makes it clear what’s going on, and other tests can’t inadvertently effect each other.

Concurrency

Hub.spawn(f, *a)

Schedules a new green thread to be created to run f(*a) on the next available tick:

def echo(pipe, s):
    pipe.send(s)

p = h.pipe()
h.spawn(echo, p, 'hi')
p.recv() # returns 'hi'
Hub.spawn_later(ms, f, *a)

Spawns a callable on a new green thread, scheduled for ms milliseconds in the future:

def echo(pipe, s):
    pipe.send(s)

p = h.pipe()
h.spawn_later(50, echo, p, 'hi')
p.recv() # returns 'hi' after 50ms
Hub.sleep(ms=1)

Pauses the current green thread for ms milliseconds:

p = h.pipe()

@h.spawn
def _():
    p.send('1')
    h.sleep(50)
    p.send('2')

p.recv() # returns '1'
p.recv() # returns '2' after 50 ms

Message Passing

Hub.pipe()

Returns a Pipe Pair.

Hub.select(ends, timeout=-1)

An end is either a Sender or a Recver. select takes a list of ends and blocks until one of them is ready. The select will block either forever, or until the optional timeout is reached. timeout is in milliseconds.

It returns of tuple of (end, value) where end is the end that has become ready. If the end is a Recver, then it will have already been recv‘d on which will be available as value. For Sender‘s however the sender is still in a ready state waiting for a send and value is None.

For example, the following is an appliance that takes an upstream Recver and a downstream Sender. Sending to its upstream will alter it’s current state. This state can be read at anytime by receiving on its downstream:

def state(h, upstream, downstream):
    current = None
    while True:
        end, value = h.select([upstream, downstream])
        if end == upstream:
            current = value
        elif end == downstream:
            end.send(current)
Hub.dealer()

Returns a Dealer Pair.

Hub.router()

Returns a Router Pair.

Hub.queue(size)

Returns a Queue Pair.

Hub.channel(size=-1)
send --\    +---------+  /--> recv
        +-> | Channel | -+
send --/    +---------+  \--> recv

A Channel can have many senders and many recvers. By default it is unbuffered, but you can create buffered Channels by specifying a size. They’re structurally equivalent to channels in Go. It’s implementation is literally a Router piped to a Dealer, with an optional Queue in between.

Hub.state(state=<class 'vanilla.message.NoState'>)

Returns a State Pair.

state if supplied sets the intial state.

Pipe Conveniences

Hub.producer(f)

Convenience to create a Pipe. f is a callable that takes the Sender end of this Pipe and the corresponding Recver is returned:

def counter(sender):
    i = 0
    while True:
        i += 1
        sender.send(i)

recver = h.producer(counter)

recver.recv() # returns 1
recver.recv() # returns 2
Hub.pulse(ms, item=True)

Convenience to create a Pipe that will have item sent on it every ms milliseconds. The Recver end of the Pipe is returned.

Note that since sends to a Pipe block until the Recver is ready, the pulses will be throttled if the Recver is unable to keep up:

recver = h.pulse(500)

for _ in recver:
    log.info('hello') # logs 'hello' every half a second

Thread

Hub.thread.spawn(f, *a)
  • Spawns callable f in a new thread. A new Hub is initialized for the thread and passed to f along with arguments a
  • A parent attribute is available on the new thread’s Hub which is a Pipe to communicate with it’s parent thread
  • spawn returns a Pipe to communicate with the new child thread

Example usage:

def ticker(h, n):
    import time

    while true:
        h.parent.send(time.time())
        time.sleep(n)

h = vanilla.Hub()

child = h.thread.spawn(ticker, 1)

while true:
    child.recv()
Hub.thread.call(f, *a)
  • Spawns a one-off thread to run callable f with arguments a
  • Returns a Recver which can be recv’d on to get f‘s result

Example usage:

def add(a, b):
    return a + b

h = vanilla.Hub()
h.thread.call(add, 2, 3).recv()  # 5
Hub.thread.pool(size)
  • Returns a reusable pool of size threads

Pool

Thread.Pool.call(f, *a)
  • Runs callable f with arguments a on one of the pool’s threads
  • Returns a Recver which can be recv’d on to get f‘s result

Example usage:

h = vanilla.Hub()

def sleeper(x):
    time.sleep(x)
    return x

p = h.thread.pool(2)
gather = h.router()

p.call(sleeper, 0.2).pipe(gather)
p.call(sleeper, 0.1).pipe(gather)
p.call(sleeper, 0.05).pipe(gather)

gather.recv()  # 0.1
gather.recv()  # 0.05
gather.recv()  # 0.2
Thread.Pool.wrap(ob)
  • Wraps ob with a proxy which will delegate method calls on ob to run on the pool’s threads
  • Each method call on the proxy returns a Recver which can be recv’d on the get the calls result

Example usage:

h = vanilla.Hub()

p = h.thread.pool(2)

db = pymongo.MongoClient()['database']
db = p.wrap(db)

response = db.posts.find_one({"author": "Mike"})
response.recv()

Process

Hub.process.execv(args, env=None, stderrtoout=False)
  • Forks a child process using args.
  • env is an optional dictionary of environment variables which will replace the parent’s environment for the child process. If not supplied the child will have access to the parent’s environment.
  • if stderrtoout is True the child’s stderr will be redirected to its stdout.
  • A Child object is return to interact with the child process.

Example usage:

h = vanilla.Hub()

child = h.process.execv(
    ['/usr/bin/env', 'grep', '--line-buffered', 'foo'])

child.stdin.send('foo1\n')
child.stdout.recv_partition('\n')   # foo1
child.stdin.send('bar1\n')
child.stdout.recv_partition('\n')   # would hang forever
child.stdin.send('foo2\n')
child.stdout.recv_partition('\n')   # foo2

child.terminate()
child.done.recv()

Child

Child.stdin

A Sender which allows you to send data to the child’s stdin.

Child.stdout

A Stream recver which allows you to receive data from the child’s stdout.

Child.stderr

A Stream recver which allows you to receive data from the child’s stderr. Only available if stderrtoout is False.

Child.done

A State that will be set once the child terminates.

Child.terminate()

Sends the child a SIGTERM.

Child.signal(signum)

Sends the child signum.

TCP

Hub.tcp.listen(port=0, host='127.0.0.1')

Listens for TCP connections on host and port. If port is 0, it will listen on a randomly available port. Returns a Recver which dispenses TCP connections:

h = vanilla.Hub()

server = h.tcp.listen()

@server.consume
def echo(conn):
    for date in conn.recver:
        conn.send('Echo: ' + data)

The Recver returned has an additional attribute port which is the port that was bound to.

Hub.tcp.connect(port, host='127.0.0.1')

Creates a TCP connection to host and port and returns a Pair of a Sender and Stream receiver.

HTTP

HTTPServer

Hub.http.listen(port=0, host='127.0.0.1')
  • Listens for HTTP connections on host and port. If port is 0, it will listen on a randomly available port.
  • Returns a Recver which dispenses HTTP connections.
  • These HTTP connections are a Recver which dispense HTTPRequest. Note that if this is a Keep-Alive connection, it can dispense more than one HTTPRequest.

An example server:

import vanilla

h = vanilla.Hub()

def handle_connection(conn):
    for request in conn:
        request.reply(vanilla.http.Status(200), {}, "Hello")

server = h.http.listen(8080)

for conn in server:
    h.spawn(handle_connection, conn)

HTTPRequest

A HTTP Request is a namedtuple with the following ordered items / attributes:

Request.method

The HTTP request method e.g. ‘GET’, ‘POST’, ‘PUT’, ‘DELETE’, ...

Request.path

The path requested

Request.version

The HTTP version of the request

Request.headers

A dictionary like interface to HTTP request headers. Keys are case insensitive.

Request.body

A Recver which yields the request’s body. If the Transfer-Encoding is chunked the entire body could be yielded over a period of time with successive receives.

A HTTP Request also has three methods:

Request.consume()

Blocks until the entire request body has been received and returns it as a single string.

Request.json()

Convenience to consume the entire request body, and json decode it.

Request.form

A convenience to access form url encoded data as a dictionary. The form data is available as a key, value mappings. If a key is in the form more than once, only it’s last value will be available.

Request.form_multi

A convenience to access form url encoded data as a dictionary. The form data is available as a key, list of values mappings.

Request.reply(status, headers, body)

Initiates a reply to this HTTP request. status is a tuple of (HTTP Code, message), for example (200, ‘OK’). headers is a dictionary like interface to the HTTP headers to respond with. body can either be a string, in which case this response will be completed immediately. Otherwise, body can be a Recver which can have a series of strings sent, before being closed to indicated the response has completed. There’s no need to set Content-Length or Transfer-Encoding in the response headers, this will be inferred depending on whether body is a string or a Recver.

Request.upgrade()

If this is a request to establish a `Websocket`_, the server can call this method to upgrade this connection. This method returns a `Websocket`_, and this connection can no longer be used as a HTTP connection.

Hub.http.connect(port, host='127.0.0.1')

Establishes a HTTPClient connection to host and port and requests a HTTP client connection. Note that if supported, this connection will be a Keep-Alive and multiple requests can be made over the same connection.

An example server with chunked transfer:

import vanilla

h = vanilla.Hub()


serve = h.http.listen()

client = h.http.connect('http://localhost:%s' % serve.port)
response = client.get('/')


conn = serve.recv()  # recvs http connection
request = conn.recv()  # recvs http request + headers

sender, recver = h.pipe()
request.reply(vanilla.http.Status(200), {}, recver)

response = response.recv()  # recvs the response + headers, but not the body

sender.send('oh')
print response.body.recv()  # 'oh'

sender.send('hai')
print response.body.recv()  # 'hai'

sender.close()
print response.body.recv()  # raises Closed

HTTPClient

class vanilla.http.HTTPClient
delete(path='/', params=None, headers=None)
get(path='/', params=None, headers=None, auth=None)
post(path='/', params=None, headers=None, data='')
put(path='/', params=None, headers=None, data='')
request(method, path='/', params=None, headers=None, data=None)
websocket(path='/', params=None, headers=None)

Message Passing Primitives

Pair

class vanilla.message.Pair

A Pair is a tuple of a Sender and a Recver. The pair only share a weakref to each other so unless a reference is kept to both ends, the remaining end will be abandoned and the entire pair will be garbage collected.

It’s possible to call methods directly on the Pair tuple. A common pattern though is to split up the tuple with the Sender used in one closure and the Recver in another:

# create a Pipe Pair
p = h.pipe()

# call the Pair tuple directly
h.spawn(p.send, '1')
p.recv() # returns '1'

# split the sender and recver
sender, recver = p
sender.send('2')
recver.recv() # returns '2'
close()

Closes both ends of this Pair

consume(f)

Consumes this Pair with f; see vanilla.core.Recver.consume().

Returns only our Sender

map(f)

Maps this Pair with f‘; see vanilla.core.Recver.map()

Returns a new Pair of our current Sender and the mapped target’s Recver.

pipe(target)

Pipes are Recver to the target; see vanilla.core.Recver.pipe()

Returns a new Pair of our current Sender and the target’s Recver.

recv(timeout=-1)

Receive and item from our Sender. This will block unless our Sender is ready, either forever or unless timeout milliseconds.

send(item, timeout=-1)

Send an item on this pair. This will block unless our Rever is ready, either forever or until timeout milliseconds.

Sender

class vanilla.message.Sender(pipe)
send(item, timeout=-1)

Send an item on this pair. This will block unless our Rever is ready, either forever or until timeout milliseconds.

Recver

class vanilla.message.Recver(pipe)
consume(f)

Creates a sink which consumes all values for this Recver. f is a callable which takes a single argument. All values sent on this Recver’s Sender will be passed to f for processing. Unlike map however consume terminates this chain:

sender, recver = h.pipe

@recver.consume
def _(data):
    logging.info(data)

sender.send('Hello') # logs 'Hello'
map(f)

f is a callable that takes a single argument. All values sent on this Recver’s Sender will be passed to f to be transformed:

def double(i):
    return i * 2

sender, recver = h.pipe()
recver.map(double)

h.spawn(sender.send, 2)
recver.recv() # returns 4
pipe(target)

Pipes this Recver to target. target can either be Sender (or Pair) or a callable.

If target is a Sender, the two pairs are rewired so that sending on this Recver’s Sender will now be directed to the target’s Recver:

sender1, recver1 = h.pipe()
sender2, recver2 = h.pipe()

recver1.pipe(sender2)

h.spawn(sender1.send, 'foo')
recver2.recv() # returns 'foo'

If target is a callable, a new Pipe will be created. This Recver and the new Pipe’s Sender are passed to the target callable to act as upstream and downstream. The callable can then do any processing desired including filtering, mapping and duplicating packets:

sender, recver = h.pipe()

def pipeline(upstream, downstream):
    for i in upstream:
        if i % 2:
            downstream.send(i*2)

recver = recver.pipe(pipeline)

@h.spawn
def _():
    for i in xrange(10):
        sender.send(i)

recver.recv() # returns 2 (0 is filtered, so 1*2)
recver.recv() # returns 6 (2 is filtered, so 3*2)
recv(timeout=-1)

Receive and item from our Sender. This will block unless our Sender is ready, either forever or unless timeout milliseconds.

Pipe

class vanilla.message.Pipe
         +------+
send --> | Pipe | --> recv
         +------+

The most basic primitive is the Pipe. A Pipe has exactly one sender and exactly one recver. A Pipe has no buffering, so send and recvs will block until there is a corresponding send or recv.

For example, the following code will deadlock as the sender will block, preventing the recv from ever being called:

h = vanilla.Hub()
p = h.pipe()
p.send(1)     # deadlock
p.recv()

The following is OK as the send is spawned to a background green thread:

h = vanilla.Hub()
p = h.pipe()
h.spawn(p.send, 1)
p.recv()      # returns 1

Dealer

class vanilla.message.Dealer
         +--------+  /--> recv
send --> | Dealer | -+
         +--------+  \--> recv

A Dealer has exactly one sender but can have many recvers. It has no buffer, so sends and recvs block until a corresponding green thread is ready. Sends are round robined to waiting recvers on a first come first serve basis:

h = vanilla.Hub()
d = h.dealer()
# d.send(1)      # this would deadlock as there are no recvers
h.spawn(lambda: 'recv 1: %s' % d.recv())
h.spawn(lambda: 'recv 2: %s' % d.recv())
d.send(1)
d.send(2)

Router

class vanilla.message.Router
send --\    +--------+
        +-> | Router | --> recv
send --/    +--------+

A Router has exactly one recver but can have many senders. It has no buffer, so sends and recvs block until a corresponding thread is ready. Sends are accepted on a first come first servce basis:

h = vanilla.Hub()
r = h.router()
h.spawn(r.send, 3)
h.spawn(r.send, 2)
h.spawn(r.send, 1)
r.recv() # returns 3
r.recv() # returns 2
r.recv() # returns 1

Queue

message.Queue(hub, size)
         +----------+
send --> |  Queue   |
         | (buffer) | --> recv
         +----------+

A Queue may also only have exactly one sender and recver. A Queue however has a fifo buffer of a custom size. Sends to the Queue won’t block until the buffer becomes full:

h = vanilla.Hub()
q = h.queue(1)
q.send(1)      # safe from deadlock
# q.send(1)    # this would deadlock however as the queue only has a
               # buffer size of 1
q.recv()       # returns 1

Stream

class vanilla.message.Stream

A Stream is a specialized Recver which provides additional methods for working with streaming sources, particularly sockets and file descriptors.

class Stream.Recver(pipe)
recv_line(timeout=-1)

Short hand to receive a line from the stream. The line seperator defaults to ‘n’ but can be changed by setting recver.sep on this recver.

recv_n(n, timeout=-1)

Blocks until n bytes of data are available, and then returns them.

recv_partition(sep, timeout=-1)

Blocks until the seperator sep is seen in the stream, and then returns all data received until sep.

State

message.State()

State is a specialized Pipe which maintains the state of a previous send. Sends never block, but modify the object’s current state.

When the current state is unset, a recv will block until the state is set.

If state is set, recvs never block as well, and return the current state.

State can cleared using the clear method:

s = h.state()

s.recv()  # this will deadlock as state is not set

s.send(3) # sets state, note the send doesn't block even though there
          # is no recver
s.recv()  # 3
s.recv()  # 3 - note subsequent recvs don't block

s.clear() # clear the current state
s.recv()  # this will deadlock as state is not set