about summary refs log tree commit diff stats
path: root/src/stream.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream.cpp')
-rw-r--r--src/stream.cpp98
1 files changed, 51 insertions, 47 deletions
diff --git a/src/stream.cpp b/src/stream.cpp index cb55ee8..86d177c 100644 --- a/src/stream.cpp +++ b/src/stream.cpp
@@ -4,23 +4,27 @@
4#include <curl_header.h> 4#include <curl_header.h>
5#include "util.h" 5#include "util.h"
6#include "notification.h" 6#include "notification.h"
7#include "client.h" 7#include "request.h"
8 8
9namespace twitter { 9namespace twitter {
10 10
11 stream::stream( 11 stream::stream(
12 const client& tclient, 12 const auth& tauth,
13 notify_callback callback, 13 notify_callback callback,
14 bool with_followings, 14 bool with_followings,
15 bool receive_all_replies, 15 bool receive_all_replies,
16 std::list<std::string> track, 16 std::list<std::string> track,
17 std::list<bounding_box> locations) : 17 std::list<bounding_box> locations) :
18 _client(tclient), 18 _auth(tauth),
19 _notify(callback), 19 _notify(callback),
20 _currentUser(get(
21 _auth,
22 "https://api.twitter.com/1.1/account/verify_credentials.json")
23 .perform()),
20 _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations)) 24 _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations))
21 { 25 {
22 } 26 }
23 27
24 stream::~stream() 28 stream::~stream()
25 { 29 {
26 if (_thread.joinable()) 30 if (_thread.joinable())
@@ -29,7 +33,7 @@ namespace twitter {
29 _thread.join(); 33 _thread.join();
30 } 34 }
31 } 35 }
32 36
33 std::string stream::generateUrl( 37 std::string stream::generateUrl(
34 bool with_followings, 38 bool with_followings,
35 bool receive_all_replies, 39 bool receive_all_replies,
@@ -37,68 +41,68 @@ namespace twitter {
37 std::list<bounding_box> locations) 41 std::list<bounding_box> locations)
38 { 42 {
39 std::list<std::string> arguments; 43 std::list<std::string> arguments;
40 44
41 if (receive_all_replies) 45 if (receive_all_replies)
42 { 46 {
43 arguments.push_back("replies=all"); 47 arguments.push_back("replies=all");
44 } 48 }
45 49
46 if (!with_followings) 50 if (!with_followings)
47 { 51 {
48 arguments.push_back("with=user"); 52 arguments.push_back("with=user");
49 } 53 }
50 54
51 if (!track.empty()) 55 if (!track.empty())
52 { 56 {
53 std::ostringstream trackstr; 57 std::ostringstream trackstr;
54 trackstr << "track="; 58 trackstr << "track=";
55 59
56 for (auto it = std::begin(track); it != std::end(track); it++) 60 for (auto it = std::begin(track); it != std::end(track); it++)
57 { 61 {
58 if (it != std::begin(track)) 62 if (it != std::begin(track))
59 { 63 {
60 trackstr << ","; 64 trackstr << ",";
61 } 65 }
62 66
63 trackstr << OAuth::HttpEncodeQueryValue(*it); 67 trackstr << OAuth::HttpEncodeQueryValue(*it);
64 } 68 }
65 69
66 arguments.push_back(trackstr.str()); 70 arguments.push_back(trackstr.str());
67 } 71 }
68 72
69 if (!locations.empty()) 73 if (!locations.empty())
70 { 74 {
71 std::ostringstream localstr; 75 std::ostringstream localstr;
72 localstr << "locations="; 76 localstr << "locations=";
73 77
74 for (auto it = std::begin(locations); it != std::end(locations); it++) 78 for (auto it = std::begin(locations); it != std::end(locations); it++)
75 { 79 {
76 if (it != std::begin(locations)) 80 if (it != std::begin(locations))
77 { 81 {
78 localstr << ","; 82 localstr << ",";
79 } 83 }
80 84
81 localstr << (double)it->getSouthWestLongitude() << ","; 85 localstr << (double)it->getSouthWestLongitude() << ",";
82 localstr << (double)it->getSouthWestLatitude() << ","; 86 localstr << (double)it->getSouthWestLatitude() << ",";
83 localstr << (double)it->getNorthEastLongitude() << ","; 87 localstr << (double)it->getNorthEastLongitude() << ",";
84 localstr << (double)it->getNorthEastLatitude(); 88 localstr << (double)it->getNorthEastLatitude();
85 } 89 }
86 90
87 arguments.push_back(localstr.str()); 91 arguments.push_back(localstr.str());
88 } 92 }
89 93
90 std::ostringstream urlstr; 94 std::ostringstream urlstr;
91 urlstr << "https://userstream.twitter.com/1.1/user.json"; 95 urlstr << "https://userstream.twitter.com/1.1/user.json";
92 96
93 if (!arguments.empty()) 97 if (!arguments.empty())
94 { 98 {
95 urlstr << "?"; 99 urlstr << "?";
96 urlstr << implode(std::begin(arguments), std::end(arguments), "&"); 100 urlstr << implode(std::begin(arguments), std::end(arguments), "&");
97 } 101 }
98 102
99 return urlstr.str(); 103 return urlstr.str();
100 } 104 }
101 105
102 void stream::run(std::string url) 106 void stream::run(std::string url)
103 { 107 {
104 _backoff_type = backoff::none; 108 _backoff_type = backoff::none;
@@ -108,15 +112,15 @@ namespace twitter {
108 curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) { 112 curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) {
109 return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb); 113 return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb);
110 }); 114 });
111 115
112 curl::curl_easy conn(ios); 116 curl::curl_easy conn(ios);
113 curl::curl_header headers; 117 curl::curl_header headers;
114 std::string oauth_header; 118 std::string oauth_header;
115 119
116 try 120 try
117 { 121 {
118 oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, ""); 122 oauth_header = _auth.getClient().getFormattedHttpHeader(OAuth::Http::Get, url, "");
119 123
120 if (!oauth_header.empty()) 124 if (!oauth_header.empty())
121 { 125 {
122 headers.add(oauth_header); 126 headers.add(oauth_header);
@@ -126,10 +130,10 @@ namespace twitter {
126 std::cout << "Error generating OAuth header:" << std::endl; 130 std::cout << "Error generating OAuth header:" << std::endl;
127 std::cout << error.what() << std::endl; 131 std::cout << error.what() << std::endl;
128 std::cout << "This is likely due to a malformed URL." << std::endl; 132 std::cout << "This is likely due to a malformed URL." << std::endl;
129 133
130 assert(false); 134 assert(false);
131 } 135 }
132 136
133 try 137 try
134 { 138 {
135 conn.add<CURLOPT_HEADERFUNCTION>(nullptr); 139 conn.add<CURLOPT_HEADERFUNCTION>(nullptr);
@@ -146,10 +150,10 @@ namespace twitter {
146 } catch (const curl::curl_exception& error) 150 } catch (const curl::curl_exception& error)
147 { 151 {
148 error.print_traceback(); 152 error.print_traceback();
149 153
150 assert(false); 154 assert(false);
151 } 155 }
152 156
153 bool failure = false; 157 bool failure = false;
154 try 158 try
155 { 159 {
@@ -169,7 +173,7 @@ namespace twitter {
169 } 173 }
170 } 174 }
171 } 175 }
172 176
173 if (!failure) 177 if (!failure)
174 { 178 {
175 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); 179 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
@@ -198,7 +202,7 @@ namespace twitter {
198 } 202 }
199 } 203 }
200 } 204 }
201 205
202 std::this_thread::sleep_for(_backoff_amount); 206 std::this_thread::sleep_for(_backoff_amount);
203 207
204 switch (_backoff_type) 208 switch (_backoff_type)
@@ -209,35 +213,35 @@ namespace twitter {
209 { 213 {
210 _backoff_amount += std::chrono::milliseconds(250); 214 _backoff_amount += std::chrono::milliseconds(250);
211 } 215 }
212 216
213 break; 217 break;
214 } 218 }
215 219
216 case backoff::http: 220 case backoff::http:
217 { 221 {
218 if (_backoff_amount < std::chrono::seconds(320)) 222 if (_backoff_amount < std::chrono::seconds(320))
219 { 223 {
220 _backoff_amount *= 2; 224 _backoff_amount *= 2;
221 } 225 }
222 226
223 break; 227 break;
224 } 228 }
225 229
226 case backoff::rate_limit: 230 case backoff::rate_limit:
227 { 231 {
228 _backoff_amount *= 2; 232 _backoff_amount *= 2;
229 233
230 break; 234 break;
231 } 235 }
232 236
233 case backoff::none: 237 case backoff::none:
234 { 238 {
235 break; 239 break;
236 } 240 }
237 } 241 }
238 } 242 }
239 } 243 }
240 244
241 size_t stream::write(char* ptr, size_t size, size_t nmemb) 245 size_t stream::write(char* ptr, size_t size, size_t nmemb)
242 { 246 {
243 for (size_t i = 0; i < size*nmemb; i++) 247 for (size_t i = 0; i < size*nmemb; i++)
@@ -245,38 +249,38 @@ namespace twitter {
245 if (ptr[i] == '\r') 249 if (ptr[i] == '\r')
246 { 250 {
247 i++; // Skip the \n 251 i++; // Skip the \n
248 252
249 if (!_buffer.empty()) 253 if (!_buffer.empty())
250 { 254 {
251 notification n(_client, _buffer); 255 notification n(_currentUser, _buffer);
252 if (n.getType() == notification::type::friends) 256 if (n.getType() == notification::type::friends)
253 { 257 {
254 _established = true; 258 _established = true;
255 _backoff_type = backoff::none; 259 _backoff_type = backoff::none;
256 _backoff_amount = std::chrono::milliseconds(0); 260 _backoff_amount = std::chrono::milliseconds(0);
257 } 261 }
258 262
259 _notify(std::move(n)); 263 _notify(std::move(n));
260 264
261 _buffer = ""; 265 _buffer = "";
262 } 266 }
263 } else { 267 } else {
264 _buffer.push_back(ptr[i]); 268 _buffer.push_back(ptr[i]);
265 } 269 }
266 } 270 }
267 271
268 time(&_last_write); 272 time(&_last_write);
269 273
270 return size*nmemb; 274 return size*nmemb;
271 } 275 }
272 276
273 int stream::progress() 277 int stream::progress()
274 { 278 {
275 if (_stop) 279 if (_stop)
276 { 280 {
277 return 1; 281 return 1;
278 } 282 }
279 283
280 if (_established) 284 if (_established)
281 { 285 {
282 if (difftime(time(NULL), _last_write) >= 90) 286 if (difftime(time(NULL), _last_write) >= 90)
@@ -284,8 +288,8 @@ namespace twitter {
284 return 1; 288 return 1;
285 } 289 }
286 } 290 }
287 291
288 return 0; 292 return 0;
289 } 293 }
290 294
291} 295}