diff --git a/src/http-connection/codec.ts b/src/http-connection/codec.ts index 727c9aa..dce24fc 100644 --- a/src/http-connection/codec.ts +++ b/src/http-connection/codec.ts @@ -18,8 +18,8 @@ import { types } from "neo4j-driver-core" import { BeginTransactionConfig } from "neo4j-driver-core/types/connection" - export const NEO4J_QUERY_CONTENT_TYPE = 'application/vnd.neo4j.query' +export const NEO4J_QUERY_JSONL_CONTENT_TYPE = 'application/vnd.neo4j.query.v1.0+jsonl'; export function encodeAuthToken(auth: types.AuthToken): string { switch (auth.scheme) { diff --git a/src/http-connection/connection.http.ts b/src/http-connection/connection.http.ts index 25433a9..8c2d9de 100644 --- a/src/http-connection/connection.http.ts +++ b/src/http-connection/connection.http.ts @@ -99,13 +99,9 @@ export default class HttpConnection extends Connection { this._log?.debug(`${this} REQUEST: ${JSON.stringify(request)}`) - const res = await fetch(request.url, request) - const { body: rawQueryResponse, headers: [contentType] } = await readBodyAndReaders(request.url, res, 'content-type') - - this._log?.debug(`${this} RESPONSE: { body: ${JSON.stringify(rawQueryResponse)}, headers: { content-type: ${contentType} }}`); - + const res = await fetch(request.url, request) const batchSize = config?.fetchSize ?? Number.MAX_SAFE_INTEGER - const codec = QueryResponseCodec.of(this._config, contentType ?? '', rawQueryResponse); + const codec = await QueryResponseCodec.ofResponse(this._config, request.url, res); if (codec.error) { throw codec.error @@ -121,7 +117,7 @@ export default class HttpConnection extends Connection { } for (let i = 0; !observer.paused && i < batchSize && !observer.completed; i++) { - const { done, value: rawRecord } = stream.next() + const { done, value: rawRecord } = await stream.next() if (!done) { observer.onNext(rawRecord) } else { diff --git a/src/http-connection/lang/line-splitter.ts b/src/http-connection/lang/line-splitter.ts new file mode 100644 index 0000000..d19c334 --- /dev/null +++ b/src/http-connection/lang/line-splitter.ts @@ -0,0 +1,31 @@ +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { TransformStreamDefaultController, Transformer } from "stream/web"; + +export default class LineSplitter implements Transformer { + transform(chunk: string, controller: TransformStreamDefaultController): void { + try { + const splitted = chunk.split('\n') + for (let i = 0; i < splitted.length - 1; i++) { + controller.enqueue(splitted[i]) + } + } catch(e) { + controller.error(e) + } + } +} \ No newline at end of file diff --git a/src/http-connection/query.codec.ts b/src/http-connection/query.codec.ts index 1d44d11..b6f10fe 100644 --- a/src/http-connection/query.codec.ts +++ b/src/http-connection/query.codec.ts @@ -15,90 +15,13 @@ * limitations under the License. */ -import { newError, Node, Relationship, int, error, types, Integer, Time, Date, LocalTime, Point, DateTime, LocalDateTime, Duration, isInt, isPoint, isDuration, isLocalTime, isTime, isDate, isLocalDateTime, isDateTime, isRelationship, isPath, isNode, isPathSegment, Path, PathSegment, internal, isUnboundRelationship, isVector } from "neo4j-driver-core" +import { newError, int, error, types, isInt, isPoint, isDuration, isLocalTime, isTime, isDate, isLocalDateTime, isDateTime, isRelationship, isPath, isNode, isPathSegment, isUnboundRelationship, isVector, internal } from "neo4j-driver-core" import { RunQueryConfig } from "neo4j-driver-core/types/connection" -import { NEO4J_QUERY_CONTENT_TYPE, encodeAuthToken, encodeTransactionBody } from "./codec" - -export type RawQueryValueTypes = 'Null' | 'Boolean' | 'Integer' | 'Float' | 'String' | - 'Time' | 'Date' | 'LocalTime' | 'ZonedDateTime' | 'OffsetDateTime' | 'LocalDateTime' | - 'Duration' | 'Point' | 'Base64' | 'Map' | 'List' | 'Node' | 'Relationship' | - 'Path' - -export type NodeShape = { _element_id: string, _labels: string[], _properties?: Record } -export type RelationshipShape = { _element_id: string, _start_node_element_id: string, _end_node_element_id: string, _type: string, _properties?: Record } -export type PathShape = (RawQueryRelationship | RawQueryNode)[] -export type RawQueryValueDef = { $type: T, _value: V } - -export type RawQueryNull = RawQueryValueDef<'Null', null> -export type RawQueryBoolean = RawQueryValueDef<'Boolean', boolean> -export type RawQueryInteger = RawQueryValueDef<'Integer', string> -export type RawQueryFloat = RawQueryValueDef<'Float', string> -export type RawQueryString = RawQueryValueDef<'String', string> -export type RawQueryTime = RawQueryValueDef<'Time', string> -export type RawQueryDate = RawQueryValueDef<'Date', string> -export type RawQueryLocalTime = RawQueryValueDef<'LocalTime', string> -export type RawQueryZonedDateTime = RawQueryValueDef<'ZonedDateTime', string> -export type RawQueryOffsetDateTime = RawQueryValueDef<'OffsetDateTime', string> -export type RawQueryLocalDateTime = RawQueryValueDef<'LocalDateTime', string> -export type RawQueryDuration = RawQueryValueDef<'Duration', string> -export type RawQueryPoint = RawQueryValueDef<'Point', string> -export type RawQueryBinary = RawQueryValueDef<'Base64', string> -export interface RawQueryMap extends RawQueryValueDef<'Map', Record> { } -export interface RawQueryList extends RawQueryValueDef<'List', RawQueryValue[]> { } -export type RawQueryNode = RawQueryValueDef<'Node', NodeShape> -export type RawQueryRelationship = RawQueryValueDef<'Relationship', RelationshipShape> -export type RawQueryPath = RawQueryValueDef<'Path', PathShape> - - -export type RawQueryValue = RawQueryNull | RawQueryBoolean | RawQueryInteger | RawQueryFloat | - RawQueryString | RawQueryTime | RawQueryDate | RawQueryLocalTime | RawQueryZonedDateTime | - RawQueryOffsetDateTime | RawQueryLocalDateTime | RawQueryDuration | RawQueryPoint | - RawQueryBinary | RawQueryMap | RawQueryList | RawQueryNode | RawQueryRelationship | - RawQueryPath - -export type Counters = { - containsUpdates: boolean - nodesCreated: number - nodesDeleted: number - propertiesSet: number - relationshipsCreated: number - relationshipsDeleted: number - labelsAdded: number - labelsRemoved: number - indexesAdded: number - indexesRemoved: number - constraintsAdded: number - constraintsRemoved: number - containsSystemUpdates: boolean - systemUpdates: number -} - -export type ProfiledQueryPlan = { - dbHits: number - records: number - hasPageCacheStats: boolean - pageCacheHits: number - pageCacheMisses: number - pageCacheHitRatio: number - time: number - operatorType: string - arguments: Record - identifiers: string[] - children: ProfiledQueryPlan[] -} - -export type NotificationShape = { - code: string - title: string - description: string - position: { - offset: number - line: number - column: number - } | {} - severity: string - category: string -} +import { NEO4J_QUERY_CONTENT_TYPE, NEO4J_QUERY_JSONL_CONTENT_TYPE, encodeAuthToken, encodeTransactionBody } from "./codec" +import TypedJsonCodec, { Counters, NotificationShape, ProfiledQueryPlan, RawQueryValue } from "./types.codec" +import { TransformStream, TransformStreamDefaultController, Transformer } from "stream/web" +import { TextDecoderStream } from "node:stream/web" +import LineSplitter from "./lang/line-splitter" export type RawQueryData = { fields: string[] @@ -121,6 +44,36 @@ export type RawQueryError = { error?: string } +type HeaderEvent = { + $event: 'Header', + _body: { + fields?: string [] + } +} + +type RecordEvent = { + $event: 'Record', + _body: RawQueryValue[] +} + +type SummaryEvent = { + $event: 'Summary', + _body: { + notifications?: NotificationShape[] + counters?: Counters + bookmarks?: string[] + profiledQueryPlan?: ProfiledQueryPlan + queryPlan?: ProfiledQueryPlan + } +} + +type ErrorEvent = { + $event: 'Error', + _body: RawQueryError[] +} + +type Event = HeaderEvent | RecordEvent | SummaryEvent | ErrorEvent; + export type RawQueryFailuresResponse = { errors: RawQueryError[] @@ -130,6 +83,40 @@ export type RawQueryResponse = RawQuerySuccessResponse | RawQueryFailuresRespons export class QueryResponseCodec { + static async ofResponse( + config: types.InternalConfig, + url: String, + response: Response + ): Promise { + + const contentType = response.headers.get('Content-Type') ?? '' + + if (contentType === NEO4J_QUERY_JSONL_CONTENT_TYPE) { + const decoder = new TextDecoderStream(); + const it = response.body?.pipeThrough(decoder) + .pipeThrough(new TransformStream(new LineSplitter())) + .pipeThrough(new TransformStream(new EventDecoder())) + .values()!; + + const { value: first, done } = await it.next() + + return new QueryJsonlResponseCodec( + TypedJsonCodec.of(contentType, config), + it, + first, + done === true + ) + } + + try { + const text = await response.text() + const body = text !== '' ? JSON.parse(text) : {}; + return QueryResponseCodec.of(config, contentType, body); + } catch (error) { + throw newError(`Failure accessing "${url}"`, 'SERVICE_UNAVAILABLE', error) + } + } + static of( config: types.InternalConfig, contentType: string, @@ -137,7 +124,7 @@ export class QueryResponseCodec { if (isSuccess(response)) { if (contentType === NEO4J_QUERY_CONTENT_TYPE) { - return new QuerySuccessResponseCodec(config, response) + return new QuerySuccessResponseCodec(TypedJsonCodec.of(contentType, config), response) } return new QueryFailureResponseCodec(newError( `Wrong content-type. Expected "${NEO4J_QUERY_CONTENT_TYPE}", but got "${contentType}".`, @@ -166,7 +153,7 @@ export class QueryResponseCodec { throw new Error('Not implemented') } - *stream(): Generator { + async *stream(): AsyncGenerator { throw new Error('Not implemented') } } @@ -174,7 +161,7 @@ export class QueryResponseCodec { class QuerySuccessResponseCodec extends QueryResponseCodec { constructor( - private _config: types.InternalConfig, + private readonly _typedJsonCodec: TypedJsonCodec, private readonly _response: RawQuerySuccessResponse) { super() } @@ -187,11 +174,11 @@ class QuerySuccessResponseCodec extends QueryResponseCodec { return this._response.data.fields } - *stream(): Generator { + async *stream(): AsyncGenerator { while (this._response.data.values.length > 0) { const value = this._response.data.values.shift() if (value != null) { - yield value.map(this._decodeValue.bind(this)) + yield value.map(this._typedJsonCodec.decodeValue.bind(this._typedJsonCodec)) } } return @@ -200,438 +187,98 @@ class QuerySuccessResponseCodec extends QueryResponseCodec { get meta(): Record { return { bookmark: this._response.bookmarks, - stats: this._decodeStats(this._response.counters), + stats: this._typedJsonCodec.decodeStats(this._response.counters), profile: this._response.profiledQueryPlan != null ? - this._decodeProfile(this._response.profiledQueryPlan) : null, + this._typedJsonCodec.decodeProfile(this._response.profiledQueryPlan) : null, plan: this._response.queryPlan != null ? - this._decodeProfile(this._response.queryPlan) : null, + this._typedJsonCodec.decodeProfile(this._response.queryPlan) : null, notifications: this._response.notifications } } +} - private _decodeStats(counters: Counters): Record { - return Object.fromEntries( - Object.entries(counters) - .map(([key, value]) => [key, typeof value === 'number' ? this._normalizeInteger(int(value)) : value]) - ) - } - - private _decodeProfile(queryPlan: ProfiledQueryPlan): Record { - return Object.fromEntries( - Object.entries(queryPlan) - .map(([key, value]) => { - let actualKey: string = key - let actualValue: unknown = value - switch (key) { - case 'children': - actualValue = (value as ProfiledQueryPlan[]).map(this._decodeProfile.bind(this)) - break - case 'arguments': - actualKey = 'args' - actualValue = Object.fromEntries(Object.entries(value as {}) - .map(([k, v]) => [k, this._decodeValue(v as RawQueryValue)])) - break - case 'records': - actualKey = 'rows' - break - default: - break - } - return [actualKey, actualValue] - }) - ) - } - - - private _decodeValue(value: RawQueryValue): unknown { - switch (value.$type) { - case "Null": - return null - case "Boolean": - return value._value - case "Integer": - return this._decodeInteger(value._value as string) - case "Float": - return this._decodeFloat(value._value as string) - case "String": - return value._value - case "Time": - return this._decodeTime(value._value as string) - case "Date": - return this._decodeDate(value._value as string) - case "LocalTime": - return this._decodeLocalTime(value._value as string) - case "ZonedDateTime": - return this._decodeZonedDateTime(value._value as string) - case "OffsetDateTime": - return this._decodeOffsetDateTime(value._value as string) - case "LocalDateTime": - return this._decodeLocalDateTime(value._value as string) - case "Duration": - return this._decodeDuration(value._value as string) - case "Point": - return this._decodePoint(value._value as string) - case "Base64": - return this._decodeBase64(value._value as string) - case "Map": - return this._decodeMap(value._value as Record) - case "List": - return this._decodeList(value._value as RawQueryValue[]) - case "Node": - return this._decodeNode(value._value as NodeShape) - case "Relationship": - return this._decodeRelationship(value._value as RelationshipShape) - case "Path": - return this._decodePath(value._value as PathShape) - default: - // @ts-expect-error It should never happen - throw newError(`Unknown type: ${value.$type}`, error.PROTOCOL_ERROR) - } - } +class QueryJsonlResponseCodec extends QueryResponseCodec { + private readonly _keys: string[] + private _error: Error | undefined + private _meta: Record - _decodeInteger(value: string): Integer | number | bigint { - if (this._config.useBigInt === true) { - return BigInt(value) + constructor( + private readonly _typedJsonCodec: TypedJsonCodec, + private readonly _it: AsyncIterableIterator, + private readonly _first: Event | undefined, + private _done: boolean ) { + super() + + if (this._first?.$event === 'Header') { + if (!Array.isArray(this._first._body.fields )) { + throw newError('Query headers should have fields', error.PROTOCOL_ERROR) + } + this._keys = this._first._body.fields + } else if (this._first?.$event === 'Error') { + this.setError(this._first) } else { - const integer = int(value) - if (this._config.disableLosslessIntegers === true) { - return integer.toNumber() - } - return integer + throw newError(`${this._first?.$event} is not expected as first event.`, error.PROTOCOL_ERROR) } + } - - _decodeFloat(value: string): number { - return parseFloat(value) + + private setError(event: ErrorEvent) { + this._error = event._body.length > 0 ? newError( + event._body[0].message, + // TODO: REMOVE THE ?? AND .ERROR WHEN SERVER IS FIXED + event._body[0].code + ) : newError('Server replied an empty error response', error.PROTOCOL_ERROR) } - _decodeTime(value: string): Time | LocalTime { - // 12:50:35.556+01:00 - // 12:50:35+01:00 - // 12:50:35Z - const [hourStr, minuteString, secondNanosecondAndOffsetString, offsetMinuteString] = value.split(':') - let [secondStr, nanosecondAndOffsetString] = secondNanosecondAndOffsetString.split('.') - let [nanosecondString, offsetHourString, isPositive, hasOffset]: [string, string, boolean, boolean] = ['0', '0', true, true] + get error(): Error | undefined { + return this._error + } - if (nanosecondAndOffsetString !== undefined) { - if ( nanosecondAndOffsetString.indexOf('+') >= 0 ) { - [nanosecondString, offsetHourString] = [...nanosecondAndOffsetString.split('+')] - } else if (nanosecondAndOffsetString.indexOf('-') >= 0) { - [nanosecondString, offsetHourString] = [...nanosecondAndOffsetString.split('-')] - isPositive = false - } else if (nanosecondAndOffsetString.indexOf('Z') >= 0) { - [nanosecondString] = [...nanosecondAndOffsetString.split('Z')] - } else { - hasOffset = false - if (nanosecondAndOffsetString.indexOf('[')) { - [nanosecondString] = [...nanosecondAndOffsetString.split('[')] - } - } - } else { - if ( secondStr.indexOf('+') >= 0 ) { - [secondStr, offsetHourString] = [...secondStr.split('+')] - } else if ( secondStr.indexOf('-') >= 0 ) { - [secondStr, offsetHourString] = [...secondStr.split('-')] - isPositive = false - } else if (secondStr.indexOf('Z') < 0) { - hasOffset = false - } + get keys(): string[] { + if (this._error) { + throw this._error } + return this._keys + } - secondStr = secondStr.substring(0, 2) - - const nanosecond = nanosecondString === undefined ? int(0) : int((nanosecondString).padEnd(9, '0')) - - if (hasOffset) { - const timeZoneOffsetInSeconds = int(offsetHourString).multiply(60).add(int(offsetMinuteString ?? '0')).multiply(60).multiply(isPositive ? 1 : -1) - - return new Time( - this._decodeInteger(hourStr), - this._decodeInteger(minuteString), - this._decodeInteger(secondStr), - this._normalizeInteger(nanosecond), - this._normalizeInteger(timeZoneOffsetInSeconds)) + get meta(): Record { + if (this._error) { + throw this._error } + return this._meta + } - return new LocalTime( - this._decodeInteger(hourStr), - this._decodeInteger(minuteString), - this._decodeInteger(secondStr), - this._normalizeInteger(nanosecond), - ) - } - - _decodeDate(value: string): Date { - // (+|-)2015-03-26 - // first might be signal or first digit on date - const first = value[0] - const [yearStr, monthStr, dayStr] = value.substring(1).split('-') - return new Date( - this._decodeInteger(first.concat(yearStr)), - this._decodeInteger(monthStr), - this._decodeInteger(dayStr) - ) - } - - _decodeLocalTime(value: string): LocalTime { - // 12:50:35.556 - const [hourStr, minuteString, secondNanosecondAndOffsetString] = value.split(':') - const [secondStr, nanosecondString] = secondNanosecondAndOffsetString.split('.') - const nanosecond = nanosecondString === undefined ? int(0) : int((nanosecondString).padEnd(9, '0')) - - return new LocalTime( - this._decodeInteger(hourStr), - this._decodeInteger(minuteString), - this._decodeInteger(secondStr), - this._normalizeInteger(nanosecond) - ) - } - - _decodeZonedDateTime(value: string): DateTime { - // 2015-11-21T21:40:32.142Z[Antarctica/Troll] - const [dateTimeStr, timeZoneIdEndWithAngleBrackets] = value.split('[') - const timeZoneId = timeZoneIdEndWithAngleBrackets.slice(0, timeZoneIdEndWithAngleBrackets.length - 1) - const dateTime = this._decodeOffsetDateTime(dateTimeStr) - - return new DateTime( - dateTime.year, - dateTime.month, - dateTime.day, - dateTime.hour, - dateTime.minute, - dateTime.second, - dateTime.nanosecond, - isDateTime(dateTime) ? dateTime.timeZoneOffsetSeconds : undefined, - timeZoneId - ) - } - - _decodeOffsetDateTime(value: string): DateTime | LocalDateTime{ - // 2015-06-24T12:50:35.556+01:00 - const [dateStr, timeStr] = value.split('T') - const date = this._decodeDate(dateStr) - const time = this._decodeTime(timeStr) - if (isTime(time)) { - return new DateTime( - date.year, - date.month, - date.day, - time.hour, - time.minute, - time.second, - time.nanosecond, - time.timeZoneOffsetSeconds - ) + async *stream(): AsyncGenerator { + if (this._error) { + throw this._error } - - return new LocalDateTime( - date.year, - date.month, - date.day, - time.hour, - time.minute, - time.second, - time.nanosecond - ) - } - - _decodeLocalDateTime(value: string): LocalDateTime { - // 2015-06-24T12:50:35.556 - const [dateStr, timeStr] = value.split('T') - const date = this._decodeDate(dateStr) - const time = this._decodeLocalTime(timeStr) - return new LocalDateTime( - date.year, - date.month, - date.day, - time.hour, - time.minute, - time.second, - time.nanosecond - ) - } - - _decodeDuration(value: string): Duration { - // P14DT16H12M - const durationStringWithP = value.slice(1, value.length) - - let month = '0' - let week = '0' - let day = '0' - let second = '0' - let nanosecond = '0' - let hour = '0' - let minute = '0' - let currentNumber = '' - let timePart = false - - for (const ch of durationStringWithP) { - if (ch >= '0' && ch <= '9' || ch === '.' || ch === ',' || (currentNumber.length === 0 && ch === '-')) { - currentNumber = currentNumber + ch - } else { - switch (ch) { - case 'M': - // minutes - if (timePart) { - minute = currentNumber - // months - } else { - month = currentNumber - } - break; - case 'W': - if (timePart) { - throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in time part`, error.PROTOCOL_ERROR) - } - week = currentNumber; - break - case 'D': - if (timePart) { - throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in time part`, error.PROTOCOL_ERROR) - } - day = currentNumber - break - case 'S': - if (!timePart) { - throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in date part`, error.PROTOCOL_ERROR) - } - const nanosecondSeparator = currentNumber.includes(',') ? ',' : '.'; - [second, nanosecond] = currentNumber.split(nanosecondSeparator) - break - case 'H': - if (!timePart) { - throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in date part`, error.PROTOCOL_ERROR) - } - hour = currentNumber - break - case 'T': - timePart = true - break - default: - throw newError(`Duration is not well formatted. Unexpected Duration component ${ch}`, error.PROTOCOL_ERROR) - } - currentNumber = '' + while(!this._done) { + const { value: event, done } = await this._it.next() as { value: Event, done: boolean } + this._done = done === true + if (this._done) { + return; } - } - - const secondsInt = int(hour) - .multiply(60) - .add(minute) - .multiply(60) - .add(second) - - const dayInt = int(week) - .multiply(7) - .add(day) - - const nanosecondString = nanosecond ?? '0' - return new Duration( - this._decodeInteger(month), - this._normalizeInteger(dayInt), - this._normalizeInteger(secondsInt), - this._decodeInteger(nanosecondString.padEnd(9, '0')) - ) - } - - _decodeMap(value: Record): Record { - const result: Record = {} - for (const k of Object.keys(value)) { - if (Object.prototype.hasOwnProperty.call(value, k)) { - result[k] = this._decodeValue(value[k]) + if (event.$event === 'Error') { + this.setError(event) + throw this._error + } else if(event.$event === 'Record') { + yield event._body.map(this._typedJsonCodec.decodeValue.bind(this._typedJsonCodec)) + } else if(event.$event === 'Summary') { + this._meta = { + bookmark: event._body.bookmarks, + stats: event._body.counters != null ? this._typedJsonCodec.decodeStats(event._body.counters) : null, + profile: event._body.profiledQueryPlan != null ? + this._typedJsonCodec.decodeProfile(event._body.profiledQueryPlan) : null, + plan: event._body.queryPlan != null ? + this._typedJsonCodec.decodeProfile(event._body.queryPlan) : null, + notifications: event._body.notifications + } + } else { + this._error = newError(`${this._first?.$event} is not expected`, error.PROTOCOL_ERROR) + throw this._error } } - return result - } - - _decodePoint(value: string): Point { - const createProtocolError = (): Point => internal.objectUtil.createBrokenObject(newError( - `Wrong point format. RawValue: ${value}`, - error.PROTOCOL_ERROR - ), new Point(0, 0, 0)) - - - const splittedOnSeparator = value.split(';') - if (splittedOnSeparator.length !== 2 || !splittedOnSeparator[0].startsWith('SRID=') || - !(splittedOnSeparator[1].startsWith('POINT (') || splittedOnSeparator[1].startsWith('POINT Z ('))) { - return createProtocolError() - } - - const [_, sridString] = splittedOnSeparator[0].split('=') - const srid = this._normalizeInteger(int(sridString)) - - const [__, coordinatesString] = splittedOnSeparator[1].split('(') - const [x, y, z] = coordinatesString.substring(0, coordinatesString.length - 1).split(" ").filter(c => c != null).map(parseFloat) - - return new Point( - srid, - x, - y, - z - ) - } - - _decodeBase64(value: string): Uint8Array { - const binaryString: string = atob(value) - // @ts-expect-error See https://developer.mozilla.org/en-US/docs/Glossary/Base64 - return Uint8Array.from(binaryString, (b) => b.codePointAt(0)) - } - - _decodeList(value: RawQueryValue[]): unknown[] { - return value.map(v => this._decodeValue(v)) - } - - _decodeNode(value: NodeShape): Node { - return new Node( - // @ts-expect-error identity doesn't return - undefined, - value._labels, - this._decodeMap(value._properties ?? {}), - value._element_id - ) - } - - _decodeRelationship(value: RelationshipShape): Relationship { - return new Relationship( - // @ts-expect-error identity doesn't return - undefined, - undefined, - undefined, - value._type, - this._decodeMap(value._properties ?? {}), - value._element_id, - value._start_node_element_id, - value._end_node_element_id - ) - } - - _decodePath(value: PathShape): Path { - const decoded = value.map(v => this._decodeValue(v)) - type SegmentAccumulator = [] | [Node] | [Node, Relationship] - type Accumulator = { acc: SegmentAccumulator, segments: PathSegment[] } - - return new Path( - decoded[0] as Node, - decoded[decoded.length - 1] as Node, - // @ts-expect-error - decoded.reduce((previous: Accumulator, current: Node | Relationship): Accumulator => { - if (previous.acc.length === 2) { - return { - acc: [current as Node], segments: [...previous.segments, - new PathSegment(previous.acc[0], previous.acc[1], current as Node)] - } - } - return { ...previous, acc: [...previous.acc, current] as SegmentAccumulator } - }, { acc: [], segments: [] }).segments - ) - } - - _normalizeInteger(integer: Integer): Integer | number | bigint { - if (this._config.useBigInt === true) { - return integer.toBigInt() - } else if (this._config.disableLosslessIntegers === true) { - return integer.toNumber() - } - return integer + return } } @@ -652,7 +299,7 @@ class QueryFailureResponseCodec extends QueryResponseCodec { throw this._error } - stream(): Generator { + async *stream(): AsyncGenerator { throw this._error } } @@ -685,7 +332,7 @@ export class QueryRequestCodec { } get accept(): string { - return `${NEO4J_QUERY_CONTENT_TYPE}, application/json` + return `${NEO4J_QUERY_JSONL_CONTENT_TYPE}, ${NEO4J_QUERY_CONTENT_TYPE}, application/json` } get authorization(): string { @@ -794,3 +441,22 @@ function isSuccess(obj: RawQueryResponse): obj is RawQuerySuccessResponse { } return true } + +function isEvent(obj: any): obj is Event { + return obj != null && typeof obj.$event === "string" && typeof obj._body === 'object' && obj._body != null +} + +class EventDecoder implements Transformer { + transform(chunk: string, controller: TransformStreamDefaultController): void { + try { + const mightEvent = JSON.parse(chunk) + if (isEvent(mightEvent)) { + controller.enqueue(mightEvent) + } else { + throw newError('Invalid event', error.PROTOCOL_ERROR) + } + } catch(e) { + controller.error(e) + } + } +} diff --git a/src/http-connection/types.codec.ts b/src/http-connection/types.codec.ts new file mode 100644 index 0000000..cfdd546 --- /dev/null +++ b/src/http-connection/types.codec.ts @@ -0,0 +1,555 @@ +import { Date, DateTime, Duration, Integer, LocalDateTime, LocalTime, Node, Path, PathSegment, Point, Relationship, Time, error, int, internal, isDateTime, isTime, newError, types } from "neo4j-driver-core" + +/** + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [https://neo4j.com] + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +export type RawQueryValueTypes = 'Null' | 'Boolean' | 'Integer' | 'Float' | 'String' | + 'Time' | 'Date' | 'LocalTime' | 'ZonedDateTime' | 'OffsetDateTime' | 'LocalDateTime' | + 'Duration' | 'Point' | 'Base64' | 'Map' | 'List' | 'Node' | 'Relationship' | + 'Path' + +export type NodeShape = { _element_id: string, _labels: string[], _properties?: Record } +export type RelationshipShape = { _element_id: string, _start_node_element_id: string, _end_node_element_id: string, _type: string, _properties?: Record } +export type PathShape = (RawQueryRelationship | RawQueryNode)[] +export type RawQueryValueDef = { $type: T, _value: V } + +export type RawQueryNull = RawQueryValueDef<'Null', null> +export type RawQueryBoolean = RawQueryValueDef<'Boolean', boolean> +export type RawQueryInteger = RawQueryValueDef<'Integer', string> +export type RawQueryFloat = RawQueryValueDef<'Float', string> +export type RawQueryString = RawQueryValueDef<'String', string> +export type RawQueryTime = RawQueryValueDef<'Time', string> +export type RawQueryDate = RawQueryValueDef<'Date', string> +export type RawQueryLocalTime = RawQueryValueDef<'LocalTime', string> +export type RawQueryZonedDateTime = RawQueryValueDef<'ZonedDateTime', string> +export type RawQueryOffsetDateTime = RawQueryValueDef<'OffsetDateTime', string> +export type RawQueryLocalDateTime = RawQueryValueDef<'LocalDateTime', string> +export type RawQueryDuration = RawQueryValueDef<'Duration', string> +export type RawQueryPoint = RawQueryValueDef<'Point', string> +export type RawQueryBinary = RawQueryValueDef<'Base64', string> +export interface RawQueryMap extends RawQueryValueDef<'Map', Record> { } +export interface RawQueryList extends RawQueryValueDef<'List', RawQueryValue[]> { } +export type RawQueryNode = RawQueryValueDef<'Node', NodeShape> +export type RawQueryRelationship = RawQueryValueDef<'Relationship', RelationshipShape> +export type RawQueryPath = RawQueryValueDef<'Path', PathShape> + + +export type RawQueryValue = RawQueryNull | RawQueryBoolean | RawQueryInteger | RawQueryFloat | + RawQueryString | RawQueryTime | RawQueryDate | RawQueryLocalTime | RawQueryZonedDateTime | + RawQueryOffsetDateTime | RawQueryLocalDateTime | RawQueryDuration | RawQueryPoint | + RawQueryBinary | RawQueryMap | RawQueryList | RawQueryNode | RawQueryRelationship | + RawQueryPath + +export type Counters = { + containsUpdates: boolean + nodesCreated: number + nodesDeleted: number + propertiesSet: number + relationshipsCreated: number + relationshipsDeleted: number + labelsAdded: number + labelsRemoved: number + indexesAdded: number + indexesRemoved: number + constraintsAdded: number + constraintsRemoved: number + containsSystemUpdates: boolean + systemUpdates: number +} + +export type ProfiledQueryPlan = { + dbHits: number + records: number + hasPageCacheStats: boolean + pageCacheHits: number + pageCacheMisses: number + pageCacheHitRatio: number + time: number + operatorType: string + arguments: Record + identifiers: string[] + children: ProfiledQueryPlan[] +} + +export type NotificationShape = { + code: string + title: string + description: string + position: { + offset: number + line: number + column: number + } | {} + severity: string + category: string +} + +export default class TypedJsonCodec { + static of(contentType: string, config: types.InternalConfig): TypedJsonCodec { + if (contentType === 'application/vnd.neo4j.query.v1.0' || contentType == 'application/vnd.neo4j.query' + || contentType === 'application/vnd.neo4j.query.v1.0+jsonl') { + return new TypedJsonCodecV10(config) + } + + throw newError(`Unsupported content type: ${contentType}`) + } + + decodeStats(counters: Counters): Record { + throw Error('Not implemented') + } + + decodeProfile(queryPlan: ProfiledQueryPlan): Record { + throw Error('Not implemented') + } + + + decodeValue(value: RawQueryValue): unknown { + throw Error('Not implemented') + } +} + +class TypedJsonCodecV10 extends TypedJsonCodec { + + constructor(private readonly _config: types.InternalConfig) { + super() + } + + decodeStats(counters: Counters): Record { + return Object.fromEntries( + Object.entries(counters) + .map(([key, value]) => [key, typeof value === 'number' ? this._normalizeInteger(int(value)) : value]) + ) + } + + decodeProfile(queryPlan: ProfiledQueryPlan): Record { + return Object.fromEntries( + Object.entries(queryPlan) + .map(([key, value]) => { + let actualKey: string = key + let actualValue: unknown = value + switch (key) { + case 'children': + actualValue = (value as ProfiledQueryPlan[]).map(this.decodeProfile.bind(this)) + break + case 'arguments': + actualKey = 'args' + actualValue = Object.fromEntries(Object.entries(value as {}) + .map(([k, v]) => [k, this.decodeValue(v as RawQueryValue)])) + break + case 'records': + actualKey = 'rows' + break + default: + break + } + return [actualKey, actualValue] + }) + ) + } + + + decodeValue(value: RawQueryValue): unknown { + switch (value.$type) { + case "Null": + return null + case "Boolean": + return value._value + case "Integer": + return this._decodeInteger(value._value as string) + case "Float": + return this._decodeFloat(value._value as string) + case "String": + return value._value + case "Time": + return this._decodeTime(value._value as string) + case "Date": + return this._decodeDate(value._value as string) + case "LocalTime": + return this._decodeLocalTime(value._value as string) + case "ZonedDateTime": + return this._decodeZonedDateTime(value._value as string) + case "OffsetDateTime": + return this._decodeOffsetDateTime(value._value as string) + case "LocalDateTime": + return this._decodeLocalDateTime(value._value as string) + case "Duration": + return this._decodeDuration(value._value as string) + case "Point": + return this._decodePoint(value._value as string) + case "Base64": + return this._decodeBase64(value._value as string) + case "Map": + return this._decodeMap(value._value as Record) + case "List": + return this._decodeList(value._value as RawQueryValue[]) + case "Node": + return this._decodeNode(value._value as NodeShape) + case "Relationship": + return this._decodeRelationship(value._value as RelationshipShape) + case "Path": + return this._decodePath(value._value as PathShape) + default: + // @ts-expect-error It should never happen + throw newError(`Unknown type: ${value.$type}`, error.PROTOCOL_ERROR) + } + } + + _decodeInteger(value: string): Integer | number | bigint { + if (this._config.useBigInt === true) { + return BigInt(value) + } else { + const integer = int(value) + if (this._config.disableLosslessIntegers === true) { + return integer.toNumber() + } + return integer + } + } + + _decodeFloat(value: string): number { + return parseFloat(value) + } + + _decodeTime(value: string): Time | LocalTime { + // 12:50:35.556+01:00 + // 12:50:35+01:00 + // 12:50:35Z + const [hourStr, minuteString, secondNanosecondAndOffsetString, offsetMinuteString] = value.split(':') + let [secondStr, nanosecondAndOffsetString] = secondNanosecondAndOffsetString.split('.') + let [nanosecondString, offsetHourString, isPositive, hasOffset]: [string, string, boolean, boolean] = ['0', '0', true, true] + + if (nanosecondAndOffsetString !== undefined) { + if ( nanosecondAndOffsetString.indexOf('+') >= 0 ) { + [nanosecondString, offsetHourString] = [...nanosecondAndOffsetString.split('+')] + } else if (nanosecondAndOffsetString.indexOf('-') >= 0) { + [nanosecondString, offsetHourString] = [...nanosecondAndOffsetString.split('-')] + isPositive = false + } else if (nanosecondAndOffsetString.indexOf('Z') >= 0) { + [nanosecondString] = [...nanosecondAndOffsetString.split('Z')] + } else { + hasOffset = false + if (nanosecondAndOffsetString.indexOf('[')) { + [nanosecondString] = [...nanosecondAndOffsetString.split('[')] + } + } + } else { + if ( secondStr.indexOf('+') >= 0 ) { + [secondStr, offsetHourString] = [...secondStr.split('+')] + } else if ( secondStr.indexOf('-') >= 0 ) { + [secondStr, offsetHourString] = [...secondStr.split('-')] + isPositive = false + } else if (secondStr.indexOf('Z') < 0) { + hasOffset = false + } + } + + secondStr = secondStr.substring(0, 2) + + const nanosecond = nanosecondString === undefined ? int(0) : int((nanosecondString).padEnd(9, '0')) + + if (hasOffset) { + const timeZoneOffsetInSeconds = int(offsetHourString).multiply(60).add(int(offsetMinuteString ?? '0')).multiply(60).multiply(isPositive ? 1 : -1) + + return new Time( + this._decodeInteger(hourStr), + this._decodeInteger(minuteString), + this._decodeInteger(secondStr), + this._normalizeInteger(nanosecond), + this._normalizeInteger(timeZoneOffsetInSeconds)) + } + + return new LocalTime( + this._decodeInteger(hourStr), + this._decodeInteger(minuteString), + this._decodeInteger(secondStr), + this._normalizeInteger(nanosecond), + ) + } + + _decodeDate(value: string): Date { + // (+|-)2015-03-26 + // first might be signal or first digit on date + const first = value[0] + const [yearStr, monthStr, dayStr] = value.substring(1).split('-') + return new Date( + this._decodeInteger(first.concat(yearStr)), + this._decodeInteger(monthStr), + this._decodeInteger(dayStr) + ) + } + + _decodeLocalTime(value: string): LocalTime { + // 12:50:35.556 + const [hourStr, minuteString, secondNanosecondAndOffsetString] = value.split(':') + const [secondStr, nanosecondString] = secondNanosecondAndOffsetString.split('.') + const nanosecond = nanosecondString === undefined ? int(0) : int((nanosecondString).padEnd(9, '0')) + + return new LocalTime( + this._decodeInteger(hourStr), + this._decodeInteger(minuteString), + this._decodeInteger(secondStr), + this._normalizeInteger(nanosecond) + ) + } + + _decodeZonedDateTime(value: string): DateTime { + // 2015-11-21T21:40:32.142Z[Antarctica/Troll] + const [dateTimeStr, timeZoneIdEndWithAngleBrackets] = value.split('[') + const timeZoneId = timeZoneIdEndWithAngleBrackets.slice(0, timeZoneIdEndWithAngleBrackets.length - 1) + const dateTime = this._decodeOffsetDateTime(dateTimeStr) + + return new DateTime( + dateTime.year, + dateTime.month, + dateTime.day, + dateTime.hour, + dateTime.minute, + dateTime.second, + dateTime.nanosecond, + isDateTime(dateTime) ? dateTime.timeZoneOffsetSeconds : undefined, + timeZoneId + ) + } + + _decodeOffsetDateTime(value: string): DateTime | LocalDateTime{ + // 2015-06-24T12:50:35.556+01:00 + const [dateStr, timeStr] = value.split('T') + const date = this._decodeDate(dateStr) + const time = this._decodeTime(timeStr) + if (isTime(time)) { + return new DateTime( + date.year, + date.month, + date.day, + time.hour, + time.minute, + time.second, + time.nanosecond, + time.timeZoneOffsetSeconds + ) + } + + return new LocalDateTime( + date.year, + date.month, + date.day, + time.hour, + time.minute, + time.second, + time.nanosecond + ) + } + + _decodeLocalDateTime(value: string): LocalDateTime { + // 2015-06-24T12:50:35.556 + const [dateStr, timeStr] = value.split('T') + const date = this._decodeDate(dateStr) + const time = this._decodeLocalTime(timeStr) + return new LocalDateTime( + date.year, + date.month, + date.day, + time.hour, + time.minute, + time.second, + time.nanosecond + ) + } + + _decodeDuration(value: string): Duration { + // P14DT16H12M + const durationStringWithP = value.slice(1, value.length) + + let month = '0' + let week = '0' + let day = '0' + let second = '0' + let nanosecond = '0' + let hour = '0' + let minute = '0' + let currentNumber = '' + let timePart = false + + for (const ch of durationStringWithP) { + if (ch >= '0' && ch <= '9' || ch === '.' || ch === ',' || (currentNumber.length === 0 && ch === '-')) { + currentNumber = currentNumber + ch + } else { + switch (ch) { + case 'M': + // minutes + if (timePart) { + minute = currentNumber + // months + } else { + month = currentNumber + } + break; + case 'W': + if (timePart) { + throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in time part`, error.PROTOCOL_ERROR) + } + week = currentNumber; + break + case 'D': + if (timePart) { + throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in time part`, error.PROTOCOL_ERROR) + } + day = currentNumber + break + case 'S': + if (!timePart) { + throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in date part`, error.PROTOCOL_ERROR) + } + const nanosecondSeparator = currentNumber.includes(',') ? ',' : '.'; + [second, nanosecond] = currentNumber.split(nanosecondSeparator) + break + case 'H': + if (!timePart) { + throw newError(`Duration is not well formatted. Unexpected Duration component ${ch} in date part`, error.PROTOCOL_ERROR) + } + hour = currentNumber + break + case 'T': + timePart = true + break + default: + throw newError(`Duration is not well formatted. Unexpected Duration component ${ch}`, error.PROTOCOL_ERROR) + } + currentNumber = '' + } + } + + const secondsInt = int(hour) + .multiply(60) + .add(minute) + .multiply(60) + .add(second) + + const dayInt = int(week) + .multiply(7) + .add(day) + + const nanosecondString = nanosecond ?? '0' + return new Duration( + this._decodeInteger(month), + this._normalizeInteger(dayInt), + this._normalizeInteger(secondsInt), + this._decodeInteger(nanosecondString.padEnd(9, '0')) + ) + } + + _decodeMap(value: Record): Record { + const result: Record = {} + for (const k of Object.keys(value)) { + if (Object.prototype.hasOwnProperty.call(value, k)) { + result[k] = this.decodeValue(value[k]) + } + } + return result + } + + _decodePoint(value: string): Point { + const createProtocolError = (): Point => internal.objectUtil.createBrokenObject(newError( + `Wrong point format. RawValue: ${value}`, + error.PROTOCOL_ERROR + ), new Point(0, 0, 0)) + + + const splittedOnSeparator = value.split(';') + if (splittedOnSeparator.length !== 2 || !splittedOnSeparator[0].startsWith('SRID=') || + !(splittedOnSeparator[1].startsWith('POINT (') || splittedOnSeparator[1].startsWith('POINT Z ('))) { + return createProtocolError() + } + + const [_, sridString] = splittedOnSeparator[0].split('=') + const srid = this._normalizeInteger(int(sridString)) + + const [__, coordinatesString] = splittedOnSeparator[1].split('(') + const [x, y, z] = coordinatesString.substring(0, coordinatesString.length - 1).split(" ").filter(c => c != null).map(parseFloat) + + return new Point( + srid, + x, + y, + z + ) + } + + _decodeBase64(value: string): Uint8Array { + const binaryString: string = atob(value) + // @ts-expect-error See https://developer.mozilla.org/en-US/docs/Glossary/Base64 + return Uint8Array.from(binaryString, (b) => b.codePointAt(0)) + } + + _decodeList(value: RawQueryValue[]): unknown[] { + return value.map(v => this.decodeValue(v)) + } + + _decodeNode(value: NodeShape): Node { + return new Node( + // @ts-expect-error identity doesn't return + undefined, + value._labels, + this._decodeMap(value._properties ?? {}), + value._element_id + ) + } + + _decodeRelationship(value: RelationshipShape): Relationship { + return new Relationship( + // @ts-expect-error identity doesn't return + undefined, + undefined, + undefined, + value._type, + this._decodeMap(value._properties ?? {}), + value._element_id, + value._start_node_element_id, + value._end_node_element_id + ) + } + + _decodePath(value: PathShape): Path { + const decoded = value.map(v => this.decodeValue(v)) + type SegmentAccumulator = [] | [Node] | [Node, Relationship] + type Accumulator = { acc: SegmentAccumulator, segments: PathSegment[] } + + return new Path( + decoded[0] as Node, + decoded[decoded.length - 1] as Node, + // @ts-expect-error + decoded.reduce((previous: Accumulator, current: Node | Relationship): Accumulator => { + if (previous.acc.length === 2) { + return { + acc: [current as Node], segments: [...previous.segments, + new PathSegment(previous.acc[0], previous.acc[1], current as Node)] + } + } + return { ...previous, acc: [...previous.acc, current] as SegmentAccumulator } + }, { acc: [], segments: [] }).segments + ) + } + + _normalizeInteger(integer: Integer): Integer | number | bigint { + if (this._config.useBigInt === true) { + return integer.toBigInt() + } else if (this._config.disableLosslessIntegers === true) { + return integer.toNumber() + } + return integer + } + +} \ No newline at end of file diff --git a/test/integration/config/stubs/affinity_header_return_1.stub.json b/test/integration/config/stubs/affinity_header_return_1.stub.json index 49f564f..2dfb6bf 100644 --- a/test/integration/config/stubs/affinity_header_return_1.stub.json +++ b/test/integration/config/stubs/affinity_header_return_1.stub.json @@ -7,7 +7,7 @@ "equalTo": "application/vnd.neo4j.query" }, "Accept": { - "equalTo": "application/vnd.neo4j.query, application/json" + "equalTo": "application/vnd.neo4j.query.v1.0+jsonl, application/vnd.neo4j.query, application/json" }, "Authorization": { "equalTo": "Bearer nicestTokenEver" diff --git a/test/integration/config/stubs/session_run_bearer_token_impersonated_return_1.stub.json b/test/integration/config/stubs/session_run_bearer_token_impersonated_return_1.stub.json index 0b5acfe..c1fe9e1 100644 --- a/test/integration/config/stubs/session_run_bearer_token_impersonated_return_1.stub.json +++ b/test/integration/config/stubs/session_run_bearer_token_impersonated_return_1.stub.json @@ -7,7 +7,7 @@ "equalTo": "application/vnd.neo4j.query" }, "Accept": { - "equalTo": "application/vnd.neo4j.query, application/json" + "equalTo": "application/vnd.neo4j.query.v1.0+jsonl, application/vnd.neo4j.query, application/json" }, "Authorization": { "equalTo": "Bearer nicestTokenEver" diff --git a/test/integration/config/stubs/session_run_bearer_token_return_1.stub.json b/test/integration/config/stubs/session_run_bearer_token_return_1.stub.json index 5286edf..f0e8b53 100644 --- a/test/integration/config/stubs/session_run_bearer_token_return_1.stub.json +++ b/test/integration/config/stubs/session_run_bearer_token_return_1.stub.json @@ -7,7 +7,7 @@ "equalTo": "application/vnd.neo4j.query" }, "Accept": { - "equalTo": "application/vnd.neo4j.query, application/json" + "equalTo": "application/vnd.neo4j.query.v1.0+jsonl, application/vnd.neo4j.query, application/json" }, "Authorization": { "equalTo": "Bearer nicestTokenEver" diff --git a/test/unit/http-connection/query.code.test.ts b/test/unit/http-connection/query.code.test.ts index 3ea16d4..045a553 100644 --- a/test/unit/http-connection/query.code.test.ts +++ b/test/unit/http-connection/query.code.test.ts @@ -16,7 +16,7 @@ */ import neo4j, { auth, types, internal, int, Date, Time, LocalTime, DateTime, LocalDateTime, Point, Duration, Node, Relationship, UnboundRelationship, Path, PathSegment, Vector } from "neo4j-driver-core"; -import { QueryRequestCodec, QueryRequestCodecConfig, QueryResponseCodec, RawQueryResponse, RawQueryValue } from "../../../src/http-connection/query.codec"; +import { QueryRequestCodec, QueryRequestCodecConfig, QueryResponseCodec, RawQueryResponse } from "../../../src/http-connection/query.codec"; describe('QueryRequestCodec', () => { const DEFAULT_AUTH = auth.basic('neo4j', 'password') @@ -31,10 +31,10 @@ describe('QueryRequestCodec', () => { }) describe('.accept', () => { - it('should return "application/vnd.neo4j.query, application/json"', () => { + it('should return "application/vnd.neo4j.query.v1.0+jsonl, application/vnd.neo4j.query, application/json"', () => { const codec = subject() - expect(codec.accept).toBe('application/vnd.neo4j.query, application/json') + expect(codec.accept).toBe('application/vnd.neo4j.query.v1.0+jsonl, application/vnd.neo4j.query, application/json') }) }) @@ -1983,7 +1983,7 @@ describe('QueryResponseCodec', () => { ] ].map(([_value, expected]) => [`Path (value=${_value})`, [[{ $type: 'Path', _value }]], [[expected]], config]), ]) - ])('should handle %s values', (_: string, values: any, expected: any, config?: Partial) => { + ])('should handle %s values', async (_: string, values: any, expected: any, config?: Partial) => { const codec = subject({ rawQueryResponse: { ...DEFAULT_RAW_RESPONSE, @@ -1995,10 +1995,19 @@ describe('QueryResponseCodec', () => { config }) - expect([...codec.stream()]).toEqual(expected) + const result = [] + for await (const v of codec.stream()) { + result.push(v) + } + + expect(result).toEqual(expected) // the stream should be consumed, // no data should come after - expect([...codec.stream()]).toEqual([]) + const emptyResult = [] + for await (const v of codec.stream()) { + emptyResult.push(v) + } + expect(emptyResult).toEqual([]) }) it.each( @@ -2006,7 +2015,12 @@ describe('QueryResponseCodec', () => { )('should handle %s failures', (_: string, param: SubjectParams) => { const codec = subject(param) - expect(() => codec.stream()).toThrow(codec.error) + expect(async () => { + const ignored = [] + for await (const v of codec.stream()) { + ignored.push(v) + } + }).rejects.toThrow(codec.error) }) })