15
15
import contextlib
16
16
import itertools
17
17
import logging
18
- from collections import OrderedDict , defaultdict
18
+ from collections import OrderedDict
19
19
from datetime import datetime
20
20
from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple , Union
21
21
@@ -74,7 +74,10 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
74
74
"""Whether to read from Dynamodb by forcing consistent reads"""
75
75
76
76
tags : Union [Dict [str , str ], None ] = None
77
- """AWS resource tags added to each table"""
77
+ """Key-value pairs added to each feature-view"""
78
+
79
+ tag_aws_resources : StrictBool = False
80
+ """Add the feature-view tags to the underlying AWS dynamodb tables"""
78
81
79
82
session_based_auth : bool = False
80
83
"""AWS session based client authentication"""
@@ -138,38 +141,6 @@ async def close(self):
138
141
def async_supported (self ) -> SupportedAsyncMethods :
139
142
return SupportedAsyncMethods (read = True , write = True )
140
143
141
- @staticmethod
142
- def _table_tags (online_config , table_instance ) -> list [dict [str , str ]]:
143
- table_instance_tags = table_instance .tags or {}
144
- online_tags = online_config .tags or {}
145
-
146
- common_tags = [
147
- {"Key" : key , "Value" : table_instance_tags .get (key ) or value }
148
- for key , value in online_tags .items ()
149
- ]
150
- table_tags = [
151
- {"Key" : key , "Value" : value }
152
- for key , value in table_instance_tags .items ()
153
- if key not in online_tags
154
- ]
155
-
156
- return common_tags + table_tags
157
-
158
- @staticmethod
159
- def _update_tags (dynamodb_client , table_name : str , new_tags : list [dict [str , str ]]):
160
- table_arn = dynamodb_client .describe_table (TableName = table_name )["Table" ][
161
- "TableArn"
162
- ]
163
- current_tags = dynamodb_client .list_tags_of_resource (ResourceArn = table_arn )[
164
- "Tags"
165
- ]
166
- if current_tags :
167
- remove_keys = [tag ["Key" ] for tag in current_tags ]
168
- dynamodb_client .untag_resource (ResourceArn = table_arn , TagKeys = remove_keys )
169
-
170
- if new_tags :
171
- dynamodb_client .tag_resource (ResourceArn = table_arn , Tags = new_tags )
172
-
173
144
def update (
174
145
self ,
175
146
config : RepoConfig ,
@@ -189,60 +160,26 @@ def update(
189
160
"""
190
161
online_config = config .online_store
191
162
assert isinstance (online_config , DynamoDBOnlineStoreConfig )
192
- dynamodb_client = self ._get_dynamodb_client (
193
- online_config .region ,
194
- online_config .endpoint_url ,
195
- online_config .session_based_auth ,
196
- )
163
+
197
164
dynamodb_resource = self ._get_dynamodb_resource (
198
165
online_config .region ,
199
166
online_config .endpoint_url ,
200
167
online_config .session_based_auth ,
201
168
)
202
169
203
- do_tag_updates = defaultdict (bool )
204
- for table_instance in tables_to_keep :
205
- # Add Tags attribute to creation request only if configured to prevent
206
- # TagResource permission issues, even with an empty Tags array.
207
- table_tags = self ._table_tags (online_config , table_instance )
208
- kwargs = {"Tags" : table_tags } if table_tags else {}
209
-
210
- table_name = _get_table_name (online_config , config , table_instance )
211
- try :
212
- dynamodb_resource .create_table (
213
- TableName = table_name ,
214
- KeySchema = [{"AttributeName" : "entity_id" , "KeyType" : "HASH" }],
215
- AttributeDefinitions = [
216
- {"AttributeName" : "entity_id" , "AttributeType" : "S" }
217
- ],
218
- BillingMode = "PAY_PER_REQUEST" ,
219
- ** kwargs ,
220
- )
221
-
222
- except ClientError as ce :
223
- do_tag_updates [table_name ] = True
224
-
225
- # If the table creation fails with ResourceInUseException,
226
- # it means the table already exists or is being created.
227
- # Otherwise, re-raise the exception
228
- if ce .response ["Error" ]["Code" ] != "ResourceInUseException" :
229
- raise
230
-
231
- for table_instance in tables_to_keep :
232
- table_name = _get_table_name (online_config , config , table_instance )
233
- dynamodb_client .get_waiter ("table_exists" ).wait (TableName = table_name )
234
- # once table is confirmed to exist, update the tags.
235
- # tags won't be updated in the create_table call if the table already exists
236
- if do_tag_updates [table_name ]:
237
- tags = self ._table_tags (online_config , table_instance )
238
- self ._update_tags (dynamodb_client , table_name , tags )
239
-
240
- for table_to_delete in tables_to_delete :
241
- _delete_table_idempotent (
242
- dynamodb_resource ,
243
- _get_table_name (online_config , config , table_to_delete ),
170
+ def get_table_manager (table ):
171
+ return _DynamoTableManager (
172
+ dynamodb_resource = dynamodb_resource ,
173
+ config = config ,
174
+ feature_view = table ,
244
175
)
245
176
177
+ for table in tables_to_keep :
178
+ get_table_manager (table ).update ()
179
+
180
+ for table in tables_to_delete :
181
+ get_table_manager (table ).delete ()
182
+
246
183
def teardown (
247
184
self ,
248
185
config : RepoConfig ,
@@ -265,9 +202,11 @@ def teardown(
265
202
)
266
203
267
204
for table in tables :
268
- _delete_table_idempotent (
269
- dynamodb_resource , _get_table_name (online_config , config , table )
270
- )
205
+ _DynamoTableManager (
206
+ dynamodb_resource = dynamodb_resource ,
207
+ config = config ,
208
+ feature_view = table ,
209
+ ).delete ()
271
210
272
211
def online_write_batch (
273
212
self ,
@@ -845,3 +784,86 @@ def _latest_data_to_write(
845
784
as_hashable = ((d [0 ].SerializeToString (), d ) for d in data )
846
785
sorted_data = sorted (as_hashable , key = lambda ah : (ah [0 ], ah [1 ][2 ]))
847
786
return (v for _ , v in OrderedDict ((ah [0 ], ah [1 ]) for ah in sorted_data ).items ())
787
+
788
+
789
+ class _DynamoTableManager :
790
+ def __init__ (
791
+ self , dynamodb_resource , config : RepoConfig , feature_view : FeatureView
792
+ ):
793
+ self .config = config
794
+ self .feature_view = feature_view
795
+ self ._dynamodb_resource = dynamodb_resource
796
+
797
+ @property
798
+ def _dynamodb_client (self ):
799
+ return self ._dynamodb_resource .meta .client
800
+
801
+ @property
802
+ def table_name (self ) -> str :
803
+ return _get_table_name (
804
+ self .config .online_config , self .config , self .feature_view
805
+ )
806
+
807
+ def table_tags (self ) -> list [dict [str , str ]]:
808
+ table_instance_tags = self .feature_view .tags or {}
809
+ online_tags = self .config .online_config .tags or {}
810
+
811
+ common_tags = [
812
+ {"Key" : key , "Value" : table_instance_tags .get (key ) or value }
813
+ for key , value in online_tags .items ()
814
+ ]
815
+ table_tags = [
816
+ {"Key" : key , "Value" : value }
817
+ for key , value in table_instance_tags .items ()
818
+ if key not in online_tags
819
+ ]
820
+
821
+ return common_tags + table_tags
822
+
823
+ def _update_tags (self , new_tags : list [dict [str , str ]]):
824
+ table_arn = self ._dynamodb_client .describe_table (TableName = self .table_name )[
825
+ "Table"
826
+ ]["TableArn" ]
827
+ current_tags = self ._dynamodb_client .list_tags_of_resource (
828
+ ResourceArn = table_arn
829
+ )["Tags" ]
830
+ if current_tags :
831
+ remove_keys = [tag ["Key" ] for tag in current_tags ]
832
+ self ._dynamodb_client .untag_resource (
833
+ ResourceArn = table_arn , TagKeys = remove_keys
834
+ )
835
+
836
+ if new_tags :
837
+ self ._dynamodb_client .tag_resource (ResourceArn = table_arn , Tags = new_tags )
838
+
839
+ def update (self ):
840
+ # Add Tags attribute to creation request only if configured to prevent
841
+ # TagResource permission issues, even with an empty Tags array.
842
+ do_tag_update = self .config .online_config .tag_aws_resources
843
+ table_tags = self .table_tags ()
844
+ kwargs = {"Tags" : table_tags } if table_tags and do_tag_update else {}
845
+ try :
846
+ self ._dynamodb_resource .create_table (
847
+ TableName = self .table_name ,
848
+ KeySchema = [{"AttributeName" : "entity_id" , "KeyType" : "HASH" }],
849
+ AttributeDefinitions = [
850
+ {"AttributeName" : "entity_id" , "AttributeType" : "S" }
851
+ ],
852
+ BillingMode = "PAY_PER_REQUEST" ,
853
+ ** kwargs ,
854
+ )
855
+ do_tag_update = False
856
+ except ClientError as ce :
857
+ # If the table creation fails with ResourceInUseException,
858
+ # it means the table already exists or is being created.
859
+ # Otherwise, re-raise the exception
860
+ if ce .response ["Error" ]["Code" ] != "ResourceInUseException" :
861
+ raise
862
+
863
+ # tags won't be updated in the create_table call if the table already exists
864
+ self ._dynamodb_client .get_waiter ("table_exists" ).wait (TableName = self .table_name )
865
+ if do_tag_update :
866
+ self ._update_tags (table_tags )
867
+
868
+ def delete (self ) -> None :
869
+ _delete_table_idempotent (self ._dynamodb_resource , self .table_name )
0 commit comments