Skip to content

Commit

Permalink
Merge pull request #126 from rapidsai/branch-0.35
Browse files Browse the repository at this point in the history
Forward-merge branch-0.35 to branch-0.36
  • Loading branch information
GPUtester authored Nov 15, 2023
2 parents 6dd86f8 + 6d5673e commit abc1e45
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async def test_ucxx_deserialize(ucxx_loop):
[
lambda cudf: cudf.Series([1, 2, 3]),
lambda cudf: cudf.Series([], dtype=object),
lambda cudf: cudf.DataFrame([]),
lambda cudf: cudf.DataFrame([], dtype=object),
lambda cudf: cudf.DataFrame([1]).head(0),
lambda cudf: cudf.DataFrame([1.0]).head(0),
lambda cudf: cudf.DataFrame({"a": []}),
Expand Down
12 changes: 10 additions & 2 deletions python/ucxx/benchmarks/backends/ucxx_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ async def server_handler(ep):
await ep.close()
lf.close()

lf = ucxx.create_listener(server_handler, port=self.args.port)
lf = ucxx.create_listener(
server_handler,
port=self.args.port,
endpoint_error_handling=self.args.error_handling,
)
self.queue.put(lf.port)

while not lf.closed():
Expand Down Expand Up @@ -126,7 +130,11 @@ async def run(self):

register_am_allocators(self.args)

ep = await ucxx.create_endpoint(self.server_address, self.port)
ep = await ucxx.create_endpoint(
self.server_address,
self.port,
endpoint_error_handling=self.args.error_handling,
)

if self.args.enable_am:
msg = xp.arange(self.args.n_bytes, dtype="u1")
Expand Down
6 changes: 4 additions & 2 deletions python/ucxx/benchmarks/backends/ucxx_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def run(self):

def _listener_handler(conn_request):
global ep
ep = listener.create_endpoint_from_conn_request(conn_request, True)
ep = listener.create_endpoint_from_conn_request(
conn_request, endpoint_error_handling=self.args.error_handling
)

listener = ucx_api.UCXListener.create(
worker=worker, port=self.args.port or 0, cb_func=_listener_handler
Expand Down Expand Up @@ -236,7 +238,7 @@ def run(self):
worker,
self.server_address,
self.port,
endpoint_error_handling=True,
endpoint_error_handling=self.args.error_handling,
)

# Wireup before starting to transfer data
Expand Down
6 changes: 6 additions & 0 deletions python/ucxx/benchmarks/send_recv.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ def parse_args():
help="Only applies to 'ucxx-core' backend: number of maximum outstanding "
"operations, see --delay-progress. (Default: 32)",
)
parser.add_argument(
"--error-handling",
action=argparse.BooleanOptionalAction,
default=True,
help="Enable endpoint error handling.",
)

args = parser.parse_args()

Expand Down

0 comments on commit abc1e45

Please sign in to comment.