Source code for gaffer.events

# -*- coding: utf-8 -
#
# This file is part of gaffer. See the NOTICE for more information.

"""
Many events happend in gaffer.


Manager events
--------------

Manager events have the following format::

    {
      "event": "<nameofevent">>,
      "name": "<templatename>"
    }

- **create**: a process template is created
- **start**: a process template start to launch OS processes
- **stop**: all OS processes of a process template are stopped
- **restart**: all processes of a process template are restarted
- **update**: a process template is updated
- **delete**: a process template is deleted
- **spawn**: a new process is spawned
- **reap**: a process is reaped
- **exit**: a process exited
- **stop_pid**: a process has been stopped


Processes events
----------------

All processes' events are prefixed by ``proc.<name>`` to make the pattern
matching easier, where ``<name>`` is the name of the process template

Events are:

- **proc.<name>.start** : the template <name> start to spawn processes
- **proc.<name>.spawn** : one OS process using the process <name>
  template is spawned. Message is::

    {
      "event": "proc.<name>.spawn">>,
      "name": "<name>",
      "detach": false,
      "pid": int
    }


  .. note::

    pid is the internal pid
- **proc.<name>.exit**: one OS process of the <name> template has
  exited. Message is::

    {
      "event": "proc.<name>.exit">>,
      "name": "<name>",
      "pid": int,
      "exit_code": int,
      "term_signal": int
    }

- **proc.<name>.stop**: all OS processes in the template <name> are
  stopped.
- **proc.<name>.stop_pid**: One OS process of the template <name> is
  stopped. Message is::

    {
      "event": "proc.<name>.stop_pid">>,
      "name": "<name>",
      "pid": int
    }

- **proc.<name>.stop_pid**: One OS process of the template <name> is
  reapped. Message is::

    {
      "event": "proc.<name>.reap">>,
      "name": "<name>",
      "pid": int
    }


The :mod:`events` Module
------------------------


This module offeres a common way to susbscribe and emit events. All
events in gaffer are using.

Example of usage
++++++++++++++++

::

        event = EventEmitter()

        # subscribe to all events with the pattern a.*
        event.subscribe("a", subscriber)

        # subscribe to all events "a.b"
        event.subscribe("a.b", subscriber2)

        # subscribe to all events (wildcard)
        event.subscribe(".", subscriber3)

        # publish an event
        event.publish("a.b", arg, namedarg=val)

In this example all subscribers will be notified of the event. A
subscriber is just a callable *(event, *args, **kwargs)*

Classes
-------

"""

from collections import deque
import logging

import pyuv


[docs]class EventEmitter(object): """ Many events happend in gaffer. For example a process will emist the events "start", "stop", "exit". This object offer a common interface to all events emitters """ def __init__(self, loop, max_size=10000): self.loop = loop self._events = {} self._wildcards = set() self._queue = deque(maxlen=max_size) self._wqueue = deque(maxlen=max_size) self._event_dispatcher = pyuv.Prepare(self.loop) self._event_dispatcher.start(self._send) self._event_dispatcher.unref() self._spinner = pyuv.Idle(self.loop)
[docs] def close(self): """ close the event This function clear the list of listeners and stop all idle callback """ self._wqueue.clear() self._queue.clear() self._events = {} self._wildcards = set() # close handlers if not self._event_dispatcher.closed: self._event_dispatcher.close() if not self._spinner.closed: self._spinner.close()
[docs] def publish(self, evtype, *args, **kwargs): """ emit an event **evtype** The event will be emitted asynchronously so we don't block here """ if "." in evtype: parts = evtype.split(".") self._queue.append((parts[0], evtype, args, kwargs)) key = [] for part in parts: key.append(part) self._queue.append((".".join(key), evtype, args, kwargs)) else: self._queue.append((evtype, evtype, args, kwargs)) # emit the event for wildcards events self._wqueue.append((evtype, args, kwargs)) # send the event for later self._dispatch_event()
[docs] def subscribe(self, evtype, listener, once=False): """ subcribe to an event """ if evtype == ".": # wildcard self._wildcards.add((once, listener)) return if evtype.endswith("."): evtype = evtype[:-1] if evtype not in self._events: self._events[evtype] = set() self._events[evtype].add((once, listener))
[docs] def subscribe_once(self, evtype, listener): """ subscribe to event once. Once the evennt is triggered we remove ourself from the list of listenerrs """ self.subscribe(evtype, listener, True)
[docs] def unsubscribe(self, evtype, listener, once=False): """ unsubscribe from an event""" if evtype not in self._events: return try: self._events[evtype].remove((once, listener)) except KeyError: pass
[docs] def unsubscribe_once(self, evtype, listener): self.unsubscribe(evtype, listener, True)
[docs] def unsubscribe_all(self, events=[]): """ unsubscribe all listeners from a list of events """ for evtype in events: if evtype == ".": self._wildcards = set() else: self._events[evtype] = set()
### private methods def _dispatch_event(self): self._spinner.start(lambda h: None) def _send(self, handle): lwqueue = len(self._wqueue) lqueue = len(self._queue) for i in range(lwqueue): evtype, args, kwargs = self._wqueue.popleft() if self._wildcards: self._wildcards = self._send_listeners(evtype, self._wildcards.copy(), *args, **kwargs) for i in range(lqueue): pattern, evtype, args, kwargs = self._queue.popleft() # emit the event to all listeners if pattern in self._events: self._events[pattern] = self._send_listeners(evtype, self._events[pattern].copy(), *args, **kwargs) if not self._spinner.closed: self._spinner.stop() def _send_listeners(self, evtype, listeners, *args, **kwargs): to_remove = [] for once, listener in listeners: try: listener(evtype, *args, **kwargs) except Exception: # we ignore all exception logging.error('Uncaught exception', exc_info=True) to_remove.append(listener) if once: # once event to_remove.append(listener) if to_remove: for listener in to_remove: try: listeners.remove((True, listener)) except KeyError: pass return listeners