@@ -160,7 +160,7 @@ func (s *DASubmitter) recordFailure(reason common.DASubmitterFailureReason) {
160160}
161161
162162// SubmitHeaders submits pending headers to DA layer
163- func (s * DASubmitter ) SubmitHeaders (ctx context.Context , headers []* types.SignedHeader , cache cache.Manager , signer signer.Signer ) error {
163+ func (s * DASubmitter ) SubmitHeaders (ctx context.Context , headers []* types.SignedHeader , marshalledHeaders [][] byte , cache cache.Manager , signer signer.Signer ) error {
164164 if len (headers ) == 0 {
165165 return nil
166166 }
@@ -169,26 +169,30 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
169169 return fmt .Errorf ("signer is nil" )
170170 }
171171
172+ if len (marshalledHeaders ) != len (headers ) {
173+ return fmt .Errorf ("marshalledHeaders length (%d) does not match headers length (%d)" , len (marshalledHeaders ), len (headers ))
174+ }
175+
172176 s .logger .Info ().Int ("count" , len (headers )).Msg ("submitting headers to DA" )
173177
174- return submitToDA ( s , ctx , headers ,
175- func ( header * types. SignedHeader ) ([]byte , error ) {
176- // A. Marshal the inner SignedHeader content to bytes (canonical representation for signing)
177- // This effectively signs "Fields 1-3" of the intended DAHeaderEnvelope.
178- contentBytes , err := header . MarshalBinary ( )
179- if err != nil {
180- return nil , fmt .Errorf ("failed to marshal signed header for envelope signing : %w" , err )
181- }
178+ // Create DA envelopes from pre-marshalled headers
179+ envelopes := make ([][] byte , len ( headers ))
180+ for i , header := range headers {
181+ // Sign the pre-marshalled header content
182+ envelopeSignature , err := signer . Sign ( marshalledHeaders [ i ] )
183+ if err != nil {
184+ return fmt .Errorf ("failed to sign envelope for header %d : %w" , i , err )
185+ }
182186
183- // B. Sign the contentBytes with the envelope signer (aggregator)
184- envelopeSignature , err := signer .Sign (contentBytes )
185- if err != nil {
186- return nil , fmt .Errorf ("failed to sign envelope: %w" , err )
187- }
187+ // Create the envelope and marshal it
188+ envelope , err := header .MarshalDAEnvelope (envelopeSignature )
189+ if err != nil {
190+ return fmt .Errorf ("failed to marshal DA envelope for header %d: %w" , i , err )
191+ }
192+ envelopes [i ] = envelope
193+ }
188194
189- // C. Create the envelope and marshal it
190- return header .MarshalDAEnvelope (envelopeSignature )
191- },
195+ return submitToDA (s , ctx , headers , envelopes ,
192196 func (submitted []* types.SignedHeader , res * datypes.ResultSubmit ) {
193197 for _ , header := range submitted {
194198 cache .SetHeaderDAIncluded (header .Hash ().String (), res .Height , header .Height ())
@@ -206,11 +210,15 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
206210}
207211
208212// SubmitData submits pending data to DA layer
209- func (s * DASubmitter ) SubmitData (ctx context.Context , unsignedDataList []* types.SignedData , cache cache.Manager , signer signer.Signer , genesis genesis.Genesis ) error {
213+ func (s * DASubmitter ) SubmitData (ctx context.Context , unsignedDataList []* types.SignedData , marshalledData [][] byte , cache cache.Manager , signer signer.Signer , genesis genesis.Genesis ) error {
210214 if len (unsignedDataList ) == 0 {
211215 return nil
212216 }
213217
218+ if len (marshalledData ) != len (unsignedDataList ) {
219+ return fmt .Errorf ("marshalledData length (%d) does not match unsignedDataList length (%d)" , len (marshalledData ), len (unsignedDataList ))
220+ }
221+
214222 // Sign the data (cache returns unsigned SignedData structs)
215223 signedDataList , err := s .signData (unsignedDataList , signer , genesis )
216224 if err != nil {
@@ -223,10 +231,17 @@ func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.
223231
224232 s .logger .Info ().Int ("count" , len (signedDataList )).Msg ("submitting data to DA" )
225233
226- return submitToDA (s , ctx , signedDataList ,
227- func (signedData * types.SignedData ) ([]byte , error ) {
228- return signedData .MarshalBinary ()
229- },
234+ // Filter marshalledData to match signedDataList (removes empty data)
235+ filteredMarshalledData := make ([][]byte , 0 , len (signedDataList ))
236+ signedIdx := 0
237+ for i , unsigned := range unsignedDataList {
238+ if signedIdx < len (signedDataList ) && unsigned .Height () == signedDataList [signedIdx ].Height () {
239+ filteredMarshalledData = append (filteredMarshalledData , marshalledData [i ])
240+ signedIdx ++
241+ }
242+ }
243+
244+ return submitToDA (s , ctx , signedDataList , filteredMarshalledData ,
230245 func (submitted []* types.SignedData , res * datypes.ResultSubmit ) {
231246 for _ , sd := range submitted {
232247 cache .SetDataDAIncluded (sd .Data .DACommitment ().String (), res .Height , sd .Height ())
@@ -342,16 +357,15 @@ func submitToDA[T any](
342357 s * DASubmitter ,
343358 ctx context.Context ,
344359 items []T ,
345- marshalFn func ( T ) ([] byte , error ) ,
360+ marshaled [][] byte ,
346361 postSubmit func ([]T , * datypes.ResultSubmit ),
347362 itemType string ,
348363 namespace []byte ,
349364 options []byte ,
350365 getTotalPendingFn func () uint64 ,
351366) error {
352- marshaled , err := marshalItems (ctx , items , marshalFn , itemType )
353- if err != nil {
354- return err
367+ if len (items ) != len (marshaled ) {
368+ return fmt .Errorf ("items length (%d) does not match marshaled length (%d)" , len (items ), len (marshaled ))
355369 }
356370
357371 pol := defaultRetryPolicy (s .config .DA .MaxSubmitAttempts , s .config .DA .BlockTime .Duration )
@@ -513,55 +527,6 @@ func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes int) ([]T,
513527 return items [:count ], marshaled [:count ], nil
514528}
515529
516- func marshalItems [T any ](
517- parentCtx context.Context ,
518- items []T ,
519- marshalFn func (T ) ([]byte , error ),
520- itemType string ,
521- ) ([][]byte , error ) {
522- if len (items ) == 0 {
523- return nil , nil
524- }
525- marshaled := make ([][]byte , len (items ))
526- ctx , cancel := context .WithCancel (parentCtx )
527- defer cancel ()
528-
529- // Semaphore to limit concurrency to 32 workers
530- sem := make (chan struct {}, 32 )
531-
532- // Use a channel to collect results from goroutines
533- resultCh := make (chan error , len (items ))
534-
535- // Marshal items concurrently
536- for i , item := range items {
537- go func (idx int , itm T ) {
538- sem <- struct {}{}
539- defer func () { <- sem }()
540-
541- select {
542- case <- ctx .Done ():
543- resultCh <- ctx .Err ()
544- default :
545- bz , err := marshalFn (itm )
546- if err != nil {
547- resultCh <- fmt .Errorf ("failed to marshal %s item at index %d: %w" , itemType , idx , err )
548- return
549- }
550- marshaled [idx ] = bz
551- resultCh <- nil
552- }
553- }(i , item )
554- }
555-
556- // Wait for all goroutines to complete and check for errors
557- for i := 0 ; i < len (items ); i ++ {
558- if err := <- resultCh ; err != nil {
559- return nil , err
560- }
561- }
562- return marshaled , nil
563- }
564-
565530func waitForBackoffOrContext (ctx context.Context , backoff time.Duration ) error {
566531 if backoff <= 0 {
567532 select {
0 commit comments