about summary refs log tree commit diff stats
path: root/src/stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.cpp')
-rw-r--r--src/stream.cpp291
1 files changed, 291 insertions, 0 deletions
diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 0000000..17dcce7 --- /dev/null +++ b/src/stream.cpp
@@ -0,0 +1,291 @@
1#include "stream.h"
2#include <liboauthcpp/liboauthcpp.h>
3#include <curl_easy.h>
4#include <curl_header.h>
5#include "util.h"
6#include "notification.h"
7#include "client.h"
8
9namespace twitter {
10
11 stream::stream(
12 const client& tclient,
13 notify_callback callback,
14 bool with_followings,
15 bool receive_all_replies,
16 std::list<std::string> track,
17 std::list<bounding_box> locations) :
18 _client(tclient),
19 _notify(callback),
20 _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations))
21 {
22 }
23
24 stream::~stream()
25 {
26 if (_thread.joinable())
27 {
28 _stop = true;
29 _thread.join();
30 }
31 }
32
33 std::string stream::generateUrl(
34 bool with_followings,
35 bool receive_all_replies,
36 std::list<std::string> track,
37 std::list<bounding_box> locations)
38 {
39 std::list<std::string> arguments;
40
41 if (receive_all_replies)
42 {
43 arguments.push_back("replies=all");
44 }
45
46 if (!with_followings)
47 {
48 arguments.push_back("with=user");
49 }
50
51 if (!track.empty())
52 {
53 std::ostringstream trackstr;
54 trackstr << "track=";
55
56 for (auto it = std::begin(track); it != std::end(track); it++)
57 {
58 if (it != std::begin(track))
59 {
60 trackstr << ",";
61 }
62
63 trackstr << OAuth::HttpEncodeQueryValue(*it);
64 }
65
66 arguments.push_back(trackstr.str());
67 }
68
69 if (!locations.empty())
70 {
71 std::ostringstream localstr;
72 localstr << "locations=";
73
74 for (auto it = std::begin(locations); it != std::end(locations); it++)
75 {
76 if (it != std::begin(locations))
77 {
78 localstr << ",";
79 }
80
81 localstr << (double)it->getSouthWestLongitude() << ",";
82 localstr << (double)it->getSouthWestLatitude() << ",";
83 localstr << (double)it->getNorthEastLongitude() << ",";
84 localstr << (double)it->getNorthEastLatitude();
85 }
86
87 arguments.push_back(localstr.str());
88 }
89
90 std::ostringstream urlstr;
91 urlstr << "https://userstream.twitter.com/1.1/user.json";
92
93 if (!arguments.empty())
94 {
95 urlstr << "?";
96 urlstr << implode(std::begin(arguments), std::end(arguments), "&");
97 }
98
99 return urlstr.str();
100 }
101
102 void stream::run(std::string url)
103 {
104 curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) {
105 return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb);
106 });
107
108 curl::curl_easy conn(ios);
109 curl::curl_header headers;
110 std::string oauth_header;
111
112 try
113 {
114 oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, "");
115
116 if (!oauth_header.empty())
117 {
118 headers.add(oauth_header);
119 }
120 } catch (const OAuth::ParseError& error)
121 {
122 std::cout << "Error generating OAuth header:" << std::endl;
123 std::cout << error.what() << std::endl;
124 std::cout << "This is likely due to a malformed URL." << std::endl;
125
126 assert(false);
127 }
128
129 try
130 {
131 conn.add<CURLOPT_HEADERFUNCTION>(nullptr);
132 conn.add<CURLOPT_HEADERDATA>(nullptr);
133 conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) {
134 return static_cast<stream*>(cdata)->progress();
135 });
136 conn.add<CURLOPT_XFERINFODATA>(this);
137 conn.add<CURLOPT_NOPROGRESS>(0);
138 //conn.add<CURLOPT_VERBOSE>(1);
139 //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace);
140 conn.add<CURLOPT_URL>(url.c_str());
141 conn.add<CURLOPT_HTTPHEADER>(headers.get());
142 } catch (const curl::curl_exception& error)
143 {
144 error.print_traceback();
145
146 assert(false);
147 }
148
149 _backoff_type = backoff::none;
150 _backoff_amount = std::chrono::milliseconds(0);
151 for (;;)
152 {
153 bool failure = false;
154 try
155 {
156 conn.perform();
157 } catch (const curl::curl_easy_exception& error)
158 {
159 failure = true;
160 if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop)
161 {
162 break;
163 } else {
164 if (_backoff_type == backoff::none)
165 {
166 _established = false;
167 _backoff_type = backoff::network;
168 _backoff_amount = std::chrono::milliseconds(0);
169 }
170 }
171 }
172
173 if (!failure)
174 {
175 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
176 if (response_code == 420)
177 {
178 if (_backoff_type == backoff::none)
179 {
180 _established = false;
181 _backoff_type = backoff::rate_limit;
182 _backoff_amount = std::chrono::minutes(1);
183 }
184 } else if (response_code != 200)
185 {
186 if (_backoff_type == backoff::none)
187 {
188 _established = false;
189 _backoff_type = backoff::http;
190 _backoff_amount = std::chrono::seconds(5);
191 }
192 } else {
193 if (_backoff_type == backoff::none)
194 {
195 _established = false;
196 _backoff_type = backoff::network;
197 _backoff_amount = std::chrono::milliseconds(0);
198 }
199 }
200 }
201
202 std::this_thread::sleep_for(_backoff_amount);
203
204 switch (_backoff_type)
205 {
206 case backoff::network:
207 {
208 if (_backoff_amount < std::chrono::seconds(16))
209 {
210 _backoff_amount += std::chrono::milliseconds(250);
211 }
212
213 break;
214 }
215
216 case backoff::http:
217 {
218 if (_backoff_amount < std::chrono::seconds(320))
219 {
220 _backoff_amount *= 2;
221 }
222
223 break;
224 }
225
226 case backoff::rate_limit:
227 {
228 _backoff_amount *= 2;
229
230 break;
231 }
232
233 case backoff::none:
234 {
235 break;
236 }
237 }
238 }
239 }
240
241 size_t stream::write(char* ptr, size_t size, size_t nmemb)
242 {
243 for (size_t i = 0; i < size*nmemb; i++)
244 {
245 if (ptr[i] == '\r')
246 {
247 i++; // Skip the \n
248
249 if (!_buffer.empty())
250 {
251 notification n(_client, _buffer);
252 if (n.getType() == notification::type::friends)
253 {
254 _established = true;
255 _backoff_type = backoff::none;
256 _backoff_amount = std::chrono::milliseconds(0);
257 }
258
259 _notify(n);
260
261 _buffer = "";
262 }
263 } else {
264 _buffer.push_back(ptr[i]);
265 }
266 }
267
268 time(&_last_write);
269
270 return size*nmemb;
271 }
272
273 int stream::progress()
274 {
275 if (_stop)
276 {
277 return 1;
278 }
279
280 if (_established)
281 {
282 if (difftime(time(NULL), _last_write) >= 90)
283 {
284 return 1;
285 }
286 }
287
288 return 0;
289 }
290
291}