Background: lonostudios
Christian Juncker Brædstrup
Senior Data Scientist
Our setup before Stau:
# server.py
from datetime import datetime
from time import sleep

import job_X
import job_Y
import job_Z

while True:
    now = datetime.now()
    # Poll once a minute; run every job when the clock hits midnight
    if now.hour == 0 and now.minute == 0:
        job_X.run()
        job_Y.run()
        job_Z.run()
    sleep(60)
server.py
src/
    job_X.py
    job_Y.py
    job_Z.py
    ...
tests/
    ...

Folder structure
Data science workflows quickly become a spaghetti of cross-job dependencies
# CONFIG
STAU_CONFIG = ReportConfig(
    job_type="job_A",   # src/job_A.py
    chunk_size=10,      # 10 ids in each chunk
    dependencies=[
        ReportDependencies(
            job_type="job_B",  # src/job_B.py
            max_lag=60,        # Recompute if job_B is more than 60 minutes old
        ),
    ],
    main_func="job_A_main_func",
)

def job_A_main_func(...):
    ...
Dependency information is located with the code.
Jobs and files map one-to-one.
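Because each job maps to exactly one file, job_B's config would live in src/job_B.py. A hypothetical sketch following the same pattern (ReportConfig is assumed importable from Stau, as above):

# src/job_B.py -- hypothetical sketch, same pattern as job_A above
STAU_CONFIG = ReportConfig(
    job_type="job_B",
    chunk_size=10,
    dependencies=[],  # leaf job: no upstream dependencies to check
    main_func="job_B_main_func",
)

def job_B_main_func(**kwargs):
    ...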
SELECT id
FROM AllMyUnits
WHERE active = 1
1. Read table
2. Import job
3. Read dependencies
4. Import dependencies
5. Construct initial graph (sketched below)
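A minimal sketch of how steps 2-5 could fit together (hypothetical: build_initial_graph and the src.<job> module paths are assumptions, not Stau's actual implementation):

# Walk each job's STAU_CONFIG to build the dependency graph (sketch)
import importlib

def build_initial_graph(root_job_type):
    graph = {}                  # job_type -> list of dependency job_types
    to_visit = [root_job_type]
    while to_visit:
        job_type = to_visit.pop()
        if job_type in graph:
            continue            # already visited
        module = importlib.import_module(f"src.{job_type}")  # import the job file
        config = module.STAU_CONFIG                          # read its dependencies
        deps = [dep.job_type for dep in config.dependencies]
        graph[job_type] = deps
        to_visit.extend(deps)   # follow transitive dependencies
    return graph

# With the configs above: build_initial_graph("job_A") -> {"job_A": ["job_B"], "job_B": []}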
# src/stau_schedule.py
from apscheduler.triggers.cron import CronTrigger

recurring_jobs = [
    {
        "id": "1",
        "name": "job A",
        "trigger": CronTrigger(hour="*/1"),  # every hour
        "kwargs_stau_api": {
            "jobType": "name_of_job_A_file",
            "kwargs_func": {
                # Function kwargs
                "max_data_age_minutes": 120,
            },
        },
    },
    {
        "id": "2",
        "name": "job B",
        "trigger": CronTrigger(minute="15"),  # 15 minutes past every hour
        "kwargs_stau_api": {
            "jobType": "name_of_job_B_file",
            "kwargs_func": {},
        },
    },
    ...
]
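One plausible way to wire this list into APScheduler (a sketch; submit_to_stau is an assumed stand-in for the call into the Stau API, not part of Stau itself):

# Hypothetical wiring: register every recurring job with APScheduler
# and let each trigger fire a submission to the Stau API.
from apscheduler.schedulers.blocking import BlockingScheduler

def submit_to_stau(jobType, kwargs_func):
    """Assumed stand-in: enqueue the job via the Stau API."""
    ...

scheduler = BlockingScheduler()
for job in recurring_jobs:
    scheduler.add_job(
        submit_to_stau,
        trigger=job["trigger"],
        id=job["id"],
        name=job["name"],
        kwargs=job["kwargs_stau_api"],  # unpacked as jobType / kwargs_func
    )
scheduler.start()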
1. Develop and test locally
2. Add STAU_CONFIG definition
3. Add job to schedule
4. Deploy and test
Job states: Pending → Running → Done (or Failed)
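These lifecycle states could be modeled as a simple enum (illustrative only; not Stau's actual types):

# Illustrative only -- a plain enum for the job lifecycle above
from enum import Enum

class JobState(Enum):
    PENDING = "pending"   # queued, waiting on dependencies
    RUNNING = "running"   # a worker is executing the job
    DONE = "done"         # finished successfully
    FAILED = "failed"     # raised an error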
github.com/LinuxChristian