From 0ccac89815ee92c69fefc148cfb272faf7309136 Mon Sep 17 00:00:00 2001 From: Kelly Rauchenberger Date: Fri, 20 May 2016 15:34:44 -0400 Subject: Started implementing user streams You can now start a user stream and end it yourself. If it disconnects abnormally, it will reconnect with a backoff as described by Twitter. Some data structures have some fields parsed now; tweets have IDs, text, and authors. Users have IDs, screen names, and names. Notifications from the stream are parsed completely. The ability to follow and unfollow users has also been added, as well as the ability to get a list of friends and followers, and to reply to a tweet. --- src/client.cpp | 449 ++++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 415 insertions(+), 34 deletions(-) (limited to 'src/client.cpp') diff --git a/src/client.cpp b/src/client.cpp index 26f3289..ffb651b 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -4,7 +4,9 @@ #include #include #include "util.h" -#include +#include + +using nlohmann::json; // These are here for debugging curl stuff @@ -80,11 +82,29 @@ int my_trace(CURL *handle, curl_infotype type, namespace twitter { + int client_stream_progress_callback_wrapper(void* cdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) + { + return static_cast(cdata)->progress(); + } + + size_t client_stream_write_callback_wrapper(void* ptr, size_t size, size_t nmemb, void* cdata) + { + return static_cast(cdata)->write(static_cast(ptr), size, nmemb); + } + client::client(const auth& _arg) { _oauth_consumer = new OAuth::Consumer(_arg.getConsumerKey(), _arg.getConsumerSecret()); _oauth_token = new OAuth::Token(_arg.getAccessKey(), _arg.getAccessSecret()); _oauth_client = new OAuth::Client(_oauth_consumer, _oauth_token); + + std::string url = "https://api.twitter.com/1.1/account/verify_credentials.json"; + long response_code; + std::string response_data; + if (performGet(url, response_code, response_data) && (response_code == 200)) + { + _current_user = user(response_data); + } } client::~client() @@ -94,11 +114,17 @@ namespace twitter { delete _oauth_consumer; } - response client::updateStatus(std::string msg, tweet& result, std::list media_ids) + response client::updateStatus(std::string msg, tweet& result, tweet in_response_to, std::list media_ids) { std::stringstream datastrstream; datastrstream << "status=" << OAuth::PercentEncode(msg); + if (in_response_to) + { + datastrstream << "&in_reply_to_status_id="; + datastrstream << in_response_to.getID(); + } + if (!media_ids.empty()) { datastrstream << "&media_ids="; @@ -109,7 +135,7 @@ namespace twitter { std::string url = "https://api.twitter.com/1.1/statuses/update.json"; long response_code; - json response_data; + std::string response_data; if (!performPost(url, datastr, response_code, response_data)) { return response::curl_error; @@ -147,7 +173,7 @@ namespace twitter { } long response_code; - json response_data; + std::string response_data; if (!performMultiPost("https://upload.twitter.com/1.1/media/upload.json", form.get(), response_code, response_data)) { return response::curl_error; @@ -158,7 +184,8 @@ namespace twitter { return codeForError(response_code, response_data); } - media_id = response_data["media_id"].get(); + auto response_json = json::parse(response_data); + media_id = response_json["media_id"].get(); curl_httppost* append_form_post = nullptr; curl_httppost* append_form_last = nullptr; @@ -197,7 +224,8 @@ namespace twitter { return codeForError(response_code, response_data); } - if (response_data.find("processing_info") != response_data.end()) + response_json = json::parse(response_data); + if (response_json.find("processing_info") != response_json.end()) { std::stringstream datastr; datastr << "https://upload.twitter.com/1.1/media/upload.json?command=STATUS&media_id=" << media_id; @@ -214,20 +242,183 @@ namespace twitter { return codeForError(response_code, response_data); } - if (response_data["processing_info"]["state"] == "succeeded") + response_json = json::parse(response_data); + if (response_json["processing_info"]["state"] == "succeeded") { break; } - int ttw = response_data["processing_info"]["check_after_secs"].get(); - sleep(ttw); + int ttw = response_json["processing_info"]["check_after_secs"].get(); + std::this_thread::sleep_for(std::chrono::seconds(ttw)); } } return response::ok; } - bool client::performGet(std::string url, long& response_code, json& result) + response client::follow(user_id toFollow) + { + std::stringstream datastrstream; + datastrstream << "follow=true&user_id="; + datastrstream << toFollow; + + std::string datastr = datastrstream.str(); + std::string url = "https://api.twitter.com/1.1/friendships/create.json"; + + long response_code; + std::string response_data; + if (!performPost(url, datastr, response_code, response_data)) + { + return response::curl_error; + } + + if (response_code == 200) + { + return response::ok; + } else { + return codeForError(response_code, response_data); + } + } + + response client::follow(user toFollow) + { + return follow(toFollow.getID()); + } + + response client::unfollow(user_id toUnfollow) + { + std::stringstream datastrstream; + datastrstream << "user_id="; + datastrstream << toUnfollow; + + std::string datastr = datastrstream.str(); + std::string url = "https://api.twitter.com/1.1/friendships/destroy.json"; + + long response_code; + std::string response_data; + if (!performPost(url, datastr, response_code, response_data)) + { + return response::curl_error; + } + + if (response_code == 200) + { + return response::ok; + } else { + return codeForError(response_code, response_data); + } + } + + response client::unfollow(user toUnfollow) + { + return unfollow(toUnfollow.getID()); + } + + const user& client::getUser() const + { + return _current_user; + } + + response client::getFriends(std::set& _ret) + { + if (!_current_user) + { + return response::unknown_error; + } + + long long cursor = -1; + std::set result; + + while (cursor != 0) + { + std::stringstream urlstream; + urlstream << "https://api.twitter.com/1.1/friends/ids.json?user_id="; + urlstream << _current_user.getID(); + urlstream << "&cursor="; + urlstream << cursor; + + std::string url = urlstream.str(); + + long response_code; + std::string response_data; + if (!performGet(url, response_code, response_data)) + { + return response::curl_error; + } + + if (response_code == 200) + { + json rjs = json::parse(response_data); + cursor = rjs.at("next_cursor"); + result.insert(std::begin(rjs.at("ids")), std::end(rjs.at("ids"))); + } else { + return codeForError(response_code, response_data); + } + } + + _ret = result; + + return response::ok; + } + + response client::getFollowers(std::set& _ret) + { + if (!_current_user) + { + return response::unknown_error; + } + + long long cursor = -1; + std::set result; + + while (cursor != 0) + { + std::stringstream urlstream; + urlstream << "https://api.twitter.com/1.1/followers/ids.json?user_id="; + urlstream << _current_user.getID(); + urlstream << "&cursor="; + urlstream << cursor; + + std::string url = urlstream.str(); + + long response_code; + std::string response_data; + if (!performGet(url, response_code, response_data)) + { + return response::curl_error; + } + + if (response_code == 200) + { + json rjs = json::parse(response_data); + cursor = rjs.at("next_cursor"); + result.insert(std::begin(rjs.at("ids")), std::end(rjs.at("ids"))); + } else { + return codeForError(response_code, response_data); + } + } + + _ret = result; + + return response::ok; + } + + void client::setUserStreamNotifyCallback(stream::notify_callback callback) + { + _user_stream.setNotifyCallback(callback); + } + + void client::startUserStream() + { + _user_stream.start(); + } + + void client::stopUserStream() + { + _user_stream.stop(); + } + + bool client::performGet(std::string url, long& response_code, std::string& result) { std::ostringstream output; curl::curl_ios ios(output); @@ -255,17 +446,12 @@ namespace twitter { } response_code = conn.get_info().get(); - if (output.str().empty()) - { - result = json(); - } else { - result = json::parse(output.str()); - } + result = output.str(); return true; } - bool client::performPost(std::string url, std::string datastr, long& response_code, json& result) + bool client::performPost(std::string url, std::string datastr, long& response_code, std::string& result) { std::ostringstream output; curl::curl_ios ios(output); @@ -294,17 +480,12 @@ namespace twitter { } response_code = conn.get_info().get(); - if (output.str().empty()) - { - result = json(); - } else { - result = json::parse(output.str()); - } + result = output.str(); return true; } - bool client::performMultiPost(std::string url, const curl_httppost* fields, long& response_code, json& result) + bool client::performMultiPost(std::string url, const curl_httppost* fields, long& response_code, std::string& result) { std::ostringstream output; curl::curl_ios ios(output); @@ -333,23 +514,19 @@ namespace twitter { } response_code = conn.get_info().get(); - - if (output.str().empty()) - { - result = json(); - } else { - result = json::parse(output.str()); - } + result = output.str(); return true; } - response client::codeForError(int response_code, json response_data) const + response client::codeForError(int response_code, std::string response_data) const { + auto response_json = json::parse(response_data); + std::set error_codes; - if (response_data.find("errors") != response_data.end()) + if (response_json.find("errors") != response_json.end()) { - std::transform(std::begin(response_data["errors"]), std::end(response_data["errors"]), std::inserter(error_codes, std::begin(error_codes)), [] (const json& error) { + std::transform(std::begin(response_json["errors"]), std::end(response_json["errors"]), std::inserter(error_codes, std::begin(error_codes)), [] (const json& error) { return error["code"].get(); }); } @@ -407,4 +584,208 @@ namespace twitter { } } + client::stream::stream(client& _client) : _client(_client) + { + + } + + bool client::stream::isRunning() const + { + return _thread.joinable(); + } + + void client::stream::setNotifyCallback(notify_callback _n) + { + std::lock_guard _notify_lock(_notify_mutex); + _notify = _n; + } + + void client::stream::start() + { + std::lock_guard _running_lock(_running_mutex); + + if (!_thread.joinable()) + { + _thread = std::thread(&stream::run, this); + } + } + + void client::stream::stop() + { + std::lock_guard _running_lock(_running_mutex); + + if (_thread.joinable()) + { + _stop = true; + _thread.join(); + _stop = false; + } + } + + void client::stream::run() + { + curl::curl_easy conn; + std::string url = "https://userstream.twitter.com/1.1/user.json"; + + curl::curl_header headers; + std::string oauth_header = _client._oauth_client->getFormattedHttpHeader(OAuth::Http::Get, url, ""); + if (!oauth_header.empty()) + { + headers.add(oauth_header); + } + + conn.add(client_stream_write_callback_wrapper); + conn.add(this); + conn.add(nullptr); + conn.add(nullptr); + conn.add(client_stream_progress_callback_wrapper); + conn.add(this); + conn.add(0); + //conn.add(1); + //conn.add(my_trace); + conn.add(url.c_str()); + conn.add(headers.get()); + + _backoff_type = backoff::none; + _backoff_amount = std::chrono::milliseconds(0); + for (;;) + { + bool failure = false; + try { + conn.perform(); + } catch (curl::curl_easy_exception error) + { + failure = true; + if ((error.get_code() == CURLE_ABORTED_BY_CALLBACK) && _stop) + { + break; + } else { + if (_backoff_type == backoff::none) + { + _established = false; + _backoff_type = backoff::network; + _backoff_amount = std::chrono::milliseconds(0); + } + } + } + + if (!failure) + { + long response_code = conn.get_info().get(); + if (response_code == 420) + { + if (_backoff_type == backoff::none) + { + _established = false; + _backoff_type = backoff::rate_limit; + _backoff_amount = std::chrono::minutes(1); + } + } else if (response_code != 200) + { + if (_backoff_type == backoff::none) + { + _established = false; + _backoff_type = backoff::http; + _backoff_amount = std::chrono::seconds(5); + } + } else { + break; + } + } + + std::this_thread::sleep_for(_backoff_amount); + + switch (_backoff_type) + { + case backoff::network: + { + if (_backoff_amount < std::chrono::seconds(16)) + { + _backoff_amount += std::chrono::milliseconds(250); + } + + break; + } + + case backoff::http: + { + if (_backoff_amount < std::chrono::seconds(320)) + { + _backoff_amount *= 2; + } + + break; + } + + case backoff::rate_limit: + { + _backoff_amount *= 2; + + break; + } + } + } + } + + int client::stream::write(char* ptr, size_t size, size_t nmemb) + { + for (size_t i = 0; i < size*nmemb; i++) + { + if (ptr[i] == '\r') + { + i++; // Skip the \n + + if (!_buffer.empty()) + { + notification n(_buffer, _client._current_user); + if (n.getType() == notification::type::friends) + { + _established = true; + _backoff_type = backoff::none; + _backoff_amount = std::chrono::milliseconds(0); + } + + { + std::lock_guard _notify_lock(_notify_mutex); + + if (_notify) + { + _notify(n); + } + } + + _buffer = ""; + } + } else { + _buffer.push_back(ptr[i]); + } + } + + { + std::lock_guard _stall_lock(_stall_mutex); + time(&_last_write); + } + + return size*nmemb; + } + + int client::stream::progress() + { + if (_stop) + { + return 1; + } + + if (_established) + { + std::lock_guard _stall_lock(_stall_mutex); + if (difftime(time(NULL), _last_write) >= 90) + { + return 1; + } + } + + return 0; + } + }; -- cgit 1.4.1