about summary refs log tree commit diff stats
path: root/src/stream.cpp
diff options
context:
space:
mode:
authorKelly Rauchenberger <fefferburbia@gmail.com>2018-08-30 19:22:20 -0400
committerKelly Rauchenberger <fefferburbia@gmail.com>2018-08-30 19:22:20 -0400
commitcc57105f29b4095daad03273bc1a882368e75664 (patch)
tree2800240352fdcff0b009e6ced97aab4cb726dcb7 /src/stream.cpp
parent4963c3dd55b765a33a16a77af432f2bfa12b8359 (diff)
downloadlibtwittercpp-cc57105f29b4095daad03273bc1a882368e75664.tar.gz
libtwittercpp-cc57105f29b4095daad03273bc1a882368e75664.tar.bz2
libtwittercpp-cc57105f29b4095daad03273bc1a882368e75664.zip
Removed stream functionality
The Twitter streaming API has officially sunset, so the stream functionality can no longer be used. Because of this, there is no need to keep it in the codebase.

refs #3
Diffstat (limited to 'src/stream.cpp')
-rw-r--r--src/stream.cpp295
1 files changed, 0 insertions, 295 deletions
diff --git a/src/stream.cpp b/src/stream.cpp deleted file mode 100644 index 86d177c..0000000 --- a/src/stream.cpp +++ /dev/null
@@ -1,295 +0,0 @@
1#include "stream.h"
2#include <liboauthcpp/liboauthcpp.h>
3#include <curl_easy.h>
4#include <curl_header.h>
5#include "util.h"
6#include "notification.h"
7#include "request.h"
8
9namespace twitter {
10
11 stream::stream(
12 const auth& tauth,
13 notify_callback callback,
14 bool with_followings,
15 bool receive_all_replies,
16 std::list<std::string> track,
17 std::list<bounding_box> locations) :
18 _auth(tauth),
19 _notify(callback),
20 _currentUser(get(
21 _auth,
22 "https://api.twitter.com/1.1/account/verify_credentials.json")
23 .perform()),
24 _thread(&stream::run, this, generateUrl(with_followings, receive_all_replies, track, locations))
25 {
26 }
27
28 stream::~stream()
29 {
30 if (_thread.joinable())
31 {
32 _stop = true;
33 _thread.join();
34 }
35 }
36
37 std::string stream::generateUrl(
38 bool with_followings,
39 bool receive_all_replies,
40 std::list<std::string> track,
41 std::list<bounding_box> locations)
42 {
43 std::list<std::string> arguments;
44
45 if (receive_all_replies)
46 {
47 arguments.push_back("replies=all");
48 }
49
50 if (!with_followings)
51 {
52 arguments.push_back("with=user");
53 }
54
55 if (!track.empty())
56 {
57 std::ostringstream trackstr;
58 trackstr << "track=";
59
60 for (auto it = std::begin(track); it != std::end(track); it++)
61 {
62 if (it != std::begin(track))
63 {
64 trackstr << ",";
65 }
66
67 trackstr << OAuth::HttpEncodeQueryValue(*it);
68 }
69
70 arguments.push_back(trackstr.str());
71 }
72
73 if (!locations.empty())
74 {
75 std::ostringstream localstr;
76 localstr << "locations=";
77
78 for (auto it = std::begin(locations); it != std::end(locations); it++)
79 {
80 if (it != std::begin(locations))
81 {
82 localstr << ",";
83 }
84
85 localstr << (double)it->getSouthWestLongitude() << ",";
86 localstr << (double)it->getSouthWestLatitude() << ",";
87 localstr << (double)it->getNorthEastLongitude() << ",";
88 localstr << (double)it->getNorthEastLatitude();
89 }
90
91 arguments.push_back(localstr.str());
92 }
93
94 std::ostringstream urlstr;
95 urlstr << "https://userstream.twitter.com/1.1/user.json";
96
97 if (!arguments.empty())
98 {
99 urlstr << "?";
100 urlstr << implode(std::begin(arguments), std::end(arguments), "&");
101 }
102
103 return urlstr.str();
104 }
105
106 void stream::run(std::string url)
107 {
108 _backoff_type = backoff::none;
109 _backoff_amount = std::chrono::milliseconds(0);
110 for (;;)
111 {
112 curl::curl_ios<stream> ios(this, [] (void* contents, size_t size, size_t nmemb, void* userp) {
113 return static_cast<stream*>(userp)->write(static_cast<char*>(contents), size, nmemb);
114 });
115
116 curl::curl_easy conn(ios);
117 curl::curl_header headers;
118 std::string oauth_header;
119
120 try
121 {
122 oauth_header = _auth.getClient().getFormattedHttpHeader(OAuth::Http::Get, url, "");
123
124 if (!oauth_header.empty())
125 {
126 headers.add(oauth_header);
127 }
128 } catch (const OAuth::ParseError& error)
129 {
130 std::cout << "Error generating OAuth header:" << std::endl;
131 std::cout << error.what() << std::endl;
132 std::cout << "This is likely due to a malformed URL." << std::endl;
133
134 assert(false);
135 }
136
137 try
138 {
139 conn.add<CURLOPT_HEADERFUNCTION>(nullptr);
140 conn.add<CURLOPT_HEADERDATA>(nullptr);
141 conn.add<CURLOPT_XFERINFOFUNCTION>([] (void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) {
142 return static_cast<stream*>(cdata)->progress();
143 });
144 conn.add<CURLOPT_XFERINFODATA>(this);
145 conn.add<CURLOPT_NOPROGRESS>(0);
146 //conn.add<CURLOPT_VERBOSE>(1);
147 //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace);
148 conn.add<CURLOPT_URL>(url.c_str());
149 conn.add<CURLOPT_HTTPHEADER>(headers.get());
150 } catch (const curl::curl_exception& error)
151 {
152 error.print_traceback();
153
154 assert(false);
155 }
156
157 bool failure = false;
158 try
159 {
160 conn.perform();
161 } catch (const curl::curl_easy_exception& error)
162 {
163 failure = true;
164 if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop)
165 {
166 break;
167 } else {
168 if (_backoff_type == backoff::none)
169 {
170 _established = false;
171 _backoff_type = backoff::network;
172 _backoff_amount = std::chrono::milliseconds(0);
173 }
174 }
175 }
176
177 if (!failure)
178 {
179 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
180 if (response_code == 420)
181 {
182 if (_backoff_type == backoff::none)
183 {
184 _established = false;
185 _backoff_type = backoff::rate_limit;
186 _backoff_amount = std::chrono::minutes(1);
187 }
188 } else if (response_code != 200)
189 {
190 if (_backoff_type == backoff::none)
191 {
192 _established = false;
193 _backoff_type = backoff::http;
194 _backoff_amount = std::chrono::seconds(5);
195 }
196 } else {
197 if (_backoff_type == backoff::none)
198 {
199 _established = false;
200 _backoff_type = backoff::network;
201 _backoff_amount = std::chrono::milliseconds(0);
202 }
203 }
204 }
205
206 std::this_thread::sleep_for(_backoff_amount);
207
208 switch (_backoff_type)
209 {
210 case backoff::network:
211 {
212 if (_backoff_amount < std::chrono::seconds(16))
213 {
214 _backoff_amount += std::chrono::milliseconds(250);
215 }
216
217 break;
218 }
219
220 case backoff::http:
221 {
222 if (_backoff_amount < std::chrono::seconds(320))
223 {
224 _backoff_amount *= 2;
225 }
226
227 break;
228 }
229
230 case backoff::rate_limit:
231 {
232 _backoff_amount *= 2;
233
234 break;
235 }
236
237 case backoff::none:
238 {
239 break;
240 }
241 }
242 }
243 }
244
245 size_t stream::write(char* ptr, size_t size, size_t nmemb)
246 {
247 for (size_t i = 0; i < size*nmemb; i++)
248 {
249 if (ptr[i] == '\r')
250 {
251 i++; // Skip the \n
252
253 if (!_buffer.empty())
254 {
255 notification n(_currentUser, _buffer);
256 if (n.getType() == notification::type::friends)
257 {
258 _established = true;
259 _backoff_type = backoff::none;
260 _backoff_amount = std::chrono::milliseconds(0);
261 }
262
263 _notify(std::move(n));
264
265 _buffer = "";
266 }
267 } else {
268 _buffer.push_back(ptr[i]);
269 }
270 }
271
272 time(&_last_write);
273
274 return size*nmemb;
275 }
276
277 int stream::progress()
278 {
279 if (_stop)
280 {
281 return 1;
282 }
283
284 if (_established)
285 {
286 if (difftime(time(NULL), _last_write) >= 90)
287 {
288 return 1;
289 }
290 }
291
292 return 0;
293 }
294
295}