15 specialized ML pipelines for text, audio, and vision
Factory-based system for creating and managing HuggingFace Transformers pipelines. Uses RustFileProvider to fetch model files from Rust's ModelCache.
PipelineFactory.create_pipeline(task, model_id, architecture)
↓
Selects appropriate pipeline class
↓
BasePipeline subclass (e.g., Florence2Pipeline)
↓
Sets file_provider (RustFileProvider)
↓
Loads model using HuggingFace Transformers
↓
Ready for inference
Key Principle: Pipelines request model files → RustFileProvider fetches from Rust → No duplicate downloads
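The create → select → load flow above can be sketched with minimal stand-ins (class and method names follow this document, but the bodies are illustrative stubs, not the real implementation):

```python
from abc import ABC, abstractmethod
from typing import Optional


class BasePipeline(ABC):
    """Minimal stand-in for the real BasePipeline in base.py."""

    def __init__(self) -> None:
        self.model = None
        self.file_provider = None  # set to a RustFileProvider before load()

    @abstractmethod
    def load(self, model_id: str) -> dict: ...


class TextGenerationPipeline(BasePipeline):
    def load(self, model_id: str) -> dict:
        # The real pipeline fetches files via self.file_provider and
        # loads them with HuggingFace Transformers; here we just record it.
        self.model = f"<loaded {model_id}>"
        return {"status": "success"}


class PipelineFactory:
    """Selects the appropriate pipeline class for a task and instantiates it."""

    TASK_TO_PIPELINE = {"text-generation": TextGenerationPipeline}

    @staticmethod
    def create_pipeline(task: str, model_id: str,
                        architecture: Optional[str] = None) -> BasePipeline:
        return PipelineFactory.TASK_TO_PIPELINE[task]()


pipeline = PipelineFactory.create_pipeline(task="text-generation", model_id="gpt2")
pipeline.file_provider = object()  # stands in for the real RustFileProvider
status = pipeline.load("gpt2")
print(status)  # {'status': 'success'}
```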
Task: text-generation
Models: GPT-2, LLaMA, Mistral, Phi, Qwen, etc.
Use Cases: Text completion, creative writing, code generation
from pipelines import PipelineFactory

pipeline = PipelineFactory.create_pipeline(
    task="text-generation",
    model_id="meta-llama/Llama-2-7b-hf"
)
pipeline.file_provider = rust_file_provider
pipeline.load(model_id="meta-llama/Llama-2-7b-hf")

result = pipeline.generate({
    "prompt": "Once upon a time",
    "max_new_tokens": 50,
    "temperature": 0.7
})
print(result['text'])

Task: feature-extraction
Models: sentence-transformers, BERT, RoBERTa
Use Cases: Semantic search, similarity, clustering
pipeline = PipelineFactory.create_pipeline(
    task="feature-extraction",
    model_id="sentence-transformers/all-MiniLM-L6-v2"
)
pipeline.load(model_id="sentence-transformers/all-MiniLM-L6-v2")

result = pipeline.generate({
    "texts": ["Hello world", "Machine learning"],
    "normalize_embeddings": True
})
embeddings = result['embeddings']  # List[List[float]]

Task: automatic-speech-recognition
Models: Whisper (tiny, base, small, medium, large)
Use Cases: Transcription, captioning, voice commands
pipeline = PipelineFactory.create_pipeline(
    task="whisper",
    model_id="openai/whisper-base"
)
pipeline.load(model_id="openai/whisper-base")

result = pipeline.generate({
    "audio": audio_array,    # numpy array
    "language": "en",        # optional
    "task": "transcribe"     # or "translate"
})
print(result['text'])

Task: florence2
Models: Florence-2 (base, large)
Use Cases: Image captioning, VQA, object detection, OCR
pipeline = PipelineFactory.create_pipeline(
    task="florence2",
    model_id="microsoft/Florence-2-base"
)
pipeline.load(model_id="microsoft/Florence-2-base")

result = pipeline.generate({
    "image": pil_image,
    "prompt": "<OD>",  # Object detection
    "max_new_tokens": 1024
})
print(result['text'])

Task: clip
Models: CLIP (ViT, ResNet variants)
Use Cases: Zero-shot classification, image search, multimodal embeddings
pipeline = PipelineFactory.create_pipeline(
    task="clip",
    model_id="openai/clip-vit-base-patch32"
)
pipeline.load(model_id="openai/clip-vit-base-patch32")

result = pipeline.generate({
    "image": pil_image,
    "texts": ["a cat", "a dog", "a bird"]
})
probs = result['probabilities']  # [0.7, 0.2, 0.1]

Task: clap
Models: CLAP
Use Cases: Audio classification, sound search
Task: translation
Models: MarianMT, NLLB, M2M100
Use Cases: Language translation
Task: code-generation
Models: CodeLLaMA, StarCoder, CodeGen
Use Cases: Code completion, generation, refactoring
Task: text-similarity
Models: Cross-encoder models
Use Cases: Re-ranking, semantic similarity
Task: image-classification
Models: ViT, ResNet, EfficientNet
Use Cases: Image classification, object recognition
Task: multimodal
Models: LLaVA, Qwen-VL, etc.
Use Cases: Visual question answering, image reasoning
Task: janus
Models: Janus (multimodal understanding)
Use Cases: Unified vision-language tasks
Task: text-to-speech
Models: Bark, VITS, FastSpeech
Use Cases: Speech synthesis
Task: tokenization
Purpose: Tokenization utilities
Use Cases: Token counting, encoding/decoding
Task: zero-shot-classification
Models: BART, DeBERTa
Use Cases: Classify without training
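Whatever the task, every pipeline follows the same dict-in/dict-out generate() convention shown in the examples above. A toy stub makes the shape explicit (the echo behavior is purely illustrative):

```python
class EchoPipeline:
    """Toy pipeline used only to show the shared calling convention."""

    def generate(self, input_data: dict) -> dict:
        # A real pipeline reads task-specific input keys ("prompt",
        # "image", "audio", "texts", ...) and returns task-specific
        # output keys ("text", "embeddings", "probabilities", ...).
        return {"text": input_data.get("prompt", "").upper()}


result = EchoPipeline().generate({"prompt": "hello"})
print(result["text"])  # HELLO
```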
factory.py - Smart pipeline creation
class PipelineFactory:
    @staticmethod
    def create_pipeline(
        task: str,
        model_id: str,
        architecture: Optional[str] = None
    ) -> BasePipeline:
        """
        Create pipeline based on task or architecture.

        Priority:
        1. Architecture (if provided)
        2. Task type
        3. Model ID patterns
        """

Examples:
# By task
pipeline = PipelineFactory.create_pipeline(
    task="text-generation",
    model_id="gpt2"
)

# By architecture
pipeline = PipelineFactory.create_pipeline(
    task="multimodal",
    model_id="microsoft/Florence-2-base",
    architecture="Florence2"
)

# Auto-detect from model ID
pipeline = PipelineFactory.create_pipeline(
    task="feature-extraction",
    model_id="sentence-transformers/all-MiniLM-L6-v2"
)

base.py - Abstract base class
class BasePipeline(ABC):
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.processor = None
        self.file_provider: Optional[RustFileProvider] = None

    @abstractmethod
    def pipeline_type(self) -> str:
        """Return pipeline task type"""

    @abstractmethod
    def load(self, model_id: str, options: dict) -> dict:
        """Load model, returns status"""

    @abstractmethod
    def generate(self, input_data: dict) -> dict:
        """Run inference, returns results"""

    def unload(self):
        """Free resources"""

How It Works:
1. Pipeline calls transformers.AutoModel.from_pretrained(model_id)
2. Transformers tries to download config.json
3. RustFileProvider intercepts (via custom resolver)
4. RustFileProvider fetches the file from Rust via gRPC
5. Transformers continues loading with the local file
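One way such an interception can be structured is sketched below with a fake provider that serves files from a local directory; the get_file method name and signature here are illustrative assumptions, not the real RustFileProvider API:

```python
from pathlib import Path
from tempfile import TemporaryDirectory


class FakeRustFileProvider:
    """Stand-in for RustFileProvider: serves files from a local
    directory instead of calling the Rust ModelCache over gRPC."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def get_file(self, model_id: str, filename: str) -> Path:
        # The real provider would issue a gRPC call here and return the
        # path of the file materialized by Rust's ModelCache.
        return self.root / model_id / filename


with TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "gpt2").mkdir(parents=True)
    (root / "gpt2" / "config.json").write_text("{}")

    provider = FakeRustFileProvider(root)
    config_text = provider.get_file("gpt2", "config.json").read_text()
    # Transformers would now continue loading from this local file.

print(config_text)  # {}
```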
Setting File Provider:
pipeline = PipelineFactory.create_pipeline(...)
pipeline.file_provider = rust_file_provider  # Set before load()
pipeline.load(model_id="...")

Defined in types.py:
class PipelineTask(str, Enum):
    TEXT_GENERATION = "text-generation"
    FEATURE_EXTRACTION = "feature-extraction"
    AUTOMATIC_SPEECH_RECOGNITION = "automatic-speech-recognition"
    IMAGE_TO_TEXT = "image-to-text"
    IMAGE_CLASSIFICATION = "image-classification"
    OBJECT_DETECTION = "object-detection"
    ZERO_SHOT_CLASSIFICATION = "zero-shot-classification"
    TRANSLATION = "translation"
    # ... all task types

# Unit tests
pytest tests/test_pipelines.py -v
# Integration tests (requires Rust + models)
pytest tests/test_pipelines_integration.py -v
# Test specific pipeline
pytest tests/test_pipelines.py::TestTextGeneration -v

- Create pipeline file: pipelines/my_pipeline.py
from .base import BasePipeline

class MyPipeline(BasePipeline):
    def pipeline_type(self) -> str:
        return "my-task"

    def load(self, model_id: str, options: dict) -> dict:
        # Use self.file_provider to fetch files
        # Load model with transformers
        return {"status": "success"}

    def generate(self, input_data: dict) -> dict:
        # Run inference
        # Return results
        pass

- Add to __init__.py:

from .my_pipeline import MyPipeline

- Register in factory.py:

TASK_TO_PIPELINE = {
    PipelineTask.MY_TASK: MyPipeline,
    # ...
}

- Add to types.py:

class PipelineTask(str, Enum):
    MY_TASK = "my-task"

⚙️ In Progress: Full implementation of all 15 pipelines
✅ Structure: Complete factory + base + types
⚙️ Integration: RustFileProvider wiring
See: TODO.md for detailed status
- Core - RustFileProvider implementation
- Services - TransformersService usage
- ModelCache (Rust) - File serving
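The "add a new pipeline" steps above can be sketched end-to-end in one self-contained snippet; the BasePipeline here is a minimal stand-in so the example runs on its own (the real one lives in base.py), and the return values are illustrative:

```python
from abc import ABC, abstractmethod
from enum import Enum


class BasePipeline(ABC):  # stand-in for pipelines/base.py
    @abstractmethod
    def pipeline_type(self) -> str: ...

    @abstractmethod
    def load(self, model_id: str, options: dict) -> dict: ...

    @abstractmethod
    def generate(self, input_data: dict) -> dict: ...


class PipelineTask(str, Enum):  # step: add the task to types.py
    MY_TASK = "my-task"


class MyPipeline(BasePipeline):  # step: pipelines/my_pipeline.py
    def pipeline_type(self) -> str:
        return PipelineTask.MY_TASK.value

    def load(self, model_id: str, options: dict) -> dict:
        # Real code: fetch files via self.file_provider, then load
        # them with HuggingFace Transformers.
        return {"status": "success"}

    def generate(self, input_data: dict) -> dict:
        return {"text": f"processed: {input_data.get('prompt', '')}"}


TASK_TO_PIPELINE = {PipelineTask.MY_TASK: MyPipeline}  # step: register in factory.py

pipeline = TASK_TO_PIPELINE[PipelineTask.MY_TASK]()
print(pipeline.generate({"prompt": "hi"})["text"])  # processed: hi
```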