Error Handling & Best Practices

Handle errors from both the Prediction and Inference APIs robustly — implement retry logic, manage rate limits, set up webhooks, secure API keys, and ship production-ready AI integrations.

What you'll be able to do after this module

Write a single error-handling layer that works for both APIs. Retry transient errors with exponential backoff. Set up webhooks instead of polling for long-running audio and video predictions. Safely manage API keys. Pass the production readiness checklist before you go live.


The two error formats — side by side

Module 3 showed the schemas. Here's what to do with them.

Prediction API error (top-level error_id):

{
  "id": "pred_abc123",
  "status": "failed",
  "error_id": "INSUFFICIENT_CREDITS",
  "error_message": "Your account has run out of credits.",
  "error": null
}

Inference API error (nested error object):

{
  "error": {
    "error_id": "context_length_exceeded",
    "type": "InferenceError",
    "code": "context_length_exceeded",
    "message": "Your messages exceed the model context window.",
    "param": "messages",
    "status": 400,
    "request_id": "req_xyz789"
  }
}

The key differences:

  • Prediction errors live at the top level of the Prediction Object (status: "failed" + error_id)
  • Inference errors are nested under an "error" key, with a type of "InferenceError" and an HTTP status

Detecting which API returned an error

class PredictionAPIError(Exception):
    """Raised for Prediction API failures (status: "failed")."""
    def __init__(self, message: str, error_id: str, prediction_id: str):
        super().__init__(message)
        self.error_id = error_id
        self.prediction_id = prediction_id

class InferenceAPIError(Exception):
    """Raised for Inference API errors (nested "error" object)."""
    def __init__(self, message: str, code: str, status: int, request_id: str | None = None):
        super().__init__(message)
        self.code = code
        self.status = status
        self.request_id = request_id

def is_prediction_error(obj: dict) -> bool:
    return obj.get("status") == "failed" and "error_id" in obj

def is_inference_error(obj: dict) -> bool:
    return "error" in obj and isinstance(obj["error"], dict)

def handle_response(obj: dict):
    if is_inference_error(obj):
        err = obj["error"]
        raise InferenceAPIError(
            message=err["message"],
            code=err["code"],
            status=err["status"],
            request_id=err.get("request_id"),
        )
    if is_prediction_error(obj):
        raise PredictionAPIError(
            message=obj["error_message"],
            error_id=obj["error_id"],
            prediction_id=obj["id"],
        )

Prediction API error codes

| error_id | HTTP status | Meaning | What to do |
| --- | --- | --- | --- |
| UNAUTHORIZED | 401 | Invalid or missing API key | Verify x-api-key is set and not expired |
| MODEL_NOT_FOUND | 404 | model_id doesn't exist | Check the model catalog — the ID may have changed |
| INSUFFICIENT_CREDITS | 402 | Account ran out of credits | Top up in Dashboard or set auto-recharge |
| INVALID_INPUT | 422 | Input doesn't match input_schema | Log the validation errors from error_message |
| RATE_LIMIT_EXCEEDED | 429 | Too many requests | Respect the Retry-After header, add jitter to retries |
| PREDICTION_FAILED | 500 | Model failed at inference time | Retry once; if it persists, contact support with the prediction_id |
| TIMEOUT | 504 | Prediction exceeded time limit | Use webhooks for long-running jobs (video, audio) |

Inference API error codes

| code | HTTP status | Meaning |
| --- | --- | --- |
| invalid_api_key | 401 | Invalid, revoked, or missing API key |
| insufficient_quota | 402 | Account ran out of token quota |
| model_not_found | 404 | Requested model not available |
| context_length_exceeded | 400 | Prompt + completion > context window |
| invalid_request_error | 400 | Malformed request body |
| rate_limit_exceeded | 429 | Too many requests per minute/day |
| server_error | 500 | Infrastructure error — safe to retry |
| service_unavailable | 503 | Model temporarily offline — retry with backoff |
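
Both tables reduce to the same operational decision: retry or fail fast. A minimal classifier built from the codes above (the set membership is this module's summary, not an official SDK helper):

```python
# Transient codes worth retrying, taken from the two tables above.
# Prediction error_ids are uppercase; Inference codes are lowercase,
# so one lookup function can serve both APIs without collisions.
RETRYABLE_PREDICTION = {"RATE_LIMIT_EXCEEDED", "PREDICTION_FAILED", "TIMEOUT"}
RETRYABLE_INFERENCE = {"rate_limit_exceeded", "server_error", "service_unavailable"}

def is_retryable(code: str) -> bool:
    """True if the error code (from either API) is safe to retry."""
    return code in RETRYABLE_PREDICTION or code in RETRYABLE_INFERENCE
```

Everything else (401, 402, 404, 422, 400) is a client-side problem that retrying cannot fix.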

Retry with exponential backoff

Retry on transient errors: rate limits (429), server errors (500/503), and prediction failures.

import time
import random
import httpx

def call_with_retry(fn, max_attempts: int = 4):
    """
    Retry a callable with exponential backoff + jitter.
    Retryable: 429, 500, 503
    Non-retryable: 400, 401, 402, 404, 422
    """
    NON_RETRYABLE = {400, 401, 402, 404, 422}
    
    for attempt in range(max_attempts):
        try:
            return fn()
        except httpx.HTTPStatusError as e:
            status = e.response.status_code
            if status in NON_RETRYABLE:
                raise  # Don't retry client errors
            
            if attempt == max_attempts - 1:
                raise  # Final attempt — surface the error
            
            # Respect Retry-After if present
            retry_after = e.response.headers.get("Retry-After")
            if retry_after:
                time.sleep(float(retry_after))
            else:
                # Exponential backoff: 1s, 2s, 4s, 8s … with ±25% jitter
                base_delay = 2 ** attempt
                jitter = random.uniform(0.75, 1.25)
                time.sleep(base_delay * jitter)

# Usage
result = call_with_retry(
    lambda: client.chat.completions.create(
        model="deepbrain-router",
        messages=[{"role": "user", "content": "Hello"}],
    )
)

Stop polling — use webhooks

Polling GET /v1/predictions/:id is fine in development, but for audio and video jobs (which can take 30–120 seconds), webhooks are the production pattern.

import httpx, os

resp = httpx.post(
    "https://api.skytells.ai/v1/predictions",
    headers={"x-api-key": os.environ["SKYTELLS_API_KEY"]},
    json={
        "model_id": "stable-video-6s",
        "inputs": {"prompt": "A drone shot over a mountain lake at dawn"},
        "webhook_url": "https://yourapp.com/api/webhooks/skytells",
        "webhook_events": ["prediction.completed", "prediction.failed"],
    },
)
print(resp.json()["id"])  # prediction ID — store this

Skytells will POST the full Prediction Object to your webhook_url when the job completes:

from fastapi import FastAPI, Request, Header, HTTPException
import hmac, hashlib, os

app = FastAPI()
WEBHOOK_SECRET = os.environ["SKYTELLS_WEBHOOK_SECRET"]  # set in Dashboard

@app.post("/api/webhooks/skytells")
async def handle_webhook(
    request: Request,
    x_skytells_signature: str = Header(None),
):
    body = await request.body()
    
    # Verify signature — never skip this in production
    expected = hmac.new(WEBHOOK_SECRET.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(f"sha256={expected}", x_skytells_signature or ""):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    event = await request.json()
    prediction = event["data"]
    
    if prediction["status"] == "succeeded":
        output_urls = prediction["output"]  # download within 24 hours
        await save_to_cloud_storage(output_urls)
    elif prediction["status"] == "failed":
        await log_prediction_failure(prediction["id"], prediction["error_id"])
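
To exercise the handler locally before real traffic arrives, you can compute the same signature Skytells would send. This helper mirrors the verification logic above; the "sha256=" header format is assumed from that snippet:

```python
import hashlib
import hmac

def sign_payload(secret: str, body: bytes) -> str:
    """Produce an x-skytells-signature value for a given raw request body."""
    digest = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"
```

POST a canned Prediction Object to your local endpoint with this header and confirm a 200; flip one byte in the body and confirm a 401.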

Output URL expiry

Prediction output URLs (images, video, audio) are temporary CDN links that expire after 24 hours. Transfer them to your own storage immediately on completion.

import io
import httpx, boto3
from urllib.parse import urlparse
from pathlib import Path

def save_outputs(prediction: dict, bucket: str) -> list[str]:
    """Download Prediction outputs to S3 and return permanent URLs."""
    s3 = boto3.client("s3")
    permanent_urls = []
    
    for i, url in enumerate(prediction.get("output", [])):
        # Determine file extension from URL
        ext = Path(urlparse(url).path).suffix or ".bin"
        key = f"predictions/{prediction['id']}/{i}{ext}"
        
        # Stream-download into a buffer, then upload
        # (httpx responses aren't file-like, so go through BytesIO)
        buf = io.BytesIO()
        with httpx.stream("GET", url) as r:
            r.raise_for_status()
            for chunk in r.iter_bytes():
                buf.write(chunk)
        buf.seek(0)
        s3.upload_fileobj(buf, bucket, key)
        
        permanent_urls.append(f"https://{bucket}.s3.amazonaws.com/{key}")
    
    return permanent_urls

API key security

Good practices:

  1. Environment variables only — SKYTELLS_API_KEY in .env, never hardcoded
  2. Add .env to .gitignore immediately when you create the file
  3. Separate keys per environment — use a different key for development, staging, and production
  4. Rotate keys on a schedule — treat keys like passwords; rotate every 90 days or immediately on suspected exposure
  5. Scope keys if possible — create read-only keys for analytics, write keys for generation, per the Dashboard's key scoping options
  6. Use secrets managers — AWS Secrets Manager, GCP Secret Manager, or Doppler to inject keys at runtime, not at build time

# .gitignore — must contain
.env
.env.local
.env.*.local
*.env
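
Practice 3 (separate keys per environment) can be reduced to a single lookup helper. The SKYTELLS_API_KEY_<ENV> naming scheme here is an illustration, not a platform requirement; pick one convention and apply it everywhere:

```python
import os

def api_key_for(env: str) -> str:
    """Resolve the API key for an environment, e.g. SKYTELLS_API_KEY_PRODUCTION.
    The variable naming scheme is hypothetical; adapt it to your deployment."""
    var = f"SKYTELLS_API_KEY_{env.upper()}"
    key = os.environ.get(var)
    if not key:
        raise RuntimeError(f"{var} is not set; check your deployment secrets")
    return key
```

Failing loudly at startup when a key is missing beats a mysterious 401 at request time.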

Rate limit strategy

The platform enforces per-minute and per-day request limits. Your retry logic (above) handles the 429, but also:

  • Batch appropriately — embeddings support arrays of inputs; use batches instead of loop-per-item
  • Pre-cache common completions — if 80% of requests share the same prompt prefix, cache the response
  • Queue in production — use a job queue (BullMQ, Celery, SQS) to smooth out traffic spikes rather than firing concurrent requests
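
The batching bullet is the easiest win. A generic chunking helper (a sketch; the maximum batch size your embeddings endpoint accepts is an assumption to check against the docs):

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

def chunked(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield successive batches of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]
```

Pass each chunk as the array input to one embeddings request instead of issuing one request per item; 1,000 texts at a batch size of 100 becomes 10 requests instead of 1,000.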

Production readiness checklist

Before you go live:

  • API key is in an environment variable, not hardcoded
  • .env is in .gitignore
  • Retry logic covers 429 and 5xx with exponential backoff
  • Prediction outputs are downloaded to permanent storage within 24 hours
  • Webhooks are set up for video/audio predictions
  • Webhook signatures are verified server-side
  • Error types from both APIs are caught and surfaced to users with helpful messages
  • Test with INSUFFICIENT_CREDITS (low-balance account) to confirm graceful degradation
  • Test with RATE_LIMIT_EXCEEDED to confirm retry behaviour
  • Separate API keys for dev, staging, and production environments
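
The "helpful messages" item deserves its own sketch: map raw error_ids to copy you would actually show a user. The messages below are placeholders, not platform-provided strings:

```python
def user_message_for(prediction: dict) -> str:
    """Map a failed Prediction Object to a message safe to show end users.
    The friendly copy here is illustrative; write your own."""
    friendly = {
        "INSUFFICIENT_CREDITS": "Generation is temporarily paused. Please try again shortly.",
        "RATE_LIMIT_EXCEEDED": "We're a bit busy right now. Your request will retry automatically.",
        "TIMEOUT": "This job is taking longer than expected. We'll notify you when it's ready.",
    }
    return friendly.get(prediction.get("error_id"), "Something went wrong. Please try again.")
```

Never surface a raw error_message to end users; log it with the prediction_id instead.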

Congratulations — you've completed AI Foundations

What you can build now:

  • A chat UI backed by POST /v1/chat/completions with streaming
  • An image generation pipeline using POST /v1/predictions with async polling or webhooks
  • A semantic search system using POST /v1/embeddings
  • Stateful multi-turn conversations using POST /v1/responses

Where to go next:

  • Image Generation learning path — advanced TrueFusion prompting, ControlNet, upscaling
  • SDK Mastery learning path — idiomatic Python and TypeScript SDK patterns
  • Building Production Apps learning path — queuing, caching, cost optimization at scale
