Skip to content

Implement encryption and shard management functionalities #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ idna==3.4
requests==2.31.0
urllib3==2.0.2
web3
eth-accounts
eth-accounts
py_ecc
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ charset-normalizer==3.1.0
idna==3.4
requests==2.31.0
urllib3==2.0.2
eth-account==0.13.7
eth-account==0.13.7
py_ecc
198 changes: 196 additions & 2 deletions src/lighthouseweb3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,24 @@
ipns_publish_record as ipnsPublishRecord,
get_ipns_record as getIpnsRecord,
remove_ipns_record as removeIpnsRecord,
create_wallet as createWallet
create_wallet as createWallet,
upload_encrypted as uploadEncrypted
)


from typing import Any, Dict, List

from .functions.encryption import (
get_jwt as getJwt,
revoke_access as revokeAccess,
generate as generateKey,
transfer_ownership as transferOwnership,
share_to_address as shareToAddress,
save_shards as saveShards,
get_access_condition as getAccessCondition,
get_auth_message as getAuthMessage,
recover_key as recoveryKey,
shared_key as sharedKey
)
class Lighthouse:
def __init__(self, token: str = ""):
self.token = token or os.environ.get("LIGHTHOUSE_TOKEN", "")
Expand All @@ -37,6 +51,23 @@ def upload(self, source: str, tag: str = ''):
return d.upload(source, self.token, tag)
except Exception as e:
raise e

def uploadEncrypted(self, source_path: str, public_key: str, auth_token: str, cid_version: int = 1) -> Dict[str, List[Dict]]:
"""
Upload a file or directory to the Lighthouse.

:param source_path: str, path to file or directory
:param public_key: str, public key of the user
:param auth_token: str, auth token of the user
:param cid_version: int, cid version of the user
:return: t.Upload, the upload result
"""

try:
return uploadEncrypted.upload_file(source_path, self.token, public_key, auth_token, cid_version)
except Exception as e:
raise e


def uploadBlob(self, source: io.BufferedReader, filename: str, tag: str = ''):
"""
Expand Down Expand Up @@ -224,3 +255,166 @@ def getTagged(self, tag: str):
except Exception as e:
raise e

class Kavach:
@staticmethod
def sharedKey(key: str, threshold: int = 3, key_count: int = 5) -> Dict[str, Any]:
"""
Splits a secret key into shards using Shamir's Secret Sharing on BLS12-381 curve.

:param key: Hex string of the master secret key
:param threshold: Minimum number of shards required to reconstruct the key
:param key_count: Total number of shards to generate

:return: Dict containing isShardable flag and list of key shards with their indices
"""

try:
return sharedKey.shard_key(key, threshold, key_count)
except Exception as e:
raise e

@staticmethod
def recoverKey(shards: List[Dict[str, str]]) -> Dict[str, str]:
"""
Recovers the master key from a list of key shards using Lagrange interpolation.

:param shards: List of dictionaries containing 'key' and 'index' as hex strings
:return: Dictionary containing the recovered master key
"""
try:
return recoveryKey.recover_key(shards)
except Exception as e:
raise e

@staticmethod
def getAuthMessage(address: str) -> dict[str, Any]:
"""
Get Authentication message from the server

:param address: str, The public key of the user
:return: dict, A dict with authentication message or error
"""
try:
return getAuthMessage.get_auth_message(address)
except Exception as e:
raise e

@staticmethod
def getAccessCondition(cid: str):
"""
Get Access Condition for cid from the node

:param cid: str, Content Identifier for the data to be downloaded
:return: conditions dict of access conditions
"""

try:
return getAccessCondition.get_access_condition(cid)
except Exception as e:
raise e

@staticmethod
def saveShards(
address: str,
cid: str,
auth_token: str,
key_shards: List[dict],
share_to: List[str] = None
) -> Dict[str, Any]:
"""
Save key shards to multiple nodes.

:param address: str, The Ethereum address of the user.
:param cid: str, The Content Identifier (CID) of the file for which key shards are being saved.
:param auth_token: str, The authentication token obtained by signing a message.
:param key_shards: List[KeyShard], A list of KeyShard objects, each containing a key and its index.
:param share_to: List[str], optional, A list of Ethereum addresses to which the key shards should be shared. Defaults to None.
:return: Dict[str, Any], A dictionary indicating the success or failure of the operation, along with any error messages.
"""

try:
return saveShards.save_shards(address, cid, auth_token, key_shards, share_to)
except Exception as e:
raise e

@staticmethod
def shareToAddress(address: str, cid: str, auth_token: Dict[str, Any], share_to: List[str]) -> Dict[str, Any]:
"""
Share an encrypted file with a list of addresses.

:param address: str, The public address of the file owner.
:param cid: str, The CID of the file to share.
:param auth_token: Dict[str, Any], The authentication token.
:param share_to: List[str], A list of public addresses to share the file with.
:return: Dict[str, Any], A dictionary indicating the result of the share operation.
"""
try:
return shareToAddress.share_to_address(address, cid, auth_token, share_to)
except Exception as e:
raise e

@staticmethod
def transferOwnership(address: str, cid: str, new_owner: str, auth_token: str, reset_shared_to: bool = True) -> dict[str, Any]:
"""
Transfer ownership of a file from the current owner to a new owner.

:param address: str, The address of the current owner.
:param cid: str, The Content Identifier (CID) of the file to transfer.
:param new_owner: str, The address of the new owner.
:param auth_token: str, The authentication token for the current owner.
:param reset_shared_to: bool, Whether to reset the list of users the file is shared with (default: True).
:return: dict, A dictionary indicating the success or failure of the operation.
"""
try:
return transferOwnership.transfer_ownership(address, cid, new_owner, auth_token, reset_shared_to)
except Exception as e:
raise e

@staticmethod
def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, any]:
"""
Generates a set of master secret keys and corresponding key shards using BLS (Boneh-Lynn-Shacham)
threshold cryptography.

:param threshold: int, The minimum number of key shards required to reconstruct the master key.
:param key_count: int, The total number of key shards to generate.
:return: Dict[str, any], A dictionary containing the master key and a list of key shards.
"""

try:
return generateKey.generate(threshold, key_count)
except Exception as e:
raise e

@staticmethod
def revokeAccess(address: str, cid: str, auth_token: str, revoke_to: List[str]) -> Dict:
"""
Revokes access to a shared file for specified recipients.

:param address: str, The address of the user initiating the revocation.
:param cid: str, The CID of the file for which access is being revoked.
:param auth_token: str, The authentication token of the user.
:param revoke_to: List[str], A list of addresses for whom access is to be revoked.
:return: Dict, A dictionary indicating the success or failure of the revocation.
"""
try:
return revokeAccess.revoke_access(address, cid, auth_token, revoke_to)
except Exception as e:
raise e

@staticmethod
def getJWT(address: str, payload: str, use_as_refresh_token: bool = False, chain: str = "ALL") -> Dict:
"""
Retrieves a JSON Web Token (JWT) for authentication.

:param address: str, The blockchain address of the user.
:param payload: str, The signed message or refresh token.
:param use_as_refresh_token: bool, If True, payload is treated as a refresh token.
:param chain: str, The blockchain chain (e.g., "ALL", "ETHEREUM").
:return: Dict, A dictionary containing the JWT and refresh token, or an error.
"""

try:
return getJwt.get_jwt(address, payload, use_as_refresh_token, chain)
except Exception as e:
raise e
3 changes: 3 additions & 0 deletions src/lighthouseweb3/functions/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ class Config:
lighthouse_node = "https://node.lighthouse.storage"
lighthouse_bls_node = "https://encryption.lighthouse.storage"
lighthouse_gateway = "https://gateway.lighthouse.storage/ipfs"

is_dev = False
lighthouse_bls_node_dev = "http://enctest.lighthouse.storage"
43 changes: 43 additions & 0 deletions src/lighthouseweb3/functions/encryption/generate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from py_ecc.bls import G2ProofOfPossession as BLS
from py_ecc.optimized_bls12_381 import curve_order
import random
from typing import Dict, List

def int_to_bytes(x: int) -> bytes:
return x.to_bytes(32, byteorder="big")

def eval_poly(poly: List[int], x: int) -> int:
"""Evaluate polynomial at a given point x."""
result = 0
for i, coeff in enumerate(poly):
result = (result + coeff * pow(x, i, curve_order)) % curve_order
return result

def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, any]:
if threshold > key_count:
raise ValueError("threshold must be less than or equal to key_count")

# Generate random polynomial coefficients (secret is constant term)
poly = [random.randint(1, curve_order - 1) for _ in range(threshold)]
master_sk = poly[0] # constant term is master key

shares = []
# Generate random vector IDs, ensuring uniqueness
id_vec = set()
while len(id_vec) < key_count:
id_vec.add(random.randint(1, curve_order - 1))
id_vec = list(id_vec)

for x in id_vec:
y = eval_poly(poly, x)
# Convert index to hex string without '0x' prefix to match JS output
index_hex = hex(x)[2:].zfill(64) # Ensure 64-character hex string
shares.append({
"index": index_hex,
"key": int_to_bytes(y).hex()
})

return {
"masterKey": int_to_bytes(master_sk).hex(),
"keyShards": shares
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from typing import Any
from .utils import api_node_handler

def get_access_condition(cid: str) -> dict[str, Any]:
try:
conditions = api_node_handler(f"/api/fileAccessConditions/get/{cid}", "GET")
return {'data': conditions}
except Exception as error:
raise error
10 changes: 10 additions & 0 deletions src/lighthouseweb3/functions/encryption/get_auth_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Any
from .utils import api_node_handler


def get_auth_message(address: str) -> dict[str, Any]:
try:
response = api_node_handler(f"/api/message/{address}", "GET")
return {'message': response[0]['message'], 'error': None}
except Exception as e:
return {'message': None, 'error':str(e)}
17 changes: 17 additions & 0 deletions src/lighthouseweb3/functions/encryption/get_jwt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from .utils import api_node_handler

def get_jwt(address: str, payload: str, use_as_refresh_token: bool = False, chain: str = "ALL") -> dict:
try:
if not use_as_refresh_token:
data = api_node_handler(
"/api/message/get-jwt", "POST", "",
{"address": address, "signature": payload, "chain": chain}
)
else:
data = api_node_handler(
"/api/message/get-jwt", "PUT", "",
{"address": address, "refreshToken": payload}
)
return {"JWT": data["token"], "refreshToken": data["refreshToken"], "error": None}
except Exception as e:
return {"JWT": None, "error": "Invalid Signature"}
29 changes: 29 additions & 0 deletions src/lighthouseweb3/functions/encryption/recover_key.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List, Dict
from py_ecc.optimized_bls12_381.optimized_curve import curve_order

def recover_key(shards: List[Dict[str, str]]) -> Dict[str, str]:

try:
# Convert hex strings to integers
x_coords = [int(shard['index'], 16) for shard in shards]
y_coords = [int(shard['key'], 16) for shard in shards]

# Lagrange interpolation to recover the constant term (secret)
def lagrange_interpolate(x: int, x_s: List[int], y_s: List[int], p: int) -> int:
total = 0
for i in range(len(x_s)):
numerator = denominator = 1
for j in range(len(x_s)):
if i != j:
numerator = (numerator * (x - x_s[j])) % p
denominator = (denominator * (x_s[i] - x_s[j])) % p
l = (y_s[i] * numerator * pow(denominator, -1, p)) % p
total = (total + l) % p
return total

# Recover the master key at x=0
master_key = lagrange_interpolate(0, x_coords, y_coords, curve_order)

return {"masterKey": hex(master_key)[2:].zfill(64), "error": None}
except Exception as e:
return {"masterKey": None, "error":e}
Loading