diff --git a/Makefile b/Makefile index d36bfd9..9ad55e0 100644 --- a/Makefile +++ b/Makefile @@ -19,7 +19,7 @@ integration: test ${MOCHA} --timeout 5000 test/integration_test.js regression: integration - MAX_WORKERS=1 ${MOCHA} --timeout 200000 test/regression_test.js + ${MOCHA} --timeout 200000 test/regression_test.js run: src/fts/Porter2.js ${NODE} ./src/index.js ${MANIFEST_SOURCE} diff --git a/README.md b/README.md index 0a95ce1..2c4b72b 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ You will need Node.js v8.0 or later. ``` npm install -MAX_WORKERS=2 node ./src/index.js [MANIFEST_SOURCE] +node ./src/index.js [MANIFEST_SOURCE] ``` Marian will then read the manifest directory given in `MANIFEST_SOURCE`, and diff --git a/package-lock.json b/package-lock.json index b358636..541338a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1706,11 +1706,6 @@ "integrity": "sha1-DdTJ/6q8NXlgsbckEV1+Doai4fU=", "dev": true }, - "tiny-worker": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/tiny-worker/-/tiny-worker-2.1.2.tgz", - "integrity": "sha512-t8xrlrw0ScBnJ1K5ziHcD6u2SgWpE9Tozv4EIqpXMnCfEVc3pWzMx+ZFwqpXk20C4WTRoLZVBi9v1tLkaciCTg==" - }, "tmp": { "version": "0.0.33", "resolved": "https://registry.npmjs.org/tmp/-/tmp-0.0.33.tgz", diff --git a/package.json b/package.json index 8d0de22..966b86a 100644 --- a/package.json +++ b/package.json @@ -16,8 +16,7 @@ "dive": "^0.5.0", "iltorb": "^2.1.0", "mongodb-stitch": "^3.0.7", - "nspell": "^1.0.5", - "tiny-worker": "^2.1.2" + "nspell": "^1.0.5" }, "devDependencies": { "eslint": "^4.19.1", diff --git a/src/index.js b/src/index.js index ad8e353..43679a5 100755 --- a/src/index.js +++ b/src/index.js @@ -4,30 +4,22 @@ const { Buffer } = require('buffer') const fs = require('fs') const http = require('http') -const os = require('os') -const pathModule = require('path') const process = require('process') const url = require('url') const util = require('util') const zlib = require('zlib') -const Pool = require('./pool.js').Pool +const { Searcher } = require('./searcher.js') const dive = require('dive') const iltorb = require('iltorb') const Logger = require('basic-logger') const S3 = require('aws-sdk/clients/s3') const stitch = require('mongodb-stitch') -const Worker = require('tiny-worker') process.title = 'marian' const MAXIMUM_QUERY_LENGTH = 100 -// If a worker's backlog rises above this threshold, reject the request. -// This prevents the server from getting bogged down for unbounded periods of time. -const MAXIMUM_BACKLOG = 20 -const WARNING_BACKLOG = 15 - const STANDARD_HEADERS = { 'X-Content-Type-Options': 'nosniff' } @@ -81,82 +73,15 @@ function checkMethod(req, res, method) { return true } -/** A web worker with a promise-oriented message-call interface. */ -class TaskWorker { - /** - * Create a new TaskWorker. - * @param {string} scriptPath - A path to a JS file to execute. - */ - constructor(scriptPath) { - this.worker = new Worker(scriptPath) - this.worker.onmessage = this.onmessage.bind(this) - - this.backlog = 0 - this.pending = new Map() - this.messageId = 0 - } - - /** - * Send a message to this TaskWorker. - * @param {map} message - An object to send to the worker. 
- * @return {Promise} - */ - send(message) { - if (this.backlog > MAXIMUM_BACKLOG) { - throw new Error('backlog-exceeded') - } - - return new Promise((resolve, reject) => { - const messageId = this.messageId - this.messageId += 1 - this.backlog += 1 - - this.worker.postMessage({message: message, messageId: messageId}) - this.pending.set(messageId, [resolve, reject]) - }) - } - - /** - * Handler for messages received from the worker. - * @private - * @param {MessageEvent} event - * @return {Promise} - */ - onmessage(event) { - const pair = this.pending.get(event.data.messageId) - if (!pair) { - log.error(`Got unknown message ID ${event.data.messageId}`) - return - } - - this.backlog -= 1 - this.pending.delete(event.data.messageId) - const [resolve, reject] = pair - if (event.data.error) { - reject(new Error(event.data.error)) - return - } - - resolve(event.data) - } -} - class Index { constructor(manifestSource) { this.manifestSource = manifestSource this.manifests = [] + this.searcher = new Searcher() this.errors = [] this.lastSyncDate = null - this.currentlyIndexing = false - - const MAX_WORKERS = parseInt(process.env.MAX_WORKERS) || 2 - this.workers = new Pool(Math.min(os.cpus().length, MAX_WORKERS), () => new TaskWorker(pathModule.join(__dirname, 'worker-searcher.js'))) - - // Suspend all of our workers until we have an index - for (const worker of this.workers.pool) { - this.workers.suspend(worker) - } + this.rebuildRequests = [] } getStatus() { @@ -166,19 +91,13 @@ class Index { errors: this.errors, finished: this.lastSyncDate ? this.lastSyncDate.toISOString() : null }, - workers: this.workers.getStatus() + rebuildRequests: this.rebuildRequests } } search(queryString, searchProperty) { - const worker = this.workers.get() - const useHits = worker.backlog <= WARNING_BACKLOG - - return worker.send({search: { - queryString: queryString, - searchProperty: searchProperty, - useHits: useHits - }}).then((message) => message.results) + const searchProperties = (searchProperty || '').split(',').filter((x) => x) + return this.searcher.search(queryString, searchProperties) } async getManifestsFromS3(bucketName, prefix) { @@ -267,39 +186,35 @@ class Index { return manifests } - async load() { - if (this.currentlyIndexing) { - throw new Error('already-indexing') + load() { + this.rebuildRequests.push(new Date()) + if (this.rebuildRequests.length > 1) { + return } - this.currentlyIndexing = true - let manifests - try { - manifests = await this.getManifests() - } catch (err) { - this.currentlyIndexing = false - throw err - } + setImmediate(async () => { + while (this.rebuildRequests.length > 0) { + if (this.rebuildRequests.length > 1) { + this.rebuildRequests = this.rebuildRequests.slice(-1) + } - this.errors = [] - setTimeout(async () => { - for (const worker of this.workers.pool) { - this.workers.suspend(worker) + let manifests try { - await worker.send({sync: manifests}) - } finally { - this.workers.resume(worker) + manifests = await this.getManifests() + } catch (err) { + this.errors.push(err) + throw err } - // Ideally we would have a lastSyncDate per worker. 
+ this.errors = [] + const enlapsedMs = await this.searcher.sync(manifests) this.lastSyncDate = new Date() - } + this.manifests = manifests.map((manifest) => manifest.searchProperty) + log.info(`Loaded new index in ${enlapsedMs}ms`) - this.currentlyIndexing = false - this.manifests = manifests.map((manifest) => manifest.searchProperty) - - log.info('Loaded new index') - }, 1) + this.rebuildRequests = this.rebuildRequests.slice(1) + } + }) } } @@ -322,10 +237,7 @@ class Marian { }) } - // Fire-and-forget loading - this.index.load().catch((err) => { - this.errors.push(err) - }) + this.index.load() } start(port) { @@ -370,31 +282,10 @@ class Marian { const headers = { 'Vary': 'Accept-Encoding' } - Object.assign(headers, STANDARD_HEADERS) - try { - await this.index.load() - } catch(err) { - headers['Content-Type'] = 'application/json' - const body = await compress(req, headers, JSON.stringify({'errors': [err]})) - - if (err.message === 'already-indexing') { - res.writeHead(503, headers) - } else { - res.writeHead(500, headers) - } - res.end(body) - return - } - - if (this.index.errors.length > 0) { - headers['Content-Type'] = 'application/json' - const body = await compress(req, headers, JSON.stringify({'errors': this.index.errors})) - res.writeHead(200, headers) - res.end(body) - return - } + Object.assign(headers, STANDARD_HEADERS) + this.index.load() res.writeHead(200, headers) res.end('') } @@ -411,12 +302,7 @@ class Marian { let body = JSON.stringify(status) body = await compress(req, headers, body) - // If all workers are overloaded, return 503 - let statusCode = 200 - if (status.workers.filter((n) => n <= WARNING_BACKLOG).length === 0) { - statusCode = 503 - } - + const statusCode = this.index.errors.length > 0 ? 500 : 200 res.writeHead(statusCode, headers) res.end(body) } diff --git a/src/pool.js b/src/pool.js deleted file mode 100644 index e1de882..0000000 --- a/src/pool.js +++ /dev/null @@ -1,59 +0,0 @@ -/** A balancing scheduling pool. Useful primarily for making a pool of TaskWorkers. */ -class Pool { - /** - * Create a new Pool. - * @param {number} size - The size of the pool. - * @param {function} f - A function returning a pool element. This element - * must have a "backlog" property representing its current load. - */ - constructor(size, f) { - if (this.size <= 0) { throw new Error('Bad pool size') } - - this.pool = [] - this.suspended = new Set() - for (let i = 0; i < size; i += 1) { - this.pool.push(f()) - } - } - - suspend(element) { - this.suspended.add(element) - } - - resume(element) { - this.suspended.delete(element) - } - - /** - * Return the least-loaded element of the pool. - * @return {?} The least-loaded element of the pool. 
- */ - get() { - const dummy = {backlog: Infinity} - let min = dummy - for (const element of this.pool) { - if (this.suspended.has(element)) { continue } - if (element.backlog < min.backlog) { - min = element - } - } - - if (dummy === min) { - throw new Error('pool-unavailable') - } - - return min - } - - getStatus() { - return this.pool.map((worker) => { - if (!this.suspended.has(worker)) { - return worker.backlog - } - - return 's' - }) - } -} - -exports.Pool = Pool diff --git a/src/searcher.js b/src/searcher.js new file mode 100644 index 0000000..e24f634 --- /dev/null +++ b/src/searcher.js @@ -0,0 +1,183 @@ +'use strict' + +const dictionary = require('dictionary-en-us') +const nspell = require('nspell') +const Query = require('./fts/Query.js').Query +const fts = require('./fts/fts.js') +const correlations = require('./correlations.js').correlations + +const MAXIMUM_TERMS = 10 + +function yieldToEventLoop() { + return new Promise(resolve => setImmediate(resolve)) +} + +class Searcher { + constructor() { + this.spelling = null + this.searchPropertyAliases = new Map() + this.index = null + this.documents = {} + } + + /** + * Search the index, and return results within the given searchProperty. + * @param {string} queryString The query string. + * @param {[string]} searchProperties The properties to search. If empty, all results are returned. + * @return {{results: [{title: String, preview: String, url: String}], spellingCorrections: Object}} + */ + search(queryString, searchProperties) { + if (!this.index) { + throw new Error('still-indexing') + } + + searchProperties = searchProperties.map((property) => { + if (this.searchPropertyAliases.has(property)) { + return this.searchPropertyAliases.get(property) + } + + return property + }) + + const parsedQuery = new Query(queryString) + if (parsedQuery.terms.size > MAXIMUM_TERMS) { + throw new Error('query-too-long') + } + + if (searchProperties.length) { + const properties = new Set(searchProperties) + parsedQuery.filter = (_id) => properties.has(this.documents[_id].searchProperty) + } else { + parsedQuery.filter = (_id) => this.documents[_id].includeInGlobalSearch === true + } + + let rawResults = this.index.search(parsedQuery, true) + + // If our results seem poor in quality, check if the query is misspelled + const misspelled = {} + if (this.spelling !== null && (rawResults.length === 0 || rawResults[0].score <= 0.6)) { + for (const term of parsedQuery.terms) { + const suggestions = this.spelling.suggest(term) + if (suggestions.length > 0) { + misspelled[term] = suggestions[0] + } + } + } + + rawResults = rawResults.map((match) => { + const doc = this.documents[match._id] + // console.log(doc.title, match.score, match.relevancyScore, match.authorityScore) + return { + title: doc.title, + preview: doc.preview, + url: doc.url + } + }) + + return { + results: rawResults, + spellingCorrections: misspelled + } + } + + setupSpellingDictionary(words) { + dictionary((err, dict) => { + if (err) { + console.error(err) + } + + const newWords = dict.dic.utf8Slice().split('\n').filter((w) => { + return words.has(w.split('/', 1)[0]) + }) + const newSpelling = nspell(dict.aff, newWords.join('\n')) + for (const word of words) { + newSpelling.add(word) + } + + this.spelling = newSpelling + }) + } + + async sync(manifests) { + const startTime = Date.now() + const newSearchPropertyAliases = new Map() + const newIndex = new fts.FTSIndex([ + ['text', 1], + ['headings', 5], + ['title', 10], + ['tags', 75], + ]) + + for (const [term, synonymn, weight] of 
correlations) { + newIndex.correlateWord(term, synonymn, weight) + } + + const newManifests = [] + for (const manifest of manifests) { + manifest.body = JSON.parse(manifest.body) + const url = manifest.body.url.replace(/\/+$/, '') + + for (const alias of (manifest.body.aliases || [])) { + newSearchPropertyAliases.set(alias, manifest.searchProperty) + } + + manifest.body.documents = manifest.body.documents.map((doc) => { + doc.slug = doc.slug.replace(/^\/+/, '') + doc.url = `${url}/${doc.slug}` + + return doc + }) + + newManifests.push({ + documents: manifest.body.documents, + searchProperty: manifest.searchProperty, + includeInGlobalSearch: manifest.body.includeInGlobalSearch + }) + + await yieldToEventLoop() + } + + const words = new Set() + const newDocuments = Object.create(null) + + let batchCharactersIndexed = 0 + for (const manifest of newManifests) { + for (const doc of manifest.documents) { + const weight = doc.weight || 1 + const id = newIndex.add({ + links: doc.links, + url: doc.url, + + weight: weight, + text: doc.text, + tags: doc.tags, + headings: (doc.headings || []).join(' '), + title: doc.title}, (word) => words.add(word)) + + newDocuments[id] = { + title: doc.title, + preview: doc.preview, + url: doc.url, + searchProperty: manifest.searchProperty, + includeInGlobalSearch: manifest.includeInGlobalSearch + } + + // Yield every 100,000 characters indexed to handle requests + batchCharactersIndexed += doc.text.length + if (batchCharactersIndexed > 100000) { + await yieldToEventLoop() + batchCharactersIndexed = 0 + } + } + } + + this.setupSpellingDictionary(words) + this.index = newIndex + this.searchPropertyAliases = newSearchPropertyAliases + this.documents = newDocuments + + return Date.now() - startTime + } +} + +exports.Searcher = Searcher diff --git a/src/worker-searcher.js b/src/worker-searcher.js deleted file mode 100644 index 7437f82..0000000 --- a/src/worker-searcher.js +++ /dev/null @@ -1,184 +0,0 @@ -'use strict' - -require('process').title = 'marian-worker' -const pathModule = require('path') - -const dictionary = require('dictionary-en-us') -const nspell = require('nspell') -const Query = require(pathModule.join(__dirname, './src/fts/Query.js')).Query -const fts = require(pathModule.join(__dirname, './src/fts/fts.js')) -const correlations = require(pathModule.join(__dirname, './src/correlations.js')).correlations - -const MAXIMUM_TERMS = 10 - -let spelling = null -let searchPropertyAliases = new Map() -let index = null -let documents = {} - -/** - * Search the index, and return results within the given searchProperty. - * @param {string} queryString The query string. - * @param {[string]} searchProperties The properties to search. If empty, all results are returned. - * @param {boolean} useHits True if HITS link analysis should be performed. 
- * @return {{results: [{title: String, preview: String, url: String}], spellingCorrections: Object}} - */ -function search(queryString, searchProperties, useHits) { - if (!index) { - throw new Error('still-indexing') - } - - searchProperties = searchProperties.map((property) => { - if (searchPropertyAliases.has(property)) { - return searchPropertyAliases.get(property) - } - - return property - }) - - const parsedQuery = new Query(queryString) - if (parsedQuery.terms.size > MAXIMUM_TERMS) { - throw new Error('query-too-long') - } - - if (searchProperties.length) { - const properties = new Set(searchProperties) - parsedQuery.filter = (_id) => properties.has(documents[_id].searchProperty) - } else { - parsedQuery.filter = (_id) => documents[_id].includeInGlobalSearch === true - } - - let rawResults = index.search(parsedQuery, useHits) - - // If our results seem poor in quality, check if the query is misspelled - const misspelled = {} - if (spelling !== null && (rawResults.length === 0 || rawResults[0].score <= 0.6)) { - for (const term of parsedQuery.terms) { - const suggestions = spelling.suggest(term) - if (suggestions.length > 0) { - misspelled[term] = suggestions[0] - } - } - } - - rawResults = rawResults.map((match) => { - const doc = documents[match._id] - // console.log(doc.title, match.score, match.relevancyScore, match.authorityScore) - return { - title: doc.title, - preview: doc.preview, - url: doc.url - } - }) - - return { - results: rawResults, - spellingCorrections: misspelled - } -} - -function setupSpellingDictionary(words) { - dictionary((err, dict) => { - if (err) { - console.error(err) - } - - const newWords = dict.dic.utf8Slice().split('\n').filter((w) => { - return words.has(w.split('/', 1)[0]) - }) - const newSpelling = nspell(dict.aff, newWords.join('\n')) - for (const word of words) { - newSpelling.add(word) - } - - spelling = newSpelling - }) -} - -function sync(manifests) { - const newSearchPropertyAliases = new Map() - const newIndex = new fts.FTSIndex([ - ['text', 1], - ['headings', 5], - ['title', 10], - ['tags', 75], - ]) - - for (const [term, synonymn, weight] of correlations) { - newIndex.correlateWord(term, synonymn, weight) - } - - manifests = manifests.map((manifest) => { - manifest.body = JSON.parse(manifest.body) - const url = manifest.body.url.replace(/\/+$/, '') - - for (const alias of (manifest.body.aliases || [])) { - newSearchPropertyAliases.set(alias, manifest.searchProperty) - } - - manifest.body.documents = manifest.body.documents.map((doc) => { - doc.slug = doc.slug.replace(/^\/+/, '') - doc.url = `${url}/${doc.slug}` - - return doc - }) - - return { - documents: manifest.body.documents, - searchProperty: manifest.searchProperty, - includeInGlobalSearch: manifest.body.includeInGlobalSearch - } - }) - - const words = new Set() - const newDocuments = Object.create(null) - - for (const manifest of manifests) { - for (const doc of manifest.documents) { - const weight = doc.weight || 1 - const id = newIndex.add({ - links: doc.links, - url: doc.url, - - weight: weight, - text: doc.text, - tags: doc.tags, - headings: (doc.headings || []).join(' '), - title: doc.title}, (word) => words.add(word)) - - newDocuments[id] = { - title: doc.title, - preview: doc.preview, - url: doc.url, - searchProperty: manifest.searchProperty, - includeInGlobalSearch: manifest.includeInGlobalSearch - } - } - } - - setupSpellingDictionary(words) - index = newIndex - searchPropertyAliases = newSearchPropertyAliases - documents = newDocuments -} - -self.onmessage = 
function(event) { - const message = event.data.message - const messageId = event.data.messageId - - try { - if (message.search !== undefined) { - const properties = (message.search.searchProperty || '').split(',').filter((x) => x) - - const results = search(message.search.queryString, properties, message.search.useHits) - self.postMessage({results: results, messageId: messageId}) - } else if (message.sync !== undefined) { - sync(message.sync) - self.postMessage({ok: true, messageId: messageId}) - } else { - throw new Error('Unknown command') - } - } catch (err) { - self.postMessage({error: err.message, messageId: messageId}) - } -} diff --git a/test/test_pool.js b/test/test_pool.js deleted file mode 100644 index d6455c6..0000000 --- a/test/test_pool.js +++ /dev/null @@ -1,51 +0,0 @@ -/* eslint-env node, mocha */ -'use strict' - -const assert = require('assert') -const Pool = require('../src/pool.js').Pool - -describe('Pool', () => { - let i = 0 - const pool = new Pool(3, () => { - i += 1 - return { - backlog: i, - i: i - } - }) - - it('Should be idempotent', () => { - assert.strictEqual(pool.get().i, 1) - assert.strictEqual(pool.get().i, 1) - }) - - it('Should select the unsuspended element with the smallest backlog', () => { - assert.deepStrictEqual(pool.getStatus(), [1, 2, 3]) - - pool.pool[0].backlog += 3 - const x = pool.get() - assert.strictEqual(x.i, 2) - pool.suspend(x) - assert.deepStrictEqual(pool.getStatus(), [4, 's', 3]) - assert.strictEqual(pool.get().i, 3) - pool.resume(x) - assert.deepStrictEqual(pool.getStatus(), [4, 2, 3]) - assert.strictEqual(pool.get().i, 2) - - pool.pool[0].backlog -= 2 - assert.strictEqual(pool.get().i, 1) - - pool.pool[2].backlog -= 2 - assert.strictEqual(pool.get().i, 3) - }) - - it('Should throw if no elements are available', () => { - for (const worker of pool.pool) { - pool.suspend(worker) - } - - assert.throws(() => { - pool.get() - }, /pool-unavailable/) - }) -})
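For reviewers who want to exercise the new code path without starting the HTTP server, the sketch below drives `src/searcher.js` directly, the same way `Index.load()` and `Index.search()` now do. It is illustrative only and not part of this change: the manifest literal, the `example-manual-current` search property, and the URLs are invented; the `Searcher` API it relies on (`sync()` resolving to elapsed milliseconds, `search(queryString, searchProperties)` returning `{results, spellingCorrections}`) is taken from the new file above. It assumes the fts assets (e.g. `src/fts/Porter2.js`) have already been generated via the Makefile.

```js
'use strict'

// Minimal smoke test for the in-process Searcher. Run from the repository
// root. The manifest below is a made-up stand-in for what
// Index.getManifests() returns from a directory or S3 bucket.
const { Searcher } = require('./src/searcher.js')

async function main() {
    const searcher = new Searcher()

    const manifests = [{
        searchProperty: 'example-manual-current',
        body: JSON.stringify({
            url: 'https://example.com/manual/current',
            includeInGlobalSearch: true,
            aliases: [],
            documents: [{
                slug: 'tutorial/install',
                title: 'Install the Example Server',
                preview: 'How to install the example server.',
                text: 'Install the example server by running the installer.',
                tags: 'install setup',
                headings: ['Installation'],
                links: []
            }]
        })
    }]

    // Indexing now happens on the main event loop; sync() yields back to the
    // loop periodically instead of delegating to a tiny-worker process.
    const elapsedMs = await searcher.sync(manifests)
    console.log(`indexed in ${elapsedMs}ms`)

    // An empty searchProperties array searches everything marked
    // includeInGlobalSearch, mirroring GET /search without a searchProperty.
    const { results, spellingCorrections } = searcher.search('install', [])
    console.log(results, spellingCorrections)
}

main().catch((err) => {
    console.error(err)
    process.exit(1)
})
```

The tradeoff this change accepts is that indexing and request handling now share one event loop, bounded by the `yieldToEventLoop()` calls in `sync()` rather than by the old worker pool and its backlog limits.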