This repository has been archived by the owner on Sep 20, 2019. It is now read-only.

updates for mongo embedded #22

Open
wants to merge 7 commits into master
Changes from all commits
83 changes: 66 additions & 17 deletions intake_bluesky/mongo_embedded.py
@@ -7,6 +7,8 @@
 import intake.source.base
 import pymongo
 import pymongo.errors
+import heapq
+import json
 
 from .core import parse_handler_registry

@@ -55,22 +57,28 @@ def _get_event_cursor(self, descriptor_uids, skip=0, limit=None):
         if limit is None:
             limit = maxsize
 
-        page_cursor = self._db.event.find(
+        def event_gen(cursor):
+            nonlocal skip
+            nonlocal limit
+            for page_index, event_page in enumerate(cursor):
+                for event_index, event in (
+                        enumerate(event_model.unpack_event_page(event_page))):
+                    if ((event_index + 1) * (page_index + 1)) < skip:
+                        continue
+                    if not ((event_index + 1) * (page_index + 1)) < (skip + limit):
+                        return
+                    yield event
+
+        cursors = [event_gen(self._db.event.find(
             {'$and': [
-                {'descriptor': {'$in': descriptor_uids}},
+                {'descriptor': descriptor},
                 {'last_index': {'$gte': skip}},
                 {'first_index': {'$lte': skip + limit}}]},
             {'_id': False},
-            sort=[('last_index', pymongo.ASCENDING)])
-
-        for page_index, event_page in enumerate(page_cursor):
-            for event_index, event in (
-                    enumerate(event_model.unpack_event_page(event_page))):
-                while ((event_index + 1) * (page_index + 1)) < skip:
-                    continue
-                if not ((event_index + 1) * (page_index + 1)) < (skip + limit):
-                    return
-                yield event
+            sort=[('last_index', pymongo.ASCENDING)]))
+            for descriptor in descriptor_uids]
+
+        yield from interlace_gens(*cursors)
 
     def _get_datum_cursor(self, resource_uid, skip=0, limit=None):
         if limit is None:
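The rewrite above replaces a single '$in' query over all descriptors with one cursor per descriptor, then merges the per-descriptor event streams back into time order. A minimal sketch of that merge idea, assuming each generator yields event dicts carrying a numeric 'time' key; heapq.merge performs the same k-way merge that interlace_gens (added later in this diff) implements by hand:

    import heapq

    def events_a():
        yield {'time': 1.0, 'descriptor': 'a', 'seq_num': 1}
        yield {'time': 3.0, 'descriptor': 'a', 'seq_num': 2}

    def events_b():
        yield {'time': 2.0, 'descriptor': 'b', 'seq_num': 1}

    # Merge the two streams by timestamp, smallest first.
    merged = heapq.merge(events_a(), events_b(), key=lambda ev: ev['time'])
    print([ev['time'] for ev in merged])  # [1.0, 2.0, 3.0]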
@@ -118,14 +126,15 @@ def get_header_field(field):
             if field[0:6] == 'count_':
                 return 0
             else:
-                return None
+                return {}
Member:
Under what circumstances would this codepath be hit?

Collaborator (Author):
An example: if a run doesn't have a stop document and you ask for the stop document.

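A minimal sketch of what the {} fallback buys downstream, assuming a caller chains .get() on the result of get_header_field('stop'); the caller shown here is hypothetical:

    # Hypothetical caller: get_header_field('stop') now returns {} for a
    # run that never completed, so .get() works without a None check.
    stop_doc = {}  # stand-in for get_header_field('stop') on an unfinished run
    print(stop_doc.get('exit_status', 'unknown'))  # 'unknown'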

         def get_resource(uid):
             resources = get_header_field('resources')
             for resource in resources:
                 if resource['uid'] == uid:
                     return resource
-            return None
+            raise KeyError(
+                f"Could not find Resource with uid={uid}")
 
         def get_datum(datum_id):
             """ This method is likely very slow. """
@@ -138,7 +147,8 @@ def get_datum(datum_id):
                 for datum in event_model.unpack_datum_page(datum_page):
                     if datum['datum_id'] == datum_id:
                         return datum
-            return None
+            raise KeyError(
+                f"Could not find Datum with datum_id={datum_id}")
 
         def get_event_count(descriptor_uids):
             return sum([get_header_field('count_' + uid)
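A short sketch of the new error contract, assuming the nested helpers above are in scope; the datum id is made up. Lookups that previously returned None now raise, so callers can tell "missing" apart from a valid result:

    try:
        get_datum('nonexistent-datum-id')  # hypothetical id
    except KeyError as err:
        print(err)  # Could not find Datum with datum_id=nonexistent-datum-id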
@@ -204,7 +214,7 @@ def __getitem__(self, name):
         try:
             N = int(name)
         except ValueError:
-            query = {'$and': [catalog._query, {'uid': name}]}
+            query = {'$and': [catalog._query, {'start.uid': name}]}
             header_doc = catalog._db.header.find_one(query)
             if header_doc is None:
                 regex_query = {
@@ -217,7 +227,7 @@
             elif len(matches) == 1:
                 header_doc, = matches
             else:
-                match_list = '\n'.join(doc['uid'] for doc in matches)
+                match_list = '\n'.join(doc['start'][0]['uid'] for doc in matches)
                 raise ValueError(
                     f"Multiple matches to partial uid {name!r}. "
                     f"Up to 10 listed here:\n"
@@ -278,7 +288,7 @@ def search(self, query):
             MongoDB query.
         """
         if query:
-            query = {f"start.{key}": val for key, val in query.items()}
+            query = query_embedder(query)
         if self._query:
             query = {'$and': [self._query, query]}
         cat = type(self)(
@@ -307,4 +317,43 @@ def _get_database(uri):
             f"Did you forget to include a database?") from err
 
 
+def interlace_gens(*gens):
+    """Take generators and interlace their results by timestamp.
+
+    Parameters
+    ----------
+    gens : generators
+        Generators of dicts, where each dict contains a 'time' key.
+
+    Yields
+    ------
+    val : dict
+        The next dict in time order.
+    """
+    iters = [iter(g) for g in gens]
+    heap = []
+
+    def safe_next(indx):
+        try:
+            val = next(iters[indx])
+        except StopIteration:
+            return
+        heapq.heappush(heap, (val['time'], indx, val))
+
+    for i in range(len(iters)):
+        safe_next(i)
+    while heap:
+        _, indx, val = heapq.heappop(heap)
+        yield val
+        safe_next(indx)
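A minimal usage sketch for interlace_gens, assuming each input generator yields dicts with a numeric 'time' key, as the per-descriptor event generators earlier in this diff do:

    def gen_a():
        for t in (0.1, 0.5, 0.9):
            yield {'time': t, 'source': 'a'}

    def gen_b():
        for t in (0.2, 0.3, 1.0):
            yield {'time': t, 'source': 'b'}

    # Items come out globally sorted by their 'time' values.
    merged = list(interlace_gens(gen_a(), gen_b()))
    print([d['time'] for d in merged])  # [0.1, 0.2, 0.3, 0.5, 0.9, 1.0]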


+def query_embedder(query):
+    query_string = json.dumps(query)
+    keys = [item.split('"')[-1] for item in query_string.split('":')][0:-1]
+    new_keys = [f'start.{key}' if '$' not in key else key for key in keys]
+    for index, key in enumerate(keys):
+        query_string = query_string.replace(key, new_keys[index])
+    return json.loads(query_string)
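A worked example of query_embedder under the embedded-header schema: plain metadata keys are rewritten to 'start.<key>', while MongoDB operators (keys containing '$') pass through unchanged. The inputs are illustrative:

    print(query_embedder({'plan_name': 'count'}))
    # {'start.plan_name': 'count'}

    print(query_embedder({'$and': [{'plan_name': 'count'}, {'num_points': 3}]}))
    # {'$and': [{'start.plan_name': 'count'}, {'start.num_points': 3}]}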


 intake.registry['mongo_metadatastore'] = BlueskyMongoCatalog
3 changes: 1 addition & 2 deletions intake_bluesky/tests/test_mongo_embedded.py
@@ -26,9 +26,8 @@ def teardown_module(module):
 @pytest.fixture(params=['local', 'remote'])
 def bundle(request, intake_server, example_data, db_factory):  # noqa
     fullname = os.path.join(TMP_DIR, YAML_FILENAME)
-    volatile_db = db_factory()
     permanent_db = db_factory()
-    serializer = Serializer(volatile_db, permanent_db)
+    serializer = Serializer(permanent_db)
     uid, docs = example_data
     for name, doc in docs:
         serializer(name, doc)