about summary refs log tree commit diff stats
path: root/src/client.cpp
diff options
context:
space:
mode:
authorKelly Rauchenberger <fefferburbia@gmail.com>2016-05-20 15:34:44 -0400
committerKelly Rauchenberger <fefferburbia@gmail.com>2016-05-20 15:34:44 -0400
commit0ccac89815ee92c69fefc148cfb272faf7309136 (patch)
tree228775a433018c8a5fd20f0ebb0f8446057b2112 /src/client.cpp
parentf465dce27cf0f07039e29d8975ad98939f0df3a9 (diff)
downloadlibtwittercpp-0ccac89815ee92c69fefc148cfb272faf7309136.tar.gz
libtwittercpp-0ccac89815ee92c69fefc148cfb272faf7309136.tar.bz2
libtwittercpp-0ccac89815ee92c69fefc148cfb272faf7309136.zip
Started implementing user streams
You can now start a user stream and end it yourself. If it disconnects abnormally, it will reconnect with a backoff as described by Twitter. Some data structures have some fields parsed now; tweets have IDs, text, and authors. Users have IDs, screen names, and names. Notifications from the stream are parsed completely. The ability to follow and unfollow users has also been added, as well as the ability to get a list of friends and followers, and to reply to a tweet.
Diffstat (limited to 'src/client.cpp')
-rw-r--r--src/client.cpp449
1 files changed, 415 insertions, 34 deletions
diff --git a/src/client.cpp b/src/client.cpp index 26f3289..ffb651b 100644 --- a/src/client.cpp +++ b/src/client.cpp
@@ -4,7 +4,9 @@
4#include <algorithm> 4#include <algorithm>
5#include <liboauthcpp/liboauthcpp.h> 5#include <liboauthcpp/liboauthcpp.h>
6#include "util.h" 6#include "util.h"
7#include <unistd.h> 7#include <json.hpp>
8
9using nlohmann::json;
8 10
9// These are here for debugging curl stuff 11// These are here for debugging curl stuff
10 12
@@ -80,11 +82,29 @@ int my_trace(CURL *handle, curl_infotype type,
80 82
81namespace twitter { 83namespace twitter {
82 84
85 int client_stream_progress_callback_wrapper(void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t)
86 {
87 return static_cast<client::stream*>(cdata)->progress();
88 }
89
90 size_t client_stream_write_callback_wrapper(void* ptr, size_t size, size_t nmemb, void* cdata)
91 {
92 return static_cast<client::stream*>(cdata)->write(static_cast<char*>(ptr), size, nmemb);
93 }
94
83 client::client(const auth& _arg) 95 client::client(const auth& _arg)
84 { 96 {
85 _oauth_consumer = new OAuth::Consumer(_arg.getConsumerKey(), _arg.getConsumerSecret()); 97 _oauth_consumer = new OAuth::Consumer(_arg.getConsumerKey(), _arg.getConsumerSecret());
86 _oauth_token = new OAuth::Token(_arg.getAccessKey(), _arg.getAccessSecret()); 98 _oauth_token = new OAuth::Token(_arg.getAccessKey(), _arg.getAccessSecret());
87 _oauth_client = new OAuth::Client(_oauth_consumer, _oauth_token); 99 _oauth_client = new OAuth::Client(_oauth_consumer, _oauth_token);
100
101 std::string url = "https://api.twitter.com/1.1/account/verify_credentials.json";
102 long response_code;
103 std::string response_data;
104 if (performGet(url, response_code, response_data) && (response_code == 200))
105 {
106 _current_user = user(response_data);
107 }
88 } 108 }
89 109
90 client::~client() 110 client::~client()
@@ -94,11 +114,17 @@ namespace twitter {
94 delete _oauth_consumer; 114 delete _oauth_consumer;
95 } 115 }
96 116
97 response client::updateStatus(std::string msg, tweet& result, std::list<long> media_ids) 117 response client::updateStatus(std::string msg, tweet& result, tweet in_response_to, std::list<long> media_ids)
98 { 118 {
99 std::stringstream datastrstream; 119 std::stringstream datastrstream;
100 datastrstream << "status=" << OAuth::PercentEncode(msg); 120 datastrstream << "status=" << OAuth::PercentEncode(msg);
101 121
122 if (in_response_to)
123 {
124 datastrstream << "&in_reply_to_status_id=";
125 datastrstream << in_response_to.getID();
126 }
127
102 if (!media_ids.empty()) 128 if (!media_ids.empty())
103 { 129 {
104 datastrstream << "&media_ids="; 130 datastrstream << "&media_ids=";
@@ -109,7 +135,7 @@ namespace twitter {
109 std::string url = "https://api.twitter.com/1.1/statuses/update.json"; 135 std::string url = "https://api.twitter.com/1.1/statuses/update.json";
110 136
111 long response_code; 137 long response_code;
112 json response_data; 138 std::string response_data;
113 if (!performPost(url, datastr, response_code, response_data)) 139 if (!performPost(url, datastr, response_code, response_data))
114 { 140 {
115 return response::curl_error; 141 return response::curl_error;
@@ -147,7 +173,7 @@ namespace twitter {
147 } 173 }
148 174
149 long response_code; 175 long response_code;
150 json response_data; 176 std::string response_data;
151 if (!performMultiPost("https://upload.twitter.com/1.1/media/upload.json", form.get(), response_code, response_data)) 177 if (!performMultiPost("https://upload.twitter.com/1.1/media/upload.json", form.get(), response_code, response_data))
152 { 178 {
153 return response::curl_error; 179 return response::curl_error;
@@ -158,7 +184,8 @@ namespace twitter {
158 return codeForError(response_code, response_data); 184 return codeForError(response_code, response_data);
159 } 185 }
160 186
161 media_id = response_data["media_id"].get<long>(); 187 auto response_json = json::parse(response_data);
188 media_id = response_json["media_id"].get<long>();
162 189
163 curl_httppost* append_form_post = nullptr; 190 curl_httppost* append_form_post = nullptr;
164 curl_httppost* append_form_last = nullptr; 191 curl_httppost* append_form_last = nullptr;
@@ -197,7 +224,8 @@ namespace twitter {
197 return codeForError(response_code, response_data); 224 return codeForError(response_code, response_data);
198 } 225 }
199 226
200 if (response_data.find("processing_info") != response_data.end()) 227 response_json = json::parse(response_data);
228 if (response_json.find("processing_info") != response_json.end())
201 { 229 {
202 std::stringstream datastr; 230 std::stringstream datastr;
203 datastr << "https://upload.twitter.com/1.1/media/upload.json?command=STATUS&media_id=" << media_id; 231 datastr << "https://upload.twitter.com/1.1/media/upload.json?command=STATUS&media_id=" << media_id;
@@ -214,20 +242,183 @@ namespace twitter {
214 return codeForError(response_code, response_data); 242 return codeForError(response_code, response_data);
215 } 243 }
216 244
217 if (response_data["processing_info"]["state"] == "succeeded") 245 response_json = json::parse(response_data);
246 if (response_json["processing_info"]["state"] == "succeeded")
218 { 247 {
219 break; 248 break;
220 } 249 }
221 250
222 int ttw = response_data["processing_info"]["check_after_secs"].get<int>(); 251 int ttw = response_json["processing_info"]["check_after_secs"].get<int>();
223 sleep(ttw); 252 std::this_thread::sleep_for(std::chrono::seconds(ttw));
224 } 253 }
225 } 254 }
226 255
227 return response::ok; 256 return response::ok;
228 } 257 }
229 258
230 bool client::performGet(std::string url, long& response_code, json& result) 259 response client::follow(user_id toFollow)
260 {
261 std::stringstream datastrstream;
262 datastrstream << "follow=true&user_id=";
263 datastrstream << toFollow;
264
265 std::string datastr = datastrstream.str();
266 std::string url = "https://api.twitter.com/1.1/friendships/create.json";
267
268 long response_code;
269 std::string response_data;
270 if (!performPost(url, datastr, response_code, response_data))
271 {
272 return response::curl_error;
273 }
274
275 if (response_code == 200)
276 {
277 return response::ok;
278 } else {
279 return codeForError(response_code, response_data);
280 }
281 }
282
283 response client::follow(user toFollow)
284 {
285 return follow(toFollow.getID());
286 }
287
288 response client::unfollow(user_id toUnfollow)
289 {
290 std::stringstream datastrstream;
291 datastrstream << "user_id=";
292 datastrstream << toUnfollow;
293
294 std::string datastr = datastrstream.str();
295 std::string url = "https://api.twitter.com/1.1/friendships/destroy.json";
296
297 long response_code;
298 std::string response_data;
299 if (!performPost(url, datastr, response_code, response_data))
300 {
301 return response::curl_error;
302 }
303
304 if (response_code == 200)
305 {
306 return response::ok;
307 } else {
308 return codeForError(response_code, response_data);
309 }
310 }
311
312 response client::unfollow(user toUnfollow)
313 {
314 return unfollow(toUnfollow.getID());
315 }
316
317 const user& client::getUser() const
318 {
319 return _current_user;
320 }
321
322 response client::getFriends(std::set<user_id>& _ret)
323 {
324 if (!_current_user)
325 {
326 return response::unknown_error;
327 }
328
329 long long cursor = -1;
330 std::set<user_id> result;
331
332 while (cursor != 0)
333 {
334 std::stringstream urlstream;
335 urlstream << "https://api.twitter.com/1.1/friends/ids.json?user_id=";
336 urlstream << _current_user.getID();
337 urlstream << "&cursor=";
338 urlstream << cursor;
339
340 std::string url = urlstream.str();
341
342 long response_code;
343 std::string response_data;
344 if (!performGet(url, response_code, response_data))
345 {
346 return response::curl_error;
347 }
348
349 if (response_code == 200)
350 {
351 json rjs = json::parse(response_data);
352 cursor = rjs.at("next_cursor");
353 result.insert(std::begin(rjs.at("ids")), std::end(rjs.at("ids")));
354 } else {
355 return codeForError(response_code, response_data);
356 }
357 }
358
359 _ret = result;
360
361 return response::ok;
362 }
363
364 response client::getFollowers(std::set<user_id>& _ret)
365 {
366 if (!_current_user)
367 {
368 return response::unknown_error;
369 }
370
371 long long cursor = -1;
372 std::set<user_id> result;
373
374 while (cursor != 0)
375 {
376 std::stringstream urlstream;
377 urlstream << "https://api.twitter.com/1.1/followers/ids.json?user_id=";
378 urlstream << _current_user.getID();
379 urlstream << "&cursor=";
380 urlstream << cursor;
381
382 std::string url = urlstream.str();
383
384 long response_code;
385 std::string response_data;
386 if (!performGet(url, response_code, response_data))
387 {
388 return response::curl_error;
389 }
390
391 if (response_code == 200)
392 {
393 json rjs = json::parse(response_data);
394 cursor = rjs.at("next_cursor");
395 result.insert(std::begin(rjs.at("ids")), std::end(rjs.at("ids")));
396 } else {
397 return codeForError(response_code, response_data);
398 }
399 }
400
401 _ret = result;
402
403 return response::ok;
404 }
405
406 void client::setUserStreamNotifyCallback(stream::notify_callback callback)
407 {
408 _user_stream.setNotifyCallback(callback);
409 }
410
411 void client::startUserStream()
412 {
413 _user_stream.start();
414 }
415
416 void client::stopUserStream()
417 {
418 _user_stream.stop();
419 }
420
421 bool client::performGet(std::string url, long& response_code, std::string& result)
231 { 422 {
232 std::ostringstream output; 423 std::ostringstream output;
233 curl::curl_ios<std::ostringstream> ios(output); 424 curl::curl_ios<std::ostringstream> ios(output);
@@ -255,17 +446,12 @@ namespace twitter {
255 } 446 }
256 447
257 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); 448 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
258 if (output.str().empty()) 449 result = output.str();
259 {
260 result = json();
261 } else {
262 result = json::parse(output.str());
263 }
264 450
265 return true; 451 return true;
266 } 452 }
267 453
268 bool client::performPost(std::string url, std::string datastr, long& response_code, json& result) 454 bool client::performPost(std::string url, std::string datastr, long& response_code, std::string& result)
269 { 455 {
270 std::ostringstream output; 456 std::ostringstream output;
271 curl::curl_ios<std::ostringstream> ios(output); 457 curl::curl_ios<std::ostringstream> ios(output);
@@ -294,17 +480,12 @@ namespace twitter {
294 } 480 }
295 481
296 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); 482 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
297 if (output.str().empty()) 483 result = output.str();
298 {
299 result = json();
300 } else {
301 result = json::parse(output.str());
302 }
303 484
304 return true; 485 return true;
305 } 486 }
306 487
307 bool client::performMultiPost(std::string url, const curl_httppost* fields, long& response_code, json& result) 488 bool client::performMultiPost(std::string url, const curl_httppost* fields, long& response_code, std::string& result)
308 { 489 {
309 std::ostringstream output; 490 std::ostringstream output;
310 curl::curl_ios<std::ostringstream> ios(output); 491 curl::curl_ios<std::ostringstream> ios(output);
@@ -333,23 +514,19 @@ namespace twitter {
333 } 514 }
334 515
335 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get(); 516 response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
336 517 result = output.str();
337 if (output.str().empty())
338 {
339 result = json();
340 } else {
341 result = json::parse(output.str());
342 }
343 518
344 return true; 519 return true;
345 } 520 }
346 521
347 response client::codeForError(int response_code, json response_data) const 522 response client::codeForError(int response_code, std::string response_data) const
348 { 523 {
524 auto response_json = json::parse(response_data);
525
349 std::set<int> error_codes; 526 std::set<int> error_codes;
350 if (response_data.find("errors") != response_data.end()) 527 if (response_json.find("errors") != response_json.end())
351 { 528 {
352 std::transform(std::begin(response_data["errors"]), std::end(response_data["errors"]), std::inserter(error_codes, std::begin(error_codes)), [] (const json& error) { 529 std::transform(std::begin(response_json["errors"]), std::end(response_json["errors"]), std::inserter(error_codes, std::begin(error_codes)), [] (const json& error) {
353 return error["code"].get<int>(); 530 return error["code"].get<int>();
354 }); 531 });
355 } 532 }
@@ -407,4 +584,208 @@ namespace twitter {
407 } 584 }
408 } 585 }
409 586
587 client::stream::stream(client& _client) : _client(_client)
588 {
589
590 }
591
592 bool client::stream::isRunning() const
593 {
594 return _thread.joinable();
595 }
596
597 void client::stream::setNotifyCallback(notify_callback _n)
598 {
599 std::lock_guard<std::mutex> _notify_lock(_notify_mutex);
600 _notify = _n;
601 }
602
603 void client::stream::start()
604 {
605 std::lock_guard<std::mutex> _running_lock(_running_mutex);
606
607 if (!_thread.joinable())
608 {
609 _thread = std::thread(&stream::run, this);
610 }
611 }
612
613 void client::stream::stop()
614 {
615 std::lock_guard<std::mutex> _running_lock(_running_mutex);
616
617 if (_thread.joinable())
618 {
619 _stop = true;
620 _thread.join();
621 _stop = false;
622 }
623 }
624
625 void client::stream::run()
626 {
627 curl::curl_easy conn;
628 std::string url = "https://userstream.twitter.com/1.1/user.json";
629
630 curl::curl_header headers;
631 std::string oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, "");
632 if (!oauth_header.empty())
633 {
634 headers.add(oauth_header);
635 }
636
637 conn.add<CURLOPT_WRITEFUNCTION>(client_stream_write_callback_wrapper);
638 conn.add<CURLOPT_WRITEDATA>(this);
639 conn.add<CURLOPT_HEADERFUNCTION>(nullptr);
640 conn.add<CURLOPT_HEADERDATA>(nullptr);
641 conn.add<CURLOPT_XFERINFOFUNCTION>(client_stream_progress_callback_wrapper);
642 conn.add<CURLOPT_XFERINFODATA>(this);
643 conn.add<CURLOPT_NOPROGRESS>(0);
644 //conn.add<CURLOPT_VERBOSE>(1);
645 //conn.add<CURLOPT_DEBUGFUNCTION>(my_trace);
646 conn.add<CURLOPT_URL>(url.c_str());
647 conn.add<CURLOPT_HTTPHEADER>(headers.get());
648
649 _backoff_type = backoff::none;
650 _backoff_amount = std::chrono::milliseconds(0);
651 for (;;)
652 {
653 bool failure = false;
654 try {
655 conn.perform();
656 } catch (curl::curl_easy_exception error)
657 {
658 failure = true;
659 if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop)
660 {
661 break;
662 } else {
663 if (_backoff_type == backoff::none)
664 {
665 _established = false;
666 _backoff_type = backoff::network;
667 _backoff_amount = std::chrono::milliseconds(0);
668 }
669 }
670 }
671
672 if (!failure)
673 {
674 long response_code = conn.get_info<CURLINFO_RESPONSE_CODE>().get();
675 if (response_code == 420)
676 {
677 if (_backoff_type == backoff::none)
678 {
679 _established = false;
680 _backoff_type = backoff::rate_limit;
681 _backoff_amount = std::chrono::minutes(1);
682 }
683 } else if (response_code != 200)
684 {
685 if (_backoff_type == backoff::none)
686 {
687 _established = false;
688 _backoff_type = backoff::http;
689 _backoff_amount = std::chrono::seconds(5);
690 }
691 } else {
692 break;
693 }
694 }
695
696 std::this_thread::sleep_for(_backoff_amount);
697
698 switch (_backoff_type)
699 {
700 case backoff::network:
701 {
702 if (_backoff_amount < std::chrono::seconds(16))
703 {
704 _backoff_amount += std::chrono::milliseconds(250);
705 }
706
707 break;
708 }
709
710 case backoff::http:
711 {
712 if (_backoff_amount < std::chrono::seconds(320))
713 {
714 _backoff_amount *= 2;
715 }
716
717 break;
718 }
719
720 case backoff::rate_limit:
721 {
722 _backoff_amount *= 2;
723
724 break;
725 }
726 }
727 }
728 }
729
730 int client::stream::write(char* ptr, size_t size, size_t nmemb)
731 {
732 for (size_t i = 0; i < size*nmemb; i++)
733 {
734 if (ptr[i] == '\r')
735 {
736 i++; // Skip the \n
737
738 if (!_buffer.empty())
739 {
740 notification n(_buffer, _client._current_user);
741 if (n.getType() == notification::type::friends)
742 {
743 _established = true;
744 _backoff_type = backoff::none;
745 _backoff_amount = std::chrono::milliseconds(0);
746 }
747
748 {
749 std::lock_guard<std::mutex> _notify_lock(_notify_mutex);
750
751 if (_notify)
752 {
753 _notify(n);
754 }
755 }
756
757 _buffer = "";
758 }
759 } else {
760 _buffer.push_back(ptr[i]);
761 }
762 }
763
764 {
765 std::lock_guard<std::mutex> _stall_lock(_stall_mutex);
766 time(&_last_write);
767 }
768
769 return size*nmemb;
770 }
771
772 int client::stream::progress()
773 {
774 if (_stop)
775 {
776 return 1;
777 }
778
779 if (_established)
780 {
781 std::lock_guard<std::mutex> _stall_lock(_stall_mutex);
782 if (difftime(time(NULL), _last_write) >= 90)
783 {
784 return 1;
785 }
786 }
787
788 return 0;
789 }
790
410}; 791};