diff options
Diffstat (limited to 'src/stream.cpp')
-rw-r--r-- | src/stream.cpp | 291 |
1 files changed, 291 insertions, 0 deletions
diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 0000000..17dcce7 --- /dev/null +++ b/src/stream.cpp | |||
@@ -0,0 +1,291 @@ | |||
1 | #include "stream.h" | ||
2 | #include <liboauthcpp/liboauthcpp.h> | ||
3 | #include <curl_easy.h> | ||
4 | #include <curl_header.h> | ||
5 | #include "util.h" | ||
6 | #include "notification.h" | ||
7 | #include "client.h" | ||
8 | |||
9 | namespace twitter { | ||
10 | |||
11 | stream::stream( | ||
12 | const client& tclient, | ||
13 | notify_callback callback, | ||
14 | bool with_followings, | ||
15 | bool receive_all_replies, | ||
16 | std::list<std::string> track, | ||
17 | std::list<bounding_box> locations) : | ||
18 | _client(tclient), | ||
19 | _notify(callback), | ||
20 | _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations)) | ||
21 | { | ||
22 | } | ||
23 | |||
24 | stream::~stream() | ||
25 | { | ||
26 | if (_thread.joinable()) | ||
27 | { | ||
28 | _stop = true; | ||
29 | _thread.join(); | ||
30 | } | ||
31 | } | ||
32 | |||
33 | std::string stream::generateUrl( | ||
34 | bool with_followings, | ||
35 | bool receive_all_replies, | ||
36 | std::list<std::string> track, | ||
37 | std::list<bounding_box> locations) | ||
38 | { | ||
39 | std::list<std::string> arguments; | ||
40 | |||
41 | if (receive_all_replies) | ||
42 | { | ||
43 | arguments.push_back("replies=all"); | ||
44 | } | ||
45 | |||
46 | if (!with_followings) | ||
47 | { | ||
48 | arguments.push_back("with=user"); | ||
49 | } | ||
50 | |||
51 | if (!track.empty()) | ||
52 | { | ||
53 | std::ostringstream trackstr; | ||
54 | trackstr << "track="; | ||
55 | |||
56 | for (auto it = std::begin(track); it != std::end(track); it++) | ||
57 | { | ||
58 | if (it != std::begin(track)) | ||
59 | { | ||
60 | trackstr << ","; | ||
61 | } | ||
62 | |||
63 | trackstr << OAuth::HttpEncodeQueryValue(*it); | ||
64 | } | ||
65 | |||
66 | arguments.push_back(trackstr.str()); | ||
67 | } | ||
68 | |||
69 | if (!locations.empty()) | ||
70 | { | ||
71 | std::ostringstream localstr; | ||
72 | localstr << "locations="; | ||
73 | |||
74 | for (auto it = std::begin(locations); it != std::end(locations); it++) | ||
75 | { | ||
76 | if (it != std::begin(locations)) | ||
77 | { | ||
78 | localstr << ","; | ||
79 | } | ||
80 | |||
81 | localstr << (double)it->getSouthWestLongitude() << ","; | ||
82 | localstr << (double)it->getSouthWestLatitude() << ","; | ||
83 | localstr << (double)it->getNorthEastLongitude() << ","; | ||
84 | localstr << (double)it->getNorthEastLatitude(); | ||
85 | } | ||
86 | |||
87 | arguments.push_back(localstr.str()); | ||
88 | } | ||
89 | |||
90 | std::ostringstream urlstr; | ||
91 | urlstr << "https://userstream.twitter.com/1.1/user.json"; | ||
92 | |||
93 | if (!arguments.empty()) | ||
94 | { | ||
95 | urlstr << "?"; | ||
96 | urlstr << implode(std::begin(arguments), std::end(arguments), "&"); | ||
97 | } | ||
98 | |||
99 | return urlstr.str(); | ||
100 | } | ||
101 | |||
102 | void stream::run(std::string url) | ||
103 | { | ||
104 | curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) { | ||
105 | return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb); | ||
106 | }); | ||
107 | |||
108 | curl::curl_easy conn(ios); | ||
109 | curl::curl_header headers; | ||
110 | std::string oauth_header; | ||
111 | |||
112 | try | ||
113 | { | ||
114 | oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, ""); | ||
115 | |||
116 | if (!oauth_header.empty()) | ||
117 | { | ||
118 | headers.add(oauth_header); | ||
119 | } | ||
120 | } catch (const OAuth::ParseError& error) | ||
121 | { | ||
122 | std::cout << "Error generating OAuth header:" << std::endl; | ||
123 | std::cout << error.what() << std::endl; | ||
124 | std::cout << "This is likely due to a malformed URL." << std::endl; | ||
125 | |||
126 | assert(false); | ||
127 | } | ||
128 | |||
129 | try | ||
130 | { | ||
131 | conn.add<CURLOPT_HEADERFUNCTION>(nullptr); | ||
132 | conn.add<CURLOPT_HEADERDATA>(nullptr); | ||
133 | conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) { | ||
134 | return static_cast<stream*>(cdata)->progress(); | ||
135 | }); | ||
136 | conn.add<CURLOPT_XFERINFODATA>(this); | ||
137 | conn.add<CURLOPT_NOPROGRESS>(0); | ||
138 | //conn.add<CURLOPT_VERBOSE>(1); | ||
139 | //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace); | ||
140 | conn.add<CURLOPT_URL>(url.c_str()); | ||
141 | conn.add<CURLOPT_HTTPHEADER>(headers.get()); | ||
142 | } catch (const curl::curl_exception& error) | ||
143 | { | ||
144 | error.print_traceback(); | ||
145 | |||
146 | assert(false); | ||
147 | } | ||
148 | |||
149 | _backoff_type = backoff::none; | ||
150 | _backoff_amount = std::chrono::milliseconds(0); | ||
151 | for (;;) | ||
152 | { | ||
153 | bool failure = false; | ||
154 | try | ||
155 | { | ||
156 | conn.perform(); | ||
157 | } catch (const curl::curl_easy_exception& error) | ||
158 | { | ||
159 | failure = true; | ||
160 | if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop) | ||
161 | { | ||
162 | break; | ||
163 | } else { | ||
164 | if (_backoff_type == backoff::none) | ||
165 | { | ||
166 | _established = false; | ||
167 | _backoff_type = backoff::network; | ||
168 | _backoff_amount = std::chrono::milliseconds(0); | ||
169 | } | ||
170 | } | ||
171 | } | ||
172 | |||
173 | if (!failure) | ||
174 | { | ||
175 | long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); | ||
176 | if (response_code == 420) | ||
177 | { | ||
178 | if (_backoff_type == backoff::none) | ||
179 | { | ||
180 | _established = false; | ||
181 | _backoff_type = backoff::rate_limit; | ||
182 | _backoff_amount = std::chrono::minutes(1); | ||
183 | } | ||
184 | } else if (response_code != 200) | ||
185 | { | ||
186 | if (_backoff_type == backoff::none) | ||
187 | { | ||
188 | _established = false; | ||
189 | _backoff_type = backoff::http; | ||
190 | _backoff_amount = std::chrono::seconds(5); | ||
191 | } | ||
192 | } else { | ||
193 | if (_backoff_type == backoff::none) | ||
194 | { | ||
195 | _established = false; | ||
196 | _backoff_type = backoff::network; | ||
197 | _backoff_amount = std::chrono::milliseconds(0); | ||
198 | } | ||
199 | } | ||
200 | } | ||
201 | |||
202 | std::this_thread::sleep_for(_backoff_amount); | ||
203 | |||
204 | switch (_backoff_type) | ||
205 | { | ||
206 | case backoff::network: | ||
207 | { | ||
208 | if (_backoff_amount < std::chrono::seconds(16)) | ||
209 | { | ||
210 | _backoff_amount += std::chrono::milliseconds(250); | ||
211 | } | ||
212 | |||
213 | break; | ||
214 | } | ||
215 | |||
216 | case backoff::http: | ||
217 | { | ||
218 | if (_backoff_amount < std::chrono::seconds(320)) | ||
219 | { | ||
220 | _backoff_amount *= 2; | ||
221 | } | ||
222 | |||
223 | break; | ||
224 | } | ||
225 | |||
226 | case backoff::rate_limit: | ||
227 | { | ||
228 | _backoff_amount *= 2; | ||
229 | |||
230 | break; | ||
231 | } | ||
232 | |||
233 | case backoff::none: | ||
234 | { | ||
235 | break; | ||
236 | } | ||
237 | } | ||
238 | } | ||
239 | } | ||
240 | |||
241 | size_t stream::write(char* ptr, size_t size, size_t nmemb) | ||
242 | { | ||
243 | for (size_t i = 0; i < size*nmemb; i++) | ||
244 | { | ||
245 | if (ptr[i] == '\r') | ||
246 | { | ||
247 | i++; // Skip the \n | ||
248 | |||
249 | if (!_buffer.empty()) | ||
250 | { | ||
251 | notification n(_client, _buffer); | ||
252 | if (n.getType() == notification::type::friends) | ||
253 | { | ||
254 | _established = true; | ||
255 | _backoff_type = backoff::none; | ||
256 | _backoff_amount = std::chrono::milliseconds(0); | ||
257 | } | ||
258 | |||
259 | _notify(n); | ||
260 | |||
261 | _buffer = ""; | ||
262 | } | ||
263 | } else { | ||
264 | _buffer.push_back(ptr[i]); | ||
265 | } | ||
266 | } | ||
267 | |||
268 | time(&_last_write); | ||
269 | |||
270 | return size*nmemb; | ||
271 | } | ||
272 | |||
273 | int stream::progress() | ||
274 | { | ||
275 | if (_stop) | ||
276 | { | ||
277 | return 1; | ||
278 | } | ||
279 | |||
280 | if (_established) | ||
281 | { | ||
282 | if (difftime(time(NULL), _last_write) >= 90) | ||
283 | { | ||
284 | return 1; | ||
285 | } | ||
286 | } | ||
287 | |||
288 | return 0; | ||
289 | } | ||
290 | |||
291 | } | ||