Beginner40 minModule 3 of 3

Automate Your Content Pipeline

Build an automated content pipeline using n8n (no-code) or a Python script that generates and schedules content from a weekly calendar.

The goal

Create a system that:

  1. Reads a content calendar (a spreadsheet or JSON file)
  2. Generates video + music for each entry automatically
  3. Saves the outputs to a folder or cloud storage
  4. (Optionally) notifies you via email or Slack when ready

You schedule it weekly — it runs while you sleep.

Pipeline overview

Monday 9am Content Calendar Pipeline trigger Read this week's prompts Generate videos in parallel Generate music in parallel Merge video + audio Save to storage Notify via Slack/email

Option A — n8n (no-code)

n8n is an open-source workflow automation tool with a visual editor. It has a native HTTP Request node that can call any REST API — including Skytells.

Step 1: Set up n8n

# Docker (recommended)
docker run -d --name n8n -p 5678:5678 \
  -v ~/.n8n:/home/node/.n8n \
  n8nio/n8n

# Or with npm
npm install -g n8n && n8n start

Open http://localhost:5678.

Step 2: Create the workflow

Nodes you'll need:

  1. Schedule Trigger — runs every Monday at 9am
  2. Read Binary File (or Google Sheets) — loads the content calendar
  3. SplitInBatches — processes one item at a time
  4. HTTP Request — calls POST https://api.skytells.ai/v1/predictions
  5. WaitHTTP Request (loop) — polls until complete
  6. HTTP Request — downloads output
  7. Write Binary File — saves to disk
  8. Slack — notifies you when done

Step 3: Configure the HTTP Request node

{
  "url": "https://api.skytells.ai/v1/predictions",
  "method": "POST",
  "headers": {
    "x-api-key": "={{ $env.SKYTELLS_API_KEY }}",
    "Content-Type": "application/json"
  },
  "body": {
    "model": "truefusion-video-pro",
    "input": {
      "prompt": "={{ $json.prompt }}",
      "duration_seconds": 5,
      "aspect_ratio": "9:16"
    }
  }
}

Option B — Python script

For developers who prefer code, here's a complete automated pipeline:

Content calendar format (JSON)

[
  {
    "week": "2025-W03",
    "slug": "morning-coffee",
    "video_prompt": "A barista making pour-over coffee, morning light, 9:16, cinematic",
    "music_prompt": "Calm morning café ambience, acoustic guitar, warm, 10 seconds",
    "post_date": "2025-01-13"
  },
  {
    "week": "2025-W03",
    "slug": "product-launch",
    "video_prompt": "Minimalist product reveal on marble, slow rotation, clean studio",
    "music_prompt": "Corporate announcement sting, brass and strings, dramatic build, 8 seconds",
    "post_date": "2025-01-15"
  }
]

The pipeline script

#!/usr/bin/env python3
"""
content_pipeline.py — automated content generation pipeline
Usage: python content_pipeline.py content_calendar.json
"""
import os
import sys
import json
import time
import urllib.request
import subprocess
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

API_KEY = os.environ["SKYTELLS_API_KEY"]
BASE = "https://api.skytells.ai/v1"
OUTPUT_DIR = Path("generated")
OUTPUT_DIR.mkdir(exist_ok=True)


def create_prediction(model: str, input_data: dict) -> str:
    """Create a prediction and return its ID."""
    req = urllib.request.Request(
        f"{BASE}/predictions",
        data=json.dumps({"model": model, "input": input_data}).encode(),
        headers={"x-api-key": API_KEY, "Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())["id"]


def wait_for_prediction(prediction_id: str, interval: int = 5) -> str:
    """Poll until the prediction succeeds, return the output URL."""
    while True:
        req = urllib.request.Request(
            f"{BASE}/predictions/{prediction_id}",
            headers={"x-api-key": API_KEY},
        )
        with urllib.request.urlopen(req) as resp:
            p = json.loads(resp.read())

        if p["status"] == "succeeded":
            return p["output"][0]
        if p["status"] in ("failed", "canceled"):
            raise RuntimeError(f"Prediction {prediction_id} failed: {p.get('error')}")

        time.sleep(interval)


def download(url: str, path: Path) -> None:
    urllib.request.urlretrieve(url, path)


def merge_video_audio(video_path: Path, audio_path: Path, output_path: Path) -> None:
    subprocess.run(
        [
            "ffmpeg", "-y",
            "-i", str(video_path),
            "-i", str(audio_path),
            "-c:v", "copy", "-c:a", "aac",
            "-filter_complex", "[1:a]volume=0.35[m];[m]apad[out]",
            "-map", "0:v", "-map", "[out]",
            "-shortest", str(output_path),
        ],
        check=True,
        capture_output=True,
    )


def process_entry(entry: dict) -> str:
    slug = entry["slug"]
    print(f"[{slug}] Starting generation...")

    # Launch video and music in parallel
    with ThreadPoolExecutor(max_workers=2) as executor:
        video_future = executor.submit(
            lambda: wait_for_prediction(
                create_prediction("truefusion-video-pro", {
                    "prompt": entry["video_prompt"],
                    "duration_seconds": 10,
                    "aspect_ratio": "9:16",
                })
            )
        )
        music_future = executor.submit(
            lambda: wait_for_prediction(
                create_prediction("beatfusion-2.0", {
                    "prompt": entry["music_prompt"],
                    "duration_seconds": 12,
                })
            )
        )

        video_url = video_future.result()
        music_url = music_future.result()

    print(f"[{slug}] Downloading...")
    video_path = OUTPUT_DIR / f"{slug}_video.mp4"
    music_path = OUTPUT_DIR / f"{slug}_music.mp3"
    final_path = OUTPUT_DIR / f"{slug}_final.mp4"

    download(video_url, video_path)
    download(music_url, music_path)

    print(f"[{slug}] Merging...")
    merge_video_audio(video_path, music_path, final_path)

    print(f"[{slug}] Done → {final_path}")
    return str(final_path)


def main():
    calendar_path = sys.argv[1] if len(sys.argv) > 1 else "content_calendar.json"
    with open(calendar_path) as f:
        calendar = json.load(f)

    print(f"Processing {len(calendar)} entries from {calendar_path}")

    # Process up to 3 entries in parallel
    with ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(process_entry, entry): entry["slug"] for entry in calendar}
        for future in as_completed(futures):
            slug = futures[future]
            try:
                output = future.result()
                print(f"✓ {slug}: {output}")
            except Exception as e:
                print(f"✗ {slug}: {e}")


if __name__ == "__main__":
    main()

Run it

SKYTELLS_API_KEY=sk-... python content_pipeline.py content_calendar.json

Schedule it (cron)

# Add to crontab: run every Monday at 9am
crontab -e
0 9 * * 1 cd /path/to/project && SKYTELLS_API_KEY=sk-... python content_pipeline.py content_calendar.json >> pipeline.log 2>&1

Sending notifications

Add a Slack notification when the pipeline finishes:

import urllib.request
import json

def notify_slack(message: str) -> None:
    webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
    if not webhook_url:
        return
    urllib.request.urlopen(
        urllib.request.Request(
            webhook_url,
            data=json.dumps({"text": message}).encode(),
            headers={"Content-Type": "application/json"},
        )
    )

# At the end of main():
notify_slack(f"✅ Content pipeline complete — {len(calendar)} videos generated")

Storing in the cloud

Instead of saving locally, upload directly to S3 or Cloudflare R2:

import boto3

def upload_to_s3(local_path: Path, key: str) -> str:
    s3 = boto3.client("s3")
    s3.upload_file(
        str(local_path),
        os.environ["S3_BUCKET"],
        key,
        ExtraArgs={"ContentType": "video/mp4"},
    )
    return f"https://{os.environ['S3_BUCKET']}.s3.amazonaws.com/{key}"

Summary

You've completed the AI Video & Audio for Creators path. Your automated content pipeline:

  1. Reads a content calendar
  2. Generates video clips and music in parallel
  3. Merges them with ffmpeg
  4. Saves outputs to disk or cloud storage
  5. Notifies you via Slack when done

All triggered weekly by a cron job or n8n schedule — hands-free content production.

Next steps:

On this page