Earlier roles / Feeds Processing
Some critical tasks involved ingesting supplier feeds/data:
- File Copy
- File load
- Run Validations
- Load Data
DB supporting it
- Schema
- Jobs / Options / Run status / Retry
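The job-tracking schema above can be sketched minimally. This is a hypothetical layout (table and column names are assumptions, not the original schema), shown here with SQLite for illustration:

```python
import sqlite3

# Hypothetical minimal schema for tracking feed jobs: options, run status, retries.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE jobs (
        job_id      INTEGER PRIMARY KEY,
        feed_name   TEXT NOT NULL,
        options     TEXT,                    -- JSON blob of per-job options
        run_status  TEXT DEFAULT 'PENDING',  -- PENDING / RUNNING / DONE / FAILED
        retry_count INTEGER DEFAULT 0,
        max_retries INTEGER DEFAULT 3
    )
""")
conn.execute("INSERT INTO jobs (feed_name, options) VALUES (?, ?)",
             ("supplier_feed", '{"delimiter": ","}'))
status = conn.execute("SELECT run_status FROM jobs").fetchone()[0]
print(status)  # PENDING
```

A retry worker can then simply select rows where `run_status = 'FAILED'` and `retry_count < max_retries`.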
Technically, from a scale point of view:
- File watcher
- File lock
- Process jobs
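The file-lock idea above can be sketched with an atomic lock-file, so two watcher processes never pick up the same feed file. This is a minimal sketch (function names are mine, not from the original system):

```python
import errno
import os

# Hypothetical lock-file guard: only one watcher may process a given feed file.
def try_lock(path):
    lock_path = path + ".lock"
    try:
        # O_CREAT | O_EXCL makes creation atomic: it fails if the lock exists.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
        os.close(fd)
        return True
    except OSError as e:
        if e.errno == errno.EEXIST:
            return False  # another process already holds the lock
        raise

def release_lock(path):
    os.remove(path + ".lock")
```

A second `try_lock` on the same path returns `False` until the holder calls `release_lock`.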
Validations
- Bunch of procedures
Key design tweaks
- Parallelize operations
- Data copy as objects/temp tables
- Parallel file copy
- Support multiple threads
- Avoid blocking on data updates
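The parallel file copy and multi-thread tweaks above can be sketched with a thread pool. A sketch, not the original implementation (the helper names are assumptions):

```python
import shutil
from concurrent.futures import ThreadPoolExecutor

# Copy one (src, dst) pair; threads suit this because the work is I/O-bound.
def copy_file(pair):
    src, dst = pair
    shutil.copy(src, dst)
    return dst

# Fan a batch of copies out across a small worker pool.
def parallel_copy(pairs, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(copy_file, pairs))
```

Each copy is independent, so there is no shared state to block on, which matches the "avoid blocking" tweak.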
ML Context
- For parallel model creation
- Configuration
- Submit Job
- 5 timeseries category datasets, with global models in each category
- 10 jobs across the 5 category datasets, yielding 10 different models
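The fan-out above (5 category datasets, 10 jobs, 10 models) can be expressed as a small job configuration. The category and algorithm names below are placeholders, not the real ones:

```python
# Hypothetical job fan-out: 5 timeseries categories, two algorithms per
# category -> 10 independent training jobs submitted in parallel.
categories = ["cat_a", "cat_b", "cat_c", "cat_d", "cat_e"]  # placeholder names
algorithms = ["algo_1", "algo_2"]                           # assumed per-category pair
jobs = [{"category": c, "algo": a} for c in categories for a in algorithms]
print(len(jobs))  # 10
```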
- Prepare data Job
- Fetch initial data
- Process missing variables
- Data imputation
- Save Results
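The prepare-data steps above (handle missing variables, impute) can be sketched with pandas. A minimal sketch, assuming a simple forward-fill-then-mean imputation strategy (the actual strategy is not specified in these notes):

```python
import pandas as pd

# Hypothetical imputation for the prepare-data job: forward-fill time-series
# gaps, then fall back to the column mean for anything still missing.
def prepare(df):
    df = df.ffill()
    return df.fillna(df.mean())

raw = pd.DataFrame({"sales": [10.0, None, 30.0, None]})
prepared = prepare(raw)
print(prepared["sales"].tolist())  # [10.0, 10.0, 30.0, 30.0]
```

The prepared frame would then be saved (the "Save Results" step) so the execute job can read it without repeating the cleanup.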
Execute Job
- Read prepared data
- Fetch algorithm to run
- Train algorithm
- Record training accuracy
- Save model
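The execute steps above can be sketched end to end: fit a model, record a metric, persist the artifact. `MeanModel` is a toy stand-in for whichever algorithm the job fetches; the accuracy value is a placeholder:

```python
import pickle

# Toy model standing in for the fetched algorithm.
class MeanModel:
    def fit(self, y):
        self.mean_ = sum(y) / len(y)
        return self

    def predict(self, n):
        return [self.mean_] * n

model = MeanModel().fit([2.0, 4.0, 6.0])
accuracy = 1.0               # placeholder: a real job stores a validation metric
blob = pickle.dumps(model)   # serialized model, e.g. written to disk or the DB
```

Persisting the fitted artifact is what lets the predict job run later, on a different worker, without retraining.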
Predict Job
- Load saved model
- Run predictions
- Save in DB
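The predict job's "save in DB" step can be sketched with SQLite. The table layout and prediction values are illustrative assumptions; a real job would score with the loaded model first:

```python
import sqlite3

# Hypothetical predict-job output: scores produced by the loaded model,
# persisted so downstream consumers read from the DB, not from the job.
predictions = [("2024-01-01", 4.2), ("2024-01-02", 4.5)]  # placeholder scores
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE predictions (ds TEXT, yhat REAL)")
conn.executemany("INSERT INTO predictions VALUES (?, ?)", predictions)
count = conn.execute("SELECT COUNT(*) FROM predictions").fetchone()[0]
print(count)  # 2
```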
Design Ideas
- Atomic functions
- Job monitor / independent execution units
- Horizontal scaling as a Kubernetes app
- Common DB with multiple parallel execution functions
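The "atomic functions as independent execution units" idea above can be sketched with a worker pool: each unit runs one self-contained step and reports its own status, so a job monitor can retry a failed unit without touching the others. The unit body here is a placeholder:

```python
from concurrent.futures import ThreadPoolExecutor

# Each execution unit is atomic: it does one step and returns its own status,
# which the job monitor records in the common DB.
def run_unit(job_id):
    try:
        # one atomic step would go here (prepare / execute / predict)
        return (job_id, "DONE")
    except Exception:
        return (job_id, "FAILED")

with ThreadPoolExecutor(max_workers=4) as pool:
    statuses = dict(pool.map(run_unit, range(10)))
print(statuses[0])  # DONE
```

Because units share nothing but the DB, the same pattern scales horizontally: more pods simply mean more units pulled from the common job table.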
Python Options
FastAPI - Uvicorn also has an option to start and run several worker processes:
uvicorn main:app --host 0.0.0.0 --port 8080 --workers 4
Flask API - Flask can serve requests in multiple threads:
if __name__ == '__main__':
    app.run(threaded=True)
Or from the CLI:
flask run --with-threads
More Reads
System Design — Design a distributed job scheduler (Keep It Simple Stupid Interview series)
Orchestrating a Background Job Workflow in Celery for Python
System Design: Designing a distributed Job Scheduler | Many interesting concepts to learn
Examples
Keep Exploring!!!
#!pip install nest_asyncio
import asyncio
import nest_asyncio

# Allows run_until_complete inside environments that already run an event loop (e.g. Jupyter)
nest_asyncio.apply()

async def processfeeds():
    task1 = asyncio.create_task(loaddata())
    print("Data load waiting")
    await task1
    print('Data load done')
    await asyncio.sleep(1)
    task2 = asyncio.create_task(processdata())
    print("Data process waiting")
    await task2
    await asyncio.sleep(5)
    print('Data process done')

async def loaddata():
    print("loading data")
    await asyncio.sleep(5)
    print("data loaded")

async def processdata():
    print("process data")
    await asyncio.sleep(5)
    print("data processed")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(processfeeds())