Skip to content

Commit 096f46a

Browse files
authored
Merge pull request #27 from rsscloud/feat/dual-write-json-store
feat: add dual-write JSON file store alongside MongoDB
2 parents d511dd2 + b00f6dd commit 096f46a

12 files changed

Lines changed: 142 additions & 26 deletions

app.js

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const config = require('./config'),
55
express = require('express'),
66
exphbs = require('express-handlebars'),
77
getDayjs = require('./services/dayjs-wrapper'),
8+
jsonStore = require('./services/json-store'),
89
mongodb = require('./services/mongodb'),
910
morgan = require('morgan'),
1011
removeExpiredSubscriptions = require('./services/remove-expired-subscriptions'),
@@ -67,10 +68,38 @@ app.use(express.static('public', {
6768
app.use(require('./controllers'));
6869

6970
// Start server
71+
async function seedJsonStore() {
72+
const db = mongodb.get('rsscloud');
73+
const resources = await db.collection('resources').find({}).toArray();
74+
const subscriptions = await db.collection('subscriptions').find({}).toArray();
75+
76+
for (const resource of resources) {
77+
jsonStore.setResource(resource._id, resource);
78+
}
79+
80+
for (const sub of subscriptions) {
81+
jsonStore.setSubscriptions(sub._id, sub.pleaseNotify || []);
82+
}
83+
84+
jsonStore.flush();
85+
}
86+
87+
async function gracefulShutdown() {
88+
jsonStore.shutdown();
89+
await mongodb.closeAll();
90+
process.exit();
91+
}
92+
93+
process.on('SIGINT', gracefulShutdown);
94+
process.on('SIGTERM', gracefulShutdown);
95+
7096
async function startServer() {
7197
await initializeDayjs();
7298
await mongodb.connect('rsscloud', config.mongodbUri);
7399

100+
jsonStore.initialize(config.dataFilePath);
101+
await seedJsonStore();
102+
74103
// Start cleanup scheduling
75104
scheduleCleanupTasks();
76105

config.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ module.exports = {
2121
maxResourceSize: getNumericConfig('MAX_RESOURCE_SIZE', 256000),
2222
ctSecsResourceExpire: getNumericConfig('CT_SECS_RESOURCE_EXPIRE', 90000),
2323
minSecsBetweenPings: getNumericConfig('MIN_SECS_BETWEEN_PINGS', 0),
24-
requestTimeout: getNumericConfig('REQUEST_TIMEOUT', 4000)
24+
requestTimeout: getNumericConfig('REQUEST_TIMEOUT', 4000),
25+
dataFilePath: getConfig('DATA_FILE_PATH', './data/subscriptions.json')
2526
};

controllers/index.js

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
const express = require('express'),
2+
jsonStore = require('../services/json-store'),
23
router = new express.Router();
34

45
router.use('/', require('./home'));
@@ -10,4 +11,9 @@ router.use('/pingForm', require('./ping-form'));
1011
router.use('/viewLog', require('./view-log'));
1112
router.use('/RPC2', require('./rpc2'));
1213

14+
router.get('/subscriptions.json', (req, res) => {
15+
res.set('Content-Type', 'application/json');
16+
res.send(JSON.stringify(jsonStore.getData(), null, 2));
17+
});
18+
1319
module.exports = router;

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
"description": "An rssCloud Server",
55
"main": "app.js",
66
"scripts": {
7-
"start": "nodemon --use_strict ./app.js",
8-
"client": "nodemon --use_strict ./client.js",
7+
"start": "nodemon --use_strict --ignore data/ ./app.js",
8+
"client": "nodemon --use_strict --ignore data/ ./client.js",
99
"import-data": "node ./bin/import-data.js",
1010
"lint": "eslint --fix controllers/ services/ test/ *.js",
1111
"format": "prettier --write .",

services/init-resource.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ const getDayjs = require('./dayjs-wrapper');
33
async function initResource(resource) {
44
const dayjs = await getDayjs();
55
const defaultResource = {
6-
flDirty: true,
76
lastSize: 0,
87
lastHash: '',
98
ctChecks: 0,

services/json-store.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
const fs = require('fs');
2+
const path = require('path');
3+
4+
let data = {};
5+
let filePath = null;
6+
let flushTimer = null;
7+
let flushing = false;
8+
9+
function initialize(dataFilePath, flushIntervalMs = 60000) {
10+
filePath = dataFilePath;
11+
12+
const dir = path.dirname(filePath);
13+
fs.mkdirSync(dir, { recursive: true });
14+
15+
if (fs.existsSync(filePath)) {
16+
try {
17+
data = JSON.parse(fs.readFileSync(filePath, 'utf8'));
18+
} catch {
19+
data = {};
20+
}
21+
}
22+
23+
flushTimer = setInterval(() => flush(), flushIntervalMs);
24+
}
25+
26+
function setResource(feedUrl, resourceObj) {
27+
if (!data[feedUrl]) {
28+
data[feedUrl] = { resource: {}, subscribers: [] };
29+
}
30+
const clean = Object.assign({}, resourceObj);
31+
delete clean._id;
32+
data[feedUrl].resource = clean;
33+
}
34+
35+
function setSubscriptions(feedUrl, pleaseNotifyArray) {
36+
if (!data[feedUrl]) {
37+
data[feedUrl] = { resource: {}, subscribers: [] };
38+
}
39+
data[feedUrl].subscribers = pleaseNotifyArray.map(sub => {
40+
const clean = Object.assign({}, sub);
41+
delete clean._id;
42+
return clean;
43+
});
44+
}
45+
46+
function getData() {
47+
return data;
48+
}
49+
50+
function clear() {
51+
data = {};
52+
}
53+
54+
function flush() {
55+
if (!filePath || flushing) {
56+
return;
57+
}
58+
flushing = true;
59+
try {
60+
const tmpPath = filePath + '.tmp';
61+
fs.writeFileSync(tmpPath, JSON.stringify(data, null, 2));
62+
fs.renameSync(tmpPath, filePath);
63+
} finally {
64+
flushing = false;
65+
}
66+
}
67+
68+
function shutdown() {
69+
if (flushTimer) {
70+
clearInterval(flushTimer);
71+
flushTimer = null;
72+
}
73+
flush();
74+
}
75+
76+
module.exports = {
77+
initialize,
78+
setResource,
79+
setSubscriptions,
80+
getData,
81+
clear,
82+
flush,
83+
shutdown
84+
};

services/mongodb.js

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,6 @@ async function closeAll() {
3232
await Promise.all(Object.keys(state).map(close));
3333
}
3434

35-
function cleanup() {
36-
closeAll()
37-
.finally(() => {
38-
process.exit();
39-
});
40-
}
41-
42-
process.on('SIGINT', cleanup);
43-
process.on('SIGTERM', cleanup);
44-
4535
module.exports = {
4636
connect,
4737
get,

services/notify-subscribers.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const config = require('../config'),
22
getDayjs = require('./dayjs-wrapper'),
3+
jsonStore = require('./json-store'),
34
logEvent = require('./log-event'),
45
mongodb = require('./mongodb'),
56
notifyOne = require('./notify-one');
@@ -22,6 +23,7 @@ async function upsertSubscriptions(subscriptions) {
2223
subscriptions,
2324
{ upsert: true }
2425
);
26+
jsonStore.setSubscriptions(subscriptions._id, subscriptions.pleaseNotify);
2527
}
2628

2729
async function notifyOneSubscriber(resourceUrl, subscription) {

services/ping.js

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const appMessage = require('./app-messages'),
44
ErrorResponse = require('./error-response'),
55
getDayjs = require('./dayjs-wrapper'),
66
initResource = require('./init-resource'),
7+
jsonStore = require('./json-store'),
78
logEvent = require('./log-event'),
89
mongodb = require('./mongodb'),
910
notifySubscribers = require('./notify-subscribers');
@@ -56,13 +57,7 @@ async function checkForResourceChange(resource, resourceUrl, startticks) {
5657

5758
const hash = md5Hash(body);
5859

59-
if (resource.lastHash !== hash) {
60-
resource.flDirty = true;
61-
} else if (resource.lastSize !== body.length) {
62-
resource.flDirty = true;
63-
} else {
64-
resource.flDirty = false;
65-
}
60+
const changed = (resource.lastHash !== hash) || (resource.lastSize !== body.length);
6661

6762
resource.lastHash = hash;
6863
resource.lastSize = body.length;
@@ -71,7 +66,7 @@ async function checkForResourceChange(resource, resourceUrl, startticks) {
7166
'Ping',
7267
{
7368
resourceUrl: resourceUrl,
74-
changed: resource.flDirty,
69+
changed: changed,
7570
hash: resource.lastHash,
7671
size: resource.lastSize,
7772
stats: {
@@ -81,6 +76,8 @@ async function checkForResourceChange(resource, resourceUrl, startticks) {
8176
},
8277
startticks
8378
);
79+
80+
return changed;
8481
}
8582

8683
async function fetchResource(resourceUrl) {
@@ -101,10 +98,11 @@ async function upsertResource(resource) {
10198
resource,
10299
{ upsert: true }
103100
);
101+
jsonStore.setResource(resource._id, resource);
104102
}
105103

106-
async function notifySubscribersIfDirty(resource, resourceUrl) {
107-
if (resource.flDirty) {
104+
async function notifySubscribersIfDirty(changed, resource, resourceUrl) {
105+
if (changed) {
108106
const dayjs = await getDayjs();
109107
resource.ctUpdates += 1;
110108
resource.whenLastUpdate = new Date(dayjs().utc().format());
@@ -120,8 +118,8 @@ async function ping(resourceUrl) {
120118
);
121119

122120
await checkPingFrequency(resource);
123-
await checkForResourceChange(resource, resourceUrl, startticks);
124-
await notifySubscribersIfDirty(resource, resourceUrl);
121+
const changed = await checkForResourceChange(resource, resourceUrl, startticks);
122+
await notifySubscribersIfDirty(changed, resource, resourceUrl);
125123
await upsertResource(resource);
126124

127125
return {

services/please-notify.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const appMessages = require('./app-messages'),
33
ErrorResponse = require('./error-response'),
44
getDayjs = require('./dayjs-wrapper'),
55
initSubscription = require('./init-subscription'),
6+
jsonStore = require('./json-store'),
67
logEvent = require('./log-event'),
78
mongodb = require('./mongodb'),
89
notifyOne = require('./notify-one'),
@@ -47,6 +48,7 @@ async function upsertSubscriptions(subscriptions) {
4748
subscriptions,
4849
{ upsert: true }
4950
);
51+
jsonStore.setSubscriptions(subscriptions._id, subscriptions.pleaseNotify);
5052
}
5153

5254
async function notifyApiUrl(notifyProcedure, apiurl, protocol, resourceUrl, diffDomain) {

0 commit comments

Comments
 (0)