Cognition: Integration Examples

Working integration patterns for Express, Fastify, Koa, background workers, microservices, PII scrubbing, OpenTelemetry, health endpoints, and more.

The examples below cover the most common ways to integrate Cognition into a production application. Each one is complete and runnable. All examples assume these environment variables are set:

SKYTELLS_API_KEY=sk_...
SKYTELLS_PROJECT_ID=my-api-service

Express Server

Full production setup with error capture, security scanner middleware, runtime thresholds, and graceful shutdown:

import express from 'express';
import { Cognition, cognitionMiddleware } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  environment: process.env.NODE_ENV,
  release: process.env.npm_package_version,
  runtime: {
    enabled: true,
    snapshotIntervalMs: 15_000,
    thresholds: {
      heapUsedMb: 512,
      eventLoopLagMs: 150,
      eluPercent: 0.85,
    },
  },
});

const app = express();
app.use(express.json());

// Security scanner middleware — before routes
app.use(cognitionMiddleware());

app.get('/health', (req, res) => {
  res.json({
    status: 'ok',
    version: process.env.npm_package_version,
    buffer: cognition.bufferSize,
    dropped: cognition.droppedCount,
  });
});

app.get('/users/:id', async (req, res) => {
  cognition.addBreadcrumb({
    category: 'http',
    message: `GET /users/${req.params.id}`,
    level: 'info',
  });

  // ... handler
  res.json({ id: req.params.id });
});

// Error handler — must be after routes
app.use((err: Error, req: express.Request, res: express.Response, _next: express.NextFunction) => {
  cognition.setTag('route', req.path);
  cognition.captureError(err, { level: 'error' });
  res.status(500).json({ error: 'Internal Server Error' });
});

const server = app.listen(3000);

// Graceful shutdown
const shutdown = async () => {
  server.close(async () => {
    await cognition.close(5000);
    process.exit(0);
  });
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

Fastify Server

Fastify doesn't use Express middleware, so use the manual scanForThreats() API in a preHandler hook for security scanning:

import Fastify from 'fastify';
import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  environment: process.env.NODE_ENV,
});

const app = Fastify({ logger: true });

// Security scanning as a preHandler hook
app.addHook('preHandler', async (request, reply) => {
  const threats = await cognition.scanForThreats({
    query: request.query as Record<string, string>,
    headers: request.headers as Record<string, string>,
    body: request.body,
    route: request.url,
  });

  if (threats.length > 0) {
    app.log.warn({ threats }, 'Security threats detected');
    // Optionally block:
    // reply.code(400).send({ error: 'Bad Request' });
    // return;
  }
});

app.get('/users/:id', async (request, reply) => {
  return { id: (request.params as any).id };
});

// Start
const start = async () => {
  await app.listen({ port: 3000, host: '0.0.0.0' });
};

start().catch(async (err) => {
  cognition.captureError(err, { level: 'fatal' });
  await cognition.close();
  process.exit(1);
});

process.on('SIGTERM', async () => {
  await app.close();
  await cognition.close(5000);
  process.exit(0);
});

Koa Server

import Koa from 'koa';
import Router from '@koa/router';
import bodyParser from 'koa-bodyparser';
import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
});

const app = new Koa();
const router = new Router();

// Error handling middleware
app.use(async (ctx, next) => {
  try {
    await next();
  } catch (err) {
    const error = err as Error;
    cognition.captureError(error, {
      extra: { path: ctx.path, method: ctx.method },
    });
    ctx.status = 500;
    ctx.body = { error: 'Internal Server Error' };
  }
});

// Security scanning middleware
app.use(async (ctx, next) => {
  const threats = await cognition.scanForThreats({
    query: ctx.query as Record<string, string>,
    headers: ctx.headers as Record<string, string>,
    body: ctx.request.body,
    route: ctx.path,
  });
  if (threats.length > 0) {
    ctx.app.emit('security-threat', threats, ctx);
  }
  return next();
});

app.use(bodyParser());
app.use(router.routes());

const server = app.listen(3000);

process.on('SIGTERM', async () => {
  server.close(async () => {
    await cognition.close(5000);
    process.exit(0);
  });
});

Cron Job / Worker

For non-HTTP workloads: batch jobs, workers, and scheduled tasks:

import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  captureErrors: true,
  captureSecurityThreats: false, // No HTTP server
  runtime: {
    enabled: true,
    snapshotIntervalMs: 30_000,
    thresholds: { heapUsedMb: 256 },
  },
});

async function processQueue() {
  const jobId = `job-${Date.now()}`;

  cognition.addBreadcrumb({
    category: 'job',
    message: `${jobId}: started`,
    level: 'info',
  });

  try {
    // ... job logic
    const itemsProcessed = 100;

    cognition.captureMessage(`${jobId}: completed — ${itemsProcessed} items`, {
      level: 'info',
      extra: { itemsProcessed },
    });
  } catch (err) {
    cognition.captureError(err as Error, {
      level: 'error',
      tags: { jobId },
    });
    throw err;
  }
}

processQueue()
  .then(async () => {
    await cognition.flush(); // Ensure events are sent before exit
    process.exit(0);
  })
  .catch(async (err) => {
    await cognition.close(); // Flush on error too
    process.exit(1);
  });

Microservice with Graceful Shutdown

A complete lifecycle pattern for containerized microservices (Kubernetes, ECS, Cloud Run):

import { Cognition } from '@skytells/cognition';
import http from 'node:http';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  environment: process.env.NODE_ENV,
  serverName: process.env.HOSTNAME,  // Pod/container name from env
});

let isShuttingDown = false;

const server = http.createServer(async (req, res) => {
  if (isShuttingDown) {
    res.writeHead(503, { 'Connection': 'close' });
    res.end('Service Unavailable');
    return;
  }
  // ... handle requests
  res.writeHead(200);
  res.end('OK');
});

server.listen(3000);

const shutdown = async (signal: string) => {
  if (isShuttingDown) return;
  isShuttingDown = true;

  cognition.captureMessage(`Shutting down (${signal})`, { level: 'info' });

  // Stop accepting connections
  server.close(async () => {
    // Flush all buffered Cognition events
    await cognition.close(5000);
    process.exit(0);
  });

  // Force exit after 10s
  setTimeout(() => process.exit(1), 10_000).unref();
};

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));

PII Scrubbing with beforeSend

Prevent sensitive data from leaving your process:

import { Cognition } from '@skytells/cognition';
import type { CognitionEvent } from '@skytells/cognition';

const EMAIL_REGEX = /[\w.+-]+@[\w-]+\.[a-z]{2,}/gi;
const IP_REGEX = /\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b/g;
const TOKEN_REGEX = /Bearer\s+[A-Za-z0-9\-._~+/]+=*/g;

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,

  beforeSend: (event: CognitionEvent) => {
    const e = event as any;

    // Scrub error messages
    if (e.error?.message) {
      e.error.message = e.error.message
        .replace(EMAIL_REGEX, '[EMAIL]')
        .replace(IP_REGEX, '[IP]')
        .replace(TOKEN_REGEX, 'Bearer [TOKEN]');
    }

    // Scrub user fields
    if (e.user) {
      if (e.user.email) e.user.email = '[REDACTED]';
      if (e.user.ip) e.user.ip = '[REDACTED]';
    }

    // Drop debug messages entirely
    if (e.type === 'message' && e.level === 'debug') {
      return null;
    }

    // Remove known-sensitive extra fields
    if (e.extra) {
      delete e.extra.password;
      delete e.extra.token;
      delete e.extra.creditCard;
    }

    return event;
  },
});

Custom Anomaly Alerting

Integrate Cognition anomaly events with an external alerting channel:

import { Cognition } from '@skytells/cognition';

const SLACK_WEBHOOK = process.env.SLACK_WEBHOOK_URL!;

async function alertSlack(message: string) {
  await fetch(SLACK_WEBHOOK, {
    method: 'POST',
    headers: { 'content-type': 'application/json' },
    body: JSON.stringify({ text: message }),
  });
}

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,

  runtime: {
    thresholds: {
      heapUsedMb: 512,
      eventLoopLagMs: 200,
    },
  },

  beforeSend: (event) => {
    if (event.type === 'anomaly') {
      const a = event as any;
      const msg = `ANOMALY: ${a.metric} reached ${a.actual} (threshold: ${a.threshold})`;
      alertSlack(msg).catch(console.error);
    }
    return event;
  },
});

Security Blocking

Reject requests that contain detected threats:

import express from 'express';
import { Cognition, SecurityScanner } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
});

const app = express();
app.use(express.json());

// Blocking security middleware — reject threats before routing
app.use(async (req, res, next) => {
  const scanner = new SecurityScanner();
  const threats = await scanner.scan({
    query: req.query as Record<string, string>,
    body: req.body,
    headers: req.headers as Record<string, string>,
    route: req.path,
  });

  if (threats.length > 0) {
    cognition.captureEvent({
      type: 'security',
      threats,
      route: req.path,
      method: req.method,
    } as any);
    res.status(400).json({ error: 'Bad Request' });
    return;
  }

  next();
});

app.get('/search', (req, res) => {
  res.json({ results: [] });
});

app.listen(3000);

Business Logic Breadcrumbs

Trace the exact path through your application logic before an error:

import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
});

async function processOrder(orderId: string, userId: string) {
  cognition.addBreadcrumb({
    category: 'order',
    message: `processOrder start: orderId=${orderId}`,
    level: 'info',
    data: { orderId, userId },
  });

  const order = await fetchOrder(orderId);

  cognition.addBreadcrumb({
    category: 'order',
    message: `fetchOrder complete: status=${order.status}`,
    level: 'info',
    data: { status: order.status, total: order.total },
  });

  if (order.status !== 'pending') {
    cognition.addBreadcrumb({
      category: 'order',
      message: `unexpected status: ${order.status}`,
      level: 'warning',
    });
    throw new Error(`Cannot process order in status: ${order.status}`);
  }

  await chargePayment(order);

  cognition.addBreadcrumb({
    category: 'order',
    message: 'payment charged',
    level: 'info',
  });
}

async function fetchOrder(id: string) {
  return { id, status: 'completed', total: 99.99 };
}

async function chargePayment(order: any) {
  // ...
}

OpenTelemetry with Auto-Instrumentation

import { Cognition } from '@skytells/cognition';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { registerInstrumentations } from '@opentelemetry/instrumentation';
import express from 'express';

// Initialize Cognition first — registers global TracerProvider
const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  environment: process.env.NODE_ENV,
  tracing: {
    enabled: true,
    sampleRate: 0.1, // 10% sampling
  },
});

// Register auto-instrumentation after Cognition
registerInstrumentations({
  instrumentations: [
    getNodeAutoInstrumentations({
      '@opentelemetry/instrumentation-http': {
        ignoreIncomingRequestHook: (req) =>
          (req.url ?? '').startsWith('/health'),
      },
      '@opentelemetry/instrumentation-fs': { enabled: false },
    }),
  ],
});

const app = express();

app.get('/users/:id', async (req, res) => {
  // HTTP, db calls are automatically traced
  res.json({ id: req.params.id });
});

app.listen(3000);

process.on('SIGTERM', async () => {
  await cognition.close(5000);
  process.exit(0);
});

Multi-Service Setup

When multiple services report to the same Skytells project, differentiate them with serverName and tags:

// Service A: API Gateway
const apiGateway = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  serverName: 'api-gateway',
  environment: process.env.NODE_ENV,
  captureErrors: true,
});
apiGateway.setTag('service', 'api-gateway');
apiGateway.setTag('version', process.env.npm_package_version ?? '0.0.0');

// Service B: Worker (non-singleton)
const worker = new (require('@skytells/cognition').CognitionClient)({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
  serverName: 'worker',
  captureErrors: false, // worker manages its own error handling
});

Health Check Endpoint

Expose SDK transport diagnostics alongside application health:

import express from 'express';
import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
});

const app = express();

app.get('/health', async (req, res) => {
  const mem = process.memoryUsage();

  res.json({
    status: 'ok',
    timestamp: new Date().toISOString(),
    version: process.env.npm_package_version,
    uptime: process.uptime(),
    memory: {
      heapUsedMb: Math.round(mem.heapUsed / 1024 / 1024),
      rssMb: Math.round(mem.rss / 1024 / 1024),
    },
    cognition: {
      bufferSize: cognition.bufferSize,
      droppedCount: cognition.droppedCount,
    },
  });
});

app.listen(3000);

Custom Event Types

Emit custom Cognition events for business metrics and audit trails:

import { Cognition } from '@skytells/cognition';

const cognition = Cognition.init({
  apiKey: process.env.SKYTELLS_API_KEY!,
  projectId: process.env.SKYTELLS_PROJECT_ID!,
});

// Custom business metric
cognition.captureEvent({
  type: 'custom',
  name: 'checkout.completed',
  timestamp: Date.now(),
  payload: {
    orderId: 'ord-abc-123',
    userId: 'usr-456',
    totalAmount: 149.99,
    currency: 'USD',
    itemCount: 3,
    couponApplied: true,
  },
} as any);

// Audit event
cognition.captureEvent({
  type: 'custom',
  name: 'user.role_changed',
  timestamp: Date.now(),
  payload: {
    targetUserId: 'usr-789',
    fromRole: 'viewer',
    toRole: 'admin',
    changedBy: 'usr-admin-001',
  },
} as any);

How is this guide?

On this page