diff --git a/README.md b/README.md index 021096b..9cf014d 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,11 @@ -# Plugin - Intercom to S3 +# Plugin - Intercom to S3/GCS -This plugin moves data from the [Intercom](https://developers.intercom.com/v2.0/docs) API to S3 based on the specified object +This plugin moves data from the [Intercom](https://developers.intercom.com/v2.0/docs) API to S3/GCS based on the specified object ## Hooks ### IntercomHook This hook handles the authentication and request to Intercom. Based on [python-intercom](https://github.com/jkeyes/python-intercom) module. -### S3Hook -[Core Airflow S3Hook](https://pythonhosted.org/airflow/_modules/S3_hook.html) with the standard boto dependency. - ## Operators ### IntercomToS3Operator This operator composes the logic for this plugin. It fetches the intercom specified object and saves the result in a S3 Bucket, under a specified key, in @@ -23,5 +20,21 @@ njson format. The parameters it can accept include the following. - `fields`: *optional* list of fields that you want to get from the object. If *None*, then this will get all fields for the object - `replication_key`: *optional* name of the replication key, if needed. - `replication_key_value`: *(optional)* value of the replication key, if needed. The operator will import only results with the property from replication_key grater than the value of this param. -- `intercom_method`: *(optional)* method to call from python-intercom. Default to "all". -- `**kwargs`: replication key and value, if replication_key parameter is given and extra params for intercom method if needed. \ No newline at end of file +- `intercom_method`: *(optional)* method to call from python-intercom. Default to "all". +- `**kwargs`: replication key and value, if replication_key parameter is given and extra params for intercom method if needed. + +### IntercomToGCSOperator +This operator composes the logic for this plugin. 
It fetches the intercom specified object and saves the result in a GCS Bucket, under a specified key, in +njson format. The parameters it can accept include the following. + +- `intercom_conn_id`: The intercom connection id from Airflow +- `intercom_obj`: Intercom object to query +- `gcs_conn_id`: GCS connection id from Airflow. +- `gcs_bucket`: The output GCS bucket. +- `gcs_object`: The destination GCS object. +- `fields`: *optional* list of fields that you want to get from the object. If *None*, then this will get all fields for the object +- `replication_key_name`: *optional* name of the replication key, if needed. +- `replication_key_value`: *(optional)* value of the replication key, if needed. The operator will import only results with the property from replication_key_name greater than the value of this param. +- `intercom_method`: *(optional)* method to call from python-intercom. Default to "all". +- `**kwargs`: replication key and value, if replication_key parameter is given and extra params for intercom method if needed. 
import collections.abc
import json
import logging
from tempfile import NamedTemporaryFile

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from intercom_plugin.hooks.intercom_hook import IntercomHook


class IntercomToGCSOperator(BaseOperator):
    """
    Make a query against Intercom and write the resulting data to GCS
    as newline-delimited JSON (one JSON document per line).
    """
    # Airflow templating reads the plural ``template_fields`` attribute;
    # the original singular ``template_field`` was silently ignored, so
    # ``gcs_object`` was never Jinja-rendered.
    template_fields = ('gcs_object',)

    @apply_defaults
    def __init__(
        self,
        intercom_conn_id,
        intercom_obj,
        intercom_method='all',
        gcs_conn_id='',
        gcs_bucket='',
        gcs_object='',
        fields=None,
        replication_key_name=None,
        replication_key_value=0,
        *args,
        **kwargs
    ):
        """
        Initialize the operator.

        :param intercom_conn_id: name of the Airflow connection that has
                                 your Intercom tokens
        :param intercom_obj: name of the Intercom object we are
                             fetching data from
        :param intercom_method: *(optional)* method to call from
                                python-intercom. Defaults to "all".
        :param gcs_conn_id: name of the Airflow connection that has
                            your GCS connection params
        :param gcs_bucket: name of the destination GCS bucket
        :param gcs_object: name of the destination object in the bucket
        :param fields: *(optional)* list of fields that you want
                       to get from the object.
                       If *None*, then this will get all fields
                       for the object
        :param replication_key_name: *(optional)* name of the replication
                                     key, if needed.
        :param replication_key_value: *(optional)* value of the replication
                                      key, if needed. The operator will
                                      import only results whose
                                      replication-key property is greater
                                      than or equal to this value.
        :param \\**kwargs: Extra params for the intercom query, based on
                          the python-intercom module
        """
        super().__init__(*args, **kwargs)

        self.intercom_conn_id = intercom_conn_id
        self.intercom_obj = intercom_obj
        self.intercom_method = intercom_method

        self.gcs_conn_id = gcs_conn_id
        self.gcs_bucket = gcs_bucket
        self.gcs_object = gcs_object

        self.fields = fields
        self.replication_key_name = replication_key_name
        self.replication_key_value = replication_key_value
        self._kwargs = kwargs

    def filter_fields(self, result):
        """
        Keep only the fields requested in the constructor.

        Returns ``result`` unchanged when no field list was given.
        Raises ``KeyError`` if a requested field is missing from a result.
        """
        if not self.fields:
            return result
        return {field: result[field] for field in self.fields}

    def filter(self, results):
        """
        Filter the results.

        Applies the replication-key threshold (when configured) and the
        field projection to every result.  A non-iterable result is
        serialized and returned as-is — not wrapped in a list — to keep
        the original behavior.
        """
        # Use collections.abc.Iterable: the bare collections.Iterable
        # alias was deprecated and removed in Python 3.10.
        if not isinstance(results, collections.abc.Iterable):
            # python-intercom objects are not JSON-serializable directly;
            # round-trip through their __dict__ to get plain data.
            return json.loads(json.dumps(results,
                                         default=lambda o: o.__dict__))

        filtered = []
        for result in results:
            result_json = json.loads(
                json.dumps(result, default=lambda o: o.__dict__))

            if not self.replication_key_name or \
                    int(result_json[self.replication_key_name]) >= \
                    int(self.replication_key_value):
                filtered.append(self.filter_fields(result_json))

        # Log once after the loop; the original logged the growing list on
        # every iteration, producing O(n^2) log volume.
        logging.info(filtered)
        return filtered

    def execute(self, context):
        """
        Execute the operator.

        Gets all the data for the configured Intercom model, filters it,
        and uploads it to GCS as newline-delimited JSON.
        """
        logging.info("Prepping to gather data from Intercom")
        hook = IntercomHook(
            conn_id=self.intercom_conn_id,
        )

        # Attempt to log in to Intercom; if this fails it raises here,
        # before we touch GCS.
        hook.get_conn()

        logging.info(
            "Making request for"
            " {0} object".format(self.intercom_obj)
        )

        # Fetch the results from Intercom and filter them.
        results = hook.run_query(self.intercom_obj, self.intercom_method)
        filtered_results = self.filter(results)

        # Write the results to a temporary file and upload it to GCS.
        # NamedTemporaryFile closes (and deletes) the file when the with
        # block exits, so no explicit close() is needed.
        with NamedTemporaryFile("w") as tmp:
            for result in filtered_results:
                tmp.write(json.dumps(result) + '\n')
            tmp.flush()

            gcs_conn = GoogleCloudStorageHook(self.gcs_conn_id)
            gcs_conn.upload(self.gcs_bucket, self.gcs_object, tmp.name)

        logging.info("Query finished!")