-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlargefilesserver.cpp
More file actions
100 lines (84 loc) · 3.17 KB
/
largefilesserver.cpp
File metadata and controls
100 lines (84 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <iostream>
#include <memory>
#include <string>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/log/initialize.h"
#include "absl/strings/str_format.h"
#include "largefiles.grpc.pb.h"
using grpc::Server;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
using largefiles::FileWriter;
using largefiles::Chunk;
using largefiles::FileStreamBlock;
using largefiles::FileUploadResponse;
ABSL_FLAG(uint16_t, port, 50051, "Server port for the service");
ABSL_FLAG(size_t, quota, 20, "Resource quota, in megabytes");
// Server reactor that is slow to read incoming messages, causing the buffers
// to fill.
class SlowReadingBidiReactor final
: public grpc::ServerReadReactor<largefiles::FileStreamBlock> {
public:
SlowReadingBidiReactor(largefiles::FileUploadResponse* res)
: res_(res) {
StartRead(&req_);
}
void OnReadDone(bool ok) override {
if (req_.metadata().populated()) {
std::cout << "Received new file to upload " << req_.metadata().name() << std::endl;
}
if (!ok) {
Finish(grpc::Status::OK);
return;
}
readCount_ += req_.filedata().chunk().size();
std::cout << "Received file chunk " << req_.filedata().chunk().size() << std::endl;
usleep(100000); // Simulate slow write to database
StartRead(&req_);
}
void OnDone() override {
res_->set_readsize(readCount_);
std::cout << "Uploaded file of total size " << readCount_ << std::endl;
delete this;
}
private:
largefiles::FileUploadResponse* res_;
largefiles::FileStreamBlock req_;
uint64_t readCount_{0};
};
class FileWriterServiceImpl final : public largefiles::FileWriter::CallbackService {
grpc::ServerReadReactor<largefiles::FileStreamBlock>* UploadFile(grpc::CallbackServerContext* /* context */, ::largefiles::FileUploadResponse* response) override {
return new SlowReadingBidiReactor(response);
}
};
void RunServer(uint16_t port) {
std::string server_address = absl::StrFormat("0.0.0.0:%d", port);
FileWriterServiceImpl service;
grpc::EnableDefaultHealthCheckService(true);
grpc::reflection::InitProtoReflectionServerBuilderPlugin();
ServerBuilder builder;
// Listen on the given address without any authentication mechanism.
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
// Register "service" as the instance through which we'll communicate with
// clients. In this case it corresponds to an *synchronous* service.
builder.RegisterService(&service);
grpc::ResourceQuota quota;
quota.Resize(absl::GetFlag(FLAGS_quota) * 1024 * 1024);
// Finally assemble the server.
std::unique_ptr<Server> server(builder.BuildAndStart());
std::cout << "Server listening on " << server_address << std::endl;
// Wait for the server to shutdown. Note that some other thread must be
// responsible for shutting down the server for this call to ever return.
server->Wait();
}
int main(int argc, char** argv) {
absl::ParseCommandLine(argc, argv);
absl::InitializeLog();
RunServer(absl::GetFlag(FLAGS_port));
return 0;
}