Logo Search packages:      
Sourcecode: landscape-client version File versions

reactor.py

import thread
import time
import sys
import logging
import bisect
import socket

import gobject

from twisted.test.proto_helpers import FakeDatagramTransport
from twisted.internet.defer import succeed, fail
from twisted.internet.error import DNSLookupError

from landscape.log import format_object


00017 class InvalidID(Exception):
    """Raised when an invalid ID is used with reactor.cancel_call()."""


00021 class CallHookError(Exception):
    """Raised when hooking on a reactor incorrectly."""


class EventID(object):

    def __init__(self, event_type, pair):
        self._event_type = event_type
        self._pair = pair


class EventHandlingReactorMixin(object):

    def __init__(self):
        super(EventHandlingReactorMixin, self).__init__()
        self._event_handlers = {}

    def call_on(self, event_type, handler, priority=0):
        pair = (handler, priority)

        handlers = self._event_handlers.setdefault(event_type, [])
        handlers.append(pair)
        handlers.sort(key=lambda pair: pair[1])

        return EventID(event_type, pair)

    def fire(self, event_type, *args, **kwargs):
        logging.debug("Started firing %s.", event_type)
        results = []
        for handler, priority in self._event_handlers.get(event_type, ()):
            try:
                logging.debug("Calling %s for %s with priority %d.",
                              format_object(handler), event_type, priority)
                results.append(handler(*args, **kwargs))
            except KeyboardInterrupt:
                logging.exception("Keyboard interrupt while running event "
                                  "handler %s for event type %r with "
                                  "args %r %r.", format_object(handler),
                                  event_type, args, kwargs)
                self.stop()
                raise
            except:
                logging.exception("Error running event handler %s for "
                                  "event type %r with args %r %r.",
                                  format_object(handler), event_type,
                                  args, kwargs)
        logging.debug("Finished firing %s.", event_type)
        return results

    def cancel_call(self, id):
        if type(id) is EventID:
            self._event_handlers[id._event_type].remove(id._pair)
        else:
            raise InvalidID("EventID instance expected, received %r" % id)


class ThreadedCallsReactorMixin(object):

    def __init__(self):
        super(ThreadedCallsReactorMixin, self).__init__()
        self._threaded_callbacks = []

    def call_in_main(self, f, *args, **kwargs):
        self._threaded_callbacks.append(lambda: f(*args, **kwargs))

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        thread.start_new_thread(self._in_thread,
                                (callback, errback, f, args, kwargs))

    def _in_thread(self, callback, errback, f, args, kwargs):
        try:
            result = f(*args, **kwargs)
        except Exception, e:
            exc_info = sys.exc_info()
            if errback is None:
                self.call_in_main(logging.error, e, exc_info=exc_info)
            else:
                self.call_in_main(errback, *exc_info)
        else:
            if callback:
                self.call_in_main(callback, result)

    def _run_threaded_callbacks(self):
        while self._threaded_callbacks:
            try:
                self._threaded_callbacks.pop(0)()
            except Exception, e:
                logging.exception(e)

    def _hook_threaded_callbacks(self):
        id = self.call_every(0.5, self._run_threaded_callbacks)
        self._run_threaded_callbacks_id = id

    def _unhook_threaded_callbacks(self):
        self.cancel_call(self._run_threaded_callbacks_id)


class ReactorID(object):

    def __init__(self, timeout):
        self._timeout = timeout


class Reactor(EventHandlingReactorMixin,
              ThreadedCallsReactorMixin):

    def __init__(self):
        super(Reactor, self).__init__()
        self._context = gobject.MainContext()
        self._mainloop = gobject.MainLoop(context=self._context)

    def call_later(self, timeout, function, *args, **kwargs):
        def fake_function():
            function(*args, **kwargs)
            return False
        timeout = gobject.Timeout(int(timeout*1000))
        timeout.set_callback(fake_function)
        timeout.attach(self._context)
        return ReactorID(timeout)

    def cancel_call(self, id):
        if type(id) is ReactorID:
            id._timeout.destroy()
        else:
            super(Reactor, self).cancel_call(id)

    def call_every(self, timeout, function, *args, **kwargs):
        def fake_function():
            function(*args, **kwargs)
            return True
        timeout = gobject.Timeout(int(timeout*1000))
        timeout.set_callback(fake_function)
        timeout.attach(self._context)
        return ReactorID(timeout)

    def run(self):
        self.fire("run")
        self._hook_threaded_callbacks()
        self._mainloop.run()
        self._unhook_threaded_callbacks()
        self.fire("stop")

    def stop(self):
        self._mainloop.quit()


class FakeReactorID(object):

    def __init__(self, data):
        self.active = True
        self._data = data


00174 class FakeReactor(EventHandlingReactorMixin,
                  ThreadedCallsReactorMixin):
    """
    @ivar udp_transports: dict of {port: (protocol, transport)}
    @ivar hosts: Dict of {hostname: ip}. Users should populate this
        and L{resolve} will use it.
    """
    def __init__(self):
        super(FakeReactor, self).__init__()
        self._current_time = 0
        self._calls = []
        self.udp_transports = {}
        self.hosts = {}

    def time(self):
        return float(self._current_time)

    def call_later(self, seconds, f, *args, **kwargs):
        scheduled_time = self._current_time + seconds
        call = (scheduled_time, f, args, kwargs)
        bisect.insort_left(self._calls, call)
        return FakeReactorID(call)

    def cancel_call(self, id):
        if type(id) is FakeReactorID:
            if id._data in self._calls:
                self._calls.remove(id._data)
            id.active = False
        else:
            super(FakeReactor, self).cancel_call(id)

    def call_every(self, seconds, f, *args, **kwargs):
        def fake():
            # update the call so that cancellation will continue
            # working with the same ID. And do it *before* the call
            # because the call might cancel it!
            call._data = self.call_later(seconds, fake)._data
            try:
                f(*args, **kwargs)
            except:
                if call.active:
                    self.cancel_call(call)
                raise
        call = self.call_later(seconds, fake)
        return call

    def call_in_thread(self, callback, errback, f, *args, **kwargs):
        self._in_thread(callback, errback, f, args, kwargs)

        # Running threaded callbacks here doesn't reflect reality, since
        # they're usually run while the main reactor loop is active.
        # At the same time, this is convenient as it means we don't need
        # to run the the reactor with all registered handlers to test for
        # actions performed on completion of specific events (e.g. firing
        # exchange will fire exchange-done when ready). IOW, it's easier
        # to test things synchronously.
        self._run_threaded_callbacks()

00232     def advance(self, seconds):
        """Advance this reactor C{seconds} into the future.

        This is the preferred method for advancing time in your unit tests.
        """
        while (self._calls and self._calls[0][0]
               <= self._current_time + seconds):
            call = self._calls.pop(0)
            # If we find a call within the time we're advancing,
            # before calling it, let's advance the time *just* to
            # when that call is expecting to be run, so that if it
            # schedules any calls itself they will be relative to
            # the correct time.
            seconds -= call[0] - self._current_time
            self._current_time = call[0]
            try:
                call[1](*call[2], **call[3])
            except Exception, e:
                logging.exception(e)
        self._current_time += seconds

00253     def run(self):
        """Continuously advance this reactor until reactor.stop() is called."""
        self.fire("run")
        self._running = True
        while self._running:
            self.advance(self._calls[0][0])
        self.fire("stop")

    def stop(self):
        self._running = False

00264     def listen_udp(self, port, protocol):
        """
        Connect the given protocol with a fake transport, and keep the
        transport in C{self.udp_transports}.
        """
        transport = FakeDatagramTransport()
        self.udp_transports[port] = (protocol, transport)
        protocol.makeConnection(transport)


00274     def resolve(self, hostname):
        """Look up the hostname in C{self.hosts}.

        @return: A Deferred resulting in the IP address.
        """
        try:
            # is it an IP address?
            socket.inet_aton(hostname)
        except socket.error: # no
            if hostname in self.hosts:
                return succeed(self.hosts[hostname])
            else:
                return fail(DNSLookupError(hostname))
        else: # yes
            return succeed(hostname)



class TwistedReactor(EventHandlingReactorMixin,
                     ThreadedCallsReactorMixin):

    def __init__(self):
        from twisted.internet import reactor
        from twisted.internet.task import LoopingCall
        self._LoopingCall = LoopingCall
        self._reactor = reactor
        self._cleanup()

        super(TwistedReactor, self).__init__()

    def _cleanup(self):
        # Since the reactor is global, we should clean it up when we
        # initialize one of our wrappers.
        for call in self._reactor.getDelayedCalls():
            if call.active():
                call.cancel()

    def call_later(self, *args, **kwargs):
        return self._reactor.callLater(*args, **kwargs)

    def call_every(self, seconds, f, *args, **kwargs):
        lc = self._LoopingCall(f, *args, **kwargs)
        lc.start(seconds, now=False)
        return lc

    def cancel_call(self, id):
        if isinstance(id, EventID):
            return EventHandlingReactorMixin.cancel_call(self, id)
        if isinstance(id, self._LoopingCall):
            return id.stop()
        if id.active():
            id.cancel()

    def call_in_main(self, f, *args, **kwargs):
        self._reactor.callFromThread(f, *args, **kwargs)

    def run(self):
        self.fire("run")
        self._reactor.run()
        self.fire("stop")

    def stop(self):
        self._reactor.crash()
        self._cleanup()

    def time(self):
        return time.time()


    def listen_udp(self, port, protocol):
        """Connect the given protocol with a UDP transport.

        See L{twisted.internet.interfaces.IReactorUDP.listenUDP}.
        """
        return self._reactor.listenUDP(port, protocol)

    def resolve(self, host):
        """Look up the IP of the given host.

        See L{twisted.internet.interfaces.IReactorCore.resolve}.

        @return: A Deferred resulting in the hostname.
        """
        return self._reactor.resolve(host)

Generated by  Doxygen 1.6.0   Back to index