@@ -165,9 +165,50 @@ func (u *Manager) FMultipartPutObject(ctx context.Context, bucket string, key st
165165 c := minio.Core {Client : u .client }
166166
167167 // ----------------- Start fetching previous upload info from db -----------------
168- // Fetch upload id. If not found, initiate a new multipart upload.
168+ // Fetch uploaded size
169169 var uploadId string
170+ var uploadedSize int64
171+
170172 uploadIdKey := fmt .Sprintf (uploadIdKeyTemplate , filePath )
173+ uploadedSizeKey := fmt .Sprintf (uploadedSizeKeyTemplate , filePath )
174+ partsKey := fmt .Sprintf (partsKeyTemplate , filePath )
175+
176+ uploadedSizeBytes , err := (* u .storage ).Get ([]byte (u .cacheBucket ), []byte (uploadedSizeKey ))
177+ if err != nil {
178+ log .Debugf ("Get uploaded size by: %s warn: %v" , uploadedSizeKey , err )
179+ }
180+ //nolint: nestif // readability
181+ if uploadedSizeBytes != nil {
182+ uploadedSize , err = strconv .ParseInt (string (uploadedSizeBytes ), 10 , 64 )
183+ if err != nil {
184+ uploadedSize = 0
185+ }
186+ // Validate cached uploaded size doesn't exceed file size
187+ if uploadedSize > fileSize {
188+ log .Warnf ("Cached uploaded size %d exceeds file size %d, resetting to 0" , uploadedSize , fileSize )
189+ uploadedSize = 0
190+
191+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadedSizeKey ))
192+ if err != nil {
193+ log .Errorf ("Delete uploaded size failed: %v" , err )
194+ }
195+
196+ // Reset uploadId and parts as well
197+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadIdKey ))
198+ if err != nil {
199+ log .Errorf ("Delete upload id failed: %v" , err )
200+ }
201+
202+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (partsKey ))
203+ if err != nil {
204+ log .Errorf ("Delete parts failed: %v" , err )
205+ }
206+ }
207+ } else {
208+ uploadedSize = 0
209+ }
210+
211+ // Fetch upload id. If not found, initiate a new multipart upload.
171212 uploadIdBytes , err := (* u .storage ).Get ([]byte (u .cacheBucket ), []byte (uploadIdKey ))
172213 if err != nil {
173214 log .Debugf ("Get upload id by: %s warn: %v" , uploadIdKey , err )
@@ -183,28 +224,11 @@ func (u *Manager) FMultipartPutObject(ctx context.Context, bucket string, key st
183224 }
184225 log .Debugf ("Get upload id: %s by: %s" , uploadId , uploadIdKey )
185226
186- // Fetch uploaded size
187- var uploadedSize int64
188- uploadedSizeKey := fmt .Sprintf (uploadedSizeKeyTemplate , filePath )
189- uploadedSizeBytes , err := (* u .storage ).Get ([]byte (u .cacheBucket ), []byte (uploadedSizeKey ))
190- if err != nil {
191- log .Debugf ("Get uploaded size by: %s warn: %v" , uploadedSizeKey , err )
192- }
193- if uploadedSizeBytes != nil {
194- uploadedSize , err = strconv .ParseInt (string (uploadedSizeBytes ), 10 , 64 )
195- if err != nil {
196- uploadedSize = 0
197- }
198- } else {
199- uploadedSize = 0
200- }
201-
202227 u .uploadProgressChan <- FileUploadProgress {Name : filePath , Uploaded : uploadedSize , TotalSize : - 1 }
203228 log .Debugf ("Get uploaded size: %d by: %s" , uploadedSize , uploadedSizeKey )
204229
205230 // Fetch uploaded parts
206231 var parts []minio.CompletePart
207- partsKey := fmt .Sprintf (partsKeyTemplate , filePath )
208232 partsBytes , err := (* u .storage ).Get ([]byte (u .cacheBucket ), []byte (partsKey ))
209233 if err != nil {
210234 log .Debugf ("Get uploaded parts by: %s warn: %v" , partsKey , err )
@@ -242,10 +266,67 @@ func (u *Manager) FMultipartPutObject(ctx context.Context, bucket string, key st
242266 return errors .Wrap (err , "Optimal part info failed" )
243267 }
244268
269+ // Check if all parts are already uploaded
270+ partsToUpload := totalPartsCount - len (partNumbers )
271+ //nolint: nestif // readability
272+ if partsToUpload <= 0 {
273+ if totalPartsCount == len (partNumbers ) && uploadedSize == fileSize {
274+ // All parts are uploaded and size matches - complete the upload
275+ log .Infof ("File: %s, all parts already uploaded, completing multipart upload" , filePath )
276+
277+ // Sort all completed parts.
278+ slices .SortFunc (parts , func (i , j minio.CompletePart ) int {
279+ return i .PartNumber - j .PartNumber
280+ })
281+
282+ _ , err = c .CompleteMultipartUpload (ctx , bucket , key , uploadId , parts , opts )
283+ if err != nil {
284+ log .Errorf ("Complete multipart upload failed: %v" , err )
285+ return errors .Wrapf (err , "Complete multipart upload failed" )
286+ }
287+
288+ // Clean up cache entries
289+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadIdKey ))
290+ if err != nil {
291+ log .Errorf ("Delete upload id failed: %v" , err )
292+ }
293+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (partsKey ))
294+ if err != nil {
295+ log .Errorf ("Delete parts failed: %v" , err )
296+ }
297+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadedSizeKey ))
298+ if err != nil {
299+ log .Errorf ("Delete uploaded size failed: %v" , err )
300+ }
301+
302+ return nil
303+ } else {
304+ // Parts count or size mismatch - need to re-upload
305+ log .Warnf ("File: %s, parts/size mismatch (total parts: %d, uploaded parts: %d, uploaded size: %d, file size: %d), clearing cache" ,
306+ filePath , totalPartsCount , len (partNumbers ), uploadedSize , fileSize )
307+
308+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadedSizeKey ))
309+ if err != nil {
310+ log .Errorf ("Delete uploaded size failed: %v" , err )
311+ }
312+
313+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (uploadIdKey ))
314+ if err != nil {
315+ log .Errorf ("Delete upload id failed: %v" , err )
316+ }
317+
318+ err = (* u .storage ).Delete ([]byte (u .cacheBucket ), []byte (partsKey ))
319+ if err != nil {
320+ log .Errorf ("Delete parts failed: %v" , err )
321+ }
322+ return errors .New ("File parts/size mismatch, need to re-upload all parts" )
323+ }
324+ }
325+
245326 // Declare a channel that sends the next part number to be uploaded.
246327 uploadPartsCh := make (chan int )
247328 // Declare a channel that sends back the response of a part upload.
248- uploadedPartsCh := make (chan uploadedPartRes , totalPartsCount - len ( partNumbers ) )
329+ uploadedPartsCh := make (chan uploadedPartRes , partsToUpload )
249330 // Used for readability, lastPartNumber is always totalPartsCount.
250331 lastPartNumber := totalPartsCount
251332
@@ -362,7 +443,7 @@ func (u *Manager) FMultipartPutObject(ctx context.Context, bucket string, key st
362443 }
363444 return uploadRes .Error
364445 }
365- // Update the uploadedSize.
446+ // Update the uploadedSize with the current part size
366447 uploadedSize += uploadRes .Part .Size
367448 parts = append (parts , minio.CompletePart {
368449 ETag : uploadRes .Part .ETag ,
0 commit comments