Skip to content
This repository has been archived by the owner on Oct 16, 2021. It is now read-only.

Commit

Permalink
streamlined naming and structure for cpy2py.kernel (see issue #18 and #…
Browse files Browse the repository at this point in the history
  • Loading branch information
maxfischer2781 committed Apr 19, 2018
1 parent 324347f commit 2c3f67e
Show file tree
Hide file tree
Showing 28 changed files with 119 additions and 118 deletions.
2 changes: 1 addition & 1 deletion cpy2py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from cpy2py.proxy.baseclass import TwinObject, localmethod
from cpy2py.proxy.function import twinfunction
from cpy2py.twinterpreter.master import TwinMaster
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state as kernel_state


_base_logger = _logging.getLogger('__cpy2py__')
Expand Down
6 changes: 3 additions & 3 deletions cpy2py/ipyc/ipyc_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from cpy2py.utility.compat import BytesFile
from cpy2py.utility.utils import random_str
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state


class DuplexSocketIPyC(object):
Expand Down Expand Up @@ -62,7 +62,7 @@ def open(self):
self.client_socket, client_address = self.server_socket.accept()
self._logger.warning(
'<%s> [%s] accepted from %r',
kernel_state.TWIN_ID,
state.TWIN_ID,
self.__class__.__name__,
client_address
)
Expand All @@ -71,7 +71,7 @@ def open(self):
self.client_socket.connect(self.address)
self._logger.warning(
'<%s> [%s] connected from %r',
kernel_state.TWIN_ID,
state.TWIN_ID,
self.__class__.__name__,
self.client_socket.getsockname()
)
Expand Down
File renamed without changes.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from cpy2py.utility.exceptions import format_exception
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.kernel import kernel_state
from cpy2py.kernel.kernel_single import SingleThreadKernelClient, SingleThreadKernelServer
from cpy2py.kernel import state
from cpy2py.kernel.flavours.single import SingleThreadKernelClient, SingleThreadKernelServer


class AsyncKernelServer(SingleThreadKernelServer):
Expand All @@ -45,7 +45,7 @@ def __init__(self, peer_id, ipyc, pickle_protocol=2):
def _serve_requests(self):
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Server Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Server Listening', state.TWIN_ID, self.peer_id)
request_id, directive = self._server_recv()
if self._except_callback is not None:
raise self._except_callback # pylint: disable=raising-bad-type
Expand Down Expand Up @@ -80,14 +80,14 @@ def _digest_replies(self):
try:
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Client Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Client Listening', state.TWIN_ID, self.peer_id)
request_id, reply_body = self._client_recv()
request = self._requests.pop(request_id)
request[1] = reply_body
request[0].set()
del request_id, reply_body, request
except (ipyc_exceptions.IPyCTerminated, EOFError, ValueError):
self._logger.warning('<%s> [%s] Client Released', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Client Released', state.TWIN_ID, self.peer_id)
self.stop_local()
except Exception as err: # pylint: disable=broad-except
# DEBUG: sometimes, request_id raises KeyError even though it's in _requests - MF@20160518
Expand All @@ -96,7 +96,7 @@ def _digest_replies(self):
for key in self._requests:
self._logger.critical('Await : %r (%s)', key, type(key))
self._logger.critical(
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', kernel_state.TWIN_ID, self.peer_id, err
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', state.TWIN_ID, self.peer_id, err
)
format_exception(self._logger, 3)
raise
Expand Down
44 changes: 22 additions & 22 deletions cpy2py/kernel/kernel_single.py → cpy2py/kernel/flavours/single.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import logging
import threading

from cpy2py.kernel import kernel_state
from cpy2py.kernel import state

from cpy2py.utility.exceptions import format_exception
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.kernel.kernel_exceptions import StopTwinterpreter
from cpy2py.kernel.exceptions import StopTwinterpreter
from cpy2py.proxy import tracker
from cpy2py.kernel.kernel_requesthandler import RequestDispatcher, RequestHandler
from cpy2py.kernel.requesthandler import RequestDispatcher, RequestHandler


def _connect_ipyc(ipyc, pickle_protocol):
Expand All @@ -56,12 +56,12 @@ class SingleThreadKernelServer(object):
:type pickle_protocol: int
"""
def __new__(cls, peer_id, *args, **kwargs): # pylint: disable=unused-argument
assert peer_id not in kernel_state.KERNEL_SERVERS, 'Twinterpreters must have unique IDs'
kernel_state.KERNEL_SERVERS[peer_id] = object.__new__(cls)
return kernel_state.KERNEL_SERVERS[peer_id]
assert peer_id not in state.KERNEL_SERVERS, 'Twinterpreters must have unique IDs'
state.KERNEL_SERVERS[peer_id] = object.__new__(cls)
return state.KERNEL_SERVERS[peer_id]

def __init__(self, peer_id, ipyc, pickle_protocol=2):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.server' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.server' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self._ipyc = ipyc
self._ipyc.open()
Expand All @@ -80,42 +80,42 @@ def run(self):
self._terminate.clear()
exit_code = 1
self._logger.warning(
'<%s> [%s] Starting %s @ %s', kernel_state.TWIN_ID, self.peer_id, self.__class__.__name__, time.asctime()
'<%s> [%s] Starting %s @ %s', state.TWIN_ID, self.peer_id, self.__class__.__name__, time.asctime()
)
try:
self._serve_requests()
except StopTwinterpreter as err:
# actively shutting down
self._logger.critical('<%s> [%s] TWIN KERNEL TERMINATED: %s', kernel_state.TWIN_ID, self.peer_id, err)
self._logger.critical('<%s> [%s] TWIN KERNEL TERMINATED: %s', state.TWIN_ID, self.peer_id, err)
exit_code = err.exit_code
# cPickle may raise EOFError by itself
except (ipyc_exceptions.IPyCTerminated, EOFError) as err:
# regular shutdown by master
self._logger.critical('<%s> [%s] TWIN KERNEL RELEASED: %s', kernel_state.TWIN_ID, self.peer_id, err)
self._logger.critical('<%s> [%s] TWIN KERNEL RELEASED: %s', state.TWIN_ID, self.peer_id, err)
exit_code = 0
except Exception as err: # pylint: disable=broad-except
# unexpected shutdown
# provide extended traceback if requested
self._logger.critical(
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', kernel_state.TWIN_ID, self.peer_id, err
'<%s> [%s] TWIN KERNEL INTERNAL EXCEPTION: %s', state.TWIN_ID, self.peer_id, err
)
format_exception(self._logger, 3)
# emulate regular python exit
import traceback
exit_code = 1
traceback.print_exc(file=sys.stderr)
print('TwinError: unhandled exception in', kernel_state.TWIN_ID, file=sys.stderr)
print('TwinError: unhandled exception in', state.TWIN_ID, file=sys.stderr)
finally:
self._terminate.set()
self._logger.critical('<%s> [%s] TWIN KERNEL SHUTDOWN: %d', kernel_state.TWIN_ID, self.peer_id, exit_code)
self._logger.critical('<%s> [%s] TWIN KERNEL SHUTDOWN: %d', state.TWIN_ID, self.peer_id, exit_code)
self._ipyc.close()
del kernel_state.KERNEL_SERVERS[self.peer_id]
del state.KERNEL_SERVERS[self.peer_id]
return exit_code

def _serve_requests(self):
while not self._terminate.is_set():
if __debug__:
self._logger.warning('<%s> [%s] Server Listening', kernel_state.TWIN_ID, self.peer_id)
self._logger.warning('<%s> [%s] Server Listening', state.TWIN_ID, self.peer_id)
request_id, directive = self._server_recv()
self.request_handler.serve_request(request_id, directive)

Expand Down Expand Up @@ -144,19 +144,19 @@ class SingleThreadKernelClient(object):
:type pickle_protocol: int
"""
def __new__(cls, peer_id, *args, **kwargs): # pylint: disable=unused-argument
assert peer_id not in kernel_state.KERNEL_CLIENTS, 'Twinterpreters must have unique IDs'
kernel_state.KERNEL_CLIENTS[peer_id] = object.__new__(cls)
return kernel_state.KERNEL_CLIENTS[peer_id]
assert peer_id not in state.KERNEL_CLIENTS, 'Twinterpreters must have unique IDs'
state.KERNEL_CLIENTS[peer_id] = object.__new__(cls)
return state.KERNEL_CLIENTS[peer_id]

def __init__(self, peer_id, ipyc, pickle_protocol=2):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.client' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.client' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
# communication
self._ipyc = ipyc
self._ipyc.open()
self._client_send, self._client_recv = _connect_ipyc(ipyc, pickle_protocol)
self.request_dispatcher = RequestDispatcher(peer_id=self.peer_id, kernel_client=self)
kernel_state.KERNEL_INTERFACE[peer_id] = self.request_dispatcher
state.KERNEL_INTERFACE[peer_id] = self.request_dispatcher

def run_request(self, request_body):
my_id = threading.current_thread().ident
Expand All @@ -179,8 +179,8 @@ def stop_local(self):
"""Shutdown the local server"""
self._ipyc.close()
try:
del kernel_state.KERNEL_CLIENTS[self.peer_id]
del kernel_state.KERNEL_INTERFACE[self.peer_id]
del state.KERNEL_CLIENTS[self.peer_id]
del state.KERNEL_INTERFACE[self.peer_id]
except KeyError:
pass

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import random

from cpy2py.utility.thread_tools import FifoQueue, ItemError, ThreadGuard
from .kernel_async import AsyncKernelClient, AsyncKernelServer
from cpy2py.kernel.flavours.async import AsyncKernelClient, AsyncKernelServer


class MultiThreadKernelServer(AsyncKernelServer):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
# - # limitations under the License.
import logging

from cpy2py.kernel import kernel_state
from cpy2py.kernel import state
from cpy2py.ipyc import ipyc_exceptions
from cpy2py.utility.exceptions import format_exception, CPy2PyException
from cpy2py.kernel.kernel_exceptions import StopTwinterpreter, TwinterpeterTerminated
from cpy2py.kernel.exceptions import StopTwinterpreter, TwinterpeterTerminated


# Message Enums
Expand Down Expand Up @@ -70,7 +70,7 @@ class RequestHandler(object):
:type kernel_server: :py:class:`~cpy2py.kernel.kernel_single.SingleThreadKernelServer`
"""
def __init__(self, peer_id, kernel_server):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.handler' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.handler' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self.kernel_server = kernel_server
# instance => ref_count
Expand Down Expand Up @@ -100,7 +100,7 @@ def serve_request(self, request_id, directive):
try:
if __debug__:
self._logger.warning(
'<%s> [%s] Directive %s', kernel_state.TWIN_ID, self.peer_id, E_SYMBOL[directive_type]
'<%s> [%s] Directive %s', state.TWIN_ID, self.peer_id, E_SYMBOL[directive_type]
)
response = directive_method(directive_body)
# catch internal errors to reraise them
Expand All @@ -109,7 +109,7 @@ def serve_request(self, request_id, directive):
# send everything else back to calling scope
except Exception as err: # pylint: disable=broad-except
self.kernel_server.send_reply(request_id, (__E_EXCEPTION__, err))
self._logger.critical('<%s> [%s] TWIN KERNEL PAYLOAD EXCEPTION', kernel_state.TWIN_ID, self.peer_id)
self._logger.critical('<%s> [%s] TWIN KERNEL PAYLOAD EXCEPTION', state.TWIN_ID, self.peer_id)
format_exception(self._logger, 3)
if isinstance(err, (KeyboardInterrupt, SystemExit)):
raise StopTwinterpreter(message=err.__class__.__name__, exit_code=1)
Expand Down Expand Up @@ -200,7 +200,7 @@ class should not be instantiated manually. Use
empty_reply = (None, None)

def __init__(self, peer_id, kernel_client):
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.dispatcher' % (kernel_state.TWIN_ID, peer_id))
self._logger = logging.getLogger('__cpy2py__.kernel.%s_to_%s.dispatcher' % (state.TWIN_ID, peer_id))
self.peer_id = peer_id
self.kernel_client = kernel_client
self.exit_code = None
Expand Down
2 changes: 1 addition & 1 deletion cpy2py/kernel/kernel_state.py → cpy2py/kernel/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import os
import sys

from cpy2py.kernel.kernel_exceptions import TwinterpeterUnavailable
from cpy2py.kernel.exceptions import TwinterpeterUnavailable


# current twin state
Expand Down
6 changes: 3 additions & 3 deletions cpy2py/proxy/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# - # See the License for the specific language governing permissions and
# - # limitations under the License.
from cpy2py.utility.proxy import clone_function_meta
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state


def twinfunction(twinterpreter_id):
Expand All @@ -28,7 +28,7 @@ def twinfunction(twinterpreter_id):
def decorator(func):
func.__twin_id__ = twinterpreter_id
# native twin, never redirect
if kernel_state.is_twinterpreter(twinterpreter_id):
if state.is_twinterpreter(twinterpreter_id):
return func
# redirect to kernel
# - must dispatch to the proxy, otherwise pickling will fail
Expand All @@ -40,7 +40,7 @@ def decorator(func):
def function_runner_factory(*fargs, **fkwargs):
def function_runner(*args, **kwargs):
return function_runner.dispatch_call(function_dispatch_proxy, *args, **kwargs)
function_runner.dispatch_call = kernel_state.get_kernel(twinterpreter_id).dispatch_call
function_runner.dispatch_call = state.get_kernel(twinterpreter_id).dispatch_call
function_dispatch_proxy.function_runner = function_runner
return function_runner(*fargs, **fkwargs)

Expand Down
12 changes: 6 additions & 6 deletions cpy2py/proxy/metaclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# - # limitations under the License.
from __future__ import print_function
import types
from cpy2py.kernel import kernel_state
from cpy2py.kernel import state

from cpy2py.proxy.proxy import InstanceProxy, UnboundedMethodProxy

Expand Down Expand Up @@ -67,7 +67,7 @@ def __new__(mcs, name, bases, class_dict):
else:
break
else:
twin_id = kernel_state.MASTER_ID
twin_id = state.MASTER_ID
class_dict['__twin_id__'] = twin_id
# enable persistent dump/load without pickle
class_dict['__import_mod_name__'] = (class_dict['__module__'], name)
Expand All @@ -78,7 +78,7 @@ def __new__(mcs, name, bases, class_dict):
proxy_class = mcs.__get_proxy_class__(real_class=real_class)
mcs.register_proxy(real_class=real_class, proxy_class=proxy_class)
# return the appropriate object or proxy for the current twin
if kernel_state.is_twinterpreter(class_dict['__twin_id__']):
if state.is_twinterpreter(class_dict['__twin_id__']):
return real_class
else:
return proxy_class
Expand Down Expand Up @@ -155,21 +155,21 @@ def register_proxy(mcs, real_class, proxy_class):
# class attributes
def __getattr__(cls, name):
if cls.__is_twin_proxy__ and name not in TwinMeta.__proxy_inherits_attributes__:
kernel = kernel_state.get_kernel(cls.__twin_id__)
kernel = state.get_kernel(cls.__twin_id__)
return kernel.get_attribute(cls, name)
else:
type.__getattribute__(cls, name)

def __setattr__(cls, name, value):
if cls.__is_twin_proxy__ and name not in TwinMeta.__proxy_inherits_attributes__:
kernel = kernel_state.get_kernel(cls.__twin_id__)
kernel = state.get_kernel(cls.__twin_id__)
return kernel.set_attribute(cls, name, value)
else:
type.__setattr__(cls, name, value)

def __delattr__(cls, name):
if cls.__is_twin_proxy__ and name not in TwinMeta.__proxy_inherits_attributes__:
kernel = kernel_state.get_kernel(cls.__twin_id__)
kernel = state.get_kernel(cls.__twin_id__)
return kernel.del_attribute(cls, name)
else:
type.__setattr__(cls, name)
Expand Down
8 changes: 4 additions & 4 deletions cpy2py/proxy/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
"""
Proxies for entities in other twinterpreters
"""
from cpy2py.kernel.kernel_exceptions import TwinterpeterUnavailable
from cpy2py.kernel import kernel_state
from cpy2py.kernel.exceptions import TwinterpeterUnavailable
from cpy2py.kernel import state

from cpy2py.proxy import tracker

Expand All @@ -40,7 +40,7 @@ def __init__(self, real_method):
def __get__(self, instance, owner):
if instance is None:
subject = owner
kernel = kernel_state.get_kernel(subject.__twin_id__)
kernel = state.get_kernel(subject.__twin_id__)
else:
subject = instance
kernel = subject.__kernel__
Expand Down Expand Up @@ -69,7 +69,7 @@ class InstanceProxy(object):

def __new__(cls, *args, **kwargs):
self = object.__new__(cls)
__kernel__ = kernel_state.get_kernel(self.__twin_id__)
__kernel__ = state.get_kernel(self.__twin_id__)
object.__setattr__(self, '__kernel__', __kernel__)
try:
# native instance exists, but no proxy yet
Expand Down
Loading

0 comments on commit 2c3f67e

Please sign in to comment.