Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ integration: test
${MOCHA} --timeout 5000 test/integration_test.js

regression: integration
MAX_WORKERS=1 ${MOCHA} --timeout 200000 test/regression_test.js
${MOCHA} --timeout 200000 test/regression_test.js

run: src/fts/Porter2.js
${NODE} ./src/index.js ${MANIFEST_SOURCE}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ You will need Node.js v8.0 or later.

```
npm install
MAX_WORKERS=2 node ./src/index.js [MANIFEST_SOURCE]
node ./src/index.js [MANIFEST_SOURCE]
```

Marian will then read the manifest directory given in `MANIFEST_SOURCE`, and
Expand Down
5 changes: 0 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
"dive": "^0.5.0",
"iltorb": "^2.1.0",
"mongodb-stitch": "^3.0.7",
"nspell": "^1.0.5",
"tiny-worker": "^2.1.2"
"nspell": "^1.0.5"
},
"devDependencies": {
"eslint": "^4.19.1",
Expand Down
176 changes: 31 additions & 145 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,22 @@
const { Buffer } = require('buffer')
const fs = require('fs')
const http = require('http')
const os = require('os')
const pathModule = require('path')
const process = require('process')
const url = require('url')
const util = require('util')
const zlib = require('zlib')

const Pool = require('./pool.js').Pool
const { Searcher } = require('./searcher.js')
const dive = require('dive')
const iltorb = require('iltorb')
const Logger = require('basic-logger')
const S3 = require('aws-sdk/clients/s3')
const stitch = require('mongodb-stitch')
const Worker = require('tiny-worker')

process.title = 'marian'

const MAXIMUM_QUERY_LENGTH = 100

// If a worker's backlog rises above this threshold, reject the request.
// This prevents the server from getting bogged down for unbounded periods of time.
const MAXIMUM_BACKLOG = 20
const WARNING_BACKLOG = 15

const STANDARD_HEADERS = {
'X-Content-Type-Options': 'nosniff'
}
Expand Down Expand Up @@ -81,82 +73,15 @@ function checkMethod(req, res, method) {
return true
}

/**
 * Promise-based request/reply wrapper around a single web worker.
 *
 * Each outgoing message is tagged with a sequential numeric ID. Replies are
 * matched to their request through the `messageId` field of the reply data,
 * and the matching promise is settled accordingly.
 */
class TaskWorker {
    /**
     * Start a worker and route its replies to this instance.
     * @param {string} scriptPath - Path of the JS file the worker executes.
     */
    constructor(scriptPath) {
        const worker = new Worker(scriptPath)
        worker.onmessage = this.onmessage.bind(this)

        Object.assign(this, {
            worker,
            // Number of requests sent that have not been answered yet.
            backlog: 0,
            // messageId -> [resolve, reject] of the promise awaiting that reply.
            pending: new Map(),
            // ID that will be stamped on the next outgoing message.
            messageId: 0
        })
    }

    /**
     * Post a message to the worker and wait for the matching reply.
     *
     * Note that the overload check throws synchronously, before any promise
     * is created; it is not delivered as a rejection.
     *
     * @param {Object} message - Payload forwarded to the worker.
     * @return {Promise} Resolves with the reply's data object; rejects with
     *   an Error when the reply data carries a truthy `error` field.
     * @throws {Error} 'backlog-exceeded' when the backlog is already above
     *   MAXIMUM_BACKLOG.
     */
    send(message) {
        const overloaded = this.backlog > MAXIMUM_BACKLOG
        if (overloaded) {
            throw new Error('backlog-exceeded')
        }

        return new Promise((fulfil, fail) => {
            const id = this.messageId++
            this.backlog++

            this.worker.postMessage({message, messageId: id})
            this.pending.set(id, [fulfil, fail])
        })
    }

    /**
     * Settle the pending promise that corresponds to a worker reply.
     * Replies with an unrecognised ID are logged and otherwise ignored,
     * leaving the backlog untouched.
     * @private
     * @param {MessageEvent} event - Reply event; `event.data.messageId`
     *   selects the pending request.
     */
    onmessage(event) {
        const reply = event.data
        const settlers = this.pending.get(reply.messageId)
        if (!settlers) {
            log.error(`Got unknown message ID ${reply.messageId}`)
            return
        }

        this.backlog -= 1
        this.pending.delete(reply.messageId)

        const [fulfil, fail] = settlers
        if (reply.error) {
            fail(new Error(reply.error))
        } else {
            fulfil(reply)
        }
    }
}

class Index {
constructor(manifestSource) {
this.manifestSource = manifestSource
this.manifests = []
this.searcher = new Searcher()
this.errors = []

this.lastSyncDate = null
this.currentlyIndexing = false

const MAX_WORKERS = parseInt(process.env.MAX_WORKERS) || 2
this.workers = new Pool(Math.min(os.cpus().length, MAX_WORKERS), () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js')))

// Suspend all of our workers until we have an index
for (const worker of this.workers.pool) {
this.workers.suspend(worker)
}
this.rebuildRequests = []
}

getStatus() {
Expand All @@ -166,19 +91,13 @@ class Index {
errors: this.errors,
finished: this.lastSyncDate ? this.lastSyncDate.toISOString() : null
},
workers: this.workers.getStatus()
rebuildRequests: this.rebuildRequests
}
}

search(queryString, searchProperty) {
const worker = this.workers.get()
const useHits = worker.backlog <= WARNING_BACKLOG

return worker.send({search: {
queryString: queryString,
searchProperty: searchProperty,
useHits: useHits
}}).then((message) => message.results)
const searchProperties = (searchProperty || '').split(',').filter((x) => x)
return this.searcher.search(queryString, searchProperties)
}

async getManifestsFromS3(bucketName, prefix) {
Expand Down Expand Up @@ -267,39 +186,35 @@ class Index {
return manifests
}

async load() {
if (this.currentlyIndexing) {
throw new Error('already-indexing')
load() {
this.rebuildRequests.push(new Date())
if (this.rebuildRequests.length > 1) {
return
}
this.currentlyIndexing = true

let manifests
try {
manifests = await this.getManifests()
} catch (err) {
this.currentlyIndexing = false
throw err
}
setImmediate(async () => {
while (this.rebuildRequests.length > 0) {
if (this.rebuildRequests.length > 1) {
this.rebuildRequests = this.rebuildRequests.slice(-1)
}

this.errors = []
setTimeout(async () => {
for (const worker of this.workers.pool) {
this.workers.suspend(worker)
let manifests
try {
await worker.send({sync: manifests})
} finally {
this.workers.resume(worker)
manifests = await this.getManifests()
} catch (err) {
this.errors.push(err)
throw err
}

// Ideally we would have a lastSyncDate per worker.
this.errors = []
const enlapsedMs = await this.searcher.sync(manifests)
this.lastSyncDate = new Date()
}
this.manifests = manifests.map((manifest) => manifest.searchProperty)
log.info(`Loaded new index in ${enlapsedMs}ms`)

this.currentlyIndexing = false
this.manifests = manifests.map((manifest) => manifest.searchProperty)

log.info('Loaded new index')
}, 1)
this.rebuildRequests = this.rebuildRequests.slice(1)
}
})
}
}

Expand All @@ -322,10 +237,7 @@ class Marian {
})
}

// Fire-and-forget loading
this.index.load().catch((err) => {
this.errors.push(err)
})
this.index.load()
}

start(port) {
Expand Down Expand Up @@ -370,31 +282,10 @@ class Marian {
const headers = {
'Vary': 'Accept-Encoding'
}
Object.assign(headers, STANDARD_HEADERS)

try {
await this.index.load()
} catch(err) {
headers['Content-Type'] = 'application/json'
const body = await compress(req, headers, JSON.stringify({'errors': [err]}))

if (err.message === 'already-indexing') {
res.writeHead(503, headers)
} else {
res.writeHead(500, headers)
}
res.end(body)
return
}

if (this.index.errors.length > 0) {
headers['Content-Type'] = 'application/json'
const body = await compress(req, headers, JSON.stringify({'errors': this.index.errors}))
res.writeHead(200, headers)
res.end(body)
return
}
Object.assign(headers, STANDARD_HEADERS)

this.index.load()
res.writeHead(200, headers)
res.end('')
}
Expand All @@ -411,12 +302,7 @@ class Marian {
let body = JSON.stringify(status)
body = await compress(req, headers, body)

// If all workers are overloaded, return 503
let statusCode = 200
if (status.workers.filter((n) => n <= WARNING_BACKLOG).length === 0) {
statusCode = 503
}

const statusCode = this.index.errors.length > 0 ? 500 : 200
res.writeHead(statusCode, headers)
res.end(body)
}
Expand Down
59 changes: 0 additions & 59 deletions src/pool.js

This file was deleted.

Loading