about summary refs log tree commit diff stats
path: root/src/stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.cpp')
-rw-r--r--src/stream.cpp295
1 files changed, 0 insertions, 295 deletions
diff --git a/src/stream.cpp b/src/stream.cpp deleted file mode 100644 index 86d177c..0000000 --- a/src/stream.cpp +++ /dev/null
@@ -1,295 +0,0 @@
1#include "stream.h"
2#include <liboauthcpp/liboauthcpp.h>
3#include <curl_easy.h>
4#include <curl_header.h>
5#include "util.h"
6#include "notification.h"
7#include "request.h"
8
9namespace twitter {
10
11 stream::stream(
12 const auth& tauth,
13 notify_callback callback,
14 bool with_followings,
15 bool receive_all_replies,
16 std::list<std::string> track,
17 std::list<bounding_box> locations) :
18 _auth(tauth),
19 _notify(callback),
20 _currentUser(get(
21 _auth,
22 "https://api.twitter.com/1.1/account/verify_credentials.json")
23 .perform()),
24 _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations))
25 {
26 }
27
28 stream::~stream()
29 {
30 if (_thread.joinable())
31 {
32 _stop = true;
33 _thread.join();
34 }
35 }
36
37 std::string stream::generateUrl(
38 bool with_followings,
39 bool receive_all_replies,
40 std::list<std::string> track,
41 std::list<bounding_box> locations)
42 {
43 std::list<std::string> arguments;
44
45 if (receive_all_replies)
46 {
47 arguments.push_back("replies=all");
48 }
49
50 if (!with_followings)
51 {
52 arguments.push_back("with=user");
53 }
54
55 if (!track.empty())
56 {
57 std::ostringstream trackstr;
58 trackstr << "track=";
59
60 for (auto it = std::begin(track); it != std::end(track); it++)
61 {
62 if (it != std::begin(track))
63 {
64 trackstr << ",";
65 }
66
67 trackstr << OAuth::HttpEncodeQueryValue(*it);
68 }
69
70 arguments.push_back(trackstr.str());
71 }
72
73 if (!locations.empty())
74 {
75 std::ostringstream localstr;
76 localstr << "locations=";
77
78 for (auto it = std::begin(locations); it != std::end(locations); it++)
79 {
80 if (it != std::begin(locations))
81 {
82 localstr << ",";
83 }
84
85 localstr << (double)it->getSouthWestLongitude() << ",";
86 localstr << (double)it->getSouthWestLatitude() << ",";
87 localstr << (double)it->getNorthEastLongitude() << ",";
88 localstr << (double)it->getNorthEastLatitude();
89 }
90
91 arguments.push_back(localstr.str());
92 }
93
94 std::ostringstream urlstr;
95 urlstr << "https://userstream.twitter.com/1.1/user.json";
96
97 if (!arguments.empty())
98 {
99 urlstr << "?";
100 urlstr << implode(std::begin(arguments), std::end(arguments), "&");
101 }
102
103 return urlstr.str();
104 }
105
106 void stream::run(std::string url)
107 {
108 _backoff_type = backoff::none;
109 _backoff_amount = std::chrono::milliseconds(0);
110 for (;;)
111 {
112 curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) {
113 return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb);
114 });
115
116 curl::curl_easy conn(ios);
117 curl::curl_header headers;
118 std::string oauth_header;
119
120 try
121 {
122 oauth_header = _auth.getClient().getFormattedHttpHeader(OAuth::Http::Get, url, "");
123
124 if (!oauth_header.empty())
125 {
126 headers.add(oauth_header);
127 }
128 } catch (const OAuth::ParseError& error)
129 {
130 std::cout << "Error generating OAuth header:" << std::endl;
131 std::cout << error.what() << std::endl;
132 std::cout << "This is likely due to a malformed URL." << std::endl;
133
134 assert(false);
135 }
136
137 try
138 {
139 conn.add<CURLOPT_HEADERFUNCTION>(nullptr);
140 conn.add<CURLOPT_HEADERDATA>(nullptr);
141 conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) {
142 return static_cast<stream*>(cdata)->progress();
143 });
144 conn.add<CURLOPT_XFERINFODATA>(this);
145 conn.add<CURLOPT_NOPROGRESS>(0);
146 //conn.add<CURLOPT_VERBOSE>(1);
147 //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace);
148 conn.add<CURLOPT_URL>(url.c_str());
149 conn.add<CURLOPT_HTTPHEADER>(headers.get());
150 } catch (const curl::curl_exception& error)
151 {
152 error.print_traceback();
153
154 assert(false);
155 }
156
157 bool failure = false;
158 try
159 {
160 conn.perform();
161 } catch (const curl::curl_easy_exception& error)
162 {
163 failure = true;
164 if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop)
165 {
166 break;
167 } else {
168 if (_backoff_type == backoff::none)
169 {
170 _established = false;
171 _backoff_type = backoff::network;
172 _backoff_amount = std::chrono::milliseconds(0);
173 }
174 }
175 }
176
177 if (!failure)
178 {
179 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
180 if (response_code == 420)
181 {
182 if (_backoff_type == backoff::none)
183 {
184 _established = false;
185 _backoff_type = backoff::rate_limit;
186 _backoff_amount = std::chrono::minutes(1);
187 }
188 } else if (response_code != 200)
189 {
190 if (_backoff_type == backoff::none)
191 {
192 _established = false;
193 _backoff_type = backoff::http;
194 _backoff_amount = std::chrono::seconds(5);
195 }
196 } else {
197 if (_backoff_type == backoff::none)
198 {
199 _established = false;
200 _backoff_type = backoff::network;
201 _backoff_amount = std::chrono::milliseconds(0);
202 }
203 }
204 }
205
206 std::this_thread::sleep_for(_backoff_amount);
207
208 switch (_backoff_type)
209 {
210 case backoff::network:
211 {
212 if (_backoff_amount < std::chrono::seconds(16))
213 {
214 _backoff_amount += std::chrono::milliseconds(250);
215 }
216
217 break;
218 }
219
220 case backoff::http:
221 {
222 if (_backoff_amount < std::chrono::seconds(320))
223 {
224 _backoff_amount *= 2;
225 }
226
227 break;
228 }
229
230 case backoff::rate_limit:
231 {
232 _backoff_amount *= 2;
233
234 break;
235 }
236
237 case backoff::none:
238 {
239 break;
240 }
241 }
242 }
243 }
244
245 size_t stream::write(char* ptr, size_t size, size_t nmemb)
246 {
247 for (size_t i = 0; i < size*nmemb; i++)
248 {
249 if (ptr[i] == '\r')
250 {
251 i++; // Skip the \n
252
253 if (!_buffer.empty())
254 {
255 notification n(_currentUser, _buffer);
256 if (n.getType() == notification::type::friends)
257 {
258 _established = true;
259 _backoff_type = backoff::none;
260 _backoff_amount = std::chrono::milliseconds(0);
261 }
262
263 _notify(std::move(n));
264
265 _buffer = "";
266 }
267 } else {
268 _buffer.push_back(ptr[i]);
269 }
270 }
271
272 time(&_last_write);
273
274 return size*nmemb;
275 }
276
277 int stream::progress()
278 {
279 if (_stop)
280 {
281 return 1;
282 }
283
284 if (_established)
285 {
286 if (difftime(time(NULL), _last_write) >= 90)
287 {
288 return 1;
289 }
290 }
291
292 return 0;
293 }
294
295}