Skip to content

migrated package configurations to use hatch #117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,27 @@ node ("small-ec2-fleet") {
try{

stage('Deploy to pypi') {
sh 'python3 setup.py sdist'
sh 'pip3 install twine'
sh 'python3 -m pip install hatch twine'
sh 'python3 -m hatch build'
withCredentials([usernamePassword(credentialsId: 'python_sdk', usernameVariable: 'USR', passwordVariable: 'PSW')]) {
sh '/home/ec2-user/.local/bin/twine upload -u $USR -p $PSW dist/*'
sh 'python -m twine upload -u $USR -p $PSW dist/*'
}
}

stage('Checkout to version branch'){
sh """
sudo yum-config-manager --add-repo https://cli.github.com/packages/rpm/gh-cli.repo
sudo yum install gh -y
sed -i -r "s/version=\'[0-9].[0-9].[0-9]/version=\'\$(cat version.conf)/g" setup.py
sed -i -r "s/v[0-9].[0-9].[0-9]/v\$(cat version.conf)/g" setup.py
"""
withCredentials([sshUserPrivateKey(keyFileVariable:'check',credentialsId: 'main-github')]) {
sh """
git reset --hard origin/latest
GIT_SSH_COMMAND='ssh -i $check' git checkout -b \$(cat version.conf)
GIT_SSH_COMMAND='ssh -i $check' git push --set-upstream origin \$(cat version.conf)
GIT_SSH_COMMAND='ssh -i $check' git checkout -B \$(make version)
GIT_SSH_COMMAND='ssh -i $check' git push --set-upstream origin \$(make version)
"""
}
withCredentials([string(credentialsId: 'gh_token', variable: 'GH_TOKEN')]) {
sh(script:"""gh release create \$(cat version.conf) --generate-notes""", returnStdout: true)
sh(script:"""gh release create \$(make version) --generate-notes""", returnStdout: true)
}
}

Expand Down
15 changes: 13 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
PRE_COMMIT = pre-commit
PYTHON = python
PRE_COMMIT = $(PYTHON) -m pre_commit
PRE_COMMIT_RUN_ARGS = --all-files
PRE_COMMIT_INSTALL_ARGS = --install-hooks

HATCH = $(PYTHON) -m hatch
HATCH_VERSION =
.PHONY: lint
lint:
$(PRE_COMMIT) run $(PRE_COMMIT_RUN_ARGS)

.PHONY: pre-commit-install
pre-commit-install:
$(PRE_COMMIT) install $(PRE_COMMIT_INSTALL_ARGS)
$(PRE_COMMIT) install $(PRE_COMMIT_INSTALL_ARGS)

.PHONY: version
version:
@$(HATCH) version

.PHONY: bump_version
bump_version:
$(HATCH) version $(HATCH_VERSION)
39 changes: 24 additions & 15 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,6 @@ async def produce(
except Exception as e:
raise MemphisError(str(e)) from e


async def fetch_messages(
self,
station_name: str,
Expand All @@ -643,8 +642,8 @@ async def fetch_messages(
max_msg_deliveries: int = 10,
generate_random_suffix: bool = False,
start_consume_from_sequence: int = 1,
last_messages: int = -1
):
last_messages: int = -1,
):
"""Consume a batch of messages.
Args:.
station_name (str): station name to consume messages from.
Expand All @@ -669,7 +668,18 @@ async def fetch_messages(
if consumer_map_key in self.consumers_map:
consumer = self.consumers_map[consumer_map_key]
else:
consumer = await self.consumer(station_name=station_name, consumer_name=consumer_name, consumer_group=consumer_group, batch_size=batch_size, batch_max_time_to_wait_ms=batch_max_time_to_wait_ms, max_ack_time_ms=max_ack_time_ms, max_msg_deliveries=max_msg_deliveries, generate_random_suffix=generate_random_suffix, start_consume_from_sequence=start_consume_from_sequence, last_messages=last_messages)
consumer = await self.consumer(
station_name=station_name,
consumer_name=consumer_name,
consumer_group=consumer_group,
batch_size=batch_size,
batch_max_time_to_wait_ms=batch_max_time_to_wait_ms,
max_ack_time_ms=max_ack_time_ms,
max_msg_deliveries=max_msg_deliveries,
generate_random_suffix=generate_random_suffix,
start_consume_from_sequence=start_consume_from_sequence,
last_messages=last_messages,
)
messages = await consumer.fetch(batch_size)
if messages == None:
messages = []
Expand Down Expand Up @@ -1023,7 +1033,6 @@ def default_error_handler(e):
print("ping exception raised", e)



class Consumer:
def __init__(
self,
Expand Down Expand Up @@ -1061,7 +1070,6 @@ def __init__(
self.dls_callback_func = None
self.t_dls = asyncio.create_task(self.__consume_dls())


def set_context(self, context):
"""Set a context (dict) that will be passed to each message handler call."""
self.context = context
Expand Down Expand Up @@ -1112,10 +1120,12 @@ async def __consume_dls(self):
)
async for msg in self.consumer_dls.messages:
index_to_insert = self.dls_current_index
if index_to_insert>=10000:
index_to_insert%=10000
self.dls_messages.insert(index_to_insert, Message(msg, self.connection, self.consumer_group))
self.dls_current_index+=1
if index_to_insert >= 10000:
index_to_insert %= 10000
self.dls_messages.insert(
index_to_insert, Message(msg, self.connection, self.consumer_group)
)
self.dls_current_index += 1
if self.dls_callback_func != None:
await self.dls_callback_func(
[Message(msg, self.connection, self.consumer_group)],
Expand All @@ -1132,7 +1142,7 @@ async def fetch(self, batch_size: int = 10):
if self.connection.is_connection_active:
try:
self.batch_size = batch_size
if len(self.dls_messages)>0:
if len(self.dls_messages) > 0:
if len(self.dls_messages) <= batch_size:
messages = self.dls_messages
self.dls_messages = []
Expand All @@ -1151,7 +1161,7 @@ async def fetch(self, batch_size: int = 10):
subject = get_internal_name(self.station_name)
consumer_group = get_internal_name(self.consumer_group)
self.psub = await self.connection.broker_connection.pull_subscribe(
subject + ".final", durable=durableName
subject + ".final", durable=durableName
)
msgs = await self.psub.fetch(batch_size)
for msg in msgs:
Expand Down Expand Up @@ -1217,9 +1227,8 @@ async def ack(self):
await self.message.ack()
except Exception as e:
if (
"$memphis_pm_id"
in self.message.headers and "$memphis_pm_sequence"
in self.message.headers
"$memphis_pm_id" in self.message.headers
and "$memphis_pm_sequence" in self.message.headers
):
try:
msg = {
Expand Down
51 changes: 51 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,54 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "memphis-py"
dynamic = ["version"]
description = "A powerful messaging platform for modern developers"
readme = "README.md"
license = "Apache-2.0"
authors = [
{ name = "Memphis.dev", email = "team@memphis.dev" },
]
keywords = [
"data",
"devtool",
"message broker",
"streaming",
]
classifiers = [
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License (GPL)",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Software Development",
]
dependencies = [
"asyncio",
"graphql-core",
"jsonschema",
"nats-py",
"protobuf",
]

[project.urls]
Homepage = "https://github.com/memphisdev/memphis.py"

[tool.hatch.version]
path = "version.conf"
pattern = ''' *([ \'"])?v?(?P<version>.+?)\1'''

[tool.hatch.build.targets.sdist]
include = [
"/memphis",
]

[tool.isort]
profile = "black"
known_third_party = ["memphis"]
2 changes: 0 additions & 2 deletions setup.cfg

This file was deleted.

35 changes: 0 additions & 35 deletions setup.py

This file was deleted.

2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.2
"0.3.2"