Method: Streaming

Matthew R. DeVerna edited this page Aug 26, 2021 · 3 revisions

Twitter offers two different streaming endpoints to gather tweets in real-time:

  1. Filtered stream: The filtered stream endpoint enables developers to filter the real-time stream of public Tweets.
    • Companion endpoints let you create and manage matching rules; Twitter applies those rules to the real-time stream and returns only the matching public Tweets. For example, you can request all tweets which include the word "politics" or some other string.
  2. Sampled stream: The sampled stream endpoint delivers a roughly 1% random sample of publicly available Tweets in real-time.

Note: The streaming endpoints cannot be used with the Rate Limit Manager tool. Thus, during authorization the manage_rate_limits parameter must be set to False. See Adding filter rules for an example.

Filtered streaming

There are three different osometweet methods that will help you stream real-time filtered public tweets.

| Type | osometweet method | Purpose | Twitter endpoint |
| --- | --- | --- | --- |
| Streaming | filtered_stream | Connect to the stream | GET /2/tweets/search/stream |
| Management | set_filtered_stream_rule | Add or delete rules from your stream | POST /2/tweets/search/stream/rules |
| Management | get_filtered_stream_rule | Retrieve your stream's rules | GET /2/tweets/search/stream/rules |

To utilize the filtered_stream endpoint, we must first understand how to manage the matching rules. Matching rules are the criteria we provide to Twitter to tell them what we want them to give us.

For example, if we wanted only tweets that contain specific keywords, such as "coronavirus" or "indiana", we would need to create matching rules that tell Twitter to do exactly that. Here is what that looks like.

Adding filter rules

To add filter rules, we use the set_filtered_stream_rule method.

import osometweet

# Initialize the OSoMeTweet object
bearer_token = "YOUR_TWITTER_BEARER_TOKEN"
oauth2 = osometweet.OAuth2(
    bearer_token=bearer_token,
    manage_rate_limits=False    # <~~~ Must be set to False!!
)
ot = osometweet.OsomeTweet(oauth2)

# Add streaming rules
rules = [{"value": "coronavirus", "tag": "all coronavirus tweets"},
         {"value": "indiana", "tag": "all indiana tweets"}]
add_rules = {"add": rules}
response = ot.set_filtered_stream_rule(rules=add_rules) #<~~~ Where the magic happens!
print(f"API response from adding two rules:\n\n{response}")

which would return something like the below output

API response from adding two rules:

{'data': [{'value': 'coronavirus',
   'tag': 'all coronavirus tweets',
   'id': '1429542787012251666'},
  {'value': 'indiana',
   'tag': 'all indiana tweets',
   'id': '1429542787012251665'}],
 'meta': {'sent': '2021-08-22T20:35:36.106Z',
  'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}

Understand adding filter rules

We highly recommend you check out Twitter's own documentation on how to build a rule. Also, see Building High Quality Filters for a more in-depth review.

Nonetheless, we provide a basic explanation of how adding rules works and their structure to get you up and running.

Rules are added based on a list of dictionaries with the keys: value and tag. Each dictionary in that list makes up one rule where the keys represent the below...

  • value : The matching criteria
    • Twitter returns tweets that match this value's input. See the links above to learn about the different ways to match tweets.
  • tag : A label for the matching rule in that dictionary
    • This doesn't affect the actual tweets that are returned. However, if you have many rules, creating simple tags can be helpful should you want to find and delete specific rules (see Deleting filter rules for more information on this).

So the endpoint takes in something like the below (which you can see we created above in the Adding filter rules section).

{'add': [
    {'value': 'coronavirus', 'tag': 'all coronavirus tweets'},
    {'value': 'indiana', 'tag': 'all indiana tweets'}
]}

The top-level key add tells Twitter that we are adding rules and feeds the list as input of what to add.
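When there are many rules, writing the list of dictionaries by hand gets repetitive. Below is a minimal sketch of a helper that builds the same payload from a mapping of tag to value. The name `build_add_payload` is hypothetical (it is not part of osometweet), and keying on the tag assumes your tags are unique.

```python
def build_add_payload(rules_by_tag):
    """Build the {"add": [...]} payload from a {tag: value} mapping.

    Hypothetical helper for illustration; it is not part of osometweet.
    """
    return {
        "add": [
            {"value": value, "tag": tag}
            for tag, value in rules_by_tag.items()
        ]
    }


add_rules = build_add_payload({
    "all coronavirus tweets": "coronavirus",
    "all indiana tweets": "indiana",
})
print(add_rules)
```

The resulting dictionary has the same shape as the one above, so it could be passed as the rules argument of set_filtered_stream_rule in the same way.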

Retrieving filter rules

Now, if we wanted to check that the rules added during the Adding filter rules section are actually there, we can use the get_filtered_stream_rule method.

We can do this like so...

current_rules = ot.get_filtered_stream_rule()
print(f"API response when retrieving current rules:\n\n{current_rules}")

which will show

API response when retrieving current rules:

{'data': [{'value': 'coronavirus',
   'tag': 'all coronavirus tweets',
   'id': '1429542787012251666'},
  {'value': 'indiana',
   'tag': 'all indiana tweets',
   'id': '1429542787012251665'}],
 'meta': {'sent': '2021-08-22T20:35:36.106Z',
  'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}

We can see here that our rules are included under the data key. The value and tag keys are included exactly as we passed them, and each rule also includes a unique identifier under the key id.

Note that these ids are generated anew each time a rule is created. For example, if you add the rule which matches "coronavirus", Twitter assigns it a unique id. If you then delete all of your rules and recreate that exact same rule, the new id will not be the same.
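Because ids change whenever a rule is recreated, it can be handy to look them up by tag instead of hard-coding them. The following is a small sketch that works on a response dictionary shaped like the one above. `rule_ids_by_tag` is a hypothetical helper, not an osometweet function, and the fallback for a missing data key is an assumption about what is returned when no rules exist.

```python
def rule_ids_by_tag(rules_response):
    """Map each rule's tag to its id (hypothetical helper, not part of osometweet)."""
    # Assumption: `data` may be missing when no rules are set, so default to an empty list.
    rules = rules_response.get("data", [])
    # Rules without a tag are skipped, since there is nothing to key them on.
    return {rule["tag"]: rule["id"] for rule in rules if "tag" in rule}


# A response shaped like the example output above
example_response = {
    "data": [
        {"value": "coronavirus", "tag": "all coronavirus tweets", "id": "1429542787012251666"},
        {"value": "indiana", "tag": "all indiana tweets", "id": "1429542787012251665"},
    ]
}
ids = rule_ids_by_tag(example_response)
print(ids["all indiana tweets"])
```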

Connecting to the filtered stream endpoint

Now that we have successfully added some matching rules, and we are confident they are there, we can connect to the streaming endpoint and begin gathering tweets. Here is how we do that...

import json

# Returns a generator
stream = ot.filtered_stream()

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
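    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")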

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)

This will then print tweets in real-time that match any of our rules. For example, we might get...

{'id': '1429556303177846788',
 'text': 'It could have saved lives. So how did coronavirus response plan go missing? https://t.co/BuAjWemVRy'}

... because it includes the word coronavirus. However, this is not much data. If you'd like to request specific fields or expansions, see the following section.
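The streaming loop can also be wrapped in a small generator so that the parsing logic is testable without a live connection. The sketch below is only an illustration: `iter_tweets` is a hypothetical name, the input is any iterable of raw lines (standing in for `stream.iter_lines()`), and it assumes each filtered-stream payload lists the rules it matched under a `matching_rules` key, which is worth verifying against the responses you actually receive.

```python
import json


def iter_tweets(lines):
    """Yield (data, tags) for each tweet in an iterable of raw stream lines.

    Hypothetical helper for illustration; it is not part of osometweet.
    """
    for line in lines:
        if not line:  # empty keep-alive line, nothing to parse
            continue
        payload = json.loads(line)
        data = payload.get("data")
        if data is None:  # e.g. an error message rather than a tweet
            continue
        # Assumption: matched rules are listed under `matching_rules`
        tags = [rule.get("tag") for rule in payload.get("matching_rules", [])]
        yield data, tags


# Simulated lines standing in for `stream.iter_lines()`
fake_lines = [
    b'{"data": {"id": "1", "text": "coronavirus update"}, '
    b'"matching_rules": [{"id": "10", "tag": "all coronavirus tweets"}]}',
    b"",
    b'{"errors": [{"title": "example error"}]}',
]
tweets = list(iter_tweets(fake_lines))
print(tweets)
```

With a real stream, the same function would be called as `iter_tweets(stream.iter_lines())`.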

Requesting additional fields and expansions

Like all of the other endpoints, we can request additional fields and expansions by passing them into the fields or expansions parameters. We can do this by replacing the ot.filtered_stream() line above with something like the below...

import json

# Get all tweet fields
all_tweet_fields = osometweet.TweetFields(everything=True)

# Returns a generator
stream = ot.filtered_stream(fields=all_tweet_fields) # <~~~ now we request all tweet fields

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
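    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")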

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)

... which will then return tweets containing all of the tweet fields. This works in exactly the same way for expansions.

Requesting all fields and expansions

As always, we can use the everything=True parameter to request everything from Twitter. This looks like...

import json

# Returns a generator
stream = ot.filtered_stream(everything=True) # <~~~ now we request everything

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
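    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")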

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)

Deleting filter rules

To delete filter rules, we use the set_filtered_stream_rule method again. If we assume that the rules we added during the Adding filter rules section are still present, they are...

{'data': [{'value': 'coronavirus',
   'tag': 'all coronavirus tweets',
   'id': '1429542787012251666'},
  {'value': 'indiana',
   'tag': 'all indiana tweets',
   'id': '1429542787012251665'}],
 'meta': {'sent': '2021-08-22T20:35:36.106Z',
  'summary': {'created': 2, 'not_created': 0, 'valid': 2, 'invalid': 0}}}

Note: If we're not sure what's there, we can request them from Twitter -- as discussed in the Retrieving filter rules section -- via

current_rules = ot.get_filtered_stream_rule()

To delete rules, we need to provide a list of the ids of the rules that we'd like to delete. So if we have a current_rules object that represents the above dictionary, we can collect all of the rule ids into a list with the below line.

all_rule_ids = [rule["id"] for rule in current_rules["data"]]

Using a list comprehension, we iterate through each rule in the data object and select each id value.

Then we simply make use of the set_filtered_stream_rule in the following way.

delete_rule = {'delete': {'ids': all_rule_ids}}
ot.set_filtered_stream_rule(rules=delete_rule)

Notice that we needed to embed the list of ids inside of a dictionary prior to passing it to the method. Just like adding filter rules, the first key of this dictionary tells Twitter what action it should be doing - i.e., delete tells Twitter to remove rules, based on the list of ids provided.
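Deleting every rule is not always what you want. As a sketch of how tags can be used to remove only specific rules, the hypothetical helper below (`build_delete_payload`, not part of osometweet) selects ids by tag from a rules dictionary shaped like the one above and builds the delete payload. The fallback for a missing data key is an assumption about the response when no rules exist.

```python
def build_delete_payload(current_rules, tags_to_delete):
    """Build a {"delete": {"ids": [...]}} payload for the rules with the given tags.

    Hypothetical helper for illustration; it is not part of osometweet.
    """
    tags = set(tags_to_delete)
    ids = [
        rule["id"]
        # Assumption: `data` may be missing when no rules are set
        for rule in current_rules.get("data", [])
        if rule.get("tag") in tags
    ]
    return {"delete": {"ids": ids}}


current_rules = {
    "data": [
        {"value": "coronavirus", "tag": "all coronavirus tweets", "id": "1429542787012251666"},
        {"value": "indiana", "tag": "all indiana tweets", "id": "1429542787012251665"},
    ]
}
delete_rule = build_delete_payload(current_rules, ["all indiana tweets"])
print(delete_rule)
```

The result has the same structure as the delete_rule dictionary above, so it could be passed to set_filtered_stream_rule in the same way. If no tags match, the list of ids is empty, and it is probably best to skip the request in that case.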

Sampled streaming

We can access the sampled streaming endpoint with the sampled_stream method.

Connecting to the sampled stream endpoint

As this endpoint doesn't take any matching criteria and simply returns a general 1% sample, there is much less to think about and we can begin collecting tweets from the sampled stream in the following way...

import osometweet
import json

# Initialize the OSoMeTweet object
bearer_token = "YOUR_TWITTER_BEARER_TOKEN"
oauth2 = osometweet.OAuth2(
    bearer_token=bearer_token,
    manage_rate_limits=False
)
ot = osometweet.OsomeTweet(oauth2)

# Returns a generator
stream = ot.sampled_stream()

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
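    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")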

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)
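Since the sampled stream never ends on its own, in practice you will usually want to save tweets to disk and stop after some number of them rather than print forever. Here is a minimal sketch of one way to do that, writing one JSON object per line. The helper names (`iter_data`, `save_tweets`), the output path, and the limit are all hypothetical, and the input is any iterable of raw lines standing in for `stream.iter_lines()`.

```python
import json
import os
import tempfile
from itertools import islice


def iter_data(lines):
    """Yield the `data` object of each non-empty stream line that has one."""
    for line in lines:
        if not line:  # empty keep-alive line
            continue
        data = json.loads(line).get("data")
        if data:
            yield data


def save_tweets(lines, path, limit):
    """Write at most `limit` tweets to `path` as JSON Lines; return how many were written."""
    count = 0
    with open(path, "w", encoding="utf-8") as f:
        for data in islice(iter_data(lines), limit):
            f.write(json.dumps(data) + "\n")
            count += 1
    return count


# Simulated lines standing in for `stream.iter_lines()`
fake_lines = [
    b'{"data": {"id": "1", "text": "a"}}',
    b"",
    b'{"data": {"id": "2", "text": "b"}}',
    b'{"data": {"id": "3", "text": "c"}}',
]
out_path = os.path.join(tempfile.mkdtemp(), "sample.jsonl")
saved = save_tweets(fake_lines, out_path, limit=2)
print(saved)
```

Note that this sketch only stops reading after the limit is reached; it does not close the underlying connection itself.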

Requesting additional fields and expansions

Like all of the other endpoints, we can request additional fields and expansions by passing them into the fields or expansions parameters. We can do this by replacing the ot.sampled_stream() line above with something like the below...

import json

# Get all tweet fields
all_tweet_fields = osometweet.TweetFields(everything=True)

# Returns a generator
stream = ot.sampled_stream(fields=all_tweet_fields) # <~~~ now we request all tweet fields

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
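    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")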

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)

... which will then return tweets containing all of the tweet fields. This works in exactly the same way for expansions.

Requesting all fields and expansions

As always, we can use the everything=True parameter to request everything from Twitter. This looks like...

import json

# Returns a generator
stream = ot.sampled_stream(everything=True) # <~~~ now we request everything

# Because we have a generator, we iterate over each tweet
for tweet in stream.iter_lines():
    
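    # Skip the empty keep-alive lines that Twitter sends to hold the connection open
    if not tweet:
        continue

    # Then, we read the json `tweet` object as a dictionary and select the `data`
    # Note: if it's not there for some reason, this line returns `None`...
    data = json.loads(tweet).get("data")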

    # ... but if we do find data, we can then print each tweet
    if data:
        print(data)

Fields and expansions

By default, the V2 API returns only limited information. To fetch more, you will need to specify the fields and expansions parameters in your requests. OSoMeTweet contains several classes to handle them. See Method: Specifying fields and expansions for examples of how to work with them.