Method: Streaming
Twitter offers two different streaming endpoints to gather tweets in real-time:
- Filtered stream: The filtered stream endpoint enables developers to filter the real-time stream of public Tweets.
  - Related filtered stream endpoints let you create and manage matching rules, which are then applied to the real-time stream so that only matching public Tweets are returned. For example, you can request all tweets that include the word "politics" or some other string.
- Sampled stream: The sampled stream endpoint delivers a roughly 1% random sample of publicly available Tweets in real-time.
There are three different `osometweet` methods that will help you stream real-time filtered public tweets.
Type | osometweet method | Purpose | Twitter endpoint |
---|---|---|---|
Streaming | filtered_stream | Connect to the stream | GET /2/tweets/search/stream |
Management | set_filtered_stream_rule | Add or delete rules from your stream | POST /2/tweets/search/stream/rules |
Management | get_filtered_stream_rule | Retrieve your stream's rules | GET /2/tweets/search/stream/rules |
To use the `filtered_stream` endpoint, we must first understand how to manage the matching rules. Matching rules are the criteria we give Twitter to tell it which tweets we want.
For example, if we wanted only tweets that contain specific keywords, such as "coronavirus" or "indiana", we would need to create matching rules that tell Twitter to do exactly that. Here is what that looks like.
To add filter rules, we use the `set_filtered_stream_rule` method.
import osometweet
# Initialize the OSoMeTweet object
bearer_token = "YOUR_TWITTER_BEARER_TOKEN"
oauth2 = osometweet.OAuth2(
bearer_token=bearer_token,
manage_rate_limits=False
)
ot = osometweet.OsomeTweet(oauth2)
# Add streaming rules
rules = [{"value": "coronavirus", "tag": "all coronavirus tweets"},
{"value": "indiana", "tag": "all indiana tweets"}]
add_rules = {"add": rules}
response = ot.set_filtered_stream_rule(rules=add_rules) #<~~~ Where the magic happens!
print(f"API response from adding two rules:\n\n{response}")
which would return something like the below output
API response from adding two rules:
{'data': [{'value': 'coronavirus',
'tag': 'all coronavirus tweets',
'id': '1429542787012251666'},
{'value': 'indiana',
'tag': 'all indiana tweets',
'id': '1429542787012251665'}],
'meta': {'sent': '2021-08-22T20:35:36.106Z',
'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}
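Before moving on, it can be worth checking that Twitter accepted every rule. The sketch below assumes the response has the `meta` / `summary` layout shown in the sample output above; `all_rules_created` is a hypothetical helper written for illustration, not part of osometweet.

```python
def all_rules_created(response):
    """Return True only if the summary reports no rejected rules."""
    # Assumption: the response carries the `meta.summary` counts shown above.
    summary = response.get("meta", {}).get("summary")
    if summary is None:
        # Without a summary we cannot confirm anything was created
        return False
    return summary.get("not_created", 0) == 0 and summary.get("invalid", 0) == 0


# Sample response copied from the output above
sample_response = {
    "data": [
        {"value": "coronavirus", "tag": "all coronavirus tweets",
         "id": "1429542787012251666"},
        {"value": "indiana", "tag": "all indiana tweets",
         "id": "1429542787012251665"},
    ],
    "meta": {
        "sent": "2021-08-22T20:35:36.106Z",
        "summary": {"created": 2, "not_created": 0, "valid": 2, "invalid": 0},
    },
}

print(all_rules_created(sample_response))
```

In a real script, the dictionary returned by `ot.set_filtered_stream_rule(rules=add_rules)` would take the place of `sample_response`.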
We highly recommend you check out Twitter's own documentation on how to build a rule. Also, see Building High Quality Filters for a more in-depth review.
Nonetheless, we provide a basic explanation of how adding rules works and their structure to get you up and running.
Rules are added as a list of dictionaries with the keys `value` and `tag`. Each dictionary in that list makes up one rule, where the keys represent the following:
- `value`: The matching criteria. Twitter returns tweets that match this value. See the links above to learn about the different ways to match tweets.
- `tag`: A label for the matching rule in that dictionary. This doesn't affect which tweets are returned. However, if you have many rules, simple tags can be helpful should you want to find and delete specific rules (see Deleting filter rules for more information on this).
So the endpoint takes in something like the below (which you can see we created above in the Adding filter rules section).
{'add': [
{'value': 'coronavirus', 'tag': 'all coronavirus tweets'},
{'value': 'indiana', 'tag': 'all indiana tweets'}
]}
The top-level key `add` tells Twitter that we are adding rules, and the list under it holds the rules to add.
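When there are many keywords, writing each dictionary by hand gets tedious. As a minimal sketch, the hypothetical helper `build_add_payload` below (not part of osometweet) builds the same structure from `(value, tag)` pairs.

```python
def build_add_payload(rule_specs):
    """Turn (value, tag) pairs into the {"add": [...]} structure shown above."""
    return {"add": [{"value": value, "tag": tag} for value, tag in rule_specs]}


payload = build_add_payload([
    ("coronavirus", "all coronavirus tweets"),
    ("indiana", "all indiana tweets"),
])
print(payload)
```

The resulting dictionary could then be passed along as `ot.set_filtered_stream_rule(rules=payload)`.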
Now, if we want to check that the rules added in the Adding filter rules section are actually there, we can use the `get_filtered_stream_rule` method.
We can do this like so...
current_rules = ot.get_filtered_stream_rule()
print(f"API response when retrieving current rules:\n\n{current_rules}")
which will show
API response when retrieving current rules:
{'data': [{'value': 'coronavirus',
'tag': 'all coronavirus tweets',
'id': '1429542787012251666'},
{'value': 'indiana',
'tag': 'all indiana tweets',
'id': '1429542787012251665'}],
'meta': {'sent': '2021-08-22T20:35:36.106Z',
'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}
We can see here that our rules are included under the `data` key. The `value` and `tag` keys are included exactly as we passed them, and each rule also includes a unique identifier under the key `id`.
Note that these ids are unique each time you create the rules. That is, if you add the rule that matches "coronavirus", Twitter creates a unique value for `id`. If you then delete all of your rules and recreate that exact same rule, the value for `id` will not be the same.
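Because ids change whenever a rule is recreated, it is safer to look rules up by tag than to hard-code an id. The sketch below uses a hypothetical helper, `rule_ids_by_tag`, and a trimmed copy of the sample response above. It assumes tags are unique across your rules.

```python
def rule_ids_by_tag(rules_response):
    """Map each rule's tag to its current id, skipping rules without a tag."""
    return {
        rule["tag"]: rule["id"]
        for rule in rules_response.get("data", [])
        if "tag" in rule
    }


# Trimmed copy of the sample response above
sample_rules = {
    "data": [
        {"value": "coronavirus", "tag": "all coronavirus tweets",
         "id": "1429542787012251666"},
        {"value": "indiana", "tag": "all indiana tweets",
         "id": "1429542787012251665"},
    ]
}

ids = rule_ids_by_tag(sample_rules)
print(ids["all indiana tweets"])
```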
Now that we have successfully added some matching rules, and we are confident they are there, we can connect to the streaming endpoint and begin gathering tweets. Here is how we do that...
import json
# Returns a generator
stream = ot.filtered_stream()
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
This will then print tweets in real-time that match any of our rules. For example, we might get...
{'id': '1429556303177846788',
'text': 'It could have saved lives. So how did coronavirus response plan go missing? https://t.co/BuAjWemVRy'}
... because it includes the word "coronavirus". However, this is not much data. If you'd like to request specific fields or expansions, see the following section.
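Long-running streams may also emit empty lines (Twitter documents periodic keep-alive signals), and an empty line is not valid JSON. As a hedged sketch, the hypothetical generator `iter_tweet_data` below wraps the parsing step and skips such lines. The `fake_lines` list only stands in for `stream.iter_lines()` so the example runs without a connection.

```python
import json


def iter_tweet_data(lines):
    """Yield the `data` dictionary from each non-empty JSON line."""
    for line in lines:
        # Assumption: empty lines are keep-alive signals and carry no tweet
        if not line:
            continue
        data = json.loads(line).get("data")
        # Payloads without a `data` key (for example, errors) are skipped
        if data:
            yield data


# Made-up lines imitating what a stream might deliver
fake_lines = [
    b'{"data": {"id": "1", "text": "coronavirus update"}}',
    b"",
    b'{"errors": [{"title": "example error"}]}',
    b'{"data": {"id": "2", "text": "hello indiana"}}',
]

for data in iter_tweet_data(fake_lines):
    print(data)
```

With a live connection, the loop would read `for data in iter_tweet_data(stream.iter_lines()):` instead.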
Like all of the other endpoints, we can request additional fields and expansions by passing them into the `fields` or `expansions` parameters. We can do this by replacing the `ot.filtered_stream()` line above with something like the below...
import json
# Get all tweet fields
all_tweet_fields = osometweet.TweetFields(everything=True)
# Returns a generator
stream = ot.filtered_stream(fields=all_tweet_fields) # <~~~ now we request all tweet fields
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
... which will then return tweets containing all of the tweet fields. This works in exactly the same way for expansions.
As always, we can use the `everything=True` parameter to request everything from Twitter. This looks like...
import json
# Returns a generator
stream = ot.filtered_stream(everything=True) # <~~~ now we request everything
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
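The loops above keep only the `data` key. When expansions are requested, the Twitter V2 API places the expanded objects under a separate top-level `includes` key, and the filtered stream reports which rules matched under `matching_rules`. The sketch below keeps all three parts. `split_payload` is a hypothetical helper and the payload is made up for illustration.

```python
import json


def split_payload(line):
    """Parse one stream line and return its main top-level parts."""
    payload = json.loads(line)
    return {
        "data": payload.get("data"),
        # Present only when expansions were requested
        "includes": payload.get("includes", {}),
        # Present on the filtered stream only
        "matching_rules": payload.get("matching_rules", []),
    }


# Made-up payload imitating a filtered stream line with an author expansion
fake_line = json.dumps({
    "data": {"id": "1", "text": "coronavirus update", "author_id": "42"},
    "includes": {"users": [{"id": "42", "username": "example_user"}]},
    "matching_rules": [{"id": "1429542787012251666",
                        "tag": "all coronavirus tweets"}],
}).encode("utf-8")

parts = split_payload(fake_line)
print(parts["data"]["text"])
print([rule["tag"] for rule in parts["matching_rules"]])
```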
To delete filter rules, we use the `set_filtered_stream_rule` method again. If we assume that the rules we added during the Adding filter rules section are still present, they are...
{'data': [{'value': 'coronavirus',
'tag': 'all coronavirus tweets',
'id': '1429542787012251666'},
{'value': 'indiana',
'tag': 'all indiana tweets',
'id': '1429542787012251665'}],
'meta': {'sent': '2021-08-22T20:35:36.106Z',
'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}
Note: If we're not sure what's there, we can request them from Twitter -- as discussed in the Retrieving filter rules section -- via
current_rules = ot.get_filtered_stream_rule()
To delete rules, we need to provide a list of the `id`s for each rule that we'd like to delete. So if we have a `current_rules` object that represents the above dictionary, we can collect all of the rule ids into a list with the line below.
all_rule_ids = [rule["id"] for rule in current_rules["data"]]
Using a list comprehension, we iterate through each rule in the `data` list and select each `id` value.
Then we pass that list to the `set_filtered_stream_rule` method in the following way.
delete_rule = {'delete': {'ids':all_rule_ids}}
ot.set_filtered_stream_rule(rules=delete_rule)
Notice that we needed to embed the list of ids inside of a dictionary prior to passing it to the method. Just like adding filter rules, the first key of this dictionary tells Twitter what action it should take. That is, `delete` tells Twitter to remove rules, based on the list of `ids` provided.
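To remove only some rules, the same payload can be built from a filtered list of ids. The sketch below selects rules by tag. `build_delete_payload` is a hypothetical helper, and the `.get("data", [])` guard assumes a response may lack the `data` key when no rules exist.

```python
def build_delete_payload(current_rules, tag=None):
    """Build a {"delete": {"ids": [...]}} payload, optionally for one tag."""
    rules = current_rules.get("data", [])
    ids = [
        rule["id"]
        for rule in rules
        if tag is None or rule.get("tag") == tag
    ]
    return {"delete": {"ids": ids}}


# Trimmed copy of the rules shown above
current_rules = {
    "data": [
        {"value": "coronavirus", "tag": "all coronavirus tweets",
         "id": "1429542787012251666"},
        {"value": "indiana", "tag": "all indiana tweets",
         "id": "1429542787012251665"},
    ]
}

delete_one = build_delete_payload(current_rules, tag="all indiana tweets")
delete_all = build_delete_payload(current_rules)
print(delete_one)
```

Either dictionary could then be passed along as `ot.set_filtered_stream_rule(rules=delete_one)`.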
We can access the sampled streaming endpoint with the `sampled_stream` method.
As this endpoint doesn't take any matching criteria and simply returns a general 1% sample, there is much less to think about and we can begin collecting tweets from the sampled stream in the following way...
import osometweet
import json
# Initialize the OSoMeTweet object
bearer_token = "YOUR_TWITTER_BEARER_TOKEN"
oauth2 = osometweet.OAuth2(
bearer_token=bearer_token,
manage_rate_limits=False
)
ot = osometweet.OsomeTweet(oauth2)
# Returns a generator
stream = ot.sampled_stream()
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
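The loop above runs for as long as the stream stays open. For a bounded collection, one option is to stop after a fixed number of tweets and write them to a newline-delimited JSON file. The sketch below is one way to do that. `save_tweets` and the file name are hypothetical, and `fake_lines` stands in for `stream.iter_lines()`.

```python
import itertools
import json
import os
import tempfile


def iter_data(lines):
    """Yield the `data` dictionary from each non-empty JSON line."""
    for line in lines:
        if not line:
            continue
        data = json.loads(line).get("data")
        if data:
            yield data


def save_tweets(lines, path, limit):
    """Write at most `limit` tweets to `path`, one JSON object per line."""
    saved = 0
    with open(path, "w", encoding="utf-8") as out_file:
        # islice stops pulling from the stream once `limit` tweets are read
        for data in itertools.islice(iter_data(lines), limit):
            out_file.write(json.dumps(data) + "\n")
            saved += 1
    return saved


# Made-up lines imitating what a stream might deliver
fake_lines = [
    b'{"data": {"id": "1", "text": "first"}}',
    b"",
    b'{"data": {"id": "2", "text": "second"}}',
    b'{"data": {"id": "3", "text": "third"}}',
]

with tempfile.TemporaryDirectory() as tmp_dir:
    out_path = os.path.join(tmp_dir, "sample.jsonl")
    n_saved = save_tweets(fake_lines, out_path, limit=2)
    with open(out_path, encoding="utf-8") as in_file:
        saved_ids = [json.loads(row)["id"] for row in in_file]

print(n_saved, saved_ids)
```

Closing the underlying connection once the limit is reached is left to the caller.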
Like all of the other endpoints, we can request additional fields and expansions by passing them into the `fields` or `expansions` parameters. We can do this by replacing the `ot.sampled_stream()` line above with something like the below...
import json
# Get all tweet fields
all_tweet_fields = osometweet.TweetFields(everything=True)
# Returns a generator
stream = ot.sampled_stream(fields=all_tweet_fields) # <~~~ now we request all tweet fields
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
... which will then return tweets containing all of the tweet fields. This works in exactly the same way for expansions.
As always, we can use the `everything=True` parameter to request everything from Twitter. This looks like...
import json
# Returns a generator
stream = ot.sampled_stream(everything=True) # <~~~ now we request everything
# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")
    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
By default, the V2 API only returns limited information.
To fetch more, you will need to specify the `fields` and `expansions` parameters in your requests.
`OSoMeTweet` contains several classes to handle them.
See Method: Specifying fields and expansions for examples of how to work with them.