Advanced · 50 min · Module 3 of 3
Multi-Model Pipelines
Compose image, video, and audio models into reliable production pipelines — fan-out, fan-in, sequential chains, and circuit breakers.
Why multi-model pipelines?
Single-model calls are straightforward. Real products often chain multiple generations — for example, generating an image, animating it into a video, and adding a music track.
Each step has its own latency, error rate, and cost. Pipelines need error handling, retries, and observability at every stage.
Pipeline architecture
Core pipeline abstraction
// lib/pipeline.ts
/**
 * One step of a pipeline: a named async transform from TInput to TOutput.
 */
interface PipelineStage<TInput, TOutput> {
/** Label for the stage; used as the key in per-stage timings and in retry log lines. */
name: string;
/** Performs the stage's work on the value handed over by the pipeline (the pipeline input for the first stage, the previous stage's result afterwards). */
execute: (input: TInput) => Promise<TOutput>;
/** Number of retries allowed after the first failed attempt; executePipeline falls back to 2 when omitted. */
maxRetries?: number;
/** Time budget for the stage in milliseconds. NOTE(review): confirm that the executor in use actually enforces it. */
timeoutMs?: number;
}
/** Outcome of a pipeline run as returned by executePipeline. */
interface PipelineResult<T> {
/** True when every stage completed; false when a stage failed after exhausting its retries. */
success: boolean;
/** Result of the last stage; only populated on success. */
output?: T;
/** The error that stopped the pipeline; only populated on failure. */
error?: Error;
/** Elapsed wall-clock milliseconds keyed by stage name, including time spent on retries and backoff; stages that never ran are absent. */
stageDurations: Record<string, number>;
}
/**
 * Runs `fn`, retrying with exponential backoff whenever it rejects.
 *
 * `fn` is attempted at most `maxRetries + 1` times. The wait before the
 * k-th retry (1-based) is `min(1000 * 2^(k-1), 30_000)` milliseconds.
 *
 * @param fn - The async operation to attempt. It is always invoked at least once.
 * @param maxRetries - Retries allowed after the first attempt. Negative and NaN
 *   values are treated as 0 and fractional values are floored, so a malformed
 *   budget can neither skip `fn` entirely nor replace its real error.
 *   `Infinity` still means "retry forever".
 * @param stageName - Label included in the retry warning.
 * @returns The value `fn` resolves with on its first successful attempt.
 * @throws The error from the final failed attempt, unchanged.
 */
async function runWithRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  stageName: string,
): Promise<T> {
  // Normalize once so the loop's exit condition is always reachable.
  const retryBudget = Number.isNaN(maxRetries) ? 0 : Math.max(0, Math.floor(maxRetries));

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      // Out of retries: surface the real failure to the caller.
      if (attempt >= retryBudget) throw error;
      const backoffMs = Math.min(1000 * 2 ** attempt, 30_000);
      console.warn(`[${stageName}] attempt ${attempt + 1} failed, retrying in ${backoffMs}ms`);
      await new Promise(r => setTimeout(r, backoffMs));
    }
  }
}
/**
 * Rejects with a timeout error when `work` has not settled within `timeoutMs`.
 * The underlying work is not cancelled; its eventual result is simply ignored.
 */
function withStageTimeout<T>(work: Promise<T>, timeoutMs: number, stageName: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const deadline = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`[${stageName}] timed out after ${timeoutMs}ms`)),
      timeoutMs,
    );
  });
  // Always clear the timer so a finished stage does not keep the event loop alive.
  return Promise.race([work, deadline]).finally(() => clearTimeout(timer));
}

/**
 * Runs `stages` in order, feeding each stage the previous stage's result.
 *
 * Every stage attempt goes through `runWithRetry` (default: 2 retries). When a
 * stage declares a positive `timeoutMs`, each individual attempt is limited to
 * that many milliseconds and a timed-out attempt counts as a failure that may
 * be retried. The pipeline stops at the first stage that fails for good.
 *
 * @param input - Value passed to the first stage.
 * @param stages - Stages to run sequentially. Typed with `any` parameters so
 *   that stages with concrete input/output types can be chained; under
 *   `strictFunctionTypes` a stage taking `{ prompt: string }` is not assignable
 *   to `PipelineStage<unknown, unknown>`.
 * @returns A result object; this function reports stage failures through
 *   `success: false` and `error` instead of throwing. `stageDurations` holds the
 *   elapsed milliseconds (retries and backoff included) of every stage that ran.
 */
export async function executePipeline<T>(
  input: unknown,
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- heterogeneous stage chain
  stages: ReadonlyArray<PipelineStage<any, any>>,
): Promise<PipelineResult<T>> {
  const stageDurations: Record<string, number> = {};
  let current: unknown = input;

  for (const stage of stages) {
    const start = Date.now();
    const stageInput = current;
    const { timeoutMs } = stage;

    const attempt = (): Promise<unknown> => {
      const work = stage.execute(stageInput);
      return timeoutMs !== undefined && timeoutMs > 0
        ? withStageTimeout(work, timeoutMs, stage.name)
        : work;
    };

    try {
      current = await runWithRetry(attempt, stage.maxRetries ?? 2, stage.name);
      stageDurations[stage.name] = Date.now() - start;
    } catch (error) {
      stageDurations[stage.name] = Date.now() - start;
      // Stages may throw non-Error values; normalize so `error` really is an Error.
      const failure = error instanceof Error ? error : new Error(String(error));
      return { success: false, error: failure, stageDurations };
    }
  }

  return { success: true, output: current as T, stageDurations };
}
Pattern 1: Sequential chain (image → video)
Turn a still image into an animated video:
import { executePipeline } from '@/lib/pipeline';
/**
 * Submits a prediction request and returns the parsed response body.
 *
 * Throws on a non-2xx status so the pipeline's retry logic sees the failure
 * instead of going on to poll an undefined prediction id.
 * NOTE(review): the `{ id: string }` shape is assumed from how the id is used
 * below — confirm against the API reference.
 */
async function createPrediction(
  model: string,
  input: Record<string, unknown>,
): Promise<{ id: string }> {
  const response = await fetch('https://api.skytells.ai/v1/predictions', {
    method: 'POST',
    headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
    body: JSON.stringify({ model, input }),
  });
  if (!response.ok) {
    throw new Error(`Prediction request for ${model} failed with HTTP ${response.status}`);
  }
  return response.json();
}

/** Two-stage chain: generate a still image, then animate it into a video. */
const imageToVideoStages = [
  {
    name: 'generate-image',
    maxRetries: 2,
    execute: async (input: { prompt: string }) => {
      const created = await createPrediction('truefusion-pro', {
        prompt: input.prompt,
        width: 1024,
        height: 576,
      });
      // Poll until done
      return pollUntilSucceeded(created.id);
    },
  },
  {
    name: 'animate-image',
    maxRetries: 1,
    execute: async (imagePrediction: { output: string[] }) => {
      const imageUrl = imagePrediction.output[0];
      // Fail loudly rather than sending `image_url: undefined` to the video model.
      if (!imageUrl) {
        throw new Error('generate-image returned no output URL to animate');
      }
      const created = await createPrediction('truefusion-video', {
        image_url: imageUrl,
        motion_strength: 0.7,
        duration_seconds: 4,
      });
      return pollUntilSucceeded(created.id);
    },
  },
];
// Run the chain; the generic argument describes the final stage's result.
const result = await executePipeline<{ output: string[] }>(
  { prompt: 'A lighthouse on rocky cliffs at dusk, dramatic sky' },
  imageToVideoStages,
);
if (result.success && result.output) {
  console.log('Video URL:', result.output.output[0]);
  console.log('Stage times:', result.stageDurations);
} else {
  // Report the failure instead of ignoring it; the timings show which stage stopped the run.
  console.error('Pipeline failed:', result.error?.message ?? 'unknown error');
  console.error('Stage times:', result.stageDurations);
}
Pattern 2: Fan-out / fan-in (parallel variations)
Generate multiple variations in parallel, pick the best:
/**
 * Generates `count` variations of `prompt` in parallel and returns the output
 * URLs of the ones that succeeded.
 *
 * Each variation is an independent create-then-poll chain, so a failure at
 * either step (including a non-2xx response when creating the prediction)
 * only drops that variation instead of rejecting the whole batch. Failed
 * variations are logged with `console.warn`.
 *
 * @param prompt - Prompt shared by all variations.
 * @param count - Number of variations to request; variation `i` uses seed `i * 42`.
 * @returns Outputs of the successful variations, in variation order. May be empty.
 */
async function generateVariations(
  prompt: string,
  count = 4,
): Promise<string[]> {
  const generateOne = async (index: number): Promise<string[]> => {
    const response = await fetch('https://api.skytells.ai/v1/predictions', {
      method: 'POST',
      headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
      body: JSON.stringify({
        model: 'truefusion-pro',
        input: { prompt, width: 1024, height: 1024, seed: index * 42 },
      }),
    });
    if (!response.ok) {
      throw new Error(`prediction request failed with HTTP ${response.status}`);
    }
    const created = await response.json();
    const finished: { output: string[] } = await pollUntilSucceeded(created.id);
    return finished.output;
  };

  // Run every chain in parallel and wait for all of them to settle.
  const settled = await Promise.allSettled(
    Array.from({ length: count }, (_, i) => generateOne(i)),
  );

  // Collect successful outputs; report the rest.
  const urls: string[] = [];
  settled.forEach((outcome, i) => {
    if (outcome.status === 'fulfilled') {
      urls.push(...outcome.value);
    } else {
      console.warn(`[generateVariations] variation ${i} failed:`, outcome.reason);
    }
  });
  return urls;
}
Pattern 3: Full media pipeline
Image + video + audio in a coordinated pipeline:
/**
 * Produces an image, an animated video of that image, and a music track, then
 * merges video and audio into one file.
 *
 * The visual track (image, then video) and the audio track run concurrently,
 * so video generation starts as soon as the image is ready rather than
 * waiting for the music to finish. Both tracks are joined before the merge.
 *
 * @param prompt - Prompt for the image model.
 * @param musicStyle - Prompt for the music model.
 * @returns URLs of the still image, the silent video, and the merged result.
 * @throws If any request returns a non-2xx status, or if polling or merging fails.
 */
async function fullMediaPipeline(prompt: string, musicStyle: string) {
  // Creates a prediction and fails fast on HTTP errors instead of polling a missing id.
  const submit = async (model: string, input: Record<string, unknown>) => {
    const response = await fetch('https://api.skytells.ai/v1/predictions', {
      method: 'POST',
      headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
      body: JSON.stringify({ model, input }),
    });
    if (!response.ok) {
      throw new Error(`${model}: prediction request failed with HTTP ${response.status}`);
    }
    return response.json();
  };

  // Visual track: the video needs the finished image, so these steps are sequential.
  const visualTrack = (async () => {
    const imagePred = await submit('truefusion-pro', { prompt, width: 1024, height: 576 });
    const imageDone = await pollUntilSucceeded(imagePred.id);
    const videoPred = await submit('truefusion-video', {
      image_url: imageDone.output[0],
      duration_seconds: 5,
    });
    const videoDone = await pollUntilSucceeded(videoPred.id);
    return { imageUrl: imageDone.output[0], videoUrl: videoDone.output[0] };
  })();

  // Audio track: independent of the visuals, so it runs alongside them.
  const audioTrack = (async () => {
    const musicPred = await submit('beatfusion-2.0', { prompt: musicStyle, duration_seconds: 12 });
    const musicDone = await pollUntilSucceeded(musicPred.id);
    return musicDone.output[0];
  })();

  // Fan-in: Promise.all observes both tracks, so neither rejection goes unhandled.
  const [{ imageUrl, videoUrl }, audioUrl] = await Promise.all([visualTrack, audioTrack]);

  // Merge (server-side ffmpeg)
  const finalUrl = await mergeVideoAndAudio(videoUrl, audioUrl);
  return { imageUrl, videoUrl, finalUrl };
}
Circuit breaker
Prevent cascading failures when Skytells has degraded availability:
// lib/circuit-breaker.ts
/** Breaker states: calls flow ('closed'), are rejected ('open'), or are being probed ('half-open'). */
type State = 'closed' | 'open' | 'half-open';

/**
 * Circuit breaker that fails fast while a dependency is unhealthy.
 *
 * - closed: calls pass through; `failureThreshold` consecutive failures open the circuit.
 * - open: calls are rejected immediately until `timeoutMs` has elapsed since opening.
 * - half-open: calls pass through as probes; `successThreshold` successes close
 *   the circuit, and any single failure re-opens it and restarts the cool-down.
 */
class CircuitBreaker {
  private state: State = 'closed';
  private failureCount = 0;
  private successCount = 0;
  private openedAt = 0;

  /**
   * @param failureThreshold - Consecutive failures that open the circuit.
   * @param successThreshold - Successful probes in half-open needed to close it again.
   * @param timeoutMs - How long the circuit stays open before probes are allowed.
   */
  constructor(
    private readonly failureThreshold = 5,
    private readonly successThreshold = 2,
    private readonly timeoutMs = 60_000,
  ) {}

  /**
   * Runs `fn` through the breaker.
   *
   * @returns Whatever `fn` resolves with.
   * @throws An Error without calling `fn` while the circuit is open; otherwise
   *   rethrows whatever `fn` rejects with.
   */
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - this.openedAt > this.timeoutMs) {
        // Cool-down elapsed: let calls through as probes.
        this.state = 'half-open';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is open — service unavailable');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  /** Records a success; closes the circuit once enough probes have succeeded. */
  private onSuccess(): void {
    this.failureCount = 0;
    if (this.state === 'half-open') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = 'closed';
      }
    }
  }

  /** Records a failure; opens the circuit at the threshold or on any failed probe. */
  private onFailure(): void {
    this.failureCount++;
    // A failed probe means the dependency is still unhealthy, so re-open at once
    // instead of waiting for the failure count to climb back to the threshold.
    if (this.state === 'half-open' || this.failureCount >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = Date.now();
    }
  }

  /** Current breaker state, for monitoring. */
  get currentState(): State {
    return this.state;
  }
}
/** Module-level breaker instance created with the constructor defaults (failureThreshold 5, successThreshold 2, timeoutMs 60_000). */
export const skytellsCircuitBreaker = new CircuitBreaker();Observability
Log structured events for every pipeline execution:
/** Structured log record describing one pipeline execution. */
interface PipelineEvent {
/** Identifier of the pipeline run. */
pipelineId: string;
/** Identifier of the user the run is attributed to — presumably the requester; confirm with the emitter. */
userId: string;
/** Duration of the whole run in milliseconds. */
totalDurationMs: number;
/** One record per stage: its name, its duration in milliseconds, and whether it ended 'ok' or in 'error'. */
stages: Array<{ name: string; durationMs: number; status: 'ok' | 'error' }>;
/** Cost of the run; the name suggests US dollars — confirm how it is computed. */
totalCostUsd: number;
/** Whether the run succeeded overall. */
success: boolean;
}Summary
You've completed the Generative AI Patterns path. The patterns:
- Sequential chain — output of one model is input to the next
- Fan-out / fan-in — parallel variations, then collect
- Full media pipeline — image + audio in parallel, then compose
- Circuit breaker — fail fast when the service is degraded
- Backpressure — queue-based rate limiting for high throughput
Apply these patterns to build:
- AI-powered content creation platforms
- Automated media production pipelines
- Real-time product visualization systems
Next steps:
- Build an AI Image Studio — apply these patterns in a real product
- Enterprise & Compliance — secure your pipelines at scale