LewanSoul Arduino C++/ Rpi python Libraries v0.1
by tl fong01
8:05PM MON. 9/21/2020
LewanSoul Arduino C++/ Rpi python Libraries v0.1
LX-16A Arduino Libraries 2020sep16
https://www.arduinolibraries.info/libraries/lx16a-servo
Contents
1. Arduino Library 1/2
2. Kevin Harrington madhephaestus Arduino Library 2/2
https://github.com/madhephaestus
3. Maxim Kulkin maximkulkin Python Library – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a
Arduino Library 1/2
lx-16a.cpp
lx-16a.h
lx-16a.cpp
#pragma once
#include <Arduino.h>
#define LX16A_BROADCAST_ID 0xFE
#define LX16A_SERVO_MOVE_TIME_WRITE 1
#define LX16A_SERVO_MOVE_TIME_READ 2
#define LX16A_SERVO_MOVE_TIME_WAIT_WRITE 7
#define LX16A_SERVO_MOVE_TIME_WAIT_READ 8
#define LX16A_SERVO_MOVE_START 11
#define LX16A_SERVO_MOVE_STOP 12
#define LX16A_SERVO_ID_WRITE 13
#define LX16A_SERVO_ID_READ 14
#define LX16A_SERVO_ANGLE_OFFSET_ADJUST 17
#define LX16A_SERVO_ANGLE_OFFSET_WRITE 18
#define LX16A_SERVO_ANGLE_OFFSET_READ 19
#define LX16A_SERVO_ANGLE_LIMIT_WRITE 20
#define LX16A_SERVO_ANGLE_LIMIT_READ 21
#define LX16A_SERVO_VIN_LIMIT_WRITE 22
#define LX16A_SERVO_VIN_LIMIT_READ 23
#define LX16A_SERVO_TEMP_MAX_LIMIT_WRITE 24
#define LX16A_SERVO_TEMP_MAX_LIMIT_READ 25
#define LX16A_SERVO_TEMP_READ 26
#define LX16A_SERVO_VIN_READ 27
#define LX16A_SERVO_POS_READ 28
#define LX16A_SERVO_OR_MOTOR_MODE_WRITE 29
#define LX16A_SERVO_OR_MOTOR_MODE_READ 30
#define LX16A_SERVO_LOAD_OR_UNLOAD_WRITE 31
#define LX16A_SERVO_LOAD_OR_UNLOAD_READ 32
#define LX16A_SERVO_LED_CTRL_WRITE 33
#define LX16A_SERVO_LED_CTRL_READ 34
#define LX16A_SERVO_LED_ERROR_WRITE 35
#define LX16A_SERVO_LED_ERROR_READ 36
class LX16ABus {
private:
bool _debug;
int myTXFlagGPIO = -1;
int myTXPin=-1;
int lastCommand=0;
void setTXFlag(int flag) {
myTXFlagGPIO = flag;
if (myTXFlagGPIO >= 0) {
pinMode(myTXFlagGPIO, OUTPUT);
digitalWrite(myTXFlagGPIO, 0);
}
}
public:
bool _deepDebug = false;
LX16ABus() :
_debug(false) {
}
// debug enables/disables debug printf’s for this servo
void debug(bool on) {
_debug = on;
}
void begin(HardwareSerial * port, int tXpin, int TXFlagGPIO = -1) {
_port = port;
myTXPin=tXpin;
#if defined ARDUINO_ARCH_ESP32
port->begin(_baud, SERIAL_8N1,myTXPin,myTXPin);
pinMode(myTXPin, OUTPUT|PULLUP|OPEN_DRAIN);
#elif defined(CORE_TEENSY)
pinMode(myTXPin, OUTPUT_OPENDRAIN);
port->begin(_baud, SERIAL_8N1);
port->setTX(myTXPin, true);
#else
pinMode(myTXPin, OUTPUT);
port->begin(_baud, SERIAL_8N1);
#endif
delay(3);
while (port->available())
port->read();
setTXFlag(TXFlagGPIO);
}
// time returns the number of ms to TX/RX n characters
uint32_t time(uint8_t n) {
return n * 10 * 1000 / _baud; // 10 bits per char
}
uint32_t timeus(uint8_t n) {
return n * 10 * 1000000 / _baud; // 10 bits per char
}
// methods passed through to Serial
bool available() {
return _port->available();
}
int read() {
return _port->read();
}
void write(const uint8_t *buf, int buflen) {
#if defined ARDUINO_ARCH_ESP32
pinMode(myTXPin, OUTPUT);
#elif defined(CORE_TEENSY)
_port->setTX(myTXPin, false);
#endif
// _port->write(buf, buflen);
// _port->flush();
delayMicroseconds(10);
for(int i=0;i<buflen;i++){
_port->write(buf[i]);
_port->flush();
}
#if defined ARDUINO_ARCH_ESP32
pinMode(myTXPin, OUTPUT|PULLUP|OPEN_DRAIN);
#elif defined(CORE_TEENSY)
_port->setTX(myTXPin, true);
#endif
}
int retry = 3;
void setRetryCount(int count) {
retry = count;
}
// write a command with the provided parameters
// returns true if the command was written without conflict onto the bus
bool write_no_retry(uint8_t cmd, const uint8_t *params, int param_cnt,
uint8_t MYID);
// read sends a command to the servo and reads back the response into the params buffer.
// returns true if everything checks out correctly.
bool read_no_retry(uint8_t cmd, uint8_t *params, int param_len,
uint8_t MYID);
// read sends a command to the servo and reads back the response into the params buffer.
// returns true if everything checks out correctly.
bool rcv(uint8_t cmd, uint8_t *params, int param_len,
uint8_t MYID);
// write a command with the provided parameters
// returns true if the command was written without conflict onto the bus
bool write(uint8_t cmd, const uint8_t *params, int param_cnt,
uint8_t MYID) {
if (retry == 0) {
return write_no_retry(cmd, params, param_cnt, MYID);
} else {
for (int i = 0; i < retry; i++) {
bool ok = write_no_retry(cmd, params, param_cnt, MYID);
if (ok)
return true;
}
}
return false;
}
// read sends a command to the servo and reads back the response into the params buffer.
// returns true if everything checks out correctly.
bool read(uint8_t cmd, uint8_t *params, int param_len, uint8_t MYID) {
if (retry == 0) {
return read_no_retry(cmd, params, param_len, MYID);
} else {
for (int i = 0; i < retry; i++) {
bool ok = read_no_retry(cmd, params, param_len, MYID);
if (ok)
return true;
}
}
return false;
}
HardwareSerial * _port = NULL;
int _baud = 115200;
/**
* Command name: SERVO_LOAD_OR_UNLOAD_WRITE
Command value: 31 Length: 4
Parameter 1: Whether the internal motor of the servo is unloaded power-down
or not, the range 0 or 1, 0 represents the unloading power down, and the servo
has no torque output. 1 represents the loaded motor, then the servo has a
torque output, the default value is 0.
@return sucess
*/
bool disableAll() {
uint8_t params[] = { 0 };
return write(LX16A_SERVO_ID_WRITE, params, 1,
LX16A_BROADCAST_ID);
}
/**
* Command name: SERVO_LOAD_OR_UNLOAD_WRITE
Command value: 31 Length: 4
Parameter 1: Whether the internal motor of the servo is unloaded power-down
or not, the range 0 or 1, 0 represents the unloading power down, and the servo
has no torque output. 1 represents the loaded motor, then the servo has a
torque output, the default value is 0.
@return sucess
*/
bool enableAll() {
uint8_t params[] = { 1 };
return write(LX16A_SERVO_ID_WRITE, params, 1,
LX16A_BROADCAST_ID);
}
/**
* Command name: SERVO_MOVE_START Command value: 11
Length: 3
With the use of command SERVO_MOVE_TIME_WAIT_WRITE, described in
point 3
@return sucess
*/
bool move_sync_start() {
uint8_t params[1];
return write(LX16A_SERVO_MOVE_START, params, 1,
LX16A_BROADCAST_ID);
}
/**
* Command name: SERVO_MOVE_STOP Command value: 12
Length: 3
When the command arrives at the servo, it will stop running
This is sent to all servos at once
*/
void stopAll() {
uint8_t params[1];
write(LX16A_SERVO_MOVE_STOP, params, 1,
LX16A_BROADCAST_ID);
}
// id_read returns the ID of the servo, useful if the id is 0xfe, which is broadcast…
uint8_t id_read() {
uint8_t params[1];
if (!read(LX16A_SERVO_ID_READ, params, 1, LX16A_BROADCAST_ID)) {
return 0;
}
return params[0];
}
// id_write sets the id of the servo, updates the object’s id if write appears successful
void id_write(uint8_t id) {
uint8_t params[] = { id };
write(LX16A_SERVO_ID_WRITE, params, 1, LX16A_BROADCAST_ID);
}
};
class LX16AServo {
private:
bool commandOK = true;
int32_t lastKnownGoodPosition = 0;
bool isMotorMode = false;
bool isInitialized = false;
//private:
LX16ABus * _bus;
uint8_t _id = LX16A_BROADCAST_ID;
int32_t staticOffset =0;
int32_t maxCentDegrees =240000;
int32_t minCentDegrees =0;
public:
LX16AServo(LX16ABus * bus, int id) :
_bus(bus), _id(id) {
}
/**
* Set the current position as a specific angle.
*
* This function will first read the current position, then
* calculate an offset to be applied to all position reads and writes.
* The offset will make convert the current position to the value passed in
*
* This is a function to be called when the motor hits a limit switch
* and load in a specific value when the limit is reached
*
*/
void calibrate(int32_t currentAngleCentDegrees,int32_t min_angle_cent_deg, int32_t max_angle_cent_deg){
int32_t current;
initialize();
do{
current=pos_read()-staticOffset;
}while(!isCommandOk());// this is a calibration and can not be allowed to fail
staticOffset=currentAngleCentDegrees-current;
readLimits();
int32_t min_angle = (min_angle_cent_deg-staticOffset) / 24;
int32_t max_angle = (max_angle_cent_deg-staticOffset) / 24;
if(min_angle<0){
//staticOffset +=-min_angle_cent_deg;
int theoretivalMinError = currentAngleCentDegrees-min_angle_cent_deg;
Serial.println(“ERROR! Minimum of servo ID”+String(_id)+” can not be below hardware limit”);
Serial.println(“Given the offset”+String(currentAngleCentDegrees)+”\n minimum in centDegrees “+String(minCentDegrees)+” of “+String(min_angle_cent_deg));
Serial.println(” maximum in centDegrees “+String(maxCentDegrees)+” of “+String(max_angle_cent_deg));
Serial.println(” error in centDegrees “+String(theoretivalMinError));
Serial.println(” current in centDegrees “+String(currentAngleCentDegrees));
minCentDegrees= (min_angle*24)+staticOffset;
maxCentDegrees= ((max_angle)*24)+staticOffset;
do {
uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle
>> 8),
(uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params,
4, _id);
} while (!isCommandOk());// this is a calibration and can not be allowed to fail
move_time(theoretivalMinError+2,0);
//while(1);
}
if(max_angle>1000){
int theoretivalMinError = (max_angle_cent_deg-(24000+staticOffset));
Serial.println(“ERROR! Maximum of servo ID”+String(_id)+” can not be above hardware limit”);
Serial.println(“Given the offset “+String(currentAngleCentDegrees)+”\n minimum in centDegrees “+String(minCentDegrees)+” of “+String(min_angle_cent_deg));
Serial.println(” maximum in centDegrees “+String(maxCentDegrees)+” of “+String(max_angle_cent_deg));
Serial.println(” error in centDegrees “+String(theoretivalMinError));
Serial.println(” current in centDegrees “+String(currentAngleCentDegrees));
Serial.println(” range in centDegrees “+String(maxCentDegrees-minCentDegrees));
max_angle=1000;
min_angle=0;
minCentDegrees= (min_angle*24)+staticOffset;
maxCentDegrees= ((max_angle)*24)+staticOffset;
do {
uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle
>> 8),
(uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params,
4, _id);
} while (!isCommandOk());// this is a calibration and can not be allowed to fail
move_time(currentAngleCentDegrees-theoretivalMinError-2,0);
//while(1);
}
if(min_angle<max_angle){
do{
uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle >> 8),
(uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params, 4, _id);
}while(!isCommandOk());// this is a calibration and can not be allowed to fail
}else{
Serial.println(“ERROR! Max of servo “+String(_id)+” must be larger than min”);
//while(1);
}
minCentDegrees= (min_angle*24)+staticOffset;
maxCentDegrees= ((max_angle)*24)+staticOffset;
}
int32_t getMinCentDegrees(){
return minCentDegrees;
}
int32_t getMaxCentDegrees(){
return maxCentDegrees;
}
bool isCommandOk() {
return commandOK;
}
void initialize() {
if (isInitialized) {
return;
}
isInitialized = true;
motor_mode(0);
pos_read();
readLimits();
}
void readLimits(){
uint8_t params[4];
do{
if (!_bus->read(LX16A_SERVO_ANGLE_LIMIT_READ, params, 4, _id)) {
commandOK = false;
}
commandOK = true;
minCentDegrees= ((params[0] | ((uint16_t) params[1] << 8))*24)+staticOffset;
maxCentDegrees= ((params[2] | ((uint16_t) params[3] << 8))*24)+staticOffset;
}while(!isCommandOk());// this is a calibration and can not be allowed to fail
}
/**
* Length: 7
Parameter 1: lower 8 bits of angle value
Parameter 2: higher 8 bits of angle value.range 0~100. corresponding to the
servo angle of 0 ~ 240 °, that means the minimum angle of the servo can be
varied is 0.24 degree.
Parameter 3: lower 8 bits of time value
Parameter 4: higher 8 bits of time value. the range of time is 0~30000ms.
When the command is sent to servo, the servo will be rotated from current
angle to parameter angle at uniform speed within param
*/
void move_time(int32_t angle, uint16_t time) {
initialize();
if(angle> maxCentDegrees){
angle=maxCentDegrees;
Serial.println(“ERROR Capped set at max “+String(maxCentDegrees));
}
if(angle<minCentDegrees){
angle=minCentDegrees;
Serial.println(“ERROR Capped set at min “+String(minCentDegrees));
}
if (isMotorMode)
motor_mode(0);
angle = (angle-staticOffset) / 24;
uint8_t params[] = { (uint8_t) angle, (uint8_t) (angle >> 8),
(uint8_t) time, (uint8_t) (time >> 8) };
commandOK = _bus->write(LX16A_SERVO_MOVE_TIME_WRITE, params, 4, _id);
}
/**
* Command name: SERVO_MOVE_TIME_WAIT_WRITE
Command value: 7
Length : 7
Parameter1: lower 8 bits of preset angle
Parameter2: higher 8 bits of preset angle. range 0~100. corresponding to the
servo angle of 0 ~ 240 °. that means the minimum angle of the servo can be
varied is 0.24 degree.
Parameter3: lower 8 bits of preset time
Parameter3: higher 8 bits of preset time. the range of time is 0~30000ms.
The function of this command is similar to this
“SERVO_MOVE_TIME_WRITE” command in the first point. But the difference
is that the servo will not immediately turn when the command arrives at the
servo,the servo will be rotated from current angle to parameter angle at unifor
m speed within parameter time until the command name SERVO_MOVE_ST
ART sent to servo(command value of 11)
, then the servo will be rotate
*/
void move_time_and_wait_for_sync(int32_t angle, uint16_t time) {
initialize();
if(angle> maxCentDegrees){
angle=maxCentDegrees;
Serial.println(“ERROR Capped set at max “+String(maxCentDegrees));
}
if(angle<minCentDegrees){
angle=minCentDegrees;
Serial.println(“ERROR Capped set at min “+String(minCentDegrees));
}
if (isMotorMode)
motor_mode(0);
angle = (angle-staticOffset) / 24;
uint8_t params[] = { (uint8_t) angle, (uint8_t) (angle >> 8),
(uint8_t) time, (uint8_t) (time >> 8) };
commandOK = _bus->write(LX16A_SERVO_MOVE_TIME_WAIT_WRITE, params, 4,
_id);
// // this write command has a packet coming back
// commandOK = _bus->rcv(LX16A_SERVO_MOVE_TIME_WAIT_WRITE, params, 4,
// _id);
}
/**
* Command name: SERVO_MOVE_STOP Command value: 12
Length: 3
When the command arrives at the servo, it will stop running
*/
void stop() {
uint8_t params[1];
commandOK = _bus->write(LX16A_SERVO_MOVE_STOP, params, 1, _id);
}
/**
* Command name: SERVO_LOAD_OR_UNLOAD_WRITE
Command value: 31 Length: 4
Parameter 1: Whether the internal motor of the servo is unloaded power-down
or not, the range 0 or 1, 0 represents the unloading power down, and the servo
has no torque output. 1 represents the loaded motor, then the servo has a
torque output, the default value is 0.
*/
void disable() {
uint8_t params[] = { 0 };
commandOK = _bus->write(LX16A_SERVO_LOAD_OR_UNLOAD_WRITE, params, 1, _id);
}
/**
* Command name: SERVO_LOAD_OR_UNLOAD_WRITE
Command value: 31 Length: 4
Parameter 1: Whether the internal motor of the servo is unloaded power-down
or not, the range 0 or 1, 0 represents the unloading power down, and the servo
has no torque output. 1 represents the loaded motor, then the servo has a
torque output, the default value is 0.
*/
void enable() {
uint8_t params[] = { 1 };
commandOK = _bus->write(LX16A_SERVO_LOAD_OR_UNLOAD_WRITE, params, 1, _id);
}
/**
* Command name: SERVO_OR_MOTOR_MODE_WRITE
Command value: 29
Length: 7
Parameter 1: Servo mode, range 0 or 1, 0 for position control mode, 1 for
motor control mode, default 0,
Parameter 2: null value
Parameter 3: lower 8 bits of rotation speed value
Parameter 4: higher 8 bits of rotation speed value. range -1000~1000,
Only in the motor control mode is valid, control the motor speed, the value of
the negative value represents the reverse, positive value represents the
forward rotation. Write mode and speed do not support power-down save.
Note: Since the rotation speed is the “signed short int” type of data, it is forced
to convert the data to convert the data to “unsigned short int “type of data before sending the
command packet.
*/
void motor_mode(int16_t speed) {
bool isMotorMode_tmp = speed != 0;
uint8_t params[] = { (uint8_t) (isMotorMode_tmp ? 1 : 0), 0,
(uint8_t) speed, (uint8_t) (speed >> 8) };
commandOK = _bus->write(LX16A_SERVO_OR_MOTOR_MODE_WRITE, params, 4,
_id);
if (commandOK)
isMotorMode = isMotorMode_tmp;
}
// angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
void angle_offset_save() {
uint8_t params[1];
_bus-> write(LX16A_SERVO_ANGLE_OFFSET_WRITE, params, 1,
_id);
}
// angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
void angle_offset_adjust(int16_t angle) {
int32_t tmp = (int32_t) angle;
uint8_t params[] = { (uint8_t) tmp};
commandOK = _bus->write(LX16A_SERVO_ANGLE_OFFSET_ADJUST, params, 1,
_id);
}
// angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
int16_t read_angle_offset() {
uint8_t params[1];
if (!_bus->read(LX16A_SERVO_ANGLE_OFFSET_READ, params, 1, _id)) {
commandOK = false;
return 0;
}
commandOK = true;
return params[0] ;
}
// pos_read returns the servo position in centi-degrees (0..24000)
int32_t pos_read() {
initialize();
uint8_t params[2];
if (!_bus->read(LX16A_SERVO_POS_READ, params, 2, _id)) {
commandOK = false;
return pos_read_cached()+staticOffset;
}
commandOK = true;
lastKnownGoodPosition = ((int16_t) params[0]
| ((int16_t) params[1] << 8)) * 24;
return pos_read_cached()+staticOffset;
}
/**
* Get the cached position from the most recent read
*/
int32_t pos_read_cached() {
return lastKnownGoodPosition;
}
// id_read returns the ID of the servo, useful if the id is 0xfe, which is broadcast…
uint8_t id_read() {
uint8_t params[1];
if (!_bus->read(LX16A_SERVO_ID_READ, params, 1, LX16A_BROADCAST_ID)) {
commandOK = false;
return 0;
}
commandOK = true;
return params[0];
}
// id_write sets the id of the servo, updates the object’s id if write appears successful
void id_write(uint8_t id) {
uint8_t params[] = { id };
bool ok = _bus->write(LX16A_SERVO_ID_WRITE, params, 1, LX16A_BROADCAST_ID);
if (ok && _id != LX16A_BROADCAST_ID)
_id = id;
commandOK = ok;
}
/**
* Command name: SERVO_OR_MOTOR_MODE_READ
Command value: 30 Length: 3
Read the relative values of the servo, for the details of the command package
that the servo returns to host computer, please refer to the description of Table
4 below.
*/
bool readIsMotorMode() {
uint8_t params[4];
if (!_bus->read(LX16A_SERVO_OR_MOTOR_MODE_READ, params, 4, _id)) {
commandOK = false;
return false;
}
commandOK = true;
isMotorMode = params[0] == 1;
return isMotorMode;
}
// temp_read returns the servo temperature in centigrade
uint8_t temp() {
uint8_t params[1];
if (!_bus->read(LX16A_SERVO_TEMP_READ, params, 1, _id)) {
commandOK = false;
return 0;
}
commandOK = true;
return params[0];
}
// vin_read returns the servo input voltage in millivolts
uint16_t vin() {
uint8_t params[2];
if (!_bus->read(LX16A_SERVO_VIN_READ, params, 2, _id)) {
commandOK = false;
return 0;
}
commandOK = true;
return params[0] | ((uint16_t) params[1] << 8);
}
};
—
lx16a-servo.h
#include <Arduino.h>
#include <lx16a-servo.h>
// write a command with the provided parameters
// returns true if the command was written without conflict onto the bus
bool LX16ABus::write_no_retry(uint8_t cmd, const uint8_t *params, int param_cnt,
uint8_t MYID) {
if (param_cnt < 0 || param_cnt > 4)
return false;
// prepare packet in a buffer
int buflen = 6 + param_cnt;
uint8_t buf[buflen];
uint8_t ret[buflen];
buf[0] = 0x55;
buf[1] = 0x55;
buf[2] = MYID;
buf[3] = buflen – 3;
buf[4] = cmd;
for (int i = 0; i < param_cnt; i++)
buf[5 + i] = params[i];
uint8_t cksum = 0;
for (int i = 2; i < buflen – 1; i++)
cksum += buf[i];
buf[buflen – 1] = ~cksum;
// clear input buffer
int junkCount = 0;
delayMicroseconds(timeus(2));
while (available()) {
if (junkCount == 0) {
// if (_debug)
// Serial.print(“\n\t\t[ “);
}
// if (_debug)
// Serial.print(” ” + String(read()) + “, “);
// else
read();
delayMicroseconds(timeus(3));
junkCount++;
}
// if (_debug && junkCount!=0) {
// Serial.print(“]\n “);
// Serial.println(
// “\t\t Junk bytes = ” + String(junkCount) + ” id ” + String(MYID)
// + ” cmd ” + String(cmd) + ” last cmd “
// + String(lastCommand));
// }
lastCommand = cmd;
// send command packet
uint32_t t0 = millis();
if (myTXFlagGPIO >= 0) {
digitalWrite(myTXFlagGPIO, 1);
}
delayMicroseconds(timeus(1));
write(buf, buflen);
delayMicroseconds(timeus(1));
if (myTXFlagGPIO >= 0) {
digitalWrite(myTXFlagGPIO, 0);
}
// expect to read back command by virtue of single-pin loop-back
uint32_t tout = time(buflen+4) + 4; // 2ms margin
int got = 0;
bool ok = true;
if (_deepDebug)
Serial.printf(“RCV: “);
while ((got < buflen) && ((millis() – t0) < tout)) {
if (available()) {
ret[got] = read();
if (ret[got] != buf[got]) {
ok = false;
}
got++;
}
}
if (got<buflen){
ok = false;
}
// if (_debug) {
// if (!ok) {
// Serial.print(“\n\n\tWrote:[ “);
// for(int i=0;i<buflen;i++){
// Serial.print(” ” + String(buf[i]) + “, “);
// }
// Serial.print(“] id ” + String(MYID)
// + ” cmd ” + String(cmd) + ” last cmd “
// + String(lastCommand));
//
// Serial.print(“\n\tGot :[ “);
// for(int i=0;i<got;i++){
// Serial.print(” ” + String(ret[i]) + “, “);
// }
// Serial.print(“] in “+String(millis()-t0)+”\n”);
// }
// }
return ok;
}
bool LX16ABus::rcv(uint8_t cmd, uint8_t *params, int param_len, uint8_t MYID) {
// read back the expected response
uint32_t t0 = millis();
uint32_t tout = time(param_len + 6) + 30; // 20ms for the servo to think
int got = 0;
uint8_t sum = 0;
// if (_deepDebug)
// Serial.printf(“RCV: “);
int len = 7; // minimum length
while (got < len && ((millis() – t0) < tout)) {
if (available()) {
int ch = read();
// if (_deepDebug)
// Serial.printf(” 0x%02x”, ch);
switch (got) {
case 0:
case 1:
if (ch != 0x55) {
// if (_debug)
// Serial.printf(” ERR (hdr)\n”);
return false;
}
break;
case 2:
if (ch != MYID && MYID != 0xfe) {
// if (_debug)
// Serial.printf(” ERR (id)\n”);
return false;
}
break;
case 3:
if (ch < 3 || ch > 7) {
// if (_debug)
// Serial.printf(” ERR (len)\n”);
return false;
}
len = ch + 3;
if (len > param_len + 6) {
// if (_debug)
// Serial.println(
// ” ERR (param_len) got ” + String(len)
// + ” expected “
// + String((param_len + 6)));
return false;
}
break;
case 4:
if (ch != cmd) {
// if (_debug)
// Serial.printf(” ERR (cmd)\n”);
return false;
}
break;
default:
if (got == len – 1) {
if ((uint8_t) ch == (uint8_t) ~sum) {
// if (_deepDebug)
// Serial.printf(” OK\n”);
return true;
} else {
// if (_debug)
// Serial.printf(” ERR (cksum!=%02x)\n”, ~sum);
return false;
}
}
if (got – 5 > param_len) {
// if (_debug)
// Serial.printf(” ERR (cksum)\n”);
return false;
}
params[got – 5] = ch;
}
if (got > 1)
sum += ch;
got++;
}
}
// if (_debug){
// long done = millis();
// Serial.println(
// “Read TIMEOUT Expected ” + String(len) + ” got ” + String(got)
//
// + ” on cmd: ” + String(cmd) + ” id ” + String(MYID)+
// ” started at “+String(t0)+” finished at “+String (done)+” took “+String(done-t0));
// }
return false;
}
// read sends a command to the servo and reads back the response into the params buffer.
// returns true if everything checks out correctly.
bool LX16ABus::read_no_retry(uint8_t cmd, uint8_t *params, int param_len,
uint8_t MYID) {
// send the read command
bool ok = write(cmd, NULL, 0, MYID);
// if (!ok) {
// if (_debug)
// Serial.println(
// “Command of read failed on cmd: ” + String(cmd) + ” id “
// + String(MYID));
// return false;
// }
return rcv(cmd, params, param_len, MYID);
}
##############################################################################
##############################################################################
Kevin Harrington madhephaestus
https://github.com/madhephaestus
I am a roboticist who likes to engineer tools that make the process of designing
robots easier. I build free and open source robotics development software.
##############################################################################
##############################################################################
Maxim Kulkin
maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
@madhephaestus
madhephaestus Moving ID read/write to the bus
Latest commit c2f8a66 on Jun 20
History
1 contributor
45 lines (41 sloc) 1.17 KB
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
servoBus.debug(true);
Serial.println(“Beginning Servo Example”);
}
void loop() {
int divisor =4;
for (int i = 0; i < 1000/divisor; i++) {
long start = millis();
uint16_t angle = i * 24*divisor;
int16_t pos = 0;
pos = servo.pos_read();
Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
do {
servo.move_time(angle, 10*divisor);
} while (!servo.isCommandOk());
Serial.printf(“Move to %d -> %s\n”, angle,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
Serial.println(“Voltage = ” + String(servo.vin()));
Serial.println(“Temp = ” + String(servo.temp()));
Serial.println(“ID = ” + String(servo.id_read()));
Serial.println(“Motor Mode = ” + String(servo.readIsMotorMode()));
long took = millis()-start;
long time = (10*divisor)-took;
if(time>0)
delay(time);
else{
Serial.println(“Real Time broken, took: “+String(took));
}
}
servo.move_time(0, 3000);
delay(3000);
}
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
@madhephaestus
madhephaestus Tested working calibration protocol
Latest commit dd164c4 on May 19
History
1 contributor
126 lines (111 sloc) 3.49 KB
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
int16_t startingAngles []= {-9000,0,9000};
bool calibrationDone = false;
#define HOME_SWITCH_PIN 32
int divisor = 3;
void setup() {
while(!Serial);
delay(1000);
Serial.begin(115200);
Serial.println(“Starting”);
servoBus.begin(&Serial1, 1, // on TX pin 1
2); // use pin 2 as the TX flag for buffer
servoBus.retry = 2; // enforce synchronous real time
//servoBus.debug(true);
Serial.println(“Beginning Coordinated Servo Example”);
pinMode(HOME_SWITCH_PIN, INPUT_PULLUP);
servo.disable();
servo2.disable();
servo3.disable();
}
// 40ms trajectory planning loop seems the most stable
void interp(int i){
long start = millis();
servoBus.move_sync_start();
int32_t pos = servo.pos_read();
int32_t pos2 = servo2.pos_read();
int32_t pos3 = servo3.pos_read();
int32_t angle = (i * 9 * divisor)-4500;
int timingOffset = millis()-start;
servo2.move_time_and_wait_for_sync(angle+4500, 10 * divisor+timingOffset);
servo3.move_time_and_wait_for_sync(-4500-(angle), 10 * divisor+timingOffset);
servo.move_time_and_wait_for_sync(angle*2, 10 * divisor+timingOffset);
//if(abs(pos2-pos)>100){
// Serial.printf(“\n\nPosition at %d, %d and %d-> %s\n”, pos, pos2,pos3,
// servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
// Serial.printf(“Move to %d -> %s\n”, angle,
// servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
//}
long took = millis() – start;
long time = (10 * divisor) – took;
if (time > 0)
delay(time);
else {
Serial.println(“Real Time broken, took: ” + String(took));
}
}
void loop() {
bool HOME = !digitalRead(HOME_SWITCH_PIN);
if(!calibrationDone){
Serial.println(“Homing pin “+String(HOME));
if(HOME){
Serial.println(“Calibrating 1”);
servo.calibrate(startingAngles[0],-9000,9000);
Serial.println(“Calibrating 2”);
servo2.calibrate(startingAngles[1],-2512,10000);
Serial.println(“Calibrating 3”);
servo3.calibrate(startingAngles[2],-9000,9000);
Serial.println(“Limits read”);
}
Serial.println(“Read 1”);
int32_t pos = servo.pos_read();
Serial.println(“Read 2”);
int32_t pos2 = servo2.pos_read();
Serial.println(“Read 3”);
int32_t pos3 = servo3.pos_read();
Serial.printf(“\n\nPosition at %d, %d and %d-> %s\n”, pos, pos2, pos3,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
if(pos==startingAngles[0]&&
pos2==startingAngles[1]&&
pos3==startingAngles[2]&&
HOME){
calibrationDone=true;
Serial.println(“Calibration Done!”);
servo.move_time(0, 2000);
delay(2000);
servo2.move_time_and_wait_for_sync(0, 2000);
servo3.move_time_and_wait_for_sync(0, 2000);
servo.move_time_and_wait_for_sync(-9000, 2000);
servoBus.move_sync_start();
delay(2000);
}
}
if(!calibrationDone){
delay(50);
return;
}
if(HOME){
int32_t pos = servo.pos_read();
int32_t pos2 = servo2.pos_read();
int32_t pos3 = servo3.pos_read();
Serial.printf(“\n\nMAX at %d, %d and %d”, servo.getMaxCentDegrees(), servo2.getMaxCentDegrees(), servo3.getMaxCentDegrees());
Serial.printf(“\nPos at %d, %d and %d”, pos, pos2, pos3);
Serial.printf(“\nmin at %d, %d and %d”, servo.getMinCentDegrees(), servo2.getMinCentDegrees(), servo3.getMinCentDegrees());
//
delay(50);
return;
}
for (int i = 0; i < 1000 / divisor; i++) {
interp( i);
}
Serial.println(“Interpolated Set pos done, not long set”);
for (int i = 1000 / divisor; i >0; i–) {
interp( i);
}
Serial.println(“Loop resetting”);
}
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
33 lines (28 sloc) 797 Bytes
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
}
void loop() {
for (int i = -10; i < 10; i++) {
int16_t pos = 0;
pos = servo.pos_read();
Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
int16_t angle = i * 100;
servo.motor_mode(angle);
Serial.printf(“Move to %d -> %s\n”, angle,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
Serial.println(“Voltage = ” + String(servo.vin()));
Serial.println(“Temp = ” + String(servo.temp()));
Serial.println(“ID = ” + String(servo.id_read()));
delay(500);
}
servo.move_time(0, 500);
delay(5000);
}
##############################################################################
##############################################################################
##############################################################################
##############################################################################
79 lines (67 sloc) 2.14 KB
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
void setup() {
servoBus.begin(&Serial1, 1, // on TX pin 1
2); // use pin 2 as the TX flag for buffer
Serial.begin(115200);
servoBus.retry = 1; // enforce synchronous real time
servoBus.debug(true);
Serial.println(“Beginning Coordinated Servo Example”);
}
// 40ms trajectory planning loop seems the most stable
void loop() {
int divisor = 3;
for (int i = 0; i < 1000 / divisor; i++) {
long start = millis();
int16_t pos = 0;
pos = servo.pos_read();
int16_t pos2 = servo2.pos_read();
int16_t pos3 = servo3.pos_read();
uint16_t angle = i * 24 * divisor;
servo2.move_time_and_wait_for_sync(angle, 10 * divisor);
servo3.move_time_and_wait_for_sync(angle, 10 * divisor);
servo.move_time_and_wait_for_sync(angle, 10 * divisor);
servoBus.move_sync_start();
//if(abs(pos2-pos)>100){
Serial.printf(“\n\nPosition at %d and %d-> %s\n”, pos, pos2,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
Serial.printf(“Move to %d -> %s\n”, angle,
servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
//}
long took = millis() – start;
long time = (10 * divisor) – took;
if (time > 0)
delay(time);
else {
Serial.println(“Real Time broken, took: ” + String(took));
}
}
Serial.println(“Interpolated Set pos done, not long set”);
servoBus.retry = 5; // These commands must arrive
servo.move_time(0, 10000);
servo2.move_time(0, 10000);
servo3.move_time(0, 10000);
servoBus.retry = 0; // Back to low latency mode
for (int i = 0; i < 1000 / divisor; i++) {
long start = millis();
int16_t pos = 0;
pos = servo.pos_read();
int16_t pos2 = servo2.pos_read();
int16_t pos3 = servo3.pos_read();
Serial.printf(“\n\nPosition at %d and %d\n”, pos, pos2);
Serial.println(“Voltage = ” + String(servo.vin()));
Serial.println(“Temp = ” + String(servo.temp()));
long took = millis() – start;
long time = (10 * divisor) – took;
if (time > 0)
delay(time);
else {
Serial.println(“Real Time broken, took: ” + String(took));
}
}
Serial.println(“Loop resetting”);
}
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
21 lines (18 sloc) 505 Bytes
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, LX16A_BROADCAST_ID);// send these commands to any motor on the bus
int id =3;
void setup() {
servoBus.begin(&Serial1,
1,// on TX pin 1
2);// use pin 2 as the TX flag for buffer
Serial.begin(115200);
}
void loop() {
// Set any motor plugged in to ID 3
// this INO acts as an auto-provisioner for any motor plugged in
servo.id_write(id);
Serial.println(“Setting to ID “+String (id));
delay(200);
}
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a
README.md
LewanSoul LX-16A servos driver & GUI
LewanSoul LX-16A is a wonderful servos although using them requires implementing custom serial protocol and you need to configure them before use (set unique IDs for all servos, configur limits, etc.). Company provides a tool for that, but it works only on Windows.
This project aims to fill the gap for non-Windows users. It consists of two parts: a servo library and a GUI tool. Servo library implements an interface to talk to servos using their protocol and GUI provides convenient way to configure them.
alt text
Installation
Just install both packages to use them:
( cd lewansoul-lx16a && python setup.py install )
( cd lewansoul-lx16a-terminal && python setup.py install )
To launch GUI use lewansoul-lx16a-terminal command.
Code example
import serial
import lewansoul_lx16a
SERIAL_PORT = ‘/dev/tty.wchusbserial1410’
controller = lewansoul_lx16a.ServoController(
serial.Serial(SERIAL_PORT, 115200, timeout=1),
)
# control servos directly
controller.move(1, 100) # move servo ID=1 to position 100
# or through proxy objects
servo1 = controller.servo(1)
servo2 = controller.servo(2)
servo1.move(100)
# synchronous move of multiple servos
servo1.move_prepare(300)
servo2.move_prepare(600)
controller.move_start()
Example of controlling servos through Bus Servo Controller:
import serial
import time
from lewansoul_lx16a_controller import ServoController
logging.basicConfig(level=logging.DEBUG)
s = serial.Serial(‘/dev/tty.usbserial-00000000’, 9600, timeout=2)
c = ServoController(s, timeout=5)
print(c.get_battery_voltage())
servo1_id = 3
servo2_id = 5
print(c.get_positions([servo1_id, servo2_id]))
c.move({servo1_id: 1000, servo2_id: 500}, time=300)
time.sleep(0.3)
c.move({servo1_id: 800, servo2_id: 500}, time=2000)
time.sleep(2)
c.unload([servo1_id, servo2_id])
time.sleep(0.1) # important not to close serial connection immediately
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
lewansoul_lx16a_terminal.py – 700 lines
https://github.com/maximkulkin/lewansoul-lx16a/blob/master/lewansoul-lx16a-terminal/lewansoul_lx16a_terminal.py
#!/usr/bin/env python3
“””
Terminal GUI for LewanSoul LX-16A servos
Author: Maxim Kulkin
Website: https://github.com/maximkulkin/lewansoul-lx16a
“””
import sys
from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtWidgets import (QWidget, QApplication, QDialog, QMessageBox, QListWidgetItem)
from PyQt5.uic import loadUi as _loadUi
from collections import namedtuple
import pkg_resources
import logging
import serial
from serial.tools.list_ports import comports
import lewansoul_lx16a
def loadUi(path, widget):
real_path = pkg_resources.resource_filename(‘lewansoul_lx16a_terminal’, path)
_loadUi(real_path, widget)
ServoConfiguration = namedtuple(‘ServoConfiguration’, [
‘servo_id’, ‘position_limits’, ‘voltage_limits’, ‘max_temperature’, ‘position_offset’,
])
ServoState = namedtuple(‘ServoState’, [
‘servo_id’, ‘voltage’, ‘temperature’,
‘mode’, ‘position’, ‘speed’, ‘motor_on’, ‘led_on’, ‘led_errors’,
])
class ConfigureIdDialog(QDialog):
def __init__(self):
super(ConfigureIdDialog, self).__init__()
loadUi(‘resources/ConfigureId.ui’, self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def servoId(self):
return self.servoIdEdit.value()
@servoId.setter
def servoId(self, value):
self.servoIdEdit.setValue(value)
class ConfigurePositionLimitsDialog(QDialog):
def __init__(self):
super(ConfigurePositionLimitsDialog, self).__init__()
loadUi(‘resources/ConfigurePositionLimits.ui’, self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def minPosition(self):
return self.rangeMin.value()
@minPosition.setter
def minPosition(self, value):
self.rangeMin.setValue(value)
@property
def maxPosition(self):
return self.rangeMax.value()
@maxPosition.setter
def maxPosition(self, value):
self.rangeMax.setValue(value)
class ConfigureVoltageLimitsDialog(QDialog):
def __init__(self):
super(ConfigureVoltageLimitsDialog, self).__init__()
loadUi(‘resources/ConfigureVoltageLimits.ui’, self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def minVoltage(self):
return self.rangeMin.value()
@minVoltage.setter
def minVoltage(self, value):
self.rangeMin.setValue(value)
@property
def maxVoltage(self):
return self.rangeMax.value()
@maxVoltage.setter
def maxVoltage(self, value):
self.rangeMax.setValue(value)
class ConfigureMaxTemperatureDialog(QDialog):
def __init__(self):
super(ConfigureMaxTemperatureDialog, self).__init__()
loadUi(‘resources/ConfigureMaxTemperature.ui’, self)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
@property
def maxTemperature(self):
return self.maxTemperatureEdit.value()
@maxTemperature.setter
def maxTemperature(self, value):
self.maxTemperatureEdit.setValue(value)
class ConfigurePositionOffsetDialog(QDialog):
def __init__(self, servo):
super(ConfigurePositionOffsetDialog, self).__init__()
self._servo = servo
loadUi(‘resources/ConfigurePositionOffset.ui’, self)
self.positionOffsetSlider.valueChanged.connect(self._on_slider_change)
self.positionOffsetEdit.valueChanged.connect(self._on_edit_change)
self.okButton.clicked.connect(self.accept)
self.cancelButton.clicked.connect(self.reject)
def _update_servo(self, deviation):
self._servo.set_position_offset(deviation)
def _on_slider_change(self, position):
self.positionOffsetEdit.setValue(position)
self._update_servo(position)
def _on_edit_change(self, position):
self.positionOffsetSlider.setValue(position)
self._update_servo(position)
@property
def positionOffset(self):
return self.positionOffsetEdit.value()
@positionOffset.setter
def positionOffset(self, value):
self.positionOffsetEdit.setValue(value)
class ServoScanThread(QThread):
servoFound = pyqtSignal(int)
def __init__(self, controller):
super(ServoScanThread, self).__init__()
self._controller = controller
def _servo_exists(self, id):
try:
servo_id = self._controller.get_servo_id(id, timeout=0.2)
return (servo_id == id)
except TimeoutError:
return False
def run(self):
for servoId in range(1, 253):
if self.isInterruptionRequested():
break
try:
if self._servo_exists(servoId):
self.servoFound.emit(servoId)
except lewansoul_lx16a.TimeoutError:
pass
self.yieldCurrentThread()
class GetServoConfigurationThread(QThread):
servoConfigurationUpdated = pyqtSignal(ServoConfiguration)
servoConfigurationTimeout = pyqtSignal()
MAX_RETRIES = 5
def __init__(self, servo):
super(GetServoConfigurationThread, self).__init__()
self._servo = servo
@property
def servo_id(self):
return self._servo.servo_id
def run(self):
try:
retries = 0
while True:
if self.isInterruptionRequested():
return
try:
position_limits = self._servo.get_position_limits()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
voltage_limits = self._servo.get_voltage_limits()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
max_temperature = self._servo.get_max_temperature_limit()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
while True:
if self.isInterruptionRequested():
return
try:
position_offset = self._servo.get_position_offset()
break
except lewansoul_lx16a.TimeoutError:
retries += 1
if retries >= self.MAX_RETRIES:
raise
self.sleep(1)
self.servoConfigurationUpdated.emit(ServoConfiguration(
servo_id=self.servo_id,
position_limits=position_limits,
voltage_limits=voltage_limits,
max_temperature=max_temperature,
position_offset=position_offset,
))
except lewansoul_lx16a.TimeoutError:
self.servoConfigurationTimeout.emit()
class ServoMonitorThread(QThread):
servoStateUpdated = pyqtSignal(ServoState)
def __init__(self, servo):
super(ServoMonitorThread, self).__init__()
self._servo = servo
def run(self):
while not self.isInterruptionRequested():
try:
voltage = self._servo.get_voltage()
temperature = self._servo.get_temperature()
mode = self._servo.get_mode()
if mode == 0:
position, speed = self._servo.get_position(), 0
else:
position, speed = 0, self._servo.get_motor_speed()
motor_on = self._servo.is_motor_on()
led_on = self._servo.is_led_on()
led_errors = self._servo.get_led_errors()
self.servoStateUpdated.emit(ServoState(
servo_id=self._servo.servo_id,
voltage=voltage,
temperature=temperature,
mode=mode,
position=position,
speed=speed,
motor_on=motor_on,
led_on=led_on,
led_errors=led_errors,
))
except lewansoul_lx16a.TimeoutError:
pass
self.sleep(1)
class Terminal(QWidget):
logger = logging.getLogger(‘lewansoul.terminal’)
def __init__(self):
super(Terminal, self).__init__()
self.connection = False
self.selected_servo_id = False
self.servo = None
self._servo_initialization = False
self._available_ports = []
loadUi(‘resources/ServoTerminal.ui’, self)
self.configureIdButton.clicked.connect(self._configure_servo_id)
self.configurePositionLimitsButton.clicked.connect(self._configure_position_limits)
self.configureVoltageLimitsButton.clicked.connect(self._configure_voltage_limits)
self.configureMaxTemperatureButton.clicked.connect(self._configure_max_temperature)
self.configurePositionOffsetButton.clicked.connect(self._configure_position_offset)
self.portCombo.currentTextChanged.connect(self._on_port_change)
self.refreshPortsButton.clicked.connect(self._refresh_ports)
self.servoList.currentItemChanged.connect(lambda curItem, prevItem: self._on_servo_selected(curItem))
self.scanServosButton.clicked.connect(self._scan_servos)
self.servoOrMotorSwitch.valueChanged.connect(self._on_servo_motor_switch)
self.speedSlider.valueChanged.connect(self._on_speed_slider_change)
self.speedEdit.valueChanged.connect(self._on_speed_edit_change)
self.positionSlider.valueChanged.connect(self._on_position_slider_change)
self.positionEdit.valueChanged.connect(self._on_position_edit_change)
self.motorOnButton.clicked.connect(self._on_motor_on_button)
self.ledOnButton.clicked.connect(self._on_led_on_button)
self.clearLedErrorsButton.clicked.connect(self._on_clear_led_errors_button)
self._servoScanThread = None
self._servoReadConfigurationThread = None
self._servoStateMonitorThread = None
self.connectionGroup.setEnabled(False)
self._refresh_ports()
self._on_servo_selected(None)
self.show()
def _refresh_ports(self):
old_text = self.portCombo.currentText()
self._available_ports = sorted(comports())
self.portCombo.blockSignals(True)
self.portCombo.clear()
self.portCombo.addItem(”)
self.portCombo.addItems([port.device for port in self._available_ports])
self.portCombo.setCurrentText(old_text)
self.portCombo.blockSignals(False)
if self.portCombo.currentText() != old_text:
self._on_port_change(self.portCombo.currentIndex())
def _on_port_change(self, portIdx):
self._connect_to_port(self.portCombo.currentText())
def _connect_to_port(self, device):
if self.connection:
self.connection.close()
self.logger.info(‘Disconnected’)
self.connection = None
self.controller = None
self.connectionGroup.setEnabled(False)
self.servoList.clear()
self._on_servo_selected(None)
if device:
self.logger.info(‘Connecting to port %s’ % device)
try:
self.connection = serial.Serial(device, 115200, timeout=1)
self.controller = lewansoul_lx16a.ServoController(self.connection, timeout=1)
self.connectionGroup.setEnabled(True)
self.logger.info(‘Connected to {}’.format(device))
except serial.serialutil.SerialException as e:
self.logger.error(‘Failed to connect to port {}’.format(device))
QMessageBox.critical(self, “Connection error”, “Failed to connect to device”)
def _scan_servos(self):
if not self.controller:
return
def scanStarted():
self.servoList.clear()
self.scanServosButton.setText(‘Stop Scan’)
self.scanServosButton.setEnabled(True)
def scanFinished():
self.scanServosButton.setText(‘Scan Servos’)
self.scanServosButton.setEnabled(True)
self._servoScanThread = None
def servoFound(servoId):
item = QListWidgetItem(‘Servo ID=%s’ % servoId)
item.setData(Qt.UserRole, servoId)
self.servoList.addItem(item)
if not self._servoScanThread:
self._servoScanThread = ServoScanThread(self.controller)
self._servoScanThread.servoFound.connect(servoFound)
self._servoScanThread.started.connect(scanStarted)
self._servoScanThread.finished.connect(scanFinished)
self.scanServosButton.setEnabled(False)
if self._servoScanThread.isRunning():
self._servoScanThread.requestInterruption()
else:
self._servoScanThread.start()
def _on_servo_selected(self, item):
servo_id = None
if item is not None:
servo_id = item.data(Qt.UserRole)
if servo_id:
self.servo = self.controller.servo(servo_id)
self.servoGroup.setEnabled(True)
def servo_configuration_updated(details):
self.servoIdLabel.setText(str(details.servo_id))
self.positionLimits.setText(‘%d .. %d’ % details.position_limits)
self.voltageLimits.setText(‘%d .. %d’ % details.voltage_limits)
self.maxTemperature.setText(str(details.max_temperature))
self.positionOffset.setText(str(details.position_offset))
self._servo_initialization = True
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread = ServoMonitorThread(self.servo)
self._servoStateMonitorThread.servoStateUpdated.connect(self._update_servo_state)
self._servoStateMonitorThread.start()
def servo_configuration_timeout():
self._on_servo_selected(None)
QMessageBox.warning(self, “Timeout”, “Timeout reading servo data”)
if self._servoReadConfigurationThread and self._servoReadConfigurationThread.servo_id != servo_id:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
if self._servoReadConfigurationThread is None:
self._servoReadConfigurationThread = GetServoConfigurationThread(self.servo)
self._servoReadConfigurationThread.servoConfigurationUpdated.connect(servo_configuration_updated)
self._servoReadConfigurationThread.servoConfigurationTimeout.connect(servo_configuration_timeout)
self._servoReadConfigurationThread.start()
else:
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread.wait()
self._servoStateMonitorThread = None
if self._servoReadConfigurationThread:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
self.servo = None
self.servoIdLabel.setText(”)
self.positionLimits.setText(”)
self.voltageLimits.setText(”)
self.maxTemperature.setText(”)
self.currentVoltage.setText(‘Voltage:’)
self.currentTemperature.setText(‘Temperature:’)
self.currentPosition.setText(”)
self.servoGroup.setEnabled(False)
def _configure_servo_id(self):
if not self.servo:
return
dialog = ConfigureIdDialog()
dialog.servoId = self.servo.get_servo_id()
if dialog.exec_():
self.logger.info(‘Setting servo ID to %d’ % dialog.servoId)
self.servo.set_servo_id(dialog.servoId)
self.servo = self.controller.servo(dialog.servoId)
self.servoIdLabel.setText(str(dialog.servoId))
item = self.servoList.currentItem()
if item is not None:
item.setText(‘Servo ID=%d’ % dialog.servoId)
item.setData(Qt.UserRole, dialog.servoId)
def _configure_position_limits(self):
if not self.servo:
return
dialog = ConfigurePositionLimitsDialog()
dialog.minPosition, dialog.maxPosition = self.servo.get_position_limits()
if dialog.exec_():
self.logger.info(‘Setting position limits to %d..%d’ % (dialog.minPosition, dialog.maxPosition))
self.servo.set_position_limits(dialog.minPosition, dialog.maxPosition)
self.positionLimits.setText(‘%d .. %d’ % (dialog.minPosition, dialog.maxPosition))
def _configure_voltage_limits(self):
if not self.servo:
return
dialog = ConfigureVoltageLimitsDialog()
dialog.minVoltage, dialog.maxVoltage = self.servo.get_voltage_limits()
if dialog.exec_():
self.logger.info(‘Setting voltage limits to %d..%d’ % (dialog.minVoltage, dialog.maxVoltage))
self.servo.set_voltage_limits(dialog.minVoltage, dialog.maxVoltage)
self.voltageLimits.setText(‘%d .. %d’ % (dialog.minVoltage, dialog.maxVoltage))
def _configure_max_temperature(self):
if not self.servo:
return
dialog = ConfigureMaxTemperatureDialog()
dialog.maxTemperature = self.servo.get_max_temperature_limit()
if dialog.exec_():
self.logger.info(‘Setting max temperature limit to %d’ % (dialog.maxTemperature))
self.servo.set_max_temperature_limit(dialog.maxTemperature)
self.maxTemperature.setText(str(dialog.maxTemperature))
def _configure_position_offset(self):
if not self.servo:
return
dialog = ConfigurePositionOffsetDialog(self.servo)
old_position_offset = self.servo.get_position_offset()
dialog.positionOffset = old_position_offset
if dialog.exec_():
self.logger.info(‘Setting position offset limit to %d’ % (dialog.positionOffset))
self.servo.set_position_offset(dialog.positionOffset)
self.servo.save_position_offset()
self.positionOffset.setText(str(dialog.positionOffset))
else:
self.servo.set_position_offset(old_position_offset)
def _on_servo_motor_switch(self, value):
try:
if value == 0:
self.servoOrMotorModeUi.setCurrentIndex(0)
self.servo.set_servo_mode()
else:
self.servoOrMotorModeUi.setCurrentIndex(1)
self.servo.set_motor_mode()
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout changing motor mode”)
def _on_speed_slider_change(self, speed):
try:
self.speedEdit.setValue(speed)
self.logger.info(‘Setting motor speed to %d’ % speed)
self.servo.set_motor_mode(speed)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout updating motor speed”)
def _on_speed_edit_change(self, speed):
try:
self.speedSlider.setValue(speed)
self.logger.info(‘Setting motor speed to %d’ % speed)
self.servo.set_motor_mode(speed)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout updating motor speed”)
def _on_position_slider_change(self, position):
try:
self.positionEdit.setValue(position)
self.logger.info(‘Setting servo position to %d’ % position)
self.servo.move(position)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout setting servo position”)
def _on_position_edit_change(self, position):
try:
self.positionSlider.setValue(position)
self.logger.info(‘Setting servo position to %d’ % position)
self.servo.move(position)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout setting servo position”)
def _on_motor_on_button(self):
if not self.servo:
return
try:
if self.servo.is_motor_on():
if self.servo.get_mode() == 0:
self.servo.motor_off()
else:
self.servo.set_motor_mode(0)
else:
if self.servo.get_mode() == 0:
self.servo.motor_on()
else:
self.servo.set_motor_mode(self.speedSlider.value())
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout updating motor state”)
def _on_led_on_button(self):
if not self.servo:
return
try:
if self.servo.is_led_on():
self.servo.led_off()
self.ledOnButton.setChecked(False)
else:
self.servo.led_on()
self.ledOnButton.setChecked(True)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout updating LED state”)
def _on_clear_led_errors_button(self):
if not self.servo:
return
try:
self.servo.set_led_errors(0)
except lewansoul_lx16a.TimeoutError:
QMessageBox.critical(self, “Timeout”, “Timeout resetting LED errors”)
def _update_servo_state(self, servo_state):
self.currentVoltage.setText(‘Voltage: %d’ % servo_state.voltage)
self.currentTemperature.setText(‘Temperature: %d’ % servo_state.temperature)
self.motorOnButton.setChecked(servo_state.motor_on)
self.ledOnButton.setChecked(servo_state.led_on)
if servo_state.led_errors:
messages = []
if lewansoul_lx16a.SERVO_ERROR_OVER_TEMPERATURE | servo_state.led_errors:
messages.append(‘Overheating’)
if lewansoul_lx16a.SERVO_ERROR_OVER_VOLTAGE | servo_state.led_errors:
messages.append(‘Voltage is out of limits’)
if lewansoul_lx16a.SERVO_ERROR_LOCKED_ROTOR | servo_state.led_errors:
messages.append(‘Locked rotor’)
self.ledErrors.setText(‘\n’.join(messages))
self.clearLedErrorsButton.setEnabled(True)
else:
self.ledErrors.setText(”)
self.clearLedErrorsButton.setEnabled(False)
if self.servoOrMotorModeUi.currentIndex() == 0:
self.currentPosition.setText(str(servo_state.position))
if self._servo_initialization:
self.positionSlider.setValue(servo_state.position)
else:
self.currentSpeed.setText(str(servo_state.speed))
if self._servo_initialization:
self.speedSlider.setValue(servo_state.speed)
self._servo_initialization = False
def closeEvent(self, event):
if self._servoScanThread:
self._servoScanThread.requestInterruption()
self._servoScanThread.wait()
self._servoScanThread = None
if self._servoReadConfigurationThread:
self._servoReadConfigurationThread.requestInterruption()
self._servoReadConfigurationThread.wait()
self._servoReadConfigurationThread = None
if self._servoStateMonitorThread:
self._servoStateMonitorThread.requestInterruption()
self._servoStateMonitorThread.wait()
self._servoStateMonitorThread = None
if self.connection:
self.connection.close()
event.accept()
def main():
logging.basicConfig(level=logging.DEBUG)
app = QApplication(sys.argv)
terminal = Terminal()
sys.exit(app.exec_())
if __name__ == ‘__main__’:
main()
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
.END
Categories: Uncategorized