"No one is harder on a talented person than the person themselves" - Linda Wilkinson ; "Trust your guts and don't follow the herd" ; "Validate direction not destination" ;

June 28, 2023

Designing Async / Parallelize Tasks

Earlier roles / Feeds Processing

Had some critical tasks for ingesting supplier feeds/data:

  • File Copy
  • File load
  • Run Validations
  • Load Data

DB supporting it

  • Schema
  • Jobs / Options / Run status / Retry
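A minimal sketch of such a jobs table, using sqlite3 for brevity (the table and column names are illustrative, not the original schema):

import sqlite3

# Illustrative jobs table: one row per unit of work, tracking its
# options, current run status, and retries consumed.
conn = sqlite3.connect("feeds.db")
conn.execute("""
CREATE TABLE IF NOT EXISTS jobs (
    job_id      INTEGER PRIMARY KEY,
    job_type    TEXT,                    -- file_copy / file_load / validate / load_data
    options     TEXT,                    -- JSON blob of job parameters
    run_status  TEXT DEFAULT 'pending',  -- pending / running / success / failed
    retry_count INTEGER DEFAULT 0,
    updated_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()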

Technically, from a scale point of view, the moving parts were:

  • File watcher
  • File lock
  • Process jobs
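Roughly, the watcher/lock combination could look like the sketch below (paths and polling interval are invented; a production version might use inotify or the watchdog library):

import os
import time
import shutil

INCOMING = "/data/incoming"   # hypothetical feed drop folder
STAGING = "/data/staging"     # where claimed files go for processing

def watch_and_claim():
    # Poll the drop folder; an exclusive .lock marker guarantees
    # only one worker claims a given file.
    while True:
        for name in os.listdir(INCOMING):
            if name.endswith(".lock"):
                continue
            path = os.path.join(INCOMING, name)
            try:
                # O_EXCL makes lock creation atomic across processes
                fd = os.open(path + ".lock", os.O_CREAT | os.O_EXCL)
                os.close(fd)
            except FileExistsError:
                continue  # another worker already claimed this file
            shutil.move(path, os.path.join(STAGING, name))
        time.sleep(10)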

Validations

  • A bunch of stored procedures, each an independent check
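In Python terms each procedure reduces to a small, self-contained check, which is what makes them easy to parallelize later; a toy version (the column names are assumptions):

import pandas as pd

def check_not_empty(df: pd.DataFrame) -> bool:
    return len(df) > 0

def check_no_null_keys(df: pd.DataFrame) -> bool:
    # 'supplier_id' is an assumed key column, purely for illustration
    return df["supplier_id"].notna().all()

def run_validations(df: pd.DataFrame) -> bool:
    # Each check is independent, so the list can grow freely
    checks = [check_not_empty, check_no_null_keys]
    return all(check(df) for check in checks)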

Key design tweaks

  • Parallelize operations
  • Data copy as objects/temp tables
  • Parallel file copy (see the sketch after this list)
  • Support multiple threads
  • Avoid data blocking/updates
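As an example, the parallel file copy piece is a few lines with ThreadPoolExecutor (folder names, the *.csv pattern, and the worker count are placeholders):

import shutil
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

def parallel_copy(src_dir: str, dest_dir: str, workers: int = 4) -> None:
    src, dest = Path(src_dir), Path(dest_dir)
    dest.mkdir(parents=True, exist_ok=True)
    # Each file copy is independent, so threads give an easy win;
    # copy2 also preserves file metadata.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() consumes the result iterator so errors surface here
        list(pool.map(lambda f: shutil.copy2(f, dest / f.name),
                      src.glob("*.csv")))

parallel_copy("incoming", "staging")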

ML Context

  • For parallel model creation
  • Configuration
  • Submit Job
  • 5 time-series category datasets / global models in each category
  • 10 jobs, covering the 5 category dataset models (see the configuration sketch below)
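Read as configuration, that works out to something like this sketch; the category and model names are made up, and only the 5 x 2 = 10 shape comes from the notes:

# Illustrative configuration: 5 time-series categories, 2 global
# models per category => 10 independent jobs.
CATEGORIES = ["grocery", "apparel", "electronics", "home", "toys"]
GLOBAL_MODELS = ["global_model_a", "global_model_b"]

def submit_jobs(submit) -> None:
    # 'submit' is any callable that queues a job (DB insert, API call, ...)
    for category in CATEGORIES:
        for model in GLOBAL_MODELS:
            submit({"category": category, "model": model})

submit_jobs(print)  # prints the 10 job payloads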

10 different models, each flowing through three jobs:

Prepare Data Job

  • Fetch initial data
  • Process missing variables
  • Data imputation
  • Save Results
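A compact sketch of the prepare step; the source query, column names, and imputation strategy are all assumptions:

import pandas as pd

def prepare_data_job(category: str, conn) -> None:
    # Fetch initial data for one category (query is illustrative)
    df = pd.read_sql("SELECT * FROM sales WHERE category = ?", conn,
                     params=[category])
    # Process missing variables: add expected columns that are absent
    for col in ("price", "promo_flag"):
        if col not in df.columns:
            df[col] = 0
    # Data imputation: forward-fill gaps, zero-fill anything left
    df = df.ffill().fillna(0)
    # Save results for the execute job to pick up
    df.to_parquet(f"prepared_{category}.parquet")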

Execute Job

  • Read prepared data
  • Fetch algo to run
  • Train algo
  • Record training accuracy
  • Save model
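The execute step, sketched with scikit-learn standing in for whichever algo the job's configuration points at (the feature and target columns are assumed):

import pickle
import pandas as pd
from sklearn.linear_model import LinearRegression

def execute_job(category: str) -> float:
    # Read prepared data from the previous job
    df = pd.read_parquet(f"prepared_{category}.parquet")
    X, y = df[["price", "promo_flag"]], df["units_sold"]
    # Fetch algo to run: hardcoded here, from job config in practice
    model = LinearRegression().fit(X, y)
    # Record training accuracy (R^2 here) and save the model
    accuracy = model.score(X, y)
    with open(f"model_{category}.pkl", "wb") as f:
        pickle.dump(model, f)
    return accuracy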

Predict Job

  • Load saved model
  • Run predictions
  • Save in DB
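And the predict step closes the loop (the output table name is an assumption):

import pickle
import pandas as pd

def predict_job(category: str, conn) -> None:
    # Load saved model produced by the execute job
    with open(f"model_{category}.pkl", "rb") as f:
        model = pickle.load(f)
    # Run predictions on the prepared features
    df = pd.read_parquet(f"prepared_{category}.parquet")
    df["prediction"] = model.predict(df[["price", "promo_flag"]])
    # Save in DB for downstream consumers
    df[["prediction"]].to_sql("predictions", conn, if_exists="append")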

Design Ideas

  • Atomic functions
  • Job monitor / independent execution units
  • Horizontal scaling as a Kubernetes app
  • Common DB with multiple parallel execution functions
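One hedged sketch of the last two ideas together: every pod runs the same loop against the common DB, and the claim is made atomic by the status check in the WHERE clause, so workers never block each other (this reuses the illustrative jobs table from above):

import time
import sqlite3

def claim_next_job(conn):
    row = conn.execute(
        "SELECT job_id FROM jobs WHERE run_status = 'pending' LIMIT 1"
    ).fetchone()
    if row is None:
        return None
    # Only one worker's UPDATE matches the pending status, so the
    # claim is atomic and nothing blocks on shared rows.
    cur = conn.execute(
        "UPDATE jobs SET run_status = 'running' "
        "WHERE job_id = ? AND run_status = 'pending'", (row[0],))
    conn.commit()
    return row[0] if cur.rowcount == 1 else None

def worker_loop(run_job) -> None:
    # Each Kubernetes pod runs this loop independently; adding pods
    # scales the system horizontally with no central coordinator.
    conn = sqlite3.connect("feeds.db")
    while True:
        job_id = claim_next_job(conn)
        if job_id is None:
            time.sleep(5)  # nothing pending, poll again
            continue
        run_job(job_id)  # atomic function: one job, one unit of work
        conn.execute("UPDATE jobs SET run_status = 'success' WHERE job_id = ?",
                     (job_id,))
        conn.commit()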

Python Options

FastAPI - Uvicorn also has an option to start and run several worker processes. Link

uvicorn main:app --host 0.0.0.0 --port 8080 --workers 4

Flask API - Link

if __name__ == '__main__':
    app.run(threaded=True)

flask run --with-threads


More Reads

  • System Design — Design a distributed job scheduler (Keep It Simple Stupid Interview series)
  • Orchestrating a Background Job Workflow in Celery for Python
  • System Design: Designing a distributed Job Scheduler | Many interesting concepts to learn

Examples

#!pip install nest_asyncio
import asyncio
import nest_asyncio

# nest_asyncio lets an event loop be re-entered (handy inside Jupyter)
nest_asyncio.apply()

async def processfeeds():
    # Schedule the load step and wait for it to complete
    task1 = asyncio.create_task(loaddata())
    print("Data load waiting")
    await task1
    print('Data load done')
    await asyncio.sleep(1)
    # Then schedule the processing step
    task2 = asyncio.create_task(processdata())
    print("Data process waiting")
    await task2
    await asyncio.sleep(5)
    print('Data process done')

async def loaddata():
    print("loading data")
    await asyncio.sleep(5)
    print("data loaded")

async def processdata():
    print("process data")
    await asyncio.sleep(5)
    print("data processed")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(processfeeds())

Keep Exploring!!!



