1919client = WebClient (token = slack_token )
2020
2121
22- def process_global_attributes (
22+ def process_line_for_global_attributes (
2323 line , cur , submission_id , metadata , submission_filename , line_counter
2424):
2525 logger .debug ("Processing global attribute: %s" , line )
@@ -35,20 +35,49 @@ def process_global_attributes(
3535 f"*{ submission_filename } * _line:{ line_counter } _ - "
3636 f"Unable to locate attribute_name *{ tokens [0 ]} * in _metadata_types_ table."
3737 )
38-
39- logger .warning (msg )
40- try :
41- client .chat_postMessage (
42- channel = slack_channel , text = "<!channel> :warning: " + msg
43- )
44- except SlackApiError as e :
45- logger .error (e )
38+ post_msg_to_slack (msg )
4639 else :
4740 str_submission_id = str (submission_id )
4841 str_row = str (rows [0 ][0 ])
4942 metadata .append ((str_submission_id , str_row , tokens [1 ]))
5043
5144
45+ def post_msg_to_slack (msg ):
46+ logger .warning (msg )
47+ try :
48+ client .chat_postMessage (
49+ channel = slack_channel , text = "<!channel> :warning: " + msg
50+ )
51+ except SlackApiError as e :
52+ logger .error (e )
53+
54+
55+ def process_global_attributes (lines , cur , submission_id , metadata , submission_filename ):
56+ processed_lines = 0
57+ global_attributes = []
58+ for line in lines :
59+ processed_lines += 1
60+ if line .startswith ("//" ):
61+ continue
62+ elif line .strip ().startswith (":" ):
63+ global_attributes .append (line )
64+ else :
65+ break
66+
67+ for global_attribute in global_attributes :
68+ process_line_for_global_attributes (
69+ global_attribute ,
70+ cur ,
71+ submission_id ,
72+ metadata ,
73+ submission_filename ,
74+ processed_lines ,
75+ )
76+
77+ # returning -1 because lines is an 0-indexed array
78+ return processed_lines - 1 if processed_lines > 0 else 0
79+
80+
5281def process_etuff_file (file , version = None , notes = None ):
5382 start = time .perf_counter ()
5483 submission_filename = file # full path name is now preferred rather than - file[file.rindex("/") + 1 :]
@@ -82,97 +111,87 @@ def process_etuff_file(file, version=None, notes=None):
82111
83112 metadata = []
84113 proc_obs = []
114+
85115 s_time = time .perf_counter ()
86116 with open (file , "rb" ) as data :
87117 lines = [line .decode ("utf-8" , "ignore" ) for line in data .readlines ()]
88- variable_lookup = {}
89- line_counter = 0
90- for line in lines :
91- line_counter += 1
92- if line .startswith ("//" ):
93- continue
94- elif line .strip ().startswith (":" ):
95- process_global_attributes (
96- line ,
97- cur ,
98- submission_id ,
99- metadata ,
100- submission_filename ,
101- line_counter ,
102- )
118+ lines_length = len (lines )
119+
120+ line_counter = 0
121+ variable_lookup = {}
122+
123+ metadata_lines = process_global_attributes (lines , cur , submission_id , metadata , submission_filename )
124+ line_counter += metadata_lines
125+
126+ for counter in range (metadata_lines , lines_length ):
127+ line = lines [line_counter ]
128+ line_counter += 1
129+ tokens = line .split ("," )
130+ tokens = [token .replace ('"' , "" ) for token in tokens ]
131+ if tokens :
132+ variable_name = tokens [3 ]
133+ if variable_name in variable_lookup :
134+ variable_id = variable_lookup [variable_name ]
103135 else :
104- # Parse proc_observations
105- tokens = line .split ("," )
106- tokens = [token .replace ('"' , "" ) for token in tokens ]
107- if tokens :
108- variable_name = tokens [3 ]
109- if variable_name in variable_lookup :
110- variable_id = variable_lookup [variable_name ]
111- else :
136+ cur .execute (
137+ "SELECT variable_id FROM observation_types WHERE variable_name = %s" ,
138+ (variable_name ,),
139+ )
140+ row = cur .fetchone ()
141+ if row :
142+ variable_id = row [0 ]
143+ else :
144+ try :
145+ logger .debug ("variable_name=%s\t tokens=%s" , variable_name , tokens )
112146 cur .execute (
113- "SELECT variable_id FROM observation_types WHERE variable_name = %s" ,
114- (variable_name ,),
147+ "INSERT INTO observation_types("
148+ "variable_name, variable_units) VALUES (%s, %s) "
149+ "ON CONFLICT (variable_name) DO NOTHING" ,
150+ (variable_name , tokens [4 ].strip ()),
115151 )
116- row = cur .fetchone ()
117- if row :
118- variable_id = row [0 ]
119- else :
120- try :
121- logger .debug (variable_name , tokens )
122- cur .execute (
123- "INSERT INTO observation_types("
124- "variable_name, variable_units) VALUES (%s, %s) "
125- "ON CONFLICT (variable_name) DO NOTHING" ,
126- (variable_name , tokens [4 ].strip ()),
127- )
128- except (
129- Exception ,
130- psycopg2 .DatabaseError ,
131- ) as error :
132- logger .error (
133- "variable_id '%s' already exists in 'observation_types'. tokens:"
134- " '%s. \n error: %s" ,
135- variable_name ,
136- tokens ,
137- error ,
138- )
139- conn .rollback ()
140- cur .execute (
141- "SELECT nextval('observation_types_variable_id_seq')"
142- )
143- variable_id = cur .fetchone ()[0 ]
144- variable_lookup [variable_name ] = variable_id
145- date_time = None
146- if tokens [0 ] != '""' and tokens [0 ] != "" :
147- if tokens [0 ].startswith ('"' ):
148- tokens [0 ].replace ('"' , "" )
149- date_time = dt .strptime (
150- tokens [0 ], "%Y-%m-%d %H:%M:%S"
151- ).astimezone (pytz .utc )
152- else :
153- stripped_line = line .strip ("\n " )
154- msg = (
155- f"*{ submission_filename } * _line:{ line_counter } _ - "
156- f"No datetime... skipping line: { stripped_line } "
152+ except (
153+ Exception ,
154+ psycopg2 .DatabaseError ,
155+ ) as error :
156+ logger .error (
157+ "variable_id '%s' already exists in 'observation_types'. tokens:"
158+ " '%s. \n error: %s" ,
159+ variable_name ,
160+ tokens ,
161+ error ,
157162 )
158- logger .warning (msg )
159- try :
160- client .chat_postMessage (
161- channel = slack_channel ,
162- text = "<!channel> :warning: " + msg ,
163- )
164- except SlackApiError as e :
165- logger .error (e )
166- continue
167- proc_obs .append (
168- [
169- date_time ,
170- variable_id ,
171- tokens [2 ],
172- submission_id ,
173- str (submission_id ),
174- ]
163+ conn .rollback ()
164+ cur .execute (
165+ "SELECT nextval('observation_types_variable_id_seq')"
175166 )
167+ variable_id = cur .fetchone ()[0 ]
168+ variable_lookup [variable_name ] = variable_id
169+ date_time = None
170+ if tokens [0 ] != '""' and tokens [0 ] != "" :
171+ if tokens [0 ].startswith ('"' ):
172+ tokens [0 ].replace ('"' , "" )
173+ date_time = dt .strptime (
174+ tokens [0 ], "%Y-%m-%d %H:%M:%S"
175+ ).astimezone (pytz .utc )
176+ else :
177+ stripped_line = line .strip ("\n " )
178+ msg = (
179+ f"*{ submission_filename } * _line:{ line_counter } _ - "
180+ f"No datetime... skipping line: { stripped_line } "
181+ )
182+ post_msg_to_slack (msg )
183+ continue
184+
185+ proc_obs .append (
186+ [
187+ date_time ,
188+ variable_id ,
189+ tokens [2 ],
190+ submission_id ,
191+ str (submission_id ),
192+ ]
193+ )
194+
176195 len_proc_obs = len (proc_obs )
177196 e_time = time .perf_counter ()
178197 sub_elapsed = round (e_time - s_time , 2 )
0 commit comments