Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:

strategy:
matrix:
node-version: [10.x, 12.x, 14.x, 15.x]
node-version: [14.x, 16.x, 18.x]

steps:
- uses: actions/checkout@v3
Expand All @@ -26,7 +26,7 @@ jobs:
node-version: ${{ matrix.node-version }}
- run: yarn
- run: yarn test

maybe-release:
name: release
runs-on: ubuntu-latest
Expand All @@ -40,18 +40,18 @@ jobs:
release-type: node
package-name: release-please-action
changelog-types: '[{"type":"feat","section":"Features","hidden":false},{"type":"fix","section":"Bug Fixes","hidden":false},{"type":"chore","section":"Miscellaneous","hidden":false}]'

- uses: actions/checkout@v3
# these if statements ensure that a publication only occurs when
# a new release is created:
if: ${{ steps.release.outputs.release_created }}

- uses: actions/setup-node@v3
with:
node-version: 16
registry-url: 'https://registry.npmjs.org'
if: ${{ steps.release.outputs.release_created }}

- run: yarn install
if: ${{ steps.release.outputs.release_created }}

Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
node_modules
.idea
yarn.lock
yarn.lock
.nyc_output
11 changes: 6 additions & 5 deletions example.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
var snappyStream = require('./index.js')
, compressStream = snappyStream.createCompressStream()
, uncompressStream = snappyStream.createUncompressStream({
asBuffer: false // optional option, asBuffer = false means that the stream emits strings, default: true
})
import {createCompressStream, createUncompressStream} from './index.js'

const compressStream = createCompressStream()
const uncompressStream = createUncompressStream({
asBuffer: false // optional option, asBuffer = false means that the stream emits strings, default: true
})

compressStream.on('data', function (chunk) {
console.log('Som data from the compressed stream', chunk)
Expand Down
19 changes: 9 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
var CompressStream = require('./lib/compress-stream')
, UncompressStream = require('./lib/uncompress-stream')
import {CompressStream} from './lib/compress-stream.js'
import {UncompressStream} from './lib/uncompress-stream.js'

module.exports = {
createUncompressStream: function (opts) {
return new UncompressStream(opts)
}
, createCompressStream: function (opts) {
return new CompressStream(opts)
}
}
export function createUncompressStream(opts) {
return new UncompressStream(opts)
}

export function createCompressStream(opts) {
return new CompressStream(opts)
}
9 changes: 4 additions & 5 deletions lib/checksum.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
var crc32c = require('@chainsafe/fast-crc32c').calculate
var bufferAlloc = require('buffer-alloc')
import crc32c from '@chainsafe/fast-crc32c'

module.exports = function (value) {
var x = crc32c(value)
var result = bufferAlloc(4)
export function checksum(value) {
var x = crc32c.calculate(value)
var result = Buffer.alloc(4)

// As defined in section 3 of https://github.com/google/snappy/blob/master/framing_format.txt
// And other implementations for reference:
Expand Down
76 changes: 22 additions & 54 deletions lib/compress-stream.js
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
import {Transform} from 'stream'
import util from 'util'
import snappyJS from 'snappyjs'
import {checksum} from './checksum.js'

/**
* As per the snappy framing format for streams, the size of any uncompressed chunk can be
* no longer than 65536 bytes.
*
* From: https://github.com/google/snappy/blob/main/framing_format.txt#L90:L92
*/
const UNCOMPRESSED_CHUNK_SIZE = 65536;

var Transform = require('stream').Transform
, util = require('util')

, snappy = require('snappy')
, bufferFrom = require('buffer-from')

, checksum = require('./checksum')

, IDENTIFIER_FRAME = bufferFrom([
0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
])
, COMPRESSED = bufferFrom([ 0x00 ])
, UNCOMPRESSED = bufferFrom([ 0x01 ])

, CompressStream = function (opts) {
const UNCOMPRESSED_CHUNK_SIZE = 65536
const IDENTIFIER_FRAME = Buffer.from([
0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
])
const COMPRESSED = Buffer.from([ 0x00 ])
const UNCOMPRESSED = Buffer.from([ 0x01 ])

export const CompressStream = function (opts) {
if (!(this instanceof CompressStream))
return new CompressStream(opts)

this.asyncCompress = (opts && typeof(opts.asyncCompress) === 'boolean') ? opts.asyncCompress : false
Transform.call(this)

// first push the identifier frame
Expand All @@ -39,7 +34,7 @@ CompressStream.prototype._compressed = function (chunk, compressed) {
this.push(
Buffer.concat([
COMPRESSED
, bufferFrom([ size, size >> 8, size >> 16 ])
, Buffer.from([ size, size >> 8, size >> 16 ])
, checksum(chunk)
, compressed
])
Expand All @@ -52,60 +47,39 @@ CompressStream.prototype._uncompressed = function (chunk) {
this.push(
Buffer.concat([
UNCOMPRESSED
, bufferFrom([ size, size >> 8, size >> 16 ])
, Buffer.from([ size, size >> 8, size >> 16 ])
, checksum(chunk)
, chunk
])
)
}

/**
* Some compression benchmarks :
*
* TODO
* Some compression benchmarks :
*
* i) Sync compress via snappy.compressSync ({asyncCompress:false}) default
* ii) Async compress via snappy.compress ({asyncCompress:true})
* iii) No chunking (Original)
*
*
* | Size | sync compress | async compress | original (no chunking) |
* |--------------------|---------------|----------------|------------------------|
* | 10kb (1 chunk) | 0.0229 ms | 0.0385 ms | 0.0388 ms |
* | 100kb (2 chunks) | 0.0562 ms | 0.1051 ms | 0.0844 ms |
* | 1000kb (16 chunks) | 0.382 ms | 0.7971 ms | 0.1998 ms |
*
*
*/


CompressStream.prototype._transform = function(chunk, enc, callback) {
const self = this;

function asyncCompressNext(startFrom) {
const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE);
const bytesChunk = chunk.slice(startFrom, endAt);
snappy.compress(bytesChunk, function(err, compressed) {
if (err) {
callback(err)
} else {

if (compressed.length < bytesChunk.length)
self._compressed(bytesChunk, compressed)
else
self._uncompressed(bytesChunk)

if (endAt < chunk.length) {
asyncCompressNext(endAt)
} else {
callback()
}
}
})
}

function syncCompress() {
try {
for (let startFrom = 0; startFrom < chunk.length; startFrom += UNCOMPRESSED_CHUNK_SIZE) {
const endAt = startFrom + Math.min(chunk.length - startFrom, UNCOMPRESSED_CHUNK_SIZE);
const bytesChunk = chunk.slice(startFrom, endAt);
const compressed = snappy.compressSync(bytesChunk)
const compressed = snappyJS.compress(bytesChunk)

if (compressed.length < bytesChunk.length)
self._compressed(bytesChunk, compressed)
Expand All @@ -117,11 +91,5 @@ CompressStream.prototype._transform = function(chunk, enc, callback) {
return callback(err);
}
}
if (this.asyncCompress) {
asyncCompressNext(0)
} else {
syncCompress();
}
syncCompress();
}

module.exports = CompressStream
81 changes: 37 additions & 44 deletions lib/uncompress-stream.js
Original file line number Diff line number Diff line change
@@ -1,38 +1,37 @@

var Transform = require('stream').Transform
, util = require('util')

, bufferEqual = require('buffer-equal')
, bufferFrom = require('buffer-from')
, BufferList = require('bl')
, snappy = require('snappy')

, IDENTIFIER = bufferFrom([
0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
])
, frameSize = function (buffer, offset) {
return buffer.get(offset) + (buffer.get(offset + 1) << 8) + (buffer.get(offset + 2) << 16)
}
, getType = function (value) {
if (value === 0xff)
return 'identifier'
if (value === 0x00)
return 'compressed'
if (value === 0x01)
return 'uncompressed'
if (value === 0xfe)
return 'padding'
// TODO: Handle the other cases described in the spec
}

, UncompressStream = function (opts) {
var asBuffer = (opts && typeof(opts.asBuffer) === 'boolean') ? opts.asBuffer : true

Transform.call(this, { objectMode: !asBuffer })
this.asBuffer = asBuffer
this.foundIdentifier = false
this.buffer = new BufferList()
}
import {Transform} from 'stream'
import util from 'util'
import BufferList from 'bl'
import snappyJS from 'snappyjs'

const IDENTIFIER = Buffer.from([
0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
])

function frameSize(buffer, offset) {
return buffer.get(offset) + (buffer.get(offset + 1) << 8) + (buffer.get(offset + 2) << 16)
}

function getType (value) {
if (value === 0xff)
return 'identifier'
if (value === 0x00)
return 'compressed'
if (value === 0x01)
return 'uncompressed'
if (value === 0xfe)
return 'padding'
// TODO: Handle the other cases described in the spec
}

export function UncompressStream (opts) {
var asBuffer = (opts && typeof(opts.asBuffer) === 'boolean') ? opts.asBuffer : true

Transform.call(this, { objectMode: !asBuffer })
this.asBuffer = asBuffer
this.foundIdentifier = false
this.buffer = new BufferList()
}

util.inherits(UncompressStream, Transform)

Expand All @@ -54,7 +53,7 @@ UncompressStream.prototype._parse = function (callback) {
return callback(new Error('malformed input: must begin with an identifier'))

if (type === 'identifier') {
if(!bufferEqual(data, IDENTIFIER))
if(!data.equals(IDENTIFIER))
return callback(new Error('malformed input: bad identifier'))

this.foundIdentifier = true
Expand All @@ -63,13 +62,9 @@ UncompressStream.prototype._parse = function (callback) {

if (type === 'compressed') {
// TODO: check that the checksum matches
snappy.uncompress(data.slice(4), { asBuffer: this.asBuffer }, function (err, raw) {
if(err) {
return callback(err)
}
self.push(raw)
self._parse(callback)
})
const raw = snappyJS.uncompress(data.slice(4))
self.push(raw)
self._parse(callback)
return
}

Expand All @@ -93,5 +88,3 @@ UncompressStream.prototype._transform = function (chunk, enc, callback) {
this.buffer.append(chunk)
this._parse(callback)
}

module.exports = UncompressStream
18 changes: 10 additions & 8 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
"name": "@chainsafe/snappy-stream",
"version": "5.1.1",
"description": "Compress data over a Stream using the snappy framing format",
"main": "index.js",
"type": "module",
"exports": "./index.js",
"scripts": {
"test": "tap test/*-test.js"
"test": "mocha test/*-test.js"
},
"engines": {
"node": ">=14.13.1"
},
"keywords": [
"snappy",
Expand All @@ -18,15 +22,13 @@
"license": "MIT",
"devDependencies": {
"async-benchmark": "^1.0.0",
"tap": "^11.0.1"
"mocha": "^8.0.0",
"chai": "4.3.6"
},
"dependencies": {
"bl": "^1.0.0",
"buffer-alloc": "^1.2.0",
"buffer-equal": "1.0.0",
"buffer-from": "^1.1.1",
"bl": "^4.0.1",
"@chainsafe/fast-crc32c": "3.0.0",
"snappy": "^6.3.5"
"snappyjs": "^0.7.0"
},
"directories": {
"test": "test"
Expand Down
Loading