/* Package for parsing devicePrs byte stream to struct. */ package parser import ( "database/sql" "encoding/hex" "encoding/json" "errors" "fmt" "log" "net/http" "strings" "time" "bytes" "io/ioutil" "github.com/ipsusila/opt" "github.com/confluentinc/confluent-kafka-go/kafka" "../model" ) const ( statusSent = 1 statusDelivered = 2 statusError = -1 ) const ( timeLayout = "2006-01-02 15:04:05" ) //NACK response, CRC = 565 var rNACK = []byte{0x00, 0x02, 0x64, 0x00, 0x02, 0x35} //ACK response, CRC = 5052 var rACK = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC} //trasparent channel error var rEmpty = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC} //tch empty var tEmpty = []byte{0, 4, 114, 1, 0, 0, 57, 239} //PortID for transparent channel var PortID byte = 0x01 //auto-increment id var iCounter int64 var sReplacer = strings.NewReplacer("\\", "\\\\", ":", "\\:", "$", "\\$") //-------------------------------------------------------------------- //DevicePrsOptions store specific parser options type DevicePrsOptions struct { messageAPI string reportAPI string serviceTimeout time.Duration appendIMEI bool SetActive bool insertQuery string serviceClient *http.Client tagOption *opt.Options BrokerServer string BrokerTopic string DeviceName string } //ISOTime encapsulate timestamp for JSON parsing type ISOTime struct { time.Time } //BroadcastMessage store message information to be broadcasted type BroadcastMessage struct { Timestamp ISOTime `json:"ts"` Duration int `json:"duration"` NBuzzer int `json:"nbuzzer"` Message string `json:"message"` Expired ISOTime `json:"expired"` } //MessageStatus for specific IMEI type MessageStatus struct { IMEI string `json:"imei"` Status string `json:"status"` Timestamp string `json:"timestamp"` } //DevicePrsParser encapsulate parser configuration for DevicePrs device type DevicePrsParser struct { options *DevicePrsOptions db *sql.DB MaxPacketSize int MinPacketSize int BufferSize int Data []byte IMEI uint64 Error error Command byte //Records *model.DeviceRecords Records DeviceRecordHeader procdr *kafka.Producer } //UnmarshalJSON convert string to timestamp func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) { //TODO time other than local s := strings.Trim(string(b), "\"") tm.Time, err = time.ParseInLocation(timeLayout, s, time.Local) if err != nil { tm.Time = time.Time{} } return err } //GetMaxPacketSize return maximum packet size for this parser func (p *DevicePrsParser) GetMaxPacketSize() int { return p.MaxPacketSize } //GetMinPacketSize return minimum valid packet size for this parser func (p *DevicePrsParser) GetMinPacketSize() int { return p.MinPacketSize } //GetBufferSize return buffer size for reading data func (p *DevicePrsParser) GetBufferSize() int { return p.BufferSize } //GetError return last error func (p *DevicePrsParser) GetError() error { if p.Error != nil { return p.Error } return nil } //Payload return data associated stored in func (p *DevicePrsParser) Payload() []byte { //Field Packet length IMEI Command ID Payload Data CRC16 //Size (bytes) 2 8 1 [1-1009] 2 n := len(p.Data) if n < p.MinPacketSize { return []byte{} } return p.Data[11:(n - 2)] } //GetStream return data stream func (p *DevicePrsParser) GetStream() []byte { return p.Data } //GetIMEI return IMEI func (p *DevicePrsParser) GetIMEI() uint64 { return p.IMEI } //GetCommand return command associated to this packet func (p *DevicePrsParser) GetCommand() byte { return p.Command } func (p *DevicePrsParser) saveToDatabase() error { imei := fmt.Sprintf("%d", p.IMEI) dInHex := hex.EncodeToString(p.Data) //TODO, make packet invalid if IMEI is not registered db := p.db if db != nil { //TODO: save data to database //`INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN") //VALUES($1, $2, $3, $4, $5, $6, $7)` now := time.Now().Local().Format(timeLayout) query := p.options.insertQuery result, err := db.Exec(query, imei, dInHex, false, now, "", p.options.DeviceName, len(p.Data)) if err != nil { log.Printf("INSERT ERROR: %s -> %s; %s: %s\n", err.Error(), query, imei, dInHex) return err } id, _ := result.LastInsertId() nrows, _ := result.RowsAffected() log.Printf("IMEI: %v, insert-id: %d, nrows: %d\n", imei, id, nrows) } else { //if database not defined, display packet in log log.Printf("IMEI: %v\n Data: %v\n", imei, dInHex) } return nil } func (p *DevicePrsParser) sendToBroker() error { imei := fmt.Sprintf("%d", p.IMEI) //TODO, make packet invalid if IMEI is not registered procdr := p.procdr if procdr != nil { dataJson, err := json.Marshal(p.Records) if err != nil { log.Printf("Error parse to JSON (IMEI: %v): %s\n", imei, err) } //log.Printf("JSON : %v\n",string(dataJson)) log.Printf("Number of records: %d (IMEI: %v)\n", p.Records.NumRec, imei) //broker topic := p.options.BrokerTopic //producer channel doneChan := make(chan bool) go func() { defer close(doneChan) for e := range procdr.Events() { switch ev := e.(type) { case *kafka.Message: m := ev if m.TopicPartition.Error != nil { log.Printf("Sent failed: %v (IMEI: %v)\n", m.TopicPartition.Error, imei) } else { log.Printf("Sent message to topic %s [%d] at offset %v (IMEI: %v)\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset, imei) } return default: log.Printf("Ignored event: %s (IMEI: %v)\n", ev,imei) } } }() value := string(dataJson) procdr.ProduceChannel() <- &kafka.Message{ TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value), Headers: []kafka.Header{{Key: imei, Value: []byte( p.Records.Tstamp.String() )}}, } // wait for delivery report goroutine to finish _ = <-doneChan } else { //if database not defined, display packet in log log.Printf("Message Broker not defined, IMEI: %v\n", imei) } return nil } func (p *DevicePrsParser) getJSON(url string, target interface{}) error { r, err := p.options.serviceClient.Get(url) if err != nil { return err } defer r.Body.Close() return json.NewDecoder(r.Body).Decode(target) } func (p *DevicePrsParser) putJSON(url string, data interface{}) error { if p.options == nil { return errors.New("PUT error, options not defined") } payload, err := json.Marshal(data) if err != nil { return err } reader := bytes.NewReader(payload) client := p.options.serviceClient req, err := http.NewRequest("PUT", url, reader) if err != nil { return err } req.ContentLength = int64(len(payload)) req.Header.Set("Content-Type", "application/json") response, err := client.Do(req) if err != nil { return err } defer response.Body.Close() //read content result, err := ioutil.ReadAll(response.Body) if err != nil { return err } str := string(result) if strings.Contains(str, "no") { return errors.New("Failed to update message status: " + str) } //request code code := response.StatusCode if code == http.StatusOK || code == http.StatusCreated { return nil } status := response.Status return errors.New("PUT error with status = " + status) } func (p *DevicePrsParser) askForBroadcastMessage() (string, error) { if p.options == nil { return "", errors.New("GET error, options not defined") } //Get message from API //TODO: can it handle many connections? msg := &BroadcastMessage{} //alamat internal imei := fmt.Sprintf("%d", p.IMEI) url := p.options.messageAPI + imei err := p.getJSON(url, msg) if err != nil { return "", err } now := time.Now() if msg.Expired.Before(now) { return "", nil } //get buzzer count nbeep := msg.NBuzzer if nbeep <= 0 { nbeep = 1 } else if nbeep > 9 { nbeep = 9 } dispSec := msg.Duration if dispSec <= 0 { dispSec = 1 } else if dispSec > 999 { dispSec = 999 } escapedMsg := sReplacer.Replace(msg.Message) pesan := fmt.Sprintf(":t%d%03d%v$", nbeep, dispSec, escapedMsg) //log message log.Printf("IMEI = %d, message = %v\n", p.IMEI, pesan) //go update broadcast status (mark as "sent") go p.updateBroadcastStatus(statusSent) //return the message return pesan, nil } //Update broadcast status func (p *DevicePrsParser) updateBroadcastStatus(status int) error { now := time.Now() report := &MessageStatus{ IMEI: fmt.Sprintf("%d", p.IMEI), Timestamp: now.Format(timeLayout), } switch status { case statusError: //Do nothing log.Printf("IMEI=%d, Message status = Error\n", p.IMEI) case statusSent: report.Status = "sent" case statusDelivered: report.Status = "delivered" } if len(report.Status) > 0 && p.options != nil { url := p.options.reportAPI if p.options.appendIMEI { url += fmt.Sprintf("%d", p.IMEI) } err := p.putJSON(url, report) if err != nil { log.Printf("PUT request error: %v\n", err) return err } log.Printf("IMEI=%v, Message status = %v\n", report.IMEI, report.Status) } return nil } //GetBroadcastMessage return message payload tobe broadcased to client func (p *DevicePrsParser) GetBroadcastMessage() ([]byte, error) { pesan, err := p.askForBroadcastMessage() n := len(pesan) if err != nil { return nil, err } //construct message npacket := n + 4 data := make([]byte, 8+n) data[0] = byte(npacket >> 8) data[1] = byte(npacket) data[2] = 0x72 data[3] = 0x01 //PORT 'B' (TODO: configurable) data[4] = 0x00 data[5] = 0x00 //setup Payload payload := []byte(pesan) ptr := data[6:] copy(ptr, payload) //calculate CRC crc := crc16CCITT(data[2:(n + 6)]) data[n+6] = byte(crc >> 8) data[n+7] = byte(crc) return data, nil } //GetClientResponse return appropriate response to client according to packet func (p *DevicePrsParser) GetClientResponse() []byte { //Get response to client //TODO other response switch int(p.Command) { case model.CMD_RUP_RECORD: if p.IMEI == 0 || p.Error != nil { return rNACK } return rACK case model.CMD_RUP_TRCH: if p.IMEI == 0 || p.Error != nil { return tEmpty } //parse incoming message tch, err := parseTchStream(p.Data, p.MinPacketSize, p.MaxPacketSize) if err != nil { return tEmpty } //ceck command type payload := string(tch.Data) if strings.HasPrefix(payload, "GMn") || strings.HasPrefix(payload, "GMi") || strings.HasPrefix(payload, "GMe") { //send message if initial, none or error data, err := p.GetBroadcastMessage() if err != nil { p.Error = err return tEmpty } return data } else if strings.HasPrefix(payload, "GMo") { //ok, message delivered go p.updateBroadcastStatus(statusDelivered) } //empty transparent channel return tEmpty } //empty return rEmpty } //GetRecords return parsed records // func (p *DevicePrsParser) GetRecords() *model.DeviceRecords { // if p.IMEI == 0 || p.Error != nil { // return nil // } // //parse stream if not yet done // if p.Records == nil { // //Stream already verified when creating parser // p.Records, p.Error = parseStream(p.Data, p.MinPacketSize, p.MaxPacketSize, true) // if p.Error != nil { // return nil // } // } // return p.Records // } //ExecuteAsync perform task func (p *DevicePrsParser) ExecuteAsync() bool { var status bool status = true //TODO identify according to command cmd := p.GetCommand() switch cmd { case model.CMD_RUP_RECORD: err := p.saveToDatabase() if err != nil { status = false p.Error = err } err = p.sendToBroker() } return status } //-------------------------------------------------------------------- //NewDevicePrsOptions create options for devicePrs func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions { svcOpt := options.Get("commToDevice") defQuery := `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN") VALUES($1, $2, $3, $4, $5, $6, $7)` insertQuery := svcOpt.GetString("insertQuery", defQuery) rupOpt := &DevicePrsOptions{ messageAPI: svcOpt.GetString("message", ""), reportAPI: svcOpt.GetString("report", ""), serviceTimeout: time.Duration(svcOpt.GetInt("serviceTimeout", 10)) * time.Second, appendIMEI: svcOpt.GetBool("appendIMEI", false), SetActive: svcOpt.GetBool("setActive", false), insertQuery: insertQuery, tagOption: options.Get("iotag"), BrokerServer: options.Get("messagebroker").GetString("brokerServer",""), BrokerTopic: options.Get("messagebroker").GetString("brokerTopic",""), DeviceName: options.Get("server").GetString("device",""), } //create client rupOpt.serviceClient = &http.Client{Timeout: rupOpt.serviceTimeout} return rupOpt } //NewDevicePrs create empty parser func NewDevicePrs() *DevicePrsParser { p := &DevicePrsParser{ MaxPacketSize: 2048, MinPacketSize: 14, BufferSize: 2048, } return p } //NewDevicePrsParser create new parser. Maximum packet size is 1KB func NewDevicePrsParser(options *DevicePrsOptions, db *sql.DB, data []byte, procdr *kafka.Producer) *DevicePrsParser { //allocate parser with maximum and minimum packet size p := NewDevicePrs() p.options = options p.db = db p.procdr = procdr //verify stream if data != nil { p.IMEI, p.Command, p.Records, p.Error = verifyStream(data, p.MinPacketSize, p.MaxPacketSize, options.tagOption) if p.Error == nil { p.Data = data } } else { p.Error = fmt.Errorf("Stream is empty") } return p } //NewDevicePrsStringParser create new parser which accept HEX string. //Maximum packet size is 1KB i.e. maximum string length is 2KB func NewDevicePrsStringParser(options *DevicePrsOptions, db *sql.DB, data string, procdr *kafka.Producer) *DevicePrsParser { stream, err := hex.DecodeString(data) p := NewDevicePrsParser(options, db, stream,procdr) if err != nil { p.Error = err } return p } //--------------------------------------------------------------------