خطوط لوله دادهای بدون سرور (Serverless Data Pipelines) به توسعهدهندگان و تحلیلگران داده این امکان را میدهند تا دادهها را به صورت خودکار پردازش و تجزیهوتحلیل کنند، بدون نیاز به مدیریت سرورها و زیرساختهای پیچیده. در این مستندات به پیادهسازی یک خط لوله دادهای بدون سرور بر روی سرورهای ابری پرداخته میشود. این راهنما شامل نمونههای کد و پیکربندیهای مورد نیاز برای راهاندازی و اجرای یک خط لوله دادهای بدون سرور است.
الزامات
- دسترسی به سرور ابری با دسترسی ریشه (Root Access)
- سیستمعامل لینوکس (ترجیحاً اوبونتو ۲۰.۰۴ یا بالاتر)
- نصب Docker و Kubernetes برای مدیریت توابع بدون سرور
- آشنایی با زبان برنامهنویسی پایتون
۱. مقدمهای بر خطوط لوله دادهای بدون سرور
خطوط لوله دادهای بدون سرور برای پردازش و انتقال دادهها در معماریهای دادهای به کار میروند. در این نوع معماری، توابع بدون سرور به طور خودکار اجرا شده و دادهها را پردازش و به سیستمهای دیگر منتقل میکنند. از مزایای این رویکرد میتوان به مقیاسپذیری خودکار، هزینه بر اساس مصرف و کاهش نیاز به مدیریت زیرساخت اشاره کرد.
۲. پیادهسازی خط لوله دادهای بدون سرور با استفاده از OpenFaaS
مرحله ۱: نصب Docker و OpenFaaS
ابتدا Docker و Docker Compose را نصب کنید:
sudo apt-get update
sudo apt-get install -y docker.io docker-compose
sudo systemctl start docker
sudo systemctl enable docker
سپس OpenFaaS را نصب کنید:
git clone https://github.com/openfaas/faas
cd faas
./deploy_stack.sh
این اسکریپت OpenFaaS را با استفاده از Docker Compose راهاندازی میکند. پس از اجرای این اسکریپت، OpenFaaS بر روی سرور شما اجرا خواهد شد.
مرحله ۲: ایجاد توابع برای خط لوله دادهای
برای ایجاد توابع دادهای در OpenFaaS، از faas-cli
استفاده کنید. ابتدا faas-cli
را نصب کنید:
curl -sL https://cli.openfaas.com | sudo sh
سپس یک تابع جدید برای پردازش داده ایجاد کنید:
faas-cli new --lang python3 process-data
مرحله ۳: نوشتن کد پردازش داده
به فایل handler.py
مراجعه کنید و کد پردازش داده را در آن قرار دهید. به عنوان مثال، کدی که دادههای JSON ورودی را پردازش کرده و مقادیر را دوباره به عنوان خروجی بازمیگرداند:
import json
def handle(event, context):
try:
data = json.loads(event.body)
result = {
"processed_value": data.get("value", 0) * 2
}
return {
"status": "success",
"result": result
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
مرحله ۴: ساخت و استقرار تابع
برای ساخت و استقرار تابع، از دستور زیر استفاده کنید:
faas-cli up -f process-data.yml
این دستور تابع process-data
را ایجاد کرده و آن را در محیط OpenFaaS استقرار میدهد.
۳. استفاده از Kafka برای مدیریت جریان داده
Apache Kafka یکی از ابزارهای محبوب برای مدیریت جریان دادهها به صورت بلادرنگ است. در این بخش، به راهاندازی Kafka و استفاده از آن برای ایجاد خطوط لوله دادهای پرداخته میشود.
مرحله ۱: نصب Apache Kafka
برای نصب Kafka، ابتدا ZooKeeper را به عنوان سرویس هماهنگکننده نصب کنید:
sudo apt-get install -y zookeeperd
سپس Kafka را نصب کنید:
wget https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -xzf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0
مرحله ۲: راهاندازی Kafka
برای راهاندازی Kafka، دستورات زیر را اجرا کنید:
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
مرحله ۳: ایجاد یک Topic در Kafka
برای مدیریت جریان دادهها، باید یک topic در Kafka ایجاد کنید:
bin/kafka-topics.sh --create --topic data-pipeline --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
۴. اتصال OpenFaaS به Kafka
برای اینکه تابع OpenFaaS بتواند دادههای Kafka را پردازش کند، نیاز به پیکربندی مناسب برای اتصال به Kafka دارید. در اینجا یک تابع جدید برای دریافت دادهها از Kafka و پردازش آنها ایجاد میکنیم.
مرحله ۱: ایجاد یک تابع جدید در OpenFaaS
یک تابع جدید به نام kafka-consumer
ایجاد کنید:
faas-cli new --lang python3 kafka-consumer
مرحله ۲: نوشتن کد برای مصرف داده از Kafka
کد زیر را در فایل handler.py
قرار دهید تا دادههای دریافت شده از Kafka را پردازش کند:
from kafka import KafkaConsumer
import json
def handle(event, context):
consumer = KafkaConsumer(
'data-pipeline',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
for message in consumer:
data = message.value
print(f"Received data: {data}")
# پردازش داده
return {
"status": "success",
"message": "Data processed successfully"
}
مرحله ۳: ساخت و استقرار تابع
برای ساخت و استقرار تابع kafka-consumer
از دستور زیر استفاده کنید:
faas-cli up -f kafka-consumer.yml
۵. ذخیرهسازی دادههای پردازش شده در پایگاه داده
برای ذخیرهسازی دادههای پردازش شده، میتوانید از پایگاههای داده مانند MongoDB یا PostgreSQL استفاده کنید.
مرحله ۱: نصب MongoDB
MongoDB را به عنوان یک پایگاه داده NoSQL بر روی سرور خود نصب کنید:
sudo apt-get install -y mongodb
sudo systemctl start mongodb
sudo systemctl enable mongodb
مرحله ۲: افزودن کد ذخیرهسازی به تابع OpenFaaS
کد زیر را به handler.py
اضافه کنید تا دادههای پردازش شده در MongoDB ذخیره شوند:
from pymongo import MongoClient
def save_to_mongo(data):
client = MongoClient("mongodb://localhost:27017/")
db = client["data_pipeline"]
collection = db["processed_data"]
collection.insert_one(data)
def handle(event, context):
try:
data = json.loads(event.body)
result = {
"processed_value": data.get("value", 0) * 2
}
save_to_mongo(result)
return {
"status": "success",
"result": result
}
except Exception as e:
return {
"status": "error",
"message": str(e)
}
نتیجهگیری
در این مستندات به بررسی پیادهسازی خطوط لوله دادهای بدون سرور با استفاده از ابزارهای OpenFaaS، Kafka و MongoDB پرداخته شد. این رویکرد به توسعهدهندگان و تحلیلگران داده امکان میدهد که بدون نیاز به مدیریت زیرساختهای پیچیده، دادهها را بهصورت خودکار پردازش و تحلیل کنند. با استفاده از این راهنما، میتوانید خطوط لوله دادهای خود را بر روی سرورهای ابری به راحتی پیادهسازی کنید.