Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions pipeline/ffmpeg.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,20 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) {
return nil, fmt.Errorf("error downloading source manifest: %s", err)
}

// Check a few segments from the segmented source input file (non-hls)
sourceSegments := sourceManifest.GetAllSegments()
job.sourceSegments = len(sourceSegments)
err = f.probeSourceSegments(job, sourceSegments)
if err != nil {
return nil, err
}

// Check a few segments from the recording source input file (hls)
err = f.probeRecordingSourceSegments(job, &inputInfo, sourceSegments)
if err != nil {
log.LogError(job.RequestID, "failed to probe recording source segments before transcoding - continuing with transcode", err)
}

outputs, transcodedSegments, err := transcode.RunTranscodeProcess(transcodeRequest, job.StreamName, inputInfo, f.Broadcaster)
if err != nil {
log.LogError(job.RequestID, "RunTranscodeProcess returned an error", err)
Expand Down Expand Up @@ -333,6 +340,44 @@ func (f *ffmpeg) probeSourceSegment(requestID string, seg *m3u8.MediaSegment, so
return nil
}

func (f *ffmpeg) probeRecordingSourceSegments(job *JobInfo, iv *video.InputVideo, sourceSegments []*m3u8.MediaSegment) error {
// Only inspect recording segments if the height/width was not determined in the initial probing step
if job.InputFileInfo.Format != "hls" && (iv.Tracks[0].VideoTrack.Width == 0 || iv.Tracks[0].VideoTrack.Height == 0) {
return nil
}
oldWidth, oldHeight := iv.Tracks[0].VideoTrack.Width, iv.Tracks[0].VideoTrack.Height
segCount := len(sourceSegments)
// Check a random segment in the middle
segmentToCheck := sourceSegments[segCount/2]

u, err := clients.ManifestURLToSegmentURL(job.SegmentingTargetURL, segmentToCheck.URI)
if err != nil {
return fmt.Errorf("error checking recording source segments: %w", err)
}
probeURL, err := clients.SignURL(u)
if err != nil {
return fmt.Errorf("failed to create signed url for %s: %w", u, err)
}
if err := backoff.Retry(func() error {
recSegmentProbe, err := f.probe.ProbeFile(job.RequestID, probeURL)
if err != nil {
return fmt.Errorf("probe failed for recording source segment %s: %w", u, err)
}
videoTrack, err := recSegmentProbe.GetTrack(video.TrackTypeVideo)
hasVideoTrack := err == nil
if hasVideoTrack {
iv.Tracks[0].VideoTrack.Width = videoTrack.Width
iv.Tracks[0].VideoTrack.Height = videoTrack.Height
log.Log(job.RequestID, "Updated recording track info from", "old-width", oldWidth, "old-height", oldHeight, "new-width", iv.Tracks[0].VideoTrack.Width, "new-height", iv.Tracks[0].VideoTrack.Height)
}
return nil
}, retries(3)); err != nil {
return err
}

return nil
}

func copyFileToLocalTmpAndSegment(job *JobInfo) (string, error) {
// Create a temporary local file to write to
localSourceFile, err := os.CreateTemp(os.TempDir(), LocalSourceFilePattern)
Expand Down