[WIP] ncmec: store checkpoint occasionally when start, end diff is one second #1731
base: main
Conversation
Force-pushed 928ddce to 4f12e50
Force-pushed 2965a46 to 5270515
Force-pushed 5270515 to d7f207e
Overall looking good, thanks for making this change, and I think it will help a lot!
I am slightly suspicious that the paging URLs can go sour (e.g. I have noticed that the NCMEC API tends to throw exceptions near the very end of the paging list, which makes me think they are invalidating), so I think adding the time-based invalidation logic is a requirement.
As part of your test plan, can you also attempt fetching past an extremely dense time segment in the NCMEC API and confirm the behavior works as expected?
python-threatexchange/threatexchange/exchanges/tests/test_state_compatibility.py (outdated; resolved thread)
python-threatexchange/threatexchange/exchanges/impl/ncmec_api.py (outdated; resolved thread)
```python
updates.extend(entry.updates)

if i % 100 == 0:
```
blocking: by changing this from `elif` to `if`, I think it will now print the large-update warning on every update, which is incorrect, no?
It would print for the 0th, which we would not want. I updated this to be `(i + 1) % 100 == 0`, so it's every 100th iteration. We need to extend `updates` every time, regardless of `i`, so this was cleaner than the other things I thought of, but please suggest alternatives.
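The shape of the loop being discussed can be sketched in isolation. This is a minimal sketch with illustrative names, not the actual PR code; it just shows that `updates` grows on every iteration while the checkpoint branch fires only on every 100th (never the 0th):

```python
def fetch_batches(entries):
    """Extend updates every iteration; record a checkpoint every 100th.

    `entries` is a stand-in for the pages returned by the NCMEC client;
    `checkpoints` stands in for the yielded FetchDelta checkpoints.
    """
    updates = []
    checkpoints = []
    for i, entry in enumerate(entries):
        updates.extend(entry)  # always extend, regardless of i
        if (i + 1) % 100 == 0:  # fires on the 100th, 200th, ... iteration, never the 0th
            checkpoints.append(len(updates))
    return updates, checkpoints
```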
python-threatexchange/threatexchange/exchanges/impl/ncmec_api.py (outdated; resolved thread)
python-threatexchange/threatexchange/exchanges/tests/test_state_compatibility.py (outdated; resolved thread)
python-threatexchange/threatexchange/exchanges/clients/ncmec/hash_api.py (outdated; resolved thread)
```python
log(f"large fetch ({i}), up to {len(updates)}")
updates.extend(entry.updates)
# so store the checkpoint occasionally
log(f"large fetch ({i}), up to {len(updates)}. storing checkpoint")
```
nit: You don't actually store the checkpoint by yielding; technically the caller can decide whether to keep calling or to store.
Ah, so the original `elif` block doesn't need to change? The only real change that's needed is to use the next_url in the for loop on L283? Edit: I think the yield is still needed, just the comment might be incorrect... let me know if not.
updated the comment 👍
```python
start_timestamp=current_start,
end_timestamp=current_end,
next_=current_next_fetch,
```
blocking: Danger! It's actually very easy to mess up this argument and accidentally trigger an endless loop. It may be that you have done so in the current code, but it's hard to tell.
The only time `current_next_fetch` should be populated is when you are resuming from a checkpoint, and you need to explicitly disable the overfetch check (L290) then.
There might be a refactoring of this code that makes this easier, or, now that we are switching over to the next-pointer version, we can get rid of the probing behavior, which simplifies the implementation quite a bit.
Yeah, as I mentioned in Slack, it looks like we need the probing behavior, so I wasn't able to simplify. I added a check to disable the overfetch when resuming from a checkpoint.
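The invariant the reviewer is asking for can be made explicit in one place. This is a hedged sketch with hypothetical names (`build_fetch_kwargs` and `check_overfetch` are not from the PR); the point is that the next pointer and the overfetch probe are coupled: pass `next_` only when resuming from a checkpoint, and skip the probe exactly then.

```python
def build_fetch_kwargs(current_start, current_end, checkpoint_next_url):
    """Assemble fetch arguments so the next-pointer invariant is enforced.

    checkpoint_next_url is non-empty only when resuming from a stored
    checkpoint; in that case the first-batch overfetch check must be
    disabled, or the probing logic can restart and loop endlessly.
    """
    resuming = bool(checkpoint_next_url)
    return {
        "start_timestamp": current_start,
        "end_timestamp": current_end,
        "next_": checkpoint_next_url if resuming else "",
        "check_overfetch": not resuming,  # probe only on fresh fetches
    }
```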
```python
        start_timestamp=current_start,
        end_timestamp=current_end,
        next_=current_next_fetch,
    )
):
    if i == 0:  # First batch, check for overfetch
```
As a comment, it turns out my implementation for estimation of the entries in range was completely off, and so this is basically always overly cautious. Not sure what to do about it, since the alternatives that I can think of are complicated.
python-threatexchange/threatexchange/exchanges/impl/ncmec_api.py (outdated; resolved thread)
python-threatexchange/threatexchange/exchanges/impl/ncmec_api.py (outdated; resolved thread)
Force-pushed 82bc20b to c4a004e
Force-pushed 3488550 to 83ebd79
Force-pushed 83ebd79 to b0f7997
```python
# note: the default_factory value was not being set correctly when
# reading from pickle
if "last_fetch_time" not in d:
    d["last_fetch_time"] = int(time.time())
```
I was getting `AttributeError: 'NCMECCheckpoint' object has no attribute 'last_fetch_time'` without this in the test_state_compatibility test. It seems sort of related to pydantic/pydantic#7821, since `default` was working (but wouldn't work if we want to set it to the current time).
Wow, tough bug, good find on the fix
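The failure mode and the workaround can be reproduced in miniature. Unpickling restores the saved `__dict__` directly, bypassing `__init__`, so a `default_factory` on a newly added field never runs for old pickles; a `__setstate__` hook that patches the state dict is one fix. This is a sketch, not the PR's actual class; the class name is illustrative.

```python
import pickle
import time
from dataclasses import dataclass, field


@dataclass
class NCMECCheckpointLike:
    """Illustrative stand-in for a checkpoint that gained a new field."""

    get_entries_max_ts: int = 0
    last_fetch_time: int = field(default_factory=lambda: int(time.time()))

    def __setstate__(self, d: dict) -> None:
        # Old pickles predate last_fetch_time; default_factory is not
        # consulted when restoring state, so patch the dict by hand.
        if "last_fetch_time" not in d:
            d["last_fetch_time"] = int(time.time())
        self.__dict__ = d
```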
Alright, I think I'm only down to questions and non-logical changes!
This probing behavior is of course quite complicated, and I'm very tempted to try refactoring it to something simpler, given the challenge of verifying this PR.
I think a lot of this may depend on the strength of your e2e testing. Were you able to complete a fetch and could tell that you end up with the same hashes at the end in both cases?
```python
def get_progress_timestamp(self) -> t.Optional[int]:
    return self.get_entries_max_ts

def get_paging_url_if_recent(self) -> str:
    if int(time.time()) - self.last_fetch_time < self.PAGING_URL_EXPIRATION:
```
ignorable/alt: Since this constant is only used in one place, you can probably inline it.
Another approach might be using an accessor property, but I don't know if you'll fight with the dataclass over it:

```python
_paging_url: str = ""

@property
def paging_url(self):
    ...  # the implementation of this function
```
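For what it's worth, the property idea does seem to coexist with a dataclass as long as the stored field and the property use different names. A runnable sketch under that assumption (the class name and the 12-hour expiration value are illustrative, not from the PR):

```python
import time
from dataclasses import dataclass, field


@dataclass
class CheckpointSketch:
    """Illustrative checkpoint where the URL is read through a property."""

    _paging_url: str = ""
    last_fetch_time: int = field(default_factory=lambda: int(time.time()))

    # unannotated, so the dataclass machinery ignores it (class constant);
    # 12 hours is an assumed expiration, not the real value
    PAGING_URL_EXPIRATION = 12 * 60 * 60

    @property
    def paging_url(self) -> str:
        # Return the stored URL only while it is still fresh.
        if int(time.time()) - self.last_fetch_time < self.PAGING_URL_EXPIRATION:
            return self._paging_url
        return ""
```

One wrinkle with this approach: the generated `__init__` takes `_paging_url` (underscore and all) as its keyword, which may be the "fight with the dataclass" the reviewer anticipated.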
```python
# A url to fetch the next page of results
# Only reference this value through get_paging_url_if_recent
paging_url: str = ""
# a timestamp for the last fetch time, specifically used with a pagingpyth_url
```
pagingpyth_url
speeling?
```python
NCMECCheckpoint(
    get_entries_max_ts=current_end,
    paging_url=current_paging_url,
```
nit: Since current paging URL is always empty here (and it's important it not be set), suggest not providing the argument so it'll be set to default.
```python
NCMECCheckpoint(
    get_entries_max_ts=current_end,
    paging_url=current_paging_url,
    last_fetch_time=int(time.time()),
```
nit: this is the default value, suggest not specifying it.
```python
if (i + 1) % 100 == 0:
    # On large fetches, yield a checkpoint to avoid re-fetching later
    log(f"large fetch ({i}), up to {len(updates)}. yielding checkpoint")
    yield state.FetchDelta(
        {f"{entry.member_id}-{entry.id}": entry for entry in updates},
        NCMECCheckpoint(
            get_entries_max_ts=current_start,
            paging_url=entry.next,
            last_fetch_time=int(time.time()),
        ),
    )
```
blocking q: Why only yield every 100 fetches from this point? Why not yield every fetch once you realize you are on a large fetch?
```python
if (i + 1) % 100 == 0:
    # On large fetches, yield a checkpoint to avoid re-fetching later
    log(f"large fetch ({i}), up to {len(updates)}. yielding checkpoint")
```
nit: This "up to" might not make sense anymore, since `updates` clears each time it yields.
```python
yield state.FetchDelta(
    {f"{entry.member_id}-{entry.id}": entry for entry in updates},
    NCMECCheckpoint(
        get_entries_max_ts=current_start,
        paging_url=entry.next,
        last_fetch_time=int(time.time()),
    ),
)
updates = []
```
Flagging that you can end up in a state where you yield an empty update (if the 99th update is also the end of the sequence). I believe an empty update is treated as "no more updates are available", which might be surprising.
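A guard on the final yield avoids emitting that trailing empty delta. A minimal sketch with illustrative names, not the PR's actual generator:

```python
def yield_deltas(entries, batch_size=100):
    """Yield checkpointed batches of updates.

    Guards the final yield so a batch boundary that coincides with the
    end of the sequence does not produce an empty delta, which downstream
    code may read as "no more updates are available".
    """
    updates = []
    for i, entry in enumerate(entries):
        updates.extend(entry)
        if (i + 1) % batch_size == 0:
            yield list(updates)
            updates = []
    if updates:  # only emit the final delta when it is non-empty
        yield updates
```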
```python
    "get_checkpoint_func",
    [
        get_SignalOpinion,
        get_FBThreatExchangeOpinion,
        get_NCMECOpinion,
        get_NCMECCheckpoint,
    ],
)
def test_previous_pickle_state(
    get_checkpoint_func: t.Callable[[], t.Tuple[object, t.Sequence[object]]],
    monkeypatch: pytest.MonkeyPatch,
):
    monkeypatch.setattr("time.time", lambda: 10**8)

    current_version, historical_versions = get_checkpoint_func()
```
Blocking: Except only 1 of the 4 things is a checkpoint (get_NCMECCheckpoint), so this change is a bit misleading, since it's meant to test pickle compatibility for all of these objects.
Recommend reverting this change, since I can't see
Summary
Sometimes NCMEC fails to make progress after hitting a second with a large number of results: #1679. When that happens (the diff between end and start is one second and we have lots of data), store checkpoints occasionally via a next pointer.
Test Plan
Confirmed that resuming from a checkpoint works around the cursed second.