Skip to content

feat(nova-cli): adopt to nova cli usage together with robotpad #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# General
.DS_Store
.git
.gitignore
.dockerignore
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
env/
venv/
.venv/
pip-log.txt
pip-delete-this-directory.txt

# Project-specific
README.md
docs/
tests/
*.yml
Dockerfile
*.dockerfile
*.env
.idea/
.vscode/
*.log
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.env
27 changes: 27 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# https://medium.com/@albertazzir/blazing-fast-python-docker-builds-with-poetry-a78a66f5aed0
FROM python:3.9-buster as builder

RUN pip install --upgrade pip \
&& pip install poetry==1.8.3

ENV POETRY_NO_INTERACTION=1 \
POETRY_VIRTUALENVS_IN_PROJECT=1 \
POETRY_VIRTUALENVS_CREATE=1 \
POETRY_CACHE_DIR=/tmp/poetry_cache

WORKDIR /app

COPY pyproject.toml poetry.lock ./

RUN --mount=type=cache,target=$POETRY_CACHE_DIR poetry install --without dev --no-root

FROM python:3.9-slim-buster as runtime

ENV VIRTUAL_ENV=/app/.venv \
PATH="/app/.venv/bin:$PATH"

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}
WORKDIR /app
COPY . .

ENTRYPOINT ["python", "-m", "gamepad_example"]
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

Small python project to demonstrate the usage of a gamepad

## Usage
## Local Usage
```
poetry install
poetry shell
python src/move_robot.py
python gamepad_example/move_robot.py
```
16 changes: 16 additions & 0 deletions gamepad_example/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import uvicorn
from loguru import logger
import os

from gamepad_example.app import app


def main(host: str = "0.0.0.0", port: int = 3000):
log_level = os.getenv('LOG_LEVEL', 'info')
logger.info("Starting Service...")

# base path is injected/set when running within the wandelbots environment
base_path = os.getenv('BASE_PATH', '')
if len(base_path) > 0:
logger.info("serving with base path '{}'", base_path)
uvicorn.run(app, host=host, port=port, reload=False, log_level=log_level, proxy_headers=True, forwarded_allow_ips='*')
4 changes: 4 additions & 0 deletions gamepad_example/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from gamepad_example import main

if __name__ == "__main__":
main()
94 changes: 94 additions & 0 deletions gamepad_example/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import asyncio
import copy
from typing import List

import wandelbots_api_client as wb
from decouple import config
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.responses import RedirectResponse
from loguru import logger

from gamepad_example.move_robot import move_robot, stop_movement
from gamepad_example.utils import CELL_ID
from gamepad_example.utils import get_api_client
from inputs import get_gamepad

BASE_PATH = config("BASE_PATH", default="", cast=str)
app = FastAPI(title="gamepad_example", root_path=BASE_PATH)

app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


@app.get("/", summary="Redirects to the swagger docs page")
async def root():
# One could serve a nice UI here as well. For simplicity, we just redirect to the swagger docs page.
return RedirectResponse(url=BASE_PATH + "/docs")


@app.get("/app_icon.png", summary="Services the app icon for the homescreen")
async def get_app_icon():
try:
return FileResponse(path="static/app_icon.png", media_type='image/png')
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Icon not found")


@app.post(
"/activate_jogging",
status_code=201,
summary="Moves the robot via jogging with Gamepad input",
description="oves the robot via jogging with Gamepad input")
async def activate_jogging(background_tasks: BackgroundTasks):
logger.info("creating api clients...")
api_client = get_api_client()
motion_group_api = wb.MotionGroupApi(api_client)
controller_api = wb.ControllerApi(api_client)

# get the motion group id
controllers = await controller_api.list_controllers(cell=CELL_ID)

if len(controllers.instances) != 1:
raise HTTPException(
status_code=400,
detail="No or more than one controllers found. Example just works with one controllers. "
"Go to settings app and create one or delete all except one.")
controller_id = controllers.instances[0].controller
logger.info("using controller {}", controller_id)

logger.info("selecting motion group...")
motion_groups = await motion_group_api.list_motion_groups(cell=CELL_ID)
if len(motion_groups.instances) != 1:
raise HTTPException(
status_code=400,
detail="No or more than one motion group found. Example just works with one motion group. "
"Go to settings app and create one or delete all except one.")
motion_group_id = motion_groups.instances[0].motion_group
logger.info("using motion group {}", motion_group_id)

logger.info("activating motion groups...")
await motion_group_api.activate_motion_group(
cell=CELL_ID,
motion_group=motion_group_id)
await controller_api.set_default_mode(cell=CELL_ID, controller=controller_id, mode="MODE_CONTROL")

background_tasks.add_task(move_robot, motion_group_id)
api_client.close()
return {"status": "jogging activated"}


@app.post(
"/deactivate_jogging",
status_code=201,
summary="Stops the robot jogging",
description="stops the robot jogging")
async def deactivate_jogging():
stop_movement()
return {"status": "jogging deactivated"}
119 changes: 119 additions & 0 deletions gamepad_example/move_robot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio
from inputs import get_gamepad
from loguru import logger

import wandelbots_api_client as wb

from gamepad_example.utils import CELL_ID
from gamepad_example.utils import get_api_client

shall_run = False
max_position_velocity = 100
max_rotation_velocity = 0.5

gamepad_position_direction = wb.models.Vector3d(x=0, y=0, z=0)
gamepad_rotation_direction = wb.models.Vector3d(x=0, y=0, z=0)
gamepad_position_velocity = 0
gamepad_rotation_velocity = 0
gamepad_updated = False


async def read_gamepad():
global gamepad_updated
global gamepad_position_direction, gamepad_position_velocity, max_position_velocity
global gamepad_rotation_direction, gamepad_rotation_velocity, max_rotation_velocity
global shall_run

while True:
if not shall_run:
return
latest_gamepad_events = get_gamepad()
for event in latest_gamepad_events:
if event.ev_type == "Absolute" and event.code == "ABS_X":
gamepad_position_direction.x = event.state / 32767
elif event.ev_type == "Absolute" and event.code == "ABS_Y":
gamepad_position_direction.y = event.state / 32767
elif event.ev_type == "Absolute" and event.code == "ABS_Z":
gamepad_position_direction.z = - event.state / 255
elif event.ev_type == "Absolute" and event.code == "ABS_RZ":
gamepad_position_direction.z = event.state / 255

elif event.ev_type == "Absolute" and event.code == "ABS_RX":
gamepad_rotation_direction.y = event.state / 32767
elif event.ev_type == "Absolute" and event.code == "ABS_RY":
gamepad_rotation_direction.x = -event.state / 32767
elif event.code == "BTN_TL":
gamepad_rotation_direction.z = -event.state
elif event.code == "BTN_TR":
gamepad_rotation_direction.z = event.state
logger.info("read event...")
gamepad_updated = True

gamepad_position_velocity = (
gamepad_position_direction.x ** 2 + gamepad_position_direction.y ** 2 + gamepad_position_direction.z ** 2) ** 0.5
gamepad_position_velocity = gamepad_position_velocity / 1.732 * max_position_velocity
gamepad_rotation_velocity = (
gamepad_rotation_direction.x ** 2 + gamepad_rotation_direction.y ** 2 + gamepad_rotation_direction.z ** 2) ** 0.5
gamepad_rotation_velocity = gamepad_rotation_velocity / 1.732 * max_rotation_velocity

await asyncio.sleep(0) # yield control to other tasks


async def jogging_direction_generator(motion_group_id, responses_from_robot):
async def read_responses(responses_from_robot):
async for response in responses_from_robot:
pass
# print(response)

asyncio.create_task(read_responses(responses_from_robot))

global gamepad_position_direction, gamepad_position_velocity
global gamepad_rotation_direction, gamepad_rotation_velocity
global gamepad_updated
global shall_run
direction_request = wb.models.DirectionJoggingRequest(
motion_group=motion_group_id,
position_direction=wb.models.Vector3d(x=0, y=0, z=0),
rotation_direction=wb.models.Vector3d(x=0, y=0, z=0),
position_velocity=0,
rotation_velocity=0,
response_rate=1000
)

yield direction_request
while True:
if not shall_run:
return

if gamepad_updated:
direction_request.position_direction = gamepad_position_direction
direction_request.rotation_direction = gamepad_rotation_direction
direction_request.position_velocity = gamepad_position_velocity
direction_request.rotation_velocity = gamepad_rotation_velocity
gamepad_updated = False
yield direction_request
await asyncio.sleep(0) # yield control to other tasks


async def move_robot(motion_group_id):
global shall_run

shall_run = True
api_client = get_api_client()
jogging_api = wb.MotionGroupJoggingApi(api_client)

# start the reading from gamepad
asyncio.create_task(read_gamepad())

# start the robot interaction
await jogging_api.direction_jogging(
cell=CELL_ID,
client_request_generator=lambda response_stream: jogging_direction_generator(
motion_group_id, response_stream
))
api_client.close()


def stop_movement():
global shall_run
shall_run = False
65 changes: 65 additions & 0 deletions gamepad_example/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import wandelbots_api_client as wb
import requests
from decouple import config
from requests.auth import HTTPBasicAuth

CELL_ID = config("CELL_ID", default="cell", cast=str)


def get_api_client() -> wb.ApiClient:
"""Creates a new API client for the wandelbots API.
"""
# if there is basic auth required (e.g. when running this service locally)
basic_user = config("NOVA_USERNAME", default=None, cast=str)
basic_pwd = config("NOVA_PASSWORD", default=None, cast=str)

base_url = get_base_url(basic_user, basic_pwd)

client_config = wb.Configuration(host=base_url)
client_config.verify_ssl = False

if basic_user is not None and basic_pwd is not None:
client_config.username = basic_user
client_config.password = basic_pwd

return wb.ApiClient(client_config)


def get_base_url(basic_user, basic_pwd) -> str:
# in-cluster it is the api-gateway service
# when working with a remote instance one needs to provide the host via env variable
api_host = config("WANDELAPI_BASE_URL", default="api-gateway:8080", cast=str)
api_host = api_host.strip()
api_host = api_host.replace("http://", "")
api_host = api_host.replace("https://", "")
api_host = api_host.rstrip("/")
api_base_path = "/api/v1"
protocol = get_protocol(api_host, basic_user, basic_pwd)
if protocol is None:
msg = f"Could not determine protocol for host {api_host}. Make sure the host is reachable."
raise Exception(msg)
return f"{protocol}{api_host}{api_base_path}"


# get the protocol of the host (http or https)
def get_protocol(host, basic_user, basic_pwd) -> str:
api = f"/api/v1/cells/{CELL_ID}/controllers"
auth = None
if basic_user is not None and basic_pwd is not None:
auth = HTTPBasicAuth(basic_user, basic_pwd)

try:
response = requests.get(f"https://{host}{api}", auth=auth, timeout=5)
if response.status_code == 200:
return "https://"
except requests.RequestException:
pass

try:
response = requests.get(f"http://{host}{api}", auth=auth, timeout=5)
if response.status_code == 200:
return "http://"
except requests.RequestException:
pass

return None
Loading