diff options
Diffstat (limited to 'src/stream.cpp')
-rw-r--r-- | src/stream.cpp | 295 |
1 files changed, 0 insertions, 295 deletions
diff --git a/src/stream.cpp b/src/stream.cpp deleted file mode 100644 index 86d177c..0000000 --- a/src/stream.cpp +++ /dev/null | |||
@@ -1,295 +0,0 @@ | |||
1 | #include "stream.h" | ||
2 | #include <liboauthcpp/liboauthcpp.h> | ||
3 | #include <curl_easy.h> | ||
4 | #include <curl_header.h> | ||
5 | #include "util.h" | ||
6 | #include "notification.h" | ||
7 | #include "request.h" | ||
8 | |||
9 | namespace twitter { | ||
10 | |||
11 | stream::stream( | ||
12 | const auth& tauth, | ||
13 | notify_callback callback, | ||
14 | bool with_followings, | ||
15 | bool receive_all_replies, | ||
16 | std::list<std::string> track, | ||
17 | std::list<bounding_box> locations) : | ||
18 | _auth(tauth), | ||
19 | _notify(callback), | ||
20 | _currentUser(get( | ||
21 | _auth, | ||
22 | "https://api.twitter.com/1.1/account/verify_credentials.json") | ||
23 | .perform()), | ||
24 | _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations)) | ||
25 | { | ||
26 | } | ||
27 | |||
28 | stream::~stream() | ||
29 | { | ||
30 | if (_thread.joinable()) | ||
31 | { | ||
32 | _stop = true; | ||
33 | _thread.join(); | ||
34 | } | ||
35 | } | ||
36 | |||
37 | std::string stream::generateUrl( | ||
38 | bool with_followings, | ||
39 | bool receive_all_replies, | ||
40 | std::list<std::string> track, | ||
41 | std::list<bounding_box> locations) | ||
42 | { | ||
43 | std::list<std::string> arguments; | ||
44 | |||
45 | if (receive_all_replies) | ||
46 | { | ||
47 | arguments.push_back("replies=all"); | ||
48 | } | ||
49 | |||
50 | if (!with_followings) | ||
51 | { | ||
52 | arguments.push_back("with=user"); | ||
53 | } | ||
54 | |||
55 | if (!track.empty()) | ||
56 | { | ||
57 | std::ostringstream trackstr; | ||
58 | trackstr << "track="; | ||
59 | |||
60 | for (auto it = std::begin(track); it != std::end(track); it++) | ||
61 | { | ||
62 | if (it != std::begin(track)) | ||
63 | { | ||
64 | trackstr << ","; | ||
65 | } | ||
66 | |||
67 | trackstr << OAuth::HttpEncodeQueryValue(*it); | ||
68 | } | ||
69 | |||
70 | arguments.push_back(trackstr.str()); | ||
71 | } | ||
72 | |||
73 | if (!locations.empty()) | ||
74 | { | ||
75 | std::ostringstream localstr; | ||
76 | localstr << "locations="; | ||
77 | |||
78 | for (auto it = std::begin(locations); it != std::end(locations); it++) | ||
79 | { | ||
80 | if (it != std::begin(locations)) | ||
81 | { | ||
82 | localstr << ","; | ||
83 | } | ||
84 | |||
85 | localstr << (double)it->getSouthWestLongitude() << ","; | ||
86 | localstr << (double)it->getSouthWestLatitude() << ","; | ||
87 | localstr << (double)it->getNorthEastLongitude() << ","; | ||
88 | localstr << (double)it->getNorthEastLatitude(); | ||
89 | } | ||
90 | |||
91 | arguments.push_back(localstr.str()); | ||
92 | } | ||
93 | |||
94 | std::ostringstream urlstr; | ||
95 | urlstr << "https://userstream.twitter.com/1.1/user.json"; | ||
96 | |||
97 | if (!arguments.empty()) | ||
98 | { | ||
99 | urlstr << "?"; | ||
100 | urlstr << implode(std::begin(arguments), std::end(arguments), "&"); | ||
101 | } | ||
102 | |||
103 | return urlstr.str(); | ||
104 | } | ||
105 | |||
106 | void stream::run(std::string url) | ||
107 | { | ||
108 | _backoff_type = backoff::none; | ||
109 | _backoff_amount = std::chrono::milliseconds(0); | ||
110 | for (;;) | ||
111 | { | ||
112 | curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) { | ||
113 | return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb); | ||
114 | }); | ||
115 | |||
116 | curl::curl_easy conn(ios); | ||
117 | curl::curl_header headers; | ||
118 | std::string oauth_header; | ||
119 | |||
120 | try | ||
121 | { | ||
122 | oauth_header = _auth.getClient().getFormattedHttpHeader(OAuth::Http::Get, url, ""); | ||
123 | |||
124 | if (!oauth_header.empty()) | ||
125 | { | ||
126 | headers.add(oauth_header); | ||
127 | } | ||
128 | } catch (const OAuth::ParseError& error) | ||
129 | { | ||
130 | std::cout << "Error generating OAuth header:" << std::endl; | ||
131 | std::cout << error.what() << std::endl; | ||
132 | std::cout << "This is likely due to a malformed URL." << std::endl; | ||
133 | |||
134 | assert(false); | ||
135 | } | ||
136 | |||
137 | try | ||
138 | { | ||
139 | conn.add<CURLOPT_HEADERFUNCTION>(nullptr); | ||
140 | conn.add<CURLOPT_HEADERDATA>(nullptr); | ||
141 | conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) { | ||
142 | return static_cast<stream*>(cdata)->progress(); | ||
143 | }); | ||
144 | conn.add<CURLOPT_XFERINFODATA>(this); | ||
145 | conn.add<CURLOPT_NOPROGRESS>(0); | ||
146 | //conn.add<CURLOPT_VERBOSE>(1); | ||
147 | //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace); | ||
148 | conn.add<CURLOPT_URL>(url.c_str()); | ||
149 | conn.add<CURLOPT_HTTPHEADER>(headers.get()); | ||
150 | } catch (const curl::curl_exception& error) | ||
151 | { | ||
152 | error.print_traceback(); | ||
153 | |||
154 | assert(false); | ||
155 | } | ||
156 | |||
157 | bool failure = false; | ||
158 | try | ||
159 | { | ||
160 | conn.perform(); | ||
161 | } catch (const curl::curl_easy_exception& error) | ||
162 | { | ||
163 | failure = true; | ||
164 | if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop) | ||
165 | { | ||
166 | break; | ||
167 | } else { | ||
168 | if (_backoff_type == backoff::none) | ||
169 | { | ||
170 | _established = false; | ||
171 | _backoff_type = backoff::network; | ||
172 | _backoff_amount = std::chrono::milliseconds(0); | ||
173 | } | ||
174 | } | ||
175 | } | ||
176 | |||
177 | if (!failure) | ||
178 | { | ||
179 | long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); | ||
180 | if (response_code == 420) | ||
181 | { | ||
182 | if (_backoff_type == backoff::none) | ||
183 | { | ||
184 | _established = false; | ||
185 | _backoff_type = backoff::rate_limit; | ||
186 | _backoff_amount = std::chrono::minutes(1); | ||
187 | } | ||
188 | } else if (response_code != 200) | ||
189 | { | ||
190 | if (_backoff_type == backoff::none) | ||
191 | { | ||
192 | _established = false; | ||
193 | _backoff_type = backoff::http; | ||
194 | _backoff_amount = std::chrono::seconds(5); | ||
195 | } | ||
196 | } else { | ||
197 | if (_backoff_type == backoff::none) | ||
198 | { | ||
199 | _established = false; | ||
200 | _backoff_type = backoff::network; | ||
201 | _backoff_amount = std::chrono::milliseconds(0); | ||
202 | } | ||
203 | } | ||
204 | } | ||
205 | |||
206 | std::this_thread::sleep_for(_backoff_amount); | ||
207 | |||
208 | switch (_backoff_type) | ||
209 | { | ||
210 | case backoff::network: | ||
211 | { | ||
212 | if (_backoff_amount < std::chrono::seconds(16)) | ||
213 | { | ||
214 | _backoff_amount += std::chrono::milliseconds(250); | ||
215 | } | ||
216 | |||
217 | break; | ||
218 | } | ||
219 | |||
220 | case backoff::http: | ||
221 | { | ||
222 | if (_backoff_amount < std::chrono::seconds(320)) | ||
223 | { | ||
224 | _backoff_amount *= 2; | ||
225 | } | ||
226 | |||
227 | break; | ||
228 | } | ||
229 | |||
230 | case backoff::rate_limit: | ||
231 | { | ||
232 | _backoff_amount *= 2; | ||
233 | |||
234 | break; | ||
235 | } | ||
236 | |||
237 | case backoff::none: | ||
238 | { | ||
239 | break; | ||
240 | } | ||
241 | } | ||
242 | } | ||
243 | } | ||
244 | |||
245 | size_t stream::write(char* ptr, size_t size, size_t nmemb) | ||
246 | { | ||
247 | for (size_t i = 0; i < size*nmemb; i++) | ||
248 | { | ||
249 | if (ptr[i] == '\r') | ||
250 | { | ||
251 | i++; // Skip the \n | ||
252 | |||
253 | if (!_buffer.empty()) | ||
254 | { | ||
255 | notification n(_currentUser, _buffer); | ||
256 | if (n.getType() == notification::type::friends) | ||
257 | { | ||
258 | _established = true; | ||
259 | _backoff_type = backoff::none; | ||
260 | _backoff_amount = std::chrono::milliseconds(0); | ||
261 | } | ||
262 | |||
263 | _notify(std::move(n)); | ||
264 | |||
265 | _buffer = ""; | ||
266 | } | ||
267 | } else { | ||
268 | _buffer.push_back(ptr[i]); | ||
269 | } | ||
270 | } | ||
271 | |||
272 | time(&_last_write); | ||
273 | |||
274 | return size*nmemb; | ||
275 | } | ||
276 | |||
277 | int stream::progress() | ||
278 | { | ||
279 | if (_stop) | ||
280 | { | ||
281 | return 1; | ||
282 | } | ||
283 | |||
284 | if (_established) | ||
285 | { | ||
286 | if (difftime(time(NULL), _last_write) >= 90) | ||
287 | { | ||
288 | return 1; | ||
289 | } | ||
290 | } | ||
291 | |||
292 | return 0; | ||
293 | } | ||
294 | |||
295 | } | ||