#!/usr/bin/env python3 # vim:set ts=4 sw=4 sts=4 ai et smarttab: # greenctld, a hamlib-compatible driver for the Green Heron Engineering RT-21 # Digital Rotor Controller. The Green Heron serial protocol is documented at # https://www.greenheronengineering.com/prod_documents/controllers/docs/RT-21_Manual_current.pdf # # The TCP network protocol is compatible with the hamlib rotctld protocol, which # gpredict speaks. # # Ported to Python 3. Simplified BSD License. # Copyright (C) 2022 Jeff Moe # # Copyright (c) 2017, Astro Digital, Inc. Released under the terms of the # Simplified BSD License; see the LICENSE file for details. # # -- Michael Toren # Astro Digital, Inc. import socket import serial import argparse import traceback import time import select import sys import re import os class DummyRotor(object): ''' A fake Rotor class, useful for debugging the TCPServer class on any machine, even if the rotor is not physically connected to it. ''' az = 0 el = 0 def set_pos(self, az, el): print('==> %d,%d' % (az, el)) self.az = az self.el = el def get_pos(self): print('<== %d,%d' % (self.az, self.el)) return (self.az, self.el,) def stop(self): print("==> Stop") class GreenHeronRotor(object): ''' Driver for the Green Heron Engineering RT-21 Digital Rotor Controller ''' az_serial = None el_serial = None def __init__(self, az_device, el_device, baud, timeout): self.az_serial = serial.Serial(az_device, baudrate=baud, timeout=timeout) self.el_serial = serial.Serial(el_device, baudrate=baud, timeout=timeout) print('--- Serial timeout set to', timeout) def stop(self): print("==> Stop") self.az_serial.write(';'.encode('utf-8')) self.el_serial.write(';'.encode('utf-8')) def set_pos(self, az, el): print('==> %d,%d' % (az, el)) self.az_serial.write('AP1%03d\r;'.encode('utf-8') % az) self.el_serial.write('AP1%03d\r;'.encode('utf-8') % el) time.sleep(0.1) def __parse_response(self, buf): match = re.match(r'^([0-9][0-9][0-9]);$', buf.decode()) if not match: return -1 ret = match.groups()[0] ret = int(ret) return ret def get_pos(self): ''' On success, returns a tuple of (az, el) On failure, returns False ''' self.az_serial.flushInput() self.el_serial.flushInput() self.az_serial.write(b'AI1;') self.el_serial.write(b'AI1;') az_buf = self.az_serial.read(4) el_buf = self.el_serial.read(4) if len(az_buf) != 4 or len(el_buf) != 4: print('!!! Serial read failure, received %s and %s' % (repr(az_buf), repr(el_buf))) return False az = self.__parse_response(az_buf) el = self.__parse_response(el_buf) if az < 0 or el < 0: print('!!! Failed to parse response, received %s and %s' % (repr(az_buf), repr(el_buf))) return False print('<== %d,%d' % (az, el)) return (az, el,) class TCPServer(object): ''' Implements a subset of the rotctld TCP protocol. gpredict only sends three commands, all of which are supported: - "p" to request the current position - "P " to set the desired position - "q" to quit This driver also supports: - "S" to stop any current movement ''' # A mapping of client fd's -> receive buffers client_buf = {} def __init__(self, port, rotor, ip=''): self.rotor = rotor self.listener = socket.socket() self.listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.listener.bind((ip, port)) self.listener.listen(4) addr = self.listener.getsockname() print('--- Listening for connections on %s:%d' % (addr[0], addr[1])) def close_client(self, fd): self.rotor.stop() try: fd.close() del self.client_buf[fd] except: pass def parse_client_command(self, fd, cmd): cmd = cmd.strip() if cmd == '': return # "\\dump_state" is sent by satnogs hamlib, need to ignore it and send okay state if cmd == '\\dump_state': fd.send(b'RPRT 0\n') return print('<-- %s' % repr(cmd)) # "q", to quit if cmd == 'q': self.close_client(fd) return # "S", to stop the current rotation if cmd == 'S': self.rotor.stop() print('--> RPRT 0') fd.send(b'RPRT 0\n') return # "p", to get current position if cmd == 'p': pos = self.rotor.get_pos() if not pos: print('--> RPRT -6') fd.send(b'RPRT -6\n') else: az, el = pos print('--> %d,%d' % (az, el)) fd.send(b'%.6f\n%.6f\n' % (az, el)) return # "P " to set desired position match = re.match(r'^P\s+([\d.]+)\s+([\d.]+)$', cmd) if match: az = match.groups()[0] el = match.groups()[1] try: az = int(float(az)) el = int(float(el)) except: print('--> RPRT -8 (could not parse)') fd.send(b'RPRT -8\n') return if az == 360: az = 359 if az > 359: print('--> RPRT -1 (az too large)') fd.send(b'RPRT -1\n') return if el > 90: print('--> RPRT -1 (el too large)') fd.send(b'RPRT -1\n') return self.rotor.set_pos(az, el) print('--> RPRT 0') fd.send(b'RPRT 0\n') return # Nothing else is supported print('--> RPRT -4 (unknown command)') fd.send(b'RPRT -4\n') def read_client(self, fd): buf = fd.recv(1024) if len(buf) == 0: print('<-- EOF') self.close_client(fd) return self.client_buf[fd] += buf.decode() while True: cmd, sep, tail = self.client_buf[fd].partition('\n') # Check if a full line of input is present if not sep: return else: self.client_buf[fd] = tail self.parse_client_command(fd, cmd) # Check if the client sent a "q", to quit if fd not in self.client_buf: return def __run_once(self): rlist = [ self.listener ] + list(self.client_buf.keys()) wlist = [] xlist = [] rlist, wlist, xlist = select.select(rlist, wlist, xlist) for fd in rlist: if fd == self.listener: new_fd, addr = self.listener.accept() new_fd.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024*16) new_fd.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024*16) new_fd.setblocking(False) self.client_buf[new_fd] = '' print('<-- Connect %s:%d' % (addr[0], addr[1])) else: try: self.read_client(fd) except Exception as e: print('Unhandled exception, killing client and issuing motor stop command:') traceback.print_exc() self.close_client(fd) print() def loop(self): while True: self.__run_once() if __name__ == '__main__': sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1) parser = argparse.ArgumentParser() parser.add_argument('--az-device', '-a', type=str, required=True, help='Serial device for azimuth') parser.add_argument('--el-device', '-e', type=str, required=True, help='Serial device for elevation') parser.add_argument('--speed', '-s', type=int, default=4800, help='Serial device speed') parser.add_argument('--timeout', '-t', type=float, default=1.5, help='Serial timeout') parser.add_argument('--port', '-p', type=int, default=4533, help='TCP port') parser.add_argument('--get-pos', '-g', action='store_true', help='Issue get position, and exit; for serial comms test') parser.add_argument('--dummy', action='store_true', help='Use a dummy rotor, not the real serial device') args = parser.parse_args() if args.dummy: rotor = DummyRotor() else: rotor = GreenHeronRotor(args.az_device, args.el_device, args.speed, args.timeout) if args.get_pos: print(rotor.get_pos()) sys.exit(0) server = TCPServer(args.port, rotor) server.loop()