API Best Practices

This guide outlines best practices for integrating with MOOD MNKY API services. Following these guidelines will help you build robust, efficient, and maintainable applications.

General Best Practices

API Client Design

  1. Use a Unified Client
class MoodMnkyClient {
  constructor(config) {
    this.ollama = new OllamaService(config.ollamaKey);
    this.flowise = new FlowiseService(config.flowiseKey);
    this.langchain = new LangchainService(config.langchainKey);
    this.rateLimit = new RateLimitTracker();
    this.cache = new APICache();
  }

  // Common functionality
  async handleRequest(service, endpoint, options) {
    if (!this.rateLimit.canMakeRequest(service)) {
      throw new RateLimitError();
    }

    const cacheKey = this.getCacheKey(service, endpoint, options);
    const cached = this.cache.get(cacheKey);
    if (cached) return cached;

    const response = await this[service].request(endpoint, options);
    this.cache.set(cacheKey, response);
    this.rateLimit.updateLimits(service, response.headers);

    return response;
  }
}
  2. Implement Proper Error Handling
interface APIError extends Error {
  status: number;
  code: string;
  details: any;
}

class APIErrorHandler {
  static handle(error: APIError) {
    switch (error.status) {
      case 400:
        return this.handleValidationError(error);
      case 401:
        return this.handleAuthenticationError(error);
      case 429:
        return this.handleRateLimitError(error);
      case 500:
        return this.handleServerError(error);
      default:
        return this.handleUnknownError(error);
    }
  }

  static async handleRateLimitError(error: APIError) {
    const retryAfter = parseInt(error.details?.retry_after, 10) || 60;
    await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
    return true; // Retry request
  }
}

Performance Optimization

  1. Request Batching
import asyncio
import logging

logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self, client, batch_size=100, interval=1.0):
        self.client = client
        self.batch_size = batch_size
        self.interval = interval
        self.queue = []
        self.processing = False

    async def add(self, item):
        self.queue.append(item)
        if len(self.queue) >= self.batch_size and not self.processing:
            await self.process_batch()

    async def process_batch(self):
        self.processing = True
        while self.queue:
            batch = self.queue[:self.batch_size]
            try:
                await self.client.process_batch(batch)
                self.queue = self.queue[self.batch_size:]
                await asyncio.sleep(self.interval)
            except Exception as e:
                logger.error(f"Batch processing error: {e}")
                break
        self.processing = False
  2. Caching Strategy
from functools import lru_cache
from datetime import datetime, timedelta

class CacheManager:
    def __init__(self):
        self.cache = {}
        self.ttls = {}

    def set(self, key, value, ttl=3600):
        self.cache[key] = value
        self.ttls[key] = datetime.now() + timedelta(seconds=ttl)

    def get(self, key):
        if key not in self.cache:
            return None
        
        if datetime.now() > self.ttls[key]:
            del self.cache[key]
            del self.ttls[key]
            return None
        
        return self.cache[key]

    @lru_cache(maxsize=1000)
    def cached_request(self, endpoint, params):
        # NOTE: lru_cache only accepts hashable arguments, so pass params as
        # a tuple rather than a dict. Implementation omitted.
        pass

Resource Management

  1. Connection Pooling
import aiohttp
from contextlib import asynccontextmanager

class ConnectionPool:
    def __init__(self, max_size=100):
        self.session = None
        self.max_size = max_size

    async def __aenter__(self):
        if not self.session:
            self.session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(limit=self.max_size)
            )
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            self.session = None

# Module-level pool shared by get_connection()
pool = ConnectionPool()

@asynccontextmanager
async def get_connection():
    async with pool as session:
        yield session
  2. Memory Management
import sys
import time

class MemoryManager:
    def __init__(self, max_size_mb=100):
        self.max_size = max_size_mb * 1024 * 1024
        self.current_size = 0
        self.items = {}

    def add(self, key, value):
        size = sys.getsizeof(value)
        if self.current_size + size > self.max_size:
            self._cleanup()
        
        self.items[key] = {
            'value': value,
            'size': size,
            'last_access': time.time()
        }
        self.current_size += size

    def _cleanup(self):
        items = sorted(
            self.items.items(),
            key=lambda x: x[1]['last_access']
        )
        
        while self.current_size > self.max_size * 0.8:
            if not items:
                break
            key, item = items.pop(0)
            self.current_size -= item['size']
            del self.items[key]

Service-Specific Best Practices

Ollama Service

  1. Model Management (a housekeeping sketch follows the code below)
    • Keep models updated
    • Clean up unused models
    • Monitor model performance
    • Document model configurations
  2. Generation Optimization
class OllamaOptimizer:
    def __init__(self, client):
        self.client = client
        self.context_window = 4096

    def optimize_prompt(self, prompt, system=""):
        # Trim prompts that would overflow the model's context window;
        # count_tokens and truncate_prompt are helpers left to the integrator.
        tokens = self.count_tokens(prompt)
        if tokens > self.context_window:
            return self.truncate_prompt(prompt)
        return prompt

    def batch_generate(self, prompts, **kwargs):
        optimized = [
            self.optimize_prompt(p) for p in prompts
        ]
        return self.client.generate_batch(optimized, **kwargs)
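
The model-management checklist in item 1 can be automated as a small housekeeping job. Below is a minimal sketch, assuming a hypothetical client that exposes list_models() and delete_model() helpers in the style of the generic client used by OllamaOptimizer; the method names and the 30-day idle window are assumptions, not part of the MOOD MNKY API.

import time

class ModelHousekeeper:
    def __init__(self, client, max_idle_days=30):
        # client.list_models() and client.delete_model() are hypothetical
        # helpers; substitute the calls your deployment provides.
        self.client = client
        self.max_idle = max_idle_days * 24 * 3600

    async def cleanup_unused_models(self):
        # Remove models that have not been used within the idle window
        removed = []
        now = time.time()
        for model in await self.client.list_models():
            if now - model.get('last_used', 0) > self.max_idle:
                await self.client.delete_model(model['name'])
                removed.append(model['name'])
        return removed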

Flowise Service

  1. Chatflow Management (a versioning sketch follows the stream-handling code below)
    • Version control chatflows
    • Test in development
    • Document node configurations
    • Monitor performance
  2. Stream Handling
class StreamManager {
  constructor(client) {
    this.client = client;
    this.activeStreams = new Map();
  }

  async startStream(chatflowId, options) {
    const stream = await this.client.createStream(chatflowId);
    this.activeStreams.set(chatflowId, stream);

    stream.on('data', this.handleData.bind(this));
    stream.on('error', this.handleError.bind(this));
    stream.on('end', () => this.closeStream(chatflowId));

    return stream;
  }

  handleData(data) {
    // Implementation
  }

  handleError(error) {
    // Implementation
  }

  closeStream(chatflowId) {
    const stream = this.activeStreams.get(chatflowId);
    if (stream) {
      stream.close();
      this.activeStreams.delete(chatflowId);
    }
  }
}
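
To keep chatflows under version control (item 1 above), one simple approach is to snapshot each flow's configuration to a timestamped file that can be committed alongside your code. The sketch below assumes a hypothetical client.get_chatflow(chatflow_id) helper that returns the flow's JSON configuration; swap in whichever export mechanism your Flowise deployment provides.

import json
import time
from pathlib import Path

def snapshot_chatflow(client, chatflow_id, out_dir="chatflow_versions"):
    # client.get_chatflow() is a hypothetical helper returning the flow's
    # JSON configuration; replace it with your own export call.
    config = client.get_chatflow(chatflow_id)

    target = Path(out_dir) / chatflow_id
    target.mkdir(parents=True, exist_ok=True)

    # One timestamped file per snapshot keeps diffs reviewable in git
    path = target / f"{int(time.time())}.json"
    path.write_text(json.dumps(config, indent=2, sort_keys=True))
    return path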

Langchain Service

  1. Chain Optimization (a memory-selection sketch follows the code below)
    • Use appropriate memory types
    • Optimize document processing
    • Implement proper caching
    • Monitor chain performance
  2. Document Processing
import asyncio
from pathlib import Path

class DocumentProcessor:
    def __init__(self, client):
        self.client = client
        self.batch_size = 5
        self.supported_types = {'.pdf', '.txt', '.docx'}

    async def process_documents(self, files):
        tasks = []
        for i in range(0, len(files), self.batch_size):
            batch = files[i:i + self.batch_size]
            tasks.append(self.process_batch(batch))
        
        return await asyncio.gather(*tasks)

    async def process_batch(self, files):
        results = []
        for file in files:
            if self.is_supported(file):
                result = await self.client.process_document(file)
                results.append(result)
        return results

    def is_supported(self, file):
        return Path(file).suffix.lower() in self.supported_types
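
For the "appropriate memory types" point in item 1, one pattern is to keep the full transcript for short conversations and switch to a summarizing memory once the history would crowd the context window. The sketch below uses LangChain's classic memory classes (ConversationBufferMemory, ConversationSummaryMemory); the 10-turn threshold is an assumption to tune per model, and newer LangChain releases may expose these classes differently.

from langchain.memory import ConversationBufferMemory, ConversationSummaryMemory

def select_memory(llm, expected_turns):
    # Short chats keep the full transcript; longer ones fall back to a
    # running summary so the chain stays inside the context window.
    if expected_turns <= 10:
        return ConversationBufferMemory()
    return ConversationSummaryMemory(llm=llm)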

Monitoring & Logging

Structured Logging

import structlog

logger = structlog.get_logger()

class APILogger:
    def __init__(self):
        self.logger = structlog.wrap_logger(
            logger,
            processors=[
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer()
            ]
        )

    def log_request(self, service, endpoint, duration, status):
        self.logger.info(
            "api_request",
            service=service,
            endpoint=endpoint,
            duration_ms=duration,
            status=status
        )

    def log_error(self, service, error):
        self.logger.error(
            "api_error",
            service=service,
            error_type=type(error).__name__,
            error_message=str(error)
        )

Performance Monitoring

import statistics
import time
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_metric(self, service, endpoint, duration):
        self.metrics[f"{service}.{endpoint}"].append({
            'timestamp': time.time(),
            'duration': duration
        })

    def get_statistics(self, service, endpoint):
        data = self.metrics[f"{service}.{endpoint}"]
        if not data:
            return None

        durations = [d['duration'] for d in data]
        return {
            'count': len(durations),
            'avg': statistics.mean(durations),
            'p95': statistics.quantiles(durations, n=20)[18],
            'max': max(durations)
        }

Support & Resources

For implementation support and resources: