diff --git a/.github/workflows/codeql-remove.yml b/.github/workflows/codeql-remove.yml new file mode 100644 index 00000000..ad4ce095 --- /dev/null +++ b/.github/workflows/codeql-remove.yml @@ -0,0 +1,101 @@ +# For most projects, this workflow file will not need changing; you simply need +# to commit it to your repository. +# +# You may wish to alter this file to override the set of languages analyzed, +# or to provide custom queries or build logic. +# +# ******** NOTE ******** +# We have attempted to detect the languages in your repository. Please check +# the `language` matrix defined below to confirm you have the correct set of +# supported CodeQL languages. +# +name: "CodeQL Advanced" + +on: + push: + branches: [ "main" ] + pull_request: + branches: [ "main" ] + schedule: + - cron: '33 4 * * 6' + +jobs: + analyze: + name: Analyze (${{ matrix.language }}) + # Runner size impacts CodeQL analysis time. To learn more, please see: + # - https://gh.io/recommended-hardware-resources-for-running-codeql + # - https://gh.io/supported-runners-and-hardware-resources + # - https://gh.io/using-larger-runners (GitHub.com only) + # Consider using larger runners or machines with greater resources for possible analysis time improvements. 
+ runs-on: ${{ (matrix.language == 'swift' && 'macos-latest') || 'ubuntu-latest' }} + permissions: + # required for all workflows + security-events: write + + # required to fetch internal or private CodeQL packs + packages: read + + # only required for workflows in private repositories + actions: read + contents: read + + strategy: + fail-fast: false + matrix: + include: + - language: actions + build-mode: none + - language: go + build-mode: autobuild + # CodeQL supports the following values keywords for 'language': 'actions', 'c-cpp', 'csharp', 'go', 'java-kotlin', 'javascript-typescript', 'python', 'ruby', 'rust', 'swift' + # Use `c-cpp` to analyze code written in C, C++ or both + # Use 'java-kotlin' to analyze code written in Java, Kotlin or both + # Use 'javascript-typescript' to analyze code written in JavaScript, TypeScript or both + # To learn more about changing the languages that are analyzed or customizing the build mode for your analysis, + # see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning. + # If you are analyzing a compiled language, you can modify the 'build-mode' for that language to customize how + # your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + # Add any setup steps before running the `github/codeql-action/init` action. + # This includes steps like installing compilers or runtimes (`actions/setup-node` + # or others). This is typically only required for manual builds. + # - name: Setup runtime (example) + # uses: actions/setup-example@v1 + + # Initializes the CodeQL tools for scanning. 
+ - name: Initialize CodeQL + uses: github/codeql-action/init@v4 + with: + languages: ${{ matrix.language }} + build-mode: ${{ matrix.build-mode }} + # If you wish to specify custom queries, you can do so here or in a config file. + # By default, queries listed here will override any specified in a config file. + # Prefix the list here with "+" to use these queries and those in the config file. + + # For more details on CodeQL's query packs, refer to: https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs + # queries: security-extended,security-and-quality + + # If the analyze step fails for one of the languages you are analyzing with + # "We were unable to automatically build your code", modify the matrix above + # to set the build mode to "manual" for that language. Then modify this step + # to build your code. + # â„šī¸ Command-line programs to run using the OS shell. + # 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun + - name: Run manual build steps + if: matrix.build-mode == 'manual' + shell: bash + run: | + echo 'If you are using a "manual" build mode for one or more of the' \ + 'languages you are analyzing, replace this with the commands to build' \ + 'your code, for example:' + echo ' make bootstrap' + echo ' make release' + exit 1 + + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v4 + with: + category: "/language:${{matrix.language}}" diff --git a/.github/workflows/sync-labels.yml b/.github/workflows/sync-labels.yml index 2f391573..04db1874 100644 --- a/.github/workflows/sync-labels.yml +++ b/.github/workflows/sync-labels.yml @@ -12,7 +12,7 @@ jobs: sync-labels: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: micnncim/action-label-syncer@v1.3.0 env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git 
a/.gitignore b/.gitignore index 54032572..bbb3c760 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,7 @@ *.out # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # Go workspace file go.work @@ -38,3 +38,7 @@ go.work.sum # OSX .DS_Store + +# AI context +CLAUDE.md +AGENT.md \ No newline at end of file diff --git a/api/openapi/paths/non_admin/request-bodies.yaml b/api/openapi/paths/non_admin/request-bodies.yaml index 64ad8bdd..88b40a46 100644 --- a/api/openapi/paths/non_admin/request-bodies.yaml +++ b/api/openapi/paths/non_admin/request-bodies.yaml @@ -26,9 +26,15 @@ components: type: number format: double description: 'Timestamp or sequence number from which to start synchronization' + limit: + type: integer + format: uint32 + default: 0 + description: 'Maximum number of items to return' required: - version - since + - limit RequestForeignGASPNodeBody: content: @@ -37,14 +43,14 @@ components: type: object required: - graphID - - txID + - txid - outputIndex properties: graphID: type: string - description: The graph ID in the format of "txID.outputIndex" + description: The graph ID in the format of "txid.outputIndex" example: "0000000000000000000000000000000000000000000000000000000000000000.1" - txID: + txid: type: string description: The transaction ID example: "0000000000000000000000000000000000000000000000000000000000000000" diff --git a/go.mod b/go.mod index fea2399f..7b19bb44 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,17 @@ module github.com/bsv-blockchain/go-overlay-services go 1.24.3 require ( - github.com/bsv-blockchain/go-sdk v1.2.1 + github.com/bsv-blockchain/go-sdk v1.2.10 github.com/bsv-blockchain/universal-test-vectors v0.5.0 github.com/go-resty/resty/v2 v2.16.5 - github.com/go-viper/mapstructure/v2 v2.3.0 - github.com/gofiber/fiber/v2 v2.52.8 + github.com/go-viper/mapstructure/v2 v2.4.0 + github.com/gofiber/fiber/v2 v2.52.9 github.com/google/uuid v1.6.0 github.com/mitchellh/mapstructure v1.5.0 
github.com/oapi-codegen/runtime v1.1.1 github.com/spf13/viper v1.20.1 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 + golang.org/x/sync v0.17.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -54,14 +55,15 @@ require ( github.com/vmware-labs/yaml-jsonpath v0.3.2 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.9.0 // indirect - golang.org/x/crypto v0.39.0 // indirect - golang.org/x/mod v0.25.0 // indirect - golang.org/x/net v0.40.0 // indirect - golang.org/x/sync v0.15.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.26.0 // indirect - golang.org/x/tools v0.33.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/mod v0.27.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sys v0.36.0 // indirect + golang.org/x/text v0.29.0 // indirect + golang.org/x/tools v0.36.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) tool github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen + +replace github.com/bsv-blockchain/go-sdk => github.com/bsv-blockchain/go-sdk v1.2.11-0.20250920022700-8e346816aa35 diff --git a/go.sum b/go.sum index 6c809d98..413037bd 100644 --- a/go.sum +++ b/go.sum @@ -4,8 +4,8 @@ github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOL github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= -github.com/bsv-blockchain/go-sdk v1.2.1 h1:yQXHpHPIxoyvU+DERzaSuUEQF/ejVjcsdPhNV+z8OTI= -github.com/bsv-blockchain/go-sdk v1.2.1/go.mod h1:v//5tDobbCNhhZvHlEyP8SvuE+N3UFpWToH0+lOw9QM= +github.com/bsv-blockchain/go-sdk v1.2.11-0.20250920022700-8e346816aa35 h1:FaVMk3TXWDCGdgRaihmbKLBi/n1nhCPZcyA/a+9BguM= +github.com/bsv-blockchain/go-sdk v1.2.11-0.20250920022700-8e346816aa35/go.mod h1:C1r7iZbRUCbC015GjbhcpwH0jL5ubPn5XaQgjvUaPdU= 
github.com/bsv-blockchain/universal-test-vectors v0.5.0 h1:DhMyIR0wl4Krnh2hoLBWou/3FfCcXCFOolSOrADpO50= github.com/bsv-blockchain/universal-test-vectors v0.5.0/go.mod h1:x/t+oK2TganJmNd1utrwayHxaBE6wR5+R6J/4bR2HGg= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= @@ -34,10 +34,10 @@ github.com/go-resty/resty/v2 v2.16.5/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/go-test/deep v1.0.8 h1:TDsG77qcSprGbC6vTN8OuXp5g+J+b5Pcguhf7Zt61VM= github.com/go-test/deep v1.0.8/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= -github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk= -github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= -github.com/gofiber/fiber/v2 v2.52.8 h1:xl4jJQ0BV5EJTA2aWiKw/VddRpHrKeZLF0QPUxqn0x4= -github.com/gofiber/fiber/v2 v2.52.8/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= +github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= @@ -141,8 +141,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod 
h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/tinylib/msgp v1.2.5 h1:WeQg1whrXRFiZusidTQqzETkRpGjFjcIhW6uqWH09po= @@ -165,11 +165,11 @@ go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTV golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= -golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.25.0 h1:n7a+ZbQKQA/Ysbyb0/6IbB1H/X41mKgbhfv7AfG/44w= -golang.org/x/mod v0.25.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/mod v0.27.0 h1:kb+q2PyFnEADO2IEF935ehFUXlWiNjJWtRNgBLSfbxQ= +golang.org/x/mod v0.27.0/go.mod h1:rWI627Fq0DEoudcK+MBkNkCe0EetEaDSwJJkCcjpazc= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= 
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -177,13 +177,13 @@ golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= -golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -199,23 +199,23 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod 
h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= -golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.33.0 
h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= -golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +golang.org/x/tools v0.36.0 h1:kWS0uv/zsvHEle1LbV5LE8QujrxB3wfQyxHfhOk0Qkg= +golang.org/x/tools v0.36.0/go.mod h1:WBDiHKJK8YgLHlcQPYQzNCkUxUypCaa5ZegCVutKm+s= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/pkg/core/engine/engine.go b/pkg/core/engine/engine.go index 2378b1c1..337098ea 100644 --- a/pkg/core/engine/engine.go +++ b/pkg/core/engine/engine.go @@ -22,7 +22,7 @@ import ( "github.com/bsv-blockchain/go-sdk/transaction/chaintracker" ) -const DEFAULT_GASP_SYNC_LIMIT = 10000 +const DEFAULT_GASP_SYNC_LIMIT = 1000 var TRUE = true var FALSE = false @@ -42,6 +42,20 @@ const ( SyncConfigurationNone ) +// String returns the string representation of SyncConfigurationType +func (s SyncConfigurationType) String() string { + switch s { + case SyncConfigurationPeers: + return "Peers" + case SyncConfigurationSHIP: + return "SHIP" + case SyncConfigurationNone: + return "None" + default: + return "Unknown" + } +} + type SyncConfiguration struct { Type SyncConfigurationType Peers []string @@ -132,7 +146,6 @@ var ErrMissingOutput = errors.New("missing-output") var ErrInputSpent = errors.New("input-spent") func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode SumbitMode, onSteakReady OnSteakReady) (overlay.Steak, error) { - start := time.Now() for _, topic := range taggedBEEF.Topics { if _, ok := e.Managers[topic]; !ok { slog.Error("unknown topic in Submit", "topic", topic, "error", ErrUnknownTopic) @@ -149,15 +162,13 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode slog.Error("invalid BEEF in 
Submit - tx is nil", "error", ErrInvalidBeef) return nil, ErrInvalidBeef } - if valid, err := spv.Verify(tx, e.ChainTracker, nil); err != nil { + if valid, err := spv.Verify(ctx, tx, e.ChainTracker, nil); err != nil { slog.Error("SPV verification failed in Submit", "txid", txid, "error", err) return nil, err } else if !valid { slog.Error("invalid transaction in Submit", "txid", txid, "error", ErrInvalidTransaction) return nil, ErrInvalidTransaction } - slog.Debug("transaction validated", "duration", time.Since(start)) - start = time.Now() steak := make(overlay.Steak, len(taggedBEEF.Topics)) topicInputs := make(map[string]map[uint32]*Output, len(tx.Inputs)) inpoints := make([]*transaction.Outpoint, 0, len(tx.Inputs)) @@ -199,15 +210,13 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode } if admit, err := e.Managers[topic].IdentifyAdmissibleOutputs(ctx, taggedBEEF.Beef, previousCoins); err != nil { - slog.Error("failed to identify admissible outputs", "topic", topic, "error", err) + slog.Error("failed to identify admissible outputs", "txid", txid.String(), "topic", topic, "mode", string(mode), "error", err) return nil, err } else { - slog.Debug("admissible outputs identified", "duration", time.Since(start)) - start = time.Now() if len(admit.AncillaryTxids) > 0 { ancillaryBeef := transaction.Beef{ Version: transaction.BEEF_V2, - Transactions: make(map[string]*transaction.BeefTx, len(admit.AncillaryTxids)), + Transactions: make(map[chainhash.Hash]*transaction.BeefTx, len(admit.AncillaryTxids)), } for _, txid := range admit.AncillaryTxids { if tx := beef.FindTransaction(txid.String()); tx == nil { @@ -238,17 +247,25 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode if _, ok := dupeTopics[topic]; ok { continue } - if err := e.Storage.MarkUTXOsAsSpent(ctx, inpoints, topic, txid); err != nil { - slog.Error("failed to mark UTXOs as spent", "topic", topic, "txid", txid, "error", err) - return nil, err + // 
Build list of inputs that actually exist in this topic's storage + if len(topicInputs[topic]) > 0 { + topicInpoints := make([]*transaction.Outpoint, 0, len(topicInputs[topic])) + for _, output := range topicInputs[topic] { + topicInpoints = append(topicInpoints, &output.Outpoint) + } + if err := e.Storage.MarkUTXOsAsSpent(ctx, topicInpoints, topic, txid); err != nil { + slog.Error("failed to mark UTXOs as spent", "topic", topic, "txid", txid, "error", err) + return nil, err + } } - for vin, outpoint := range inpoints { + // Notify lookup services about spent outputs + for vin, output := range topicInputs[topic] { for _, l := range e.LookupServices { if err := l.OutputSpent(ctx, &OutputSpent{ - Outpoint: outpoint, + Outpoint: &output.Outpoint, Topic: topic, SpendingTxid: txid, - InputIndex: uint32(vin), + InputIndex: vin, UnlockingScript: tx.Inputs[vin].UnlockingScript, SequenceNumber: tx.Inputs[vin].SequenceNumber, SpendingAtomicBEEF: taggedBEEF.Beef, @@ -259,11 +276,9 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode } } } - slog.Debug("UTXOs marked as spent", "duration", time.Since(start)) - start = time.Now() if mode != SubmitModeHistorical && e.Broadcaster != nil { if _, failure := e.Broadcaster.Broadcast(tx); failure != nil { - slog.Error("failed to broadcast transaction", "txid", txid, "error", failure) + slog.Error("failed to broadcast transaction", "txid", txid, "mode", string(mode), "error", failure) return nil, failure } } @@ -341,8 +356,6 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode } } } - slog.Debug("outputs added", "duration", time.Since(start)) - start = time.Now() for _, output := range outputsConsumed { output.ConsumedBy = append(output.ConsumedBy, newOutpoints...) 
@@ -351,8 +364,6 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode return nil, err } } - slog.Debug("consumed by references updated", "duration", time.Since(start)) - start = time.Now() if err := e.Storage.InsertAppliedTransaction(ctx, &overlay.AppliedTransaction{ Txid: txid, Topic: topic, @@ -360,7 +371,6 @@ func (e *Engine) Submit(ctx context.Context, taggedBEEF overlay.TaggedBEEF, mode slog.Error("failed to insert applied transaction", "topic", topic, "txid", txid, "error", err) return nil, err } - slog.Debug("transaction applied", "duration", time.Since(start)) } if e.Advertiser == nil || mode == SubmitModeHistorical { return steak, nil @@ -580,8 +590,20 @@ func (e *Engine) StartGASPSync(ctx context.Context) error { continue } + slog.Info(fmt.Sprintf("[GASP SYNC] Processing topic \"%s\" with sync type \"%s\"", topic, syncEndpoints.Type)) + if syncEndpoints.Type == SyncConfigurationSHIP { + slog.Info(fmt.Sprintf("[GASP SYNC] Discovering peers for topic \"%s\" using SHIP lookup", topic)) + slog.Info(fmt.Sprintf("[GASP SYNC] Setting SLAP trackers for topic \"%s\", count: %d", topic, len(e.SLAPTrackers))) + if len(e.SLAPTrackers) > 0 { + for i, tracker := range e.SLAPTrackers { + slog.Info(fmt.Sprintf("[GASP SYNC] SLAP tracker %d: %s", i, tracker)) + } + } else { + slog.Warn(fmt.Sprintf("[GASP SYNC] No SLAP trackers configured for topic \"%s\"", topic)) + } e.LookupResolver.SetSLAPTrackers(e.SLAPTrackers) + slog.Debug(fmt.Sprintf("[GASP SYNC] Current SLAP trackers after setting: %v", e.LookupResolver.SLAPTrackers())) query, err := json.Marshal(map[string]any{"topics": []string{topic}}) if err != nil { @@ -589,31 +611,98 @@ func (e *Engine) StartGASPSync(ctx context.Context) error { return err } - timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + slog.Info(fmt.Sprintf("[GASP SYNC] Querying lookup resolver for topic \"%s\" with service \"ls_ship\"", topic)) + slog.Debug(fmt.Sprintf("[GASP SYNC] Query payload: %s", 
string(query))) + + timeoutCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() + + slog.Debug(fmt.Sprintf("[GASP SYNC] About to call LookupResolver.Query for topic \"%s\"", topic)) lookupAnswer, err := e.LookupResolver.Query(timeoutCtx, &lookup.LookupQuestion{Service: "ls_ship", Query: query}) + slog.Debug(fmt.Sprintf("[GASP SYNC] LookupResolver.Query returned for topic \"%s\", err: %v", topic, err)) + if err != nil { slog.Error("failed to query lookup resolver for GASP sync", "topic", topic, "error", err) return err } + slog.Info(fmt.Sprintf("[GASP SYNC] Lookup query completed for topic \"%s\", answer type: %s, outputs count: %d", topic, lookupAnswer.Type, len(lookupAnswer.Outputs))) + if lookupAnswer.Type == lookup.AnswerTypeOutputList { endpointSet := make(map[string]struct{}, len(lookupAnswer.Outputs)) for _, output := range lookupAnswer.Outputs { - tx, err := transaction.NewTransactionFromBEEF(output.Beef) + beef, _, txId, err := transaction.ParseBeef(output.Beef) if err != nil { slog.Error("failed to parse advertisement output BEEF", "topic", topic, "error", err) continue } + slog.Info(fmt.Sprintf("[GASP SYNC] Successfully parsed BEEF for topic \"%s\", transaction count: %d, txId: %s\n", topic, len(beef.Transactions), txId.String())) + + // Find the transaction that contains the output at the specified index + var tx *transaction.Transaction + for _, beefTx := range beef.Transactions { + if beefTx.Transaction != nil && beefTx.Transaction.Outputs != nil && len(beefTx.Transaction.Outputs) > int(output.OutputIndex) { + tx = beefTx.Transaction + break + } + } + if tx == nil { + slog.Error("failed to find transaction with output index in BEEF", "topic", topic, "outputIndex", output.OutputIndex) + continue + } + + // Additional safety check before accessing the output + if tx.Outputs == nil || len(tx.Outputs) <= int(output.OutputIndex) { + slog.Error("transaction outputs is nil or output index out of bounds", "topic", topic, "outputIndex", 
output.OutputIndex, "outputsLength", len(tx.Outputs)) + continue + } + + if tx.Outputs[output.OutputIndex] == nil { + slog.Error("output at index is nil", "topic", topic, "outputIndex", output.OutputIndex) + continue + } + + if tx.Outputs[output.OutputIndex].LockingScript == nil { + slog.Error("locking script is nil", "topic", topic, "outputIndex", output.OutputIndex) + continue + } + if e.Advertiser == nil { + slog.Warn("advertiser is nil, skipping advertisement parsing", "topic", topic) + continue + } + + slog.Debug("parsing advertisement from locking script", "topic", topic, "outputIndex", output.OutputIndex) advertisement, err := e.Advertiser.ParseAdvertisement(tx.Outputs[output.OutputIndex].LockingScript) if err != nil { slog.Error("failed to parse advertisement from locking script", "topic", topic, "error", err) continue } - if advertisement != nil && advertisement.Protocol == "SHIP" { + if advertisement == nil { + slog.Debug("advertisement parsed as nil", "topic", topic) + continue + } + + slog.Debug("successfully parsed advertisement", "topic", topic, "protocol", advertisement.Protocol, "domain", advertisement.Domain) + + // Determine expected protocol based on topic + var expectedProtocol overlay.Protocol + if topic == "tm_ship" { + expectedProtocol = overlay.ProtocolSHIP + } else if topic == "tm_slap" { + expectedProtocol = overlay.ProtocolSLAP + } else { + // For unknown topics, log a warning but continue + slog.Warn("unknown topic, cannot determine expected protocol", "topic", topic) + continue + } + + if advertisement.Protocol == expectedProtocol { + slog.Debug("found matching advertisement", "topic", topic, "protocol", advertisement.Protocol, "domain", advertisement.Domain) endpointSet[advertisement.Domain] = struct{}{} + } else { + slog.Debug("skipping advertisement with mismatched protocol", "topic", topic, "expected", expectedProtocol, "actual", advertisement.Protocol, "domain", advertisement.Domain) } } @@ -623,21 +712,59 @@ func (e *Engine) 
StartGASPSync(ctx context.Context) error { syncEndpoints.Peers = append(syncEndpoints.Peers, endpoint) } } + // Determine protocol name for logging + protocolName := "UNKNOWN" + if topic == "tm_ship" { + protocolName = "SHIP" + } else if topic == "tm_slap" { + protocolName = "SLAP" + } + slog.Info(fmt.Sprintf("[GASP SYNC] Discovered %d unique %s peer endpoint(s) for topic \"%s\"", len(syncEndpoints.Peers), protocolName, topic)) + } else { + slog.Warn(fmt.Sprintf("[GASP SYNC] Unexpected answer type \"%s\" for topic \"%s\", expected \"%s\"", lookupAnswer.Type, topic, lookup.AnswerTypeOutputList)) } + } else { + slog.Info(fmt.Sprintf("[GASP SYNC] Skipping topic peer discovery \"%s\" - sync type is not SHIP (type: \"%s\")", topic, syncEndpoints.Type)) } if len(syncEndpoints.Peers) > 0 { - peers := make([]string, 0, len(syncEndpoints.Peers)) - for _, peer := range syncEndpoints.Peers { - if peer != e.HostingURL { - peers = append(peers, peer) - } + // Log the number of peers we will attempt to sync with + plural := "" + if len(syncEndpoints.Peers) != 1 { + plural = "s" + } + slog.Info(fmt.Sprintf("[GASP SYNC] Will attempt to sync with %d peer%s", len(syncEndpoints.Peers), plural), "topic", topic) + } else { + slog.Info(fmt.Sprintf("[GASP SYNC] No peers found for topic \"%s\", skipping sync", topic)) + continue + } + + for _, peer := range syncEndpoints.Peers { + logPrefix := "[GASP Sync of " + topic + " with " + peer + "]" + + slog.Info(fmt.Sprintf("[GASP SYNC] Starting sync for topic \"%s\" with peer \"%s\"", topic, peer)) + + // Read the last interaction score from storage + lastInteraction, err := e.Storage.GetLastInteraction(ctx, peer, topic) + if err != nil { + slog.Error("Failed to get last interaction", "topic", topic, "peer", peer, "error", err) + return err } - for _, peer := range peers { - logPrefix := "[GASP Sync of " + topic + " with " + peer + "]" + // Create a new GASP provider for each peer to avoid state conflicts + gaspProvider := 
gasp.NewGASP(gasp.GASPParams{ + Storage: NewOverlayGASPStorage(topic, e, nil), + Remote: NewOverlayGASPRemote(peer, topic, http.DefaultClient, 0), + LastInteraction: lastInteraction, + LogPrefix: &logPrefix, + Unidirectional: true, + Concurrency: syncEndpoints.Concurrency, + }) - slog.Info("GASP sync starting", "topic", topic, "peer", peer) + if err := gaspProvider.Sync(ctx, peer, DEFAULT_GASP_SYNC_LIMIT); err != nil { + slog.Error(fmt.Sprintf("[GASP SYNC] Sync failed for topic \"%s\" with peer \"%s\"", topic, peer), "error", err) + } else { + slog.Info(fmt.Sprintf("[GASP SYNC] Sync successful for topic \"%s\" with peer \"%s\"", topic, peer)) // Read the last interaction score from storage lastInteraction, err := e.Storage.GetLastInteraction(ctx, peer, topic) @@ -646,30 +773,37 @@ func (e *Engine) StartGASPSync(ctx context.Context) error { return err } - // Create a new GASP provider for each peer to avoid state conflicts + // Create a GASP provider for this peer gaspProvider := gasp.NewGASP(gasp.GASPParams{ - Storage: NewOverlayGASPStorage(topic, e, nil), - Remote: &OverlayGASPRemote{ - EndpointUrl: peer, - Topic: topic, - HttpClient: http.DefaultClient, - }, + Storage: NewOverlayGASPStorage(topic, e, nil), + Remote: NewOverlayGASPRemote(peer, topic, http.DefaultClient, 8), LastInteraction: lastInteraction, LogPrefix: &logPrefix, Unidirectional: true, Concurrency: syncEndpoints.Concurrency, + Topic: topic, }) - if err := gaspProvider.Sync(ctx, peer, DEFAULT_GASP_SYNC_LIMIT); err != nil { - slog.Error("failed to sync with peer", "topic", topic, "peer", peer, "error", err) - } else { - slog.Info("GASP sync successful", "topic", topic, "peer", peer) + // Paginate through GASP sync, saving progress after each successful page + for { + previousLastInteraction := gaspProvider.LastInteraction + + // Sync one page + if err := gaspProvider.Sync(ctx, peer, DEFAULT_GASP_SYNC_LIMIT); err != nil { + slog.Error("failed to sync with peer", "topic", topic, "peer", peer, "error", 
err) + break // Exit loop on error + } - // Save the updated last interaction score - if gaspProvider.LastInteraction > lastInteraction { - if err := e.Storage.UpdateLastInteraction(ctx, peer, topic, gaspProvider.LastInteraction); err == nil { - slog.Info("Updated last interaction score", "topic", topic, "peer", peer, "score", gaspProvider.LastInteraction) + // Save progress after successful page + if gaspProvider.LastInteraction > previousLastInteraction { + if err := e.Storage.UpdateLastInteraction(ctx, peer, topic, gaspProvider.LastInteraction); err != nil { + slog.Error("Failed to update last interaction", "topic", topic, "peer", peer, "error", err) + // Continue anyway - we don't want to lose progress } + } else { + // No progress made, we're done syncing + slog.Info(logPrefix + " Sync completed") + break } } } @@ -678,6 +812,90 @@ func (e *Engine) StartGASPSync(ctx context.Context) error { return nil } +// SyncInvalidatedOutputs finds outputs with invalidated merkle proofs and syncs them with remote peers +func (e *Engine) SyncInvalidatedOutputs(ctx context.Context, topic string) error { + // Find outpoints with invalidated merkle proofs + invalidatedOutpoints, err := e.Storage.FindOutpointsByMerkleState(ctx, topic, MerkleStateInvalidated, 1000) + if err != nil { + slog.Error("Failed to find invalidated outputs", "topic", topic, "error", err) + return err + } + + if len(invalidatedOutpoints) == 0 { + return nil + } + + // Get sync configuration for this topic + syncConfig, ok := e.SyncConfiguration[topic] + if !ok || len(syncConfig.Peers) == 0 { + slog.Warn("No peers configured for topic", "topic", topic) + return nil + } + + // Group outpoints by transaction ID to avoid duplicate merkle proof requests + txidsToUpdate := make(map[chainhash.Hash]*transaction.Outpoint) + for _, outpoint := range invalidatedOutpoints { + if _, exists := txidsToUpdate[outpoint.Txid]; !exists { + // Use the first outpoint for this txid as representative + 
txidsToUpdate[outpoint.Txid] = outpoint + } + } + + // Try to get updated merkle proofs from peers + var successCount int + + // For each transaction that needs updating + for txid, outpoint := range txidsToUpdate { + var syncSuccess bool + + // Try each peer until we get a valid merkle proof for this transaction + for _, peer := range syncConfig.Peers { + if peer == e.HostingURL { + continue // Skip self + } + + // Create a remote client for this peer + remote := NewOverlayGASPRemote(peer, topic, http.DefaultClient, 8) + + // Request node with metadata to get merkle proof + node, err := remote.RequestNode(ctx, outpoint, outpoint, true) + if err != nil { + continue // Try next peer + } + + // If we got a merkle proof, update it for the transaction + if node.Proof != nil { + + merklePath, err := transaction.NewMerklePathFromHex(*node.Proof) + if err != nil { + slog.Error("Failed to parse merkle proof", "txid", txid.String(), "error", err) + continue // Try next peer + } + + // Update the merkle proof using the existing handler (updates all outputs for this transaction) + if err := e.HandleNewMerkleProof(ctx, &txid, merklePath); err != nil { + slog.Error("Failed to update merkle proof", "txid", txid.String(), "error", err) + continue // Try next peer + } + + successCount++ + syncSuccess = true + break // Got valid proof, move to next transaction + } + } + + if !syncSuccess { + slog.Warn("Failed to sync transaction from any peer", "txid", txid.String(), "peers_tried", len(syncConfig.Peers)) + } + } + + if successCount == 0 && len(txidsToUpdate) > 0 { + slog.Warn("Could not update all invalidated outputs", "topic", topic, "remaining", len(txidsToUpdate)) + } + + return nil +} + func (e *Engine) ProvideForeignSyncResponse(ctx context.Context, initialRequest *gasp.InitialRequest, topic string) (*gasp.InitialResponse, error) { if utxos, err := e.Storage.FindUTXOsForTopic(ctx, topic, initialRequest.Since, initialRequest.Limit, false); err != nil { slog.Error("failed to 
find UTXOs for topic in ProvideForeignSyncResponse", "topic", topic, "error", err) @@ -701,6 +919,11 @@ func (e *Engine) ProvideForeignSyncResponse(ctx context.Context, initialRequest } func (e *Engine) ProvideForeignGASPNode(ctx context.Context, graphId *transaction.Outpoint, outpoint *transaction.Outpoint, topic string) (*gasp.Node, error) { + slog.Debug("ProvideForeignGASPNode called", + "graphID", graphId.String(), + "outpoint", outpoint.String(), + "topic", topic) + var hydrator func(ctx context.Context, output *Output) (*gasp.Node, error) hydrator = func(ctx context.Context, output *Output) (*gasp.Node, error) { if output.Beef == nil { @@ -720,24 +943,58 @@ func (e *Engine) ProvideForeignGASPNode(ctx context.Context, graphId *transactio return nil, err } else { node := &gasp.Node{ - GraphID: graphId, - RawTx: tx.Hex(), - OutputIndex: outpoint.Index, - AncillaryBeef: output.AncillaryBeef, + GraphID: graphId, + RawTx: tx.Hex(), + OutputIndex: outpoint.Index, } if tx.MerklePath != nil { proof := tx.MerklePath.Hex() node.Proof = &proof + node.AncillaryBeef = output.AncillaryBeef + } else { + // For unmined transactions, provide full BEEF as ancillary + // Default to output.Beef, but try to merge with ancillary if it exists + node.AncillaryBeef = output.Beef + + if len(output.AncillaryBeef) > 0 { + // Try to merge ancillary BEEF into the main BEEF + if beef, _, _, err := transaction.ParseBeef(output.Beef); err == nil { + if err := beef.MergeBeefBytes(output.AncillaryBeef); err == nil { + // Use AtomicBytes to ensure client can parse with NewTransactionFromBEEF + if mergedBytes, err := beef.AtomicBytes(&outpoint.Txid); err == nil { + node.AncillaryBeef = mergedBytes + } + } + } + } + + // Validate the ancillary BEEF before sending it + if _, _, _, err := transaction.ParseBeef(node.AncillaryBeef); err != nil { + slog.Error("Invalid ancillary BEEF for unmined transaction", + "graphID", graphId.String(), + "outpoint", outpoint.String(), + "topic", topic, + "error", 
err) + return nil, fmt.Errorf("invalid ancillary BEEF: %w", err) + } } return node, nil } } - if output, err := e.Storage.FindOutput(ctx, graphId, &topic, nil, true); err != nil { - slog.Error("failed to find output in ProvideForeignGASPNode", "graphId", graphId.String(), "topic", topic, "error", err) + if output, err := e.Storage.FindOutput(ctx, outpoint, &topic, nil, true); err != nil { + slog.Error("failed to find output in ProvideForeignGASPNode", + "graphID", graphId.String(), + "outpoint", outpoint.String(), + "topic", topic, + "error", err) return nil, err } else if output == nil { + slog.Warn("Output not found in storage", + "graphID", graphId.String(), + "outpoint", outpoint.String(), + "topic", topic) return nil, ErrMissingOutput } else { return hydrator(ctx, output) @@ -850,7 +1107,7 @@ func (e *Engine) updateMerkleProof(ctx context.Context, output *Output, txid cha if len(output.AncillaryTxids) > 0 { ancillaryBeef := transaction.Beef{ Version: transaction.BEEF_V2, - Transactions: make(map[string]*transaction.BeefTx, len(output.AncillaryTxids)), + Transactions: make(map[chainhash.Hash]*transaction.BeefTx, len(output.AncillaryTxids)), } for _, dep := range output.AncillaryTxids { if depTx := beef.FindTransaction(dep.String()); depTx == nil { @@ -890,6 +1147,14 @@ func (e *Engine) updateMerkleProof(ctx context.Context, output *Output, txid cha return err } else { for _, consuming := range consumingOutputs { + // Check if consuming transaction has its own merkle path + // If it does, it's mined and doesn't include parent transactions anymore + if len(consuming.Beef) > 0 { + if _, consumingTx, _, err := transaction.ParseBeef(consuming.Beef); err == nil && consumingTx != nil && consumingTx.MerklePath != nil { + continue + } + } + if err := e.updateMerkleProof(ctx, consuming, txid, proof); err != nil { slog.Error("failed to update merkle proof for consuming output", "consumingTxid", consuming.Outpoint.Txid, "error", err) return err @@ -903,6 +1168,19 @@ func 
(e *Engine) updateMerkleProof(ctx context.Context, output *Output, txid cha } func (e *Engine) HandleNewMerkleProof(ctx context.Context, txid *chainhash.Hash, proof *transaction.MerklePath) error { + // Validate the merkle proof before processing + if merkleRoot, err := proof.ComputeRoot(txid); err != nil { + slog.Error("failed to compute merkle root from proof", "txid", txid, "error", err) + return err + } else if valid, err := e.ChainTracker.IsValidRootForHeight(ctx, merkleRoot, proof.BlockHeight); err != nil { + slog.Error("error validating merkle root for height", "txid", txid, "blockHeight", proof.BlockHeight, "error", err) + return err + } else if !valid { + err := fmt.Errorf("invalid merkle proof for transaction %s at block height %d", txid, proof.BlockHeight) + slog.Error("merkle proof validation failed", "txid", txid, "blockHeight", proof.BlockHeight, "error", err) + return err + } + if outputs, err := e.Storage.FindOutputsForTransaction(ctx, txid, true); err != nil { slog.Error("failed to find outputs for transaction in HandleNewMerkleProof", "txid", txid, "error", err) return err diff --git a/pkg/core/engine/gasp-remote.go b/pkg/core/engine/gasp-remote.go index a4497e81..e77bf67f 100644 --- a/pkg/core/engine/gasp-remote.go +++ b/pkg/core/engine/gasp-remote.go @@ -5,40 +5,73 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "log/slog" "net/http" + "sync" "github.com/bsv-blockchain/go-overlay-services/pkg/core/gasp" "github.com/bsv-blockchain/go-sdk/transaction" "github.com/bsv-blockchain/go-sdk/util" ) +type inflightNodeRequest struct { + wg *sync.WaitGroup + result *gasp.Node + err error +} + type OverlayGASPRemote struct { - EndpointUrl string - Topic string - HttpClient util.HTTPClient + endpointUrl string + topic string + httpClient util.HTTPClient + inflightMap sync.Map // Map to track in-flight node requests + networkLimiter chan struct{} // Controls max concurrent network requests +} + +func NewOverlayGASPRemote(endpointUrl, topic string, 
httpClient util.HTTPClient, maxConcurrency int) *OverlayGASPRemote { + if maxConcurrency <= 0 { + maxConcurrency = 8 // Default network concurrency + } + + return &OverlayGASPRemote{ + endpointUrl: endpointUrl, + topic: topic, + httpClient: httpClient, + networkLimiter: make(chan struct{}, maxConcurrency), + } } func (r *OverlayGASPRemote) GetInitialResponse(ctx context.Context, request *gasp.InitialRequest) (*gasp.InitialResponse, error) { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(request); err != nil { - slog.Error("failed to encode GASP initial request", "endpoint", r.EndpointUrl, "topic", r.Topic, "error", err) + requestJSON, err := json.Marshal(request) + if err != nil { + slog.Error("failed to encode GASP initial request", "endpoint", r.endpointUrl, "topic", r.topic, "error", err) return nil, err - } else if req, err := http.NewRequest("POST", r.EndpointUrl+"/requestSyncResponse", io.NopCloser(&buf)); err != nil { - slog.Error("failed to create HTTP request for GASP initial response", "endpoint", r.EndpointUrl, "topic", r.Topic, "error", err) + } + + if req, err := http.NewRequest("POST", r.endpointUrl+"/requestSyncResponse", bytes.NewReader(requestJSON)); err != nil { + slog.Error("failed to create HTTP request for GASP initial response", "endpoint", r.endpointUrl, "topic", r.topic, "error", err) return nil, err } else { req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-BSV-Topic", r.Topic) - if resp, err := r.HttpClient.Do(req); err != nil { + req.Header.Set("X-BSV-Topic", r.topic) + if resp, err := r.httpClient.Do(req); err != nil { return nil, err } else { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { + // Read error message from response body + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, &util.HTTPError{ + StatusCode: resp.StatusCode, + Err: readErr, + } + } return nil, &util.HTTPError{ StatusCode: resp.StatusCode, - Err: err, + Err: 
fmt.Errorf("server error: %s", string(body)), } } result := &gasp.InitialResponse{} @@ -51,6 +84,34 @@ func (r *OverlayGASPRemote) GetInitialResponse(ctx context.Context, request *gas } func (r *OverlayGASPRemote) RequestNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) { + outpointStr := outpoint.String() + var wg sync.WaitGroup + wg.Add(1) + defer wg.Done() + + // Check if there's already an in-flight request for this outpoint + if inflight, loaded := r.inflightMap.LoadOrStore(outpointStr, &inflightNodeRequest{wg: &wg}); loaded { + req := inflight.(*inflightNodeRequest) + req.wg.Wait() + return req.result, req.err + } else { + req := inflight.(*inflightNodeRequest) + req.result, req.err = r.doNodeRequest(ctx, graphID, outpoint, metadata) + + // Clean up inflight map + r.inflightMap.Delete(outpointStr) + return req.result, req.err + } +} + +func (r *OverlayGASPRemote) doNodeRequest(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) { + // Acquire network limiter + select { + case r.networkLimiter <- struct{}{}: + case <-ctx.Done(): + return nil, ctx.Err() + } + defer func() { <-r.networkLimiter }() if j, err := json.Marshal(&gasp.NodeRequest{ GraphID: graphID, Txid: &outpoint.Txid, @@ -58,19 +119,40 @@ func (r *OverlayGASPRemote) RequestNode(ctx context.Context, graphID *transactio Metadata: metadata, }); err != nil { return nil, err - } else if req, err := http.NewRequest("POST", r.EndpointUrl+"/requestForeignGASPNode", bytes.NewReader(j)); err != nil { + } else if req, err := http.NewRequest("POST", r.endpointUrl+"/requestForeignGASPNode", bytes.NewReader(j)); err != nil { return nil, err } else { req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-BSV-Topic", r.Topic) - if resp, err := r.HttpClient.Do(req); err != nil { + req.Header.Set("X-BSV-Topic", r.topic) + if resp, err := r.httpClient.Do(req); err 
!= nil { return nil, err } else { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { + // Read error message from response body + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, &util.HTTPError{ + StatusCode: resp.StatusCode, + Err: readErr, + } + } + // Log the full request and response details on failure + var graphId string + if graphID != nil { + graphId = graphID.String() + } + slog.Error("RequestNode failed", + "status", resp.StatusCode, + "body", string(body), + "graphID", graphId, + "outpoint", outpoint.String(), + "metadata", metadata, + "endpoint", r.endpointUrl, + "topic", r.topic) return nil, &util.HTTPError{ StatusCode: resp.StatusCode, - Err: err, + Err: fmt.Errorf("server error: %s", string(body)), } } result := &gasp.Node{} diff --git a/pkg/core/engine/gasp-storage.go b/pkg/core/engine/gasp-storage.go index d6299c3a..f357b925 100644 --- a/pkg/core/engine/gasp-storage.go +++ b/pkg/core/engine/gasp-storage.go @@ -4,6 +4,9 @@ import ( "bytes" "context" "errors" + "fmt" + "log/slog" + "runtime" "slices" "sync" @@ -16,11 +19,17 @@ import ( var ErrGraphFull = errors.New("graph is full") +// submissionState tracks the state of a transaction submission +type submissionState struct { + wg sync.WaitGroup + err error +} + type GraphNode struct { gasp.Node Txid *chainhash.Hash `json:"txid"` SpentBy *chainhash.Hash `json:"spentBy"` - Children []*GraphNode `json:"children"` + Children sync.Map `json:"-"` // map[string]*GraphNode - concurrent safe Parent *GraphNode `json:"parent"` } @@ -30,6 +39,7 @@ type OverlayGASPStorage struct { MaxNodesInGraph *int tempGraphNodeRefs sync.Map tempGraphNodeCount int + submissionTracker sync.Map // map[chainhash.Hash]*submissionState } func NewOverlayGASPStorage(topic string, engine *Engine, maxNodesInGraph *int) *OverlayGASPStorage { @@ -58,6 +68,21 @@ func (s *OverlayGASPStorage) FindKnownUTXOs(ctx context.Context, since float64, } } +func (s *OverlayGASPStorage) 
HasOutputs(ctx context.Context, outpoints []*transaction.Outpoint) ([]bool, error) { + // Use FindOutputs to check existence - don't need BEEF for existence check + outputs, err := s.Engine.Storage.FindOutputs(ctx, outpoints, s.Topic, nil, false) + if err != nil { + return nil, err + } + + // Convert to boolean array - true if output exists, false if nil + result := make([]bool, len(outputs)) + for i, output := range outputs { + result[i] = output != nil + } + return result, nil +} + func (s *OverlayGASPStorage) HydrateGASPNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) { if output, err := s.Engine.Storage.FindOutput(ctx, outpoint, nil, nil, true); err != nil { return nil, err @@ -90,35 +115,56 @@ func (s *OverlayGASPStorage) HydrateGASPNode(ctx context.Context, graphID *trans func (s *OverlayGASPStorage) FindNeededInputs(ctx context.Context, gaspTx *gasp.Node) (*gasp.NodeResponse, error) { response := &gasp.NodeResponse{ - RequestedInputs: make(map[string]*gasp.NodeResponseData), + RequestedInputs: make(map[transaction.Outpoint]*gasp.NodeResponseData), } tx, err := transaction.NewTransactionFromHex(gaspTx.RawTx) if err != nil { return nil, err } - if gaspTx.Proof == nil { + // For unmined transactions (missing or empty proof), request all of the transaction's + // inputs; stripAlreadyKnowInputs below then drops any that are already known locally. + // NOTE(review): IdentifyNeededInputs could narrow this set - confirm before tightening. + if gaspTx.Proof == nil || *gaspTx.Proof == "" { for _, input := range tx.Inputs { outpoint := &transaction.Outpoint{ Txid: *input.SourceTXID, Index: input.SourceTxOutIndex, } - response.RequestedInputs[outpoint.String()] = &gasp.NodeResponseData{ + response.RequestedInputs[*outpoint] = &gasp.NodeResponseData{ Metadata: false, } } return s.stripAlreadyKnowInputs(ctx, response) - } else if tx.MerklePath, err = transaction.NewMerklePathFromHex(*gaspTx.Proof); err != nil { - return nil, err } - if beef, err := transaction.NewBeefFromTransaction(tx); err != nil { - return nil, err - 
} else { - if len(gaspTx.AncillaryBeef) > 0 { - if err := beef.MergeBeefBytes(gaspTx.AncillaryBeef); err != nil { - return nil, err - } + + // Process merkle proof if present + if gaspTx.Proof != nil && *gaspTx.Proof != "" { + if tx.MerklePath, err = transaction.NewMerklePathFromHex(*gaspTx.Proof); err != nil { + return nil, err + } + } + + var beef *transaction.Beef + if len(gaspTx.AncillaryBeef) > 0 { + // If we have ancillary BEEF, use it as the base (contains full transaction graph) + if beef, _, _, err = transaction.ParseBeef(gaspTx.AncillaryBeef); err != nil { + return nil, err + } + // Merge in the transaction we just received + if _, err = beef.MergeTransaction(tx); err != nil { + return nil, err } + } else if tx.MerklePath != nil { + // If we have a merkle path but no ancillary BEEF, create BEEF from transaction + if beef, err = transaction.NewBeefFromTransaction(tx); err != nil { + return nil, err + } + } /* else { + // Unmined transaction without ancillary BEEF is an error + return nil, fmt.Errorf("unmined transaction without ancillary BEEF") + }*/ + + if beef != nil { inpoints := make([]*transaction.Outpoint, len(tx.Inputs)) for vin, input := range tx.Inputs { inpoints[vin] = &transaction.Outpoint{ @@ -142,14 +188,16 @@ func (s *OverlayGASPStorage) FindNeededInputs(ctx context.Context, gaspTx *gasp. 
if beefBytes, err := beef.AtomicBytes(tx.TxID()); err != nil { return nil, err - } else if admit, err := s.Engine.Managers[s.Topic].IdentifyAdmissibleOutputs(ctx, beefBytes, previousCoins); err != nil { + } else if admit, err := s.IdentifyAdmissibleOutputs(ctx, beefBytes, previousCoins); err != nil { return nil, err } else if !slices.Contains(admit.OutputsToAdmit, gaspTx.OutputIndex) { - if neededInputs, err := s.Engine.Managers[s.Topic].IdentifyNeededInputs(ctx, beefBytes); err != nil { + if _, ok := s.Engine.Managers[s.Topic]; !ok { + return nil, errors.New("no manager for topic (identify needed inputs): " + s.Topic) + } else if neededInputs, err := s.Engine.Managers[s.Topic].IdentifyNeededInputs(ctx, beefBytes); err != nil { return nil, err } else { for _, outpoint := range neededInputs { - response.RequestedInputs[outpoint.String()] = &gasp.NodeResponseData{ + response.RequestedInputs[*outpoint] = &gasp.NodeResponseData{ Metadata: true, } } @@ -161,17 +209,22 @@ func (s *OverlayGASPStorage) FindNeededInputs(ctx context.Context, gaspTx *gasp. 
return response, nil } +func (s *OverlayGASPStorage) IdentifyAdmissibleOutputs(ctx context.Context, beefBytes []byte, previousCoins map[uint32]*transaction.TransactionOutput) (overlay.AdmittanceInstructions, error) { + if _, ok := s.Engine.Managers[s.Topic]; !ok { + return overlay.AdmittanceInstructions{}, errors.New("no manager for topic (identify admissible outputs): " + s.Topic) + } + return s.Engine.Managers[s.Topic].IdentifyAdmissibleOutputs(ctx, beefBytes, previousCoins) +} + func (s *OverlayGASPStorage) stripAlreadyKnowInputs(ctx context.Context, response *gasp.NodeResponse) (*gasp.NodeResponse, error) { if response == nil { return nil, nil } - for outpointStr := range response.RequestedInputs { - if outpoint, err := transaction.OutpointFromString(outpointStr); err != nil { - return nil, err - } else if found, err := s.Engine.Storage.FindOutput(ctx, outpoint, &s.Topic, nil, false); err != nil { + for outpoint := range response.RequestedInputs { + if found, err := s.Engine.Storage.FindOutput(ctx, &outpoint, &s.Topic, nil, false); err != nil { return nil, err } else if found != nil { - delete(response.RequestedInputs, outpointStr) + delete(response.RequestedInputs, outpoint) } } if len(response.RequestedInputs) == 0 { @@ -189,33 +242,34 @@ func (s *OverlayGASPStorage) AppendToGraph(ctx context.Context, gaspTx *gasp.Nod return err } else { txid := tx.TxID() - if gaspTx.Proof != nil { + if gaspTx.Proof != nil && *gaspTx.Proof != "" { if tx.MerklePath, err = transaction.NewMerklePathFromHex(*gaspTx.Proof); err != nil { + slog.Error("Failed to parse merkle path", "error", err, "proofLength", len(*gaspTx.Proof)) return err } } newGraphNode := &GraphNode{ - Node: *gaspTx, - Txid: txid, - Children: []*GraphNode{}, + Node: *gaspTx, + Txid: txid, } if spentBy == nil { - if _, ok := s.tempGraphNodeRefs.LoadOrStore(gaspTx.GraphID.String(), newGraphNode); !ok { + if _, ok := s.tempGraphNodeRefs.LoadOrStore(*gaspTx.GraphID, newGraphNode); !ok { s.tempGraphNodeCount++ } } 
else { // Find parent node by spentBy outpoint - if parentNode, ok := s.tempGraphNodeRefs.Load(spentBy.String()); !ok { + if parentNode, ok := s.tempGraphNodeRefs.Load(*spentBy); !ok { return ErrMissingInput } else { - parentNode.(*GraphNode).Children = append(parentNode.(*GraphNode).Children, newGraphNode) + parent := parentNode.(*GraphNode) + parent.Children.Store(*gaspTx.GraphID, newGraphNode) newGraphNode.Parent = parentNode.(*GraphNode) } newGraphOutpoint := &transaction.Outpoint{ Txid: *txid, Index: gaspTx.OutputIndex, } - if _, ok := s.tempGraphNodeRefs.LoadOrStore(newGraphOutpoint.String(), newGraphNode); !ok { + if _, ok := s.tempGraphNodeRefs.LoadOrStore(*newGraphOutpoint, newGraphNode); !ok { s.tempGraphNodeCount++ } } @@ -224,20 +278,20 @@ func (s *OverlayGASPStorage) AppendToGraph(ctx context.Context, gaspTx *gasp.Nod } func (s *OverlayGASPStorage) ValidateGraphAnchor(ctx context.Context, graphID *transaction.Outpoint) error { - if rootNode, ok := s.tempGraphNodeRefs.Load(graphID.String()); !ok { + if rootNode, ok := s.tempGraphNodeRefs.Load(*graphID); !ok { return ErrMissingInput } else if beef, err := s.getBEEFForNode(rootNode.(*GraphNode)); err != nil { return err } else if tx, err := transaction.NewTransactionFromBEEF(beef); err != nil { return err - } else if valid, err := spv.Verify(tx, s.Engine.ChainTracker, nil); err != nil { + } else if valid, err := spv.Verify(ctx, tx, s.Engine.ChainTracker, nil); err != nil { return err } else if !valid { return errors.New("graph anchor is not a valid transaction") } else if beefs, err := s.computeOrderedBEEFsForGraph(ctx, graphID); err != nil { return err } else { - coins := make(map[string]struct{}) + coins := make(map[transaction.Outpoint]struct{}) for _, beefBytes := range beefs { if tx, err := transaction.NewTransactionFromBEEF(beefBytes); err != nil { return err @@ -262,7 +316,7 @@ func (s *OverlayGASPStorage) ValidateGraphAnchor(ctx context.Context, graphID *t } } } - if admit, err := 
s.Engine.Managers[s.Topic].IdentifyAdmissibleOutputs(ctx, beef, previousCoins); err != nil { + if admit, err := s.IdentifyAdmissibleOutputs(ctx, beefBytes, previousCoins); err != nil { return err } else { for _, vout := range admit.OutputsToAdmit { @@ -270,43 +324,35 @@ func (s *OverlayGASPStorage) ValidateGraphAnchor(ctx context.Context, graphID *t Txid: *tx.TxID(), Index: vout, } - coins[outpoint.String()] = struct{}{} + coins[*outpoint] = struct{}{} } } } } - if _, ok := coins[graphID.String()]; !ok { + if _, ok := coins[*graphID]; !ok { return errors.New("graph did not result in topical admittance of the root node. rejecting") } return nil } } -func (s *OverlayGASPStorage) DiscardGraph(ctx context.Context, graphID *transaction.Outpoint) error { - // First, find all nodes that belong to this graph - nodesToDelete := make([]string, 0) +func (s *OverlayGASPStorage) DiscardGraph(_ context.Context, graphID *transaction.Outpoint) error { + // Find and delete all nodes that belong to this graph + nodesToDelete := make([]*transaction.Outpoint, 0) + + // First pass: collect all node IDs that belong to this graph s.tempGraphNodeRefs.Range(func(nodeId, graphRef any) bool { node := graphRef.(*GraphNode) if node.GraphID.Equal(graphID) { - // Recursively collect all child nodes - collectNodes := func(n *GraphNode) { - nodesToDelete = append(nodesToDelete, nodeId.(string)) - for _, child := range n.Children { - outpoint := &transaction.Outpoint{ - Txid: *child.Txid, - Index: child.OutputIndex, - } - nodesToDelete = append(nodesToDelete, outpoint.String()) - } - } - collectNodes(node) + outpoint := nodeId.(transaction.Outpoint) + nodesToDelete = append(nodesToDelete, &outpoint) } return true }) // Delete all collected nodes for _, nodeId := range nodesToDelete { - s.tempGraphNodeRefs.Delete(nodeId) + s.tempGraphNodeRefs.Delete(*nodeId) s.tempGraphNodeCount-- } @@ -318,17 +364,50 @@ func (s *OverlayGASPStorage) FinalizeGraph(ctx context.Context, graphID *transac return err } 
else { for _, beef := range beefs { - if _, err := s.Engine.Submit( - ctx, - overlay.TaggedBEEF{ - Topics: []string{s.Topic}, - Beef: beef, - }, - SubmitModeHistorical, - nil, - ); err != nil { + // Extract transaction ID from BEEF for deduplication key + _, tx, _, err := transaction.ParseBeef(beef) + if err != nil { return err } + if tx == nil { + return errors.New("no transaction in BEEF") + } + + txid := *tx.TxID() + + // Deduplicate submissions by transaction ID + + // Pre-initialize the submission state to avoid race conditions + newState := &submissionState{} + newState.wg.Add(1) + + if existing, loaded := s.submissionTracker.LoadOrStore(txid, newState); loaded { + // Another goroutine is already submitting this transaction, wait for it + state := existing.(*submissionState) + state.wg.Wait() + if state.err != nil { + return state.err + } + } else { + // We're the first caller, do the submission using our pre-initialized state + state := newState + defer state.wg.Done() // Signal completion + + // Perform the actual submission + _, state.err = s.Engine.Submit( + ctx, + overlay.TaggedBEEF{ + Topics: []string{s.Topic}, + Beef: beef, + }, + SubmitModeHistorical, + nil, + ) + if state.err != nil { + return state.err + } + slog.Info(fmt.Sprintf("[GASP] Transaction processed: %s", txid.String())) + } } return nil } @@ -346,16 +425,23 @@ func (s *OverlayGASPStorage) computeOrderedBEEFsForGraph(ctx context.Context, gr }) == -1 { beefs = append([][]byte{currentBeef}, beefs...) 
} - for _, child := range node.Children { + var childErr error + node.Children.Range(func(key, value any) bool { + child := value.(*GraphNode) if err := hydrator(child); err != nil { - return err + childErr = err + return false } + return true + }) + if childErr != nil { + return childErr } } return nil } - if foundRoot, ok := s.tempGraphNodeRefs.Load(graphID.String()); !ok { + if foundRoot, ok := s.tempGraphNodeRefs.Load(*graphID); !ok { return nil, errors.New("unable to find root node in graph for finalization") } else if err := hydrator(foundRoot.(*GraphNode)); err != nil { return nil, err @@ -365,11 +451,25 @@ func (s *OverlayGASPStorage) computeOrderedBEEFsForGraph(ctx context.Context, gr } func (s *OverlayGASPStorage) getBEEFForNode(node *GraphNode) ([]byte, error) { + if node == nil { + panic(fmt.Sprintf("GASP DEBUG: getBEEFForNode called with nil node. Total goroutines: %d", runtime.NumGoroutine())) + } + + // For unmined transactions (no proof), if ancillaryBeef is provided, use it directly + // as it contains the complete BEEF for the unmined transaction + if (node.Proof == nil || *node.Proof == "") && len(node.AncillaryBeef) > 0 { + // slog.Info("Using ancillaryBeef directly for unmined transaction", "beefSize", len(node.AncillaryBeef)) + return node.AncillaryBeef, nil + } + var hydrator func(node *GraphNode) (*transaction.Transaction, error) hydrator = func(node *GraphNode) (*transaction.Transaction, error) { + if node == nil { + panic(fmt.Sprintf("GASP DEBUG: hydrator called with nil node. 
Goroutine: %d", runtime.NumGoroutine())) + } if tx, err := transaction.NewTransactionFromHex(node.RawTx); err != nil { return nil, err - } else if node.Proof != nil { + } else if node.Proof != nil && *node.Proof != "" { if tx.MerklePath, err = transaction.NewMerklePathFromHex(*node.Proof); err != nil { return nil, err } @@ -380,7 +480,7 @@ func (s *OverlayGASPStorage) getBEEFForNode(node *GraphNode) ([]byte, error) { Txid: *input.SourceTXID, Index: input.SourceTxOutIndex, } - if foundNode, ok := s.tempGraphNodeRefs.Load(outpoint.String()); !ok { + if foundNode, ok := s.tempGraphNodeRefs.Load(*outpoint); !ok { return nil, errors.New("required input node for unproven parent not found in temporary graph store") } else if tx.Inputs[vin].SourceTransaction, err = hydrator(foundNode.(*GraphNode)); err != nil { return nil, err diff --git a/pkg/core/engine/lookup_resolver.go b/pkg/core/engine/lookup_resolver.go index 3ef4de6b..d18114af 100644 --- a/pkg/core/engine/lookup_resolver.go +++ b/pkg/core/engine/lookup_resolver.go @@ -4,6 +4,7 @@ import ( "context" "net/http" + "github.com/bsv-blockchain/go-sdk/overlay" "github.com/bsv-blockchain/go-sdk/overlay/lookup" ) @@ -15,12 +16,24 @@ type LookupResolver struct { // NewLookupResolver creates and initializes a LookupResolver with a default HTTPS facilitator. func NewLookupResolver() *LookupResolver { - return &LookupResolver{ - resolver: &lookup.LookupResolver{ - Facilitator: &lookup.HTTPSOverlayLookupFacilitator{ - Client: http.DefaultClient, - }, + return NewLookupResolverWithNetwork(overlay.NetworkMainnet) +} + +// NewLookupResolverWithNetwork creates and initializes a LookupResolver with a default HTTPS facilitator +// and appropriate SLAP trackers for the specified network. 
+func NewLookupResolverWithNetwork(network overlay.Network) *LookupResolver { + cfg := &lookup.LookupResolver{ + Facilitator: &lookup.HTTPSOverlayLookupFacilitator{ + Client: http.DefaultClient, }, + NetworkPreset: network, + } + + // Use NewLookupResolver from the go-sdk to get proper network defaults + resolver := lookup.NewLookupResolver(cfg) + + return &LookupResolver{ + resolver: resolver, } } diff --git a/pkg/core/engine/output.go b/pkg/core/engine/output.go index df212f56..4a66b3ab 100644 --- a/pkg/core/engine/output.go +++ b/pkg/core/engine/output.go @@ -6,6 +6,32 @@ import ( "github.com/bsv-blockchain/go-sdk/transaction" ) +// MerkleState represents the validation state of an output's merkle proof +type MerkleState uint8 + +const ( + MerkleStateUnmined MerkleState = iota + MerkleStateValidated + MerkleStateInvalidated + MerkleStateImmutable +) + +// String returns the string representation of the MerkleState +func (m MerkleState) String() string { + switch m { + case MerkleStateUnmined: + return "Unmined" + case MerkleStateValidated: + return "Validated" + case MerkleStateInvalidated: + return "Invalidated" + case MerkleStateImmutable: + return "Immutable" + default: + return "Unknown" + } +} + type Output struct { Outpoint transaction.Outpoint Topic string @@ -20,4 +46,6 @@ type Output struct { Beef []byte AncillaryTxids []*chainhash.Hash AncillaryBeef []byte + MerkleRoot *chainhash.Hash // Merkle root extracted from the merkle path + MerkleState MerkleState // Validation state of the merkle proof } diff --git a/pkg/core/engine/storage.go b/pkg/core/engine/storage.go index b02f9f63..924cd17b 100644 --- a/pkg/core/engine/storage.go +++ b/pkg/core/engine/storage.go @@ -53,4 +53,15 @@ type Storage interface { // Retrieves the last interaction score for a given host and topic // Returns 0 if no record exists GetLastInteraction(ctx context.Context, host string, topic string) (float64, error) + + // Finds outpoints with a specific merkle validation state + // 
Returns only the outpoints (not full output data) for efficiency + FindOutpointsByMerkleState(ctx context.Context, topic string, state MerkleState, limit uint32) ([]*transaction.Outpoint, error) + + // Reconciles validation state for all outputs at a given block height + // Compares outputs' merkle roots with the authoritative root and updates states: + // - Matching roots become Validated (or Immutable if old enough) + // - Non-matching roots become Invalidated + // - Null roots remain Unmined + ReconcileMerkleRoot(ctx context.Context, topic string, blockHeight uint32, merkleRoot *chainhash.Hash) error } diff --git a/pkg/core/engine/tests/engine_handle_merkle_proof_test.go b/pkg/core/engine/tests/engine_handle_merkle_proof_test.go index a70eed65..3fe418e4 100644 --- a/pkg/core/engine/tests/engine_handle_merkle_proof_test.go +++ b/pkg/core/engine/tests/engine_handle_merkle_proof_test.go @@ -83,6 +83,11 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { sut := &engine.Engine{ Storage: mockStorage, LookupServices: map[string]engine.LookupService{"test-service": mockLookupService}, + ChainTracker: fakeChainTracker{ + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { + return true, nil + }, + }, } // when @@ -130,6 +135,11 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { sut := &engine.Engine{ Storage: mockStorage, + ChainTracker: fakeChainTracker{ + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { + return true, nil + }, + }, } // when @@ -144,7 +154,15 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { // given ctx := context.Background() txid := &chainhash.Hash{1, 2, 3} - merklePath := &transaction.MerklePath{} + merklePath := &transaction.MerklePath{ + BlockHeight: 814435, + Path: [][]*transaction.PathElement{{ + { + Hash: txid, + Offset: 123, + }, + }}, + } mockStorage := &mockHandleMerkleProofStorage{ findOutputsForTransactionFunc: func(ctx 
context.Context, txid *chainhash.Hash, includeBEEF bool) ([]*engine.Output, error) { @@ -154,6 +172,11 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { sut := &engine.Engine{ Storage: mockStorage, + ChainTracker: fakeChainTracker{ + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { + return true, nil + }, + }, } // when @@ -167,7 +190,15 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { // given ctx := context.Background() txid := &chainhash.Hash{1, 2, 3} - merklePath := &transaction.MerklePath{} + merklePath := &transaction.MerklePath{ + BlockHeight: 814435, + Path: [][]*transaction.PathElement{{ + { + Hash: txid, + Offset: 123, + }, + }}, + } expectedErr := errors.New("storage error") mockStorage := &mockHandleMerkleProofStorage{ @@ -178,6 +209,11 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { sut := &engine.Engine{ Storage: mockStorage, + ChainTracker: fakeChainTracker{ + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { + return true, nil + }, + }, } // when @@ -214,9 +250,9 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { // Create BEEF for tx2 that includes tx1 as input beef := &transaction.Beef{ Version: transaction.BEEF_V2, - Transactions: map[string]*transaction.BeefTx{ - txid1.String(): {Transaction: tx1}, - txid2.String(): {Transaction: tx2}, + Transactions: map[chainhash.Hash]*transaction.BeefTx{ + *txid1: {Transaction: tx1}, + *txid2: {Transaction: tx2}, }, } beef2Bytes, err := beef.AtomicBytes(txid2) @@ -276,6 +312,11 @@ func TestEngine_HandleNewMerkleProof(t *testing.T) { sut := &engine.Engine{ Storage: mockStorage, LookupServices: map[string]engine.LookupService{}, + ChainTracker: fakeChainTracker{ + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { + return true, nil + }, + }, } // when @@ -369,6 +410,14 @@ func (m *mockHandleMerkleProofStorage) GetLastInteraction(ctx 
context.Context, h return 0, nil } +func (m *mockHandleMerkleProofStorage) FindOutpointsByMerkleState(ctx context.Context, topic string, state engine.MerkleState, limit uint32) ([]*transaction.Outpoint, error) { + return nil, nil +} + +func (m *mockHandleMerkleProofStorage) ReconcileMerkleRoot(ctx context.Context, topic string, blockHeight uint32, merkleRoot *chainhash.Hash) error { + return nil +} + // Mock lookup service type mockLookupService struct { outputBlockHeightUpdatedFunc func(ctx context.Context, txid *chainhash.Hash, blockHeight uint32, blockIdx uint64) error diff --git a/pkg/core/engine/tests/engine_lookup_test.go.bak b/pkg/core/engine/tests/engine_lookup_test.go.bak deleted file mode 100644 index 68877ed2..00000000 --- a/pkg/core/engine/tests/engine_lookup_test.go.bak +++ /dev/null @@ -1,369 +0,0 @@ -package engine_test - -import ( - "context" - "errors" - "testing" - - "github.com/bsv-blockchain/go-overlay-services/pkg/core/engine" - "github.com/bsv-blockchain/go-sdk/overlay" - "github.com/bsv-blockchain/go-sdk/overlay/lookup" - "github.com/stretchr/testify/require" -) - -func TestEngine_Lookup_ShouldReturnError_WhenServiceUnknown(t *testing.T) { - // given - expectedErr := engine.ErrUnknownTopic - - sut := &engine.Engine{ - LookupServices: make(map[string]engine.LookupService), - } - - // when - actualAnswer, actualErr := sut.Lookup(context.Background(), &lookup.LookupQuestion{Service: "non-existing"}) - - // then - require.ErrorIs(t, actualErr, expectedErr) - require.Nil(t, actualAnswer) -} - -func TestEngine_Lookup_ShouldReturnError_WhenServiceLookupFails(t *testing.T) { - // given - expectedErr := errors.New("internal error") - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return nil, expectedErr - }, - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(context.Background(), 
&lookup.LookupQuestion{Service: "test"}) - - // then - require.ErrorIs(t, err, expectedErr) - require.Nil(t, actualAnswer) -} - -func TestEngine_Lookup_ShouldReturnDirectResult_WhenAnswerTypeIsFreeform(t *testing.T) { - // given - expectedAnswer := &lookup.LookupAnswer{ - Type: lookup.AnswerTypeFreeform, - Result: map[string]interface{}{ - "key": "value", - }, - } - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return expectedAnswer, nil - }, - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(context.Background(), &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.Equal(t, expectedAnswer, actualAnswer) -} - -func TestEngine_Lookup_ShouldReturnDirectResult_WhenAnswerTypeIsOutputList(t *testing.T) { - // given - expectedAnswer := &lookup.LookupAnswer{ - Type: lookup.AnswerTypeOutputList, - Outputs: []*lookup.OutputListItem{ - { - OutputIndex: 0, - Beef: []byte("test"), - }, - }, - } - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return expectedAnswer, nil - }, - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(context.Background(), &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.Equal(t, expectedAnswer, actualAnswer) -} - -func TestEngine_Lookup_ShouldHydrateOutputs_WhenFormulasProvided(t *testing.T) { - // given - ctx := context.Background() - expectedBeef := []byte("hydrated beef") - outpoint := &transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 0} - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) 
(*lookup.LookupAnswer, error) { - return &lookup.LookupAnswer{ - Type: lookup.AnswerTypeFormula, - Formulas: []lookup.LookupFormula{ - {Outpoint: &transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 0}}, - }, - }, nil - }, - }, - }, - Storage: fakeStorage{ - findOutputFunc: func(ctx context.Context, outpoint *transaction.Outpoint, topic *string, spent *bool, includeBEEF bool) (*engine.Output, error) { - return &engine.Output{ - Outpoint: *outpoint, - Beef: expectedBeef, - }, nil - }, - }, - } - - expectedAnswer := &lookup.LookupAnswer{ - Type: lookup.AnswerTypeOutputList, - Outputs: []*lookup.OutputListItem{ - { - OutputIndex: outpoint.OutputIndex, - Beef: expectedBeef, - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(ctx, &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.Equal(t, expectedAnswer, actualAnswer) -} - -func TestEngine_Lookup_MultipleFormulasWithHistory(t *testing.T) { - // Test when lookup returns multiple formulas each with different history requirements - ctx := context.Background() - - // Create mock outputs with history - parentOutput := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 0}, - Beef: []byte("parent beef"), - Topic: "test", - } - - childOutput := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 1}, - Beef: []byte("child beef"), - Topic: "test", - OutputsConsumed: []*transaction.Outpoint{&parentOutput.Outpoint}, - } - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return &lookup.LookupAnswer{ - Type: lookup.AnswerTypeFormula, - Formulas: []lookup.LookupFormula{ - { - Outpoint: &childOutput.Outpoint, - Histoy: func(beef []byte, outputIndex uint32, currentDepth uint32) bool { return currentDepth <= 1 }, // Include 1 level of history - }, - { - Outpoint: 
&parentOutput.Outpoint, - Histoy: func(beef []byte, outputIndex uint32, currentDepth uint32) bool { return currentDepth <= 0 }, // No history - }, - }, - }, nil - }, - }, - }, - Storage: fakeStorage{ - findOutputFunc: func(ctx context.Context, outpoint *transaction.Outpoint, topic *string, spent *bool, includeBEEF bool) (*engine.Output, error) { - if outpoint.Txid.String() == childOutput.Outpoint.Txid.String() { - return childOutput, nil - } - if outpoint.Txid.String() == parentOutput.Outpoint.Txid.String() { - return parentOutput, nil - } - return nil, errors.New("output not found") - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(ctx, &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.NotNil(t, actualAnswer) - require.Equal(t, lookup.AnswerTypeOutputList, actualAnswer.Type) - require.Len(t, actualAnswer.Outputs, 2) - - // Verify both outputs are included - foundParent := false - foundChild := false - for _, output := range actualAnswer.Outputs { - if string(output.Beef) == "parent beef" { - foundParent = true - } else if string(output.Beef) == "child beef" { - foundChild = true - } - } - require.True(t, foundParent, "Parent output should be included") - require.True(t, foundChild, "Child output should be included") -} - -func TestEngine_Lookup_HistorySelectorReturnsFalse(t *testing.T) { - // Test empty results when history selector returns false - ctx := context.Background() - - mockOutput := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 0}, - Beef: []byte("test beef"), - Topic: "test", - } - - historySelectorCalled := false - - sut := &engine.Engine{ - LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return &lookup.LookupAnswer{ - Type: lookup.AnswerTypeFormula, - Formulas: []lookup.LookupFormula{ - { - Outpoint: &mockOutput.Outpoint, - Histoy: 
func(beef []byte, outputIndex uint32, currentDepth uint32) bool { - historySelectorCalled = true - // Always return false, meaning don't include this output - return false - }), - }, - }, - }, nil - }, - }, - }, - Storage: fakeStorage{ - findOutputFunc: func(ctx context.Context, outpoint *transaction.Outpoint, topic *string, spent *bool, includeBEEF bool) (*engine.Output, error) { - return mockOutput, nil - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(ctx, &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.NotNil(t, actualAnswer) - require.Equal(t, lookup.AnswerTypeOutputList, actualAnswer.Type) - require.Empty(t, actualAnswer.Outputs, "No outputs should be returned when history selector returns false") - require.True(t, historySelectorCalled, "History selector should have been called") -} - -func TestEngine_Lookup_ComplexHistoryGraph(t *testing.T) { - // Test lookup with complex multi-output history graph - ctx := context.Background() - - // Create a complex graph: grandparent -> parent1 & parent2 -> child - grandparentOutput := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 0}, - Beef: []byte("grandparent beef"), - Topic: "test", - } - - parent1Output := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 1}, - Beef: []byte("parent1 beef"), - Topic: "test", - OutputsConsumed: []*transaction.Outpoint{&grandparentOutput.Outpoint}, - } - - parent2Output := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 2}, - Beef: []byte("parent2 beef"), - Topic: "test", - OutputsConsumed: []*transaction.Outpoint{&grandparentOutput.Outpoint}, - } - - childOutput := &engine.Output{ - Outpoint: transaction.Outpoint{Txid: fakeTxID(t), OutputIndex: 3}, - Beef: []byte("child beef"), - Topic: "test", - OutputsConsumed: []*transaction.Outpoint{ - &parent1Output.Outpoint, - &parent2Output.Outpoint, - }, - } - - sut := &engine.Engine{ - 
LookupServices: map[string]engine.LookupService{ - "test": fakeLookupService{ - lookupFunc: func(ctx context.Context, question *lookup.LookupQuestion) (*lookup.LookupAnswer, error) { - return &lookup.LookupAnswer{ - Type: lookup.AnswerTypeFormula, - Formulas: []lookup.LookupFormula{ - { - Outpoint: &childOutput.Outpoint, - Histoy: func(beef []byte, outputIndex uint32, currentDepth uint32) bool { return currentDepth <= 2 }, // Include 2 levels of history - }, - }, - }, nil - }, - }, - }, - Storage: fakeStorage{ - findOutputFunc: func(ctx context.Context, outpoint *transaction.Outpoint, topic *string, spent *bool, includeBEEF bool) (*engine.Output, error) { - switch outpoint.Txid.String() { - case childOutput.Outpoint.Txid.String(): - return childOutput, nil - case parent1Output.Outpoint.Txid.String(): - return parent1Output, nil - case parent2Output.Outpoint.Txid.String(): - return parent2Output, nil - case grandparentOutput.Outpoint.Txid.String(): - return grandparentOutput, nil - default: - return nil, errors.New("output not found") - } - }, - }, - } - - // when - actualAnswer, err := sut.Lookup(ctx, &lookup.LookupQuestion{Service: "test"}) - - // then - require.NoError(t, err) - require.NotNil(t, actualAnswer) - require.Equal(t, lookup.AnswerTypeOutputList, actualAnswer.Type) - // With depth 2, we should get: child (depth 0), parent1 & parent2 (depth 1), grandparent (depth 2) - require.Len(t, actualAnswer.Outputs, 4, "Should include child, both parents, and grandparent") - - // Verify all outputs are included - beefContents := make(map[string]bool) - for _, output := range actualAnswer.Outputs { - beefContents[string(output.Beef)] = true - } - - require.True(t, beefContents["child beef"], "Child output should be included") - require.True(t, beefContents["parent1 beef"], "Parent1 output should be included") - require.True(t, beefContents["parent2 beef"], "Parent2 output should be included") - require.True(t, beefContents["grandparent beef"], "Grandparent output 
should be included") -} diff --git a/pkg/core/engine/tests/engine_provide_foreign_gasp_node_test.go b/pkg/core/engine/tests/engine_provide_foreign_gasp_node_test.go index d8f38163..a54868bc 100644 --- a/pkg/core/engine/tests/engine_provide_foreign_gasp_node_test.go +++ b/pkg/core/engine/tests/engine_provide_foreign_gasp_node_test.go @@ -19,9 +19,10 @@ func TestEngine_ProvideForeignGASPNode_Success(t *testing.T) { BEEF := createDummyBEEF(t) expectedNode := &gasp.Node{ - GraphID: graphID, - RawTx: parseBEEFToTx(t, BEEF).Hex(), - OutputIndex: outpoint.Index, + GraphID: graphID, + RawTx: parseBEEFToTx(t, BEEF).Hex(), + OutputIndex: outpoint.Index, + AncillaryBeef: BEEF, // Unmined transactions now include BEEF as ancillary } sut := &engine.Engine{ diff --git a/pkg/core/engine/tests/engine_submit_test.go b/pkg/core/engine/tests/engine_submit_test.go index 6607a5a5..1b9df4fc 100644 --- a/pkg/core/engine/tests/engine_submit_test.go +++ b/pkg/core/engine/tests/engine_submit_test.go @@ -51,7 +51,7 @@ func TestEngine_Submit_Success(t *testing.T) { }, }, ChainTracker: fakeChainTracker{ - isValidRootForHeight: func(root *chainhash.Hash, height uint32) (bool, error) { + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { return true, nil }, }, @@ -169,7 +169,7 @@ func TestEngine_Submit_DuplicateTransaction_ShouldReturnEmptySteak(t *testing.T) }, }, ChainTracker: fakeChainTracker{ - isValidRootForHeight: func(root *chainhash.Hash, height uint32) (bool, error) { + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { return true, nil }, }, @@ -245,7 +245,7 @@ func TestEngine_Submit_BroadcastFails_ShouldReturnError(t *testing.T) { verifyFunc: func(tx *transaction.Transaction, options ...any) (bool, error) { return true, nil }, - isValidRootForHeight: func(root *chainhash.Hash, height uint32) (bool, error) { + isValidRootForHeight: func(ctx context.Context, root *chainhash.Hash, height 
uint32) (bool, error) { return true, nil }, }, diff --git a/pkg/core/engine/tests/engine_utils_test.go b/pkg/core/engine/tests/engine_utils_test.go index f04d62d1..e47de421 100644 --- a/pkg/core/engine/tests/engine_utils_test.go +++ b/pkg/core/engine/tests/engine_utils_test.go @@ -133,6 +133,14 @@ func (f fakeStorage) GetLastInteraction(ctx context.Context, host string, topic panic("func not defined") } +func (f fakeStorage) FindOutpointsByMerkleState(ctx context.Context, topic string, state engine.MerkleState, limit uint32) ([]*transaction.Outpoint, error) { + return nil, nil +} + +func (f fakeStorage) ReconcileMerkleRoot(ctx context.Context, topic string, blockHeight uint32, merkleRoot *chainhash.Hash) error { + return nil +} + type fakeManager struct { identifyAdmissibleOutputsFunc func(ctx context.Context, beef []byte, previousCoins map[uint32]*transaction.TransactionOutput) (overlay.AdmittanceInstructions, error) identifyNeededInputsFunc func(ctx context.Context, beef []byte) ([]*transaction.Outpoint, error) @@ -170,7 +178,8 @@ func (f fakeManager) GetDocumentation() string { type fakeChainTracker struct { verifyFunc func(tx *transaction.Transaction, options ...any) (bool, error) - isValidRootForHeight func(root *chainhash.Hash, height uint32) (bool, error) + isValidRootForHeight func(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) + currentHeightFunc func(ctx context.Context) (uint32, error) findHeaderFunc func(height uint32) ([]byte, error) findPreviousHeaderFunc func(tx *transaction.Transaction) ([]byte, error) } @@ -182,9 +191,9 @@ func (f fakeChainTracker) Verify(tx *transaction.Transaction, options ...any) (b panic("func not defined") } -func (f fakeChainTracker) IsValidRootForHeight(root *chainhash.Hash, height uint32) (bool, error) { +func (f fakeChainTracker) IsValidRootForHeight(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { if f.isValidRootForHeight != nil { - return f.isValidRootForHeight(root, 
height) + return f.isValidRootForHeight(ctx, root, height) } panic("func not defined") } @@ -203,13 +212,20 @@ func (f fakeChainTracker) FindPreviousHeader(tx *transaction.Transaction) ([]byt panic("func not defined") } +func (f fakeChainTracker) CurrentHeight(ctx context.Context) (uint32, error) { + if f.currentHeightFunc != nil { + return f.currentHeightFunc(ctx) + } + return 0, nil +} + type fakeChainTrackerSPVFail struct{} func (f fakeChainTrackerSPVFail) Verify(tx *transaction.Transaction, options ...any) (bool, error) { panic("func not defined") } -func (f fakeChainTrackerSPVFail) IsValidRootForHeight(root *chainhash.Hash, height uint32) (bool, error) { +func (f fakeChainTrackerSPVFail) IsValidRootForHeight(ctx context.Context, root *chainhash.Hash, height uint32) (bool, error) { panic("func not defined") } @@ -221,6 +237,10 @@ func (f fakeChainTrackerSPVFail) FindPreviousHeader(tx *transaction.Transaction) panic("func not defined") } +func (f fakeChainTrackerSPVFail) CurrentHeight(ctx context.Context) (uint32, error) { + return 0, nil +} + type fakeBroadcasterFail struct { broadcastFunc func(tx *transaction.Transaction) (*transaction.BroadcastSuccess, *transaction.BroadcastFailure) broadcastCtxFunc func(ctx context.Context, tx *transaction.Transaction) (*transaction.BroadcastSuccess, *transaction.BroadcastFailure) @@ -378,9 +398,9 @@ func createDummyValidTaggedBEEF(t *testing.T) (overlay.TaggedBEEF, *chainhash.Ha beef := &transaction.Beef{ Version: transaction.BEEF_V2, - Transactions: map[string]*transaction.BeefTx{ - prevTxID.String(): {Transaction: prevTx}, - currentTxID.String(): {Transaction: currentTx}, + Transactions: map[chainhash.Hash]*transaction.BeefTx{ + *prevTxID: {Transaction: prevTx}, + *currentTxID: {Transaction: currentTx}, }, } beefBytes, err := beef.AtomicBytes(currentTxID) @@ -429,9 +449,9 @@ func createDummyBeefWithInputs(t *testing.T) []byte { beef := &transaction.Beef{ Version: transaction.BEEF_V2, - Transactions: 
map[string]*transaction.BeefTx{ - prevTx.TxID().String(): {Transaction: prevTx}, - currentTx.TxID().String(): {Transaction: currentTx}, + Transactions: map[chainhash.Hash]*transaction.BeefTx{ + *prevTx.TxID(): {Transaction: prevTx}, + *currentTx.TxID(): {Transaction: currentTx}, }, } diff --git a/pkg/core/engine/tests/gasp_start_sync_test.go b/pkg/core/engine/tests/gasp_start_sync_test.go index a40893e0..26aa250e 100644 --- a/pkg/core/engine/tests/gasp_start_sync_test.go +++ b/pkg/core/engine/tests/gasp_start_sync_test.go @@ -17,6 +17,7 @@ func TestEngine_StartGASPSync_CallsSyncSuccessfully(t *testing.T) { resolver := LookupResolverMock{ ExpectQueryCall: true, ExpectSetTrackersCall: true, + ExpectTrackersAccess: true, ExpectedAnswer: &lookup.LookupAnswer{ Type: lookup.AnswerTypeOutputList, Outputs: []*lookup.OutputListItem{ @@ -69,6 +70,7 @@ func TestEngine_StartGASPSync_ResolverQueryFails(t *testing.T) { resolver := LookupResolverMock{ ExpectQueryCall: true, ExpectSetTrackersCall: true, + ExpectTrackersAccess: true, ExpectedError: expectedQueryCallErr, ExpectedAnswer: &lookup.LookupAnswer{ Type: lookup.AnswerTypeOutputList, @@ -122,6 +124,7 @@ func TestEngine_StartGASPSync_GaspSyncFails(t *testing.T) { resolver := LookupResolverMock{ ExpectQueryCall: true, ExpectSetTrackersCall: true, + ExpectTrackersAccess: true, ExpectedAnswer: &lookup.LookupAnswer{ Type: lookup.AnswerTypeOutputList, Outputs: []*lookup.OutputListItem{ diff --git a/pkg/core/engine/tests/lookup_resolver_test.go b/pkg/core/engine/tests/lookup_resolver_test.go index 6b1e891a..56971d3a 100644 --- a/pkg/core/engine/tests/lookup_resolver_test.go +++ b/pkg/core/engine/tests/lookup_resolver_test.go @@ -17,7 +17,7 @@ func TestLookupResolver_NewLookupResolver(t *testing.T) { // then require.NotNil(t, resolver) - require.Empty(t, resolver.SLAPTrackers()) + require.Equal(t, resolver.SLAPTrackers(), lookup.DEFAULT_SLAP_TRACKERS) }) } diff --git a/pkg/core/gasp/gasp.go b/pkg/core/gasp/gasp.go index 
6578473d..2d2b9f62 100644 --- a/pkg/core/gasp/gasp.go +++ b/pkg/core/gasp/gasp.go @@ -9,10 +9,17 @@ import ( "github.com/bsv-blockchain/go-sdk/chainhash" "github.com/bsv-blockchain/go-sdk/transaction" + "golang.org/x/sync/errgroup" ) const MAX_CONCURRENCY = 16 +// utxoProcessingState tracks the state of a UTXO processing operation with result sharing +type utxoProcessingState struct { + wg sync.WaitGroup + err error +} + type NodeRequest struct { GraphID *transaction.Outpoint `json:"graphID"` Txid *chainhash.Hash `json:"txid"` @@ -29,6 +36,7 @@ type GASPParams struct { Unidirectional bool LogLevel slog.Level Concurrency int + Topic string } type GASP struct { @@ -39,7 +47,14 @@ type GASP struct { LogPrefix string Unidirectional bool LogLevel slog.Level - limiter chan struct{} + Topic string + limiter chan struct{} // Concurrency limiter controlled by Concurrency config + + // Unified UTXO processing with result sharing + utxoProcessingMap sync.Map // map[transaction.Outpoint]*utxoProcessingState + + // Individual UTXO processing queue (hidden from external callers) + utxoQueue chan *transaction.Outpoint } func NewGASP(params GASPParams) *GASP { @@ -48,8 +63,10 @@ func NewGASP(params GASPParams) *GASP { Remote: params.Remote, LastInteraction: params.LastInteraction, Unidirectional: params.Unidirectional, - // Sequential: params.Sequential, + Topic: params.Topic, + utxoQueue: make(chan *transaction.Outpoint, 1000), } + // Concurrency limiter controlled by Concurrency config if params.Concurrency > 1 { gasp.limiter = make(chan struct{}, params.Concurrency) } else { @@ -66,96 +83,101 @@ func NewGASP(params GASPParams) *GASP { gasp.LogPrefix = "[GASP] " } slog.SetLogLoggerLevel(slog.LevelInfo) + + // Start the always-running worker for individual UTXO processing + go gasp.runProcessingWorker() + return gasp } func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error { - slog.Info(fmt.Sprintf("%sStarting sync process. 
Last interaction timestamp: %f", g.LogPrefix, g.LastInteraction)) + var sharedOutpoints sync.Map - localUTXOs, err := g.Storage.FindKnownUTXOs(ctx, 0, 0) + initialRequest := &InitialRequest{ + Version: g.Version, + Since: g.LastInteraction, + Limit: limit, + } + initialResponse, err := g.Remote.GetInitialResponse(ctx, initialRequest) if err != nil { return err } - // Find which UTXOs we already have - knownOutpoints := make(map[string]struct{}) - for _, utxo := range localUTXOs { - outpoint := fmt.Sprintf("%s.%d", utxo.Txid, utxo.OutputIndex) - knownOutpoints[outpoint] = struct{}{} + if len(initialResponse.UTXOList) == 0 { + // No more UTXOs to process + return nil } - sharedOutpoints := make(map[string]struct{}) - - var initialResponse *InitialResponse - for { - initialRequest := &InitialRequest{ - Version: g.Version, - Since: g.LastInteraction, - Limit: limit, - } - initialResponse, err = g.Remote.GetInitialResponse(ctx, initialRequest) - if err != nil { - return err + + // Extract outpoints from current page for efficient batch lookup + pageOutpoints := make([]*transaction.Outpoint, len(initialResponse.UTXOList)) + for i, utxo := range initialResponse.UTXOList { + pageOutpoints[i] = utxo.Outpoint() + } + + // Check which outpoints we already have + hasOutputs, err := g.Storage.HasOutputs(ctx, pageOutpoints) + if err != nil { + return err + } + + var ingestQueue []*Output + for i, utxo := range initialResponse.UTXOList { + if utxo.Score > g.LastInteraction { + g.LastInteraction = utxo.Score } + outpoint := utxo.Outpoint() - var ingestQueue []*Output - for _, utxo := range initialResponse.UTXOList { - if utxo.Score > g.LastInteraction { - g.LastInteraction = utxo.Score - } - outpoint := utxo.OutpointString() - if _, exists := knownOutpoints[outpoint]; exists { - sharedOutpoints[outpoint] = struct{}{} - delete(knownOutpoints, outpoint) - } else if _, shared := sharedOutpoints[outpoint]; !shared { + // Check if we already have this output using the same index + if 
hasOutputs[i] { + // Already have it, mark as shared to avoid re-processing + sharedOutpoints.Store(*outpoint, struct{}{}) + } else { + // Don't have it - need to ingest + if _, shared := sharedOutpoints.Load(*outpoint); !shared { ingestQueue = append(ingestQueue, utxo) } } + } - var wg sync.WaitGroup - for _, utxo := range ingestQueue { - wg.Add(1) - g.limiter <- struct{}{} - go func(utxo *Output) { - defer func() { - <-g.limiter - wg.Done() - }() - outpoint := utxo.Outpoint() - resolvedNode, err := g.Remote.RequestNode(ctx, outpoint, outpoint, true) - if err != nil { - slog.Warn(fmt.Sprintf("%sError with incoming UTXO %s: %v", g.LogPrefix, outpoint, err)) - return - } - slog.Debug(fmt.Sprintf("%sReceived unspent graph node from remote: %v", g.LogPrefix, resolvedNode)) - if err = g.processIncomingNode(ctx, resolvedNode, nil, &sync.Map{}); err != nil { - slog.Warn(fmt.Sprintf("%sError processing incoming node %s: %v", g.LogPrefix, outpoint, err)) - return - } - if err = g.CompleteGraph(ctx, resolvedNode.GraphID); err != nil { - slog.Warn(fmt.Sprintf("%sError completing graph for %s: %v", g.LogPrefix, outpoint, err)) - return - } - sharedOutpoints[outpoint.String()] = struct{}{} - }(utxo) - } - wg.Wait() + // Process all UTXOs from this batch with shared deduplication + processingGroup, processingCtx := errgroup.WithContext(ctx) + seenNodes := &sync.Map{} // Shared across all UTXOs in this batch - // Check if we have more pages to fetch - // If we got fewer items than we requested (or no limit was set), we've reached the end - if limit == 0 || uint32(len(initialResponse.UTXOList)) < limit { - break - } + for _, utxo := range ingestQueue { + g.limiter <- struct{}{} + utxo := utxo // capture loop variable + processingGroup.Go(func() error { + outpoint := utxo.Outpoint() + defer func() { + <-g.limiter + }() + + if err := g.processUTXOToCompletion(processingCtx, outpoint, nil, seenNodes); err != nil { + slog.Error("error processing UTXO", "outpoint", outpoint, "error", 
err) + return nil + } + sharedOutpoints.Store(*outpoint, struct{}{}) + return nil + }) + } + slog.Info(fmt.Sprintf("%s Processing GASP page: %d UTXOs (since: %.0f)", g.LogPrefix, len(ingestQueue), initialRequest.Since)) + if err := processingGroup.Wait(); err != nil { + return err } // 2. Only do the "reply" half if unidirectional is disabled if !g.Unidirectional && initialResponse != nil { - // Filter localUTXOs for those after initialResponse.since and not in sharedOutpoints + // Load local UTXOs only newer than what the peer already knows about + localUTXOs, err := g.Storage.FindKnownUTXOs(ctx, initialResponse.Since, 0) + if err != nil { + return err + } + + // Filter localUTXOs for those not in sharedOutpoints var replyUTXOs []*Output for _, utxo := range localUTXOs { - outpoint := fmt.Sprintf("%s.%d", utxo.Txid, utxo.OutputIndex) - if utxo.Score >= initialResponse.Since { - if _, shared := sharedOutpoints[outpoint]; !shared { - replyUTXOs = append(replyUTXOs, utxo) - } + outpoint := utxo.Outpoint() + if _, shared := sharedOutpoints.Load(*outpoint); !shared { + replyUTXOs = append(replyUTXOs, utxo) } } @@ -169,20 +191,20 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error { <-g.limiter wg.Done() }() - slog.Info(fmt.Sprintf("%sHydrating GASP node for UTXO: %s.%d", g.LogPrefix, utxo.Txid, utxo.OutputIndex)) + slog.Info(fmt.Sprintf("%s Hydrating GASP node for UTXO: %s.%d", g.LogPrefix, utxo.Txid, utxo.OutputIndex)) outpoint := utxo.Outpoint() outgoingNode, err := g.Storage.HydrateGASPNode(ctx, outpoint, outpoint, true) if err != nil { - slog.Warn(fmt.Sprintf("%sError hydrating outgoing UTXO %s.%d: %v", g.LogPrefix, utxo.Txid, utxo.OutputIndex, err)) + slog.Warn(fmt.Sprintf("%s Error hydrating outgoing UTXO %s.%d: %v", g.LogPrefix, utxo.Txid, utxo.OutputIndex, err)) return } if outgoingNode == nil { - slog.Debug(fmt.Sprintf("%sSkipping outgoing UTXO %s.%d: not found in storage", g.LogPrefix, utxo.Txid, utxo.OutputIndex)) + 
slog.Debug(fmt.Sprintf("%s Skipping outgoing UTXO %s.%d: not found in storage", g.LogPrefix, utxo.Txid, utxo.OutputIndex)) return } - slog.Debug(fmt.Sprintf("%sSending unspent graph node for remote: %v", g.LogPrefix, outgoingNode)) + slog.Debug(fmt.Sprintf("%s Sending unspent graph node for remote: %v", g.LogPrefix, outgoingNode)) if err = g.processOutgoingNode(ctx, outgoingNode, &sync.Map{}); err != nil { - slog.Warn(fmt.Sprintf("%sError processing outgoing node %s.%d: %v", g.LogPrefix, utxo.Txid, utxo.OutputIndex, err)) + slog.Warn(fmt.Sprintf("%s Error processing outgoing node %s.%d: %v", g.LogPrefix, utxo.Txid, utxo.OutputIndex, err)) } }(utxo) } @@ -190,14 +212,13 @@ func (g *GASP) Sync(ctx context.Context, host string, limit uint32) error { } } - slog.Info(fmt.Sprintf("%sSync completed!", g.LogPrefix)) return nil } func (g *GASP) GetInitialResponse(ctx context.Context, request *InitialRequest) (resp *InitialResponse, err error) { - slog.Info(fmt.Sprintf("%sReceived initial request: %v", g.LogPrefix, request)) + slog.Info(fmt.Sprintf("%s Received initial request: %v", g.LogPrefix, request)) if request.Version != g.Version { - slog.Error(fmt.Sprintf("%sGASP version mismatch", g.LogPrefix)) + slog.Error(fmt.Sprintf("%s GASP version mismatch", g.LogPrefix)) return nil, NewVersionMismatchError( g.Version, request.Version, @@ -212,18 +233,18 @@ func (g *GASP) GetInitialResponse(ctx context.Context, request *InitialRequest) Since: g.LastInteraction, UTXOList: utxos, } - slog.Debug(fmt.Sprintf("%sBuilt initial response: %v", g.LogPrefix, resp)) + slog.Debug(fmt.Sprintf("%s Built initial response: %v", g.LogPrefix, resp)) return resp, nil } func (g *GASP) GetInitialReply(ctx context.Context, response *InitialResponse) (resp *InitialReply, err error) { - slog.Info(fmt.Sprintf("%sReceived initial response: %v", g.LogPrefix, response)) + slog.Info(fmt.Sprintf("%s Received initial response: %v", g.LogPrefix, response)) knownUtxos, err := g.Storage.FindKnownUTXOs(ctx, 
response.Since, 0) if err != nil { return nil, err } - slog.Info(fmt.Sprintf("%sFound %d known UTXOs since %f", g.LogPrefix, len(knownUtxos), response.Since)) + slog.Info(fmt.Sprintf("%s Found %d known UTXOs since %f", g.LogPrefix, len(knownUtxos), response.Since)) resp = &InitialReply{ UTXOList: make([]*Output, 0), } @@ -235,27 +256,27 @@ func (g *GASP) GetInitialReply(ctx context.Context, response *InitialResponse) ( resp.UTXOList = append(resp.UTXOList, knownUtxo) } } - slog.Info(fmt.Sprintf("%sBuilt initial reply: %v", g.LogPrefix, resp)) + slog.Info(fmt.Sprintf("%s Built initial reply: %v", g.LogPrefix, resp)) return resp, nil } func (g *GASP) RequestNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (node *Node, err error) { - slog.Info(fmt.Sprintf("%sRemote is requesting node with graphID: %s, txid: %s, outputIndex: %d, metadata: %v", g.LogPrefix, graphID.String(), outpoint.Txid.String(), outpoint.Index, metadata)) + slog.Info(fmt.Sprintf("%s Remote is requesting node with graphID: %s, txid: %s, outputIndex: %d, metadata: %v", g.LogPrefix, graphID.String(), outpoint.Txid.String(), outpoint.Index, metadata)) if node, err = g.Storage.HydrateGASPNode(ctx, graphID, outpoint, metadata); err != nil { return nil, err } - slog.Debug(fmt.Sprintf("%sReturning node: %v", g.LogPrefix, node)) + slog.Debug(fmt.Sprintf("%s Returning node: %v", g.LogPrefix, node)) return node, nil } func (g *GASP) SubmitNode(ctx context.Context, node *Node) (requestedInputs *NodeResponse, err error) { - slog.Info(fmt.Sprintf("%sRemote is submitting node: %v", g.LogPrefix, node)) + slog.Info(fmt.Sprintf("%s Remote is submitting node: %v", g.LogPrefix, node)) if err = g.Storage.AppendToGraph(ctx, node, nil); err != nil { return nil, err } else if requestedInputs, err = g.Storage.FindNeededInputs(ctx, node); err != nil { return nil, err } else if requestedInputs != nil { - slog.Debug(fmt.Sprintf("%sRequested inputs: %v", g.LogPrefix, 
requestedInputs)) + slog.Debug(fmt.Sprintf("%s Requested inputs: %v", g.LogPrefix, requestedInputs)) if err := g.CompleteGraph(ctx, node.GraphID); err != nil { return nil, err } @@ -264,15 +285,14 @@ func (g *GASP) SubmitNode(ctx context.Context, node *Node) (requestedInputs *Nod } func (g *GASP) CompleteGraph(ctx context.Context, graphID *transaction.Outpoint) (err error) { - slog.Info(fmt.Sprintf("%sCompleting newly-synced graph: %s", g.LogPrefix, graphID.String())) if err = g.Storage.ValidateGraphAnchor(ctx, graphID); err == nil { - slog.Debug(fmt.Sprintf("%sGraph validated for node: %s", g.LogPrefix, graphID.String())) + slog.Debug(fmt.Sprintf("%s Graph validated for node: %s", g.LogPrefix, graphID.String())) if err := g.Storage.FinalizeGraph(ctx, graphID); err == nil { + slog.Info(fmt.Sprintf("%s Graph finalized for node: %s", g.LogPrefix, graphID.String())) return nil } - slog.Info(fmt.Sprintf("%sGraph finalized for node: %s", g.LogPrefix, graphID.String())) } - slog.Warn(fmt.Sprintf("%sError completing graph %s: %v", g.LogPrefix, graphID.String(), err)) + slog.Warn(fmt.Sprintf("%s Error completing graph %s: %v", g.LogPrefix, graphID.String(), err)) return g.Storage.DiscardGraph(ctx, graphID) } @@ -280,56 +300,29 @@ func (g *GASP) processIncomingNode(ctx context.Context, node *Node, spentBy *tra if txid, err := g.computeTxID(node.RawTx); err != nil { return err } else { - nodeId := (&transaction.Outpoint{ + nodeOutpoint := &transaction.Outpoint{ Txid: *txid, Index: node.OutputIndex, - }).String() - slog.Debug(fmt.Sprintf("%sProcessing incoming node: %v, spentBy: %v", g.LogPrefix, node, spentBy)) - if _, ok := seenNodes.Load(nodeId); ok { - slog.Debug(fmt.Sprintf("%sNode %s already processed, skipping.", g.LogPrefix, nodeId)) + } + + slog.Debug(fmt.Sprintf("%s Processing incoming node: %v, spentBy: %v", g.LogPrefix, node, spentBy)) + + // Per-graph cycle detection + if _, ok := seenNodes.Load(*nodeOutpoint); ok { + slog.Debug(fmt.Sprintf("%s Node %s already 
seen in this graph, skipping.", g.LogPrefix, nodeOutpoint.String())) return nil } - seenNodes.Store(nodeId, struct{}{}) + seenNodes.Store(*nodeOutpoint, struct{}{}) + if err := g.Storage.AppendToGraph(ctx, node, spentBy); err != nil { return err } else if neededInputs, err := g.Storage.FindNeededInputs(ctx, node); err != nil { return err } else if neededInputs != nil { - slog.Debug(fmt.Sprintf("%sNeeded inputs for node %s: %v", g.LogPrefix, nodeId, neededInputs)) - var wg sync.WaitGroup - errors := make(chan error) - for outpointStr, data := range neededInputs.RequestedInputs { - wg.Add(1) - g.limiter <- struct{}{} - go func(outpointStr string, data *NodeResponseData) { - defer func() { - <-g.limiter - wg.Done() - }() - slog.Info(fmt.Sprintf("%sRequesting new node for outpoint: %s, metadata: %v", g.LogPrefix, outpointStr, data.Metadata)) - if outpoint, err := transaction.OutpointFromString(outpointStr); err != nil { - errors <- err - } else if newNode, err := g.Remote.RequestNode(ctx, node.GraphID, outpoint, data.Metadata); err != nil { - errors <- err - } else { - slog.Debug(fmt.Sprintf("%sReceived new node: %v", g.LogPrefix, newNode)) - // Create outpoint for the current node that is spending this input - spendingOutpoint := &transaction.Outpoint{ - Txid: *txid, - Index: node.OutputIndex, - } - if err := g.processIncomingNode(ctx, newNode, spendingOutpoint, seenNodes); err != nil { - errors <- err - } - } - }(outpointStr, data) - } - go func() { - wg.Wait() - close(errors) - }() - for err := range errors { - if err != nil { + slog.Debug(fmt.Sprintf("%s Needed inputs for node %s: %v", g.LogPrefix, nodeOutpoint.String(), neededInputs)) + for outpoint, data := range neededInputs.RequestedInputs { + slog.Info(fmt.Sprintf("%s Processing dependency for outpoint: %s, metadata: %v", g.LogPrefix, outpoint.String(), data.Metadata)) + if err := g.processUTXOToCompletion(ctx, &outpoint, nodeOutpoint, seenNodes); err != nil { return err } } @@ -340,7 +333,7 @@ func (g *GASP) 
processIncomingNode(ctx context.Context, node *Node, spentBy *tra func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *sync.Map) error { if g.Unidirectional { - slog.Debug(fmt.Sprintf("%sSkipping outgoing node processing in unidirectional mode.", g.LogPrefix)) + slog.Debug(fmt.Sprintf("%s Skipping outgoing node processing in unidirectional mode.", g.LogPrefix)) return nil } if node == nil { @@ -349,13 +342,13 @@ func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *s if txid, err := g.computeTxID(node.RawTx); err != nil { return err } else { - nodeId := (&transaction.Outpoint{ + nodeId := transaction.Outpoint{ Txid: *txid, Index: node.OutputIndex, - }).String() - slog.Debug(fmt.Sprintf("%sProcessing outgoing node: %v", g.LogPrefix, node)) + } + slog.Debug(fmt.Sprintf("%s Processing outgoing node: %v", g.LogPrefix, node)) if _, ok := seenNodes.Load(nodeId); ok { - slog.Debug(fmt.Sprintf("%sNode %s already processed, skipping.", g.LogPrefix, nodeId)) + slog.Debug(fmt.Sprintf("%s Node %s already processed, skipping.", g.LogPrefix, nodeId.String())) return nil } seenNodes.Store(nodeId, struct{}{}) @@ -363,28 +356,23 @@ func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *s return err } else if response != nil { var wg sync.WaitGroup - for outpointStr, data := range response.RequestedInputs { + for outpoint, data := range response.RequestedInputs { wg.Add(1) - g.limiter <- struct{}{} - go func(outpointStr string, data *NodeResponseData) { - defer func() { - <-g.limiter - wg.Done() - }() - var outpoint *transaction.Outpoint + go func(outpoint transaction.Outpoint, data *NodeResponseData) { + defer wg.Done() + var hydratedNode *Node var err error - if outpoint, err = transaction.OutpointFromString(outpointStr); err == nil { - var hydratedNode *Node - slog.Info(fmt.Sprintf("%sHydrating node for outpoint: %s, metadata: %v", g.LogPrefix, outpoint, data.Metadata)) - if hydratedNode, err = 
g.Storage.HydrateGASPNode(ctx, node.GraphID, outpoint, data.Metadata); err == nil { - slog.Debug(fmt.Sprintf("%sSending hydrated node: %v", g.LogPrefix, hydratedNode)) - if err = g.processOutgoingNode(ctx, hydratedNode, seenNodes); err == nil { - return - } + slog.Info(fmt.Sprintf("%s Hydrating node for outpoint: %s, metadata: %v", g.LogPrefix, outpoint.String(), data.Metadata)) + if hydratedNode, err = g.Storage.HydrateGASPNode(ctx, node.GraphID, &outpoint, data.Metadata); err == nil { + slog.Debug(fmt.Sprintf("%s Sending hydrated node: %v", g.LogPrefix, hydratedNode)) + if err = g.processOutgoingNode(ctx, hydratedNode, seenNodes); err == nil { + return } } - slog.Error(fmt.Sprintf("%sError hydrating node: %v", g.LogPrefix, err)) - }(outpointStr, data) + if err != nil { + slog.Error(fmt.Sprintf("%s Error hydrating node: %v", g.LogPrefix, err)) + } + }(outpoint, data) } wg.Wait() } @@ -392,6 +380,46 @@ func (g *GASP) processOutgoingNode(ctx context.Context, node *Node, seenNodes *s return nil } +// processUTXOToCompletion handles the complete UTXO processing pipeline with result sharing deduplication +func (g *GASP) processUTXOToCompletion(ctx context.Context, outpoint *transaction.Outpoint, spentBy *transaction.Outpoint, seenNodes *sync.Map) error { + // Pre-initialize the processing state to avoid race conditions + newState := &utxoProcessingState{} + newState.wg.Add(1) + + // Check if there's already an in-flight operation for this outpoint + if inflight, loaded := g.utxoProcessingMap.LoadOrStore(*outpoint, newState); loaded { + state := inflight.(*utxoProcessingState) + state.wg.Wait() + return state.err + } else { + state := newState + defer state.wg.Done() // Signal completion when we're done + + // We're the first to process this outpoint, do the complete processing + + // Request node from remote + resolvedNode, err := g.Remote.RequestNode(ctx, spentBy, outpoint, true) + if err != nil { + state.err = fmt.Errorf("error with incoming UTXO %s: %w", outpoint, 
err) + return state.err + } + // Process dependencies + if err = g.processIncomingNode(ctx, resolvedNode, spentBy, seenNodes); err != nil { + state.err = fmt.Errorf("error processing incoming node %s: %w", outpoint, err) + return state.err + } + + // Complete the graph (submit to engine) + if err = g.CompleteGraph(ctx, resolvedNode.GraphID); err != nil { + state.err = fmt.Errorf("error completing graph for %s: %w", outpoint, err) + return state.err + } + + // Success - don't clean up immediately, handle externally + return nil + } +} + func (g *GASP) computeTxID(rawtx string) (*chainhash.Hash, error) { if tx, err := transaction.NewTransactionFromHex(rawtx); err != nil { return nil, err @@ -399,3 +427,41 @@ func (g *GASP) computeTxID(rawtx string) (*chainhash.Hash, error) { return tx.TxID(), nil } } + +// ProcessUTXO queues a single UTXO for processing outside of the sync workflow. +// UTXOs are processed with shared deduplication state to ensure each transaction +// is only submitted once, even if multiple outputs are queued concurrently. +// This method is non-blocking - if the internal queue is full, the UTXO is dropped. +// Does NOT update LastInteraction score. 
+func (g *GASP) ProcessUTXO(ctx context.Context, outpoint *transaction.Outpoint) error { + select { + case g.utxoQueue <- outpoint: + return nil + default: + err := fmt.Errorf("UTXO processing queue full, dropping UTXO %s", outpoint.String()) + slog.Warn(fmt.Sprintf("%s %v", g.LogPrefix, err)) + return err + } +} + +// runProcessingWorker is the always-running worker that processes queued UTXOs +func (g *GASP) runProcessingWorker() { + seenNodes := &sync.Map{} + + for outpoint := range g.utxoQueue { + g.limiter <- struct{}{} + go func(op *transaction.Outpoint) { + defer func() { + <-g.limiter + }() + + ctx := context.Background() + if err := g.processUTXOToCompletion(ctx, op, nil, seenNodes); err != nil { + slog.Error(fmt.Sprintf("%s Error processing UTXO %s: %v", g.LogPrefix, op, err)) + } + + // Cleanup seenNodes after processing completes + seenNodes.Delete(*op) + }(outpoint) + } +} diff --git a/pkg/core/gasp/storage.go b/pkg/core/gasp/storage.go index 4e9ac101..396768cf 100644 --- a/pkg/core/gasp/storage.go +++ b/pkg/core/gasp/storage.go @@ -8,6 +8,7 @@ import ( type Storage interface { FindKnownUTXOs(ctx context.Context, since float64, limit uint32) ([]*Output, error) + HasOutputs(ctx context.Context, outpoints []*transaction.Outpoint) ([]bool, error) HydrateGASPNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*Node, error) FindNeededInputs(ctx context.Context, tx *Node) (*NodeResponse, error) AppendToGraph(ctx context.Context, tx *Node, spentBy *transaction.Outpoint) error diff --git a/pkg/core/gasp/tests/gasp_get_initial_response_test.go b/pkg/core/gasp/tests/gasp_get_initial_response_test.go index e43c08de..ae38bd18 100644 --- a/pkg/core/gasp/tests/gasp_get_initial_response_test.go +++ b/pkg/core/gasp/tests/gasp_get_initial_response_test.go @@ -19,6 +19,10 @@ func (f fakeGASPStorage) FindKnownUTXOs(ctx context.Context, since float64, limi return f.findKnownUTXOsFunc(ctx, since, limit) } +func (f 
fakeGASPStorage) HasOutputs(ctx context.Context, outpoints []*transaction.Outpoint) ([]bool, error) { + panic("not implemented") +} + func (f fakeGASPStorage) HydrateGASPNode(ctx context.Context, graphID *transaction.Outpoint, outpoint *transaction.Outpoint, metadata bool) (*gasp.Node, error) { panic("not implemented") } diff --git a/pkg/core/gasp/tests/gasp_storage_test.go b/pkg/core/gasp/tests/gasp_storage_test.go index 1748e7a6..422c0bec 100644 --- a/pkg/core/gasp/tests/gasp_storage_test.go +++ b/pkg/core/gasp/tests/gasp_storage_test.go @@ -561,3 +561,11 @@ func (m *mockStorage) UpdateLastInteraction(ctx context.Context, host string, to func (m *mockStorage) GetLastInteraction(ctx context.Context, host string, topic string) (float64, error) { return 0, nil } + +func (m *mockStorage) FindOutpointsByMerkleState(ctx context.Context, topic string, state engine.MerkleState, limit uint32) ([]*transaction.Outpoint, error) { + return nil, nil +} + +func (m *mockStorage) ReconcileMerkleRoot(ctx context.Context, topic string, blockHeight uint32, merkleRoot *chainhash.Hash) error { + return nil +} diff --git a/pkg/core/gasp/tests/gasp_sync_test.go b/pkg/core/gasp/tests/gasp_sync_test.go index 0e08a016..0302fc39 100644 --- a/pkg/core/gasp/tests/gasp_sync_test.go +++ b/pkg/core/gasp/tests/gasp_sync_test.go @@ -26,7 +26,7 @@ type mockUTXO struct { type mockGASPStorage struct { knownStore []*mockUTXO - tempGraphStore map[string]*mockUTXO + tempGraphStore map[transaction.Outpoint]*mockUTXO mu sync.Mutex updateCallback func() @@ -43,7 +43,7 @@ type mockGASPStorage struct { func newMockGASPStorage(knownStore []*mockUTXO) *mockGASPStorage { return &mockGASPStorage{ knownStore: knownStore, - tempGraphStore: make(map[string]*mockUTXO), + tempGraphStore: make(map[transaction.Outpoint]*mockUTXO), updateCallback: func() {}, } } @@ -88,6 +88,11 @@ func (m *mockGASPStorage) HydrateGASPNode(ctx context.Context, graphID *transact m.mu.Lock() defer m.mu.Unlock() + // If graphID is nil, use 
the outpoint as the graphID + if graphID == nil { + graphID = outpoint + } + // Check in known store for _, utxo := range m.knownStore { if utxo.GraphID.String() == outpoint.String() { @@ -110,7 +115,7 @@ func (m *mockGASPStorage) HydrateGASPNode(ctx context.Context, graphID *transact } // Check in temp store - if tempUTXO, exists := m.tempGraphStore[outpoint.String()]; exists { + if tempUTXO, exists := m.tempGraphStore[*outpoint]; exists { return &gasp.Node{ GraphID: graphID, RawTx: tempUTXO.RawTx, @@ -129,7 +134,7 @@ func (m *mockGASPStorage) FindNeededInputs(ctx context.Context, tx *gasp.Node) ( // Default: no inputs needed return &gasp.NodeResponse{ - RequestedInputs: make(map[string]*gasp.NodeResponseData), + RequestedInputs: make(map[transaction.Outpoint]*gasp.NodeResponseData), }, nil } @@ -147,8 +152,24 @@ func (m *mockGASPStorage) AppendToGraph(ctx context.Context, tx *gasp.Node, spen if parsedTx != nil { hash = parsedTx.TxID() } - m.tempGraphStore[tx.GraphID.String()] = &mockUTXO{ - GraphID: tx.GraphID, + + // Determine the graph ID - use tx.GraphID if set, otherwise compute from transaction + var graphID *transaction.Outpoint + if tx.GraphID != nil { + graphID = tx.GraphID + } else if hash != nil { + // When GraphID is nil, create one from the transaction itself + graphID = &transaction.Outpoint{ + Txid: *hash, + Index: tx.OutputIndex, + } + } else { + // If we can't determine a GraphID, skip storage + return nil + } + + m.tempGraphStore[*graphID] = &mockUTXO{ + GraphID: graphID, RawTx: tx.RawTx, OutputIndex: tx.OutputIndex, Time: 0, // Current time @@ -175,7 +196,7 @@ func (m *mockGASPStorage) DiscardGraph(ctx context.Context, graphID *transaction m.mu.Lock() defer m.mu.Unlock() - delete(m.tempGraphStore, graphID.String()) + delete(m.tempGraphStore, *graphID) return nil } @@ -187,14 +208,42 @@ func (m *mockGASPStorage) FinalizeGraph(ctx context.Context, graphID *transactio m.mu.Lock() defer m.mu.Unlock() - if tempGraph, exists := 
m.tempGraphStore[graphID.String()]; exists { + if tempGraph, exists := m.tempGraphStore[*graphID]; exists { m.knownStore = append(m.knownStore, tempGraph) m.updateCallback() - delete(m.tempGraphStore, graphID.String()) + delete(m.tempGraphStore, *graphID) } return nil } +func (m *mockGASPStorage) HasOutputs(ctx context.Context, outpoints []*transaction.Outpoint) ([]bool, error) { + m.mu.Lock() + defer m.mu.Unlock() + + result := make([]bool, len(outpoints)) + for i, outpoint := range outpoints { + found := false + + // Check in known store + for _, utxo := range m.knownStore { + if utxo.Txid.Equal(outpoint.Txid) && utxo.OutputIndex == outpoint.Index { + found = true + break + } + } + + // Check in temp store if not found + if !found { + if _, exists := m.tempGraphStore[*outpoint]; exists { + found = true + } + } + + result[i] = found + } + return result, nil +} + type mockGASPRemote struct { targetGASP *gasp.GASP initialResponseFunc func(ctx context.Context, request *gasp.InitialRequest) (*gasp.InitialResponse, error) @@ -244,7 +293,7 @@ func (m *mockGASPRemote) SubmitNode(ctx context.Context, node *gasp.Node) (*gasp // Default implementation return &gasp.NodeResponse{ - RequestedInputs: make(map[string]*gasp.NodeResponseData), + RequestedInputs: make(map[transaction.Outpoint]*gasp.NodeResponseData), }, nil } @@ -255,6 +304,13 @@ func createMockUTXO(txHex string, outputIndex uint32, time uint32) *mockUTXO { Satoshis: 1000, LockingScript: &script.Script{}, }) + opReturn := &script.Script{} + opReturn.AppendOpcodes(script.OpFALSE, script.OpRETURN) + opReturn.AppendPushData([]byte(txHex)) + tx.AddOutput(&transaction.TransactionOutput{ + Satoshis: 0, + LockingScript: opReturn, + }) // Use the actual transaction hex instead of the provided string realTxHex := hex.EncodeToString(tx.Bytes()) @@ -388,7 +444,7 @@ func TestGASP_SyncBasicScenarios(t *testing.T) { // given ctx := context.Background() utxo1 := createMockUTXO("mock_sender1_rawtx1", 0, 111) - utxo2 := 
createMockUTXO("mock_sender2_rawtx1", 0, 222) + utxo2 := createMockUTXO("mock_sender2_rawtx1", 1, 222) storage1 := newMockGASPStorage([]*mockUTXO{utxo1, utxo2}) storage2 := newMockGASPStorage([]*mockUTXO{}) @@ -405,8 +461,10 @@ func TestGASP_SyncBasicScenarios(t *testing.T) { // then require.NoError(t, err) - result1, _ := storage1.FindKnownUTXOs(ctx, 0, 0) - result2, _ := storage2.FindKnownUTXOs(ctx, 0, 0) + result1, err := storage1.FindKnownUTXOs(ctx, 0, 0) + require.NoError(t, err) + result2, err := storage2.FindKnownUTXOs(ctx, 0, 0) + require.NoError(t, err) require.Len(t, result2, 2) require.Equal(t, len(result1), len(result2)) diff --git a/pkg/core/gasp/types.go b/pkg/core/gasp/types.go index 1bb8ba08..b3d756b7 100644 --- a/pkg/core/gasp/types.go +++ b/pkg/core/gasp/types.go @@ -47,11 +47,11 @@ type Node struct { GraphID *transaction.Outpoint `json:"graphID"` RawTx string `json:"rawTx"` OutputIndex uint32 `json:"outputIndex"` - Proof *string `json:"proof"` - TxMetadata string `json:"txMetadata"` - OutputMetadata string `json:"outputMetadata"` - Inputs map[string]*Input `json:"inputs"` - AncillaryBeef []byte `json:"ancillaryBeef"` + Proof *string `json:"proof,omitempty"` + TxMetadata string `json:"txMetadata,omitempty"` + OutputMetadata string `json:"outputMetadata,omitempty"` + Inputs map[string]*Input `json:"inputs,omitempty"` + AncillaryBeef []byte `json:"ancillaryBeef,omitempty"` } type NodeResponseData struct { @@ -59,7 +59,7 @@ type NodeResponseData struct { } type NodeResponse struct { - RequestedInputs map[string]*NodeResponseData `json:"requestedInputs"` + RequestedInputs map[transaction.Outpoint]*NodeResponseData `json:"requestedInputs"` } type VersionMismatchError struct { diff --git a/pkg/server/internal/app/errors.go b/pkg/server/internal/app/errors.go index dc3c8a89..b574f9d6 100644 --- a/pkg/server/internal/app/errors.go +++ b/pkg/server/internal/app/errors.go @@ -17,6 +17,7 @@ var ( ErrorTypeOperationTimeout = ErrorType{"operation-timeout"} 
ErrorTypeRawDataProcessing = ErrorType{"raw-data-processing"} ErrorTypeUnsupportedOperation = ErrorType{"unsupported-operation"} + ErrorTypeNotFound = ErrorType{"not-found"} ) // Error defines a generic application-layer error that should be translated @@ -72,6 +73,16 @@ func NewProviderFailureError(err, slug string) Error { } } +// NewNotFoundError returns an error that handles resource not found failures, +// such as when a requested output, transaction, or other resource doesn't exist. +func NewNotFoundError(err, slug string) Error { + return Error{ + slug: slug, + err: err, + errorType: ErrorTypeNotFound, + } +} + // NewAuthorizationError returns an error that handles authorization failures, // such as missing or invalid credentials when attempting to access a restricted resource. func NewAuthorizationError(err, slug string) Error { diff --git a/pkg/server/internal/app/request_foreign_gasp_node_service.go b/pkg/server/internal/app/request_foreign_gasp_node_service.go index b1c03682..9366dc6b 100644 --- a/pkg/server/internal/app/request_foreign_gasp_node_service.go +++ b/pkg/server/internal/app/request_foreign_gasp_node_service.go @@ -3,6 +3,7 @@ package app import ( "context" + "github.com/bsv-blockchain/go-overlay-services/pkg/core/engine" "github.com/bsv-blockchain/go-overlay-services/pkg/core/gasp" "github.com/bsv-blockchain/go-sdk/chainhash" "github.com/bsv-blockchain/go-sdk/transaction" @@ -51,6 +52,13 @@ func (s *RequestForeignGASPNodeService) RequestForeignGASPNode(ctx context.Conte Txid: *txID, }, dto.Topic) if err != nil { + // Check if the error is due to missing output + if err == engine.ErrMissingOutput { + return nil, NewNotFoundError( + err.Error(), + "The requested output does not exist or is not available in this overlay.", + ) + } return nil, NewForeignGASPNodeProviderError(err) } return node, nil diff --git a/pkg/server/internal/app/request_sync_response_service.go b/pkg/server/internal/app/request_sync_response_service.go index 
75313e85..9d344f7a 100644 --- a/pkg/server/internal/app/request_sync_response_service.go +++ b/pkg/server/internal/app/request_sync_response_service.go @@ -73,7 +73,7 @@ type RequestSyncResponseService struct { // It validates the input parameters, constructs the initial request payload, // and delegates the operation to the provider. The response is transformed // into a DTO suitable for external use. -func (s *RequestSyncResponseService) RequestSyncResponse(ctx context.Context, topic Topic, version Version, since Since) (*RequestSyncResponseDTO, error) { +func (s *RequestSyncResponseService) RequestSyncResponse(ctx context.Context, topic Topic, version Version, since Since, limit uint32) (*RequestSyncResponseDTO, error) { if topic.IsEmpty() { return nil, NewIncorrectInputWithFieldError("topic") } @@ -81,7 +81,7 @@ func (s *RequestSyncResponseService) RequestSyncResponse(ctx context.Context, to return nil, NewIncorrectInputWithFieldError("version") } - response, err := s.provider.ProvideForeignSyncResponse(ctx, &gasp.InitialRequest{Version: version.Int(), Since: since.Float64()}, topic.String()) + response, err := s.provider.ProvideForeignSyncResponse(ctx, &gasp.InitialRequest{Version: version.Int(), Since: since.Float64(), Limit: limit}, topic.String()) if err != nil { return nil, NewRequestSyncResponseProviderError(err) } diff --git a/pkg/server/internal/app/request_sync_response_service_test.go b/pkg/server/internal/app/request_sync_response_service_test.go index f42c6e6c..d99c615c 100644 --- a/pkg/server/internal/app/request_sync_response_service_test.go +++ b/pkg/server/internal/app/request_sync_response_service_test.go @@ -16,6 +16,7 @@ func TestRequestSyncResponseService_ValidCase(t *testing.T) { InitialRequest: &gasp.InitialRequest{ Version: testabilities.DefaultVersion, Since: testabilities.DefaultSince, + Limit: 100, }, Topic: testabilities.DefaultTopic, Response: &gasp.InitialResponse{ @@ -49,7 +50,9 @@ func TestRequestSyncResponseService_ValidCase(t 
*testing.T) { t.Context(), testabilities.DefaultTopic, testabilities.DefaultVersion, - app.NewSince(testabilities.DefaultSince)) + app.NewSince(testabilities.DefaultSince), + 100, + ) // then: require.NoError(t, err) @@ -96,6 +99,7 @@ func TestRequestSyncResponseService_InvalidCases(t *testing.T) { InitialRequest: &gasp.InitialRequest{ Version: testabilities.DefaultVersion, Since: testabilities.DefaultSince, + Limit: 100, }, Topic: testabilities.DefaultTopic, ProvideForeignSyncResponseCall: true, @@ -117,6 +121,7 @@ func TestRequestSyncResponseService_InvalidCases(t *testing.T) { tc.topic, tc.version, tc.since, + 100, ) // then: diff --git a/pkg/server/internal/ports/error_handler.go b/pkg/server/internal/ports/error_handler.go index 29109528..8e61120d 100644 --- a/pkg/server/internal/ports/error_handler.go +++ b/pkg/server/internal/ports/error_handler.go @@ -22,6 +22,7 @@ func ErrorHandler() fiber.ErrorHandler { app.ErrorTypeProviderFailure: fiber.StatusInternalServerError, app.ErrorTypeRawDataProcessing: fiber.StatusInternalServerError, app.ErrorTypeUnsupportedOperation: fiber.StatusNotFound, + app.ErrorTypeNotFound: fiber.StatusNotFound, } return func(c *fiber.Ctx, err error) error { diff --git a/pkg/server/internal/ports/middleware/basic_middleware_group.go b/pkg/server/internal/ports/middleware/basic_middleware_group.go index 0e47e2c3..253a6730 100644 --- a/pkg/server/internal/ports/middleware/basic_middleware_group.go +++ b/pkg/server/internal/ports/middleware/basic_middleware_group.go @@ -15,22 +15,33 @@ import ( type BasicMiddlewareGroupConfig struct { OctetStreamLimit int64 // Max allowed body size for octet-stream requests. EnableStackTrace bool // Enable stack traces in panic recovery middleware. + IncludeLogger bool // Include request logger middleware. Default is false to avoid duplicate logging. } // BasicMiddlewareGroup returns a list of preconfigured middleware for the HTTP server. 
-// It includes logging, CORS, request ID generation, panic recovery, PProf, request size limiting, health check. +// It includes CORS, request ID generation, panic recovery, PProf, request size limiting, health check. +// Optionally includes logging based on configuration. func BasicMiddlewareGroup(cfg BasicMiddlewareGroupConfig) []fiber.Handler { - return []fiber.Handler{ + handlers := []fiber.Handler{ requestid.New(), idempotency.New(), cors.New(), recover.New(recover.Config{EnableStackTrace: cfg.EnableStackTrace}), - logger.New(logger.Config{ + } + + // Only include logger if explicitly requested + if cfg.IncludeLogger { + handlers = append(handlers, logger.New(logger.Config{ Format: "date=${time} request_id=${locals:requestid} status=${status} method=${method} path=${path} err=${error}\n", TimeFormat: "02-Jan-2006 15:04:05", - }), + })) + } + + handlers = append(handlers, healthcheck.New(), pprof.New(pprof.Config{Prefix: "/api/v1"}), LimitOctetStreamBodyMiddleware(cfg.OctetStreamLimit), - } + ) + + return handlers } diff --git a/pkg/server/internal/ports/openapi/openapi_api.gen.go b/pkg/server/internal/ports/openapi/openapi_api.gen.go index 102db79b..4e61b45d 100644 --- a/pkg/server/internal/ports/openapi/openapi_api.gen.go +++ b/pkg/server/internal/ports/openapi/openapi_api.gen.go @@ -68,14 +68,14 @@ type LookupQuestionJSONBody struct { // RequestForeignGASPNodeJSONBody defines parameters for RequestForeignGASPNode. type RequestForeignGASPNodeJSONBody struct { - // GraphID The graph ID in the format of "txID.outputIndex" + // GraphID The graph ID in the format of "txid.outputIndex" GraphID string `json:"graphID"` // OutputIndex The output index OutputIndex uint32 `json:"outputIndex"` - // TxID The transaction ID - TxID string `json:"txID"` + // Txid The transaction ID + Txid string `json:"txid"` } // RequestForeignGASPNodeParams defines parameters for RequestForeignGASPNode. 
@@ -85,6 +85,9 @@ type RequestForeignGASPNodeParams struct { // RequestSyncResponseJSONBody defines parameters for RequestSyncResponse. type RequestSyncResponseJSONBody struct { + // Limit Maximum number of items to return + Limit uint32 `json:"limit"` + // Since Timestamp or sequence number from which to start synchronization Since float64 `json:"since"` diff --git a/pkg/server/internal/ports/openapi/openapi_non_admin_request_types.gen.go b/pkg/server/internal/ports/openapi/openapi_non_admin_request_types.gen.go index f7d0bfcc..1b202f24 100644 --- a/pkg/server/internal/ports/openapi/openapi_non_admin_request_types.gen.go +++ b/pkg/server/internal/ports/openapi/openapi_non_admin_request_types.gen.go @@ -26,18 +26,21 @@ type LookupQuestionBody struct { // RequestForeignGASPNodeBody defines model for RequestForeignGASPNodeBody. type RequestForeignGASPNodeBody struct { - // GraphID The graph ID in the format of "txID.outputIndex" + // GraphID The graph ID in the format of "txid.outputIndex" GraphID string `json:"graphID"` // OutputIndex The output index OutputIndex uint32 `json:"outputIndex"` - // TxID The transaction ID - TxID string `json:"txID"` + // Txid The transaction ID + Txid string `json:"txid"` } // RequestSyncResponseBody defines model for RequestSyncResponseBody. 
type RequestSyncResponseBody struct { + // Limit Maximum number of items to return + Limit uint32 `json:"limit"` + // Since Timestamp or sequence number from which to start synchronization Since float64 `json:"since"` diff --git a/pkg/server/internal/ports/request_foreign_gasp_node_handler.go b/pkg/server/internal/ports/request_foreign_gasp_node_handler.go index b7da6cc7..a3a9f4bc 100644 --- a/pkg/server/internal/ports/request_foreign_gasp_node_handler.go +++ b/pkg/server/internal/ports/request_foreign_gasp_node_handler.go @@ -33,7 +33,7 @@ func (h *RequestForeignGASPNodeHandler) Handle(c *fiber.Ctx, params openapi.Requ node, err := h.service.RequestForeignGASPNode(c.Context(), app.RequestForeignGASPNodeDTO{ GraphID: body.GraphID, - TxID: body.TxID, + TxID: body.Txid, OutputIndex: body.OutputIndex, Topic: params.XBSVTopic, }) diff --git a/pkg/server/internal/ports/request_foreign_gasp_node_handler_test.go b/pkg/server/internal/ports/request_foreign_gasp_node_handler_test.go index 4d64b736..51757cea 100644 --- a/pkg/server/internal/ports/request_foreign_gasp_node_handler_test.go +++ b/pkg/server/internal/ports/request_foreign_gasp_node_handler_test.go @@ -25,7 +25,7 @@ func TestRequestForeignGASPNodeHandler_InvalidCases(t *testing.T) { payload: openapi.RequestForeignGASPNodeBody{ GraphID: testabilities.DefaultValidGraphID, OutputIndex: testabilities.DefaultValidOutputIndex, - TxID: testabilities.DefaultValidTxID, + Txid: testabilities.DefaultValidTxID, }, headers: map[string]string{ fiber.HeaderContentType: fiber.MIMEApplicationJSON, @@ -103,7 +103,7 @@ func TestRequestForeignGASPNodeHandler_ValidCase(t *testing.T) { SetBody(openapi.RequestForeignGASPNodeBody{ GraphID: testabilities.DefaultValidGraphID, OutputIndex: testabilities.DefaultValidOutputIndex, - TxID: testabilities.DefaultValidTxID, + Txid: testabilities.DefaultValidTxID, }). SetResult(&actualResponse). 
Post("/api/v1/requestForeignGASPNode") diff --git a/pkg/server/internal/ports/request_sync_response_handler.go b/pkg/server/internal/ports/request_sync_response_handler.go index c0c7d057..2513195e 100644 --- a/pkg/server/internal/ports/request_sync_response_handler.go +++ b/pkg/server/internal/ports/request_sync_response_handler.go @@ -36,6 +36,7 @@ func (h *RequestSyncResponseHandler) Handle(c *fiber.Ctx, params openapi.Request app.NewTopic(params.XBSVTopic), app.Version(body.Version), app.Since(body.Since), + body.Limit, ) if err != nil { return err diff --git a/pkg/server/internal/ports/request_sync_response_handler_test.go b/pkg/server/internal/ports/request_sync_response_handler_test.go index 426bc9d9..ed2cd9e7 100644 --- a/pkg/server/internal/ports/request_sync_response_handler_test.go +++ b/pkg/server/internal/ports/request_sync_response_handler_test.go @@ -56,6 +56,7 @@ func TestRequestSyncResponseHandler_InvalidCases(t *testing.T) { InitialRequest: &gasp.InitialRequest{ Version: testabilities.DefaultVersion, Since: testabilities.DefaultSince, + Limit: 0, }, Topic: testabilities.DefaultTopic, }, @@ -97,6 +98,7 @@ func TestRequestSyncResponseHandler_ValidCase(t *testing.T) { InitialRequest: &gasp.InitialRequest{ Version: testabilities.DefaultVersion, Since: testabilities.DefaultSince, + Limit: 0, }, Topic: testabilities.DefaultTopic, Response: &gasp.InitialResponse{ diff --git a/pkg/server/internal/testabilities/request_sync_response_provider_mock.go b/pkg/server/internal/testabilities/request_sync_response_provider_mock.go index 9701c469..5fdde46d 100644 --- a/pkg/server/internal/testabilities/request_sync_response_provider_mock.go +++ b/pkg/server/internal/testabilities/request_sync_response_provider_mock.go @@ -88,6 +88,7 @@ func NewDefaultRequestSyncResponseBody() openapi.RequestSyncResponseBody { return openapi.RequestSyncResponseBody{ Version: DefaultVersion, Since: DefaultSince, + Limit: 0, } } diff --git a/pkg/server/server_http.go 
b/pkg/server/server_http.go index 2fddc2a9..9a654a55 100644 --- a/pkg/server/server_http.go +++ b/pkg/server/server_http.go @@ -7,6 +7,7 @@ import ( "github.com/bsv-blockchain/go-overlay-services/pkg/core/engine" "github.com/bsv-blockchain/go-overlay-services/pkg/server/internal/adapters" + "github.com/bsv-blockchain/go-overlay-services/pkg/server/internal/ports" "github.com/bsv-blockchain/go-overlay-services/pkg/server/internal/ports/middleware" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/monitor" @@ -175,22 +176,22 @@ func New(opts ...Option) *HTTP { o(srv) } - srv.app = RegisterRoutesWithErrorHandler( - fiber.New(fiber.Config{ - CaseSensitive: true, - StrictRouting: true, - ServerHeader: srv.cfg.ServerHeader, - AppName: srv.cfg.AppName, - ReadTimeout: srv.cfg.ConnectionReadTimeout, - }), - &RegisterRoutesConfig{ - ARCAPIKey: srv.cfg.ARCAPIKey, - ARCCallbackToken: srv.cfg.ARCCallbackToken, - AdminBearerToken: srv.cfg.AdminBearerToken, - Engine: srv.engine, - OctetStreamLimit: srv.cfg.OctetStreamLimit, - }, - ) + srv.app = fiber.New(fiber.Config{ + CaseSensitive: true, + StrictRouting: true, + ServerHeader: srv.cfg.ServerHeader, + AppName: srv.cfg.AppName, + ReadTimeout: srv.cfg.ConnectionReadTimeout, + ErrorHandler: ports.ErrorHandler(), + }) + + RegisterRoutes(srv.app, &RegisterRoutesConfig{ + ARCAPIKey: srv.cfg.ARCAPIKey, + ARCCallbackToken: srv.cfg.ARCCallbackToken, + AdminBearerToken: srv.cfg.AdminBearerToken, + Engine: srv.engine, + OctetStreamLimit: srv.cfg.OctetStreamLimit, + }) srv.app.Get("/metrics", monitor.New(monitor.Config{Title: "Overlay-services API"})) diff --git a/pkg/server/server_http_register_routes.go b/pkg/server/server_http_register_routes.go index 2f4bbe31..1a06ce5d 100644 --- a/pkg/server/server_http_register_routes.go +++ b/pkg/server/server_http_register_routes.go @@ -11,6 +11,13 @@ import ( "github.com/google/uuid" ) +// GetErrorHandler returns the error handler that translates application-level errors +// 
into appropriate HTTP status codes and JSON responses. +// Use this in your fiber.Config when creating the app if you want proper error handling. +func GetErrorHandler() fiber.ErrorHandler { + return ports.ErrorHandler() +} + // DefaultRegisterRoutesConfig provides a default configuration with reasonable values for local development. var DefaultRegisterRoutesConfig = RegisterRoutesConfig{ ARCAPIKey: "", @@ -38,6 +45,11 @@ type RegisterRoutesConfig struct { // OctetStreamLimit defines the maximum size (in bytes) for reading applicaction/octet-stream // request bodies. By default, it is set to 1GB to protect against excessively large payloads. OctetStreamLimit int64 + + // IncludeLogger determines whether to include the built-in request logger middleware. + // When false (default), the app owner should provide their own logger. + // When true, includes a logger with request_id, status, method, path, and error logging. + IncludeLogger bool } // RegisterRoutesWithErrorHandler wraps RegisterRoutes by injecting a predefined error handler @@ -86,6 +98,7 @@ func RegisterRoutes(app *fiber.App, cfg *RegisterRoutesConfig) *fiber.App { GlobalMiddleware: middleware.BasicMiddlewareGroup(middleware.BasicMiddlewareGroupConfig{ EnableStackTrace: true, OctetStreamLimit: cfg.OctetStreamLimit, + IncludeLogger: cfg.IncludeLogger, }), })