Примеры интеграции

Готовые решения для типовых интеграционных задач

REST API → Database

Синхронизация данных из REST API в PostgreSQL

Database IntegrationНачинающийJavaScript

Код примера:

// Конфигурация интеграции REST API → PostgreSQL
const integration = {
  name: "API to Database Sync",
  source: {
    type: "http",
    method: "GET",
    url: "https://api.example.com/users",
    headers: {
      "Authorization": "Bearer TOKEN",
      "Content-Type": "application/json"
    },
    schedule: "0 */15 * * * *" // каждые 15 минут
  },
  transformation: {
    script: `
      return data.users.map(user => ({
        id: user.id,
        name: user.full_name,
        email: user.email,
        created_at: new Date(user.created_timestamp)
      }));
    `
  },
  target: {
    type: "postgresql",
    connection: "postgresql://user:pass@localhost:5432/db",
    table: "users",
    operation: "upsert",
    key: "id"
  }
};

File Processing

Обработка CSV файлов и загрузка в облачное хранилище

File IntegrationСреднийPython

Код примера:

# Обработка CSV файлов
import pandas as pd
from integralink import FileProcessor, CloudStorage

def process_csv_file(file_path):
    # Чтение CSV файла
    df = pd.read_csv(file_path)
    
    # Валидация данных
    df = df.dropna(subset=['email'])
    df['email'] = df['email'].str.lower()
    
    # Преобразование данных
    df['full_name'] = df['first_name'] + ' ' + df['last_name']
    df['processed_at'] = pd.Timestamp.now()
    
    # Сохранение в облако
    cloud_storage = CloudStorage(
        provider='aws_s3',
        bucket='processed-data',
        credentials={
            'access_key': 'YOUR_ACCESS_KEY',
            'secret_key': 'YOUR_SECRET_KEY'
        }
    )
    
    output_path = f"processed/{pd.Timestamp.now().strftime('%Y%m%d')}/users.parquet"
    df.to_parquet(output_path)
    cloud_storage.upload(output_path)
    
    return {"processed_records": len(df), "output_path": output_path}

Message Queue Integration

Обработка сообщений из RabbitMQ и отправка в Slack

MessagingПродвинутыйJavaScript

Код примера:

// RabbitMQ → Slack интеграция
const { RabbitMQConsumer, SlackNotifier } = require('integralink');

const config = {
  rabbitmq: {
    url: 'amqp://localhost:5672',
    queue: 'alerts',
    exchange: 'notifications',
    routingKey: 'alert.*'
  },
  slack: {
    webhook: 'https://hooks.slack.com/services/YOUR/WEBHOOK/URL',
    channel: '#alerts',
    username: 'IntegraLink Bot'
  }
};

const consumer = new RabbitMQConsumer(config.rabbitmq);
const slack = new SlackNotifier(config.slack);

consumer.onMessage(async (message) => {
  try {
    const alert = JSON.parse(message.content);
    
    // Форматирование сообщения для Slack
    const slackMessage = {
      text: `🚨 Alert: ${alert.title}`,
      attachments: [{
        color: alert.severity === 'critical' ? 'danger' : 'warning',
        fields: [
          { title: 'Service', value: alert.service, short: true },
          { title: 'Severity', value: alert.severity, short: true },
          { title: 'Description', value: alert.description, short: false }
        ],
        timestamp: Math.floor(Date.now() / 1000)
      }]
    };
    
    await slack.send(slackMessage);
    console.log('Alert sent to Slack:', alert.id);
    
    // Подтверждение обработки сообщения
    message.ack();
    
  } catch (error) {
    console.error('Error processing alert:', error);
    message.nack(false, true); // Вернуть в очередь для повторной обработки
  }
});

consumer.start();

IoT Data Pipeline

Сбор данных с IoT устройств через MQTT и аналитика

IoTПродвинутыйPython

Код примера:

# IoT данные через MQTT → InfluxDB → Grafana
import json
import asyncio
from datetime import datetime
from integralink import MQTTClient, InfluxDBWriter, AlertManager

class IoTDataPipeline:
    def __init__(self):
        self.mqtt = MQTTClient(
            broker="mqtt.iot-platform.com",
            port=1883,
            topics=["sensors/+/temperature", "sensors/+/humidity"]
        )
        
        self.influx = InfluxDBWriter(
            url="http://localhost:8086",
            token="YOUR_INFLUX_TOKEN",
            org="your-org",
            bucket="iot-data"
        )
        
        self.alerts = AlertManager()
    
    async def process_sensor_data(self, topic, payload):
        try:
            # Парсинг MQTT сообщения
            data = json.loads(payload)
            sensor_id = topic.split('/')[1]
            metric_type = topic.split('/')[2]
            
            # Подготовка данных для InfluxDB
            point = {
                "measurement": "sensor_data",
                "tags": {
                    "sensor_id": sensor_id,
                    "metric_type": metric_type,
                    "location": data.get("location", "unknown")
                },
                "fields": {
                    "value": float(data["value"]),
                    "battery_level": float(data.get("battery", 100))
                },
                "time": datetime.utcnow()
            }
            
            # Запись в InfluxDB
            await self.influx.write_point(point)
            
            # Проверка алертов
            await self.check_alerts(sensor_id, metric_type, data["value"])
            
        except Exception as e:
            print(f"Error processing sensor data: {e}")
    
    async def check_alerts(self, sensor_id, metric_type, value):
        # Проверка критических значений
        thresholds = {
            "temperature": {"min": -10, "max": 50},
            "humidity": {"min": 20, "max": 80}
        }
        
        if metric_type in thresholds:
            threshold = thresholds[metric_type]
            if value < threshold["min"] or value > threshold["max"]:
                await self.alerts.send_alert({
                    "sensor_id": sensor_id,
                    "metric": metric_type,
                    "value": value,
                    "threshold": threshold,
                    "severity": "critical" if value < threshold["min"] - 5 or value > threshold["max"] + 5 else "warning"
                })
    
    def start(self):
        self.mqtt.on_message = self.process_sensor_data
        self.mqtt.connect()
        print("IoT Data Pipeline started...")

Нужен кастомный пример?

Не нашли подходящий пример? Мы поможем создать решение под ваши задачи.