Uncategorized

LewanSoul Arduino C++/ Rpi python Libraries v0.1

LewanSoul Arduino C++/ Rpi python Libraries v0.1

Mon. 9/21/2020

by tl fong01

8:05PM MON. 9/21/2020

LewanSoul Arduino C++/ Rpi python Libraries v0.1

https://penzu.com/p/923e5dc9


LX-16A Arduino Libraries 2020sep16

https://penzu.com/p/c1c4380d

https://www.arduinolibraries.info/libraries/lx16a-servo


Contents

1. Arduino Library 1/2

2. Kevin Harrington madhephaestus Arduino Library 2/2
https://github.com/madhephaestus

3. Maxim Kulkin maximkulkin Python Library – Git 2020aug18

https://github.com/maximkulkin/lewansoul-lx16a


Arduino Library 1/2

lx-16a.cpp

lx-16a.h

lx-16a.cpp

#pragma once
#include <Arduino.h>


#define LX16A_BROADCAST_ID 0xFE
#define LX16A_SERVO_MOVE_TIME_WRITE 1
#define LX16A_SERVO_MOVE_TIME_READ 2
#define LX16A_SERVO_MOVE_TIME_WAIT_WRITE 7
#define LX16A_SERVO_MOVE_TIME_WAIT_READ 8
#define LX16A_SERVO_MOVE_START 11
#define LX16A_SERVO_MOVE_STOP 12
#define LX16A_SERVO_ID_WRITE 13
#define LX16A_SERVO_ID_READ 14
#define LX16A_SERVO_ANGLE_OFFSET_ADJUST 17
#define LX16A_SERVO_ANGLE_OFFSET_WRITE 18
#define LX16A_SERVO_ANGLE_OFFSET_READ 19
#define LX16A_SERVO_ANGLE_LIMIT_WRITE 20
#define LX16A_SERVO_ANGLE_LIMIT_READ 21
#define LX16A_SERVO_VIN_LIMIT_WRITE 22
#define LX16A_SERVO_VIN_LIMIT_READ 23
#define LX16A_SERVO_TEMP_MAX_LIMIT_WRITE 24
#define LX16A_SERVO_TEMP_MAX_LIMIT_READ 25
#define LX16A_SERVO_TEMP_READ 26
#define LX16A_SERVO_VIN_READ 27
#define LX16A_SERVO_POS_READ 28
#define LX16A_SERVO_OR_MOTOR_MODE_WRITE 29
#define LX16A_SERVO_OR_MOTOR_MODE_READ 30
#define LX16A_SERVO_LOAD_OR_UNLOAD_WRITE 31
#define LX16A_SERVO_LOAD_OR_UNLOAD_READ 32
#define LX16A_SERVO_LED_CTRL_WRITE 33
#define LX16A_SERVO_LED_CTRL_READ 34
#define LX16A_SERVO_LED_ERROR_WRITE 35
#define LX16A_SERVO_LED_ERROR_READ 36

class LX16ABus {
private:
    bool _debug;
    int myTXFlagGPIO = -1;
    int myTXPin=-1;
    int lastCommand=0;
    void setTXFlag(int flag) {
        myTXFlagGPIO = flag;
        if (myTXFlagGPIO >= 0) {
            pinMode(myTXFlagGPIO, OUTPUT);
            digitalWrite(myTXFlagGPIO, 0);
        }
    }

public:
    bool _deepDebug = false;
    LX16ABus() :
            _debug(false) {
    }

    // debug enables/disables debug printf’s for this servo
    void debug(bool on) {
        _debug = on;

    }

    void begin(HardwareSerial * port, int tXpin, int TXFlagGPIO = -1) {
        _port = port;
        myTXPin=tXpin;
#if defined ARDUINO_ARCH_ESP32
        port->begin(_baud, SERIAL_8N1,myTXPin,myTXPin);
        pinMode(myTXPin, OUTPUT|PULLUP|OPEN_DRAIN);
#elif defined(CORE_TEENSY)
        pinMode(myTXPin, OUTPUT_OPENDRAIN);
        port->begin(_baud, SERIAL_8N1);
        port->setTX(myTXPin, true);

#else
        pinMode(myTXPin, OUTPUT);
        port->begin(_baud, SERIAL_8N1);
#endif
        delay(3);
        while (port->available())
            port->read();
        setTXFlag(TXFlagGPIO);

    }


    // time returns the number of ms to TX/RX n characters
    uint32_t time(uint8_t n) {
        return n * 10 * 1000 / _baud; // 10 bits per char
    }
    uint32_t timeus(uint8_t n) {
        return n * 10 * 1000000 / _baud; // 10 bits per char
    }
    // methods passed through to Serial
    bool available() {
        return _port->available();
    }
    int read() {
        return _port->read();
    }
    void write(const uint8_t *buf, int buflen) {

#if defined ARDUINO_ARCH_ESP32
        pinMode(myTXPin, OUTPUT);
#elif defined(CORE_TEENSY)
        _port->setTX(myTXPin, false);
#endif
//        _port->write(buf, buflen);
//        _port->flush();
        delayMicroseconds(10);
        for(int i=0;i<buflen;i++){
            _port->write(buf[i]);
            _port->flush();
        }

#if defined ARDUINO_ARCH_ESP32
        pinMode(myTXPin, OUTPUT|PULLUP|OPEN_DRAIN);
#elif defined(CORE_TEENSY)
        _port->setTX(myTXPin, true);
#endif


    }
    int retry = 3;
    void setRetryCount(int count) {
        retry = count;
    }
    // write a command with the provided parameters
    // returns true if the command was written without conflict onto the bus
    bool write_no_retry(uint8_t cmd, const uint8_t *params, int param_cnt,
            uint8_t MYID);

    // read sends a command to the servo and reads back the response into the params buffer.
    // returns true if everything checks out correctly.
    bool read_no_retry(uint8_t cmd, uint8_t *params, int param_len,
            uint8_t MYID);
    // read sends a command to the servo and reads back the response into the params buffer.
    // returns true if everything checks out correctly.
    bool rcv(uint8_t cmd, uint8_t *params, int param_len,
            uint8_t MYID);
    // write a command with the provided parameters
    // returns true if the command was written without conflict onto the bus
    bool write(uint8_t cmd, const uint8_t *params, int param_cnt,
            uint8_t MYID) {
        if (retry == 0) {
            return write_no_retry(cmd, params, param_cnt, MYID);
        } else {
            for (int i = 0; i < retry; i++) {
                bool ok = write_no_retry(cmd, params, param_cnt, MYID);
                if (ok)
                    return true;
            }
        }
        return false;
    }

    // read sends a command to the servo and reads back the response into the params buffer.
    // returns true if everything checks out correctly.
    bool read(uint8_t cmd, uint8_t *params, int param_len, uint8_t MYID) {
        if (retry == 0) {
            return read_no_retry(cmd, params, param_len, MYID);
        } else {
            for (int i = 0; i < retry; i++) {
                bool ok = read_no_retry(cmd, params, param_len, MYID);
                if (ok)
                    return true;
            }
        }
        return false;
    }

    HardwareSerial * _port = NULL;
    int _baud = 115200;
    /**
     * Command name: SERVO_LOAD_OR_UNLOAD_WRITE
     Command value: 31 Length: 4
     Parameter 1: Whether the internal motor of the servo is unloaded power-down
     or not, the range 0 or 1, 0 represents the unloading power down, and the servo
     has no torque output. 1 represents the loaded motor, then the servo has a
     torque output, the default value is 0.
     @return sucess
     */
    bool disableAll() {
        uint8_t params[] = { 0 };
        return write(LX16A_SERVO_ID_WRITE, params, 1,
        LX16A_BROADCAST_ID);
    }
    /**
     * Command name: SERVO_LOAD_OR_UNLOAD_WRITE
     Command value: 31 Length: 4
     Parameter 1: Whether the internal motor of the servo is unloaded power-down
     or not, the range 0 or 1, 0 represents the unloading power down, and the servo
     has no torque output. 1 represents the loaded motor, then the servo has a
     torque output, the default value is 0.
     @return sucess
     */
    bool enableAll() {
        uint8_t params[] = { 1 };
        return write(LX16A_SERVO_ID_WRITE, params, 1,
        LX16A_BROADCAST_ID);
    }
    /**
     * Command name: SERVO_MOVE_START Command value: 11
     Length: 3
     With the use of command SERVO_MOVE_TIME_WAIT_WRITE, described in
     point 3
     @return sucess
     */
    bool move_sync_start() {
        uint8_t params[1];
        return write(LX16A_SERVO_MOVE_START, params, 1,
        LX16A_BROADCAST_ID);
    }
    /**
     * Command name: SERVO_MOVE_STOP Command value: 12
     Length: 3
     When the command arrives at the servo, it will stop running
     This is sent to all servos at once
     */
    void stopAll() {
        uint8_t params[1];
        write(LX16A_SERVO_MOVE_STOP, params, 1,
        LX16A_BROADCAST_ID);
    }
    // id_read returns the ID of the servo, useful if the id is 0xfe, which is broadcast…
    uint8_t id_read() {
        uint8_t params[1];
        if (!read(LX16A_SERVO_ID_READ, params, 1, LX16A_BROADCAST_ID)) {
            return 0;
        }
        return params[0];

    }
    // id_write sets the id of the servo, updates the object’s id if write appears successful
    void id_write(uint8_t id) {
        uint8_t params[] = { id };
        write(LX16A_SERVO_ID_WRITE, params, 1, LX16A_BROADCAST_ID);
    }
};

class LX16AServo {
private:
    bool commandOK = true;
    int32_t lastKnownGoodPosition = 0;
    bool isMotorMode = false;
    bool isInitialized = false;
    //private:
    LX16ABus * _bus;
    uint8_t _id = LX16A_BROADCAST_ID;
    int32_t staticOffset =0;
    int32_t maxCentDegrees =240000;
    int32_t minCentDegrees =0;
public:
    LX16AServo(LX16ABus * bus, int id) :
            _bus(bus), _id(id) {

    }
    /**
     * Set the current position as a specific angle.
     *
     * This function will first read the current position, then
     * calculate an offset to be applied to all position reads and writes.
     * The offset will make convert the current position to the value passed in

     *
     * This is a function to be called when the motor hits a limit switch
     * and load in a specific value when the limit is reached
     *
     */
    void calibrate(int32_t currentAngleCentDegrees,int32_t min_angle_cent_deg, int32_t max_angle_cent_deg){
        int32_t current;
        initialize();
        do{
            current=pos_read()-staticOffset;
        }while(!isCommandOk());// this is a calibration and can not be allowed to fail
        staticOffset=currentAngleCentDegrees-current;
        readLimits();
        int32_t min_angle = (min_angle_cent_deg-staticOffset) / 24;
        int32_t max_angle = (max_angle_cent_deg-staticOffset) / 24;
        if(min_angle<0){
            //staticOffset +=-min_angle_cent_deg;

            int theoretivalMinError  =  currentAngleCentDegrees-min_angle_cent_deg;

            Serial.println(“ERROR! Minimum of servo ID”+String(_id)+” can not be below hardware limit”);

            Serial.println(“Given the offset”+String(currentAngleCentDegrees)+”\n minimum in centDegrees “+String(minCentDegrees)+” of “+String(min_angle_cent_deg));
            Serial.println(” maximum in centDegrees “+String(maxCentDegrees)+” of “+String(max_angle_cent_deg));
            Serial.println(” error in centDegrees “+String(theoretivalMinError));
            Serial.println(” current in centDegrees “+String(currentAngleCentDegrees));
            minCentDegrees= (min_angle*24)+staticOffset;
            maxCentDegrees= ((max_angle)*24)+staticOffset;
            do {
                uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle
                        >> 8),

                (uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
                commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params,
                        4, _id);
            } while (!isCommandOk());// this is a calibration and can not be allowed to fail
            move_time(theoretivalMinError+2,0);
            //while(1);
        }

        if(max_angle>1000){
            int theoretivalMinError  =  (max_angle_cent_deg-(24000+staticOffset));
            Serial.println(“ERROR! Maximum of servo ID”+String(_id)+” can not be above hardware limit”);
            Serial.println(“Given the offset “+String(currentAngleCentDegrees)+”\n minimum in centDegrees “+String(minCentDegrees)+” of “+String(min_angle_cent_deg));
            Serial.println(” maximum in centDegrees “+String(maxCentDegrees)+” of “+String(max_angle_cent_deg));
            Serial.println(” error in centDegrees “+String(theoretivalMinError));
            Serial.println(” current in centDegrees “+String(currentAngleCentDegrees));
            Serial.println(” range in centDegrees “+String(maxCentDegrees-minCentDegrees));
            max_angle=1000;
            min_angle=0;
            minCentDegrees= (min_angle*24)+staticOffset;
            maxCentDegrees= ((max_angle)*24)+staticOffset;
            do {
                uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle
                        >> 8),

                (uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
                commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params,
                        4, _id);
            } while (!isCommandOk());// this is a calibration and can not be allowed to fail

            move_time(currentAngleCentDegrees-theoretivalMinError-2,0);
            //while(1);
        }
        if(min_angle<max_angle){
            do{
                uint8_t params[] = { (uint8_t) min_angle, (uint8_t) (min_angle >> 8),
                        (uint8_t) max_angle, (uint8_t) (max_angle >> 8) };
                commandOK = _bus->write(LX16A_SERVO_ANGLE_LIMIT_WRITE, params, 4, _id);
            }while(!isCommandOk());// this is a calibration and can not be allowed to fail
        }else{
            Serial.println(“ERROR! Max of servo “+String(_id)+” must be larger than min”);
            //while(1);
        }
        minCentDegrees= (min_angle*24)+staticOffset;
        maxCentDegrees= ((max_angle)*24)+staticOffset;
    }
    int32_t getMinCentDegrees(){
        return minCentDegrees;
    }
    int32_t getMaxCentDegrees(){
        return maxCentDegrees;

    }
    bool isCommandOk() {
        return commandOK;
    }
    void initialize() {
        if (isInitialized) {
            return;
        }
        isInitialized = true;
        motor_mode(0);
        pos_read();
        readLimits();
    }

    void readLimits(){
        uint8_t params[4];
        do{
            if (!_bus->read(LX16A_SERVO_ANGLE_LIMIT_READ, params, 4, _id)) {
                commandOK = false;
            }
            commandOK = true;
            minCentDegrees= ((params[0] | ((uint16_t) params[1] << 8))*24)+staticOffset;
            maxCentDegrees= ((params[2] | ((uint16_t) params[3] << 8))*24)+staticOffset;
        }while(!isCommandOk());// this is a calibration and can not be allowed to fail
    }

    /**
     * Length: 7
     Parameter 1: lower 8 bits of angle value
     Parameter 2: higher 8 bits of angle value.range 0~100. corresponding to the
     servo angle of 0 ~ 240 °, that means the minimum angle of the servo can be
     varied is 0.24 degree.
     Parameter 3: lower 8 bits of time value
     Parameter 4: higher 8 bits of time value. the range of time is 0~30000ms.
     When the command is sent to servo, the servo will be rotated from current
     angle to parameter angle at uniform speed within param
     */
    void move_time(int32_t angle, uint16_t time) {
        initialize();
        if(angle> maxCentDegrees){
            angle=maxCentDegrees;
            Serial.println(“ERROR Capped set at max “+String(maxCentDegrees));
        }
        if(angle<minCentDegrees){
            angle=minCentDegrees;
            Serial.println(“ERROR Capped set at min “+String(minCentDegrees));
        }
        if (isMotorMode)
            motor_mode(0);
        angle = (angle-staticOffset) / 24;
        uint8_t params[] = { (uint8_t) angle, (uint8_t) (angle >> 8),
                (uint8_t) time, (uint8_t) (time >> 8) };
        commandOK = _bus->write(LX16A_SERVO_MOVE_TIME_WRITE, params, 4, _id);

    }
    /**
     * Command name: SERVO_MOVE_TIME_WAIT_WRITE
     Command value: 7
     Length : 7
     Parameter1: lower 8 bits of preset angle
     Parameter2: higher 8 bits of preset angle. range 0~100. corresponding to the
     servo angle of 0 ~ 240 °. that means the minimum angle of the servo can be
     varied is 0.24 degree.
     Parameter3: lower 8 bits of preset time
     Parameter3: higher 8 bits of preset time. the range of time is 0~30000ms.
     The function of this command is similar to this
     “SERVO_MOVE_TIME_WRITE” command in the first point. But the difference
     is that the servo will not immediately turn when the command arrives at the
     servo,the servo will be rotated from current angle to parameter angle at unifor
     m speed within parameter time until the command name SERVO_MOVE_ST
     ART sent to servo(command value of 11)
     , then the servo will be rotate
     */
    void move_time_and_wait_for_sync(int32_t angle, uint16_t time) {
        initialize();
        if(angle> maxCentDegrees){
            angle=maxCentDegrees;
            Serial.println(“ERROR Capped set at max “+String(maxCentDegrees));
        }
        if(angle<minCentDegrees){
            angle=minCentDegrees;
            Serial.println(“ERROR Capped set at min “+String(minCentDegrees));
        }
        if (isMotorMode)
            motor_mode(0);
        angle = (angle-staticOffset) / 24;
        uint8_t params[] = { (uint8_t) angle, (uint8_t) (angle >> 8),
                (uint8_t) time, (uint8_t) (time >> 8) };
        commandOK = _bus->write(LX16A_SERVO_MOVE_TIME_WAIT_WRITE, params, 4,
                _id);
//        // this write command has a packet coming back
//        commandOK = _bus->rcv(LX16A_SERVO_MOVE_TIME_WAIT_WRITE, params, 4,
//                        _id);
    }

    /**
     * Command name: SERVO_MOVE_STOP Command value: 12
     Length: 3
     When the command arrives at the servo, it will stop running
     */
    void stop() {
        uint8_t par
ams[1];
        commandOK = _bus->write(LX16A_SERVO_MOVE_STOP, params, 1, _id);
    }

    /**
     * Command name: SERVO_LOAD_OR_UNLOAD_WRITE
     Command value: 31 Length: 4
     Parameter 1: Whether the internal motor of the servo is unloaded power-down
     or not, the range 0 or 1, 0 represents the unloading power down, and the servo
     has no torque output. 1 represents the loaded motor, then the servo has a
     torque output, the default value is 0.
     */
    void disable() {
        uint8_t params[] = { 0 };
        commandOK = _bus->write(LX16A_SERVO_LOAD_OR_UNLOAD_WRITE, params, 1, _id);
    }
    /**
     * Command name: SERVO_LOAD_OR_UNLOAD_WRITE
     Command value: 31 Length: 4
     Parameter 1: Whether the internal motor of the servo is unloaded power-down
     or not, the range 0 or 1, 0 represents the unloading power down, and the servo
     has no torque output. 1 represents the loaded motor, then the servo has a
     torque output, the default value is 0.
     */
    void enable() {
        uint8_t params[] = { 1 };
        commandOK = _bus->write(LX16A_SERVO_LOAD_OR_UNLOAD_WRITE, params, 1, _id);
    }

    /**
     * Command name: SERVO_OR_MOTOR_MODE_WRITE
     Command value: 29
     Length: 7
     Parameter 1: Servo mode, range 0 or 1, 0 for position control mode, 1 for
     motor control mode, default 0,
     Parameter 2: null value
     Parameter 3: lower 8 bits of rotation speed value
     Parameter 4: higher 8 bits of rotation speed value. range -1000~1000,
     Only in the motor control mode is valid, control the motor speed, the value of
     the negative value represents the reverse, positive value represents the
     forward rotation. Write mode and speed do not support power-down save.
     Note: Since the rotation speed is the “signed short int” type of data, it is forced
     to convert the data to convert the data to “unsigned short int “type of data before sending the
     command packet.
     */
    void motor_mode(int16_t speed) {
        bool isMotorMode_tmp = speed != 0;
        uint8_t params[] = { (uint8_t) (isMotorMode_tmp ? 1 : 0), 0,
                (uint8_t) speed, (uint8_t) (speed >> 8) };
        commandOK = _bus->write(LX16A_SERVO_OR_MOTOR_MODE_WRITE, params, 4,
                _id);
        if (commandOK)
            isMotorMode = isMotorMode_tmp;
    }
    // angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
    void angle_offset_save() {
        uint8_t params[1];
        _bus-> write(LX16A_SERVO_ANGLE_OFFSET_WRITE, params, 1,
                _id);
    }
    // angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
    void angle_offset_adjust(int16_t angle) {
        int32_t tmp = (int32_t) angle;

        uint8_t params[] = { (uint8_t) tmp};
        commandOK = _bus->write(LX16A_SERVO_ANGLE_OFFSET_ADJUST, params, 1,
                _id);
    }
    // angle_adjust sets the position angle offset in centi-degrees (-3000..3000)
    int16_t read_angle_offset() {
        uint8_t params[1];
        if (!_bus->read(LX16A_SERVO_ANGLE_OFFSET_READ, params, 1, _id)) {
            commandOK = false;
            return 0;
        }
        commandOK = true;
        return params[0] ;
    }


    // pos_read returns the servo position in centi-degrees (0..24000)
    int32_t pos_read() {
        initialize();
        uint8_t params[2];
        if (!_bus->read(LX16A_SERVO_POS_READ, params, 2, _id)) {
            commandOK = false;
            return pos_read_cached()+staticOffset;
        }
        commandOK = true;
        lastKnownGoodPosition = ((int16_t) params[0]
                | ((int16_t) params[1] << 8)) * 24;
        return pos_read_cached()+staticOffset;
    }
    /**
     * Get the cached position from the most recent read
     */
    int32_t pos_read_cached() {
        return lastKnownGoodPosition;
    }

    // id_read returns the ID of the servo, useful if the id is 0xfe, which is broadcast…
    uint8_t id_read() {
        uint8_t params[1];
        if (!_bus->read(LX16A_SERVO_ID_READ, params, 1, LX16A_BROADCAST_ID)) {
            commandOK = false;
            return 0;
        }
        commandOK = true;
        return params[0];

    }
    // id_write sets the id of the servo, updates the object’s id if write appears successful
    void id_write(uint8_t id) {
        uint8_t params[] = { id };
        bool ok = _bus->write(LX16A_SERVO_ID_WRITE, params, 1, LX16A_BROADCAST_ID);
        if (ok && _id != LX16A_BROADCAST_ID)
            _id = id;
        commandOK = ok;
    }
    /**
     * Command name: SERVO_OR_MOTOR_MODE_READ
     Command value: 30 Length: 3
     Read the relative values of the servo, for the details of the command package
     that the servo returns to host computer, please refer to the description of Table
     4 below.
     */
    bool readIsMotorMode() {

        uint8_t params[4];
        if (!_bus->read(LX16A_SERVO_OR_MOTOR_MODE_READ, params, 4, _id)) {
            commandOK = false;
            return false;
        }
        commandOK = true;
        isMotorMode = params[0] == 1;
        return isMotorMode;
    }


    // temp_read returns the servo temperature in centigrade
    uint8_t temp() {
        uint8_t params[1];
        if (!_bus->read(LX16A_SERVO_TEMP_READ, params, 1, _id)) {
            commandOK = false;
            return 0;
        }
        commandOK = true;
        return params[0];
    }

    // vin_read returns the servo input voltage in millivolts
    uint16_t vin() {
        uint8_t params[2];
        if (!_bus->read(LX16A_SERVO_VIN_READ, params, 2, _id)) {
            commandOK = false;
            return 0;
        }
        commandOK = true;
        return params[0] | ((uint16_t) params[1] << 8);
    }

};
 


lx16a-servo.h

#include <Arduino.h>
#include <lx16a-servo.h>

// write a command with the provided parameters
// returns true if the command was written without conflict onto the bus
bool LX16ABus::write_no_retry(uint8_t cmd, const uint8_t *params, int param_cnt,
        uint8_t MYID) {
    if (param_cnt < 0 || param_cnt > 4)
        return false;

    // prepare packet in a buffer
    int buflen = 6 + param_cnt;
    uint8_t buf[buflen];
    uint8_t ret[buflen];
    buf[0] = 0x55;
    buf[1] = 0x55;
    buf[2] = MYID;
    buf[3] = buflen – 3;
    buf[4] = cmd;
    for (int i = 0; i < param_cnt; i++)
        buf[5 + i] = params[i];
    uint8_t cksum = 0;
    for (int i = 2; i < buflen – 1; i++)
        cksum += buf[i];
    buf[buflen – 1] = ~cksum;

// clear input buffer
    int junkCount = 0;
    delayMicroseconds(timeus(2));
    while (available()) {
        if (junkCount == 0) {
//            if (_debug)
//                Serial.print(“\n\t\t[ “);
        }
//        if (_debug)
//            Serial.print(” ” + String(read()) + “, “);
//        else
            read();
        delayMicroseconds(timeus(3));
        junkCount++;
    }

//    if (_debug && junkCount!=0) {
//        Serial.print(“]\n “);
//        Serial.println(
//                “\t\t Junk bytes = ” + String(junkCount) + ” id ” + String(MYID)
//                        + ” cmd ” + String(cmd) + ” last cmd “
//                        + String(lastCommand));
//    }
    lastCommand = cmd;
    // send command packet
    uint32_t t0 = millis();
    if (myTXFlagGPIO >= 0) {
        digitalWrite(myTXFlagGPIO, 1);
    }
    delayMicroseconds(timeus(1));
    write(buf, buflen);
    delayMicroseconds(timeus(1));
    if (myTXFlagGPIO >= 0) {
        digitalWrite(myTXFlagGPIO, 0);
    }
    // expect to read back command by virtue of single-pin loop-back
    uint32_t tout = time(buflen+4) + 4; // 2ms margin
    int got = 0;
    bool ok = true;
    if (_deepDebug)
        Serial.printf(“RCV: “);
    while ((got < buflen) && ((millis() – t0) < tout)) {
        if (available()) {
            ret[got] = read();
            if (ret[got] != buf[got]) {
                ok = false;
            }
            got++;
        }
    }
    if (got<buflen){
        ok = false;

    }
//    if (_debug) {
//        if (!ok) {
//            Serial.print(“\n\n\tWrote:[ “);
//            for(int i=0;i<buflen;i++){
//                Serial.print(” ” + String(buf[i]) + “, “);
//            }
//            Serial.print(“] id ” + String(MYID)
//                    + ” cmd ” + String(cmd) + ” last cmd “
//                    + String(lastCommand));
//
//            Serial.print(“\n\tGot  :[ “);
//            for(int i=0;i<got;i++){
//                Serial.print(” ” + String(ret[i]) + “, “);
//            }
//            Serial.print(“] in “+String(millis()-t0)+”\n”);
//        }
//    }

    return ok;
}
bool LX16ABus::rcv(uint8_t cmd, uint8_t *params, int param_len, uint8_t MYID) {
    // read back the expected response
    uint32_t t0 = millis();
    uint32_t tout = time(param_len + 6) + 30; // 20ms for the servo to think
    int got = 0;
    uint8_t sum = 0;
//    if (_deepDebug)
//        Serial.printf(“RCV: “);
    int len = 7; // minimum length
    while (got < len && ((millis() – t0) < tout)) {
        if (available()) {
            int ch = read();
//            if (_deepDebug)
//                Serial.printf(” 0x%02x”, ch);
            switch (got) {
            case 0:
            case 1:
                if (ch != 0x55) {
//                    if (_debug)
//                        Serial.printf(” ERR (hdr)\n”);
                    return false;
                }
                break;
            case 2:
                if (ch != MYID && MYID != 0xfe) {
//                    if (_debug)
//                        Serial.printf(” ERR (id)\n”);
                    return false;
                }
                break;
            case 3:
                if (ch < 3 || ch > 7) {
//                    if (_debug)
//                        Serial.printf(” ERR (len)\n”);
                    return false;
                }
                len = ch + 3;
                if (len > param_len + 6) {
//                    if (_debug)
//                        Serial.println(
//                                ” ERR (param_len) got ” + String(len)
//                                        + ” expected “
//                                        + String((param_len + 6)));
                    return false;
                }
                break;
            case 4:
                if (ch != cmd) {
//                    if (_debug)
//                        Serial.printf(” ERR (cmd)\n”);
                    return false;
                }
                break;
            default:
                if (got == len – 1) {
                    if ((uint8_t) ch == (uint8_t) ~sum) {
//                        if (_deepDebug)
//                            Serial.printf(” OK\n”);
                        return true;
                    } else {
//                        if (_debug)
//                            Serial.printf(” ERR (cksum!=%02x)\n”, ~sum);
                        return false;
                    }
                }
                if (got – 5 > param_len) {
//                    if (_debug)
//                        Serial.printf(” ERR (cksum)\n”);
                    return false;
                }
                params[got – 5] = ch;
            }
            if (got > 1)
                sum += ch;
            got++;
        }
    }
//    if (_debug){
//        long done = millis();
//        Serial.println(
//                “Read TIMEOUT Expected ” + String(len) + ” got ” + String(got)
//
//                        + ” on cmd: ” + String(cmd) + ” id ” + String(MYID)+
//                        ” started at “+String(t0)+” finished at “+String (done)+” took “+String(done-t0));
//    }
    return false;
}
// read sends a command to the servo and reads back the response into the params buffer.
// returns true if everything checks out correctly.
bool LX16ABus::read_no_retry(uint8_t cmd, uint8_t *params, int param_len,
        uint8_t MYID) {
    // send the read command
    bool ok = write(cmd, NULL, 0, MYID);
//    if (!ok) {
//        if (_debug)
//            Serial.println(
//                    “Command of read failed on cmd: ” + String(cmd) + ” id “
//                            + String(MYID));
//        return false;
//    }

    return rcv(cmd, params, param_len, MYID);
}
 


##############################################################################
##############################################################################

Kevin Harrington madhephaestus
https://github.com/madhephaestus

I am a roboticist who likes to engineer tools that make the process of designing 
robots easier. I build free and open source robotics development software.

##############################################################################
##############################################################################

Maxim Kulkin

maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

@madhephaestus
madhephaestus Moving ID read/write to the bus
Latest commit c2f8a66 on Jun 20
 History
 1 contributor
45 lines (41 sloc)  1.17 KB
  
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);

void setup() {
    servoBus.begin(&Serial1,
            1,// on TX pin 1
            2);// use pin 2 as the TX flag for buffer
    Serial.begin(115200);
    servoBus.debug(true);
    Serial.println(“Beginning Servo Example”);
}

void loop() {
    int divisor =4;
    for (int i = 0; i < 1000/divisor; i++) {
        long start = millis();
        uint16_t angle = i * 24*divisor;
        int16_t pos = 0;
        pos = servo.pos_read();
        Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);

        do {
            servo.move_time(angle, 10*divisor);
        } while (!servo.isCommandOk());
        Serial.printf(“Move to %d -> %s\n”, angle,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
        Serial.println(“Voltage = ” + String(servo.vin()));
        Serial.println(“Temp = ” + String(servo.temp()));
        Serial.println(“ID  = ” + String(servo.id_read()));
        Serial.println(“Motor Mode  = ” + String(servo.readIsMotorMode()));
        long took = millis()-start;
        long time = (10*divisor)-took;
        if(time>0)
            delay(time);
        else{
            Serial.println(“Real Time broken, took: “+String(took));
        }
    }

    servo.move_time(0, 3000);
    delay(3000);
}

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

@madhephaestus
madhephaestus Tested working calibration protocol
Latest commit dd164c4 on May 19
 History
 1 contributor
126 lines (111 sloc)  3.49 KB
  
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
int16_t startingAngles []= {-9000,0,9000};
bool calibrationDone = false;
#define HOME_SWITCH_PIN 32
int divisor = 3;
void setup() {
    while(!Serial);
    delay(1000);
    Serial.begin(115200);
    Serial.println(“Starting”);
    servoBus.begin(&Serial1, 1, // on TX pin 1
            2); // use pin 2 as the TX flag for buffer

    servoBus.retry = 2; // enforce synchronous real time
    //servoBus.debug(true);
    Serial.println(“Beginning Coordinated Servo Example”);
    pinMode(HOME_SWITCH_PIN, INPUT_PULLUP);
    servo.disable();
    servo2.disable();
    servo3.disable();
}
// 40ms trajectory planning loop seems the most stable
void interp(int i){
    long start = millis();
    servoBus.move_sync_start();
    int32_t pos = servo.pos_read();
    int32_t pos2 = servo2.pos_read();
    int32_t pos3 = servo3.pos_read();

    int32_t angle = (i * 9 * divisor)-4500;
    int timingOffset = millis()-start;
    servo2.move_time_and_wait_for_sync(angle+4500, 10 * divisor+timingOffset);
    servo3.move_time_and_wait_for_sync(-4500-(angle), 10 * divisor+timingOffset);
    servo.move_time_and_wait_for_sync(angle*2, 10 * divisor+timingOffset);

    //if(abs(pos2-pos)>100){
//        Serial.printf(“\n\nPosition at %d, %d  and %d-> %s\n”, pos, pos2,pos3,
//                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);

//    Serial.printf(“Move to %d -> %s\n”, angle,
//            servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
    //}
    long took = millis() – start;

    long time = (10 * divisor) – took;
    if (time > 0)
        delay(time);
    else {
        Serial.println(“Real Time broken, took: ” + String(took));
    }
}
void loop() {
    bool HOME = !digitalRead(HOME_SWITCH_PIN);
    if(!calibrationDone){

        Serial.println(“Homing pin “+String(HOME));

        if(HOME){
            Serial.println(“Calibrating 1”);
            servo.calibrate(startingAngles[0],-9000,9000);
            Serial.println(“Calibrating 2”);
            servo2.calibrate(startingAngles[1],-2512,10000);
            Serial.println(“Calibrating 3”);
            servo3.calibrate(startingAngles[2],-9000,9000);

            Serial.println(“Limits read”);
        }
        Serial.println(“Read 1”);
        int32_t pos = servo.pos_read();
        Serial.println(“Read 2”);
        int32_t pos2 = servo2.pos_read();
        Serial.println(“Read 3”);
        int32_t pos3 = servo3.pos_read();
        Serial.printf(“\n\nPosition at %d, %d and %d-> %s\n”, pos, pos2, pos3,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
        if(pos==startingAngles[0]&&
                pos2==startingAngles[1]&&
                pos3==startingAngles[2]&&
                HOME){
            calibrationDone=true;
            Serial.println(“Calibration Done!”);
            servo.move_time(0, 2000);
            delay(2000);
            servo2.move_time_and_wait_for_sync(0, 2000);
            servo3.move_time_and_wait_for_sync(0, 2000);
            servo.move_time_and_wait_for_sync(-9000, 2000);
            servoBus.move_sync_start();
            delay(2000);
        }
    }

    if(!calibrationDone){
        delay(50);
        return;
    }
    if(HOME){
        int32_t pos = servo.pos_read();
        int32_t         pos2 = servo2.pos_read();
        int32_t         pos3 = servo3.pos_read();
        Serial.printf(“\n\nMAX at %d, %d and %d”, servo.getMaxCentDegrees(),  servo2.getMaxCentDegrees(),  servo3.getMaxCentDegrees());
        Serial.printf(“\nPos at %d, %d and %d”, pos, pos2, pos3);
        Serial.printf(“\nmin at %d, %d and %d”, servo.getMinCentDegrees(), servo2.getMinCentDegrees(), servo3.getMinCentDegrees());
//
        delay(50);
        return;
    }

    for (int i = 0; i < 1000 / divisor; i++) {
        interp( i);
    }
    Serial.println(“Interpolated Set pos done, not long set”);

    for (int i = 1000 / divisor; i >0; i–) {
        interp( i);
    }
    Serial.println(“Loop resetting”);
}

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

33 lines (28 sloc)  797 Bytes
  
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
void setup() {
    servoBus.begin(&Serial1,
            1,// on TX pin 1
            2);// use pin 2 as the TX flag for buffer
    Serial.begin(115200);
}

void loop() {
    for (int i = -10; i < 10; i++) {
        int16_t pos = 0;
        pos = servo.pos_read();
        Serial.printf(“\n\nPosition at %d -> %s\n”, pos,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);

        int16_t angle = i * 100;


        servo.motor_mode(angle);

        Serial.printf(“Move to %d -> %s\n”, angle,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
        Serial.println(“Voltage = ” + String(servo.vin()));
        Serial.println(“Temp = ” + String(servo.temp()));
        Serial.println(“ID  = ” + String(servo.id_read()));
        delay(500);
    }
    servo.move_time(0, 500);
    delay(5000);
}

##############################################################################
##############################################################################
##############################################################################
##############################################################################

79 lines (67 sloc)  2.14 KB
  
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, 1);
LX16AServo servo2(&servoBus, 2);
LX16AServo servo3(&servoBus, 3);
void setup() {
    servoBus.begin(&Serial1, 1, // on TX pin 1
            2); // use pin 2 as the TX flag for buffer
    Serial.begin(115200);
    servoBus.retry = 1; // enforce synchronous real time
    servoBus.debug(true);
    Serial.println(“Beginning Coordinated Servo Example”);

}
// 40ms trajectory planning loop seems the most stable

void loop() {
    int divisor = 3;
    for (int i = 0; i < 1000 / divisor; i++) {
        long start = millis();
        int16_t pos = 0;
        pos = servo.pos_read();
        int16_t pos2 = servo2.pos_read();
        int16_t pos3 = servo3.pos_read();

        uint16_t angle = i * 24 * divisor;

        servo2.move_time_and_wait_for_sync(angle, 10 * divisor);
        servo3.move_time_and_wait_for_sync(angle, 10 * divisor);
        servo.move_time_and_wait_for_sync(angle, 10 * divisor);

        servoBus.move_sync_start();

        //if(abs(pos2-pos)>100){
        Serial.printf(“\n\nPosition at %d and %d-> %s\n”, pos, pos2,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
        Serial.printf(“Move to %d -> %s\n”, angle,
                servo.isCommandOk() ? “OK” : “\n\nERR!!\n\n”);
        //}
        long took = millis() – start;

        long time = (10 * divisor) – took;
        if (time > 0)
            delay(time);
        else {
            Serial.println(“Real Time broken, took: ” + String(took));
        }
    }
    Serial.println(“Interpolated Set pos done, not long set”);

    servoBus.retry = 5; // These commands must arrive
    servo.move_time(0, 10000);
    servo2.move_time(0, 10000);
    servo3.move_time(0, 10000);
    servoBus.retry = 0; // Back to low latency mode
    for (int i = 0; i < 1000 / divisor; i++) {
        long start = millis();
        int16_t pos = 0;
        pos = servo.pos_read();
        int16_t pos2 = servo2.pos_read();
        int16_t pos3 = servo3.pos_read();

        Serial.printf(“\n\nPosition at %d and %d\n”, pos, pos2);

        Serial.println(“Voltage = ” + String(servo.vin()));
        Serial.println(“Temp = ” + String(servo.temp()));

        long took = millis() – start;

        long time = (10 * divisor) – took;
        if (time > 0)
            delay(time);
        else {
            Serial.println(“Real Time broken, took: ” + String(took));
        }
    }
    Serial.println(“Loop resetting”);
}

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

21 lines (18 sloc)  505 Bytes
  
#include <Arduino.h>
#include <lx16a-servo.h>
LX16ABus servoBus;
LX16AServo servo(&servoBus, LX16A_BROADCAST_ID);// send these commands to any motor on the bus
int id =3;
void setup() {
    servoBus.begin(&Serial1,
            1,// on TX pin 1
            2);// use pin 2 as the TX flag for buffer
    Serial.begin(115200);
}

void loop() {
    // Set any motor plugged in to ID 3
    // this INO acts as an auto-provisioner for any motor plugged in
    servo.id_write(id);
    Serial.println(“Setting to ID “+String (id));
    delay(200);

}


##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

maximkulkin / lewansoul-lx16a (70 lines) – Git 2020aug18
https://github.com/maximkulkin/lewansoul-lx16a

README.md

LewanSoul LX-16A servos driver & GUI
LewanSoul LX-16A is a wonderful servos although using them requires implementing custom serial protocol and you need to configure them before use (set unique IDs for all servos, configur limits, etc.). Company provides a tool for that, but it works only on Windows.

This project aims to fill the gap for non-Windows users. It consists of two parts: a servo library and a GUI tool. Servo library implements an interface to talk to servos using their protocol and GUI provides convenient way to configure them.

alt text

Installation
Just install both packages to use them:

( cd lewansoul-lx16a && python setup.py install )
( cd lewansoul-lx16a-terminal && python setup.py install )
To launch GUI use lewansoul-lx16a-terminal command.

Code example
import serial
import lewansoul_lx16a

SERIAL_PORT = ‘/dev/tty.wchusbserial1410’

controller = lewansoul_lx16a.ServoController(
    serial.Serial(SERIAL_PORT, 115200, timeout=1),
)

# control servos directly
controller.move(1, 100) # move servo ID=1 to position 100

# or through proxy objects
servo1 = controller.servo(1)
servo2 = controller.servo(2)

servo1.move(100)

# synchronous move of multiple servos
servo1.move_prepare(300)
servo2.move_prepare(600)
controller.move_start()
Example of controlling servos through Bus Servo Controller:

import serial
import time
from lewansoul_lx16a_controller import ServoController

logging.basicConfig(level=logging.DEBUG)

s = serial.Serial(‘/dev/tty.usbserial-00000000’, 9600, timeout=2)
c = ServoController(s, timeout=5)

print(c.get_battery_voltage())

servo1_id = 3
servo2_id = 5

print(c.get_positions([servo1_id, servo2_id]))

c.move({servo1_id: 1000, servo2_id: 500}, time=300)
time.sleep(0.3)
c.move({servo1_id: 800, servo2_id: 500}, time=2000)
time.sleep(2)
c.unload([servo1_id, servo2_id])
time.sleep(0.1) # important not to close serial connection immediately

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

lewansoul_lx16a_terminal.py – 700 lines  
https://github.com/maximkulkin/lewansoul-lx16a/blob/master/lewansoul-lx16a-terminal/lewansoul_lx16a_terminal.py
  
#!/usr/bin/env python3
“””
Terminal GUI for LewanSoul LX-16A servos
Author: Maxim Kulkin
Website: https://github.com/maximkulkin/lewansoul-lx16a
“””

import sys

from PyQt5.QtCore import Qt, QTimer, QThread, pyqtSignal
from PyQt5.QtWidgets import (QWidget, QApplication, QDialog, QMessageBox, QListWidgetItem)
from PyQt5.uic import loadUi as _loadUi

from collections import namedtuple
import pkg_resources
import logging
import serial
from serial.tools.list_ports import comports
import lewansoul_lx16a


def loadUi(path, widget):
    real_path = pkg_resources.resource_filename(‘lewansoul_lx16a_terminal’, path)
    _loadUi(real_path, widget)


ServoConfiguration = namedtuple(‘ServoConfiguration’, [
    ‘servo_id’, ‘position_limits’, ‘voltage_limits’, ‘max_temperature’, ‘position_offset’,
])

ServoState = namedtuple(‘ServoState’, [
    ‘servo_id’, ‘voltage’, ‘temperature’,
    ‘mode’, ‘position’, ‘speed’, ‘motor_on’, ‘led_on’, ‘led_errors’,
])


class ConfigureIdDialog(QDialog):
    def __init__(self):
        super(ConfigureIdDialog, self).__init__()

        loadUi(‘resources/ConfigureId.ui’, self)

        self.okButton.clicked.connect(self.accept)
        self.cancelButton.clicked.connect(self.reject)

    @property
    def servoId(self):
        return self.servoIdEdit.value()

    @servoId.setter
    def servoId(self, value):
        self.servoIdEdit.setValue(value)


class ConfigurePositionLimitsDialog(QDialog):
    def __init__(self):
        super(ConfigurePositionLimitsDialog, self).__init__()

        loadUi(‘resources/ConfigurePositionLimits.ui’, self)

        self.okButton.clicked.connect(self.accept)
        self.cancelButton.clicked.connect(self.reject)

    @property
    def minPosition(self):
        return self.rangeMin.value()

    @minPosition.setter
    def minPosition(self, value):
        self.rangeMin.setValue(value)

    @property
    def maxPosition(self):
        return self.rangeMax.value()

    @maxPosition.setter
    def maxPosition(self, value):
        self.rangeMax.setValue(value)


class ConfigureVoltageLimitsDialog(QDialog):
    def __init__(self):
        super(ConfigureVoltageLimitsDialog, self).__init__()

        loadUi(‘resources/ConfigureVoltageLimits.ui’, self)

        self.okButton.clicked.connect(self.accept)
        self.cancelButton.clicked.connect(self.reject)

    @property
    def minVoltage(self):
        return self.rangeMin.value()

    @minVoltage.setter
    def minVoltage(self, value):
        self.rangeMin.setValue(value)

    @property
    def maxVoltage(self):
        return self.rangeMax.value()

    @maxVoltage.setter
    def maxVoltage(self, value):
        self.rangeMax.setValue(value)


class ConfigureMaxTemperatureDialog(QDialog):
    def __init__(self):
        super(ConfigureMaxTemperatureDialog, self).__init__()

        loadUi(‘resources/ConfigureMaxTemperature.ui’, self)

        self.okButton.clicked.connect(self.accept)
        self.cancelButton.clicked.connect(self.reject)

    @property
    def maxTemperature(self):
        return self.maxTemperatureEdit.value()

    @maxTemperature.setter
    def maxTemperature(self, value):
        self.maxTemperatureEdit.setValue(value)


class ConfigurePositionOffsetDialog(QDialog):
    def __init__(self, servo):
        super(ConfigurePositionOffsetDialog, self).__init__()
        self._servo = servo

        loadUi(‘resources/ConfigurePositionOffset.ui’, self)

        self.positionOffsetSlider.valueChanged.connect(self._on_slider_change)
        self.positionOffsetEdit.valueChanged.connect(self._on_edit_change)

        self.okButton.clicked.connect(self.accept)
        self.cancelButton.clicked.connect(self.reject)

    def _update_servo(self, deviation):
        self._servo.set_position_offset(deviation)

    def _on_slider_change(self, position):
        self.positionOffsetEdit.setValue(position)
        self._update_servo(position)

    def _on_edit_change(self, position):
        self.positionOffsetSlider.setValue(position)
        self._update_servo(position)

    @property
    def positionOffset(self):
        return self.positionOffsetEdit.value()

    @positionOffset.setter
    def positionOffset(self, value):
        self.positionOffsetEdit.setValue(value)


class ServoScanThread(QThread):
    servoFound = pyqtSignal(int)

    def __init__(self, controller):
        super(ServoScanThread, self).__init__()
        self._controller = controller

    def _servo_exists(self, id):
        try:
            servo_id = self._controller.get_servo_id(id, timeout=0.2)
            return (servo_id == id)
        except TimeoutError:
            return False

    def run(self):
        for servoId in range(1, 253):
            if self.isInterruptionRequested():
                break

            try:
                if self._servo_exists(servoId):
                    self.servoFound.emit(servoId)
            except lewansoul_lx16a.TimeoutError:
                pass

            self.yieldCurrentThread()


class GetServoConfigurationThread(QThread):
    servoConfigurationUpdated = pyqtSignal(ServoConfiguration)
    servoConfigurationTimeout = pyqtSignal()

    MAX_RETRIES = 5

    def __init__(self, servo):
        super(GetServoConfigurationThread, self).__init__()
        self._servo = servo

    @property
    def servo_id(self):
        return self._servo.servo_id

    def run(self):
        try:
            retries = 0
            while True:
                if self.isInterruptionRequested():
                    return

                try:
                    position_limits = self._servo.get_position_limits()
                    break
                except lewansoul_lx16a.TimeoutError:
                    retries += 1
                    if retries >= self.MAX_RETRIES:
                        raise
                    self.sleep(1)

            while True:
                if self.isInterruptionRequested():
                    return

                try:
                    voltage_limits = self._servo.get_voltage_limits()
                    break
                except lewansoul_lx16a.TimeoutError:
                    retries += 1
                    if retries >= self.MAX_RETRIES:
                        raise
                    self.sleep(1)

            while True:
                if self.isInterruptionRequested():
                    return

                try:
                    max_temperature = self._servo.get_max_temperature_limit()
                    break
                except lewansoul_lx16a.TimeoutError:
                    retries += 1
                    if retries >= self.MAX_RETRIES:
                        raise
                    self.sleep(1)

            while True:
                if self.isInterruptionRequested():
                    return

                try:
                    position_offset = self._servo.get_position_offset()
                    break
                except lewansoul_lx16a.TimeoutError:
                    retries += 1
                    if retries >= self.MAX_RETRIES:
                        raise
                    self.sleep(1)

            self.servoConfigurationUpdated.emit(ServoConfiguration(
                servo_id=self.servo_id,
                position_limits=position_limits,
                voltage_limits=voltage_limits,
                max_temperature=max_temperature,
                position_offset=position_offset,
            ))
        except lewansoul_lx16a.TimeoutError:
            self.servoConfigurationTimeout.emit()


class ServoMonitorThread(QThread):
    servoStateUpdated = pyqtSignal(ServoState)

    def __init__(self, servo):
        super(ServoMonitorThread, self).__init__()
        self._servo = servo

    def run(self):
        while not self.isInterruptionRequested():
            try:
                voltage = self._servo.get_voltage()
                temperature = self._servo.get_temperature()

                mode = self._servo.get_mode()
                if mode == 0:
                    position, speed = self._servo.get_position(), 0
                else:
                    position, speed = 0, self._servo.get_motor_speed()

                motor_on = self._servo.is_motor_on()
                led_on = self._servo.is_led_on()
                led_errors = self._servo.get_led_errors()

                self.servoStateUpdated.emit(ServoState(
                    servo_id=self._servo.servo_id,
                    voltage=voltage,
                    temperature=temperature,
                    mode=mode,
                    position=position,
                    speed=speed,
                    motor_on=motor_on,
                    led_on=led_on,
                    led_errors=led_errors,
                ))
            except lewansoul_lx16a.TimeoutError:
                pass

            self.sleep(1)


class Terminal(QWidget):
    logger = logging.getLogger(‘lewansoul.terminal’)

    def __init__(self):
        super(Terminal, self).__init__()

        self.connection = False
        self.selected_servo_id = False
        self.servo = None
        self._servo_initialization = False

        self._available_ports = []

        loadUi(‘resources/ServoTerminal.ui’, self)

        self.configureIdButton.clicked.connect(self._configure_servo_id)
        self.configurePositionLimitsButton.clicked.connect(self._configure_position_limits)
        self.configureVoltageLimitsButton.clicked.connect(self._configure_voltage_limits)
        self.configureMaxTemperatureButton.clicked.connect(self._configure_max_temperature)
        self.configurePositionOffsetButton.clicked.connect(self._configure_position_offset)

        self.portCombo.currentTextChanged.connect(self._on_port_change)
        self.refreshPortsButton.clicked.connect(self._refresh_ports)
        self.servoList.currentItemChanged.connect(lambda curItem, prevItem: self._on_servo_selected(curItem))
        self.scanServosButton.clicked.connect(self._scan_servos)

        self.servoOrMotorSwitch.valueChanged.connect(self._on_servo_motor_switch)
        self.speedSlider.valueChanged.connect(self._on_speed_slider_change)
        self.speedEdit.valueChanged.connect(self._on_speed_edit_change)

        self.positionSlider.valueChanged.connect(self._on_position_slider_change)
        self.positionEdit.valueChanged.connect(self._on_position_edit_change)

        self.motorOnButton.clicked.connect(self._on_motor_on_button)
        self.ledOnButton.clicked.connect(self._on_led_on_button)
        self.clearLedErrorsButton.clicked.connect(self._on_clear_led_errors_button)

        self._servoScanThread = None
        self._servoReadConfigurationThread = None
        self._servoStateMonitorThread = None

        self.connectionGroup.setEnabled(False)

        self._refresh_ports()
        self._on_servo_selected(None)

        self.show()

    def _refresh_ports(self):
        old_text = self.portCombo.currentText()

        self._available_ports = sorted(comports())

        self.portCombo.blockSignals(True)
        self.portCombo.clear()
        self.portCombo.addItem(”)
        self.portCombo.addItems([port.device for port in self._available_ports])
        self.portCombo.setCurrentText(old_text)
        self.portCombo.blockSignals(False)

        if self.portCombo.currentText() != old_text:
            self._on_port_change(self.portCombo.currentIndex())

    def _on_port_change(self, portIdx):
        self._connect_to_port(self.portCombo.currentText())

    def _connect_to_port(self, device):
        if self.connection:
            self.connection.close()
            self.logger.info(‘Disconnected’)
            self.connection = None
            self.controller = None
            self.connectionGroup.setEnabled(False)
            self.servoList.clear()
            self._on_servo_selected(None)

        if device:
            self.logger.info(‘Connecting to port %s’ % device)
            try:
                self.connection = serial.Serial(device, 115200, timeout=1)
                self.controller = lewansoul_lx16a.ServoController(self.connection, timeout=1)
                self.connectionGroup.setEnabled(True)
                self.logger.info(‘Connected to {}’.format(device))
            except serial.serialutil.SerialException as e:
                self.logger.error(‘Failed to connect to port {}’.format(device))
                QMessageBox.critical(self, “Connection error”, “Failed to connect to device”)

    def _scan_servos(self):
        if not self.controller:
            return

        def scanStarted():
            self.servoList.clear()
            self.scanServosButton.setText(‘Stop Scan’)
            self.scanServosButton.setEnabled(True)

        def scanFinished():
            self.scanServosButton.setText(‘Scan Servos’)
            self.scanServosButton.setEnabled(True)
            self._servoScanThread = None

        def servoFound(servoId):
            item = QListWidgetItem(‘Servo ID=%s’ % servoId)
            item.setData(Qt.UserRole, servoId)
            self.servoList.addItem(item)

        if not self._servoScanThread:
            self._servoScanThread = ServoScanThread(self.controller)
            self._servoScanThread.servoFound.connect(servoFound)
            self._servoScanThread.started.connect(scanStarted)
            self._servoScanThread.finished.connect(scanFinished)

        self.scanServosButton.setEnabled(False)
        if self._servoScanThread.isRunning():
            self._servoScanThread.requestInterruption()
        else:
            self._servoScanThread.start()

    def _on_servo_selected(self, item):
        servo_id = None
        if item is not None:
            servo_id = item.data(Qt.UserRole)

        if servo_id:
            self.servo = self.controller.servo(servo_id)
            self.servoGroup.setEnabled(True)

            def servo_configuration_updated(details):
                self.servoIdLabel.setText(str(details.servo_id))
                self.positionLimits.setText(‘%d .. %d’ % details.position_limits)
                self.voltageLimits.setText(‘%d .. %d’ % details.voltage_limits)
                self.maxTemperature.setText(str(details.max_temperature))
                self.positionOffset.setText(str(details.position_offset))
                self._servo_initialization = True

                if self._servoStateMonitorThread:
                    self._servoStateMonitorThread.requestInterruption()

                self._servoStateMonitorThread = ServoMonitorThread(self.servo)
                self._servoStateMonitorThread.servoStateUpdated.connect(self._update_servo_state)
                self._servoStateMonitorThread.start()

            def servo_configuration_timeout():
                self._on_servo_selected(None)
                QMessageBox.warning(self, “Timeout”, “Timeout reading servo data”)

            if self._servoReadConfigurationThread and self._servoReadConfigurationThread.servo_id != servo_id:
                self._servoReadConfigurationThread.requestInterruption()
                self._servoReadConfigurationThread.wait()
                self._servoReadConfigurationThread = None

            if self._servoReadConfigurationThread is None:
                self._servoReadConfigurationThread = GetServoConfigurationThread(self.servo)
                self._servoReadConfigurationThread.servoConfigurationUpdated.connect(servo_configuration_updated)
                self._servoReadConfigurationThread.servoConfigurationTimeout.connect(servo_configuration_timeout)
                self._servoReadConfigurationThread.start()
        else:
            if self._servoStateMonitorThread:
                self._servoStateMonitorThread.requestInterruption()
                self._servoStateMonitorThread.wait()
                self._servoStateMonitorThread = None

            if self._servoReadConfigurationThread:
                self._servoReadConfigurationThread.requestInterruption()
                self._servoReadConfigurationThread.wait()
                self._servoReadConfigurationThread = None

            self.servo = None

            self.servoIdLabel.setText(”)
            self.positionLimits.setText(”)
            self.voltageLimits.setText(”)
            self.maxTemperature.setText(”)

            self.currentVoltage.setText(‘Voltage:’)
            self.currentTemperature.setText(‘Temperature:’)
            self.currentPosition.setText(”)

            self.servoGroup.setEnabled(False)

    def _configure_servo_id(self):
        if not self.servo:
            return

        dialog = ConfigureIdDialog()
        dialog.servoId = self.servo.get_servo_id()
        if dialog.exec_():
            self.logger.info(‘Setting servo ID to %d’ % dialog.servoId)
            self.servo.set_servo_id(dialog.servoId)
            self.servo = self.controller.servo(dialog.servoId)
            self.servoIdLabel.setText(str(dialog.servoId))
            item = self.servoList.currentItem()
            if item is not None:
                item.setText(‘Servo ID=%d’ % dialog.servoId)
                item.setData(Qt.UserRole, dialog.servoId)

    def _configure_position_limits(self):
        if not self.servo:
            return

        dialog = ConfigurePositionLimitsDialog()
        dialog.minPosition, dialog.maxPosition = self.servo.get_position_limits()
        if dialog.exec_():
            self.logger.info(‘Setting position limits to %d..%d’ % (dialog.minPosition, dialog.maxPosition))
            self.servo.set_position_limits(dialog.minPosition, dialog.maxPosition)
            self.positionLimits.setText(‘%d .. %d’ % (dialog.minPosition, dialog.maxPosition))

    def _configure_voltage_limits(self):
        if not self.servo:
            return

        dialog = ConfigureVoltageLimitsDialog()
        dialog.minVoltage, dialog.maxVoltage = self.servo.get_voltage_limits()
        if dialog.exec_():
            self.logger.info(‘Setting voltage limits to %d..%d’ % (dialog.minVoltage, dialog.maxVoltage))
            self.servo.set_voltage_limits(dialog.minVoltage, dialog.maxVoltage)
            self.voltageLimits.setText(‘%d .. %d’ % (dialog.minVoltage, dialog.maxVoltage))

    def _configure_max_temperature(self):
        if not self.servo:
            return

        dialog = ConfigureMaxTemperatureDialog()
        dialog.maxTemperature = self.servo.get_max_temperature_limit()
        if dialog.exec_():
            self.logger.info(‘Setting max temperature limit to %d’ % (dialog.maxTemperature))
            self.servo.set_max_temperature_limit(dialog.maxTemperature)
            self.maxTemperature.setText(str(dialog.maxTemperature))

    def _configure_position_offset(self):
        if not self.servo:
            return

        dialog = ConfigurePositionOffsetDialog(self.servo)
        old_position_offset = self.servo.get_position_offset()
        dialog.positionOffset = old_position_offset
        if dialog.exec_():
            self.logger.info(‘Setting position offset limit to %d’ % (dialog.positionOffset))
            self.servo.set_position_offset(dialog.positionOffset)
            self.servo.save_position_offset()
            self.positionOffset.setText(str(dialog.positionOffset))
        else:
            self.servo.set_position_offset(old_position_offset)

    def _on_servo_motor_switch(self, value):
        try:
            if value == 0:
                self.servoOrMotorModeUi.setCurrentIndex(0)
                self.servo.set_servo_mode()
            else:
                self.servoOrMotorModeUi.setCurrentIndex(1)
                self.servo.set_motor_mode()
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout changing motor mode”)

    def _on_speed_slider_change(self, speed):
        try:
            self.speedEdit.setValue(speed)
            self.logger.info(‘Setting motor speed to %d’ % speed)
            self.servo.set_motor_mode(speed)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout updating motor speed”)

    def _on_speed_edit_change(self, speed):
        try:
            self.speedSlider.setValue(speed)
            self.logger.info(‘Setting motor speed to %d’ % speed)
            self.servo.set_motor_mode(speed)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout updating motor speed”)

    def _on_position_slider_change(self, position):
        try:
            self.positionEdit.setValue(position)
            self.logger.info(‘Setting servo position to %d’ % position)
            self.servo.move(position)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout setting servo position”)

    def _on_position_edit_change(self, position):
        try:
            self.positionSlider.setValue(position)
            self.logger.info(‘Setting servo position to %d’ % position)
            self.servo.move(position)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout setting servo position”)

    def _on_motor_on_button(self):
        if not self.servo:
            return

        try:
            if self.servo.is_motor_on():
                if self.servo.get_mode() == 0:
                    self.servo.motor_off()
                else:
                    self.servo.set_motor_mode(0)
            else:
                if self.servo.get_mode() == 0:
                    self.servo.motor_on()
                else:
                    self.servo.set_motor_mode(self.speedSlider.value())
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout updating motor state”)

    def _on_led_on_button(self):
        if not self.servo:
            return

        try:
            if self.servo.is_led_on():
                self.servo.led_off()
                self.ledOnButton.setChecked(False)
            else:
                self.servo.led_on()
                self.ledOnButton.setChecked(True)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout updating LED state”)

    def _on_clear_led_errors_button(self):
        if not self.servo:
            return

        try:
            self.servo.set_led_errors(0)
        except lewansoul_lx16a.TimeoutError:
            QMessageBox.critical(self, “Timeout”, “Timeout resetting LED errors”)

    def _update_servo_state(self, servo_state):
        self.currentVoltage.setText(‘Voltage: %d’ % servo_state.voltage)
        self.currentTemperature.setText(‘Temperature: %d’ % servo_state.temperature)
        self.motorOnButton.setChecked(servo_state.motor_on)
        self.ledOnButton.setChecked(servo_state.led_on)
        if servo_state.led_errors:
            messages = []
            if lewansoul_lx16a.SERVO_ERROR_OVER_TEMPERATURE | servo_state.led_errors:
                messages.append(‘Overheating’)
            if lewansoul_lx16a.SERVO_ERROR_OVER_VOLTAGE | servo_state.led_errors:
                messages.append(‘Voltage is out of limits’)
            if lewansoul_lx16a.SERVO_ERROR_LOCKED_ROTOR | servo_state.led_errors:
                messages.append(‘Locked rotor’)
            self.ledErrors.setText(‘\n’.join(messages))
            self.clearLedErrorsButton.setEnabled(True)
        else:
            self.ledErrors.setText(”)
            self.clearLedErrorsButton.setEnabled(False)

        if self.servoOrMotorModeUi.currentIndex() == 0:
            self.currentPosition.setText(str(servo_state.position))
            if self._servo_initialization:
                self.positionSlider.setValue(servo_state.position)
        else:
            self.currentSpeed.setText(str(servo_state.speed))
            if self._servo_initialization:
                self.speedSlider.setValue(servo_state.speed)

        self._servo_initialization = False

    def closeEvent(self, event):
        if self._servoScanThread:
            self._servoScanThread.requestInterruption()
            self._servoScanThread.wait()
            self._servoScanThread = None

        if self._servoReadConfigurationThread:
            self._servoReadConfigurationThread.requestInterruption()
            self._servoReadConfigurationThread.wait()
            self._servoReadConfigurationThread = None

        if self._servoStateMonitorThread:
            self._servoStateMonitorThread.requestInterruption()
            self._servoStateMonitorThread.wait()
            self._servoStateMonitorThread = None

        if self.connection:
            self.connection.close()

        event.accept()


def main():
    logging.basicConfig(level=logging.DEBUG)
    app = QApplication(sys.argv)
    terminal = Terminal()
    sys.exit(app.exec_())


if __name__ == ‘__main__’:
    main()

##############################################################################
##############################################################################
##############################################################################
##############################################################################
##############################################################################

.END

Categories: Uncategorized

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

This site uses Akismet to reduce spam. Learn how your comment data is processed.