Source code for gaffer.webhooks

# -*- coding: utf-8 -
# This file is part of gaffer. See the NOTICE for more information.
Webhooks allow you to register a URL for a specific event (or for all
events); whenever that event occurs it is posted to the URL. Each event
can trigger a POST to a given URL.

For example, to listen to all create events on http://example.com/hook you
can add this line to the webhooks section of the gaffer settings file::

    create = http://example.com/hook

Or programmatically::

    from gaffer.manager import Manager
    from gaffer.webhooks import WebHooks
    hooks = [("create", "http://example.com/hook")]
    webhooks = WebHooks(hooks=hooks)

    manager = Manager()

This gaffer application is started like other applications in the
manager. All :doc:`events` are supported.

The :mod:`webhooks` Module

from functools import partial
import json

from tornado.httpclient import HTTPError

from .httpclient import HTTPClient
from .sync import atomic_read, increment, decrement

class WebHooks(object):
    """Gaffer application that POSTs manager events to registered URLs.

    Hooks are stored in ``self.events`` as a mapping of event name to a
    set of URLs. URLs registered under the event name ``"."`` are posted
    for every event, in addition to the URLs of the event itself.

    A reference count of registered hooks decides whether the application
    listens to the manager: listening starts with the first hook and
    stops once the last hook is unregistered.
    """

    # NOTE(review): this class was restored from a listing in which the
    # attribute names ``events`` / ``active`` and the
    # ``manager.subscribe`` / ``manager.unsubscribe`` calls were missing.
    # Confirm them against the upstream module before merging.

    def __init__(self, hooks=None):
        """Create the application.

        :param hooks: optional iterable of ``(event, url)`` pairs to
            register immediately.
        """
        self.events = {}
        self._refcount = 0
        self.active = False

        # Set by start(); None means "not attached to a manager yet".
        self.loop = None
        self.manager = None

        # initialize hooks
        for event, url in hooks or ():
            self._add_hook(event, url)

    def start(self, loop, manager):
        """Start the webhook app on *loop*, listening to *manager*."""
        self.loop = loop
        self.manager = manager
        self.maybe_start_monitor()

    def stop(self):
        """Stop the webhook app: stop monitoring events if active."""
        if self.active:
            self._stop_monitor()

    def restart(self):
        """Unconditionally stop, then start, monitoring events."""
        self._stop_monitor()
        self._start_monitor()

    def close(self):
        """Stop the app and forget every registered hook."""
        self.stop()
        # Reset to a dict (the same type __init__ creates) so that hooks
        # can be registered again after a close.
        self.events = {}
        self._refcount = 0

    @property
    def refcount(self):
        """Current number of counted hooks, read through ``atomic_read``."""
        return atomic_read(self._refcount)

    def register_hook(self, event, url):
        """Associate *url* with *event* and start monitoring if needed."""
        self._add_hook(event, url)
        self.maybe_start_monitor()

    def unregister_hook(self, event, url):
        """Remove *url* from *event*.

        Unknown events and unknown URLs are both ignored, so the
        reference count only drops when a hook was actually removed.
        """
        urls = self.events.get(event)
        if not urls or url not in urls:
            return

        # remove an url from registered hooks
        urls.remove(url)
        self.decref()
        self.maybe_stop_monitor()

    def maybe_start_monitor(self):
        """Start monitoring when hooks exist and a manager is attached."""
        if self.manager is None:
            # start() has not been called yet; it will retry.
            return
        if self.refcount and not self.active:
            self._start_monitor()

    def maybe_stop_monitor(self):
        """Stop monitoring once no hook is left."""
        if self.refcount > 0 or not self.active:
            return
        self._stop_monitor()

    def incref(self):
        """Increase the hook reference count by one."""
        self._refcount = increment(self._refcount)

    def decref(self):
        """Decrease the hook reference count by one."""
        self._refcount = decrement(self._refcount)

    def _add_hook(self, event, url):
        """Store *url* for *event*, counting it only if it is new."""
        urls = self.events.setdefault(event, set())
        if url in urls:
            # The set would store it once; counting it twice would keep
            # the reference count from ever returning to zero.
            return
        urls.add(url)
        self.incref()

    def _on_event(self, event, msg):
        """Queue a POST of *msg* to every URL interested in *event*."""
        if not self.active:
            return

        # Work on a copy: the callback runs later, and the registered
        # sets may be modified in the meantime.
        urls = set(self.events.get(event, ()))
        urls.update(self.events.get(".", ()))

        if not urls:
            return

        # queue the hook, it will be executed in a thread asap
        callback = partial(self._post_hook, msg, urls)
        self.loop.queue_work(callback)

    def _post_hook(self, msg, urls):
        """POST *msg*, encoded as JSON, to each URL in *urls*."""
        body = json.dumps(msg)
        headers = {
            "Content-Length": str(len(body)),
            "Content-Type": "application/json",
        }

        client = HTTPClient()
        for url in urls:
            try:
                client.fetch(url, method="POST", headers=headers, body=body)
            except HTTPError:
                # for now we ignore all http errors.
                pass

    def _start_monitor(self):
        """Subscribe to all manager events and mark the app active."""
        self.manager.subscribe(".", self._on_event)
        self.active = True

    def _stop_monitor(self):
        """Unsubscribe from manager events and mark the app inactive."""
        self.manager.unsubscribe(".", self._on_event)
        self.active = False