-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSimulator.cpp
More file actions
344 lines (285 loc) · 12.8 KB
/
Simulator.cpp
File metadata and controls
344 lines (285 loc) · 12.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#include "Simulator.h"
#include "Settings.h"
#include "TermPack.h"
#include "Landlord.h"
#include <iostream>
#include <chrono>
#include <sstream>
#include <iomanip>
namespace IndexUpdate {
// Forward declaration (defined at the bottom of this file): fills
// `consolidationPriceVector` with the cost, in minutes, of consolidating
// each suffix of `sizeStack`; computation may stop early once a price
// exceeds `stopForTokens`.
void auxRebuildCPrice(std::vector<double> &consolidationPriceVector, const std::vector<size_t>& sizeStack,
const Settings &settings, double stopForTokens);
struct SimulateCache {
Caching::Landlord cache;
std::vector<std::pair<unsigned, unsigned> > termRanges;
std::vector<unsigned> currentPostions;
explicit SimulateCache(uint64_t cacheSz):
cache(cacheSz) {}
void init(const std::vector<TermPack>& tpacks) {
unsigned first = 0;
currentPostions.resize(tpacks.size(),0);
for(auto it = tpacks.begin(); it != tpacks.end(); ++it) {
termRanges.push_back({first,first+it->members()});
first = termRanges.back().second+1;
}
}
bool visit(unsigned id, uint64_t currentLength) {
auto range = termRanges[id];
auto term = range.first + currentPostions[id];
currentPostions[id] = (currentPostions[id]+1) % (range.second-range.first);
return cache.visit(term,currentLength);
}
};
// Drives one simulation run: generates postings into an in-memory update
// buffer, serves queries against a Landlord cache, and evicts/consolidates
// on-disk segments according to the chosen Algorithm.
class SimulatorIMP {
    const Settings settings;
    uint64_t totalSeenPostings;       // postings already evicted from the buffer to disk
    uint64_t postingsInUpdateBuffer;  // postings currently buffered in memory
    uint64_t lastQueryAtPostings;     // total posting count at the last query batch
    unsigned queriesStoppedAt;        // round-robin query cursor over tpacks
    uint64_t totalQs;                 // queries issued so far
    unsigned evictions;               // buffer evictions performed
    ReadIO totalQueryReads;           // read I/O accumulated by query cache misses
    ConsolidationStats merges;        // consolidation I/O accumulated so far
    std::vector<TermPack> tpacks;
    std::vector<uint64_t> monolithicSegments;  // shared segment stack used by the monolithic algorithms
    SimulateCache cache;
public:
    SimulatorIMP(const Settings &s);
    ~SimulatorIMP() { }
    // Runs the whole experiment with `alg`; returns *this for chaining.
    const SimulatorIMP& execute(Algorithm alg);
    void init();
    bool finished() const;
    bool bufferFull() const;
    void handleQueries();
    void fillUpdateBuffer();
    void evictFromUpdateBuffer(Algorithm alg);
    void evictMonoliths(Algorithm alg);
    void evictTPacks(Algorithm alg);
    // Ski-rental consolidation of one term pack: seeks saved so far are
    // exchanged for "tokens", and we consolidate the longest suffix of the
    // segment stack whose price the tokens can pay.
    ConsolidationStats consolidateTPSki(TermPack& tp) {
        auto segments = tp.segments();
        ConsolidationStats nil;
        if (segments.size() < 2) {
            nil += WriteIO(segments.back(), 0); //0 since we write all non-consolidants together
            return nil;
        }
        double tokens = tp.convertSeeksToTokens( //exchange seeks for tokens
            costIoInMinutes(ReadIO(0, tp.extraSeeks()),
                            settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes));
        std::vector<double> consolidationPriceVector;
        auxRebuildCPrice(consolidationPriceVector, segments, settings, tokens);
        // scan from the cheapest merge toward the most expensive:
        // stop at the first suffix the tokens cannot afford
        auto i = int(consolidationPriceVector.size() - 1);
        while (i >= 0 && tokens >= consolidationPriceVector[size_t(i)])
            --i;
        unsigned offset = i + 1;
        assert(offset <= segments.size());
        if (offset < segments.size() - 1) {
            auto cons = consolidateSegments(tp.unsafeGetSegments(), offset);
            // pay for the merge out of the accumulated tokens
            tp.reduceTokens(ConsolidationStats::costInMinutes(cons,
                settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes));
            return cons;
        }
        nil += WriteIO(segments.back(), 0); //0 since we write all non-consolidants together
        return nil;
    }
    // Static (telescopic) consolidation of one term pack.
    // BUG FIX: the bounds checks below compared `offset` against
    // monolithicSegments.size() -- the shared stack of the monolithic
    // algorithms -- instead of this pack's own segment stack; an apparent
    // copy-paste from evictMonoliths().
    ConsolidationStats consolidateTPStatic(TermPack& tp) {
        auto &segments = tp.unsafeGetSegments();
        auto offset = offsetOfTelescopicMerge(segments);
        assert(offset <= segments.size());
        if (offset < segments.size() - 1)
            return consolidateSegments(segments, offset);
        ConsolidationStats nil;
        nil += WriteIO(segments.back(), 1);
        return nil;
    }
    // One-line, human-readable summary of this run.
    std::string report(Algorithm alg) const;
    double getTotalQTime() const;   // query I/O cost, minutes
    double allTimes() const;        // queries + merges, minutes
    double getMergeTimes() const;   // consolidation cost, minutes
};
// Runs a single simulation of `alg` over `settings` and returns the
// combined query + merge cost in minutes.
double Simulator::simulateOne(Algorithm alg, const Settings & settings) {
    SimulatorIMP simulation(settings);
    return simulation.execute(alg).allTimes();
}
std::vector<std::string> Simulator::simulate(const std::vector<Algorithm>& algs, const Settings &settings) {
//auto start = std::chrono::system_clock::now();
std::vector<std::string> reports;
for(auto alg : algs)
reports.emplace_back(
SimulatorIMP(settings).execute(alg).report(alg));
//auto end = std::chrono::system_clock::now();
//std::cerr << std::chrono::duration_cast<std::chrono::milliseconds>(end - start).count() << "ms\n";
return reports;
}
// Builds a simulator for one run.  All scalar counters start at zero;
// `totalQueryReads`, `merges`, `tpacks` and `monolithicSegments` are
// default-constructed, and the cache is sized from the settings.
SimulatorIMP::SimulatorIMP(const Settings &s) :
settings(s),
totalSeenPostings(0),
postingsInUpdateBuffer(0),
lastQueryAtPostings(0),
queriesStoppedAt(0),
totalQs(0),
evictions(0),
cache(s.cacheSizePostings)
{ }
// Runs the full simulation loop for `alg`: repeatedly fill the update
// buffer, serve the queries that accrued meanwhile, then evict.
// Any std::exception aborts the run and is reported on stderr; the
// object remains usable, so report() reflects progress up to the error.
const SimulatorIMP& SimulatorIMP::execute(Algorithm alg) {
    try {
        init();
        while (!finished()) {
            fillUpdateBuffer();
            handleQueries();
            evictFromUpdateBuffer(alg);
        }
    }
    catch (const std::exception &e) {   // catch by const reference (was non-const ref)
        std::cerr << "Error: " << e.what() << std::endl;
    }
    return *this;
}
// Resets the per-run counters and (re)builds the term packs from the
// settings' parallel vectors (members / updates / queries per pack).
// NOTE(review): `merges` and the cache contents are not reset here; this
// is fine today because execute() calls init() exactly once per instance.
void SimulatorIMP::init() {
    assert(settings.updatesQuant > 9999); //no point to make it too small
    // both must be non-zero; '&&' states the intent directly and cannot
    // wrap to zero the way the old product test could
    assert(settings.totalExperimentPostings && settings.quieriesQuant);
    assert(settings.updateBufferPostingsLimit > settings.updatesQuant);
    // the three per-pack vectors must be parallel
    assert(settings.tpQueries.size() == settings.tpUpdates.size());
    assert(settings.tpQueries.size() == settings.tpMembers.size());
    totalSeenPostings = postingsInUpdateBuffer =
        evictions = totalQs =
        queriesStoppedAt = lastQueryAtPostings = 0;
    totalQueryReads = ReadIO();
    auto updates = settings.tpUpdates.begin();
    auto queries = settings.tpQueries.begin();
    auto members = settings.tpMembers.begin();
    //init term packs
    tpacks.clear();
    for (unsigned i = 0; updates != settings.tpUpdates.end();
         ++updates, ++queries, ++members, ++i) {
        // construct in place instead of copying a temporary TermPack
        tpacks.emplace_back(i, *members, *updates, *queries);
    }
    TermPack::normalizeUpdates(tpacks);
    cache.init(tpacks);
}
double SimulatorIMP::allTimes() const { return getTotalQTime()+getMergeTimes(); }
// Formats the one-line summary of this run for algorithm `alg`:
// configuration, counters, accumulated I/O and the derived minute costs.
std::string SimulatorIMP::report(Algorithm alg) const {
    const double queryMinutes = getTotalQTime();
    const double mergeMinutes = getMergeTimes();
    std::stringstream out;
    out << Settings::name(alg)
        << " " << (settings.diskType == HD ? "HD" : "SSD")
        << " " << settings.flags[0] << "--" << settings.flags[1]
        << " Evictions: " << std::setw(5) << evictions
        << " Total-seen-postings: " << totalSeenPostings
        << " Total-queries: " << totalQs
        << " Query-reads: " << totalQueryReads
        << " Consolidation: " << merges
        << " Total-query-minutes: " << queryMinutes
        << " Total-merge-minutes: " << mergeMinutes
        << " Sum-All: " << queryMinutes + mergeMinutes
        << std::endl;
    return out.str();
}
double SimulatorIMP::getMergeTimes() const {
auto mergeTimes = ConsolidationStats::costInMinutes(merges,
settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes);
return mergeTimes;
}
double SimulatorIMP::getTotalQTime() const {
auto totalQueryTime = costIoInMinutes(totalQueryReads,
settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes);
return totalQueryTime;
}
// Issues the queries that accrued since the last query point.  Queries
// arrive in quanta: one batch of `quieriesQuant` queries for every
// `updatesQuant` postings seen (evicted + still buffered), distributed
// round-robin over the term packs.  A cache hit is free; a miss is
// charged the pack's read I/O.
void SimulatorIMP::handleQueries() {
auto total = totalSeenPostings+postingsInUpdateBuffer;
if(total >= lastQueryAtPostings + settings.updatesQuant) {
// postings that arrived since the last batch of queries
auto totalNew = total - lastQueryAtPostings;
// leftover postings short of a full quantum are carried to the next call
auto carry = totalNew % settings.updatesQuant;
assert(total > carry);
lastQueryAtPostings = total - carry;
//ask your quant of queries
auto quant = settings.quieriesQuant * (totalNew / settings.updatesQuant);
assert(quant);
totalQs += quant;
//currently RoundRobin -- may replace with a discrete distribution
for(; quant != 0; --quant, queriesStoppedAt = (queriesStoppedAt+1)%tpacks.size()) {
// a miss forces a disk read sized by the pack's mean on-disk list length
bool isMiss = ! cache.visit(queriesStoppedAt,tpacks[queriesStoppedAt].meanDiskLength());
if(isMiss)
totalQueryReads += tpacks[queriesStoppedAt].query();
}
}
}
void SimulatorIMP::fillUpdateBuffer() {
while (!bufferFull()) {
for(auto& tp : tpacks) //round robin
postingsInUpdateBuffer += tp.addUBPostings();
if(finished())
break;
}
}
// Eviction policy for the non-ski algorithms: the WHOLE update buffer is
// flushed as one new segment shared by all term packs, pushed on the
// common `monolithicSegments` stack, and then consolidated according to
// `alg` (LogMerge = telescopic offset, NeverMerge = never, otherwise
// merge everything).
void SimulatorIMP::evictMonoliths(Algorithm alg) {
// flush every pack; each pack's private stack collapses to the new segment
for(auto& tp : tpacks) {
auto newPostings = tp.evictAll();
auto& segments = tp.unsafeGetSegments();
segments.clear();
segments.push_back(newPostings);
}
//consolidation cost is calc. here...
monolithicSegments.push_back(postingsInUpdateBuffer);
totalSeenPostings += postingsInUpdateBuffer;
postingsInUpdateBuffer = 0;
// offset = index of the first segment included in the merge;
// size()-1 (or 1 on the very first eviction) means "just write back"
auto offset = (LogMerge == alg) ? offsetOfTelescopicMerge(monolithicSegments) :
(monolithicSegments.size() > 1 ? 0 : 1);
//override for NeverMerge
if(NeverMerge == alg) offset = monolithicSegments.size()-1;
assert(offset<=monolithicSegments.size());
if(offset<monolithicSegments.size()-1)
merges += consolidateSegments(monolithicSegments, offset);
else
merges += WriteIO(monolithicSegments.back(),1);
//fix segment sizes for tpacks (this is how we know during queries how many seeks to make)
unsigned currentSzAll = monolithicSegments.size();
for(auto& tp : tpacks)
tp.unsafeGetSegments().resize(currentSzAll);
}
// Eviction policy for the ski-based algorithms: evicts whole term packs
// (highest id first) until the buffer is down to the configured residual
// capacity, consolidating each evicted pack with the ski-rental policy.
// The Algorithm parameter is currently unused (the static-consolidation
// alternative is commented out below), so its name is omitted to avoid
// an unused-parameter warning while keeping the declared signature.
void SimulatorIMP::evictTPacks(Algorithm /*alg*/) {
    const uint64_t desiredCapacity =
        settings.percentsUBLeft * settings.updateBufferPostingsLimit / 100;
    //we evict castes with larger ID first
    for (auto it = tpacks.rbegin();
         postingsInUpdateBuffer > desiredCapacity && it != tpacks.rend(); ++it) {
        TermPack& tp = *it;
        const auto newPostings = tp.evictAll();
        tp.unsafeGetSegments().push_back(newPostings);
        totalSeenPostings += newPostings;
        postingsInUpdateBuffer -= newPostings;
        merges += consolidateTPSki(tp); //consolidateTPStatic(tp);
    }
    assert(postingsInUpdateBuffer <= desiredCapacity);
}
// Performs one buffer eviction, dispatching to the per-pack policy for
// the ski-based algorithms and to the monolithic policy for the rest.
void SimulatorIMP::evictFromUpdateBuffer(Algorithm alg) {
    ++evictions;
    const bool perPack = (alg == SkiBased) || (alg == Prognosticator);
    if (perPack)
        evictTPacks(alg);
    else
        evictMonoliths(alg);
}
// True once the in-memory buffer has reached its configured posting limit.
bool SimulatorIMP::bufferFull() const {
    return settings.updateBufferPostingsLimit <= postingsInUpdateBuffer;
}
// True once every posting of the experiment has been generated
// (whether already evicted to disk or still sitting in the buffer).
bool SimulatorIMP::finished() const {
    const auto generated = totalSeenPostings + postingsInUpdateBuffer;
    return generated >= settings.totalExperimentPostings;
}
void auxRebuildCPrice(std::vector<double> &consolidationPriceVector, const std::vector<size_t>& sizeStack,
const Settings &settings, double stopForTokens) {
const int sz = sizeStack.size();
assert(sz >= 2);
consolidationPriceVector.clear();
consolidationPriceVector.resize(sz,std::numeric_limits<float>::max());
for(int i = 2; i <=sz; ++i) {
auto cons = kWayConsolidate(sizeStack.begin()+(sz-i),sizeStack.end());
consolidationPriceVector[sz-i] =
ConsolidationStats::costInMinutes(cons, settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes);
if(consolidationPriceVector[sz-i] > stopForTokens)
break; //no point to calculate the other ones
}
consolidationPriceVector[sz-1] = //no real consolidation - just a write-back of the last one
ConsolidationStats::costInMinutes(
ConsolidationStats(0,0,sizeStack.back(),1),
settings.ioMBS, settings.ioSeek, settings.szOfPostingBytes);
}
}