Skip to content

Commit c2a427d

Browse files
authored
Refactoring to reduce the number of queries (#188)
1 parent 3aa0721 commit c2a427d

File tree

5 files changed

+253
-110
lines changed

5 files changed

+253
-110
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# coding: utf-8
2+
3+
import unittest
4+
from unittest import mock
5+
6+
import psycopg2
7+
import tagbase_server.utils.processing_utils as pu
8+
9+
10+
class TestIngest(unittest.TestCase):
    """Unit tests for processing_utils.process_global_attributes.

    The database is never touched: psycopg2.connect is patched so the
    cursor's fetchall() returns canned rows.
    """

    PG_VERSION = "postgres:9.5"
    SAMPLE_METADATA_LINES = [
        "// global attributes:",
        "// etag device attributes:",
        ':instrument_name = "159903_2012_117464"',
        ':instrument_type = "s"',
        ':manufacturer = "Wildlife"',
        ':model = "SPOT"',
        ':owner_contact = "a@a.org"',
        ':person_owner = "foo bar"',
        ':ptt = "117464"',
    ]

    fake_submission_id = 1
    fake_submission_filename = "test_file"

    @mock.patch("psycopg2.connect")
    def test_processing_file_metadata_with_existing_attributes(self, mock_connect):
        # Attribute rows the mocked DB "knows": (attribute_id, attribute_name).
        metadata_attribs_in_db = [[1, "instrument_name"], [2, "model"]]
        # result of psycopg2.connect(**connection_stuff)
        mock_con = mock_connect.return_value
        # result of con.cursor(cursor_factory=DictCursor)
        mock_cur = mock_con.cursor.return_value
        # return this when calling cur.fetchall()
        mock_cur.fetchall.return_value = metadata_attribs_in_db

        conn = psycopg2.connect(
            dbname="test",
            user="test",
            host="localhost",
            port="32780",
            password="test",
        )
        cur = conn.cursor()

        metadata = []
        processed_lines = pu.process_global_attributes(
            TestIngest.SAMPLE_METADATA_LINES,
            cur,
            TestIngest.fake_submission_id,
            metadata,
            TestIngest.fake_submission_filename,
        )
        # BUG FIX: the original used `assert x, y`, which only checks the
        # truthiness of x and treats y as the failure message, so these
        # assertions could never fail. assertEqual compares the two values.
        self.assertEqual(len(TestIngest.SAMPLE_METADATA_LINES), processed_lines + 1)
        self.assertEqual(len(metadata_attribs_in_db), len(metadata))
        # The parser keeps the surrounding double quotes of attribute values,
        # so the expected values are quoted here.
        self.assertEqual(metadata[0][2], '"159903_2012_117464"')
        self.assertEqual(metadata[1][2], '"SPOT"')

    @mock.patch("psycopg2.connect")
    def test_processing_file_metadata_without_attributes(self, mock_connect):
        # No attribute names resolve in the mocked DB.
        metadata_attribs_in_db = []
        # result of psycopg2.connect(**connection_stuff)
        mock_con = mock_connect.return_value
        # result of con.cursor(cursor_factory=DictCursor)
        mock_cur = mock_con.cursor.return_value
        # return this when calling cur.fetchall()
        mock_cur.fetchall.return_value = metadata_attribs_in_db

        conn = psycopg2.connect(
            dbname="test",
            user="test",
            host="localhost",
            port="32780",
            password="test",
        )
        cur = conn.cursor()

        metadata = []
        processed_lines = pu.process_global_attributes(
            TestIngest.SAMPLE_METADATA_LINES,
            cur,
            TestIngest.fake_submission_id,
            metadata,
            TestIngest.fake_submission_filename,
        )
        # BUG FIX: same two-arg-assert defect as above; use assertEqual.
        self.assertEqual(len(TestIngest.SAMPLE_METADATA_LINES), processed_lines + 1)
88+
89+
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .processing_utils import *

tagbase_server/tagbase_server/utils/processing_utils.py

Lines changed: 139 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import os
21
import logging
32
from datetime import datetime as dt
43
from io import StringIO
@@ -7,46 +6,82 @@
76
import pandas as pd
87
import psycopg2.extras
98
import pytz
10-
from slack_sdk import WebClient
11-
from slack_sdk.errors import SlackApiError
129
from tzlocal import get_localzone
1310

1411
from tagbase_server.utils.db_utils import connect
12+
from tagbase_server.utils.slack_utils import post_msg
1513

1614
logger = logging.getLogger(__name__)
17-
slack_token = os.environ.get("SLACK_BOT_TOKEN", "")
18-
slack_channel = os.environ.get("SLACK_BOT_CHANNEL", "tagbase-server")
19-
client = WebClient(token=slack_token)
2015

2116

22-
def process_global_attributes(
23-
line, cur, submission_id, metadata, submission_filename, line_counter
17+
def process_all_lines_for_global_attributes(
    global_attributes_lines,
    cur,
    submission_id,
    metadata,
    submission_filename,
    line_counter,
):
    """Resolve eTUFF global-attribute lines against the metadata_types table.

    Each line of the form ':name = value' is parsed into a name/value pair;
    all names are then resolved to attribute_ids with a single SELECT, and a
    (submission_id, attribute_id, attribute_value) tuple is appended to
    'metadata' for every resolved name. Names with no matching row are
    reported via post_msg().

    :param global_attributes_lines: metadata lines, each starting with ':'
    :param cur: open database cursor
    :param submission_id: id of the submission being ingested
    :param metadata: output list of tuples, mutated in place
    :param submission_filename: file name used in warning messages
    :param line_counter: line number reached so far, used in warning messages
    """
    attrbs_map = {}
    for line in global_attributes_lines:
        line = line.strip()
        logger.debug("Processing global attribute: %s", line)
        # Drop the leading ':' and split into attribute name / value.
        tokens = line[1:].split(" = ")
        if len(tokens) > 1:
            attrbs_map[tokens[0]] = tokens[1]
        else:
            logger.warning("Metadata line %s NOT in expected format!", line)

    rows = []
    if attrbs_map:
        # BUG FIX: attribute names were previously interpolated straight into
        # the SQL text, which is a SQL-injection vector (the names come from an
        # uploaded file) and yielded invalid SQL ("IN ()") when nothing parsed.
        # Passing a tuple parameter lets psycopg2 quote the values safely.
        attrbs_ids_query = (
            "SELECT attribute_id, attribute_name FROM metadata_types "
            "WHERE attribute_name IN %s"
        )
        logger.debug("Query=%s", attrbs_ids_query)
        cur.execute(attrbs_ids_query, (tuple(attrbs_map),))
        rows = cur.fetchall()

    str_submission_id = str(submission_id)
    for row in rows:
        attribute_id = row[0]
        attribute_name = row[1]
        # pop() both fetches the value and marks the name as resolved.
        attribute_value = attrbs_map.pop(attribute_name)
        metadata.append((str_submission_id, str(attribute_id), attribute_value))

    # Anything left in attrbs_map had no matching row in metadata_types.
    if attrbs_map:
        not_found_attributes = ", ".join(attrbs_map.keys())
        msg = (
            f"*{submission_filename}* _line:{line_counter}_ - "
            f"Unable to locate attribute_names *{not_found_attributes}* in _metadata_types_ table."
        )
        post_msg(msg)
3862

39-
logger.warning(msg)
40-
try:
41-
client.chat_postMessage(
42-
channel=slack_channel, text="<!channel> :warning: " + msg
43-
)
44-
except SlackApiError as e:
45-
logger.error(e)
46-
else:
47-
str_submission_id = str(submission_id)
48-
str_row = str(rows[0][0])
49-
metadata.append((str_submission_id, str_row, tokens[1]))
63+
64+
def process_global_attributes(lines, cur, submission_id, metadata, submission_filename):
    """Consume the leading metadata section of an eTUFF file.

    Scans 'lines' from the top, collecting global-attribute lines (those whose
    stripped form starts with ':') and skipping comment lines ('//'), stopping
    at the first line that is neither. The collected attribute lines are handed
    to process_all_lines_for_global_attributes(), which populates 'metadata'.

    :return: the number of leading metadata-section lines (one less than the
        count of lines examined, never negative).
    """
    examined = 0
    global_attribute_lines = []
    for raw_line in lines:
        examined += 1
        if raw_line.startswith("//"):
            # Comment line: skip it but keep counting.
            continue
        if raw_line.strip().startswith(":"):
            global_attribute_lines.append(raw_line)
            continue
        # First non-metadata line reached: the header section is over.
        break

    process_all_lines_for_global_attributes(
        global_attribute_lines,
        cur,
        submission_id,
        metadata,
        submission_filename,
        examined,
    )
    return max(examined - 1, 0)
5085

5186

5287
def process_etuff_file(file, version=None, notes=None):
@@ -82,97 +117,91 @@ def process_etuff_file(file, version=None, notes=None):
82117

83118
metadata = []
84119
proc_obs = []
120+
85121
s_time = time.perf_counter()
86122
with open(file, "rb") as data:
87123
lines = [line.decode("utf-8", "ignore") for line in data.readlines()]
88-
variable_lookup = {}
89-
line_counter = 0
90-
for line in lines:
91-
line_counter += 1
92-
if line.startswith("//"):
93-
continue
94-
elif line.strip().startswith(":"):
95-
process_global_attributes(
96-
line,
97-
cur,
98-
submission_id,
99-
metadata,
100-
submission_filename,
101-
line_counter,
102-
)
124+
lines_length = len(lines)
125+
126+
line_counter = 0
127+
variable_lookup = {}
128+
129+
metadata_lines = process_global_attributes(
130+
lines, cur, submission_id, metadata, submission_filename
131+
)
132+
line_counter += metadata_lines
133+
134+
for counter in range(metadata_lines, lines_length):
135+
line = lines[line_counter]
136+
line_counter += 1
137+
tokens = line.split(",")
138+
tokens = [token.replace('"', "") for token in tokens]
139+
if tokens:
140+
variable_name = tokens[3]
141+
if variable_name in variable_lookup:
142+
variable_id = variable_lookup[variable_name]
103143
else:
104-
# Parse proc_observations
105-
tokens = line.split(",")
106-
tokens = [token.replace('"', "") for token in tokens]
107-
if tokens:
108-
variable_name = tokens[3]
109-
if variable_name in variable_lookup:
110-
variable_id = variable_lookup[variable_name]
111-
else:
144+
cur.execute(
145+
"SELECT variable_id FROM observation_types WHERE variable_name = %s",
146+
(variable_name,),
147+
)
148+
row = cur.fetchone()
149+
if row:
150+
variable_id = row[0]
151+
else:
152+
try:
153+
logger.debug(
154+
"variable_name=%s\ttokens=%s", variable_name, tokens
155+
)
112156
cur.execute(
113-
"SELECT variable_id FROM observation_types WHERE variable_name = %s",
114-
(variable_name,),
157+
"INSERT INTO observation_types("
158+
"variable_name, variable_units) VALUES (%s, %s) "
159+
"ON CONFLICT (variable_name) DO NOTHING",
160+
(variable_name, tokens[4].strip()),
115161
)
116-
row = cur.fetchone()
117-
if row:
118-
variable_id = row[0]
119-
else:
120-
try:
121-
logger.debug(variable_name, tokens)
122-
cur.execute(
123-
"INSERT INTO observation_types("
124-
"variable_name, variable_units) VALUES (%s, %s) "
125-
"ON CONFLICT (variable_name) DO NOTHING",
126-
(variable_name, tokens[4].strip()),
127-
)
128-
except (
129-
Exception,
130-
psycopg2.DatabaseError,
131-
) as error:
132-
logger.error(
133-
"variable_id '%s' already exists in 'observation_types'. tokens:"
134-
" '%s. \nerror: %s",
135-
variable_name,
136-
tokens,
137-
error,
138-
)
139-
conn.rollback()
140-
cur.execute(
141-
"SELECT nextval('observation_types_variable_id_seq')"
142-
)
143-
variable_id = cur.fetchone()[0]
144-
variable_lookup[variable_name] = variable_id
145-
date_time = None
146-
if tokens[0] != '""' and tokens[0] != "":
147-
if tokens[0].startswith('"'):
148-
tokens[0].replace('"', "")
149-
date_time = dt.strptime(
150-
tokens[0], "%Y-%m-%d %H:%M:%S"
151-
).astimezone(pytz.utc)
152-
else:
153-
stripped_line = line.strip("\n")
154-
msg = (
155-
f"*{submission_filename}* _line:{line_counter}_ - "
156-
f"No datetime... skipping line: {stripped_line}"
162+
except (
163+
Exception,
164+
psycopg2.DatabaseError,
165+
) as error:
166+
logger.error(
167+
"variable_id '%s' already exists in 'observation_types'. tokens:"
168+
" '%s. \nerror: %s",
169+
variable_name,
170+
tokens,
171+
error,
157172
)
158-
logger.warning(msg)
159-
try:
160-
client.chat_postMessage(
161-
channel=slack_channel,
162-
text="<!channel> :warning: " + msg,
163-
)
164-
except SlackApiError as e:
165-
logger.error(e)
166-
continue
167-
proc_obs.append(
168-
[
169-
date_time,
170-
variable_id,
171-
tokens[2],
172-
submission_id,
173-
str(submission_id),
174-
]
173+
conn.rollback()
174+
cur.execute(
175+
"SELECT nextval('observation_types_variable_id_seq')"
175176
)
177+
variable_id = cur.fetchone()[0]
178+
variable_lookup[variable_name] = variable_id
179+
date_time = None
180+
if tokens[0] != '""' and tokens[0] != "":
181+
if tokens[0].startswith('"'):
182+
tokens[0].replace('"', "")
183+
date_time = dt.strptime(
184+
tokens[0], "%Y-%m-%d %H:%M:%S"
185+
).astimezone(pytz.utc)
186+
else:
187+
stripped_line = line.strip("\n")
188+
msg = (
189+
f"*{submission_filename}* _line:{line_counter}_ - "
190+
f"No datetime... skipping line: {stripped_line}"
191+
)
192+
post_msg(msg)
193+
continue
194+
195+
proc_obs.append(
196+
[
197+
date_time,
198+
variable_id,
199+
tokens[2],
200+
submission_id,
201+
str(submission_id),
202+
]
203+
)
204+
176205
len_proc_obs = len(proc_obs)
177206
e_time = time.perf_counter()
178207
sub_elapsed = round(e_time - s_time, 2)

0 commit comments

Comments
 (0)