Примеры интеграции
Готовые решения для типовых интеграционных задач
REST API → Database
Синхронизация данных из REST API в PostgreSQL
Database IntegrationНачинающийJavaScript
Код примера:
// Конфигурация интеграции REST API → PostgreSQL
const integration = {
name: "API to Database Sync",
source: {
type: "http",
method: "GET",
url: "https://api.example.com/users",
headers: {
"Authorization": "Bearer TOKEN",
"Content-Type": "application/json"
},
schedule: "0 */15 * * * *" // каждые 15 минут
},
transformation: {
script: `
return data.users.map(user => ({
id: user.id,
name: user.full_name,
email: user.email,
created_at: new Date(user.created_timestamp)
}));
`
},
target: {
type: "postgresql",
connection: "postgresql://user:pass@localhost:5432/db",
table: "users",
operation: "upsert",
key: "id"
}
};
File Processing
Обработка CSV файлов и загрузка в облачное хранилище
File IntegrationСреднийPython
Код примера:
# Обработка CSV файлов
import pandas as pd
from integralink import FileProcessor, CloudStorage
def process_csv_file(file_path):
# Чтение CSV файла
df = pd.read_csv(file_path)
# Валидация данных
df = df.dropna(subset=['email'])
df['email'] = df['email'].str.lower()
# Преобразование данных
df['full_name'] = df['first_name'] + ' ' + df['last_name']
df['processed_at'] = pd.Timestamp.now()
# Сохранение в облако
cloud_storage = CloudStorage(
provider='aws_s3',
bucket='processed-data',
credentials={
'access_key': 'YOUR_ACCESS_KEY',
'secret_key': 'YOUR_SECRET_KEY'
}
)
output_path = f"processed/{pd.Timestamp.now().strftime('%Y%m%d')}/users.parquet"
df.to_parquet(output_path)
cloud_storage.upload(output_path)
return {"processed_records": len(df), "output_path": output_path}
Message Queue Integration
Обработка сообщений из RabbitMQ и отправка в Slack
MessagingПродвинутыйJavaScript
Код примера:
// RabbitMQ → Slack интеграция
const { RabbitMQConsumer, SlackNotifier } = require('integralink');
const config = {
rabbitmq: {
url: 'amqp://localhost:5672',
queue: 'alerts',
exchange: 'notifications',
routingKey: 'alert.*'
},
slack: {
webhook: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
channel: '#alerts',
username: 'IntegraLink Bot'
}
};
const consumer = new RabbitMQConsumer(config.rabbitmq);
const slack = new SlackNotifier(config.slack);
consumer.onMessage(async (message) => {
try {
const alert = JSON.parse(message.content);
// Форматирование сообщения для Slack
const slackMessage = {
text: `🚨 Alert: ${alert.title}`,
attachments: [{
color: alert.severity === 'critical' ? 'danger' : 'warning',
fields: [
{ title: 'Service', value: alert.service, short: true },
{ title: 'Severity', value: alert.severity, short: true },
{ title: 'Description', value: alert.description, short: false }
],
timestamp: Math.floor(Date.now() / 1000)
}]
};
await slack.send(slackMessage);
console.log('Alert sent to Slack:', alert.id);
// Подтверждение обработки сообщения
message.ack();
} catch (error) {
console.error('Error processing alert:', error);
message.nack(false, true); // Вернуть в очередь для повторной обработки
}
});
consumer.start();
IoT Data Pipeline
Сбор данных с IoT устройств через MQTT и аналитика
IoTПродвинутыйPython
Код примера:
# IoT данные через MQTT → InfluxDB → Grafana
import json
import asyncio
from datetime import datetime
from integralink import MQTTClient, InfluxDBWriter, AlertManager
class IoTDataPipeline:
def __init__(self):
self.mqtt = MQTTClient(
broker="mqtt.iot-platform.com",
port=1883,
topics=["sensors/+/temperature", "sensors/+/humidity"]
)
self.influx = InfluxDBWriter(
url="http://localhost:8086",
token="YOUR_INFLUX_TOKEN",
org="your-org",
bucket="iot-data"
)
self.alerts = AlertManager()
async def process_sensor_data(self, topic, payload):
try:
# Парсинг MQTT сообщения
data = json.loads(payload)
sensor_id = topic.split('/')[1]
metric_type = topic.split('/')[2]
# Подготовка данных для InfluxDB
point = {
"measurement": "sensor_data",
"tags": {
"sensor_id": sensor_id,
"metric_type": metric_type,
"location": data.get("location", "unknown")
},
"fields": {
"value": float(data["value"]),
"battery_level": float(data.get("battery", 100))
},
"time": datetime.utcnow()
}
# Запись в InfluxDB
await self.influx.write_point(point)
# Проверка алертов
await self.check_alerts(sensor_id, metric_type, data["value"])
except Exception as e:
print(f"Error processing sensor data: {e}")
async def check_alerts(self, sensor_id, metric_type, value):
# Проверка критических значений
thresholds = {
"temperature": {"min": -10, "max": 50},
"humidity": {"min": 20, "max": 80}
}
if metric_type in thresholds:
threshold = thresholds[metric_type]
if value < threshold["min"] or value > threshold["max"]:
await self.alerts.send_alert({
"sensor_id": sensor_id,
"metric": metric_type,
"value": value,
"threshold": threshold,
"severity": "critical" if value < threshold["min"] - 5 or value > threshold["max"] + 5 else "warning"
})
def start(self):
self.mqtt.on_message = self.process_sensor_data
self.mqtt.connect()
print("IoT Data Pipeline started...")