Skip to content

Commit a787f9b

Browse files
authored
new Streaming Field Renamer block
1 parent 198db91 commit a787f9b

File tree

3 files changed

+84
-0
lines changed

3 files changed

+84
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
### Renames the fields of a input data, optimised for streaming and big data, given a set of rules defined in a CSV file.
2+
3+
See instructions in block option tab.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
{
2+
"@visokiotype": "CustomBlockSchema.CustomBlockManifest",
3+
"name": "Streaming Field Renamer",
4+
"scriptFilename": "script.py",
5+
"language": "PYTHON",
6+
"executableVersion": null,
7+
"minVersions": [
8+
null
9+
],
10+
"optionsVersion": 1,
11+
"apiVersion": "VERSION_0",
12+
"isResourceIntensiveScript": false,
13+
"showPartitioning": false,
14+
"icon": "",
15+
"description": "Renames the fields of a input data, optimised for streaming and big data, given a set of rules defined in a CSV file",
16+
"category": "Preparation",
17+
"subcategory": null,
18+
"tags": [
19+
"field",
20+
"streaming",
21+
"rename",
22+
"name"
23+
],
24+
"introductoryText": "#### **Optimised for streaming and big data**, with this block, fields of the input data can be renamed according to a set of **rules defined** in a *CSV file*.\nEach record is a field rename operation. The CSV file must contain two fields: \"Current Name\" and \"New Name.\", the first field specifies the current field name, the second field specifies the new field name.\n\nExample CSV file:\n\nCurrent Name, New Name\n–––––––––––––––––––––\nCategory, New Category\nIsin, Isin Number \nSomething else, Something renamed\n\n## Input data:\nThe input data will be renames with those rules from the csv file\nExample:\n\nIndex, Category, Name, Isin\n–––––––––––––––––––––\n1, Subordinated, ABBEY, XS0076061927 \n2, Subordinated, HBOS, XS0070530885\n\n## Output:\nAfter execution, the output would look like this:\n\nIndex, New Category, Name, Isin Number\n––––––––––––––––––––––––––––––––\n1, Subordinated, ABBEY, XS0076061927 \n2, Subordinated, HBOS, XS0070530885",
25+
"dependencies": "",
26+
"options": [
27+
{
28+
"name": "rules",
29+
"title": "Rename rules CSV file",
30+
"description": "A CSV file with rename rules, it has the columns \"Current Name\" and \"New Name.\"",
31+
"groupTitle": "Options",
32+
"width": "TWO",
33+
"@visokiotype": "CustomBlockSchema.FileCustomBlockPublicOption",
34+
"mandatory": true,
35+
"defaultValue": null,
36+
"readOnly": true
37+
}
38+
],
39+
"blockOutputs": [
40+
{
41+
"@visokiotype": "CustomBlockSchema.BlockOutputPublicOption",
42+
"id": "Output Data",
43+
"label": "1",
44+
"displayName": "Output",
45+
"tooltip": null
46+
}
47+
],
48+
"docker": {
49+
"@visokiotype": "CustomBlockSchema.DockerCustomBlockPublicOption",
50+
"customBaseImage": null,
51+
"useCustomBaseImage": false,
52+
"customSystemLibraries": null,
53+
"installVisokioRepLibraries": false
54+
},
55+
"designLock": false,
56+
"apiMode": "STREAM_TRANSFORM"
57+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import pandas as pd
2+
from omniscope.api import OmniscopeApi
3+
omniscope_api = OmniscopeApi()
4+
5+
# Load rename rules from a CSV file
6+
rules_file_path = omniscope_api.get_option("rules")
7+
rename_rules = pd.read_csv(rules_file_path)
8+
9+
# Ensure the CSV contains the necessary columns
10+
if "Current Name" not in rename_rules.columns or "New Name" not in rename_rules.columns:
11+
omniscope_api.abort("The rename rules file must contain columns 'Current Name' and 'New Name'.")
12+
13+
# Create a dictionary mapping current names to new names
14+
rename_map = dict(zip(rename_rules["Current Name"], rename_rules["New Name"]))
15+
16+
def handle_chunk(chunk):
17+
# Rename columns based on the rename_map
18+
chunk = chunk.rename(columns=rename_map)
19+
20+
# Return the modified chunk
21+
return chunk
22+
23+
# Process each incoming chunk with the renaming function
24+
omniscope_api.process_stream(handle_chunk)

0 commit comments

Comments
 (0)