API Documentation¶
Hub¶
class vanilla.core.Hub
A Vanilla Hub is a handle to a self-contained world of interwoven coroutines. It includes an event loop which is responsible for scheduling which green thread should have context. Unlike most asynchronous libraries, this Hub is explicit and must be passed to coroutines that need to interact with it. This is particularly nice for testing, as it makes it clear what’s going on, and tests can’t inadvertently affect each other.
Concurrency¶
Hub.spawn(f, *a)
Schedules a new green thread to be created to run f(*a) on the next available tick:

    def echo(pipe, s):
        pipe.send(s)

    p = h.pipe()
    h.spawn(echo, p, 'hi')
    p.recv()  # returns 'hi'
Hub.spawn_later(ms, f, *a)
Spawns a callable on a new green thread, scheduled for ms milliseconds in the future:

    def echo(pipe, s):
        pipe.send(s)

    p = h.pipe()
    h.spawn_later(50, echo, p, 'hi')
    p.recv()  # returns 'hi' after 50ms
Hub.sleep(ms=1)
Pauses the current green thread for ms milliseconds:

    p = h.pipe()

    @h.spawn
    def _():
        p.send('1')
        h.sleep(50)
        p.send('2')

    p.recv()  # returns '1'
    p.recv()  # returns '2' after 50 ms
Message Passing¶
Hub.select(ends, timeout=-1)
An end is either a Sender or a Recver. select takes a list of ends and blocks until one of them is ready. It blocks either forever or until the optional timeout, given in milliseconds, is reached.
It returns a tuple of (end, value), where end is the end that has become ready. If the end is a Recver, it will already have been recv’d on, and the received item is available as value. For a Sender, however, the sender is still in a ready state waiting for a send, and value is None.
For example, the following is an appliance that takes an upstream Recver and a downstream Sender. Sending to its upstream will alter its current state. This state can be read at any time by receiving on its downstream:

    def state(h, upstream, downstream):
        current = None
        while True:
            end, value = h.select([upstream, downstream])
            if end == upstream:
                current = value
            elif end == downstream:
                end.send(current)
Hub.channel(size=-1)

    send --\    +---------+   /--> recv
            +-> | Channel | -+
    send --/    +---------+   \--> recv

A Channel can have many senders and many recvers. By default it is unbuffered, but you can create buffered Channels by specifying a size. They’re structurally equivalent to channels in Go. Its implementation is literally a Router piped to a Dealer, with an optional Queue in between.
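As a rough analogy only, the many-senders, many-recvers shape with a bounded buffer can be sketched with the standard library. This uses OS threads and queue.Queue rather than vanilla's green threads, Router or Dealer, and the names fan_in_fan_out, producer and consumer are hypothetical. Note that queue.Queue cannot model a truly unbuffered Channel, so the sketch assumes a buffer size of at least 1.

```python
import queue
import threading


def fan_in_fan_out(items_per_sender, senders=2, recvers=2, size=1):
    """Move items from several sender threads to several recver threads."""
    channel = queue.Queue(maxsize=size)  # bounded buffer between the two sides
    received = []
    lock = threading.Lock()
    done = object()  # sentinel telling a recver to stop

    def producer(offset):
        for i in range(items_per_sender):
            channel.put(offset + i)  # blocks while the buffer is full

    def consumer():
        while True:
            item = channel.get()  # blocks while the buffer is empty
            if item is done:
                return
            with lock:
                received.append(item)

    consumers = [threading.Thread(target=consumer) for _ in range(recvers)]
    producers = [
        threading.Thread(target=producer, args=(n * items_per_sender,))
        for n in range(senders)
    ]
    for t in consumers + producers:
        t.start()
    for t in producers:
        t.join()
    for _ in consumers:
        channel.put(done)  # one sentinel per recver
    for t in consumers:
        t.join()
    return sorted(received)


result = fan_in_fan_out(items_per_sender=3)
```

Every item is delivered to exactly one recver, which is the property the diagram above describes; which recver gets which item is not deterministic.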
Pipe Conveniences¶
Hub.producer(f)
Convenience to create a Pipe. f is a callable that takes the Sender end of this Pipe and the corresponding Recver is returned:

    def counter(sender):
        i = 0
        while True:
            i += 1
            sender.send(i)

    recver = h.producer(counter)
    recver.recv()  # returns 1
    recver.recv()  # returns 2
Hub.pulse(ms, item=True)
Convenience to create a Pipe that will have item sent on it every ms milliseconds. The Recver end of the Pipe is returned.
Note that since sends to a Pipe block until the Recver is ready, the pulses will be throttled if the Recver is unable to keep up:

    recver = h.pulse(500)

    for _ in recver:
        log.info('hello')  # logs 'hello' every half a second
Thread¶
Hub.thread.spawn(f, *a)
Example usage:

    def ticker(h, n):
        import time
        while True:
            h.parent.send(time.time())
            time.sleep(n)

    h = vanilla.Hub()
    child = h.thread.spawn(ticker, 1)

    while True:
        child.recv()
Hub.thread.call(f, *a)
- Spawns a one-off thread to run callable f with arguments a
- Returns a Recver which can be recv’d on to get f’s result
Example usage:

    def add(a, b):
        return a + b

    h = vanilla.Hub()
    h.thread.call(add, 2, 3).recv()  # 5
Hub.thread.pool(size)
- Returns a reusable pool of size threads
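For readers more familiar with the standard library, the idea of a reusable pool whose calls hand back something to wait on can be sketched with concurrent.futures. This is an analogy, not vanilla's API: a Future stands in here for the Recver that a pool call returns, and square is a hypothetical workload.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    return x * x


# A pool of two reusable worker threads, loosely comparable to h.thread.pool(2).
with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(square, n) for n in (1, 2, 3)]
    # Waiting on each Future is the counterpart of recv'ing on a Recver.
    results = [f.result() for f in futures]
```

Three calls share two threads, so the third call waits for a worker to become free rather than starting a new thread.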
Pool¶
Thread.Pool.call(f, *a)
- Runs callable f with arguments a on one of the pool’s threads
- Returns a Recver which can be recv’d on to get f’s result
Example usage:

    import time

    h = vanilla.Hub()

    def sleeper(x):
        time.sleep(x)
        return x

    p = h.thread.pool(2)
    gather = h.router()

    p.call(sleeper, 0.2).pipe(gather)
    p.call(sleeper, 0.1).pipe(gather)
    p.call(sleeper, 0.05).pipe(gather)

    gather.recv()  # 0.1
    gather.recv()  # 0.05
    gather.recv()  # 0.2
Thread.Pool.wrap(ob)
- Wraps ob with a proxy which will delegate method calls on ob to run on the pool’s threads
- Each method call on the proxy returns a Recver which can be recv’d on to get the call’s result
Example usage:

    import pymongo

    h = vanilla.Hub()
    p = h.thread.pool(2)

    db = pymongo.MongoClient()['database']
    db = p.wrap(db)

    response = db.posts.find_one({"author": "Mike"})
    response.recv()
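One way such a delegating proxy could work is sketched below with the standard library. This is not vanilla's implementation: PoolProxy, Database and Posts are hypothetical names, and a concurrent.futures Future stands in for the Recver that the real proxy returns.

```python
from concurrent.futures import ThreadPoolExecutor


class PoolProxy(object):
    """Hypothetical proxy: method calls on the target run on pool threads."""

    def __init__(self, pool, target):
        self._pool = pool
        self._target = target

    def __getattr__(self, name):
        attr = getattr(self._target, name)
        if not callable(attr):
            # Non-callable attributes are wrapped so chained access still works.
            return PoolProxy(self._pool, attr)

        def call(*a, **kw):
            # Returns a Future immediately; the call itself runs on the pool.
            return self._pool.submit(attr, *a, **kw)

        return call


class Posts(object):
    def find_one(self, query):
        return {"author": query["author"], "title": "hello"}


class Database(object):
    posts = Posts()


pool = ThreadPoolExecutor(max_workers=2)
db = PoolProxy(pool, Database())
response = db.posts.find_one({"author": "Mike"})
post = response.result()
pool.shutdown()
```

The caller keeps the shape of the original call (db.posts.find_one(...)) and only the last step changes, from using the return value directly to waiting on the returned handle.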
Process¶
Hub.process.execv(args, env=None, stderrtoout=False)
- Forks a child process using args.
- env is an optional dictionary of environment variables which will replace the parent’s environment for the child process. If not supplied, the child will have access to the parent’s environment.
- If stderrtoout is True, the child’s stderr will be redirected to its stdout.
- A Child object is returned to interact with the child process.
Example usage:

    h = vanilla.Hub()
    child = h.process.execv(
        ['/usr/bin/env', 'grep', '--line-buffered', 'foo'])

    child.stdin.send('foo1\n')
    child.stdout.recv_partition('\n')  # foo1
    child.stdin.send('bar1\n')
    # child.stdout.recv_partition('\n')  # would hang forever, as grep
                                         # prints nothing for 'bar1'
    child.stdin.send('foo2\n')
    child.stdout.recv_partition('\n')  # foo2
    child.terminate()
    child.done.recv()
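The effect of the env and stderrtoout options can be illustrated with the standard library's subprocess module, which offers comparable controls. This is only an analogy and does not show vanilla's Child object; the one-line child script is made up for the example.

```python
import subprocess
import sys

# The child prints one line to stdout and one line to stderr.
script = "import sys; print('out'); sys.stdout.flush(); sys.stderr.write('err\\n')"

child = subprocess.Popen(
    [sys.executable, "-c", script],
    env=None,                  # None: the child inherits the parent's environment
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,  # send the child's stderr to its stdout
)
output, _ = child.communicate()
lines = output.decode().split()
```

Both lines arrive on the single stdout pipe, which is what redirecting stderr to stdout means for the reader of the child's output.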
TCP¶
Hub.tcp.listen(port=0, host='127.0.0.1')
Listens for TCP connections on host and port. If port is 0, it will listen on a randomly available port. Returns a Recver which dispenses TCP connections:

    h = vanilla.Hub()

    server = h.tcp.listen()

    @server.consume
    def echo(conn):
        for data in conn.recver:
            conn.send('Echo: ' + data)

The Recver returned has an additional attribute, port, which is the port that was bound to.
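The port 0 convention comes from the operating system's socket layer rather than from vanilla. A minimal standard-library sketch of it, using a plain blocking socket instead of a Hub, might look like this:

```python
import socket

# Binding to port 0 asks the operating system for any free port.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)

# The port actually chosen is only known after the bind.
host, port = server.getsockname()
server.close()
```

This is why the bound port has to be read back from the listener after the fact instead of being known in advance.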
HTTP¶
HTTPServer¶
Hub.http.listen(port=0, host='127.0.0.1')
- Listens for HTTP connections on host and port. If port is 0, it will listen on a randomly available port.
- Returns a Recver which dispenses HTTP connections.
- Each HTTP connection is itself a Recver which dispenses HTTPRequest objects. Note that if this is a Keep-Alive connection, it can dispense more than one HTTPRequest.
An example server:

    import vanilla

    h = vanilla.Hub()

    def handle_connection(conn):
        for request in conn:
            request.reply(vanilla.http.Status(200), {}, "Hello")

    server = h.http.listen(8080)

    for conn in server:
        h.spawn(handle_connection, conn)
HTTPRequest¶
An HTTP Request is a namedtuple with the following ordered items / attributes:
Request.method
The HTTP request method, e.g. ‘GET’, ‘POST’, ‘PUT’, ‘DELETE’, ...
Request.path
The path requested.
Request.version
The HTTP version of the request.
Request.headers
A dictionary-like interface to the HTTP request headers. Keys are case insensitive.
Request.body
A Recver which yields the request’s body. If the Transfer-Encoding is chunked, the entire body could be yielded over a period of time with successive receives.
An HTTP Request also has the following methods and convenience attributes:
Request.consume()
Blocks until the entire request body has been received and returns it as a single string.
Request.json()
Convenience to consume the entire request body and JSON-decode it.
Request.form
A convenience to access form URL-encoded data as a dictionary. The form data is available as a mapping of key to value. If a key is in the form more than once, only its last value will be available.
Request.form_multi
A convenience to access form URL-encoded data as a dictionary. The form data is available as a mapping of key to list of values.
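The difference between the two views can be shown with the Python 3 standard library alone. This sketch does not use vanilla; it only reproduces the last-value and list-of-values behaviour described above on a made-up request body.

```python
from urllib.parse import parse_qs

body = "tag=python&tag=vanilla&author=Mike"

# Every key maps to the list of all its values, like form_multi.
form_multi = parse_qs(body)

# Keeping only the last value per key gives the form-style view.
form = {key: values[-1] for key, values in form_multi.items()}
```

Here form_multi keeps both tag values, while form only exposes the last one.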
Request.reply(status, headers, body)
Initiates a reply to this HTTP request. status is a tuple of (HTTP Code, message), for example (200, ‘OK’). headers is a dictionary-like interface to the HTTP headers to respond with. body can either be a string, in which case the response will be completed immediately, or a Recver, which can have a series of strings sent to it before being closed to indicate that the response has completed. There’s no need to set Content-Length or Transfer-Encoding in the response headers; this will be inferred depending on whether body is a string or a Recver.
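The kind of inference described above can be sketched as follows. This is an assumption about the general approach, not vanilla's code: framing_headers and encode_chunk are hypothetical helpers, and a plain iterator stands in for a Recver. The chunk framing itself follows the HTTP/1.1 chunked transfer coding.

```python
def framing_headers(body):
    """Hypothetical helper: choose framing headers from the type of body."""
    if isinstance(body, str):
        # The whole body is known up front, so its length can be declared.
        return {"Content-Length": str(len(body.encode("utf-8")))}
    # Otherwise the body arrives piece by piece and its length is unknown.
    return {"Transfer-Encoding": "chunked"}


def encode_chunk(data):
    """Frame one piece of a chunked body: hex size, CRLF, data, CRLF."""
    raw = data.encode("utf-8")
    return b"%x\r\n%s\r\n" % (len(raw), raw)


fixed = framing_headers("Hello")
streamed = framing_headers(iter(["oh", "hai"]))
chunk = encode_chunk("hai")
```

A string body yields a Content-Length header, while a streamed body is sent as a sequence of size-prefixed chunks.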
Request.upgrade()
If this is a request to establish a Websocket, the server can call this method to upgrade this connection. This method returns a Websocket, and this connection can no longer be used as an HTTP connection.
Hub.http.connect(port, host='127.0.0.1')
Establishes an HTTPClient connection to host and port. Note that, if supported, this connection will be Keep-Alive and multiple requests can be made over the same connection.
An example server with chunked transfer:

    import vanilla

    h = vanilla.Hub()

    serve = h.http.listen()

    client = h.http.connect('http://localhost:%s' % serve.port)
    response = client.get('/')

    conn = serve.recv()     # recvs http connection
    request = conn.recv()   # recvs http request + headers

    sender, recver = h.pipe()
    request.reply(vanilla.http.Status(200), {}, recver)

    response = response.recv()  # recvs the response + headers, but not the body

    sender.send('oh')
    print response.body.recv()  # 'oh'

    sender.send('hai')
    print response.body.recv()  # 'hai'

    sender.close()
    print response.body.recv()  # raises Closed
HTTPClient¶
class vanilla.http.HTTPClient
delete(path='/', params=None, headers=None)
get(path='/', params=None, headers=None, auth=None)
post(path='/', params=None, headers=None, data='')
put(path='/', params=None, headers=None, data='')
request(method, path='/', params=None, headers=None, data=None)
websocket(path='/', params=None, headers=None)
Message Passing Primitives¶
Pair¶
class vanilla.message.Pair
A Pair is a tuple of a Sender and a Recver. The two ends only share a weakref to each other, so unless a reference is kept to both ends, the remaining end will be abandoned and the entire pair will be garbage collected.
It’s possible to call methods directly on the Pair tuple. A common pattern, though, is to split up the tuple with the Sender used in one closure and the Recver in another:

    # create a Pipe Pair
    p = h.pipe()

    # call the Pair tuple directly
    h.spawn(p.send, '1')
    p.recv()  # returns '1'

    # split the sender and recver
    sender, recver = p
    sender.send('2')
    recver.recv()  # returns '2'
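The weak-reference behaviour mentioned above can be illustrated with the standard library's weakref module. The End class is a hypothetical stand-in and not vanilla's Sender or Recver; the sketch only shows how one end can notice that the other has gone away.

```python
import gc
import weakref


class End(object):
    """Hypothetical stand-in for one end of a pair."""

    def __init__(self, name):
        self.name = name
        self.other = None  # will hold a weak reference to the opposite end


sender, recver = End("sender"), End("recver")
sender.other = weakref.ref(recver)
recver.other = weakref.ref(sender)

alive_before = sender.other() is recver

# Dropping the only strong reference to recver lets it be collected,
# so the sender's weak reference now resolves to None.
del recver
gc.collect()
alive_after = sender.other() is not None
```

Because neither end keeps the other alive, holding on to just one end is not enough to keep the pair usable.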
close()
Closes both ends of this Pair.
consume(f)
Consumes this Pair with f; see vanilla.core.Recver.consume(). Returns only our Sender.
map(f)
Maps this Pair with f; see vanilla.core.Recver.map(). Returns a new Pair of our current Sender and the mapped target’s Recver.
pipe(target)
Pipes our Recver to the target; see vanilla.core.Recver.pipe(). Returns a new Pair of our current Sender and the target’s Recver.
recv(timeout=-1)
Receive an item from our Sender. This will block until our Sender is ready, either forever or until timeout milliseconds have elapsed.
send(item, timeout=-1)
Send an item on this Pair. This will block until our Recver is ready, either forever or until timeout milliseconds have elapsed.
Sender¶
Recver¶
class vanilla.message.Recver(pipe)
consume(f)
Creates a sink which consumes all values for this Recver. f is a callable which takes a single argument. All values sent on this Recver’s Sender will be passed to f for processing. Unlike map, however, consume terminates this chain:

    sender, recver = h.pipe()

    @recver.consume
    def _(data):
        logging.info(data)

    sender.send('Hello')  # logs 'Hello'
map(f)
f is a callable that takes a single argument. All values sent on this Recver’s Sender will be passed to f to be transformed:

    def double(i):
        return i * 2

    sender, recver = h.pipe()
    recver.map(double)

    h.spawn(sender.send, 2)
    recver.recv()  # returns 4
pipe(target)
Pipes this Recver to target. target can either be a Sender (or Pair) or a callable.
If target is a Sender, the two pairs are rewired so that sending on this Recver’s Sender will now be directed to the target’s Recver:

    sender1, recver1 = h.pipe()
    sender2, recver2 = h.pipe()

    recver1.pipe(sender2)

    h.spawn(sender1.send, 'foo')
    recver2.recv()  # returns 'foo'

If target is a callable, a new Pipe will be created. This Recver and the new Pipe’s Sender are passed to the target callable to act as upstream and downstream. The callable can then do any processing desired, including filtering, mapping and duplicating packets:

    sender, recver = h.pipe()

    def pipeline(upstream, downstream):
        for i in upstream:
            if i % 2:
                downstream.send(i*2)

    recver = recver.pipe(pipeline)

    @h.spawn
    def _():
        for i in xrange(10):
            sender.send(i)

    recver.recv()  # returns 2 (0 is filtered, so 1*2)
    recver.recv()  # returns 6 (2 is filtered, so 3*2)
recv(timeout=-1)
Receive an item from our Sender. This will block until our Sender is ready, either forever or until timeout milliseconds have elapsed.
Pipe¶
class vanilla.message.Pipe

                 +------+
        send --> | Pipe | --> recv
                 +------+

The most basic primitive is the Pipe. A Pipe has exactly one sender and exactly one recver. A Pipe has no buffering, so sends and recvs will block until there is a corresponding recv or send.
For example, the following code will deadlock as the sender will block, preventing the recv from ever being called:

    h = vanilla.Hub()
    p = h.pipe()
    p.send(1)  # deadlock
    p.recv()

The following is OK as the send is spawned to a background green thread:

    h = vanilla.Hub()
    p = h.pipe()
    h.spawn(p.send, 1)
    p.recv()  # returns 1
Dealer¶
class vanilla.message.Dealer

                 +--------+   /--> recv
        send --> | Dealer | -+
                 +--------+   \--> recv

A Dealer has exactly one sender but can have many recvers. It has no buffer, so sends and recvs block until a corresponding green thread is ready. Sends are round-robined to waiting recvers on a first-come, first-served basis:

    h = vanilla.Hub()
    d = h.dealer()
    # d.send(1)  # this would deadlock as there are no recvers
    h.spawn(lambda: 'recv 1: %s' % d.recv())
    h.spawn(lambda: 'recv 2: %s' % d.recv())
    d.send(1)
    d.send(2)
Router¶
class vanilla.message.Router

        send --\    +--------+
                +-> | Router | --> recv
        send --/    +--------+

A Router has exactly one recver but can have many senders. It has no buffer, so sends and recvs block until a corresponding green thread is ready. Sends are accepted on a first-come, first-served basis:

    h = vanilla.Hub()
    r = h.router()
    h.spawn(r.send, 3)
    h.spawn(r.send, 2)
    h.spawn(r.send, 1)
    r.recv()  # returns 3
    r.recv()  # returns 2
    r.recv()  # returns 1
Queue¶
message.Queue(hub, size)

                 +----------+
        send --> |  Queue   |
                 | (buffer) | --> recv
                 +----------+

Like a Pipe, a Queue has exactly one sender and one recver. A Queue, however, has a FIFO buffer of a custom size. Sends to the Queue won’t block until the buffer becomes full:

    h = vanilla.Hub()
    q = h.queue(1)
    q.send(1)  # safe from deadlock
    # q.send(1)  # this would deadlock however as the queue only has a
                 # buffer size of 1
    q.recv()  # returns 1
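The same buffering rule can be observed with the standard library's thread-safe queue, used here purely as an analogy for the behaviour and not as vanilla's Queue. A short timeout turns what would otherwise be an indefinite block into an exception, so the example terminates.

```python
import queue

q = queue.Queue(maxsize=1)  # FIFO buffer with room for one item
q.put(1, timeout=0.1)       # fits in the buffer, returns immediately

try:
    q.put(2, timeout=0.1)   # buffer is full, so this would block
    second_send_blocked = False
except queue.Full:
    second_send_blocked = True

first = q.get()             # frees the slot again
```

The first send succeeds with no receiver present, and the second only fails because the single buffer slot is still occupied.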
Stream¶
class vanilla.message.Stream
A Stream is a specialized Recver which provides additional methods for working with streaming sources, particularly sockets and file descriptors.
class Stream.Recver(pipe)
recv_line(timeout=-1)
Shorthand to receive a line from the stream. The line separator defaults to ‘\n’ but can be changed by setting recver.sep on this recver.
recv_n(n, timeout=-1)
Blocks until n bytes of data are available, and then returns them.
recv_partition(sep, timeout=-1)
Blocks until the separator sep is seen in the stream, and then returns all data received until sep.
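How these three methods regroup data that arrives in arbitrary chunks can be sketched without vanilla. ChunkBuffer is a hypothetical class over a plain iterator of strings, with no timeouts or green threads, and it assumes that the separator itself is not included in the returned data.

```python
class ChunkBuffer(object):
    """Hypothetical buffer that regroups arbitrary chunks of a stream."""

    def __init__(self, chunks, sep="\n"):
        self.chunks = iter(chunks)
        self.sep = sep
        self.pending = ""

    def _fill(self):
        # Pull the next chunk; StopIteration signals that the stream has ended.
        self.pending += next(self.chunks)

    def recv_n(self, n):
        while len(self.pending) < n:
            self._fill()
        data, self.pending = self.pending[:n], self.pending[n:]
        return data

    def recv_partition(self, sep):
        while sep not in self.pending:
            self._fill()
        data, _, self.pending = self.pending.partition(sep)
        return data

    def recv_line(self):
        return self.recv_partition(self.sep)


stream = ChunkBuffer(["fo", "o1\nfoo", "2\nrest"])
first = stream.recv_line()
second = stream.recv_partition("\n")
tail = stream.recv_n(3)
```

Chunk boundaries do not line up with line boundaries here, which is exactly the situation these methods are meant to hide from the caller.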
State¶
message.State()
State is a specialized Pipe which maintains the state of a previous send. Sends never block, but modify the object’s current state.
When the current state is unset, a recv will block until the state is set.
If the state is set, recvs never block either, and return the current state.
State can be cleared using the clear method:

    s = h.state()
    # s.recv()  # this would deadlock as state is not set
    s.send(3)   # sets state, note the send doesn't block even though there
                # is no recver
    s.recv()    # 3
    s.recv()    # 3 - note subsequent recvs don't block
    s.clear()   # clear the current state
    s.recv()    # this will deadlock as state is not set
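The semantics described above can be modelled with a small thread-based class. This is a hypothetical stand-in built on threading.Event, not vanilla's State, and it adds a timeout to recv so that the blocking case can be shown without hanging.

```python
import threading


class State(object):
    """Hypothetical thread-based stand-in for the behaviour described above."""

    def __init__(self):
        self._ready = threading.Event()
        self._value = None

    def send(self, value):
        # Never blocks: overwrite the current state and wake any waiters.
        self._value = value
        self._ready.set()

    def recv(self, timeout=None):
        # Blocks only while the state is unset.
        if not self._ready.wait(timeout):
            raise RuntimeError("state is not set")
        return self._value

    def clear(self):
        self._ready.clear()
        self._value = None


s = State()
s.send(3)
reads = [s.recv(), s.recv()]  # repeated recvs return the same state

s.clear()
try:
    s.recv(timeout=0.05)      # unset again, so this times out
    blocked_after_clear = False
except RuntimeError:
    blocked_after_clear = True
```

Unlike a Pipe, receiving does not consume the value, so any number of recvs see the same state until it is replaced or cleared.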