-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathenvlog.py
More file actions
94 lines (78 loc) · 2.9 KB
/
envlog.py
File metadata and controls
94 lines (78 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import pandas as pd
from datetime import timedelta, datetime as dt, timezone
import time
from urllib.request import urlopen
import json
import math
def _entry_time_str(entry):
    """Format an archiver entry's timestamp as a UTC 'HH:MM:SS.ff' string.

    Reads entry['secs'] (epoch seconds) and entry['nanos'] (nanoseconds
    within that second) and keeps two decimal places of the fraction.
    """
    clock = time.strftime('%H:%M:%S', time.gmtime(entry['secs']))
    # Zero-pad to nine digits BEFORE truncating: the first two characters of
    # an unpadded value are not hundredths of a second (5_000_000 ns is
    # 0.005 s and must render as ".00", not ".50").
    hundredths = f"{int(entry['nanos']):09d}"[:2]
    return f"{clock}.{hundredths}"


def envlog(telnr, dateObs, utc, interval=30):
    """
    Gets weather/env data from tcsu archiver.

    For every keyword in the keyword-to-channel map, the archiver is queried
    for samples within +/- ``interval`` seconds of the observation time and
    the sample closest in time is kept.

    Parameters:
        telnr: value interpolated into the archiver hostname
            (``k{telnr}dataserver``) and into the per-telescope channel names.
        dateObs: observation date string in '%Y-%m-%d' format.
        utc: observation time string in '%H:%M:%S.%f' format.
        interval: half-width of the query window in seconds (default 30).

    Returns:
        A ``(data, errors, warns)`` tuple.
        ``data`` maps each keyword to the closest sample value, or the string
        'null' when nothing was retrieved.  It also holds 'wx_time' (time of
        the non-FWHM sample closest to the observation) and 'fwhm_time' (time
        of the 'guidfwhm' sample), both as 'HH:MM:SS.ff' or 'null'.
        ``errors`` is a list of '<channel>:<message>' strings, one per channel
        whose query or parsing raised.
        ``warns`` is a list of 'No records for <channel>' strings.

    Raises:
        ValueError: if ``dateObs``/``utc`` do not match the expected formats
            (parsing happens before the per-channel error handling).
    """
    #define archiver api url
    hostname = f'k{telnr}dataserver'
    port = 17668
    url = f'http://{hostname}:{port}/retrieval/data/getData.json?'

    #calc start and end period using +/- interval
    utDatetime = dt.strptime(dateObs + ' ' + utc, '%Y-%m-%d %H:%M:%S.%f')
    window = timedelta(seconds=interval)
    stamp_fmt = '%Y-%m-%dT%H:%M:%SZ'
    dt1 = (utDatetime - window).strftime(stamp_fmt)
    dt2 = (utDatetime + window).strftime(stamp_fmt)

    #observation time as epoch seconds; the naive datetime is taken to be UTC
    ts_utc = utDatetime.replace(tzinfo=timezone.utc).timestamp()

    #map keywords for KOA to archiver channels
    keymap = {
        'wx_dewpoint': 'k0:met:dewpointRaw',
        'wx_outhum': 'k0:met:humidityRaw',
        'wx_outtmp': 'k0:met:tempRaw',
        'wx_domtmp': f'k{telnr}:met:tempRaw',
        'wx_domhum': f'k{telnr}:met:humidityRaw',
        'wx_pressure': 'k0:met:pressureRaw',
        'wx_windspeed': f'k{telnr}:met:windSpeedRaw',
        'wx_winddir': f'k{telnr}:met:windAzRaw',
        'guidfwhm': f'k{telnr}:dcs:pnt:cam0:fwhm',
    }

    #defaults for return data dict
    data = {}
    data['wx_time'] = 'null'
    data['fwhm_time'] = 'null'

    #get channel data from archive for each pv
    errors = []
    warns = []
    mn = None  # smallest time offset seen so far among the weather channels
    for kw, pv in keymap.items():
        data[kw] = 'null'
        # Best effort per channel: any failure is recorded and the remaining
        # channels are still queried.
        try:
            #query archiver api and make sure we found some records
            sendUrl = f'{url}pv={pv}&from={dt1}&to={dt2}'
            with urlopen(sendUrl) as resp:  # close the connection promptly
                d = json.loads(resp.read().decode('utf8'))
            records = d[0]['data'] if d else []
            if not records:
                warns.append(f"No records for {pv}")
                continue

            #find closest entry in time
            entry = find_closest_entry(records, ts_utc)
            data[kw] = entry['val']
            ts = _entry_time_str(entry)

            #mark closest time
            if kw == 'guidfwhm':
                data['fwhm_time'] = ts
            else:
                diff = abs(entry['secs'] + entry['nanos'] / 1e9 - ts_utc)
                if mn is None or diff < mn:
                    data['wx_time'] = ts
                    mn = diff
        except Exception as e:
            errors.append(f"{pv}:{str(e)}")
    return data, errors, warns
def find_closest_entry(entries, ts):
    """Return the entry whose timestamp is closest to ``ts``.

    Parameters:
        entries: iterable of dicts, each with a 'secs' key (epoch seconds)
            and optionally a 'nanos' key (nanoseconds within that second;
            treated as 0 when absent).
        ts: reference time in epoch seconds (int or float).

    Returns:
        The entry with the smallest absolute time difference to ``ts``.
        On a tie the earliest entry in iteration order wins.  Returns None
        when ``entries`` is empty.
    """
    def distance(entry):
        # Include the sub-second part so two samples that share (or straddle)
        # a whole second are ranked by their true distance from ``ts``.
        return abs(entry['secs'] + entry.get('nanos', 0) / 1e9 - ts)

    return min(entries, key=distance, default=None)