Source code for gaffer.tornado_pyuv

# -*- coding: utf-8 -
# Copyright (C) 2011 by Saúl Ibarra Corretgé
# This file is part of gaffer. See the NOTICE for more information.

import pyuv

import datetime
import errno
import logging
import os
import time

    import _thread as thread
except ImportError:
    import thread

from collections import deque
import six
from tornado import ioloop, stack_context

[docs]class Waker(object): def __init__(self, loop): self._async = pyuv.Async(loop, lambda x: None) self._async.unref()
[docs] def wake(self): self._async.send()
[docs]class IOLoop(object): NONE = ioloop.IOLoop.NONE READ = ioloop.IOLoop.READ WRITE = ioloop.IOLoop.WRITE ERROR = ioloop.IOLoop.ERROR _instance_lock = thread.allocate_lock() def __init__(self, impl=None, _loop=None): if impl is not None: raise RuntimeError('When using pyuv the poller implementation cannot be specifiedi') self._loop = _loop or pyuv.Loop() self._handlers = {} self._callbacks = deque() self._callback_lock = thread.allocate_lock() self._timeouts = set() self._running = False self._stopped = False self._thread_ident = None self._cb_handle = pyuv.Prepare(self._loop) self._waker = Waker(self._loop) @staticmethod
[docs] def instance(): if not hasattr(IOLoop, "_instance"): with IOLoop._instance_lock: if not hasattr(IOLoop, "_instance"): # New instance after double check IOLoop._instance = IOLoop() return IOLoop._instance
[docs] def initialized(): """Returns true if the singleton instance has been created.""" return hasattr(IOLoop, "_instance")
[docs] def install(self): """Installs this IOLoop object as the singleton instance. This is normally not necessary as `instance()` will create an IOLoop on demand, but you may want to call `install` to use a custom subclass of IOLoop. """ assert not IOLoop.initialized() IOLoop._instance = self
def _close_loop_handles(self): def cb(handle): if not handle.closed: handle.close() self._loop.walk(cb)
[docs] def close(self): for fd, (poll, callback) in self._handlers.items(): poll.close() try: os.close(fd) except Exception: logging.debug("error closing fd %s", fd, exc_info=True) self._handlers = {} # Run the loop so the close callbacks are fired and memory is freed self._loop = None
[docs] def add_handler(self, fd, handler, events): if fd in self._handlers: raise IOError("fd %d already registered" % fd) poll = pyuv.Poll(self._loop, fd) self._handlers[fd] = (poll, stack_context.wrap(handler)) poll_events = 0 if (events & IOLoop.READ): poll_events |= pyuv.UV_READABLE if (events & IOLoop.WRITE): poll_events |= pyuv.UV_WRITABLE poll.start(poll_events, self._handle_poll_events)
[docs] def update_handler(self, fd, events): poll, _ = self._handlers[fd] poll_events = 0 if (events & IOLoop.READ): poll_events |= pyuv.UV_READABLE if (events & IOLoop.WRITE): poll_events |= pyuv.UV_WRITABLE poll.start(poll_events, self._handle_poll_events)
[docs] def remove_handler(self, fd): items = self._handlers.pop(fd, None) if items is not None: items[0].close()
[docs] def set_blocking_signal_threshold(self, seconds, action): raise NotImplementedError
[docs] def set_blocking_log_threshold(self, seconds): raise NotImplementedError
[docs] def log_stack(self, signal, frame): raise NotImplementedError
[docs] def start(self): if self._stopped: self._stopped = False return self._thread_ident = thread.get_ident() self._running = True # This timer will prevent busy-looping in case there is nothing scheduled in the loop timer = pyuv.Timer(self._loop) timer.start(lambda x: None, 3600, 3600) timer.close() # reset the stopped flag so another start/stop pair can be issued self._stopped = False
[docs] def stop(self): self._running = False self._stopped = True self._loop.stop() self._waker.wake()
[docs] def running(self): """Returns true if this IOLoop is currently running.""" return self._running
[docs] def add_timeout(self, deadline, callback): timeout = _Timeout(deadline, stack_context.wrap(callback), io_loop=self) self._timeouts.add(timeout) return timeout
[docs] def remove_timeout(self, timeout): self._timeouts.remove(timeout) timer = timeout._timer if timer.stop()
[docs] def add_callback(self, callback): with self._callback_lock: was_active = self._callbacks.append(stack_context.wrap(callback)) if not was_active: self._cb_handle.start(self._prepare_cb) if not was_active or thread.get_ident() != self._thread_ident: self._waker.wake()
[docs] def handle_callback_exception(self, callback): """This method is called whenever a callback run by the IOLoop throws an exception. By default simply logs the exception as an error. Subclasses may override this method to customize reporting of exceptions. The exception itself is not passed explicitly, but is available in sys.exc_info. """ logging.error("Exception in callback %r", callback, exc_info=True)
def _run_callback(self, callback): try: callback() except Exception: self.handle_callback_exception(callback) def _handle_poll_events(self, handle, poll_events, error): events = 0 if error is not None: # Some error was detected, signal readability and writability so that the # handler gets and handles the error events |= IOLoop.READ events |= IOLoop.WRITE else: if (poll_events & pyuv.UV_READABLE): events |= IOLoop.READ if (poll_events & pyuv.UV_WRITABLE): events |= IOLoop.WRITE fd = handle.fileno() try: self._handlers[fd][1](fd, events) except (OSError, IOError) as e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: logging.error("Exception in I/O handler for fd %s", fd, exc_info=True) except Exception: logging.error("Exception in I/O handler for fd %s", fd, exc_info=True) def _prepare_cb(self, handle): self._cb_handle.stop() with self._callback_lock: callbacks = self._callbacks self._callbacks = deque() while callbacks: self._run_callback(callbacks.popleft())
class _Timeout(object): """An IOLoop timeout, a UNIX timestamp and a callback""" # Reduce memory overhead when there are lots of pending callbacks __slots__ = ['deadline', 'callback', 'io_loop', '_timer'] def __init__(self, deadline, callback, io_loop=None): now = time.time() if (isinstance(deadline, six.integer_types) or isinstance(deadline, float)): self.deadline = deadline elif isinstance(deadline, datetime.timedelta): self.deadline = now + _Timeout.timedelta_to_seconds(deadline) else: raise TypeError("Unsupported deadline %r" % deadline) self.callback = callback self.io_loop = io_loop or IOLoop.instance() timeout = max(self.deadline - now, 0) self._timer = pyuv.Timer(self.io_loop._loop) self._timer.start(self._timer_cb, timeout, 0.0) def _timer_cb(self, handle): self._timer.close() self._timer = None self.io_loop._timeouts.remove(self) self.io_loop._run_callback(self.callback) self.io_loop = None @staticmethod def timedelta_to_seconds(td): """Equivalent to td.total_seconds() (introduced in python 2.7).""" return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
[docs]class PeriodicCallback(object): def __init__(self, callback, callback_time, io_loop=None): self.callback = callback self.callback_time = callback_time / 1000.0 self.io_loop = io_loop or IOLoop.instance() self._timer = pyuv.Timer(self.io_loop._loop) self._running = False def _timer_cb(self, timer): try: self.callback() except Exception: logging.error("Error in periodic callback", exc_info=True)
[docs] def start(self): if self._running: return self._running = True self._timer.start(self._timer_cb, self.callback_time, self.callback_time) self._timer.repeat = self.callback_time
[docs] def stop(self): if not self._running: return self._running = False self._timer.stop()
[docs]def install(): # Patch Tornado's classes with ours ioloop.IOLoop = IOLoop ioloop._Timeout = _Timeout ioloop.PeriodicCallback = PeriodicCallback