-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.py
More file actions
139 lines (124 loc) · 5.46 KB
/
process.py
File metadata and controls
139 lines (124 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import threading
import time
import sys
import datetime
from airtable import Airtable
import logging
# Root logger configuration: every record is prefixed with the timestamp,
# the emitting file name and the level; INFO and above are shown.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(filename)s %(levelname)s: %(message)s',
)

# Airtable base identifier and the tables this script works with.
base_key = 'appQ2YoOIQFBKKIpG'
tables = ['RUNS']

# Table name -> Airtable client.  The API key is read from the AIRTABLE_KEY
# environment variable; a missing variable raises KeyError at start-up.
airtables = {}
for t in tables:
    airtables[t] = Airtable(
        base_key,
        t,
        api_key=os.environ['AIRTABLE_KEY'],
    )
class processThread (threading.Thread):
    """Worker thread that runs the RAW2ROOT step for a single run.

    The thread sets the run's 'Processing status' to 'RAW2ROOT STARTED',
    launches ``processData.sh`` on pccmsdaq02 through ssh, and then sets the
    status to 'RAW2ROOT COMPLETED' or 'FAILED' depending on the outcome.
    Whatever happens, it finally removes its own entry from the module-level
    ``processingRuns`` registry.

    Args:
        threadID: Number assigned by the caller; only stored on the instance.
        name: Airtable record id of the run.  Used as the thread name, as the
            record passed to ``airtables['RUNS'].update`` and as the key in
            ``processingRuns``.
        runid: Run identifier passed to ``processData.sh`` with ``-r``.
        runtype: Run type; lower-cased and passed to the script with ``-t``.
    """

    def __init__(self, threadID, name, runid, runtype):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.runid = runid
        self.runtype = runtype.lower()

    @staticmethod
    def _command_succeeded(status):
        """Return True only when a wait status means "exited with code 0".

        ``os.system`` returns a wait status on Unix.  ``os.WEXITSTATUS`` on
        its own evaluates to 0 for a child that was killed by a signal, which
        would be mistaken for success, so ``os.WIFEXITED`` is checked first.
        """
        return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

    def run(self):
        """Run the remote conversion and report its outcome to Airtable."""
        try:
            logging.info("Starting RAW2ROOT of %s", self.runid)
            airtables['RUNS'].update(
                self.name, {'Processing status': 'RAW2ROOT STARTED'})

            #Launch processing on pccmsdaq02
            status = os.system(
                'ssh pccmsdaq02 "cd /home/cmsdaq/work/LYBenchProcessing; ./processData.sh -t %s -r %s" > /dev/null 2>&1'
                % (self.runtype, self.runid))

            if self._command_succeeded(status):
                logging.info("RAW2ROOT completed for %s", self.runid)
                fields = {'Processing status': 'RAW2ROOT COMPLETED'}
            else:
                logging.info("RAW2ROOT failed for %s", self.runid)
                fields = {'Processing status': 'FAILED'}
            airtables['RUNS'].update(self.name, fields)
        finally:
            # Always unregister, even if an Airtable call raised; otherwise
            # the record id would stay in processingRuns and the run could
            # never be picked up again.
            processingRuns.pop(self.name, None)
class analysisThread (threading.Thread):
    """Worker thread that runs the analysis step for a single run.

    'led' runs are analysed with ``runSinglePEFit.sh`` and 'source' runs with
    ``runSourceAnalysis.sh`` (which also needs a reference LED run).  'ped'
    runs, 'source' runs without a reference, and any other run type are
    skipped without touching Airtable.  For analysed runs the 'Processing
    status' goes to 'PROCESSING STARTED' and then to 'PROCESSING COMPLETED'
    (with a 'Results' link) or 'FAILED'.  Whatever happens, the thread
    finally removes its own entry from the module-level ``analysisRuns``
    registry.

    Args:
        threadID: Number assigned by the caller; only stored on the instance.
        name: Airtable record id of the run.  Used as the thread name, as the
            record passed to ``airtables['RUNS'].update`` and as the key in
            ``analysisRuns``.
        runid: Run identifier passed to the analysis script with ``-i``.
        runtype: Run type; lower-cased before use.
        ledref: Run identifier of the reference LED run, passed to
            ``runSourceAnalysis.sh`` with ``-p``.  Required for 'source'.
        ledrefid: Airtable record id of the reference LED run, written to the
            'LED RUNS' field.  Required for 'source'.
    """

    def __init__(self, threadID, name, runid, runtype, ledref=None,ledrefid=None):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.runid = runid
        self.runtype = runtype.lower()
        self.ledref = ledref
        self.ledrefid = ledrefid

    @staticmethod
    def _command_succeeded(status):
        """Return True only when a wait status means "exited with code 0".

        ``os.system`` returns a wait status on Unix.  ``os.WEXITSTATUS`` on
        its own evaluates to 0 for a child that was killed by a signal, which
        would be mistaken for success, so ``os.WIFEXITED`` is checked first.
        """
        return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0

    def _analysis_command(self):
        """Return the shell command for this run type, or None if unsupported."""
        if self.runtype == 'led':
            return ('cd /home/cmsdaq/Analysis/LYBenchMacros; ./runSinglePEFit.sh -i %s -l -s -w > logs/singlePEFit_%s.log 2>&1'
                    % (self.runid, self.runid))
        if self.runtype == 'source':
            return ('cd /home/cmsdaq/Analysis/LYBenchMacros; ./runSourceAnalysis.sh -i %s -p %s -w > logs/sourceAnalysis_%s.log 2>&1'
                    % (self.runid, self.ledref, self.runid))
        return None

    def run(self):
        """Run the analysis script and report its outcome to Airtable."""
        try:
            if self.runtype == 'ped':
                return
            if self.runtype == 'source' and (self.ledref is None or self.ledrefid is None):
                return

            command = self._analysis_command()
            if command is None:
                # Bail out before the status is changed: there is no script
                # for this run type, so nothing could ever complete it.
                logging.warning("No analysis defined for run type '%s' (run %s)",
                                self.runtype, self.runid)
                return

            logging.info("Started analysis of %s", self.runid)
            fields = {'Processing status': 'PROCESSING STARTED'}
            if self.runtype == 'source':
                fields['LED RUNS'] = [self.ledrefid]
            airtables['RUNS'].update(self.name, fields)

            #Launch analysis
            status = os.system(command)

            if self._command_succeeded(status):
                logging.info("Analysis completed for %s", self.runid)
                fields = {'Processing status': 'PROCESSING COMPLETED',
                          'Results': 'http://10.0.0.44/process/%s/' % self.runid}
            else:
                logging.info("Analysis failed for %s", self.runid)
                fields = {'Processing status': 'FAILED'}
            airtables['RUNS'].update(self.name, fields)
        finally:
            # Always unregister, including on the early returns above and when
            # an Airtable call raises; otherwise the record id would stay in
            # analysisRuns and the run could never be picked up again.
            analysisRuns.pop(self.name, None)
# Registries of the worker threads that have been started, keyed by Airtable
# record id.  Each worker removes its own entry when it finishes, so a record
# id present here means "already being handled, do not start it again".
processingRuns = {}
analysisRuns = {}
# Sequence number handed to each new worker thread as its threadID.
counterThreads = 1

logging.info("Starting processing loop")
while True:
    try:
        #Find the last good LED-SCAN run for today
        # NOTE(review): `today` is the local date, while 'Created' is parsed
        # from a 'Z'-suffixed timestamp (presumably UTC).  Around midnight the
        # two dates can disagree; confirm which "today" is intended.
        today = datetime.datetime.now().date()
        todayLedRuns = {}
        for r in airtables['RUNS'].search('Processing status', 'VALIDATED'):
            fields = r['fields']
            # .get() throughout: a record that lacks one of these fields must
            # only be skipped, not abort the whole polling pass.
            if fields.get('Type') != 'LED':
                continue
            if 'SCAN' not in fields.get('RunID', ''):
                continue
            try:
                created = datetime.datetime.strptime(
                    fields.get('Created', ''), '%Y-%m-%dT%H:%M:%S.%fZ')
            except (TypeError, ValueError):
                continue
            if created.date() != today:
                continue
            todayLedRuns[fields['RunID']] = r['id']
        # The greatest RunID (string order) is taken as the latest LED scan.
        lastLed = max(todayLedRuns) if todayLedRuns else None

        #Discover if there is any run to be processed
        for r in airtables['RUNS'].search('Processing status', 'DAQ COMPLETED'):
            if r['id'] in processingRuns:
                continue
            fields = r['fields']
            if 'RunID' not in fields or 'Type' not in fields:
                logging.debug("Skipping record %s: missing RunID or Type", r['id'])
                continue
            worker = processThread(counterThreads, r['id'], fields['RunID'], fields['Type'])
            # Register before starting so the worker's own clean-up finds it.
            processingRuns[r['id']] = worker
            worker.start()
            counterThreads += 1

        #Discover if there is any run to be analysed
        for r in airtables['RUNS'].search('Processing status', 'RAW2ROOT COMPLETED'):
            if r['id'] in analysisRuns:
                continue
            fields = r['fields']
            if 'RunID' not in fields:
                logging.debug("Skipping record %s: missing RunID", r['id'])
                continue
            runType = fields.get('Type')
            if runType == 'LED':
                worker = analysisThread(counterThreads, r['id'], fields['RunID'], runType)
            elif runType == 'SOURCE':
                if not todayLedRuns:
                    # No validated LED scan today yet: retry on a later pass.
                    continue
                worker = analysisThread(counterThreads, r['id'], fields['RunID'], runType,
                                        lastLed, todayLedRuns[lastLed])
            else:
                # PED runs are not analysed; other or missing types are ignored.
                continue
            analysisRuns[r['id']] = worker
            worker.start()
            counterThreads += 1
    except KeyboardInterrupt:
        logging.info("Bye")
        sys.exit()
    except Exception as e:
        # Top-level boundary: log with traceback and keep polling.
        logging.exception(e)

    try:
        #refresh every 10s
        time.sleep(10)
    except KeyboardInterrupt:
        logging.info("Bye")
        sys.exit()