diff --git a/Jenkinsfile b/Jenkinsfile index 5f7e4c7..1cfaf8e 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -8,10 +8,10 @@ node ("small-ec2-fleet") { try{ stage('Deploy to pypi') { - sh 'python3 setup.py sdist' - sh 'pip3 install twine' + sh 'python3 -m pip install hatch twine' + sh 'python3 -m hatch build' withCredentials([usernamePassword(credentialsId: 'python_sdk', usernameVariable: 'USR', passwordVariable: 'PSW')]) { - sh '/home/ec2-user/.local/bin/twine upload -u $USR -p $PSW dist/*' + sh 'python -m twine upload -u $USR -p $PSW dist/*' } } @@ -19,18 +19,16 @@ node ("small-ec2-fleet") { sh """ sudo yum-config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo sudo yum install gh -y - sed -i -r "s/version=\'[0-9].[0-9].[0-9]/version=\'\$(cat version.conf)/g" setup.py - sed -i -r "s/v[0-9].[0-9].[0-9]/v\$(cat version.conf)/g" setup.py """ withCredentials([sshUserPrivateKey(keyFileVariable:'check',credentialsId: 'main-github')]) { sh """ git reset --hard origin/latest - GIT_SSH_COMMAND='ssh -i $check' git checkout -b \$(cat version.conf) - GIT_SSH_COMMAND='ssh -i $check' git push --set-upstream origin \$(cat version.conf) + GIT_SSH_COMMAND='ssh -i $check' git checkout -B \$(make version) + GIT_SSH_COMMAND='ssh -i $check' git push --set-upstream origin \$(make version) """ } withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) { - sh(script:"""gh release create \$(cat version.conf) --generate-notes""", returnStdout: true) + sh(script:"""gh release create \$(make version) --generate-notes""", returnStdout: true) } } diff --git a/Makefile b/Makefile index 4b923d9..06cb2c9 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,22 @@ -PRE_COMMIT = pre-commit +PYTHON = python +PRE_COMMIT = $(PYTHON) -m pre_commit PRE_COMMIT_RUN_ARGS = --all-files PRE_COMMIT_INSTALL_ARGS = --install-hooks +HATCH = $(PYTHON) -m hatch +HATCH_VERSION = .PHONY: lint lint: $(PRE_COMMIT) run $(PRE_COMMIT_RUN_ARGS) .PHONY: pre-commit-install pre-commit-install: - $(PRE_COMMIT) install $(PRE_COMMIT_INSTALL_ARGS) \ No newline at end of file + $(PRE_COMMIT) install $(PRE_COMMIT_INSTALL_ARGS) + +.PHONY: version +version: + @$(HATCH) version + +.PHONY: bump_version +bump_version: + $(HATCH) version $(HATCH_VERSION) \ No newline at end of file diff --git a/memphis/memphis.py b/memphis/memphis.py index b5abd7b..a06d95e 100644 --- a/memphis/memphis.py +++ b/memphis/memphis.py @@ -631,7 +631,6 @@ async def produce( except Exception as e: raise MemphisError(str(e)) from e - async def fetch_messages( self, station_name: str, @@ -643,8 +642,8 @@ async def fetch_messages( max_msg_deliveries: int = 10, generate_random_suffix: bool = False, start_consume_from_sequence: int = 1, - last_messages: int = -1 - ): + last_messages: int = -1, + ): """Consume a batch of messages. Args:. station_name (str): station name to consume messages from. @@ -669,7 +668,18 @@ async def fetch_messages( if consumer_map_key in self.consumers_map: consumer = self.consumers_map[consumer_map_key] else: - consumer = await self.consumer(station_name=station_name, consumer_name=consumer_name, consumer_group=consumer_group, batch_size=batch_size, batch_max_time_to_wait_ms=batch_max_time_to_wait_ms, max_ack_time_ms=max_ack_time_ms, max_msg_deliveries=max_msg_deliveries, generate_random_suffix=generate_random_suffix, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages) + consumer = await self.consumer( + station_name=station_name, + consumer_name=consumer_name, + consumer_group=consumer_group, + batch_size=batch_size, + batch_max_time_to_wait_ms=batch_max_time_to_wait_ms, + max_ack_time_ms=max_ack_time_ms, + max_msg_deliveries=max_msg_deliveries, + generate_random_suffix=generate_random_suffix, + start_consume_from_sequence=start_consume_from_sequence, + last_messages=last_messages, + ) messages = await consumer.fetch(batch_size) if messages == None: messages = [] @@ -1023,7 +1033,6 @@ def default_error_handler(e): print("ping exception raised", e) - class Consumer: def __init__( self, @@ -1061,7 +1070,6 @@ def __init__( self.dls_callback_func = None self.t_dls = asyncio.create_task(self.__consume_dls()) - def set_context(self, context): """Set a context (dict) that will be passed to each message handler call.""" self.context = context @@ -1112,10 +1120,12 @@ async def __consume_dls(self): ) async for msg in self.consumer_dls.messages: index_to_insert = self.dls_current_index - if index_to_insert>=10000: - index_to_insert%=10000 - self.dls_messages.insert(index_to_insert, Message(msg, self.connection, self.consumer_group)) - self.dls_current_index+=1 + if index_to_insert >= 10000: + index_to_insert %= 10000 + self.dls_messages.insert( + index_to_insert, Message(msg, self.connection, self.consumer_group) + ) + self.dls_current_index += 1 if self.dls_callback_func != None: await self.dls_callback_func( [Message(msg, self.connection, self.consumer_group)], @@ -1132,7 +1142,7 @@ async def fetch(self, batch_size: int = 10): if self.connection.is_connection_active: try: self.batch_size = batch_size - if len(self.dls_messages)>0: + if len(self.dls_messages) > 0: if len(self.dls_messages) <= batch_size: messages = self.dls_messages self.dls_messages = [] @@ -1151,7 +1161,7 @@ async def fetch(self, batch_size: int = 10): subject = get_internal_name(self.station_name) consumer_group = get_internal_name(self.consumer_group) self.psub = await self.connection.broker_connection.pull_subscribe( - subject + ".final", durable=durableName + subject + ".final", durable=durableName ) msgs = await self.psub.fetch(batch_size) for msg in msgs: @@ -1217,9 +1227,8 @@ async def ack(self): await self.message.ack() except Exception as e: if ( - "$memphis_pm_id" - in self.message.headers and "$memphis_pm_sequence" - in self.message.headers + "$memphis_pm_id" in self.message.headers + and "$memphis_pm_sequence" in self.message.headers ): try: msg = { diff --git a/pyproject.toml b/pyproject.toml index 0d8e5f4..8115481 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,54 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "memphis-py" +dynamic = ["version"] +description = "A powerful messaging platform for modern developers" +readme = "README.md" +license = "Apache-2.0" +authors = [ + { name = "Memphis.dev", email = "team@memphis.dev" }, +] +keywords = [ + "data", + "devtool", + "message broker", + "streaming", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: GNU General Public License (GPL)", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Software Development", +] +dependencies = [ + "asyncio", + "graphql-core", + "jsonschema", + "nats-py", + "protobuf", +] + +[project.urls] +Homepage = "https://github.com/memphisdev/memphis.py" + +[tool.hatch.version] +path = "version.conf" +pattern = ''' *([ \'"])?v?(?P.+?)\1''' + +[tool.hatch.build.targets.sdist] +include = [ + "/memphis", +] + [tool.isort] profile = "black" known_third_party = ["memphis"] \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 224a779..0000000 --- a/setup.cfg +++ /dev/null @@ -1,2 +0,0 @@ -[metadata] -description-file = README.md \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index d53182d..0000000 --- a/setup.py +++ /dev/null @@ -1,35 +0,0 @@ -from pathlib import Path - -from setuptools import setup - -this_directory = Path(__file__).parent -long_description = (this_directory / "README.md").read_text() - -setup( - name="memphis-py", - packages=["memphis"], - version="0.3.2", - license="Apache-2.0", - description="A powerful messaging platform for modern developers", - long_description=long_description, - long_description_content_type="text/markdown", - readme="README.md", - author="Memphis.dev", - author_email="team@memphis.dev", - url="https://github.com/memphisdev/memphis.py", - download_url="https://github.com/memphisdev/memphis.py/archive/refs/tags/v0.3.2.tar.gz", - keywords=["message broker", "devtool", "streaming", "data"], - install_requires=["asyncio", "nats-py", "protobuf", "jsonschema", "graphql-core"], - classifiers=[ - "Development Status :: 4 - Beta", - "Intended Audience :: Developers", - "Topic :: Software Development", - "License :: OSI Approved :: GNU General Public License (GPL)", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - ], -) diff --git a/version.conf b/version.conf index 9fc80f9..3082796 100644 --- a/version.conf +++ b/version.conf @@ -1 +1 @@ -0.3.2 \ No newline at end of file +"0.3.2" \ No newline at end of file