diff options
| author | Kelly Rauchenberger <fefferburbia@gmail.com> | 2016-08-20 13:56:23 -0400 |
|---|---|---|
| committer | Kelly Rauchenberger <fefferburbia@gmail.com> | 2016-08-20 13:56:23 -0400 |
| commit | 69fc8d805396b889b5e8c1c88e8129d93db77d29 (patch) | |
| tree | 6b807bd9332c65b65066e247d4d00fd5e4118d2e /src/stream.cpp | |
| parent | 442f1ee071152be04c4184473ddfee5040795b76 (diff) | |
| download | libtwittercpp-69fc8d805396b889b5e8c1c88e8129d93db77d29.tar.gz libtwittercpp-69fc8d805396b889b5e8c1c88e8129d93db77d29.tar.bz2 libtwittercpp-69fc8d805396b889b5e8c1c88e8129d93db77d29.zip | |
Updated API to use exceptions and make tweet/user objects more helpful
Diffstat (limited to 'src/stream.cpp')
| -rw-r--r-- | src/stream.cpp | 291 |
1 files changed, 291 insertions, 0 deletions
| diff --git a/src/stream.cpp b/src/stream.cpp new file mode 100644 index 0000000..17dcce7 --- /dev/null +++ b/src/stream.cpp | |||
| @@ -0,0 +1,291 @@ | |||
| 1 | #include "stream.h" | ||
| 2 | #include <liboauthcpp/liboauthcpp.h> | ||
| 3 | #include <curl_easy.h> | ||
| 4 | #include <curl_header.h> | ||
| 5 | #include "util.h" | ||
| 6 | #include "notification.h" | ||
| 7 | #include "client.h" | ||
| 8 | |||
| 9 | namespace twitter { | ||
| 10 | |||
| 11 | stream::stream( | ||
| 12 | const client& tclient, | ||
| 13 | notify_callback callback, | ||
| 14 | bool with_followings, | ||
| 15 | bool receive_all_replies, | ||
| 16 | std::list<std::string> track, | ||
| 17 | std::list<bounding_box> locations) : | ||
| 18 | _client(tclient), | ||
| 19 | _notify(callback), | ||
| 20 | _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations)) | ||
| 21 | { | ||
| 22 | } | ||
| 23 | |||
| 24 | stream::~stream() | ||
| 25 | { | ||
| 26 | if (_thread.joinable()) | ||
| 27 | { | ||
| 28 | _stop = true; | ||
| 29 | _thread.join(); | ||
| 30 | } | ||
| 31 | } | ||
| 32 | |||
| 33 | std::string stream::generateUrl( | ||
| 34 | bool with_followings, | ||
| 35 | bool receive_all_replies, | ||
| 36 | std::list<std::string> track, | ||
| 37 | std::list<bounding_box> locations) | ||
| 38 | { | ||
| 39 | std::list<std::string> arguments; | ||
| 40 | |||
| 41 | if (receive_all_replies) | ||
| 42 | { | ||
| 43 | arguments.push_back("replies=all"); | ||
| 44 | } | ||
| 45 | |||
| 46 | if (!with_followings) | ||
| 47 | { | ||
| 48 | arguments.push_back("with=user"); | ||
| 49 | } | ||
| 50 | |||
| 51 | if (!track.empty()) | ||
| 52 | { | ||
| 53 | std::ostringstream trackstr; | ||
| 54 | trackstr << "track="; | ||
| 55 | |||
| 56 | for (auto it = std::begin(track); it != std::end(track); it++) | ||
| 57 | { | ||
| 58 | if (it != std::begin(track)) | ||
| 59 | { | ||
| 60 | trackstr << ","; | ||
| 61 | } | ||
| 62 | |||
| 63 | trackstr << OAuth::HttpEncodeQueryValue(*it); | ||
| 64 | } | ||
| 65 | |||
| 66 | arguments.push_back(trackstr.str()); | ||
| 67 | } | ||
| 68 | |||
| 69 | if (!locations.empty()) | ||
| 70 | { | ||
| 71 | std::ostringstream localstr; | ||
| 72 | localstr << "locations="; | ||
| 73 | |||
| 74 | for (auto it = std::begin(locations); it != std::end(locations); it++) | ||
| 75 | { | ||
| 76 | if (it != std::begin(locations)) | ||
| 77 | { | ||
| 78 | localstr << ","; | ||
| 79 | } | ||
| 80 | |||
| 81 | localstr << (double)it->getSouthWestLongitude() << ","; | ||
| 82 | localstr << (double)it->getSouthWestLatitude() << ","; | ||
| 83 | localstr << (double)it->getNorthEastLongitude() << ","; | ||
| 84 | localstr << (double)it->getNorthEastLatitude(); | ||
| 85 | } | ||
| 86 | |||
| 87 | arguments.push_back(localstr.str()); | ||
| 88 | } | ||
| 89 | |||
| 90 | std::ostringstream urlstr; | ||
| 91 | urlstr << "https://userstream.twitter.com/1.1/user.json"; | ||
| 92 | |||
| 93 | if (!arguments.empty()) | ||
| 94 | { | ||
| 95 | urlstr << "?"; | ||
| 96 | urlstr << implode(std::begin(arguments), std::end(arguments), "&"); | ||
| 97 | } | ||
| 98 | |||
| 99 | return urlstr.str(); | ||
| 100 | } | ||
| 101 | |||
| 102 | void stream::run(std::string url) | ||
| 103 | { | ||
| 104 | curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) { | ||
| 105 | return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb); | ||
| 106 | }); | ||
| 107 | |||
| 108 | curl::curl_easy conn(ios); | ||
| 109 | curl::curl_header headers; | ||
| 110 | std::string oauth_header; | ||
| 111 | |||
| 112 | try | ||
| 113 | { | ||
| 114 | oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, ""); | ||
| 115 | |||
| 116 | if (!oauth_header.empty()) | ||
| 117 | { | ||
| 118 | headers.add(oauth_header); | ||
| 119 | } | ||
| 120 | } catch (const OAuth::ParseError& error) | ||
| 121 | { | ||
| 122 | std::cout << "Error generating OAuth header:" << std::endl; | ||
| 123 | std::cout << error.what() << std::endl; | ||
| 124 | std::cout << "This is likely due to a malformed URL." << std::endl; | ||
| 125 | |||
| 126 | assert(false); | ||
| 127 | } | ||
| 128 | |||
| 129 | try | ||
| 130 | { | ||
| 131 | conn.add<CURLOPT_HEADERFUNCTION>(nullptr); | ||
| 132 | conn.add<CURLOPT_HEADERDATA>(nullptr); | ||
| 133 | conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) { | ||
| 134 | return static_cast<stream*>(cdata)->progress(); | ||
| 135 | }); | ||
| 136 | conn.add<CURLOPT_XFERINFODATA>(this); | ||
| 137 | conn.add<CURLOPT_NOPROGRESS>(0); | ||
| 138 | //conn.add<CURLOPT_VERBOSE>(1); | ||
| 139 | //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace); | ||
| 140 | conn.add<CURLOPT_URL>(url.c_str()); | ||
| 141 | conn.add<CURLOPT_HTTPHEADER>(headers.get()); | ||
| 142 | } catch (const curl::curl_exception& error) | ||
| 143 | { | ||
| 144 | error.print_traceback(); | ||
| 145 | |||
| 146 | assert(false); | ||
| 147 | } | ||
| 148 | |||
| 149 | _backoff_type = backoff::none; | ||
| 150 | _backoff_amount = std::chrono::milliseconds(0); | ||
| 151 | for (;;) | ||
| 152 | { | ||
| 153 | bool failure = false; | ||
| 154 | try | ||
| 155 | { | ||
| 156 | conn.perform(); | ||
| 157 | } catch (const curl::curl_easy_exception& error) | ||
| 158 | { | ||
| 159 | failure = true; | ||
| 160 | if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop) | ||
| 161 | { | ||
| 162 | break; | ||
| 163 | } else { | ||
| 164 | if (_backoff_type == backoff::none) | ||
| 165 | { | ||
| 166 | _established = false; | ||
| 167 | _backoff_type = backoff::network; | ||
| 168 | _backoff_amount = std::chrono::milliseconds(0); | ||
| 169 | } | ||
| 170 | } | ||
| 171 | } | ||
| 172 | |||
| 173 | if (!failure) | ||
| 174 | { | ||
| 175 | long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); | ||
| 176 | if (response_code == 420) | ||
| 177 | { | ||
| 178 | if (_backoff_type == backoff::none) | ||
| 179 | { | ||
| 180 | _established = false; | ||
| 181 | _backoff_type = backoff::rate_limit; | ||
| 182 | _backoff_amount = std::chrono::minutes(1); | ||
| 183 | } | ||
| 184 | } else if (response_code != 200) | ||
| 185 | { | ||
| 186 | if (_backoff_type == backoff::none) | ||
| 187 | { | ||
| 188 | _established = false; | ||
| 189 | _backoff_type = backoff::http; | ||
| 190 | _backoff_amount = std::chrono::seconds(5); | ||
| 191 | } | ||
| 192 | } else { | ||
| 193 | if (_backoff_type == backoff::none) | ||
| 194 | { | ||
| 195 | _established = false; | ||
| 196 | _backoff_type = backoff::network; | ||
| 197 | _backoff_amount = std::chrono::milliseconds(0); | ||
| 198 | } | ||
| 199 | } | ||
| 200 | } | ||
| 201 | |||
| 202 | std::this_thread::sleep_for(_backoff_amount); | ||
| 203 | |||
| 204 | switch (_backoff_type) | ||
| 205 | { | ||
| 206 | case backoff::network: | ||
| 207 | { | ||
| 208 | if (_backoff_amount < std::chrono::seconds(16)) | ||
| 209 | { | ||
| 210 | _backoff_amount += std::chrono::milliseconds(250); | ||
| 211 | } | ||
| 212 | |||
| 213 | break; | ||
| 214 | } | ||
| 215 | |||
| 216 | case backoff::http: | ||
| 217 | { | ||
| 218 | if (_backoff_amount < std::chrono::seconds(320)) | ||
| 219 | { | ||
| 220 | _backoff_amount *= 2; | ||
| 221 | } | ||
| 222 | |||
| 223 | break; | ||
| 224 | } | ||
| 225 | |||
| 226 | case backoff::rate_limit: | ||
| 227 | { | ||
| 228 | _backoff_amount *= 2; | ||
| 229 | |||
| 230 | break; | ||
| 231 | } | ||
| 232 | |||
| 233 | case backoff::none: | ||
| 234 | { | ||
| 235 | break; | ||
| 236 | } | ||
| 237 | } | ||
| 238 | } | ||
| 239 | } | ||
| 240 | |||
| 241 | size_t stream::write(char* ptr, size_t size, size_t nmemb) | ||
| 242 | { | ||
| 243 | for (size_t i = 0; i < size*nmemb; i++) | ||
| 244 | { | ||
| 245 | if (ptr[i] == '\r') | ||
| 246 | { | ||
| 247 | i++; // Skip the \n | ||
| 248 | |||
| 249 | if (!_buffer.empty()) | ||
| 250 | { | ||
| 251 | notification n(_client, _buffer); | ||
| 252 | if (n.getType() == notification::type::friends) | ||
| 253 | { | ||
| 254 | _established = true; | ||
| 255 | _backoff_type = backoff::none; | ||
| 256 | _backoff_amount = std::chrono::milliseconds(0); | ||
| 257 | } | ||
| 258 | |||
| 259 | _notify(n); | ||
| 260 | |||
| 261 | _buffer = ""; | ||
| 262 | } | ||
| 263 | } else { | ||
| 264 | _buffer.push_back(ptr[i]); | ||
| 265 | } | ||
| 266 | } | ||
| 267 | |||
| 268 | time(&_last_write); | ||
| 269 | |||
| 270 | return size*nmemb; | ||
| 271 | } | ||
| 272 | |||
| 273 | int stream::progress() | ||
| 274 | { | ||
| 275 | if (_stop) | ||
| 276 | { | ||
| 277 | return 1; | ||
| 278 | } | ||
| 279 | |||
| 280 | if (_established) | ||
| 281 | { | ||
| 282 | if (difftime(time(NULL), _last_write) >= 90) | ||
| 283 | { | ||
| 284 | return 1; | ||
| 285 | } | ||
| 286 | } | ||
| 287 | |||
| 288 | return 0; | ||
| 289 | } | ||
| 290 | |||
| 291 | } | ||
