Skip to content

Commit

Permalink
Merge pull request #4 from Dludora/main
Browse files Browse the repository at this point in the history
Grpc Basis
  • Loading branch information
Dludora authored Jan 12, 2025
2 parents c18ab8c + 65a8080 commit 44e7a1a
Show file tree
Hide file tree
Showing 40 changed files with 1,184 additions and 0 deletions.
8 changes: 8 additions & 0 deletions openhufu/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@


def main():
pass

if __name__ == "__main__":
main()

Empty file added openhufu/private/__init__.py
Empty file.
Empty file.
21 changes: 21 additions & 0 deletions openhufu/private/client/client_deployer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

from openhufu.private.utlis.config_class import ClientConfig
from openhufu.private.utlis.util import get_logger
from openhufu.private.client.fed_client import FederatedClient


class ClientDeployer:
def __init__(self, config: ClientConfig):
self.config : ClientConfig = config
self.logger = get_logger(__name__)


def create_client(self):
self.config.addr = self.config.host + ":" + str(self.config.port)

client = FederatedClient(config=self.config)
self.logger.info(f"Client {self.config} created")

return client


32 changes: 32 additions & 0 deletions openhufu/private/client/client_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import yaml
import argparse
from pathlib import Path

from openhufu.private.client.client_deployer import ClientDeployer
from openhufu.private.utlis.config_class import BaseConfig
from openhufu.private.utlis.util import load_config


def parse_args():
parser = argparse.ArgumentParser(description="Federated Server")
parser.add_argument(
"--config",
"-c",
required=True,
type=str,
help="path to the config file",
)
return parser.parse_args()


if __name__ == "__main__":
args = parse_args()
config : BaseConfig = load_config(args.config)
print(config)
deployer = ClientDeployer(config=config)

client = deployer.create_client()

client.set_up()

client.register()
29 changes: 29 additions & 0 deletions openhufu/private/client/fed_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@

from openhufu.private.net.client_cell import ClientCell
from openhufu.private.utlis.config_class import ClientConfig

class FederatedClient:
def __init__(self, config: ClientConfig):
self.cell = None
self.config : ClientConfig = config


def _create_cell(self):
self.cell = ClientCell(config=self.config)
self.cell.start()

self.cell.stop()


def set_up(self):
schema_location = self.config.host + ":" + str(self.config.port)
self.config.addr = schema_location

if not self.cell:
self._create_cell()


def register(self):
self.cell.register()


Empty file.
70 changes: 70 additions & 0 deletions openhufu/private/drivers/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import threading
from abc import ABC
from abc import abstractmethod
from dataclasses import dataclass
from typing import Dict
from enum import Enum

from openhufu.private.net.net_params import ConParams, DriverInfo
from openhufu.private.net.connection import Connection, ConnState


class ConnMonitor(ABC):
@abstractmethod
def state_change(self, connection: Connection):
pass


class Driver:
def __init__(self):
self.connections : Dict[str, Connection] = {}
self.conn_lock = threading.Lock()
self.conn_monitor = None


@abstractmethod
def connect(self, connection: DriverInfo):
pass


@abstractmethod
def listen(self, connection: DriverInfo):
pass

@abstractmethod
def close(self):
pass

def add_connection(self, connection: Connection):
connection.state = ConnState.CONNECTED
with self.conn_lock:
self.connections[connection.name] = connection

self._notify_monitor(connection)


def close_connection(self, connection: Connection):
connection.state = ConnState.CLOSED
with self.conn_lock:
if connection.name in self.connections:
del self.connections[connection.name]

self._notify_monitor(connection)


def register_monitor(self, monitor: ConnMonitor):
self.conn_monitor = monitor


def _notify_monitor(self, connection: Connection):
self.conn_monitor.state_change(connection)


def close_all_connections(self):
with self.conn_lock:
for name in self.connections.keys():
self.connections[name].close()




Empty file.
Empty file.
1 change: 1 addition & 0 deletions openhufu/private/drivers/proto/gen_proto.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
python -m grpc_tools.protoc -I. --python_out=. --pyi_out=. --grpc_python_out=. grpc_stream.proto
12 changes: 12 additions & 0 deletions openhufu/private/drivers/proto/grpc_stream.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
syntax = "proto3";

package bio_stream;

service grpcStreamFunc {
rpc processStream (stream Frame) returns (stream Frame) {}
}

message Frame {
int32 seq = 1;
bytes data = 2;
}
38 changes: 38 additions & 0 deletions openhufu/private/drivers/proto/grpc_stream_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions openhufu/private/drivers/proto/grpc_stream_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from typing import ClassVar as _ClassVar, Optional as _Optional

DESCRIPTOR: _descriptor.FileDescriptor

class Frame(_message.Message):
__slots__ = ("seq", "data")
SEQ_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
seq: int
data: bytes
def __init__(self, seq: _Optional[int] = ..., data: _Optional[bytes] = ...) -> None: ...
97 changes: 97 additions & 0 deletions openhufu/private/drivers/proto/grpc_stream_pb2_grpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings

import openhufu.private.drivers.proto.grpc_stream_pb2 as grpc__stream__pb2

GRPC_GENERATED_VERSION = '1.68.1'
GRPC_VERSION = grpc.__version__
_version_not_supported = False

try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True

if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in grpc_stream_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)


class grpcStreamFuncStub(object):
"""Missing associated documentation comment in .proto file."""

def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.processStream = channel.stream_stream(
'/bio_stream.grpcStreamFunc/processStream',
request_serializer=grpc__stream__pb2.Frame.SerializeToString,
response_deserializer=grpc__stream__pb2.Frame.FromString,
_registered_method=True)


class grpcStreamFuncServicer(object):
"""Missing associated documentation comment in .proto file."""

def processStream(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_grpcStreamFuncServicer_to_server(servicer, server):
rpc_method_handlers = {
'processStream': grpc.stream_stream_rpc_method_handler(
servicer.processStream,
request_deserializer=grpc__stream__pb2.Frame.FromString,
response_serializer=grpc__stream__pb2.Frame.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'bio_stream.grpcStreamFunc', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('bio_stream.grpcStreamFunc', rpc_method_handlers)


# This class is part of an EXPERIMENTAL API.
class grpcStreamFunc(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def processStream(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/bio_stream.grpcStreamFunc/processStream',
grpc__stream__pb2.Frame.SerializeToString,
grpc__stream__pb2.Frame.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
Loading

0 comments on commit 44e7a1a

Please sign in to comment.