1919client = WebClient (token = slack_token )
2020
2121
22- def process_global_attributes (
22+ def process_line_for_global_attributes (
2323 line , cur , submission_id , metadata , submission_filename , line_counter
2424):
2525 logger .debug ("Processing global attribute: %s" , line )
@@ -35,20 +35,49 @@ def process_global_attributes(
3535 f"*{ submission_filename } * _line:{ line_counter } _ - "
3636 f"Unable to locate attribute_name *{ tokens [0 ]} * in _metadata_types_ table."
3737 )
38-
39- logger .warning (msg )
40- try :
41- client .chat_postMessage (
42- channel = slack_channel , text = "<!channel> :warning: " + msg
43- )
44- except SlackApiError as e :
45- logger .error (e )
38+ post_msg_to_slack (msg )
4639 else :
4740 str_submission_id = str (submission_id )
4841 str_row = str (rows [0 ][0 ])
4942 metadata .append ((str_submission_id , str_row , tokens [1 ]))
5043
5144
45+ def post_msg_to_slack (msg ):
46+ logger .warning (msg )
47+ try :
48+ client .chat_postMessage (
49+ channel = slack_channel , text = "<!channel> :warning: " + msg
50+ )
51+ except SlackApiError as e :
52+ logger .error (e )
53+
54+
55+ def process_global_attributes (lines , cur , submission_id , metadata , submission_filename ):
56+ processed_lines = 0
57+ global_attributes = []
58+ for line in lines :
59+ processed_lines += 1
60+ if line .startswith ("//" ):
61+ continue
62+ elif line .strip ().startswith (":" ):
63+ global_attributes .append (line )
64+ else :
65+ break
66+
67+ for global_attribute in global_attributes :
68+ process_line_for_global_attributes (
69+ global_attribute ,
70+ cur ,
71+ submission_id ,
72+ metadata ,
73+ submission_filename ,
74+ processed_lines ,
75+ )
76+
77+ # returning -1 because lines is an 0-indexed array
78+ return processed_lines - 1 if processed_lines > 0 else 0
79+
80+
5281def process_etuff_file (file , version = None , notes = None ):
5382 start = time .perf_counter ()
5483 submission_filename = file # full path name is now preferred rather than - file[file.rindex("/") + 1 :]
@@ -82,97 +111,91 @@ def process_etuff_file(file, version=None, notes=None):
82111
83112 metadata = []
84113 proc_obs = []
114+
85115 s_time = time .perf_counter ()
86116 with open (file , "rb" ) as data :
87117 lines = [line .decode ("utf-8" , "ignore" ) for line in data .readlines ()]
88- variable_lookup = {}
89- line_counter = 0
90- for line in lines :
91- line_counter += 1
92- if line .startswith ("//" ):
93- continue
94- elif line .strip ().startswith (":" ):
95- process_global_attributes (
96- line ,
97- cur ,
98- submission_id ,
99- metadata ,
100- submission_filename ,
101- line_counter ,
102- )
118+ lines_length = len (lines )
119+
120+ line_counter = 0
121+ variable_lookup = {}
122+
123+ metadata_lines = process_global_attributes (
124+ lines , cur , submission_id , metadata , submission_filename
125+ )
126+ line_counter += metadata_lines
127+
128+ for counter in range (metadata_lines , lines_length ):
129+ line = lines [line_counter ]
130+ line_counter += 1
131+ tokens = line .split ("," )
132+ tokens = [token .replace ('"' , "" ) for token in tokens ]
133+ if tokens :
134+ variable_name = tokens [3 ]
135+ if variable_name in variable_lookup :
136+ variable_id = variable_lookup [variable_name ]
103137 else :
104- # Parse proc_observations
105- tokens = line .split ("," )
106- tokens = [token .replace ('"' , "" ) for token in tokens ]
107- if tokens :
108- variable_name = tokens [3 ]
109- if variable_name in variable_lookup :
110- variable_id = variable_lookup [variable_name ]
111- else :
138+ cur .execute (
139+ "SELECT variable_id FROM observation_types WHERE variable_name = %s" ,
140+ (variable_name ,),
141+ )
142+ row = cur .fetchone ()
143+ if row :
144+ variable_id = row [0 ]
145+ else :
146+ try :
147+ logger .debug (
148+ "variable_name=%s\t tokens=%s" , variable_name , tokens
149+ )
112150 cur .execute (
113- "SELECT variable_id FROM observation_types WHERE variable_name = %s" ,
114- (variable_name ,),
151+ "INSERT INTO observation_types("
152+ "variable_name, variable_units) VALUES (%s, %s) "
153+ "ON CONFLICT (variable_name) DO NOTHING" ,
154+ (variable_name , tokens [4 ].strip ()),
115155 )
116- row = cur .fetchone ()
117- if row :
118- variable_id = row [0 ]
119- else :
120- try :
121- logger .debug (variable_name , tokens )
122- cur .execute (
123- "INSERT INTO observation_types("
124- "variable_name, variable_units) VALUES (%s, %s) "
125- "ON CONFLICT (variable_name) DO NOTHING" ,
126- (variable_name , tokens [4 ].strip ()),
127- )
128- except (
129- Exception ,
130- psycopg2 .DatabaseError ,
131- ) as error :
132- logger .error (
133- "variable_id '%s' already exists in 'observation_types'. tokens:"
134- " '%s. \n error: %s" ,
135- variable_name ,
136- tokens ,
137- error ,
138- )
139- conn .rollback ()
140- cur .execute (
141- "SELECT nextval('observation_types_variable_id_seq')"
142- )
143- variable_id = cur .fetchone ()[0 ]
144- variable_lookup [variable_name ] = variable_id
145- date_time = None
146- if tokens [0 ] != '""' and tokens [0 ] != "" :
147- if tokens [0 ].startswith ('"' ):
148- tokens [0 ].replace ('"' , "" )
149- date_time = dt .strptime (
150- tokens [0 ], "%Y-%m-%d %H:%M:%S"
151- ).astimezone (pytz .utc )
152- else :
153- stripped_line = line .strip ("\n " )
154- msg = (
155- f"*{ submission_filename } * _line:{ line_counter } _ - "
156- f"No datetime... skipping line: { stripped_line } "
156+ except (
157+ Exception ,
158+ psycopg2 .DatabaseError ,
159+ ) as error :
160+ logger .error (
161+ "variable_id '%s' already exists in 'observation_types'. tokens:"
162+ " '%s. \n error: %s" ,
163+ variable_name ,
164+ tokens ,
165+ error ,
157166 )
158- logger .warning (msg )
159- try :
160- client .chat_postMessage (
161- channel = slack_channel ,
162- text = "<!channel> :warning: " + msg ,
163- )
164- except SlackApiError as e :
165- logger .error (e )
166- continue
167- proc_obs .append (
168- [
169- date_time ,
170- variable_id ,
171- tokens [2 ],
172- submission_id ,
173- str (submission_id ),
174- ]
167+ conn .rollback ()
168+ cur .execute (
169+ "SELECT nextval('observation_types_variable_id_seq')"
175170 )
171+ variable_id = cur .fetchone ()[0 ]
172+ variable_lookup [variable_name ] = variable_id
173+ date_time = None
174+ if tokens [0 ] != '""' and tokens [0 ] != "" :
175+ if tokens [0 ].startswith ('"' ):
176+ tokens [0 ].replace ('"' , "" )
177+ date_time = dt .strptime (
178+ tokens [0 ], "%Y-%m-%d %H:%M:%S"
179+ ).astimezone (pytz .utc )
180+ else :
181+ stripped_line = line .strip ("\n " )
182+ msg = (
183+ f"*{ submission_filename } * _line:{ line_counter } _ - "
184+ f"No datetime... skipping line: { stripped_line } "
185+ )
186+ post_msg_to_slack (msg )
187+ continue
188+
189+ proc_obs .append (
190+ [
191+ date_time ,
192+ variable_id ,
193+ tokens [2 ],
194+ submission_id ,
195+ str (submission_id ),
196+ ]
197+ )
198+
176199 len_proc_obs = len (proc_obs )
177200 e_time = time .perf_counter ()
178201 sub_elapsed = round (e_time - s_time , 2 )
0 commit comments