Source code for boatdclient.boatd_client

from __future__ import print_function

try:
    from urllib.request import urlopen, Request
except ImportError:
    from urllib2 import urlopen, Request

from collections import namedtuple
from functools import wraps
import json

from .bearing import Bearing
from .point import Point


Wind = namedtuple('Wind', ['absolute', 'speed', 'apparent'])


class Boatd(object):
    def __init__(self, host='localhost', port=2222):
        '''
        Create a boat instance, connecting to boatd at `host` on port `port`
        '''
        self.host = host
        self.port = port

    def url(self, endpoint):
        '''Return a formatted url pointing at `endpoint` on the boatd server'''
        return 'http://{0}:{1}{2}'.format(self.host, self.port, endpoint)

    def get(self, endpoint):
        '''Return the result of a GET request to `endpoint` on boatd'''
        json_body = urlopen(self.url(endpoint)).read().decode('utf-8')
        return json.loads(json_body)

    def post(self, content, endpoint=''):
        '''
        Issue a POST request with `content` as the body to `endpoint` and
        return the result.
        '''
        url = self.url(endpoint)
        post_content = json.dumps(content).encode('utf-8')
        headers = {'Content-Type': 'application/json'}
        request = Request(url, post_content, headers)

        response = urlopen(request)

        return json.loads(response.read().decode('utf-8'))

    def quit(self):
        content = self.post({'quit': True}, '/')
        print(content)

    @property
    def version(self):
        '''Return the version of boatd'''
        content = self.get('/')
        return content.get('boatd').get('version')


[docs]class Boat(object): ''' A boat controlled by boatd :param auto_update: automatically update properties when they are requested. ''' def __init__(self, boatd=None, auto_update=True): if boatd is None: self.boatd = Boatd() else: self.boatd = boatd self.auto_update = auto_update self._cached_boat = {} def _auto_update(f): @wraps(f) def dec(self) : if self.auto_update: self.update() return f(self) return dec def update(self): self._cached_boat = self.boatd.get('/boat') @property @_auto_update def heading(self): ''' Return the current heading of the boat in degrees. :returns: current bearing :rtype: Bearing ''' content = self._cached_boat return Bearing(float(content.get('heading'))) @property @_auto_update def wind(self): ''' Return the direction of the wind in degrees. :returns: wind object containing direction bearing and speed :rtype: Wind ''' content = self._cached_boat.get('wind') return Wind( Bearing(content.get('absolute')), content.get('speed'), Bearing(content.get('apparent')) ) @property @_auto_update def position(self): ''' Return the current position of the boat. :returns: current position :rtype: Point ''' content = self._cached_boat lat, lon = content.get('position') return Point(lat, lon)
[docs] def set_rudder(self, angle): ''' Set the angle of the rudder to be `angle` degrees. :param angle: rudder angle :type angle: float between -90 and 90 ''' angle = float(angle) request = self.boatd.post({'value': float(angle)}, '/rudder') return request.get('result')
@property @_auto_update def target_rudder_angle(self): ''' Return the current target rudder angle in degrees. :returns: rudder angle :rtype: float ''' content = self._cached_boat return float(content.get('rudder_angle'))
[docs] def set_sail(self, angle): ''' Set the angle of the sail to `angle` degrees :param angle: sail angle :type angle: float between -90 and 90 ''' angle = float(angle) request = self.boatd.post({'value': float(angle)}, '/sail') return request.get('result')
@property @_auto_update def target_sail_angle(self): ''' Return the current target sail angle in degrees. :returns: sail angle :rtype: float ''' content = self._cached_boat return float(content.get('sail_angle'))
[docs]class Behaviour(object): def __init__(self, boatd=None): if boatd is None: self.boatd = Boatd() else: self.boatd = boatd def _get_behaviour_data(self): return self.boatd.get('/behaviours')
[docs] def list(self): '''Return a list of the available behaviours to run.''' return list(self._get_behaviour_data().get('behaviours').keys())
[docs] def start(self, name): ''' End the current behaviour and run a named behaviour. :param name: the name of the behaviour to run :type name: str ''' d = self.boatd.post({'active': name}, endpoint='/behaviours') current = d.get('active') if current is not None: return 'started {}'.format(current) else: return 'no behaviour running'
[docs] def stop(self): ''' Stop the current behaviour. ''' self.start(None)
def get_current_waypoints(boatd=None): ''' Get the current set of waypoints active from boatd. :returns: The current waypoints :rtype: List of Points ''' if boatd is None: boatd = Boatd() content = boatd.get('/waypoints') return [Point(*coords) for coords in content.get('waypoints')] def get_home_position(boatd=None): ''' Get the current home position from boatd. :returns: The configured home position :rtype: Points ''' if boatd is None: boatd = Boatd() content = boatd.get('/waypoints') home = content.get('home', None) if home is not None: lat, lon = home return Point(lat, lon) else: return None if __name__ == '__main__': boat = Boat() print(boat.version) print(boat.heading) print(boat.wind) print(boat.position) print(boat.rudder(0)) print(boat.rudder(10))