Skip to content

Commit 9cf53c2

Browse files
feat(sdk): Use display_name for Kubernetes pod names
Enables set_display_name() to affect Kubernetes pod names for container tasks, improving cluster-side debugging and resource monitoring. Benefits: - Easier debugging: Pod names match display names, simplifying task identification in kubectl, logs, and monitoring tools - Resource monitoring: In ParallelFor loops, each iteration's pod has a clear name, enabling per-component resource usage tracking - Better observability: Metrics and logs can be easily correlated with pipeline tasks Changes: - pipeline_task.py: Enhanced set_display_name() to handle LoopArgumentVariable objects from ParallelFor loops - pipeline_spec_builder.py: - Use sanitized display_name as DAG task key for container tasks - Fix producer task references in groups (ParallelFor, Condition, ExitHandler) to use correct DAG keys - Ensure uniqueness and enforce Kubernetes naming constraints Features: - Display names are sanitized and used for pod names - Supports loop variables: task.set_display_name(f"task-{loop_var.field}") - Handles duplicate names with automatic suffix - Truncates to fit Kubernetes 63-char limit - Fully backward compatible Fixes: - Resolves "unknown producer task" error when using display_name in groups Testing: - Added unit tests for display_name functionality - Tested with ParallelFor loops - Verified backward compatibility Related to: #12434 Signed-off-by: wassimbensalem <bswassim@gmail.com>
1 parent ea5a1d8 commit 9cf53c2

File tree

3 files changed

+349
-12
lines changed

3 files changed

+349
-12
lines changed

sdk/python/kfp/compiler/compiler_test.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,176 @@ def my_pipeline():
419419
self.assertEqual(my_pipeline.pipeline_spec.pipeline_info.display_name,
420420
'my display name')
421421

422+
def test_set_display_name_for_container_task(self):
423+
"""Test that display_name is used as DAG task key for container tasks."""
424+
425+
@dsl.component
426+
def my_component(text: str) -> str:
427+
return text
428+
429+
@dsl.pipeline(name='my-pipeline')
430+
def my_pipeline():
431+
task = my_component(text='hello').set_display_name('My Task')
432+
return task
433+
434+
my_pipeline()
435+
# Check that the DAG task key uses the sanitized display_name
436+
self.assertIn('my-task', my_pipeline.pipeline_spec.root.dag.tasks)
437+
# Verify task_info.name is set correctly
438+
task_spec = my_pipeline.pipeline_spec.root.dag.tasks['my-task']
439+
self.assertEqual(task_spec.task_info.name, 'My Task')
440+
441+
def test_set_display_name_backward_compatibility(self):
442+
"""Test that tasks without display_name use current behavior."""
443+
444+
@dsl.component
445+
def my_component(text: str) -> str:
446+
return text
447+
448+
@dsl.pipeline(name='my-pipeline')
449+
def my_pipeline():
450+
task = my_component(text='hello')
451+
return task
452+
453+
my_pipeline()
454+
# Check that the DAG task key uses sanitized task name
455+
# The task variable name would be sanitized
456+
task_keys = list(my_pipeline.pipeline_spec.root.dag.tasks.keys())
457+
self.assertEqual(len(task_keys), 1)
458+
# Task name should be sanitized (component name gets prefixed with 'comp-')
459+
# but the DAG task key uses the task variable name which is sanitized
460+
# component name without prefix
461+
self.assertGreater(len(task_keys[0]), 0)
462+
463+
def test_set_display_name_uniqueness(self):
464+
"""Test that duplicate display_names get uniqueness suffix."""
465+
466+
@dsl.component
467+
def my_component(text: str) -> str:
468+
return text
469+
470+
@dsl.pipeline(name='my-pipeline')
471+
def my_pipeline():
472+
task1 = my_component(text='hello').set_display_name('Same Name')
473+
task2 = my_component(text='world').set_display_name('Same Name')
474+
return task1, task2
475+
476+
my_pipeline()
477+
# Check that both tasks exist with unique keys
478+
task_keys = list(my_pipeline.pipeline_spec.root.dag.tasks.keys())
479+
self.assertEqual(len(task_keys), 2)
480+
# One should be 'same-name', the other 'same-name-2'
481+
self.assertIn('same-name', task_keys)
482+
self.assertIn('same-name-2', task_keys)
483+
484+
def test_set_display_name_long_name_truncation(self):
485+
"""Test that very long display_names are truncated."""
486+
487+
@dsl.component
488+
def my_component(text: str) -> str:
489+
return text
490+
491+
@dsl.pipeline(name='my-pipeline')
492+
def my_pipeline():
493+
# Create a very long display name (> 15 chars)
494+
long_name = 'a' * 50
495+
task = my_component(text='hello').set_display_name(long_name)
496+
return task
497+
498+
my_pipeline()
499+
# Check that the task key is truncated
500+
task_keys = list(my_pipeline.pipeline_spec.root.dag.tasks.keys())
501+
self.assertEqual(len(task_keys), 1)
502+
# Task key should be truncated to max_task_name_length (15)
503+
self.assertLessEqual(len(task_keys[0]), 20) # Allow some room for suffix
504+
505+
def test_set_display_name_in_loop(self):
506+
"""Test that display_name works for tasks inside ParallelFor loops."""
507+
508+
@dsl.component
509+
def my_component(text: str) -> str:
510+
return text
511+
512+
@dsl.pipeline(name='my-pipeline')
513+
def my_pipeline():
514+
with dsl.ParallelFor(items=['a', 'b', 'c']) as item:
515+
task = my_component(text=item).set_display_name('Loop Task')
516+
return task
517+
518+
my_pipeline()
519+
# Find the loop component
520+
loop_component = None
521+
for comp_name, comp_spec in my_pipeline.pipeline_spec.components.items():
522+
if comp_spec.dag and 'loop-task' in comp_spec.dag.tasks:
523+
loop_component = comp_spec
524+
break
525+
self.assertIsNotNone(loop_component)
526+
self.assertIn('loop-task', loop_component.dag.tasks)
527+
528+
def test_set_display_name_with_dependencies(self):
529+
"""Test that task dependencies work correctly with display_name."""
530+
531+
@dsl.component
532+
def producer(text: str) -> str:
533+
return text
534+
535+
@dsl.component
536+
def consumer(text: str) -> str:
537+
return text
538+
539+
@dsl.pipeline(name='my-pipeline')
540+
def my_pipeline():
541+
prod = producer(text='hello').set_display_name('Producer')
542+
cons = consumer(text=prod.output).set_display_name('Consumer')
543+
return cons
544+
545+
my_pipeline()
546+
# Check that both tasks exist
547+
self.assertIn('producer', my_pipeline.pipeline_spec.root.dag.tasks)
548+
self.assertIn('consumer', my_pipeline.pipeline_spec.root.dag.tasks)
549+
# Check that consumer depends on producer
550+
consumer_spec = my_pipeline.pipeline_spec.root.dag.tasks['consumer']
551+
self.assertIn('producer', consumer_spec.dependent_tasks)
552+
553+
def test_set_display_name_special_characters(self):
554+
"""Test that display_name with special characters is sanitized."""
555+
556+
@dsl.component
557+
def my_component(text: str) -> str:
558+
return text
559+
560+
@dsl.pipeline(name='my-pipeline')
561+
def my_pipeline():
562+
task = my_component(text='hello').set_display_name('My Task @#$%')
563+
return task
564+
565+
my_pipeline()
566+
# Check that special characters are sanitized
567+
task_keys = list(my_pipeline.pipeline_spec.root.dag.tasks.keys())
568+
self.assertEqual(len(task_keys), 1)
569+
# Should only contain lowercase letters, numbers, and hyphens
570+
self.assertTrue(all(c.isalnum() or c == '-' for c in task_keys[0]))
571+
572+
def test_set_display_name_empty_after_sanitization(self):
573+
"""Test that empty display_name after sanitization falls back to task name."""
574+
575+
@dsl.component
576+
def my_component(text: str) -> str:
577+
return text
578+
579+
@dsl.pipeline(name='my-pipeline')
580+
def my_pipeline():
581+
# Display name that becomes empty after sanitization
582+
task = my_component(text='hello').set_display_name('!!!')
583+
return task
584+
585+
my_pipeline()
586+
# Should fall back to task name
587+
task_keys = list(my_pipeline.pipeline_spec.root.dag.tasks.keys())
588+
self.assertEqual(len(task_keys), 1)
589+
# Should not be empty
590+
self.assertGreater(len(task_keys[0]), 0)
591+
422592
def test_set_description_through_pipeline_decorator(self):
423593

424594
@dsl.pipeline(description='Prefer me.')

0 commit comments

Comments
 (0)