Search code examples
python-3.xgpstwisted

Concox GT800 / Wetrack800 GPS device data decoding


I'm using a Twisted (Python) TCP server.

I converted the received byte string to hexadecimal with

byte.hex()

data_1 = 78 78 11 01 0 XXXXXXXXXXXXXXX 80 00 21 21 00 00 38 54 0d0a

data_2 = 78 78 11 01 0 XXXXXXXXXXXXXXX 80 00 21 21 00 32 2a c5 0d0a

7878 -> Start bit

11 -> Packet length

01 -> Protocol Number

XX..XX -> IMEI

0d0a -> End bit

I have a problem decoding this part: 8000212100322ac5. According to their documentation, it's a login packet, and I should send a response to it.

A solution for C# exists: "GPS Socket communication (CONCOX)".

I need an algorithm or a decoding method in Python.


Solution

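  • For the GT-800, I think this will help you (the snippet needs `import codecs`, `from datetime import datetime` and `from twisted.internet import protocol, reactor`):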

    class TcpGpsReceiver(protocol.Protocol):
        """Twisted protocol that answers Concox GT-800 packets on one connection.

        Received bytes are hex-encoded and dispatched on the packet-length and
        protocol-number fields. Login, heartbeat and time-request packets are
        answered; a location packet is decoded and printed, after which the
        connection is closed.

        NOTE(review): every ``dataReceived`` call is treated as exactly one
        complete packet. TCP does not guarantee that, so split or coalesced
        packets are not handled -- confirm whether buffering is needed.
        """

        def __init__(self):
            """Start with no IMEI; the login packet fills it in."""
            self.imei_no = ''

        def gt_800(self, decoded_data):
            """Decode one hex-encoded GT-800 packet and send any required reply.

            Args:
                decoded_data: The packet as ASCII hex digits in a ``bytes``
                    object (two hex characters per packet byte), as produced
                    by ``dataReceived``.

            Raises:
                ValueError: If a recognised packet is too short or holds
                    non-hex data in a field that is parsed as a number.
            """
            # Characters 4:6 are the packet-length byte. The first three packet
            # kinds are recognised by their fixed length rather than by the
            # protocol number. NOTE(review): presumably safe for this device
            # because those lengths are unique -- confirm against the protocol.
            length_field = decoded_data[4:6].decode().upper()
            if length_field == '11':
                self._handle_login(decoded_data)
            elif length_field == '0A':
                self._handle_heartbeat(decoded_data)
            elif length_field == '22':
                self._handle_location(decoded_data)

            # Characters 6:8 are the protocol number; 0x8A is a time request.
            if decoded_data[6:8].decode().upper() == '8A':
                self._handle_time_request(decoded_data)

        def _handle_login(self, decoded_data):
            """Remember the 15-digit IMEI and send the fixed login reply."""
            print('Login')
            # The IMEI field is 8 bytes (characters 8:24) holding a padding
            # nibble followed by 15 digits, hence the slice starts at 9.
            self.imei_no = decoded_data[9:24]
            data = codecs.decode('7878050100059FF80D0A', encoding='hex', errors='strict')
            self.transport.write(data)

        def _handle_heartbeat(self, decoded_data):
            """Echo the heartbeat serial number back to the device."""
            print('Heat beat received')
            serial_number = hex(int(decoded_data[18:22], 16))[2:]
            # NOTE(review): the trailing check bytes (670E) are constant even
            # though the serial number varies -- confirm the device accepts it.
            data = codecs.decode('78780513{}670E0D0A'.format(get_serial_number(serial_number)),
                                 encoding='hex', errors='strict')
            print(decoded_data[18:22], ' -> serial number')
            self.transport.write(data)

        def _handle_location(self, decoded_data):
            """Decode a location packet, print it, and close the connection."""
            print('Decoding')
            date_time = decoded_data[8:20]
            # Latitude and longitude are full 4-byte (8 hex character) fields
            # that follow the 1-byte satellite count at 20:22. Dropping the
            # leading nibble would corrupt longitudes above ~149.13 degrees.
            # NOTE(review): the hemisphere flags are not applied here, so both
            # values are always non-negative -- confirm if signs are needed.
            latitude = int(decoded_data[22:30], 16) / 1800000
            longitude = int(decoded_data[30:38], 16) / 1800000

            # The packet carries a two-digit year, so offset it by 2000.
            timestamp = datetime(year=2000 + int(date_time[0:2], 16),
                                 month=int(date_time[2:4], 16),
                                 day=int(date_time[4:6], 16),
                                 hour=int(date_time[6:8], 16),
                                 minute=int(date_time[8:10], 16),
                                 second=int(date_time[10:12], 16)).strftime('%Y-%m-%d %H:%M:%S')

            # imei_no is still the initial str '' when no login packet has been
            # seen on this connection; only bytes need decoding.
            imei = self.imei_no.decode() if isinstance(self.imei_no, bytes) else self.imei_no
            location = dict(device_id=imei, imei=imei, timestamp=timestamp,
                            location=[latitude, longitude],
                            speed=0, course=0, state=0, altitude=0)
            print(location)
            # Code to access location data
            self.imei_no = ''
            self.transport.loseConnection()

        def _handle_time_request(self, decoded_data):
            """Reply to a time request with the current date and time."""
            print('Date time requested')
            s_no = hex(int(decoded_data[8:12], 16))[2:]
            # NOTE(review): this is the server's local time; confirm whether
            # the device expects UTC.
            now = datetime.now()
            print(decoded_data[12:16], hex(int(decoded_data[12:16], 16))[2:])
            # Each date/time component is one byte, i.e. two hex digits.
            # NOTE(review): the last field echoes characters 12:16 of the
            # request rather than a freshly computed checksum -- confirm.
            date_string = '{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}{}{}'.format(
                now.year % 100, now.month, now.day,
                now.hour, now.minute, now.second,
                get_serial_number(s_no),
                get_serial_number(hex(int(decoded_data[12:16], 16))[2:]))
            print(date_string)
            data = codecs.decode('78780B8A{}0D0A'.format(date_string), encoding='hex', errors='strict')
            self.transport.write(data)

        def dataReceived(self, data):
            """Hex-encode the received bytes and hand them to the decoder.

            Called by Twisted when data is received across the transport.
            """
            decoded_data = codecs.encode(data, encoding='hex', errors='strict')
            print(datetime.now())
            self.gt_800(decoded_data)
    
    def get_serial_number(s_no):
        """Left-pad a hex serial number with zeros to four characters.

        Inputs longer than four characters yield '0000'. Inputs that are
        exactly four characters long, or empty, are returned unchanged.
        """
        length = len(s_no)
        if length > 4:
            return '0000'
        missing = 4 - length
        # Only one to three characters are ever padded; an empty or already
        # four-character value falls through untouched.
        if 1 <= missing <= 3:
            return '0' * missing + s_no
        return s_no
    
    def retrun_date_hex(data):
        """Return the integer *data* as lowercase hex, padded to two digits.

        Values whose hex form is exactly two digits (16-255) are returned as
        is; any other value gets a single leading '0' (so 0-15 become two
        digits). The value is also printed.

        Args:
            data: Integer to convert, e.g. a day, hour, minute or second.

        Returns:
            The hex digits as a str, without the '0x' prefix.
        """
        print(data)
        digits = hex(data)[2:]
        if len(digits) == 2:
            return digits
        return '0' + digits


    # Correctly spelled alias so that callers using ``return_date_hex`` resolve;
    # the misspelled original name is kept for backward compatibility.
    return_date_hex = retrun_date_hex
    
    class TcpGpsReceiverFactory(protocol.Factory):
        """Factory that hands out one TcpGpsReceiver per incoming connection."""

        def buildProtocol(self, addr):
            """Return a fresh TcpGpsReceiver; *addr* is not used."""
            receiver = TcpGpsReceiver()
            return receiver
    
    if __name__ == '__main__':
        # Runs only when the module is executed directly: listen for devices
        # on TCP port 10206 and start the Twisted reactor (blocks until the
        # reactor is stopped).
        reactor.listenTCP(10206, TcpGpsReceiverFactory())
        reactor.run()