Advanced50 minModule 3 of 3

Multi-Model Pipelines

Compose image, video, and audio models into reliable production pipelines — fan-out, fan-in, sequential chains, and circuit breakers.

Why multi-model pipelines?

Single-model calls are straightforward. Real products often chain multiple generations:

User prompt Image generationtruefusion-pro Image-to-videotruefusion-video Background musicbeatfusion-2.0 Final: animated videowith soundtrack

Each step has its own latency, error rate, and cost. Pipelines need error handling, retries, and observability at every stage.

Pipeline architecture

output output error error error retry fail success Input Pipeline Executor Stage 1 Stage 2 Stage 3 Error Handler Failure Handler Output

Core pipeline abstraction

// lib/pipeline.ts

interface PipelineStage<TInput, TOutput> {
  name: string;
  execute: (input: TInput) => Promise<TOutput>;
  maxRetries?: number;
  timeoutMs?: number;
}

interface PipelineResult<T> {
  success: boolean;
  output?: T;
  error?: Error;
  stageDurations: Record<string, number>;
}

async function runWithRetry<T>(
  fn: () => Promise<T>,
  maxRetries: number,
  stageName: string,
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      if (attempt === maxRetries) throw error;
      const backoffMs = Math.min(1000 * 2 ** attempt, 30_000);
      console.warn(`[${stageName}] attempt ${attempt + 1} failed, retrying in ${backoffMs}ms`);
      await new Promise(r => setTimeout(r, backoffMs));
    }
  }
  throw new Error(`Unreachable`);
}

export async function executePipeline<T>(
  input: unknown,
  stages: PipelineStage<unknown, unknown>[],
): Promise<PipelineResult<T>> {
  const stageDurations: Record<string, number> = {};
  let current: unknown = input;

  for (const stage of stages) {
    const start = Date.now();
    try {
      current = await runWithRetry(
        () => stage.execute(current),
        stage.maxRetries ?? 2,
        stage.name,
      );
      stageDurations[stage.name] = Date.now() - start;
    } catch (error) {
      stageDurations[stage.name] = Date.now() - start;
      return { success: false, error: error as Error, stageDurations };
    }
  }

  return { success: true, output: current as T, stageDurations };
}

Pattern 1: Sequential chain (image → video)

Turn a still image into an animated video:

import { executePipeline } from '@/lib/pipeline';

const imageToVideoStages = [
  {
    name: 'generate-image',
    maxRetries: 2,
    execute: async (input: { prompt: string }) => {
      const res = await fetch('https://api.skytells.ai/v1/predictions', {
        method: 'POST',
        headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
        body: JSON.stringify({ model: 'truefusion-pro', input: { prompt: input.prompt, width: 1024, height: 576 } }),
      }).then(r => r.json());

      // Poll until done
      return pollUntilSucceeded(res.id);
    },
  },
  {
    name: 'animate-image',
    maxRetries: 1,
    execute: async (imagePrediction: { output: string[] }) => {
      const imageUrl = imagePrediction.output[0];
      const res = await fetch('https://api.skytells.ai/v1/predictions', {
        method: 'POST',
        headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
        body: JSON.stringify({
          model: 'truefusion-video',
          input: { image_url: imageUrl, motion_strength: 0.7, duration_seconds: 4 },
        }),
      }).then(r => r.json());

      return pollUntilSucceeded(res.id);
    },
  },
];

const result = await executePipeline<{ output: string[] }>(
  { prompt: 'A lighthouse on rocky cliffs at dusk, dramatic sky' },
  imageToVideoStages,
);

if (result.success) {
  console.log('Video URL:', result.output!.output[0]);
  console.log('Stage times:', result.stageDurations);
}

Pattern 2: Fan-out / fan-in (parallel variations)

Generate multiple variations in parallel, pick the best:

async function generateVariations(
  prompt: string,
  count = 4,
): Promise<string[]> {
  const predictions = await Promise.all(
    Array.from({ length: count }, (_, i) =>
      fetch('https://api.skytells.ai/v1/predictions', {
        method: 'POST',
        headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
        body: JSON.stringify({
          model: 'truefusion-pro',
          input: { prompt, width: 1024, height: 1024, seed: i * 42 },
        }),
      }).then(r => r.json()),
    ),
  );

  // Poll all in parallel
  const results = await Promise.allSettled(
    predictions.map(p => pollUntilSucceeded(p.id)),
  );

  // Collect successful outputs
  return results
    .filter((r): r is PromiseFulfilledResult<{ output: string[] }> => r.status === 'fulfilled')
    .flatMap(r => r.value.output);
}

Pattern 3: Full media pipeline

Image + video + audio in a coordinated pipeline:

image succeeded music succeeded Prompt Generate image+ Generate music Animate image to video Music ready Merge video + audio Final media
async function fullMediaPipeline(prompt: string, musicStyle: string) {
  // Stage 1: Image + Music in parallel (fan-out)
  const [imagePred, musicPred] = await Promise.all([
    fetch('https://api.skytells.ai/v1/predictions', {
      method: 'POST',
      headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
      body: JSON.stringify({ model: 'truefusion-pro', input: { prompt, width: 1024, height: 576 } }),
    }).then(r => r.json()),

    fetch('https://api.skytells.ai/v1/predictions', {
      method: 'POST',
      headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
      body: JSON.stringify({ model: 'beatfusion-2.0', input: { prompt: musicStyle, duration_seconds: 12 } }),
    }).then(r => r.json()),
  ]);

  // Stage 2: Wait for image, then animate (sequential for image→video)
  // Wait for music separately (may complete earlier)
  const [imageDone, musicDone] = await Promise.all([
    pollUntilSucceeded(imagePred.id),
    pollUntilSucceeded(musicPred.id),
  ]);

  // Stage 3: Animate image
  const videoPred = await fetch('https://api.skytells.ai/v1/predictions', {
    method: 'POST',
    headers: { 'x-api-key': process.env.SKYTELLS_API_KEY!, 'Content-Type': 'application/json' },
    body: JSON.stringify({
      model: 'truefusion-video',
      input: { image_url: imageDone.output[0], duration_seconds: 5 },
    }),
  }).then(r => r.json());

  const videoDone = await pollUntilSucceeded(videoPred.id);

  // Stage 4: Merge (server-side ffmpeg)
  const finalUrl = await mergeVideoAndAudio(videoDone.output[0], musicDone.output[0]);

  return { imageUrl: imageDone.output[0], videoUrl: videoDone.output[0], finalUrl };
}

Circuit breaker

Prevent cascading failures when Skytells has degraded availability:

// lib/circuit-breaker.ts
type State = 'closed' | 'open' | 'half-open';

class CircuitBreaker {
  private state: State = 'closed';
  private failureCount = 0;
  private successCount = 0;
  private openedAt = 0;

  constructor(
    private readonly failureThreshold = 5,
    private readonly successThreshold = 2,
    private readonly timeoutMs = 60_000,
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === 'open') {
      if (Date.now() - this.openedAt > this.timeoutMs) {
        this.state = 'half-open';
        this.successCount = 0;
      } else {
        throw new Error('Circuit breaker is open — service unavailable');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    this.failureCount = 0;
    if (this.state === 'half-open') {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = 'closed';
      }
    }
  }

  private onFailure() {
    this.failureCount++;
    if (this.failureCount >= this.failureThreshold) {
      this.state = 'open';
      this.openedAt = Date.now();
    }
  }

  get currentState() { return this.state; }
}

export const skytellsCircuitBreaker = new CircuitBreaker();

Observability

Log structured events for every pipeline execution:

interface PipelineEvent {
  pipelineId: string;
  userId: string;
  totalDurationMs: number;
  stages: Array<{ name: string; durationMs: number; status: 'ok' | 'error' }>;
  totalCostUsd: number;
  success: boolean;
}

Summary

You've completed the Generative AI Patterns path. The patterns:

  1. Sequential chain — output of one model is input to the next
  2. Fan-out / fan-in — parallel variations, then collect
  3. Full media pipeline — image + audio in parallel, then compose
  4. Circuit breaker — fail fast when the service is degraded
  5. Backpressure — queue-based rate limiting for high throughput

Apply these patterns to build:

  • AI-powered content creation platforms
  • Automated media production pipelines
  • Real-time product visualization systems

Next steps:

On this page