Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pytx] Feedback ifaces #1630

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions python-threatexchange/threatexchange/cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class FlexFilesInputAction(argparse.Action):
All producing the same result.
"""

def __call__(self, _parser, namespace, values, _option_string=None):
def __call__(self, _parser, namespace, values, _option_string=None) -> None:
args: t.List[str] = list(values)
if not args:
raise argparse.ArgumentError(self, "this argument is required")
Expand All @@ -48,10 +48,10 @@ def __call__(self, _parser, namespace, values, _option_string=None):
ret = []
for i, filename in enumerate(args):
if filename.strip() == "-":
with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
logging.debug("Writing stdin to %s", tmp.name)
shutil.copyfileobj(sys.stdin.buffer, tmp)
filename = tmp.name
with tempfile.NamedTemporaryFile("wb", delete=False) as bytes_tmp:
logging.debug("Writing stdin to %s", bytes_tmp.name)
shutil.copyfileobj(sys.stdin.buffer, bytes_tmp) # type: ignore
filename = bytes_tmp.name
elif filename.strip() == "--":
# We could also just open this as a series of streams and seek() them
with tempfile.NamedTemporaryFile("w", delete=False) as tmp:
Expand All @@ -63,10 +63,10 @@ def __call__(self, _parser, namespace, values, _option_string=None):
elif filename.startswith(("http://", "https://")):
resp = requests.get(filename, allow_redirects=True)
resp.raise_for_status()
with tempfile.NamedTemporaryFile("wb", delete=False) as tmp:
logging.debug("Writing -- to %s", tmp.name)
tmp.write(resp.content)
filename = pathlib.Path(tmp.name)
with tempfile.NamedTemporaryFile("wb", delete=False) as bytes_tmp:
logging.debug("Writing -- to %s", bytes_tmp.name)
bytes_tmp.write(resp.content)
filename = bytes_tmp.name
path = pathlib.Path(filename)
if not path.is_file():
raise argparse.ArgumentError(self, f"no such file {path}")
Expand Down
15 changes: 9 additions & 6 deletions python-threatexchange/threatexchange/cli/label_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@
import argparse
import pathlib
import typing as t
from threatexchange.cli.helpers import FlexFilesInputAction
from threatexchange.exchanges.fetch_state import SignalOpinion, SignalOpinionCategory


from threatexchange.signal_type.signal_base import MatchesStr, SignalType, TextHasher

from threatexchange import common
from threatexchange.cli.cli_config import CLISettings
from threatexchange.cli.exceptions import CommandError
from threatexchange.cli.helpers import FlexFilesInputAction
from threatexchange.exchanges.fetch_state import SignalOpinion, SignalOpinionCategory
from threatexchange.content_type.content_base import ContentType
from threatexchange.exchanges import write_api_mixins
from threatexchange.exchanges.collab_config import CollaborationConfigBase
from threatexchange.signal_type.signal_base import SignalType
from threatexchange.cli import command_base


Expand Down Expand Up @@ -129,12 +129,15 @@ def execute(self, settings: CLISettings) -> None:
# signal_types = self.only_signals or settings.get_signal_types_for_content(
# self.content_type
# )
if not isinstance(api, write_api_mixins.ExchangeWithUpload):
raise CommandError.user(f"api {api.get_name()} doesn't suppport uploading")

if self.as_hash is not None:
for f in self.files:
signal_type = self.as_hash
hash_val = signal_type.validate_signal_str(f.read_text())
api.report_opinion(
api.submit_opinion(
self.collab,
signal_type,
hash_val,
SignalOpinion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,39 +253,6 @@ def fetch_iter(
"""
raise NotImplementedError

# TODO - Restore in a future version
# def report_seen(
# self,
# collab: TCollabConfig,
# s_type: SignalType,
# signal: str,
# metadata: state.TFetchedSignalMetadata,
# ) -> None:
# """
# Report that you observed this signal.

# This is an optional API, and places that use it should catch
# the NotImplementedError.
# """
# raise NotImplementedError

def report_opinion(
    self,
    s_type: t.Type[SignalType],
    signal: str,
    opinion: state.SignalOpinion,
) -> None:
    """
    Weigh in on a signal for this collaboration.

    Most implementations will want a full replacement specialization, but
    this allows a common interface for all uploads for the simplest use
    cases.

    This is an optional API, and places that use it should catch the
    NotImplementedError.

    Args:
        s_type: The SignalType of the signal being reported on.
        signal: The signal value, as a string.
        opinion: The opinion to share about the signal.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError


# Convenience for avoiding mypy strict errors
AnySignalExchangeAPI = SignalExchangeAPI[t.Any, t.Any, t.Any, t.Any, t.Any]
Expand Down
137 changes: 137 additions & 0 deletions python-threatexchange/threatexchange/exchanges/write_api_mixins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.

"""
These contain small mixins that enable write features on SignalExchangeAPI

While it would be possible to make all these extensions of SignalExchangeAPI,
we're just about at the limit of understandable complexity for the typing on
SignalExchangeAPI, and so moving these into mixins might allow us to simplify
the logic a bit.

Example
class MyApiWithFeedback(
SignalExchangeAPI[
MyCollabConfig,
MyCheckpoint,
MySignalMetadata,
str,
MyUpdateRecordValue,
],
ExchangeWithMatchedOnPlatform[MyCollabConfig, str],
ExchangeWithReviewFeedback[MyCollabConfig, str],
ExchangeWithUpload[MyCollabConfig],
):
pass

# Inside py-tx reflection
if isinstance(exchange, ExchangeWithMatchedOnPlatform):
exchange.submit_matched(collab, matched_record_key)


@see SignalExchangeAPI
"""

import typing as t

from threatexchange.exchanges import fetch_state
from threatexchange.signal_type.signal_base import SignalType
from threatexchange.exchanges.collab_config import CollaborationConfigBase


TCollabConfig = t.TypeVar("TCollabConfig", bound=CollaborationConfigBase)
TFetchedSignalMetadata = t.TypeVar(
"TFetchedSignalMetadata", bound=fetch_state.FetchedSignalMetadata
)


class ExchangeWithMatchedOnPlatform(
    t.Generic[TCollabConfig, fetch_state.TUpdateRecordKey]
):
    """
    Mixin for exchanges that can record that you successfully made a match.

    Useful for automatically certifying that your matching pipeline is
    functioning, or for giving insight into platform spread.
    """

    def submit_matched(
        self,
        collab: TCollabConfig,
        matched_record_key: fetch_state.TUpdateRecordKey,
    ) -> None:
        """
        Report that you matched this signal to content on your platform.

        This carries no indication of whether the content is harmful, but
        it can help certify that your matching implementation is functional,
        and help other platforms track platform-to-platform spread.

        Args:
            collab: The collaboration we should share the event to.
            matched_record_key: The API record that we matched, corresponding
                to the key from the original API. This seems to be unifying
                across all existing APIs, though some can reconstruct the
                feedback target from the signal or the metadata.

        Raises:
            NotImplementedError: always, in this base mixin.
        """
        raise NotImplementedError


class ExchangeWithReviewFeedback(
    t.Generic[TCollabConfig, fetch_state.TUpdateRecordKey]
):
    """
    Mixin for exchanges that support recording the results of a manual review.

    This can help other platforms stack rank potential matches for review to
    prioritize the ones that seem to work well across multiple platforms.
    """

    def submit_review_feedback(
        self,
        # The collaboration we should share the event to
        collab: TCollabConfig,
        # The API record that we're sharing feedback on, corresponding to the
        # key from the original API. This seems to be unifying across all
        # existing APIs, though some can reconstruct the feedback target from
        # the signal or the metadata
        matched_record_key: fetch_state.TUpdateRecordKey,
        # Whether the review of matched content corresponded to material the
        # exchange aims to find. Usually this corresponds to harmful content
        review_result: t.Literal[
            fetch_state.SignalOpinionCategory.POSITIVE_CLASS,
            fetch_state.SignalOpinionCategory.NEGATIVE_CLASS,
        ],
        # Someday, we might also support tags, since StopNCII supports it
        # tags: t.Optional[t.Set[str]] = None
    ) -> None:
        """
        Report the result of reviewing content matched by this record.

        Unlike a plain "matched" report, this carries a verdict:
        `review_result` states whether the reviewed content corresponded to
        the material the exchange aims to find (POSITIVE_CLASS) or did not
        (NEGATIVE_CLASS).

        Args:
            collab: The collaboration we should share the event to.
            matched_record_key: Key of the API record the feedback is about.
            review_result: The outcome of the review, restricted to
                POSITIVE_CLASS or NEGATIVE_CLASS.

        Raises:
            NotImplementedError: always, in this base mixin.
        """
        raise NotImplementedError


class ExchangeWithUpload(t.Generic[TCollabConfig]):
    """
    Mixin for exchanges that support uploading new opinions to the exchange.
    """

    def submit_opinion(
        self,
        collab: TCollabConfig,
        s_type: t.Type[SignalType],
        signal: str,
        opinion: fetch_state.SignalOpinion,
    ) -> None:
        """
        Weigh in on a signal for this collaboration.

        Most implementations will want a full replacement specialization,
        but this allows a common interface for all uploads for the simplest
        use cases.

        Args:
            collab: The collaboration we should upload the opinion to.
            s_type: The SignalType we are uploading.
            signal: The signal value we are uploading.
            opinion: The opinion we are sharing.

        Raises:
            NotImplementedError: always, in this base mixin.
        """
        raise NotImplementedError