Source code for gaffer.webhooks

# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.
"""
Webhooks allow to register an url to a specific event (or alls) and the
event will be posted on this URL. Each events can triger a post on a
given url.

for example to listen all create events on http://echohttp.com/echo you
can add this line in the webhooks sections of the gaffer setting file::

    [webhooks]
    create = http://echohttp.com/echo you

Or programatically::

    from gaffer.manager import Manager
    from gaffer.webhooks import WebHooks
    hooks = [("create", "http://echohttp.com/echo you ")
    webhooks = WebHooks(hooks=hooks)

    manager = Manager()
    manager.start(apps=[webhooks])


This gaffer application is started like other applications in the
manager. All :doc:`events` are supported.


The :mod:`webhooks` Module
--------------------------

"""
from collections import deque
import json
from threading import RLock
import time

import pyuv

from tornado.httpclient import HTTPError

from .httpclient import HTTPClient
from .sync import atomic_read, increment, decrement

[docs]class WebHooks(object): """ webhook app """ def __init__(self, hooks=[]): self.events = {} self._refcount = 0 self._active = 0 self._jobcount = 0 self._queue = deque() self._lock = RLock() # initialize hooks for event, url in hooks: if not event in self.events: self.events[event] = set() self.events[event].add(url) self.incref()
[docs] def start(self, loop, manager): """ start the webhook app """ self.loop = loop self.manager = manager self._pool = pyuv.ThreadPool(self.loop) self.maybe_start_monitor()
[docs] def stop(self): """ stop the webhook app, stop monitoring to events """ self._stop_monitor() self._queue.clear() if self.jobcount > 0: while self.jobcount > 0: time.sleep(0.01)
[docs] def restart(self): self._stop_monitor() self._start_monitor()
[docs] def close(self): self.stop() with self._lock: self.events = [] self._refcount = 0 self._queue.clear()
@property
[docs] def active(self): return atomic_read(self._active) > 0
@property
[docs] def refcount(self): return atomic_read(self._refcount)
@property
[docs] def jobcount(self): return atomic_read(self._jobcount)
[docs] def register_hook(self, event, url): """ associate an url to an event """ with self._lock: if event not in self.events: self.events[event] = set() self.events[event].add(url) self.incref() self.maybe_start_monitor()
[docs] def unregister_hook(self, event, url): """ unregister an url for this event """ with self._lock: if event not in self.events: return # remove an url from registered hooks urls = self.events[event] urls.remove(url) self.events[event] = urls self.decref() self.maybe_stop_monitor()
[docs] def maybe_start_monitor(self): if self.refcount and not self.active: self._start_monitor()
[docs] def maybe_stop_monitor(self): if self.refcount > 0 or not self.active: return self._stop_monitor()
[docs] def incref(self): self._refcount = increment(self._refcount)
[docs] def decref(self): self._refcount = decrement(self._refcount)
def _on_event(self, event, msg): if not self.active: return urls = set() if event in self.events: urls = self.events[event] if "." in self.events: urls = urls.union(self.events['.']) if not urls: return with self._lock: self._jobcount = increment(self._jobcount) self._queue.append((msg, urls)) self._pool.queue_work(self._send, self._sent) def _sent(self, res, exc): self._jobcount = decrement(self._jobcount) def _send(self): try: msg, urls = self._queue.popleft() except IndexError: return body = json.dumps(msg) headers = { "Content-Length": str(len(body)), "Content-Type": "application/json" } client = HTTPClient() for url in urls: try: client.fetch(url, method="POST", headers=headers, body=body) except HTTPError as e: # for now we ignore all http errors. pass def _start_monitor(self): with self._lock: self.manager.subscribe(".", self._on_event) self._active = increment(self._active) def _stop_monitor(self): with self._lock: self.manager.unsubscribe(".", self._on_event) self._active = decrement(self._active)

This Page