Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"fmt"
"maps"
"slices"

"github.com/eigerco/strawberry/internal/block"
Expand All @@ -22,6 +23,15 @@ const (

type ServiceState map[block.ServiceId]ServiceAccount

func (ss ServiceState) Delete(keys ...block.ServiceId) {
for _, key := range keys {
delete(ss, key)
}
}
func (ss ServiceState) Merge(moreServices ServiceState) {
maps.Copy(ss, moreServices)
}

func (ss ServiceState) Clone() ServiceState {
if ss == nil {
return nil
Expand Down
232 changes: 97 additions & 135 deletions internal/statetransition/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package statetransition

import (
"bytes"

"github.com/eigerco/strawberry/internal/crypto/ed25519"

"errors"
"fmt"
"maps"
Expand All @@ -17,6 +14,7 @@ import (
"github.com/eigerco/strawberry/internal/block"
"github.com/eigerco/strawberry/internal/common"
"github.com/eigerco/strawberry/internal/crypto"
"github.com/eigerco/strawberry/internal/crypto/ed25519"
"github.com/eigerco/strawberry/internal/disputing"
"github.com/eigerco/strawberry/internal/guaranteeing"
"github.com/eigerco/strawberry/internal/jamtime"
Expand Down Expand Up @@ -1033,7 +1031,7 @@ func replaceIfChanged(initialServiceId, changedServiceId, selfChangedServiceID b
// ParallelDelta implements equation 12.17 v0.7.0 (∆*(S, ⟦R⟧, ⟨NS → NG⟩) → (S, ⟦X⟧, B, U))
// (e S, t ⟦X⟧, r ⟦R⟧, f ⟨NS → NG⟩) → (S, ⟦X⟧, B, U)
func (a *Accumulator) ParallelDelta(
initialAccState state.AccumulationState,
initState state.AccumulationState,
transfers []service.DeferredTransfer,
workReports []block.WorkReport,
alwaysAccumulate map[block.ServiceId]uint64, // D⟨NS → NG⟩
Expand All @@ -1044,10 +1042,6 @@ func (a *Accumulator) ParallelDelta(
ServiceGasPairs, // accumulation gas
) {

delta := func(svcID block.ServiceId) AccumulationOutput {
return a.Delta1(initialAccState, transfers, workReports, alwaysAccumulate, svcID)
}

// Get all unique service indices involved (s)
// let s = { d_s | r ∈ r, d ∈ r_d } ∪ K(f) ∪ { t_d | t ∈ t }
serviceIndices := make(map[block.ServiceId]struct{})
Expand All @@ -1068,150 +1062,118 @@ func (a *Accumulator) ParallelDelta(
serviceIndices[t.ReceiverServiceIndex] = struct{}{}
}

// caches the output per each executed service to not run the same service multiple times
execSvcIds := map[block.ServiceId]struct{}{
initState.ManagerServiceId: {},
initState.DesignateServiceId: {},
initState.CreateProtectedServiceId: {},
}
for serviceId := range serviceIndices {
execSvcIds[serviceId] = struct{}{}
}
for _, serviceId := range initState.AssignedServiceIds {
execSvcIds[serviceId] = struct{}{}
}

// execute all the services in parallel
mu := sync.Mutex{}
wg := sync.WaitGroup{}
delta := map[block.ServiceId]AccumulationOutput{}

wg.Add(len(execSvcIds))
for serviceId := range execSvcIds {
go func(serviceId block.ServiceId) {
defer wg.Done()
output := a.Delta1(initState, transfers, workReports, alwaysAccumulate, serviceId)
mu.Lock()
delta[serviceId] = output
mu.Unlock()
}(serviceId)
}
wg.Wait()

var allTransfers []service.DeferredTransfer
// u = [(s, ∆(s)u) | s <− s]
accumHashPairs := ServiceHashPairSet{}
accumGasPairs := make(ServiceGasPairs, 0)

var allPreimageProvisions []polkavm.ProvidedPreimage

var mu sync.Mutex
var wg sync.WaitGroup

allAddedServices := service.ServiceState{}
allRemovedIndices := map[block.ServiceId]struct{}{}

for svcId := range serviceIndices {
wg.Add(1)
go func(serviceId block.ServiceId) {
defer wg.Done()
// Process single service using Delta1
output := delta(serviceId)
accState, deferredTransfers, resultHash, gasUsed, preimageProvisions := output.AccumulationState, output.DeferredTransfers, output.Result, output.GasUsed, output.ProvidedPreimages
mu.Lock()
defer mu.Unlock()
// Collect transfers
if len(deferredTransfers) > 0 {
allTransfers = append(allTransfers, deferredTransfers...)
}

// Store accumulation result if present
if resultHash != nil {
accumHashPairs[state.ServiceHashPair{
ServiceId: serviceId,
Hash: *resultHash,
}] = struct{}{}
}
accumGasPairs = append(accumGasPairs, ServiceGasPair{
ServiceId: serviceId,
Gas: gasUsed,
})

allPreimageProvisions = append(allPreimageProvisions, preimageProvisions...)
// Adds the newly created services after accumulation to the service state set
// Removes the deleted services from the state
//
// n = ⋃[s∈s]((∆(s)e)d ∖ K(d ∖ {s}))
// m = ⋃[s∈s](K(d) ∖ K((∆(s)e)d))
// (d ∪ n) ∖ m
removedIndices := mapKeys(initialAccState.ServiceState)
deleteKeys(removedIndices, slices.Collect(maps.Keys(accState.ServiceState))...)
maps.Copy(allRemovedIndices, removedIndices)

initialServices := maps.Clone(initialAccState.ServiceState)
delete(initialServices, serviceId)
deleteKeys(accState.ServiceState, slices.Collect(maps.Keys(initialServices))...)

maps.Copy(allAddedServices, accState.ServiceState)
}(svcId)
}
newAccState := state.AccumulationState{}
// Manager changed state
// e* = ∆(m)e
var changedState state.AccumulationState
wg.Add(1)
go func(serviceId block.ServiceId) {
defer wg.Done()

for serviceId := range serviceIndices {
// Process single service using Delta1
output := delta(serviceId)
mu.Lock()
defer mu.Unlock()

changedState = output.AccumulationState
}(initialAccState.ManagerServiceId)

// i′ = (∆(v)e)i
// v′ = R(v, e∗v, (∆(v)e)v)
var selfChangedDesignateServiceId block.ServiceId
wg.Add(1)
go func(serviceId block.ServiceId) {
defer wg.Done()

// Process single service using Delta1
output := delta(serviceId)
mu.Lock()
defer mu.Unlock()

newAccState.ValidatorKeys = output.AccumulationState.ValidatorKeys
selfChangedDesignateServiceId = output.AccumulationState.DesignateServiceId
}(initialAccState.DesignateServiceId)

// ∀c ∈ NC ∶ q′c = ((∆(a_c)e)q)c
// ∀c ∈ NC ∶ a′c = R(a_c, (e*a)c, ((∆(a_c)e)a)c)
var selfChangedAssignedServiceIds [common.TotalNumberOfCores]block.ServiceId
for core, assignServiceId := range initialAccState.AssignedServiceIds {
wg.Add(1)
go func(serviceId block.ServiceId) {
defer wg.Done()

// Process single service using Delta1
output := delta(serviceId)
mu.Lock()
defer mu.Unlock()

newAccState.PendingAuthorizersQueues[core] = output.AccumulationState.PendingAuthorizersQueues[core]
selfChangedAssignedServiceIds[core] = output.AccumulationState.AssignedServiceIds[core]
}(assignServiceId)
}

// r′ = R(r, e*r , (∆(r)e)r)
var selfChangedCreateProtectedServiceId block.ServiceId
wg.Add(1)
go func(serviceId block.ServiceId) {
defer wg.Done()

output := delta(serviceId)

mu.Lock()
defer mu.Unlock()

// TODO maybe execute only if service ID changed (optimization)
selfChangedCreateProtectedServiceId = output.AccumulationState.CreateProtectedServiceId
}(initialAccState.CreateProtectedServiceId)
output := delta[serviceId]
accState, deferredTransfers, resultHash, gasUsed, preimageProvisions := output.AccumulationState, output.DeferredTransfers, output.Result, output.GasUsed, output.ProvidedPreimages
// Collect transfers
if len(deferredTransfers) > 0 {
allTransfers = append(allTransfers, deferredTransfers...)
}

// Wait for the rest of the processes
wg.Wait()
// Store accumulation result if present
if resultHash != nil {
accumHashPairs[state.ServiceHashPair{
ServiceId: serviceId,
Hash: *resultHash,
}] = struct{}{}
}
accumGasPairs = append(accumGasPairs, ServiceGasPair{
ServiceId: serviceId,
Gas: gasUsed,
})

initialServices := initialAccState.ServiceState.Clone()
maps.Copy(initialServices, allAddedServices) // d U n
deleteKeys(initialServices, slices.Collect(maps.Keys(allRemovedIndices))...) // d \ m
allPreimageProvisions = append(allPreimageProvisions, preimageProvisions...)
// Adds the newly created services after accumulation to the service state set
// Removes the deleted services from the state
//
// n = ⋃[s∈s]((∆(s)e)d ∖ K(d ∖ {s}))
// m = ⋃[s∈s](K(d) ∖ K((∆(s)e)d))
// (d ∪ n) ∖ m
removedIndices := mapKeys(initState.ServiceState)
deleteKeys(removedIndices, slices.Collect(maps.Keys(accState.ServiceState))...)
maps.Copy(allRemovedIndices, removedIndices)

newAccState.ServiceState = initialServices
// (m′, z′) = e*_(m,z)
newAccState.ManagerServiceId = changedState.ManagerServiceId
newAccState.AmountOfGasPerServiceId = changedState.AmountOfGasPerServiceId
initialServices := maps.Clone(initState.ServiceState)
delete(initialServices, serviceId)
deleteKeys(accState.ServiceState, slices.Collect(maps.Keys(initialServices))...)

newAccState.DesignateServiceId = replaceIfChanged(initialAccState.DesignateServiceId, changedState.DesignateServiceId, selfChangedDesignateServiceId)
for _, core := range newAccState.AssignedServiceIds {
newAccState.AssignedServiceIds[core] = replaceIfChanged(initialAccState.AssignedServiceIds[core], changedState.AssignedServiceIds[core], selfChangedAssignedServiceIds[core])
maps.Copy(allAddedServices, accState.ServiceState)
}
newAccState.CreateProtectedServiceId = replaceIfChanged(initialAccState.CreateProtectedServiceId, changedState.CreateProtectedServiceId, selfChangedCreateProtectedServiceId)

// d′ = P ((d ∪ n) ∖ m, [⋃s∈s] ∆1(o, w, f , s)p)
newAccState.ServiceState = a.preimageIntegration(newAccState.ServiceState, allPreimageProvisions)

return newAccState, allTransfers, accumHashPairs, accumGasPairs
// Manager changed state
// e* = ∆(m)e
managerState := delta[initState.ManagerServiceId].AccumulationState

var newPendingAuthorizersQueues [common.TotalNumberOfCores]state.PendingAuthorizersQueue
var newAssignedServiceIds [common.TotalNumberOfCores]block.ServiceId
for core, serviceId := range initState.AssignedServiceIds {
// ∀c ∈ NC ∶ q′c = ((∆(a_c)e)q)c
newPendingAuthorizersQueues[core] = delta[serviceId].AccumulationState.PendingAuthorizersQueues[core]

// ∀c ∈ NC ∶ a′c = R(a_c, (e*a)c, ((∆(a_c)e)a)c)
newAssignedServiceIds[core] = replaceIfChanged(initState.AssignedServiceIds[core], managerState.AssignedServiceIds[core], delta[initState.AssignedServiceIds[core]].AccumulationState.AssignedServiceIds[core])
}

newServiceState := initState.ServiceState.Clone()
newServiceState.Merge(allAddedServices)
newServiceState.Delete(slices.Collect(maps.Keys(allRemovedIndices))...)
return state.AccumulationState{
// d′ = P((d ∪ n) ∖ m, ⋃ s∈s ∆(s)p))
ServiceState: a.preimageIntegration(newServiceState, allPreimageProvisions),
// i′ = (∆(v)e)i
ValidatorKeys: delta[initState.DesignateServiceId].AccumulationState.ValidatorKeys,
// (m′, z′) = e*_(m,z)
ManagerServiceId: managerState.ManagerServiceId,
AmountOfGasPerServiceId: managerState.AmountOfGasPerServiceId,
// v′ = R(v, e∗v, (∆(v)e)v)
DesignateServiceId: replaceIfChanged(initState.DesignateServiceId, managerState.DesignateServiceId, delta[initState.DesignateServiceId].AccumulationState.DesignateServiceId),
// r′ = R(r, e*r , (∆(r)e)r)
CreateProtectedServiceId: replaceIfChanged(initState.CreateProtectedServiceId, managerState.CreateProtectedServiceId, delta[initState.CreateProtectedServiceId].AccumulationState.CreateProtectedServiceId),

PendingAuthorizersQueues: newPendingAuthorizersQueues,
AssignedServiceIds: newAssignedServiceIds,
}, allTransfers, accumHashPairs, accumGasPairs
}

func mapKeys[K comparable, V any](m map[K]V) map[K]struct{} {
Expand Down
Loading