-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.cpp
More file actions
195 lines (163 loc) · 5.21 KB
/
server.cpp
File metadata and controls
195 lines (163 loc) · 5.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#include <thrust/host_vector.h>
#include <thrust/device_vector.h>
#include <thrust/count.h>
#include <thrust/execution_policy.h>
#include <thrust/advance.h>
#include <vector>
#include <iterator>
#include <string>
#include "device_string.h"
#include "recordType.h"
#include "comparator.h"
#include "io.h"
#include <ctime>
#include <iostream>
#include <fstream>
#include <string>
#include <sys/stat.h>
/****
*
* cudaDB: 参考impala 架构, 利用GPU算力实现的一个极度阉割版SQL查询引擎.
*
* @author: fifteencai
*
* 基本结构:
*
*
* 前端: SQL 语法解析 + Restful Api (python)
*
*
* <<-- linux pipe -->>
*
*
*
* 后端: 执行计划 + 并行算子 + 元数据管理(todo) (c++ + cuda)
*
*
*
* 第三方库: thrust(cuda容器类), device_string(cuda 字符串操作), sqlparse (Sql语法解析)
*
*
* ***/
using namespace std;
/**
 * SCAN operator (defined later in this file).
 *
 * Runs a filter over every element of d_vec and writes one int per element
 * into d_scan_res. key, value and opr describe the predicate.
 */
void scan(thrust::device_vector<recordType> &d_vec, thrust::device_vector<int> &d_scan_res, std::string key,
std::string value, std::string opr);
/**
 * AGG operator (defined later in this file).
 *
 * Counts the entries among the first `length` elements of d_scan_res that
 * satisfy is_odd and returns that count.
 */
long long aggregate(thrust::device_vector<int> &d_scan_res, int length);
/**
 * Entry point of the query back end.
 *
 * Loads the records with readFile, copies them into a device vector, then
 * serves requests arriving on a named pipe until a "break" command is seen.
 *
 * Request (read from /home/adam/fifo): a first line holding the payload size
 * in bytes, followed by that many bytes of the form "key|value|opr".
 * Reply (written to /home/adam/fifo_): the count returned by aggregate and
 * the elapsed seconds, one per line.
 *
 * @return 0 once the "break" command has ended the loop.
 */
int main() {
    cout << "server init..." << endl;

    // Load the records on the host.
    // NOTE(review): 1000000 is presumably an upper bound on the number of
    // records to read - confirm against readFile in io.h.
    vector<recordType> h_vec = readFile(1000000);
    const int length = static_cast<int>(h_vec.size());
    cout << "record count: " << h_vec.size() << endl;

    thrust::device_vector<recordType> d_vec;
    thrust::device_vector<int> d_scan_res;
    cout << "pls wait..." << endl;

    // Upload every record in one bulk host-to-device copy rather than one
    // push_back (and therefore one transfer) per element.
    d_vec = h_vec;

    // The result buffer must really contain `length` elements: scan() writes
    // through begin() and aggregate() reads begin() + length, so reserving
    // capacity alone would leave both operating past the end of the vector.
    d_scan_res.resize(length);

    // Named pipes used to talk to the Python front end.
    cout << "[READY]" << endl;
    const char *fifo_name = "/home/adam/fifo";
    const char *fifo2_name = "/home/adam/fifo_";
    mknod(fifo_name, S_IFIFO | 0666, 0);

    std::ifstream f;
    std::string line;
    bool quit = false;

    // Serve one request per iteration.
    while (!quit) {
        cout << "listening..." << endl;
        // Re-open the request pipe for every message.
        f = std::ifstream(fifo_name);
        cout << "Got msg" << endl;
        try {
            // Header line: payload size in bytes. stoi throws on malformed
            // input, which is reported by the catch block below.
            getline(f, line);
            const int data_size = std::stoi(line);
            std::cout << "Size: " << data_size << std::endl;

            // Read exactly data_size bytes of payload straight into the
            // string. A negative size converts to a huge count and makes the
            // string constructor throw, which the catch block reports.
            std::string data(data_size, '\0');
            f.read(&data[0], data_size);
            if (!f.good()) {
                std::cerr << "Read failed" << std::endl;
            }
            std::cout << "Data size: " << data.size() << " content: " << data << std::endl;

            vector<string> strs;
            boost::split(strs, data, boost::is_any_of("|"));
            if (strs.size() != 3) {
                // Only three-field requests are supported for now.
                // No reply is written on this path.
                cout << "size not 3" << endl;
                continue;
            }
            cout << strs[0] << strs[1] << strs[2] << endl;

            // NOTE(review): clock() reports processor time, not wall-clock
            // time - confirm that this is the intended measurement.
            const clock_t begin = clock();
            if (strs[0] == "break") {
                // Exit command received: leave the loop without replying.
                quit = true;
                continue;
            }

            // scan
            scan(d_vec, d_scan_res, strs[0], strs[1], strs[2]);
            mknod(fifo2_name, S_IFIFO | 0666, 0);

            // aggregate
            const long long result = aggregate(d_scan_res, length);

            // Report the result locally and send it back through the reply pipe.
            cout << "uin count is: " << result << endl;
            const clock_t end = clock();
            const double elapsed_secs = double(end - begin) / CLOCKS_PER_SEC;
            cout << "elapsed time: " << elapsed_secs << endl;
            std::ofstream f2(fifo2_name);
            f2 << result << endl;
            f2 << elapsed_secs << endl;
        }
        catch (...) {
            // Any failure while handling a request is reported and the
            // server keeps listening.
            cout << "error" << endl;
            continue;
        }
    }
    return 0;
}
/**
 * SCAN operator. Two kinds of predicate are supported: an integer value
 * (key "age") or a string value (any other key).
 *
 * For key "age", value is parsed with stoi and the records are transformed
 * with ageFilter(key, value, opr). For every other key the records are
 * transformed with stringFilter(key, value, shouldEqual), where shouldEqual
 * is true exactly when opr is "=".
 *
 * @param d_vec       array of records held in a device vector
 * @param d_scan_res  device vector receiving one int per record, written
 *                    starting at its begin()
 * @param key         field name the predicate applies to
 * @param value       value to compare against
 * @param opr         comparison operator as text
 */
void scan(thrust::device_vector<recordType> &d_vec, thrust::device_vector<int> &d_scan_res, std::string key,
          std::string value, std::string opr) {
    const auto records_begin = d_vec.begin();
    const auto records_end = d_vec.end();
    const auto out_begin = d_scan_res.begin();

    if (key != "age") {
        // String-valued predicate: the filter only needs to know whether the
        // operator asks for equality.
        const bool wantsEquality = (opr == "=");
        thrust::transform(thrust::device, records_begin, records_end, out_begin,
                          stringFilter(device_string(key), device_string(value), wantsEquality));
        return;
    }

    // Integer-valued predicate on "age".
    thrust::transform(thrust::device, records_begin, records_end, out_begin,
                      ageFilter(device_string(key), stoi(value), device_string(opr)));
}
/**
 * AGG operator: counts the rows in the scan result.
 *
 * Counts the entries in the first `length` elements of d_scan_res for which
 * is_odd holds, prints that count to stdout and returns it.
 *
 * @param d_scan_res  scan result held in a device vector
 * @param length      number of leading elements of d_scan_res to inspect
 * @return            number of inspected elements satisfying is_odd
 */
long long aggregate(thrust::device_vector<int> &d_scan_res, int length) {
    const auto first = d_scan_res.begin();
    const long long matches = thrust::count_if(first, first + length, is_odd());
    cout << "uin count is: " << matches << endl;
    return matches;
}