blob: 891f263c16ca767521ca286b3f1d17b84c670922 [file] [log] [blame]
#pragma once
#include <memory>
#include <thread>
#include <unordered_map>
#include <c10d/Store.hpp>
#include <c10d/Utils.hpp>
namespace c10d {
class TCPStoreDaemon {
public:
explicit TCPStoreDaemon(int storeListenSocket);
~TCPStoreDaemon();
void join();
protected:
void run();
void stop();
void query(int socket);
void setHandler(int socket);
void addHandler(int socket);
void getHandler(int socket) const;
void checkHandler(int socket) const;
void waitHandler(int socket);
bool checkKeys(const std::vector<std::string>& keys) const;
void wakeupWaitingClients(const std::string& key);
std::thread daemonThread_;
std::unordered_map<std::string, std::vector<uint8_t>> tcpStore_;
// From key -> the list of sockets waiting on it
std::unordered_map<std::string, std::vector<int>> waitingSockets_;
// From socket -> number of keys awaited
std::unordered_map<int, size_t> keysAwaited_;
std::vector<int> sockets_;
int storeListenSocket_;
std::vector<int> controlPipeFd_{-1, -1};
};
class TCPStore : public Store {
public:
explicit TCPStore(
const std::string& masterAddr,
PortType masterPort,
bool isServer = false);
virtual ~TCPStore();
void set(const std::string& key, const std::vector<uint8_t>& value) override;
std::vector<uint8_t> get(const std::string& key) override;
int64_t add(const std::string& key, int64_t value) override;
bool check(const std::vector<std::string>& keys) override;
void wait(const std::vector<std::string>& keys) override;
void wait(
const std::vector<std::string>& keys,
const std::chrono::milliseconds& timeout) override;
protected:
bool isServer_;
int storeSocket_ = -1;
int masterListenSocket_ = -1;
std::string tcpStoreAddr_;
PortType tcpStorePort_;
// Only needs to be launched as the server
std::unique_ptr<TCPStoreDaemon> tcpStoreDaemon_ = nullptr;
};
} // namespace c10d