#
#
Kevin Harrington madhephaestus
https://github.com/madhephaestus
I am a roboticist who likes to engineer tools that make the process of designing
robots easier. I build free and open source robotics development software.
#
#
Maxim Kulkin
maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a
#
#
#
#
#
@madhephaestus
madhephaestus Moving ID read/write to the bus
Latest commit c2f8a66 on Jun 20
History
1 contributor
45 lines (41 sloc) 1.17 KB
include
include
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
servoBus.debug(true);
Serial.println(“Beginning Servo Example”);
}
void loop() {
int divisor =4;
for (int i = 0; i < 1000/divisor; i++) { long start = millis(); uint16_t angle = i * 24*divisor; int16_t pos = 0; pos = servo.pos_read(); Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
do {
servo.move_time(angle, 10*divisor);
} while (!servo.isCommandOk());
Serial.printf("Move to %d -> %s\n", angle,
servo.isCommandOk() ? "OK" : "\n\nERR!!\n\n");
Serial.println("Voltage = " + String(servo.vin()));
Serial.println("Temp = " + String(servo.temp()));
Serial.println("ID = " + String(servo.id_read()));
Serial.println("Motor Mode = " + String(servo.readIsMotorMode()));
long took = millis()-start;
long time = (10*divisor)-took;
if(time>0)
delay(time);
else{
Serial.println("Real Time broken, took: "+String(took));
}
}
servo.move_time(0, 3000);
delay(3000);
}
#
#
#
#
#
@madhephaestus
madhephaestus Tested working calibration protocol
Latest commit dd164c4 on May 19
History
1 contributor
126 lines (111 sloc) 3.49 KB
include
include
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
int16_t startingAngles []= {-9000,0,9000};
bool calibrationDone = false;
define HOME_SWITCH_PIN 32
int divisor = 3;
void setup() {
while(!Serial);
delay(1000);
Serial.begin(115200);
Serial.println(“Starting”);
servoBus.begin(&Serial1, 1, // on TX pin 1
2); // use pin 2 as the TX flag for buffer
servoBus.retry = 2; // enforce synchronous real time
//servoBus.debug(true);
Serial.println("Beginning Coordinated Servo Example");
pinMode(HOME_SWITCH_PIN, INPUT_PULLUP);
servo.disable();
servo2.disable();
servo3.disable();
}
// 40ms trajectory planning loop seems the most stable
void interp(int i){
long start = millis();
servoBus.move_sync_start();
int32_t pos = servo.pos_read();
int32_t pos2 = servo2.pos_read();
int32_t pos3 = servo3.pos_read();
int32_t angle = (i * 9 * divisor)-4500;
int timingOffset = millis()-start;
servo2.move_time_and_wait_for_sync(angle+4500, 10 * divisor+timingOffset);
servo3.move_time_and_wait_for_sync(-4500-(angle), 10 * divisor+timingOffset);
servo.move_time_and_wait_for_sync(angle*2, 10 * divisor+timingOffset);
//if(abs(pos2-pos)>100){
// Serial.printf(“\n\nPosition at %d, %d and %d-> %s\n”, pos, pos2,pos3,
// servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
// Serial.printf(“Move to %d -> %s\n”, angle,
// servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
//}
long took = millis() – start;
long time = (10 * divisor) - took;
if (time > 0)
delay(time);
else {
Serial.println("Real Time broken, took: " + String(took));
}
}
void loop() {
bool HOME = !digitalRead(HOME_SWITCH_PIN);
if(!calibrationDone){
Serial.println("Homing pin "+String(HOME));
if(HOME){
Serial.println("Calibrating 1");
servo.calibrate(startingAngles[0],-9000,9000);
Serial.println("Calibrating 2");
servo2.calibrate(startingAngles[1],-2512,10000);
Serial.println("Calibrating 3");
servo3.calibrate(startingAngles[2],-9000,9000);
Serial.println("Limits read");
}
Serial.println("Read 1");
int32_t pos = servo.pos_read();
Serial.println("Read 2");
int32_t pos2 = servo2.pos_read();
Serial.println("Read 3");
int32_t pos3 = servo3.pos_read();
Serial.printf("\n\nPosition at %d, %d and %d-> %s\n", pos, pos2, pos3,
servo.isCommandOk() ? "OK" : "\n\nERR!!\n\n");
if(pos==startingAngles[0]&&
pos2==startingAngles[1]&&
pos3==startingAngles[2]&&
HOME){
calibrationDone=true;
Serial.println("Calibration Done!");
servo.move_time(0, 2000);
delay(2000);
servo2.move_time_and_wait_for_sync(0, 2000);
servo3.move_time_and_wait_for_sync(0, 2000);
servo.move_time_and_wait_for_sync(-9000, 2000);
servoBus.move_sync_start();
delay(2000);
}
}
if(!calibrationDone){
delay(50);
return;
}
if(HOME){
int32_t pos = servo.pos_read();
int32_t pos2 = servo2.pos_read();
int32_t pos3 = servo3.pos_read();
Serial.printf("\n\nMAX at %d, %d and %d", servo.getMaxCentDegrees(), servo2.getMaxCentDegrees(), servo3.getMaxCentDegrees());
Serial.printf("\nPos at %d, %d and %d", pos, pos2, pos3);
Serial.printf("\nmin at %d, %d and %d", servo.getMinCentDegrees(), servo2.getMinCentDegrees(), servo3.getMinCentDegrees());
//
delay(50);
return;
}
for (int i = 0; i < 1000 / divisor; i++) {
interp( i);
}
Serial.println("Interpolated Set pos done, not long set");
for (int i = 1000 / divisor; i >0; i--) {
interp( i);
}
Serial.println("Loop resetting");
}
#
#
#
#
#
33 lines (28 sloc) 797 Bytes
include
include
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
}
void loop() {
for (int i = -10; i < 10; i++) { int16_t pos = 0; pos = servo.pos_read(); Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
int16_t angle = i * 100;
servo.motor_mode(angle);
Serial.printf("Move to %d -> %s\n", angle,
servo.isCommandOk() ? "OK" : "\n\nERR!!\n\n");
Serial.println("Voltage = " + String(servo.vin()));
Serial.println("Temp = " + String(servo.temp()));
Serial.println("ID = " + String(servo.id_read()));
delay(500);
}
servo.move_time(0, 500);
delay(5000);
}
#
#
#
#
79 lines (67 sloc) 2.14 KB
include
include
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
void setup() {
servoBus.begin(&Serial1, 1, // on TX pin 1
2); // use pin 2 as the TX flag for buffer
Serial.begin(115200);
servoBus.retry = 1; // enforce synchronous real time
servoBus.debug(true);
Serial.println(“Beginning Coordinated Servo Example”);
}
// 40ms trajectory planning loop seems the most stable
void loop() {
int divisor = 3;
for (int i = 0; i < 1000 / divisor; i++) {
long start = millis();
int16_t pos = 0;
pos = servo.pos_read();
int16_t pos2 = servo2.pos_read();
int16_t pos3 = servo3.pos_read();
uint16_t angle = i * 24 * divisor;
servo2.move_time_and_wait_for_sync(angle, 10 * divisor);
servo3.move_time_and_wait_for_sync(angle, 10 * divisor);
servo.move_time_and_wait_for_sync(angle, 10 * divisor);
servoBus.move_sync_start();
//if(abs(pos2-pos)>100){
Serial.printf("\n\nPosition at %d and %d-> %s\n", pos, pos2,
servo.isCommandOk() ? "OK" : "\n\nERR!!\n\n");
Serial.printf("Move to %d -> %s\n", angle,
servo.isCommandOk() ? "OK" : "\n\nERR!!\n\n");
//}
long took = millis() - start;
long time = (10 * divisor) - took;
if (time > 0)
delay(time);
else {
Serial.println("Real Time broken, took: " + String(took));
}
}
Serial.println("Interpolated Set pos done, not long set");
servoBus.retry = 5; // These commands must arrive
servo.move_time(0, 10000);
servo2.move_time(0, 10000);
servo3.move_time(0, 10000);
servoBus.retry = 0; // Back to low latency mode
for (int i = 0; i < 1000 / divisor; i++) {
long start = millis();
int16_t pos = 0;
pos = servo.pos_read();
int16_t pos2 = servo2.pos_read();
int16_t pos3 = servo3.pos_read();
Serial.printf("\n\nPosition at %d and %d\n", pos, pos2);
Serial.println("Voltage = " + String(servo.vin()));
Serial.println("Temp = " + String(servo.temp()));
long took = millis() - start;
long time = (10 * divisor) - took;
if (time > 0)
delay(time);
else {
Serial.println("Real Time broken, took: " + String(took));
}
}
Serial.println("Loop resetting");
}
#
#
#
#
#
21 lines (18 sloc) 505 Bytes
include
include
LX16ABus servoBus;
LX16AServo servo(&servoBus, LX16A_BROADCAST_ID);// send these commands to any motor on the bus
int id =3;
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
}
void loop() {
// Set any motor plugged in to ID 3
// this INO acts as an auto-provisioner for any motor plugged in
servo.id_write(id);
Serial.println(“Setting to ID “+String (id));
delay(200);
}
#
#
#
#
#
#
#
#
#
#
#
maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a
README.md
LewanSoul LX-16A servos driver & GUI
LewanSoul LX-16A is a wonderful servos although using them requires implementing custom serial protocol and you need to configure them before use (set unique IDs for all servos, configur limits, etc.). Company provides a tool for that, but it works only on Windows.
This project aims to fill the gap for non-Windows users. It consists of two parts: a servo library and a GUI tool. Servo library implements an interface to talk to servos using their protocol and GUI provides convenient way to configure them.
alt text
Installation
Just install both packages to use them:
( cd lewansoul-lx16a && python setup.py install )
( cd lewansoul-lx16a-terminal && python setup.py install )
To launch GUI use lewansoul-lx16a-terminal command.
Code example
import serial
import lewansoul_lx16a
SERIAL_PORT = ‘/dev/tty.wchusbserial1410’
controller = lewansoul_lx16a.ServoController(
serial.Serial(SERIAL_PORT, 115200, timeout=1),
)
control servos directly
controller.move(1, 100) # move servo ID=1 to position 100
or through proxy objects
servo1 = controller.servo(1)
servo2 = controller.servo(2)
servo1.move(100)
synchronous move of multiple servos
servo1.move_prepare(300)
servo2.move_prepare(600)
controller.move_start()
Example of controlling servos through Bus Servo Controller:
import serial
import time
from lewansoul_lx16a_controller import ServoController
logging.basicConfig(level=logging.DEBUG)
s = serial.Serial(‘/dev/tty.usbserial-00000000’, 9600, timeout=2)
c = ServoController(s, timeout=5)
print(c.get_battery_voltage())
servo1_id = 3
servo2_id = 5
print(c.get_positions([servo1_id, servo2_id]))
c.move({servo1_id: 1000, servo2_id: 500}, time=300)
time.sleep(0.3)
c.move({servo1_id: 800, servo2_id: 500}, time=2000)
time.sleep(2)
c.unload([servo1_id, servo2_id])
time.sleep(0.1) # important not to close serial connection immediately
#
#
#
#
#
lewansoul_lx16a_terminal.py – 700 lines
https://github.com/maximkulkin/lewansoul-lx16a/blob/master/lewansoul-lx16a-terminal/lewansoul_lx16a_terminal.py
!/usr/bin/env python3
“””
Terminal GUI for LewanSoul LX-16A servos
Author: Maxim Kulkin
Website: https://github.com/maximkulkin/lewansoul-lx16a
“””
import sys
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtWidgets import (QWidget, QApplication, QDialog, QMessageBox, QListWidgetItem)
from PyQt5.uic import loadUi as _loadUi
from collections import namedtuple
import pkg_resources
import logging
import serial
from serial.tools.list_ports import comports
import lewansoul_lx16a
def loadUi(path, widget):
real_path = pkg_resources.resource_filename(‘lewansoul_lx16a_terminal’, path)
_loadUi(real_path, widget)
ServoConfiguration = namedtuple(‘ServoConfiguration’, [
‘servo_id’, ‘position_limits’, ‘voltage_limits’, ‘max_temperature’, ‘position_offset’,
])
ServoState = namedtuple(‘ServoState’, [
‘servo_id’, ‘voltage’, ‘temperature’,
‘mode’, ‘position’, ‘speed’, ‘motor_on’, ‘led_on’, ‘led_errors’,
])
class ConfigureIdDialog(QDialog):
def init(self):
super(ConfigureIdDialog, self).init()
loadUi('resources/ConfigureId.ui', self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def servoId(self):
return self.servoIdEdit.value()
@servoId.setter
def servoId(self, value):
self.servoIdEdit.setValue(value)
class ConfigurePositionLimitsDialog(QDialog):
def init(self):
super(ConfigurePositionLimitsDialog, self).init()
loadUi('resources/ConfigurePositionLimits.ui', self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def minPosition(self):
return self.rangeMin.value()
@minPosition.setter
def minPosition(self, value):
self.rangeMin.setValue(value)
@property
def maxPosition(self):
return self.rangeMax.value()
@maxPosition.setter
def maxPosition(self, value):
self.rangeMax.setValue(value)
class ConfigureVoltageLimitsDialog(QDialog):
def init(self):
super(ConfigureVoltageLimitsDialog, self).init()
loadUi('resources/ConfigureVoltageLimits.ui', self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def minVoltage(self):
return self.rangeMin.value()
@minVoltage.setter
def minVoltage(self, value):
self.rangeMin.setValue(value)
@property
def maxVoltage(self):
return self.rangeMax.value()
@maxVoltage.setter
def maxVoltage(self, value):
self.rangeMax.setValue(value)
class ConfigureMaxTemperatureDialog(QDialog):
def init(self):
super(ConfigureMaxTemperatureDialog, self).init()
loadUi('resources/ConfigureMaxTemperature.ui', self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def maxTemperature(self):
return self.maxTemperatureEdit.value()
@maxTemperature.setter
def maxTemperature(self, value):
self.maxTemperatureEdit.setValue(value)
class ConfigurePositionOffsetDialog(QDialog):
def init(self, servo):
super(ConfigurePositionOffsetDialog, self).init()
self._servo = servo
loadUi('resources/ConfigurePositionOffset.ui', self)
self.positionOffsetSlider.valueChanged.connect(self._on_slider_change)
self.positionOffsetEdit.valueChanged.connect(self._on_edit_change)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
def _update_servo(self, deviation):
self._servo.set_position_offset(deviation)
def _on_slider_change(self, position):
self.positionOffsetEdit.setValue(position)
self._update_servo(position)
def _on_edit_change(self, position):
self.positionOffsetSlider.setValue(position)
self._update_servo(position)
@property
def positionOffset(self):
return self.positionOffsetEdit.value()
@positionOffset.setter
def positionOffset(self, value):
self.positionOffsetEdit.setValue(value)
class ServoScanThread(QThread):
servoFound = pyqtSignal(int)
def __init__(self, controller):
super(ServoScanThread, self).__init__()
self._controller = controller
def _servo_exists(self, id):
try:
servo_id = self._controller.get_servo_id(id, timeout=0.2)
return (servo_id == id)
except TimeoutError:
return False
def run(self):
for servoId in range(1, 253):
if self.isInterruptionRequested():
break
try:
if self._servo_exists(servoId):
self.servoFound.emit(servoId)
except lewansoul_lx16a.TimeoutError:
pass
self.yieldCurrentThread()
class GetServoConfigurationThread(QThread):
servoConfigurationUpdated = pyqtSignal(ServoConfiguration)
servoConfigurationTimeout = pyqtSignal()
MAX_RETRIES = 5
def __init__(self, servo):
super(GetServoConfigurationThread, self).__init__()
self._servo = servo
@property
def servo_id(self):
return self._servo.servo_id
def run(self):
try:
retries = 0
while True:
if self.isInterruptionRequested():
return
try:
position_limits = self._servo.get_position_limits()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
voltage_limits = self._servo.get_voltage_limits()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
max_temperature = self._servo.get_max_temperature_limit()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
position_offset = self._servo.get_position_offset()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
self.servoConfigurationUpdated.emit(ServoConfiguration(
servo_id=self.servo_id,
position_limits=position_limits,
voltage_limits=voltage_limits,
max_temperature=max_temperature,
position_offset=position_offset,
))
except lewansoul_lx16a.TimeoutError:
self.servoConfigurationTimeout.emit()
class ServoMonitorThread(QThread):
servoStateUpdated = pyqtSignal(ServoState)
def __init__(self, servo):
super(ServoMonitorThread, self).__init__()
self._servo = servo
def run(self):
while not self.isInterruptionRequested():
try:
voltage = self._servo.get_voltage()
temperature = self._servo.get_temperature()
mode = self._servo.get_mode()
if mode == 0:
position, speed = self._servo.get_position(), 0
else:
position, speed = 0, self._servo.get_motor_speed()
motor_on = self._servo.is_motor_on()
led_on = self._servo.is_led_on()
led_errors = self._servo.get_led_errors()
self.servoStateUpdated.emit(ServoState(
servo_id=self._servo.servo_id,
voltage=voltage,
temperature=temperature,
mode=mode,
position=position,
speed=speed,
motor_on=motor_on,
led_on=led_on,
led_errors=led_errors,
))
except lewansoul_lx16a.TimeoutError:
pass
self.sleep(1)
class Terminal(QWidget):
logger = logging.getLogger(‘lewansoul.terminal’)
def __init__(self):
super(Terminal, self).__init__()
self.connection = False
self.selected_servo_id = False
self.servo = None
self._servo_initialization = False
self._available_ports = []
loadUi('resources/ServoTerminal.ui', self)
self.configureIdButton.clicked.connect(self._configure_servo_id)
self.configurePositionLimitsButton.clicked.connect(self._configure_position_limits)
self.configureVoltageLimitsButton.clicked.connect(self._configure_voltage_limits)
self.configureMaxTemperatureButton.clicked.connect(self._configure_max_temperature)
self.configurePositionOffsetButton.clicked.connect(self._configure_position_offset)
self.portCombo.currentTextChanged.connect(self._on_port_change)
self.refreshPortsButton.clicked.connect(self._refresh_ports)
self.servoList.currentItemChanged.connect(lambda curItem, prevItem: self._on_servo_selected(curItem))
self.scanServosButton.clicked.connect(self._scan_servos)
self.servoOrMotorSwitch.valueChanged.connect(self._on_servo_motor_switch)
self.speedSlider.valueChanged.connect(self._on_speed_slider_change)
self.speedEdit.valueChanged.connect(self._on_speed_edit_change)
self.positionSlider.valueChanged.connect(self._on_position_slider_change)
self.positionEdit.valueChanged.connect(self._on_position_edit_change)
self.motorOnButton.clicked.connect(self._on_motor_on_button)
self.ledOnButton.clicked.connect(self._on_led_on_button)
self.clearLedErrorsButton.clicked.connect(self._on_clear_led_errors_button)
self._servoScanThread = None
self._servoReadConfigurationThread = None
self._servoStateMonitorThread = None
self.connectionGroup.setEnabled(False)
self._refresh_ports()
self._on_servo_selected(None)
self.show()
def _refresh_ports(self):
old_text = self.portCombo.currentText()
self._available_ports = sorted(comports())
self.portCombo.blockSignals(True)
self.portCombo.clear()
self.portCombo.addItem('')
self.portCombo.addItems([port.device for port in self._available_ports])
self.portCombo.setCurrentText(old_text)
self.portCombo.blockSignals(False)
if self.portCombo.currentText() != old_text:
self._on_port_change(self.portCombo.currentIndex())
def _on_port_change(self, portIdx):
self._connect_to_port(self.portCombo.currentText())
def _connect_to_port(self, device):
if self.connection:
self.connection.close()
self.logger.info('Disconnected')
self.connection = None
self.controller = None
self.connectionGroup.setEnabled(False)
self.servoList.clear()
self._on_servo_selected(None)
if device:
self.logger.info('Connecting to port %s' % device)
try:
self.connection = serial.Serial(device, 115200, timeout=1)
self.controller = lewansoul_lx16a.ServoController(self.connection, timeout=1)
self.connectionGroup.setEnabled(True)
self.logger.info('Connected to {}'.format(device))
except serial.serialutil.SerialException as e:
self.logger.error('Failed to connect to port {}'.format(device))
QMessageBox.critical(self, "Connection error", "Failed to connect to device")
def _scan_servos(self):
if not self.controller:
return
def scanStarted():
self.servoList.clear()
self.scanServosButton.setText('Stop Scan')
self.scanServosButton.setEnabled(True)
def scanFinished():
self.scanServosButton.setText('Scan Servos')
self.scanServosButton.setEnabled(True)
self._servoScanThread = None
def servoFound(servoId):
item = QListWidgetItem('Servo ID=%s' % servoId)
item.setData(Qt.UserRole, servoId)
self.servoList.addItem(item)
if not self._servoScanThread:
self._servoScanThread = ServoScanThread(self.controller)
self._servoScanThread.servoFound.connect(servoFound)
self._servoScanThread.started.connect(scanStarted)
self._servoScanThread.finished.connect(scanFinished)
self.scanServosButton.setEnabled(False)
if self._servoScanThread.isRunning():
self._servoScanThread.requestInterruption()
else:
self._servoScanThread.start()
def _on_servo_selected(self, item):
servo_id = None
if item is not None:
servo_id = item.data(Qt.UserRole)
if servo_id:
self.servo = self.controller.servo(servo_id)
self.servoGroup.setEnabled(True)
def servo_configuration_updated(details):
self.servoIdLabel.setText(str(details.servo_id))
self.positionLimits.setText('%d .. %d' % details.position_limits)
self.voltageLimits.setText('%d .. %d' % details.voltage_limits)
self.maxTemperature.setText(str(details.max_temperature))
self.positionOffset.setText(str(details.position_offset))
self._servo_initialization = True
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread = ServoMonitorThread(self.servo)
self._servoStateMonitorThread.servoStateUpdated.connect(self._update_servo_state)
self._servoStateMonitorThread.start()
def servo_configuration_timeout():
self._on_servo_selected(None)
QMessageBox.warning(self, "Timeout", "Timeout reading servo data")
if self._servoReadConfigurationThread and self._servoReadConfigurationThread.servo_id != servo_id:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
if self._servoReadConfigurationThread is None:
self._servoReadConfigurationThread = GetServoConfigurationThread(self.servo)
self._servoReadConfigurationThread.servoConfigurationUpdated.connect(servo_configuration_updated)
self._servoReadConfigurationThread.servoConfigurationTimeout.connect(servo_configuration_timeout)
self._servoReadConfigurationThread.start()
else:
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread.wait()
self._servoStateMonitorThread = None
if self._servoReadConfigurationThread:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
self.servo = None
self.servoIdLabel.setText('')
self.positionLimits.setText('')
self.voltageLimits.setText('')
self.maxTemperature.setText('')
self.currentVoltage.setText('Voltage:')
self.currentTemperature.setText('Temperature:')
self.currentPosition.setText('')
self.servoGroup.setEnabled(False)
def _configure_servo_id(self):
if not self.servo:
return
dialog = ConfigureIdDialog()
dialog.servoId = self.servo.get_servo_id()
if dialog.exec_():
self.logger.info('Setting servo ID to %d' % dialog.servoId)
self.servo.set_servo_id(dialog.servoId)
self.servo = self.controller.servo(dialog.servoId)
self.servoIdLabel.setText(str(dialog.servoId))
item = self.servoList.currentItem()
if item is not None:
item.setText('Servo ID=%d' % dialog.servoId)
item.setData(Qt.UserRole, dialog.servoId)
def _configure_position_limits(self):
if not self.servo:
return
dialog = ConfigurePositionLimitsDialog()
dialog.minPosition, dialog.maxPosition = self.servo.get_position_limits()
if dialog.exec_():
self.logger.info('Setting position limits to %d..%d' % (dialog.minPosition, dialog.maxPosition))
self.servo.set_position_limits(dialog.minPosition, dialog.maxPosition)
self.positionLimits.setText('%d .. %d' % (dialog.minPosition, dialog.maxPosition))
def _configure_voltage_limits(self):
if not self.servo:
return
dialog = ConfigureVoltageLimitsDialog()
dialog.minVoltage, dialog.maxVoltage = self.servo.get_voltage_limits()
if dialog.exec_():
self.logger.info('Setting voltage limits to %d..%d' % (dialog.minVoltage, dialog.maxVoltage))
self.servo.set_voltage_limits(dialog.minVoltage, dialog.maxVoltage)
self.voltageLimits.setText('%d .. %d' % (dialog.minVoltage, dialog.maxVoltage))
def _configure_max_temperature(self):
if not self.servo:
return
dialog = ConfigureMaxTemperatureDialog()
dialog.maxTemperature = self.servo.get_max_temperature_limit()
if dialog.exec_():
self.logger.info('Setting max temperature limit to %d' % (dialog.maxTemperature))
self.servo.set_max_temperature_limit(dialog.maxTemperature)
self.maxTemperature.setText(str(dialog.maxTemperature))
def _configure_position_offset(self):
if not self.servo:
return
dialog = ConfigurePositionOffsetDialog(self.servo)
old_position_offset = self.servo.get_position_offset()
dialog.positionOffset = old_position_offset
if dialog.exec_():
self.logger.info('Setting position offset limit to %d' % (dialog.positionOffset))
self.servo.set_position_offset(dialog.positionOffset)
self.servo.save_position_offset()
self.positionOffset.setText(str(dialog.positionOffset))
else:
self.servo.set_position_offset(old_position_offset)
def _on_servo_motor_switch(self, value):
try:
if value == 0:
self.servoOrMotorModeUi.setCurrentIndex(0)
self.servo.set_servo_mode()
else:
self.servoOrMotorModeUi.setCurrentIndex(1)
self.servo.set_motor_mode()
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout changing motor mode")
def _on_speed_slider_change(self, speed):
try:
self.speedEdit.setValue(speed)
self.logger.info('Setting motor speed to %d' % speed)
self.servo.set_motor_mode(speed)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout updating motor speed")
def _on_speed_edit_change(self, speed):
try:
self.speedSlider.setValue(speed)
self.logger.info('Setting motor speed to %d' % speed)
self.servo.set_motor_mode(speed)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout updating motor speed")
def _on_position_slider_change(self, position):
try:
self.positionEdit.setValue(position)
self.logger.info('Setting servo position to %d' % position)
self.servo.move(position)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout setting servo position")
def _on_position_edit_change(self, position):
try:
self.positionSlider.setValue(position)
self.logger.info('Setting servo position to %d' % position)
self.servo.move(position)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout setting servo position")
def _on_motor_on_button(self):
if not self.servo:
return
try:
if self.servo.is_motor_on():
if self.servo.get_mode() == 0:
self.servo.motor_off()
else:
self.servo.set_motor_mode(0)
else:
if self.servo.get_mode() == 0:
self.servo.motor_on()
else:
self.servo.set_motor_mode(self.speedSlider.value())
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout updating motor state")
def _on_led_on_button(self):
if not self.servo:
return
try:
if self.servo.is_led_on():
self.servo.led_off()
self.ledOnButton.setChecked(False)
else:
self.servo.led_on()
self.ledOnButton.setChecked(True)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout updating LED state")
def _on_clear_led_errors_button(self):
if not self.servo:
return
try:
self.servo.set_led_errors(0)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, "Timeout", "Timeout resetting LED errors")
def _update_servo_state(self, servo_state):
self.currentVoltage.setText('Voltage: %d' % servo_state.voltage)
self.currentTemperature.setText('Temperature: %d' % servo_state.temperature)
self.motorOnButton.setChecked(servo_state.motor_on)
self.ledOnButton.setChecked(servo_state.led_on)
if servo_state.led_errors:
messages = []
if lewansoul_lx16a.SERVO_ERROR_OVER_TEMPERATURE | servo_state.led_errors:
messages.append('Overheating')
if lewansoul_lx16a.SERVO_ERROR_OVER_VOLTAGE | servo_state.led_errors:
messages.append('Voltage is out of limits')
if lewansoul_lx16a.SERVO_ERROR_LOCKED_ROTOR | servo_state.led_errors:
messages.append('Locked rotor')
self.ledErrors.setText('\n'.join(messages))
self.clearLedErrorsButton.setEnabled(True)
else:
self.ledErrors.setText('')
self.clearLedErrorsButton.setEnabled(False)
if self.servoOrMotorModeUi.currentIndex() == 0:
self.currentPosition.setText(str(servo_state.position))
if self._servo_initialization:
self.positionSlider.setValue(servo_state.position)
else:
self.currentSpeed.setText(str(servo_state.speed))
if self._servo_initialization:
self.speedSlider.setValue(servo_state.speed)
self._servo_initialization = False
def closeEvent(self, event):
if self._servoScanThread:
self._servoScanThread.requestInterruption()
self._servoScanThread.wait()
self._servoScanThread = None
if self._servoReadConfigurationThread:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread.wait()
self._servoStateMonitorThread = None
if self.connection:
self.connection.close()
event.accept()
def main():
logging.basicConfig(level=logging.DEBUG)
app = QApplication(sys.argv)
terminal = Terminal()
sys.exit(app.exec_())
if name == ‘main‘:
main()
#
#
#
#
#
.END
Categories: Uncategorized