API Best Practices
This guide outlines best practices for integrating with MOOD MNKY API services. Following these guidelines will help you build robust, efficient, and maintainable applications.
General Best Practices
API Client Design
- Use a Unified Client
class MoodMnkyClient {
  constructor(config) {
    this.ollama = new OllamaService(config.ollamaKey);
    this.flowise = new FlowiseService(config.flowiseKey);
    this.langchain = new LangchainService(config.langchainKey);
    this.rateLimit = new RateLimitTracker();
    this.cache = new APICache();
  }

  // Common functionality
  async handleRequest(service, endpoint, options) {
    if (!this.rateLimit.canMakeRequest(service)) {
      throw new RateLimitError();
    }

    const cacheKey = this.getCacheKey(service, endpoint, options);
    const cached = this.cache.get(cacheKey);
    if (cached) return cached;

    const response = await this[service].request(endpoint, options);
    this.cache.set(cacheKey, response);
    this.rateLimit.updateLimits(service, response.headers);

    return response;
  }
}
- Implement Proper Error Handling
interface APIError extends Error {
  status: number;
  code: string;
  details: any;
}

class APIErrorHandler {
  static handle(error: APIError) {
    switch (error.status) {
      case 400:
        return this.handleValidationError(error);
      case 401:
        return this.handleAuthenticationError(error);
      case 429:
        return this.handleRateLimitError(error);
      case 500:
        return this.handleServerError(error);
      default:
        return this.handleUnknownError(error);
    }
  }

  static async handleRateLimitError(error: APIError) {
    const retryAfter = parseInt(error.details.retry_after, 10) || 60;
    await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
    return true; // Retry request
  }
}
Performance Optimization
- Request Batching
import asyncio
import logging

logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self, client, batch_size=100, interval=1.0):
        self.client = client
        self.batch_size = batch_size
        self.interval = interval
        self.queue = []
        self.processing = False

    async def add(self, item):
        self.queue.append(item)
        if len(self.queue) >= self.batch_size and not self.processing:
            await self.process_batch()

    async def process_batch(self):
        self.processing = True
        while self.queue:
            batch = self.queue[:self.batch_size]
            try:
                await self.client.process_batch(batch)
                self.queue = self.queue[self.batch_size:]
                await asyncio.sleep(self.interval)
            except Exception as e:
                logger.error(f"Batch processing error: {e}")
                break
        self.processing = False
- Caching Strategy
from functools import lru_cache
from datetime import datetime, timedelta

class CacheManager:
    def __init__(self):
        self.cache = {}
        self.ttls = {}

    def set(self, key, value, ttl=3600):
        self.cache[key] = value
        self.ttls[key] = datetime.now() + timedelta(seconds=ttl)

    def get(self, key):
        if key not in self.cache:
            return None
        if datetime.now() > self.ttls[key]:
            del self.cache[key]
            del self.ttls[key]
            return None
        return self.cache[key]

    @lru_cache(maxsize=1000)
    def cached_request(self, endpoint, params):
        # Note: lru_cache keys on its arguments, so params must be hashable (e.g. a tuple of pairs)
        # Implementation
        pass
Resource Management
- Connection Pooling
import aiohttp
from contextlib import asynccontextmanager

class ConnectionPool:
    def __init__(self, max_size=100):
        self.session = None
        self.max_size = max_size

    async def __aenter__(self):
        if not self.session:
            self.session = aiohttp.ClientSession(
                connector=aiohttp.TCPConnector(limit=self.max_size)
            )
        return self.session

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
            self.session = None

# Shared pool instance used by the context manager below
pool = ConnectionPool()

@asynccontextmanager
async def get_connection():
    async with pool as session:
        yield session
- Memory Management
import sys
import time

class MemoryManager:
    def __init__(self, max_size_mb=100):
        self.max_size = max_size_mb * 1024 * 1024
        self.current_size = 0
        self.items = {}

    def add(self, key, value):
        size = sys.getsizeof(value)
        if self.current_size + size > self.max_size:
            self._cleanup()
        self.items[key] = {
            'value': value,
            'size': size,
            'last_access': time.time()
        }
        self.current_size += size

    def _cleanup(self):
        # Evict least-recently-accessed items until usage drops below 80% of the limit
        items = sorted(
            self.items.items(),
            key=lambda x: x[1]['last_access']
        )
        while self.current_size > self.max_size * 0.8:
            if not items:
                break
            key, item = items.pop(0)
            self.current_size -= item['size']
            del self.items[key]
Service-Specific Best Practices
Ollama Service
- Model Management (a maintenance sketch follows the Generation Optimization example below)
  - Keep models updated
  - Clean up unused models
  - Monitor model performance
  - Document model configurations
- Generation Optimization
class OllamaOptimizer:
    def __init__(self, client):
        self.client = client
        self.context_window = 4096

    def optimize_prompt(self, prompt, system=""):
        # Trim prompts that exceed the model's context window
        # (count_tokens and truncate_prompt are helpers to implement for your tokenizer)
        tokens = self.count_tokens(prompt)
        if tokens > self.context_window:
            return self.truncate_prompt(prompt)
        return prompt

    def batch_generate(self, prompts, **kwargs):
        optimized = [
            self.optimize_prompt(p) for p in prompts
        ]
        return self.client.generate_batch(optimized, **kwargs)
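To make the Model Management checklist above actionable, here is a minimal sketch that inventories locally installed models so oversized or rarely updated ones are easy to spot and clean up. It assumes a locally running Ollama instance exposing the standard /api/tags endpoint; the base URL and response fields are assumptions to verify against your deployment.

import requests

OLLAMA_URL = "http://localhost:11434"  # assumed local Ollama endpoint

def model_inventory():
    """Return installed models sorted by size so oversized or stale ones are easy to spot."""
    resp = requests.get(f"{OLLAMA_URL}/api/tags", timeout=10)
    resp.raise_for_status()
    models = resp.json().get("models", [])
    return sorted(models, key=lambda m: m.get("size", 0), reverse=True)

if __name__ == "__main__":
    for m in model_inventory():
        # Report name, approximate size, and last-modified timestamp for cleanup decisions
        print(f"{m['name']}: {m.get('size', 0) / 1e9:.1f} GB (modified {m.get('modified_at', 'unknown')})")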
Flowise Service
- Chatflow Management (an export sketch follows the Stream Handling example below)
  - Version control chatflows
  - Test in development
  - Document node configurations
  - Monitor performance
- Stream Handling
class StreamManager {
  constructor(client) {
    this.client = client;
    this.activeStreams = new Map();
  }

  async startStream(chatflowId, options) {
    const stream = await this.client.createStream(chatflowId);
    this.activeStreams.set(chatflowId, stream);
    stream.on('data', this.handleData.bind(this));
    stream.on('error', this.handleError.bind(this));
    stream.on('end', () => this.closeStream(chatflowId));
    return stream;
  }

  handleData(data) {
    // Implementation
  }

  handleError(error) {
    // Implementation
  }

  closeStream(chatflowId) {
    const stream = this.activeStreams.get(chatflowId);
    if (stream) {
      stream.close();
      this.activeStreams.delete(chatflowId);
    }
  }
}
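For the Chatflow Management checklist above, a lightweight way to version control chatflows is to export them as JSON files and commit them to git. The sketch below assumes a Flowise instance that exposes GET /api/v1/chatflows with Bearer-token API-key auth; the URL, key, and endpoint path are placeholders to confirm against your instance.

import json
import pathlib
import requests

FLOWISE_URL = "https://flowise.example.com"   # assumed instance URL
FLOWISE_KEY = "YOUR_API_KEY"                  # assumed API key

def export_chatflows(out_dir="chatflows"):
    """Dump each chatflow to a JSON file so changes can be tracked in version control."""
    resp = requests.get(
        f"{FLOWISE_URL}/api/v1/chatflows",
        headers={"Authorization": f"Bearer {FLOWISE_KEY}"},
        timeout=30,
    )
    resp.raise_for_status()
    path = pathlib.Path(out_dir)
    path.mkdir(parents=True, exist_ok=True)
    for flow in resp.json():
        # One file per chatflow, keyed by id, with stable formatting for clean diffs
        (path / f"{flow['id']}.json").write_text(json.dumps(flow, indent=2, sort_keys=True))

if __name__ == "__main__":
    export_chatflows()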
Langchain Service
- Chain Optimization (a memory and caching sketch follows the Document Processing example below)
  - Use appropriate memory types
  - Optimize document processing
  - Implement proper caching
  - Monitor chain performance
- Document Processing
import asyncio
from pathlib import Path

class DocumentProcessor:
    def __init__(self, client):
        self.client = client
        self.batch_size = 5
        self.supported_types = {'.pdf', '.txt', '.docx'}

    async def process_documents(self, files):
        tasks = []
        for i in range(0, len(files), self.batch_size):
            batch = files[i:i + self.batch_size]
            tasks.append(self.process_batch(batch))
        return await asyncio.gather(*tasks)

    async def process_batch(self, files):
        results = []
        for file in files:
            if self.is_supported(file):
                result = await self.client.process_document(file)
                results.append(result)
        return results

    def is_supported(self, file):
        return Path(file).suffix.lower() in self.supported_types
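The Chain Optimization checklist above largely comes down to choosing bounded memory and caching repeated LLM calls. The sketch below shows one way to do that with LangChain; the import paths vary between LangChain releases, so treat them as assumptions to check against the version you have installed.

# Assumed imports; older releases expose these under langchain.cache / langchain.memory instead
from langchain.globals import set_llm_cache
from langchain_community.cache import InMemoryCache
from langchain.memory import ConversationBufferWindowMemory

# Cache identical LLM calls in-process so repeated prompts are not recomputed
set_llm_cache(InMemoryCache())

# Keep only the last k exchanges in the prompt to bound context size on long conversations
memory = ConversationBufferWindowMemory(k=5, return_messages=True)

Windowed memory keeps prompts small on long conversations; a summary-style memory is the usual alternative when older context still matters.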
Monitoring & Logging
Structured Logging
import structlog

logger = structlog.get_logger()

class APILogger:
    def __init__(self):
        self.logger = structlog.wrap_logger(
            logger,
            processors=[
                structlog.processors.TimeStamper(fmt="iso"),
                structlog.processors.JSONRenderer()
            ]
        )

    def log_request(self, service, endpoint, duration, status):
        self.logger.info(
            "api_request",
            service=service,
            endpoint=endpoint,
            duration_ms=duration,
            status=status
        )

    def log_error(self, service, error):
        self.logger.error(
            "api_error",
            service=service,
            error_type=type(error).__name__,
            error_message=str(error)
        )
Performance Monitoring
import statistics
import time
from collections import defaultdict

class PerformanceMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)

    def record_metric(self, service, endpoint, duration):
        self.metrics[f"{service}.{endpoint}"].append({
            'timestamp': time.time(),
            'duration': duration
        })

    def get_statistics(self, service, endpoint):
        data = self.metrics[f"{service}.{endpoint}"]
        if not data:
            return None
        durations = [d['duration'] for d in data]
        return {
            'count': len(durations),
            'avg': statistics.mean(durations),
            # 95th percentile taken from 20-quantiles; requires at least two samples
            'p95': statistics.quantiles(durations, n=20)[18],
            'max': max(durations)
        }
Support & Resources
For implementation support and resources:
- Developer Portal: developers.moodmnky.com
- API Status: status.moodmnky.com
- Documentation: docs.moodmnky.com
- Community: community.moodmnky.com
- GitHub: github.com/moodmnky-llc