From 7ab5ed7229a4e48ff8ef9353c8384f66d84761fa Mon Sep 17 00:00:00 2001 From: Git Date: Wed, 20 Jun 2012 16:28:00 +0000 Subject: [PATCH] Ocelot 0.4 --- CHANGES | 8 ++ db.cpp | 259 +++++++++++++++++++++++++-------------------- events.cpp | 2 +- events.h | 6 +- misc_functions.cpp | 4 +- misc_functions.h | 2 +- ocelot.h | 9 +- schedule.h | 2 +- worker.cpp | 107 +++++++++++++------ 9 files changed, 240 insertions(+), 159 deletions(-) diff --git a/CHANGES b/CHANGES index d2da9df..67ed134 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +-- 0.4 (2012-06-20) +Convert all headers to lowercase +Don't show users to themselves (patch by GrecKo) +Don't write peer updates to the binlog +Fix incorrect handling of "corrupt" stat +Search for peer in leecher list if not found in seeders list +Sort response dictionary according to BitTorrent specs + -- 0.3 (2012-02-03) Add tokens Added a logging function diff --git a/db.cpp b/db.cpp index a5b2183..20240af 100644 --- a/db.cpp +++ b/db.cpp @@ -164,7 +164,7 @@ void mysql::flush_users() { " ON DUPLICATE KEY UPDATE Uploaded = Uploaded + VALUES(Uploaded), Downloaded = Downloaded + VALUES(Downloaded)"; user_queue.push(sql); update_user_buffer.clear(); - if (user_queue.size() == 1 && u_active == false) { + if (u_active == false) { boost::thread thread(&mysql::do_flush_users, this); } } @@ -184,7 +184,7 @@ void mysql::flush_torrents() { sql.clear(); sql = "DELETE FROM torrents WHERE info_hash = ''"; torrent_queue.push(sql); - if (torrent_queue.size() == 2 && t_active == false) { + if (t_active == false) { boost::thread thread(&mysql::do_flush_torrents, this); } } @@ -198,7 +198,7 @@ void mysql::flush_snatches() { sql = "INSERT INTO xbt_snatched (uid, fid, tstamp, IP) VALUES " + update_snatch_buffer; snatch_queue.push(sql); update_snatch_buffer.clear(); - if (snatch_queue.size() == 1 && s_active == false) { + if (s_active == false) { boost::thread thread(&mysql::do_flush_snatches, this); } } @@ -221,16 +221,16 @@ void mysql::flush_peers() { sql.clear(); } - sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining," + - std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer + + sql = "INSERT INTO xbt_files_users (uid,fid,active,uploaded,downloaded,upspeed,downspeed,remaining,corrupt," + + std::string("timespent,announced,ip,peer_id,useragent,mtime) VALUES ") + update_peer_buffer + " ON DUPLICATE KEY UPDATE active=VALUES(active), uploaded=VALUES(uploaded), " + "downloaded=VALUES(downloaded), upspeed=VALUES(upspeed), " + "downspeed=VALUES(downspeed), remaining=VALUES(remaining), " + - "timespent=VALUES(timespent), announced=VALUES(announced), " + - "mtime=VALUES(mtime)"; + "corrupt=VALUES(corrupt), timespent=VALUES(timespent), " + + "announced=VALUES(announced), mtime=VALUES(mtime)"; peer_queue.push(sql); update_peer_buffer.clear(); - if (peer_queue.size() == 2 && p_active == false) { + if (p_active == false) { boost::thread thread(&mysql::do_flush_peers, this); } } @@ -245,161 +245,196 @@ void mysql::flush_tokens() { " ON DUPLICATE KEY UPDATE Downloaded = Downloaded + VALUES(Downloaded)"; token_queue.push(sql); update_token_buffer.clear(); - if (token_queue.size() == 1 && tok_active == false) { + if (tok_active == false) { boost::thread(&mysql::do_flush_tokens, this); } } void mysql::do_flush_users() { u_active = true; - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (user_queue.size() > 0) { - try { - std::string sql = user_queue.front(); - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "User flush failed (" << user_queue.size() << " remain)" << std::endl; + try { + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (user_queue.size() > 0) { + try { + std::string sql = user_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "User flush failed (" << user_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(user_buffer_lock); + user_queue.pop(); + std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; sleep(3); continue; - } else { - boost::mutex::scoped_lock lock(user_buffer_lock); - user_queue.pop(); - std::cout << "Users flushed (" << user_queue.size() << " remain)" << std::endl; } - } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush users with a qlength: " << user_queue.front().size() << " queue size: " << user_queue.size() << std::endl; - sleep(3); - continue; } } + catch (const mysqlpp::Exception &er) { + std::cerr << "MySQL error in flush_users: " << er.what() << std::endl; + u_active = false; + return; + } u_active = false; } void mysql::do_flush_torrents() { t_active = true; - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (torrent_queue.size() > 0) { - try { - std::string sql = torrent_queue.front(); - if (sql == "") { - torrent_queue.pop(); - continue; + try { + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (torrent_queue.size() > 0) { + try { + std::string sql = torrent_queue.front(); + if (sql == "") { + torrent_queue.pop(); + continue; + } + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Torrent flush failed (" << torrent_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(torrent_buffer_lock); + torrent_queue.pop(); + std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl; + } } - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "Torrent flush failed (" << torrent_queue.size() << " remain)" << std::endl; + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; sleep(3); continue; - } else { - boost::mutex::scoped_lock lock(torrent_buffer_lock); - torrent_queue.pop(); - std::cout << "Torrents flushed (" << torrent_queue.size() << " remain)" << std::endl; } } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush torrents with a qlength: " << torrent_queue.front().size() << " queue size: " << torrent_queue.size() << std::endl; - sleep(3); - continue; - } + } + catch (const mysqlpp::Exception &er) { + std::cerr << "MySQL error in flush_torrents: " << er.what() << std::endl; + t_active = false; + return; } t_active = false; } void mysql::do_flush_peers() { p_active = true; - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (peer_queue.size() > 0) { - try { - std::string sql = peer_queue.front(); - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "Peer flush failed (" << peer_queue.size() << " remain)" << std::endl; + try { + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (peer_queue.size() > 0) { + try { + std::string sql = peer_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Peer flush failed (" << peer_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(peer_buffer_lock); + peer_queue.pop(); + std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; sleep(3); continue; - } else { - boost::mutex::scoped_lock lock(peer_buffer_lock); - peer_queue.pop(); - std::cout << "Peers flushed (" << peer_queue.size() << " remain)" << std::endl; } } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush peers with a qlength: " << peer_queue.front().size() << " queue size: " << peer_queue.size() << std::endl; - sleep(3); - continue; - } + } + catch (const mysqlpp::Exception &er) { + std::cerr << "MySQL error in flush_peers: " << er.what() << std::endl; + p_active = false; + return; } p_active = false; } void mysql::do_flush_snatches() { s_active = true; - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (snatch_queue.size() > 0) { - try { - std::string sql = snatch_queue.front(); - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "Snatch flush failed (" << snatch_queue.size() << " remain)" << std::endl; + try { + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (snatch_queue.size() > 0) { + try { + std::string sql = snatch_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Snatch flush failed (" << snatch_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(snatch_buffer_lock); + snatch_queue.pop(); + std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; sleep(3); continue; - } else { - boost::mutex::scoped_lock lock(snatch_buffer_lock); - snatch_queue.pop(); - std::cout << "Snatches flushed (" << snatch_queue.size() << " remain)" << std::endl; } - } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush snatches with a qlength: " << snatch_queue.front().size() << " queue size: " << snatch_queue.size() << std::endl; - sleep(3); - continue; } } + catch (const mysqlpp::Exception &er) { + std::cerr << "MySQL error in flush_snatches: " << er.what() << std::endl; + s_active = false; + return; + } s_active = false; } void mysql::do_flush_tokens() { tok_active = true; - mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); - while (token_queue.size() > 0) { - try { - std::string sql = token_queue.front(); - mysqlpp::Query query = c.query(sql); - if (!query.exec()) { - std::cout << "Token flush failed (" << token_queue.size() << " remain)" << std::endl; + try { + mysqlpp::Connection c(db.c_str(), server.c_str(), db_user.c_str(), pw.c_str(), 0); + while (token_queue.size() > 0) { + try { + std::string sql = token_queue.front(); + mysqlpp::Query query = c.query(sql); + if (!query.exec()) { + std::cout << "Token flush failed (" << token_queue.size() << " remain)" << std::endl; + sleep(3); + continue; + } else { + boost::mutex::scoped_lock lock(user_token_lock); + token_queue.pop(); + std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl; + } + } + catch (const mysqlpp::BadQuery &er) { + std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; + sleep(3); + continue; + } catch (const mysqlpp::Exception &er) { + std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; sleep(3); continue; - } else { - boost::mutex::scoped_lock lock(user_token_lock); - token_queue.pop(); - std::cout << "Tokens flushed (" << token_queue.size() << " remain)" << std::endl; } } - catch (const mysqlpp::BadQuery &er) { - std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; - sleep(3); - continue; - } catch (const mysqlpp::Exception &er) { - std::cerr << "Query error: " << er.what() << " in flush tokens with a qlength: " << token_queue.front().size() << " queue size: " << token_queue.size() << std::endl; - sleep(3); - continue; - } + } + catch (const mysqlpp::Exception &er) { + std::cerr << "MySQL error in flush_tokens: " << er.what() << std::endl; + tok_active = false; + return; } tok_active = false; } diff --git a/events.cpp b/events.cpp index 161946d..f366bc6 100644 --- a/events.cpp +++ b/events.cpp @@ -167,7 +167,7 @@ void connection_middleman::handle_read(ev::io &watcher, int events_flags) { void connection_middleman::handle_write(ev::io &watcher, int events_flags) { write_event.stop(); timeout_event.stop(); - std::string http_response = "HTTP/1.1 200\r\nServer: Ocelot 1.0\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n"; + std::string http_response = "HTTP/1.1 200 OK\r\nServer: Ocelot 1.0\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n"; http_response+=response; send(connect_sock, http_response.c_str(), http_response.size(), MSG_NOSIGNAL); delete this; diff --git a/events.h b/events.h index 85ed708..7f70c66 100644 --- a/events.h +++ b/events.h @@ -55,8 +55,8 @@ class connection_mother { mysql * db; ev::timer schedule_event; - unsigned long opened_connections; unsigned int open_connections; + uint64_t opened_connections; public: connection_mother(worker * worker_obj, config * config_obj, mysql * db_obj); @@ -64,8 +64,8 @@ class connection_mother { void increment_open_connections() { open_connections++; } void decrement_open_connections() { open_connections--; } - int get_open_connections() { return open_connections; } - int get_opened_connections() { return opened_connections; } + unsigned int get_open_connections() { return open_connections; } + uint64_t get_opened_connections() { return opened_connections; } void handle_connect(ev::io &watcher, int events_flags); ~connection_mother(); diff --git a/misc_functions.cpp b/misc_functions.cpp index 0e00bdf..7e6082f 100644 --- a/misc_functions.cpp +++ b/misc_functions.cpp @@ -9,9 +9,9 @@ long strtolong(const std::string& str) { return i; } -long long strtolonglong(const std::string& str) { +int64_t strtolonglong(const std::string& str) { std::istringstream stream (str); - long long i = 0; + int64_t i = 0; stream >> i; return i; } diff --git a/misc_functions.h b/misc_functions.h index 1b70a64..db91944 100644 --- a/misc_functions.h +++ b/misc_functions.h @@ -3,7 +3,7 @@ #include #include long strtolong(const std::string& str); -long long strtolonglong(const std::string& str); +int64_t strtolonglong(const std::string& str); std::string inttostr(int i); std::string hex_decode(const std::string &in); int timeval_subtract (timeval* result, timeval* x, timeval* y); diff --git a/ocelot.h b/ocelot.h index 0ceadea..6011ded 100644 --- a/ocelot.h +++ b/ocelot.h @@ -8,12 +8,13 @@ typedef struct { int userid; std::string peer_id; - std::string user_agent; +// std::string user_agent; std::string ip_port; std::string ip; unsigned int port; - long long uploaded; - long long downloaded; + int64_t uploaded; + int64_t downloaded; + int64_t corrupt; uint64_t left; time_t last_announced; time_t first_announced; @@ -27,7 +28,7 @@ enum freetype { NORMAL, FREE, NEUTRAL }; typedef struct { int id; time_t last_seeded; - long long balance; + int64_t balance; int completed; freetype free_torrent; std::map seeders; diff --git a/schedule.h b/schedule.h index 8bc70ff..7c87e9b 100644 --- a/schedule.h +++ b/schedule.h @@ -8,7 +8,7 @@ class schedule { worker * work; config * conf; mysql * db; - int last_opened_connections; + uint64_t last_opened_connections; int counter; time_t next_flush; diff --git a/worker.cpp b/worker.cpp index 9ab1509..7a37164 100644 --- a/worker.cpp +++ b/worker.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -202,12 +203,14 @@ std::string worker::announce(torrent &tor, user &u, std::map::const_iterator peer_id_iterator = params.find("peer_id"); @@ -230,11 +233,14 @@ std::string worker::announce(torrent &tor, user &u, std::map 0 || params["event"] == "completed") { + if(left > 0 || completed_torrent) { if(u.can_leech == false) { return error("Access denied, leeching forbidden"); } @@ -253,12 +259,25 @@ std::string worker::announce(torrent &tor, user &u, std::map insert - = tor.seeders.insert(std::pair(peer_id, new_peer)); - - p = &(insert.first->second); - inserted = true; + i = tor.leechers.find(peer_id); + if(i == tor.leechers.end()) { + peer new_peer; + std::pair insert + = tor.seeders.insert(std::pair(peer_id, new_peer)); + + p = &(insert.first->second); + inserted = true; + } else { + p = &i->second; + std::pair insert + = tor.seeders.insert(std::pair(peer_id, *p)); + tor.leechers.erase(peer_id); + if(downloaded > 0) { + std::cout << "Found unreported snatch from user " << u.id << " on torrent " << tor.id << std::endl; + } + p = &(insert.first->second); +// completed_torrent = true; // Not sure if we want to do this. Might cause massive spam for broken clients (e.g. µTorrent 3) + } } else { p = &i->second; } @@ -268,25 +287,30 @@ std::string worker::announce(torrent &tor, user &u, std::mapleft = left; - long long upspeed = 0; - long long downspeed = 0; - long long real_uploaded_change = 0; - long long real_downloaded_change = 0; + int64_t upspeed = 0; + int64_t downspeed = 0; + int64_t real_uploaded_change = 0; + int64_t real_downloaded_change = 0; - if(inserted || params["event"] == "started" || uploaded < p->uploaded || downloaded < p->downloaded) { + if(inserted || params["event"] == "started") { //New peer on this torrent update_torrent = true; p->userid = u.id; p->peer_id = peer_id; - p->user_agent = headers["user-agent"]; p->first_announced = cur_time; p->last_announced = 0; p->uploaded = uploaded; p->downloaded = downloaded; + p->corrupt = corrupt; p->announces = 1; + } else if(uploaded < p->uploaded || downloaded < p->downloaded) { + p->announces++; + p->uploaded = uploaded; + p->downloaded = downloaded; } else { - long long uploaded_change = 0; - long long downloaded_change = 0; + int64_t uploaded_change = 0; + int64_t downloaded_change = 0; + int64_t corrupt_change = 0; p->announces++; if(uploaded != p->uploaded) { @@ -299,11 +323,15 @@ std::string worker::announce(torrent &tor, user &u, std::mapdownloaded = downloaded; } + if(corrupt != p->corrupt) { + corrupt_change = corrupt - p->corrupt; + p->corrupt = corrupt; + tor.balance -= corrupt_change; + update_torrent = true; + } if(uploaded_change || downloaded_change) { - long corrupt = strtolong(params["corrupt"]); tor.balance += uploaded_change; tor.balance -= downloaded_change; - tor.balance -= corrupt; update_torrent = true; if(cur_time > p->last_announced) { @@ -396,7 +424,7 @@ std::string worker::announce(torrent &tor, user &u, std::mapsecond.userid == p->userid) //Don't add the peer to the peerlist if it's the same user + { + i++; + continue; + } peers.append(i->second.ip_port); found_peers++; tor.last_selected_seeder = i->second.peer_id; @@ -459,7 +494,7 @@ std::string worker::announce(torrent &tor, user &u, std::map 1) { for(peer_list::const_iterator i = tor.leechers.begin(); i != tor.leechers.end() && found_peers < numwant; i++) { - if(i->second.ip_port == p->ip_port) { // Don't show leechers themselves + if(i->second.ip_port == p->ip_port || i->second.userid == p->userid) { // Don't show users themselves continue; } found_peers++; @@ -469,6 +504,9 @@ std::string worker::announce(torrent &tor, user &u, std::map 0) { // User is a seeder, and we have leechers! for(peer_list::const_iterator i = tor.leechers.begin(); i != tor.leechers.end() && found_peers < numwant; i++) { + if(i->second.userid == p->userid) { // Don't show users themselves + continue; + } found_peers++; peers.append(i->second.ip_port); } @@ -485,12 +523,18 @@ std::string worker::announce(torrent &tor, user &u, std::mapfirst_announced) << ',' << p->announces << ','; + record << '(' << u.id << ',' << tor.id << ',' << active << ',' << uploaded << ',' << downloaded << ',' << upspeed << ',' << downspeed << ',' << left << ',' << corrupt << ',' << (cur_time - p->first_announced) << ',' << p->announces << ','; std::string record_str = record.str(); db->record_peer(record_str, ip, peer_id, headers["user-agent"]); - std::string response = "d8:intervali"; + std::string response = "d8:completei"; response.reserve(350); + response += inttostr(tor.seeders.size()); + response += "e10:downloadedi"; + response += inttostr(tor.completed); + response += "e10:incompletei"; + response += inttostr(tor.leechers.size()); + response += "e8:intervali"; response += inttostr(conf->announce_interval+std::min((size_t)600, tor.seeders.size())); // ensure a more even distribution of announces/second response += "e12:min intervali"; response += inttostr(conf->announce_interval); @@ -502,14 +546,7 @@ std::string worker::announce(torrent &tor, user &u, std::map