-
Notifications
You must be signed in to change notification settings - Fork 4
/
twitter.go
162 lines (155 loc) · 3.95 KB
/
twitter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package twitterfeed
import (
"bufio"
"context"
"encoding/json"
"log"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
"github.com/garyburd/go-oauth/oauth"
)
// Tweet is one message delivered by a TweetReader.
type Tweet struct {
	// Text holds the tweet's body text.
	Text string

	// Terms lists the tracked search terms that were found in Text.
	Terms []string
}
// TweetReader holds the OAuth credentials needed to read tweets.
// Construct one with NewTweetReader.
type TweetReader struct {
	// Consumer (application) key and secret.
	consumerKey    string
	consumerSecret string

	// Access token and secret.
	accessToken  string
	accessSecret string
}
// NewTweetReader returns a TweetReader that stores the supplied consumer
// key/secret and access token/secret for later use.
func NewTweetReader(consumerKey, consumerSecret, accessToken, accessSecret string) *TweetReader {
	reader := new(TweetReader)
	reader.consumerKey = consumerKey
	reader.consumerSecret = consumerSecret
	reader.accessToken = accessToken
	reader.accessSecret = accessSecret
	return reader
}
// Run starts streaming tweets that match any of terms and returns a
// receive-only channel on which each decoded Tweet is delivered.
//
// Two goroutines are started. One issues the streaming filter request and
// forwards decoded tweets, reconnecting whenever the stream ends. The other
// closes the underlying network connection every two minutes to keep it
// fresh, and once more when ctx is done.
//
// Cancel ctx (or give it a deadline) to stop the reader. The returned channel
// is closed once the reader has shut down, so callers can range over it.
func (r *TweetReader) Run(ctx context.Context, terms ...string) <-chan Tweet {
	const (
		filterURL = "https://stream.twitter.com/1.1/statuses/filter.json"
		// refreshInterval is how often the live connection is recycled.
		refreshInterval = 2 * time.Minute
		// retryDelay is the pause after a failed attempt, so that a
		// persistent error does not turn into a busy reconnect loop.
		retryDelay = time.Second
	)

	tweetsChan := make(chan Tweet)

	var connLock sync.Mutex
	var conn net.Conn

	// closeConnLocked closes the tracked connection, if any.
	// connLock must be held by the caller.
	closeConnLocked := func() {
		if conn != nil {
			conn.Close()
			conn = nil
		}
	}

	client := &http.Client{
		Transport: &http.Transport{
			Dial: func(netw, addr string) (net.Conn, error) {
				connLock.Lock()
				defer connLock.Unlock()
				closeConnLocked()
				// If ctx is already done the closer goroutine has exited and
				// nobody would ever close a connection opened now.
				if err := ctx.Err(); err != nil {
					return nil, err
				}
				netc, err := net.DialTimeout(netw, addr, 5*time.Second)
				if err != nil {
					return nil, err
				}
				conn = netc
				return netc, nil
			},
		},
	}
	creds := &oauth.Credentials{
		Token:  r.accessToken,
		Secret: r.accessSecret,
	}
	authClient := &oauth.Client{
		Credentials: oauth.Credentials{
			Token:  r.consumerKey,
			Secret: r.consumerSecret,
		},
	}

	go func() {
		// Periodically close the connection to keep it fresh, and close it
		// one last time when the context is done. Closing the connection
		// makes the streaming loop's read fail, after which it either
		// reconnects or notices the done context and exits.
		ticker := time.NewTicker(refreshInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				connLock.Lock()
				closeConnLocked()
				connLock.Unlock()
				return
			case <-ticker.C:
				connLock.Lock()
				closeConnLocked()
				connLock.Unlock()
			}
		}
	}()

	go func() {
		// Make the query and send all tweets into the channel.
		defer close(tweetsChan)

		// The form and URL never change between attempts; build them once.
		form := url.Values{"track": {strings.Join(terms, ",")}}
		formEnc := form.Encode()
		u, err := url.Parse(filterURL)
		if err != nil {
			log.Println("parsing filter URL failed:", err)
			return
		}
		endpoint := u.String()

		// pause waits retryDelay before the next attempt. It reports false
		// if ctx finished while waiting.
		pause := func() bool {
			timer := time.NewTimer(retryDelay)
			defer timer.Stop()
			select {
			case <-ctx.Done():
				return false
			case <-timer.C:
				return true
			}
		}

		for {
			if ctx.Err() != nil {
				return
			}

			req, err := http.NewRequest("POST", endpoint, strings.NewReader(formEnc))
			if err != nil {
				log.Println("creating filter request failed:", err)
				if !pause() {
					return
				}
				continue
			}
			// Bind the request to ctx so cancellation aborts it even when no
			// connection is being tracked at that moment.
			req = req.WithContext(ctx)
			// The authorization header is recomputed for every request.
			req.Header.Set("Authorization", authClient.AuthorizationHeader(creds, "POST", u, form))
			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
			req.Header.Set("Content-Length", strconv.Itoa(len(formEnc)))

			resp, err := client.Do(req)
			if err != nil {
				log.Println("Error getting response:", err)
				if !pause() {
					return
				}
				continue
			}
			if resp.StatusCode != http.StatusOK {
				// Log the first line of the body to show what the error is.
				s := bufio.NewScanner(resp.Body)
				s.Scan()
				log.Println(s.Text())
				log.Println("StatusCode =", resp.StatusCode)
				// Release the response before retrying.
				resp.Body.Close()
				if !pause() {
					return
				}
				continue
			}

			decoder := json.NewDecoder(resp.Body)
			func() {
				defer resp.Body.Close()
				for {
					var t Tweet
					if err := decoder.Decode(&t); err != nil {
						// Stream ended or was closed; the outer loop decides
						// whether to reconnect or stop.
						return
					}
					t.Terms = foundTerms(t.Text, terms...)
					// Do not block forever if the consumer stopped receiving
					// after cancelling the context.
					select {
					case tweetsChan <- t:
					case <-ctx.Done():
						return
					}
				}
			}()
		}
	}()

	return tweetsChan
}
// foundTerms reports which of terms occur in text, ignoring case.
// Matches are returned in the order the terms were given and with their
// original spelling; the result is nil when nothing matches.
func foundTerms(text string, terms ...string) []string {
	haystack := strings.ToLower(text)
	var matches []string
	for i := 0; i < len(terms); i++ {
		needle := strings.ToLower(terms[i])
		if !strings.Contains(haystack, needle) {
			continue
		}
		matches = append(matches, terms[i])
	}
	return matches
}