Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Resolver, Field, ObjectType, Int, Query } from 'type-graphql'
import { IndexerStatusService } from './indexer-status.service'
import { Inject } from 'typedi'
import { Subscription } from 'type-graphql';

@ObjectType()
export class IndexerStatus {
Expand Down Expand Up @@ -34,4 +35,37 @@ export class IndexerStatusResolver {
async indexerStatus(): Promise<IndexerStatus> {
return this.service.currentStatus()
}

@Subscription({
// topics: (resolverTopicData) => {
// console.log('debuuuuug', resolverTopicData)
//
// return 'someString'
// }
topics: ({ args, payload, context }) => {
console.log('dynamic topiccc', args.topic)
return args.topic
}
/*
subscribe: (root, args, context, info) => {
console.log('subbbscribe has been called!', root, args, context, info)

//return []
return {
[Symbol.asyncIterator]() {
yield "hello";
}
}
}*/
})
StatusSubscription(indexerStatus: IndexerStatus): IndexerStatus {
return indexerStatus
}

//@Subscription({ topics: 'user:create' })
//createUserSubscription(@Root() user: User): User {
// return user;
//}
}


22 changes: 22 additions & 0 deletions packages/hydra-indexer-gateway/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import 'reflect-metadata'
import { BaseContext, Server } from 'warthog'
import { Logger } from './logger'

import WebSocket from 'ws'
import { ConnectionContext } from 'subscriptions-transport-ws'

// TODO: add authentication
export interface Context extends BaseContext {
user?: {
Expand All @@ -12,6 +15,7 @@ export interface Context extends BaseContext {
}

export function getServer(appOptions = {}, dbOptions = {}): Server<Context> {
console.log('sstaaarting server')
return new Server<Context>(
{
// Inject a fake user. In a real app you'd parse a JWT to add the user
Expand All @@ -27,6 +31,24 @@ export function getServer(appOptions = {}, dbOptions = {}): Server<Context> {
introspection: true,
logger: Logger,
...appOptions,



apolloConfig: {
subscriptions: {
path: '/mytestsubscription',
//keepAlive?: number
//onConnect: (connectionParams: Object, websocket: WebSocket, context: ConnectionContext) => {
onConnect: (connectionParams, websocket, context) => {
console.log('hurrayy! it seems subscriptions started!!')
console.log(connectionParams, websocket, context)
},
//onDisconnect: (websocket: WebSocket, context: ConnectionContext) => {
onDisconnect: (websocket, context) => {
console.log('aaa subscriptions disconnected!!')
},
}
}
},
dbOptions
)
Expand Down
1 change: 1 addition & 0 deletions packages/hydra-indexer-gateway/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"strict": true,
"strictNullChecks": true,
"esModuleInterop": true,
"declaration": true,
"types": ["isomorphic-fetch", "node"]
},
"include": ["src/**/*"],
Expand Down
6 changes: 4 additions & 2 deletions packages/hydra-processor/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@oclif/command": "^1.8.0",
"@oclif/config": "^1",
"@oclif/errors": "^1.3.3",
"apollo-link-ws": "^1.0.20",
"bn.js": "^5.2.0",
"chalk": "^4.1.0",
"delay": "^5.0.0",
Expand All @@ -57,7 +58,9 @@
"prom-client": "^12.0.0",
"semver": "^7.3.4",
"shortid": "^2.2.16",
"subscriptions-transport-ws": "^0.9.19",
"typedi": "^0.8.0",
"ws": "^7.5.3",
"yaml": "^1.10.0",
"yaml-validator": "^3.0.0"
},
Expand All @@ -82,7 +85,6 @@
"ts-sinon": "^2.0.1",
"tslib": "^2.0.3",
"typeorm": "^0.2.34",
"typescript": "^3.8",
"ws": "^7.5.0"
"typescript": "^3.8"
}
}
11 changes: 6 additions & 5 deletions packages/hydra-processor/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ cleanup() {
yarn pm2 stop processorProcess > /dev/null

# turn off docker containers
docker-compose -f docker-compose-test.yml down
#docker-compose -f docker-compose-test.yml down
}

startupDocker() {
Expand All @@ -17,6 +17,7 @@ startupDocker() {
echo "starting db, please wait"
sleep 2 # wait for db to startup
docker-compose -f docker-compose-test.yml up -d
#docker-compose -f docker-compose-test.yml up -d hydra-indexer
}

buildProcessor() {
Expand Down Expand Up @@ -45,15 +46,15 @@ export DB_PASS=postgres
export INDEXER_ENDPOINT_URL=http://localhost:4002/graphql

# ensure docker depencency images exist
docker build ../../ -t hydra-builder:latest
yarn workspace @joystream/hydra-indexer docker:build
yarn workspace @joystream/hydra-indexer-gateway docker:build
#docker build ../../ -t hydra-builder:latest
#yarn workspace @joystream/hydra-indexer docker:build
#yarn workspace @joystream/hydra-indexer-gateway docker:build

# start preparation
startupDocker
buildProcessor

#exit 1 # uncomment during debugging and run rest of commands manually as you need
exit 1 # uncomment during debugging and run rest of commands manually as you need

startProcessor

Expand Down
70 changes: 68 additions & 2 deletions packages/hydra-processor/src/ingest/GraphQLSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import { GraphQLQuery, IndexerQuery } from './IProcessorSource'
import { IndexerStatus } from '../state'
import { collectNamedQueries } from './graphql-query-builder'
import { compact } from 'lodash'
//import { WebSocketLink } from 'apollo-link-ws' // yarn add apollo-link-ws
import { SubscriptionClient, Observable, Observer } from 'subscriptions-transport-ws'
import ws from 'ws'
import { ExecutionResult } from 'graphql/execution/execute'

const debug = Debug('hydra-processor:graphql-source')

Expand Down Expand Up @@ -56,8 +60,70 @@ export class GraphQLSource implements IProcessorSource {

// TODO: implement
/* eslint-disable-next-line @typescript-eslint/no-unused-vars */
subscribe(events: string[]): Promise<void> {
throw new Error('Method not implemented.')
async subscribe(events: string[]): Promise<void> {

//throw new Error('Method not implemented.')

// http://localhost:4002/graphql // indexer url

/*
const wsLink = new WebSocketLink({
//uri: 'ws://localhost:4000/subscriptions',
uri: 'ws://localhost:4002/graphql',
options: {
reconnect: true
}
});
*/
//const uri = 'wss://localhost:4002/graphql'
//const uri = 'ws://localhost:4002/graphql'
//const uri = 'ws://localhost:4100/graphql'
const uri = 'ws://localhost:4002/mytestsubscription'


const subscriptionOptions = {
lazy: false,
reconnect: true,
}
const client = new SubscriptionClient(uri, subscriptionOptions, ws)
console.log(client.status)
client.onConnected(() => {console.log('connected', client.status)})

/*
const subscriptionQuery = `
subscription OnCommentAdded($postID: ID!) {
commentAdded(postID: $postID) {
id
content
}
}
`
*/
// inspired by https://www.apollographql.com/docs/apollo-server/data/subscriptions/
const subscriptionQuery = `
subscription StatusSubscription {
indexerStatus {
chainHeight
}
}
`

const request = {
query: subscriptionQuery
}

//client.request(request).subscribe((observer: Observer<ExecutionResult>) => {})
client.request(request).subscribe({
next: (value) => {console.log('neeeext', value)},
error: (error) => {
console.log('eeeeerror', error)
console.trace()
},
complete: () => {console.log('coooomplete')},
})
console.log('qqqqqqqqa')
//throw new Error('Good throw :)')
//throw new Error('Method not implemented.')
}

async getIndexerStatus(): Promise<IndexerStatus> {
Expand Down
30 changes: 28 additions & 2 deletions packages/hydra-processor/src/queue/BlockQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,39 @@ export class BlockQueue implements IBlockQueue {

info('Starting the event queue')

await Promise.all([this.pollIndexer(), this.fill()])
//await Promise.all([this.pollIndexer(), this.fill()])


this.subscribeToIndexer()

await this.fill()
}

stop(): void {
this._started = false
}

async subscribeToIndexer(): Promise<void> {
info('Subscribing to indexer')

this.dataSource.subscribe([]) // TODO: enumerate events

eventEmitter.on(ProcessorEvents.NEW_INDEXER_HEAD, (h: number) => {
debug(`New Indexer Head: ${h}`)
console.log('NIIIIICE', h)
//this.indexerHead = h
})
}


/*
async pollIndexer(): Promise<void> {

eventEmitter.on(ProcessorEvents.NEW_INDEXER_HEAD, (h: number) => {
debug(`New Indexer Head: ${h}`)
this.indexerHead = h
});

// TODO: uncomment this block when eventSource will emit
// this.eventsSource.on('NewIndexerHead', (h: number) => {
// debug(`New Indexer Head: ${h}`)
Expand All @@ -119,6 +144,7 @@ export class BlockQueue implements IBlockQueue {
await delay(conf().POLL_INTERVAL_MS)
}
}
*/

hasNext(): boolean {
return this._hasNext
Expand Down Expand Up @@ -231,7 +257,7 @@ export class BlockQueue implements IBlockQueue {
debug(
`Event queue state:
\tIndexer head: ${this.indexerStatus.head}
\tChain head: ${this.indexerStatus.chainHeight}
\tChain head: ${this.indexerStatus.chainHeight}
\tQueue size: ${this.eventQueue.length}
\tLast fetched event: ${this.rangeFilter.id.gt}
\tBlock range: ${JSON.stringify(this.rangeFilter.block)}`
Expand Down
1 change: 1 addition & 0 deletions packages/hydra-processor/src/start/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ let conf: {
// the query tries to events from the current block to block + BLOCK_WINDOW
BLOCK_WINDOW: number
PROCESSOR_NAME: string
// TODO: remove
// Interval at which the processor pulls new blocks from the database
// The interval is reasonably large by default. The trade-off is the latency
// between the updates and the load to the database
Expand Down
2 changes: 1 addition & 1 deletion packages/hydra-processor/test/fixtures/manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ entities:
- test/fixtures/test-entities.ts
mappings:
mappingsModule: test/fixtures/test-mappings.ts
range: '[, 345]'
range: '[, 34500]'
imports:
- test/fixtures/test-types.ts
eventHandlers:
Expand Down
26 changes: 25 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3696,6 +3696,14 @@ apollo-link-http@^1.5.16:
apollo-link-http-common "^0.2.16"
tslib "^1.9.3"

apollo-link-ws@^1.0.20:
version "1.0.20"
resolved "https://registry.yarnpkg.com/apollo-link-ws/-/apollo-link-ws-1.0.20.tgz#dfad44121f8445c6d7b7f8101a1b24813ba008ed"
integrity sha512-mjSFPlQxmoLArpHBeUb2Xj+2HDYeTaJqFGOqQ+I8NVJxgL9lJe84PDWcPah/yMLv3rB7QgBDSuZ0xoRFBPlySw==
dependencies:
apollo-link "^1.2.14"
tslib "^1.9.3"

apollo-link@^1.2.14, apollo-link@^1.2.3:
version "1.2.14"
resolved "https://registry.yarnpkg.com/apollo-link/-/apollo-link-1.2.14.tgz#3feda4b47f9ebba7f4160bef8b977ba725b684d9"
Expand Down Expand Up @@ -14548,6 +14556,17 @@ subscriptions-transport-ws@^0.9.11, subscriptions-transport-ws@^0.9.16:
symbol-observable "^1.0.4"
ws "^5.2.0"

subscriptions-transport-ws@^0.9.19:
version "0.9.19"
resolved "https://registry.yarnpkg.com/subscriptions-transport-ws/-/subscriptions-transport-ws-0.9.19.tgz#10ca32f7e291d5ee8eb728b9c02e43c52606cdcf"
integrity sha512-dxdemxFFB0ppCLg10FTtRqH/31FNRL1y1BQv8209MK5I4CwALb7iihQg+7p65lFcIl8MHatINWBLOqpgU4Kyyw==
dependencies:
backo2 "^1.0.2"
eventemitter3 "^3.1.0"
iterall "^1.2.1"
symbol-observable "^1.0.4"
ws "^5.2.0 || ^6.0.0 || ^7.0.0"

subsume@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/subsume/-/subsume-3.0.0.tgz#22c92730f441ad72ee9af4bdad42dc4ff830cfaf"
Expand Down Expand Up @@ -15999,14 +16018,19 @@ ws@^5.2.0:
dependencies:
async-limiter "~1.0.0"

"ws@^5.2.0 || ^6.0.0 || ^7.0.0", ws@^7.5.3:
version "7.5.3"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.3.tgz#160835b63c7d97bfab418fc1b8a9fced2ac01a74"
integrity sha512-kQ/dHIzuLrS6Je9+uv81ueZomEwH0qVYstcAQ4/Z93K8zeko9gtAbttJWzoC5ukqXY1PpoouV3+VSOqEAFt5wg==

ws@^6.0.0:
version "6.2.1"
resolved "https://registry.yarnpkg.com/ws/-/ws-6.2.1.tgz#442fdf0a47ed64f59b6a5d8ff130f4748ed524fb"
integrity sha512-GIyAXC2cB7LjvpgMt9EKS2ldqr0MTrORaleiOno6TweZ6r3TKtoFQWay/2PceJ3RuBasOHzXNn5Lrw1X0bEjqA==
dependencies:
async-limiter "~1.0.0"

ws@^7.0.0, ws@^7.5.0:
ws@^7.0.0:
version "7.5.0"
resolved "https://registry.yarnpkg.com/ws/-/ws-7.5.0.tgz#0033bafea031fb9df041b2026fc72a571ca44691"
integrity sha512-6ezXvzOZupqKj4jUqbQ9tXuJNo+BR2gU8fFRk3XCP3e0G6WT414u5ELe6Y0vtp7kmSJ3F7YWObSNr1ESsgi4vw==
Expand Down