From 64fc6d16b6ce87a22b487ce42823841ee6f30c0a Mon Sep 17 00:00:00 2001 From: ondratra Date: Thu, 15 Jul 2021 18:13:17 +0200 Subject: [PATCH] feat: temporary dev progress affects: @joystream/hydra-indexer-gateway, @joystream/hydra-processor --- .../indexer-status/indexer-status.resolver.ts | 34 +++++++++ packages/hydra-indexer-gateway/src/server.ts | 22 ++++++ packages/hydra-indexer-gateway/tsconfig.json | 1 + packages/hydra-processor/package.json | 6 +- packages/hydra-processor/run-tests.sh | 11 +-- .../src/ingest/GraphQLSource.ts | 70 ++++++++++++++++++- .../hydra-processor/src/queue/BlockQueue.ts | 30 +++++++- packages/hydra-processor/src/start/config.ts | 1 + .../test/fixtures/manifest.yml | 2 +- yarn.lock | 26 ++++++- 10 files changed, 190 insertions(+), 13 deletions(-) diff --git a/packages/hydra-indexer-gateway/src/modules/indexer-status/indexer-status.resolver.ts b/packages/hydra-indexer-gateway/src/modules/indexer-status/indexer-status.resolver.ts index 7ce104952..dc94a73ed 100644 --- a/packages/hydra-indexer-gateway/src/modules/indexer-status/indexer-status.resolver.ts +++ b/packages/hydra-indexer-gateway/src/modules/indexer-status/indexer-status.resolver.ts @@ -1,6 +1,7 @@ import { Resolver, Field, ObjectType, Int, Query } from 'type-graphql' import { IndexerStatusService } from './indexer-status.service' import { Inject } from 'typedi' +import { Subscription } from 'type-graphql'; @ObjectType() export class IndexerStatus { @@ -34,4 +35,37 @@ export class IndexerStatusResolver { async indexerStatus(): Promise { return this.service.currentStatus() } + + @Subscription({ +// topics: (resolverTopicData) => { +// console.log('debuuuuug', resolverTopicData) +// +// return 'someString' +// } + topics: ({ args, payload, context }) => { + console.log('dynamic topiccc', args.topic) + return args.topic + } + /* + subscribe: (root, args, context, info) => { + console.log('subbbscribe has been called!', root, args, context, info) + + //return [] + return { + [Symbol.asyncIterator]() { + yield "hello"; + } + } + }*/ + }) + StatusSubscription(indexerStatus: IndexerStatus): IndexerStatus { + return indexerStatus + } + + //@Subscription({ topics: 'user:create' }) + //createUserSubscription(@Root() user: User): User { + // return user; + //} } + + diff --git a/packages/hydra-indexer-gateway/src/server.ts b/packages/hydra-indexer-gateway/src/server.ts index b8433bb23..2b3cf11d7 100644 --- a/packages/hydra-indexer-gateway/src/server.ts +++ b/packages/hydra-indexer-gateway/src/server.ts @@ -2,6 +2,9 @@ import 'reflect-metadata' import { BaseContext, Server } from 'warthog' import { Logger } from './logger' +import WebSocket from 'ws' +import { ConnectionContext } from 'subscriptions-transport-ws' + // TODO: add authentication export interface Context extends BaseContext { user?: { @@ -12,6 +15,7 @@ export interface Context extends BaseContext { } export function getServer(appOptions = {}, dbOptions = {}): Server { + console.log('sstaaarting server') return new Server( { // Inject a fake user. In a real app you'd parse a JWT to add the user @@ -27,6 +31,24 @@ export function getServer(appOptions = {}, dbOptions = {}): Server { introspection: true, logger: Logger, ...appOptions, + + + + apolloConfig: { + subscriptions: { + path: '/mytestsubscription', + //keepAlive?: number + //onConnect: (connectionParams: Object, websocket: WebSocket, context: ConnectionContext) => { + onConnect: (connectionParams, websocket, context) => { + console.log('hurrayy! it seems subscriptions started!!') + console.log(connectionParams, websocket, context) + }, + //onDisconnect: (websocket: WebSocket, context: ConnectionContext) => { + onDisconnect: (websocket, context) => { + console.log('aaa subscriptions disconnected!!') + }, + } + } }, dbOptions ) diff --git a/packages/hydra-indexer-gateway/tsconfig.json b/packages/hydra-indexer-gateway/tsconfig.json index 676a03bda..805690f26 100644 --- a/packages/hydra-indexer-gateway/tsconfig.json +++ b/packages/hydra-indexer-gateway/tsconfig.json @@ -18,6 +18,7 @@ "strict": true, "strictNullChecks": true, "esModuleInterop": true, + "declaration": true, "types": ["isomorphic-fetch", "node"] }, "include": ["src/**/*"], diff --git a/packages/hydra-processor/package.json b/packages/hydra-processor/package.json index 6edd8a531..9679a24e1 100644 --- a/packages/hydra-processor/package.json +++ b/packages/hydra-processor/package.json @@ -42,6 +42,7 @@ "@oclif/command": "^1.8.0", "@oclif/config": "^1", "@oclif/errors": "^1.3.3", + "apollo-link-ws": "^1.0.20", "bn.js": "^5.2.0", "chalk": "^4.1.0", "delay": "^5.0.0", @@ -57,7 +58,9 @@ "prom-client": "^12.0.0", "semver": "^7.3.4", "shortid": "^2.2.16", + "subscriptions-transport-ws": "^0.9.19", "typedi": "^0.8.0", + "ws": "^7.5.3", "yaml": "^1.10.0", "yaml-validator": "^3.0.0" }, @@ -82,7 +85,6 @@ "ts-sinon": "^2.0.1", "tslib": "^2.0.3", "typeorm": "^0.2.34", - "typescript": "^3.8", - "ws": "^7.5.0" + "typescript": "^3.8" } } diff --git a/packages/hydra-processor/run-tests.sh b/packages/hydra-processor/run-tests.sh index 159b31698..798eade18 100644 --- a/packages/hydra-processor/run-tests.sh +++ b/packages/hydra-processor/run-tests.sh @@ -8,7 +8,7 @@ cleanup() { yarn pm2 stop processorProcess > /dev/null # turn off docker containers - docker-compose -f docker-compose-test.yml down + #docker-compose -f docker-compose-test.yml down } startupDocker() { @@ -17,6 +17,7 @@ startupDocker() { echo "starting db, please wait" sleep 2 # wait for db to startup docker-compose -f docker-compose-test.yml up -d + #docker-compose -f docker-compose-test.yml up -d hydra-indexer } buildProcessor() { @@ -45,15 +46,15 @@ export DB_PASS=postgres export INDEXER_ENDPOINT_URL=http://localhost:4002/graphql # ensure docker depencency images exist -docker build ../../ -t hydra-builder:latest -yarn workspace @joystream/hydra-indexer docker:build -yarn workspace @joystream/hydra-indexer-gateway docker:build +#docker build ../../ -t hydra-builder:latest +#yarn workspace @joystream/hydra-indexer docker:build +#yarn workspace @joystream/hydra-indexer-gateway docker:build # start preparation startupDocker buildProcessor -#exit 1 # uncomment during debugging and run rest of commands manually as you need +exit 1 # uncomment during debugging and run rest of commands manually as you need startProcessor diff --git a/packages/hydra-processor/src/ingest/GraphQLSource.ts b/packages/hydra-processor/src/ingest/GraphQLSource.ts index ef012158a..6feacbf15 100644 --- a/packages/hydra-processor/src/ingest/GraphQLSource.ts +++ b/packages/hydra-processor/src/ingest/GraphQLSource.ts @@ -13,6 +13,10 @@ import { GraphQLQuery, IndexerQuery } from './IProcessorSource' import { IndexerStatus } from '../state' import { collectNamedQueries } from './graphql-query-builder' import { compact } from 'lodash' +//import { WebSocketLink } from 'apollo-link-ws' // yarn add apollo-link-ws +import { SubscriptionClient, Observable, Observer } from 'subscriptions-transport-ws' +import ws from 'ws' +import { ExecutionResult } from 'graphql/execution/execute' const debug = Debug('hydra-processor:graphql-source') @@ -56,8 +60,70 @@ export class GraphQLSource implements IProcessorSource { // TODO: implement /* eslint-disable-next-line @typescript-eslint/no-unused-vars */ - subscribe(events: string[]): Promise { - throw new Error('Method not implemented.') + async subscribe(events: string[]): Promise { + + //throw new Error('Method not implemented.') + + // http://localhost:4002/graphql // indexer url + + /* + const wsLink = new WebSocketLink({ + //uri: 'ws://localhost:4000/subscriptions', + uri: 'ws://localhost:4002/graphql', + options: { + reconnect: true + } + }); + */ + //const uri = 'wss://localhost:4002/graphql' + //const uri = 'ws://localhost:4002/graphql' + //const uri = 'ws://localhost:4100/graphql' + const uri = 'ws://localhost:4002/mytestsubscription' + + + const subscriptionOptions = { + lazy: false, + reconnect: true, + } + const client = new SubscriptionClient(uri, subscriptionOptions, ws) + console.log(client.status) + client.onConnected(() => {console.log('connected', client.status)}) + + /* + const subscriptionQuery = ` + subscription OnCommentAdded($postID: ID!) { + commentAdded(postID: $postID) { + id + content + } + } + ` + */ + // inspired by https://www.apollographql.com/docs/apollo-server/data/subscriptions/ + const subscriptionQuery = ` + subscription StatusSubscription { + indexerStatus { + chainHeight + } + } + ` + + const request = { + query: subscriptionQuery + } + + //client.request(request).subscribe((observer: Observer) => {}) + client.request(request).subscribe({ + next: (value) => {console.log('neeeext', value)}, + error: (error) => { + console.log('eeeeerror', error) + console.trace() + }, + complete: () => {console.log('coooomplete')}, + }) +console.log('qqqqqqqqa') + //throw new Error('Good throw :)') + //throw new Error('Method not implemented.') } async getIndexerStatus(): Promise { diff --git a/packages/hydra-processor/src/queue/BlockQueue.ts b/packages/hydra-processor/src/queue/BlockQueue.ts index fb4c088e4..c058946f4 100644 --- a/packages/hydra-processor/src/queue/BlockQueue.ts +++ b/packages/hydra-processor/src/queue/BlockQueue.ts @@ -96,14 +96,39 @@ export class BlockQueue implements IBlockQueue { info('Starting the event queue') - await Promise.all([this.pollIndexer(), this.fill()]) + //await Promise.all([this.pollIndexer(), this.fill()]) + + + this.subscribeToIndexer() + + await this.fill() } stop(): void { this._started = false } + async subscribeToIndexer(): Promise { + info('Subscribing to indexer') + + this.dataSource.subscribe([]) // TODO: enumerate events + + eventEmitter.on(ProcessorEvents.NEW_INDEXER_HEAD, (h: number) => { + debug(`New Indexer Head: ${h}`) + console.log('NIIIIICE', h) + //this.indexerHead = h + }) + } + + +/* async pollIndexer(): Promise { + + eventEmitter.on(ProcessorEvents.NEW_INDEXER_HEAD, (h: number) => { + debug(`New Indexer Head: ${h}`) + this.indexerHead = h + }); + // TODO: uncomment this block when eventSource will emit // this.eventsSource.on('NewIndexerHead', (h: number) => { // debug(`New Indexer Head: ${h}`) @@ -119,6 +144,7 @@ export class BlockQueue implements IBlockQueue { await delay(conf().POLL_INTERVAL_MS) } } + */ hasNext(): boolean { return this._hasNext @@ -231,7 +257,7 @@ export class BlockQueue implements IBlockQueue { debug( `Event queue state: \tIndexer head: ${this.indexerStatus.head} - \tChain head: ${this.indexerStatus.chainHeight} + \tChain head: ${this.indexerStatus.chainHeight} \tQueue size: ${this.eventQueue.length} \tLast fetched event: ${this.rangeFilter.id.gt} \tBlock range: ${JSON.stringify(this.rangeFilter.block)}` diff --git a/packages/hydra-processor/src/start/config.ts b/packages/hydra-processor/src/start/config.ts index d8ec37dcd..33a114ab5 100644 --- a/packages/hydra-processor/src/start/config.ts +++ b/packages/hydra-processor/src/start/config.ts @@ -19,6 +19,7 @@ let conf: { // the query tries to events from the current block to block + BLOCK_WINDOW BLOCK_WINDOW: number PROCESSOR_NAME: string + // TODO: remove // Interval at which the processor pulls new blocks from the database // The interval is reasonably large by default. The trade-off is the latency // between the updates and the load to the database diff --git a/packages/hydra-processor/test/fixtures/manifest.yml b/packages/hydra-processor/test/fixtures/manifest.yml index 2cbf6ab2a..d9213a95e 100644 --- a/packages/hydra-processor/test/fixtures/manifest.yml +++ b/packages/hydra-processor/test/fixtures/manifest.yml @@ -10,7 +10,7 @@ entities: - test/fixtures/test-entities.ts mappings: mappingsModule: test/fixtures/test-mappings.ts - range: '[, 345]' + range: '[, 34500]' imports: - test/fixtures/test-types.ts eventHandlers: diff --git a/yarn.lock b/yarn.lock index e69bd9d32..e6b0491d0 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3696,6 +3696,14 @@ apollo-link-http@^1.5.16: apollo-link-http-common "^0.2.16" tslib "^1.9.3" +apollo-link-ws@^1.0.20: + version "1.0.20" + resolved "https://registry.yarnpkg.com/apollo-link-ws/-/apollo-link-ws-1.0.20.tgz#dfad44121f8445c6d7b7f8101a1b24813ba008ed" + integrity sha512-mjSFPlQxmoLArpHBeUb2Xj+2HDYeTaJqFGOqQ+I8NVJxgL9lJe84PDWcPah/yMLv3rB7QgBDSuZ0xoRFBPlySw== + dependencies: + apollo-link "^1.2.14" + tslib "^1.9.3" + apollo-link@^1.2.14, apollo-link@^1.2.3: version "1.2.14" resolved "https://registry.yarnpkg.com/apollo-link/-/apollo-link-1.2.14.tgz#3feda4b47f9ebba7f4160bef8b977ba725b684d9" @@ -14548,6 +14556,17 @@ subscriptions-transport-ws@^0.9.11, subscriptions-transport-ws@^0.9.16: symbol-observable "^1.0.4" ws "^5.2.0" +subscriptions-transport-ws@^0.9.19: + version "0.9.19" + resolved "https://registry.yarnpkg.com/subscriptions-transport-ws/-/subscriptions-transport-ws-0.9.19.tgz#10ca32f7e291d5ee8eb728b9c02e43c52606cdcf" + integrity sha512-dxdemxFFB0ppCLg10FTtRqH/31FNRL1y1BQv8209MK5I4CwALb7iihQg+7p65lFcIl8MHatINWBLOqpgU4Kyyw== + dependencies: + backo2 "^1.0.2" + eventemitter3 "^3.1.0" + iterall "^1.2.1" + symbol-observable "^1.0.4" + ws "^5.2.0 || ^6.0.0 || ^7.0.0" + subsume@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/subsume/-/subsume-3.0.0.tgz#22c92730f441ad72ee9af4bdad42dc4ff830cfaf" @@ -15999,6 +16018,11 @@ ws@^5.2.0: dependencies: async-limiter "~1.0.0" +"ws@^5.2.0 || ^6.0.0 || ^7.0.0", ws@^7.5.3: + version "7.5.3" + resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.3.tgz#160835b63c7d97bfab418fc1b8a9fced2ac01a74" + integrity sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg== + ws@^6.0.0: version "6.2.1" resolved "https://registry.yarnpkg.com/ws/-/ws-6.2.1.tgz#442fdf0a47ed64f59b6a5d8ff130f4748ed524fb" @@ -16006,7 +16030,7 @@ ws@^6.0.0: dependencies: async-limiter "~1.0.0" -ws@^7.0.0, ws@^7.5.0: +ws@^7.0.0: version "7.5.0" resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.0.tgz#0033bafea031fb9df041b2026fc72a571ca44691" integrity sha512-6ezXvzOZupqKj4jUqbQ9tXuJNo+BR2gU8fFRk3XCP3e0G6WT414u5ELe6Y0vtp7kmSJ3F7YWObSNr1ESsgi4vw==