You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

575 lines
14 KiB

/*
Package parser parses the devicePrs byte stream into structs.
*/
package parser
import (
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"strings"
"time"
"bytes"
"io/ioutil"
"github.com/ipsusila/opt"
"github.com/confluentinc/confluent-kafka-go/kafka"
"../model"
)
//Broadcast message states accepted by updateBroadcastStatus.
const (
//statusSent is reported to the report API as "sent"
statusSent = 1
//statusDelivered is reported to the report API as "delivered"
statusDelivered = 2
//statusError is only logged; nothing is reported for it
statusError = -1
)
const (
//timeLayout is the timestamp layout (Go reference-time form) used to parse
//ISOTime values and to format timestamps for the database and the report API
timeLayout = "2006-01-02 15:04:05"
)
//rNACK is the negative-acknowledge (NACK) response, CRC = 565 (0x0235)
var rNACK = []byte{0x00, 0x02, 0x64, 0x00, 0x02, 0x35}
//rACK is the acknowledge (ACK) response, CRC = 5052 (0x13BC)
var rACK = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC}
//rEmpty is the response GetClientResponse returns for commands it does not handle
//(originally labelled "transparent channel error").
//NOTE(review): the bytes are identical to rACK — confirm this is intended
var rEmpty = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC}
//tEmpty is the empty transparent-channel reply. It has the same layout as the
//frame GetBroadcastMessage builds for an empty message: length 4, command
//114 (0x72), port 1, two zero bytes, then two trailing bytes (presumably the CRC)
var tEmpty = []byte{0, 4, 114, 1, 0, 0, 57, 239}
//PortID is the port ID for the transparent channel (not referenced in this part of the file)
var PortID byte = 0x01
//iCounter is an auto-increment id (not referenced in this part of the file)
var iCounter int64
//sReplacer escapes backslash, ':' and '$' with a leading backslash; it is
//applied to the broadcast message text in askForBroadcastMessage
var sReplacer = strings.NewReplacer("\\", "\\\\", ":", "\\:", "$", "\\$")
//--------------------------------------------------------------------
//DevicePrsOptions stores the parser-specific options. Values are filled in by
//NewDevicePrsOptions from the application configuration.
type DevicePrsOptions struct {
//messageAPI is the URL prefix for fetching a broadcast message; the IMEI is appended to it
messageAPI string
//reportAPI is the URL that receives the PUT message-status report
reportAPI string
//serviceTimeout is the timeout applied to serviceClient
serviceTimeout time.Duration
//appendIMEI makes updateBroadcastStatus append the IMEI to reportAPI
appendIMEI bool
//SetActive is read from the "setActive" option (not used in this part of the file)
SetActive bool
//insertQuery is the SQL statement executed by saveToDatabase
insertQuery string
//serviceClient is the HTTP client used for the message and report APIs
serviceClient *http.Client
//tagOption is the "iotag" configuration section, handed to verifyStream
tagOption *opt.Options
//BrokerServer is read from "messagebroker.brokerServer" (not used in this part of the file)
BrokerServer string
//BrokerTopic is the topic sendToBroker publishes to
BrokerTopic string
//DeviceName is read from "server.device" and passed as the sixth insert parameter
DeviceName string
}
//ISOTime wraps time.Time so that timestamps written in timeLayout
//("2006-01-02 15:04:05") can be decoded from JSON via UnmarshalJSON
type ISOTime struct {
time.Time
}
//BroadcastMessage stores the information of a message to be broadcast to a
//device, as decoded from the message API response
type BroadcastMessage struct {
//Timestamp is decoded from "ts" (not read in this part of the file)
Timestamp ISOTime `json:"ts"`
//Duration is clamped to 1..999 by askForBroadcastMessage before being sent
Duration int `json:"duration"`
//NBuzzer is the buzzer count; clamped to 1..9 by askForBroadcastMessage
NBuzzer int `json:"nbuzzer"`
//Message is the text; backslash, ':' and '$' are escaped before sending
Message string `json:"message"`
//Expired is the expiry time; a message whose expiry is before now is not sent
Expired ISOTime `json:"expired"`
}
//MessageStatus is the status report for a specific IMEI that
//updateBroadcastStatus sends to the report API
type MessageStatus struct {
//IMEI is the device IMEI formatted as a decimal string
IMEI string `json:"imei"`
//Status is "sent" or "delivered"
Status string `json:"status"`
//Timestamp is the report time formatted with timeLayout
Timestamp string `json:"timestamp"`
}
//DevicePrsParser encapsulates the parser configuration and the state of one
//parsed packet for a DevicePrs device
type DevicePrsParser struct {
//options holds the parser options; several methods treat nil as "not defined"
options *DevicePrsOptions
//db is the database used by saveToDatabase; when nil the packet is only logged
db *sql.DB
//MaxPacketSize and MinPacketSize are the packet size limits passed to the stream verifier
MaxPacketSize int
MinPacketSize int
//BufferSize is the buffer size for reading data
BufferSize int
//Data is the raw packet; set by NewDevicePrsParser only when verification succeeds
Data []byte
//IMEI, Error, Command and Records are the results returned by verifyStream
IMEI uint64
Error error
Command byte
//Records *model.DeviceRecords
Records DeviceRecordHeader
//procdr is the Kafka producer used by sendToBroker; nil disables publishing
procdr *kafka.Producer
}
//UnmarshalJSON decodes a JSON string written in timeLayout
//("2006-01-02 15:04:05") into the timestamp, interpreting it in the local
//time zone.
//
//A JSON null is a no-op: the current value is kept and nil is returned, which
//follows the encoding/json convention for Unmarshalers. Any other value that
//does not match timeLayout resets the timestamp to the zero time and returns
//the parse error.
func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) {
	//without this, `"expired": null` would fail the whole document decode
	if string(b) == "null" {
		return nil
	}

	//TODO time other than local
	s := strings.Trim(string(b), "\"")
	tm.Time, err = time.ParseInLocation(timeLayout, s, time.Local)
	if err != nil {
		tm.Time = time.Time{}
	}
	return err
}
//GetMaxPacketSize reports the largest packet size this parser is
//configured with (the MaxPacketSize field)
func (p *DevicePrsParser) GetMaxPacketSize() int {
	limit := p.MaxPacketSize
	return limit
}
//GetMinPacketSize reports the smallest valid packet size this parser is
//configured with (the MinPacketSize field)
func (p *DevicePrsParser) GetMinPacketSize() int {
	limit := p.MinPacketSize
	return limit
}
//GetBufferSize reports the buffer size to use when reading data
//(the BufferSize field)
func (p *DevicePrsParser) GetBufferSize() int {
	size := p.BufferSize
	return size
}
//GetError reports the last error recorded on the parser, or nil when
//no error has been recorded
func (p *DevicePrsParser) GetError() error {
	//a nil Error field is already the nil error value
	return p.Error
}
//Payload returns the payload section of the stored packet, i.e. the bytes
//between the 11-byte header and the 2-byte CRC16 trailer. The result shares
//memory with p.Data.
//
//An empty (non-nil) slice is returned when the packet is shorter than
//MinPacketSize or too short to hold the header and the CRC.
func (p *DevicePrsParser) Payload() []byte {
	//Field          Packet length   IMEI   Command ID   Payload Data   CRC16
	//Size (bytes)   2               8      1            [1-1009]       2
	const (
		headerSize = 11 //packet length + IMEI + command ID
		crcSize    = 2
	)

	n := len(p.Data)
	//the second check keeps the slice expression in range even when
	//MinPacketSize is smaller than header+CRC (e.g. a zero-value parser)
	if n < p.MinPacketSize || n < headerSize+crcSize {
		return []byte{}
	}
	return p.Data[headerSize:(n - crcSize)]
}
//GetStream returns the raw packet bytes held by the parser (the Data
//field itself, not a copy)
func (p *DevicePrsParser) GetStream() []byte {
	stream := p.Data
	return stream
}
//GetIMEI returns the IMEI stored on the parser
func (p *DevicePrsParser) GetIMEI() uint64 {
	id := p.IMEI
	return id
}
//GetCommand returns the command ID associated with the current packet
func (p *DevicePrsParser) GetCommand() byte {
	cmd := p.Command
	return cmd
}
//saveToDatabase stores the raw packet, hex-encoded, using the configured
//insert query. The query parameters are, in order: IMEI, hex data, false,
//current local time (timeLayout), an empty string, the configured device
//name and the packet length in bytes.
//
//When no database is attached the packet is only written to the log and nil
//is returned. An error is returned when the database is attached but the
//options are missing, or when the insert itself fails.
func (p *DevicePrsParser) saveToDatabase() error {
	imei := fmt.Sprintf("%d", p.IMEI)
	dInHex := hex.EncodeToString(p.Data)

	//TODO, make packet invalid if IMEI is not registered
	db := p.db
	if db == nil {
		//if database not defined, display packet in log
		log.Printf("IMEI: %v\n Data: %v\n", imei, dInHex)
		return nil
	}

	//the query and device name come from the options; report their absence
	//the same way the HTTP helpers do instead of dereferencing nil
	if p.options == nil {
		return errors.New("INSERT error, options not defined")
	}

	//`INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN")
	//VALUES($1, $2, $3, $4, $5, $6, $7)`
	now := time.Now().Local().Format(timeLayout)
	query := p.options.insertQuery
	result, err := db.Exec(query, imei, dInHex, false, now, "", p.options.DeviceName, len(p.Data))
	if err != nil {
		log.Printf("INSERT ERROR: %s -> %s; %s: %s\n", err.Error(), query, imei, dInHex)
		return err
	}

	//both values are used for logging only, and not every driver supports
	//them, so their errors are deliberately ignored
	id, _ := result.LastInsertId()
	nrows, _ := result.RowsAffected()
	log.Printf("IMEI: %v, insert-id: %d, nrows: %d\n", imei, id, nrows)
	return nil
}
//sendToBroker publishes the parsed records, JSON-encoded, to the configured
//broker topic and waits for the delivery report of that message. The message
//carries one header whose key is the IMEI and whose value is the records'
//timestamp.
//
//When no producer is attached the call only logs that fact and returns nil.
//An error is returned when the options are missing, the records cannot be
//encoded, the message cannot be enqueued, or the broker reports a failed
//delivery.
//
//The delivery report is received on a channel private to this call, so
//parsers sharing one producer cannot consume each other's reports.
//NOTE(review): unlike a send on ProduceChannel, Produce presumably fails
//instead of blocking when the producer queue is full — confirm against the
//client documentation.
func (p *DevicePrsParser) sendToBroker() error {
	imei := fmt.Sprintf("%d", p.IMEI)

	//TODO, make packet invalid if IMEI is not registered
	procdr := p.procdr
	if procdr == nil {
		//if message broker not defined, only mention it in log
		log.Printf("Message Broker not defined, IMEI: %v\n", imei)
		return nil
	}
	if p.options == nil {
		return errors.New("broker error, options not defined")
	}

	dataJSON, err := json.Marshal(p.Records)
	if err != nil {
		//do not publish an empty message when encoding failed
		log.Printf("Error parse to JSON (IMEI: %v): %s\n", imei, err)
		return err
	}
	log.Printf("Number of records: %d (IMEI: %v)\n", p.Records.NumRec, imei)

	topic := p.options.BrokerTopic

	//buffered by one so the producer never blocks on handing over the report
	deliveryChan := make(chan kafka.Event, 1)
	err = procdr.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          dataJSON,
		Headers:        []kafka.Header{{Key: imei, Value: []byte(p.Records.Tstamp.String())}},
	}, deliveryChan)
	if err != nil {
		log.Printf("Sent failed: %v (IMEI: %v)\n", err, imei)
		return err
	}

	//wait for the delivery report of this message
	ev := <-deliveryChan
	m, ok := ev.(*kafka.Message)
	if !ok {
		log.Printf("Ignored event: %v (IMEI: %v)\n", ev, imei)
		return fmt.Errorf("unexpected delivery event: %v", ev)
	}
	if m.TopicPartition.Error != nil {
		log.Printf("Sent failed: %v (IMEI: %v)\n", m.TopicPartition.Error, imei)
		return m.TopicPartition.Error
	}
	log.Printf("Sent message to topic %s [%d] at offset %v (IMEI: %v)\n",
		*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset, imei)
	return nil
}
//getJSON performs a GET request on url with the service client and decodes
//the JSON response body into target. The HTTP status code is not inspected.
//p.options must not be nil.
func (p *DevicePrsParser) getJSON(url string, target interface{}) error {
	resp, err := p.options.serviceClient.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	decoder := json.NewDecoder(resp.Body)
	return decoder.Decode(target)
}
//putJSON sends data, JSON-encoded, to url with a PUT request through the
//service client.
//
//The call fails when the options are missing, the data cannot be encoded,
//the request cannot be built or sent, or the response cannot be read. A
//response whose body contains the text "no" is treated as a rejected update,
//whatever its status code. Otherwise only the status codes 200 and 201 count
//as success.
func (p *DevicePrsParser) putJSON(url string, data interface{}) error {
	if p.options == nil {
		return errors.New("PUT error, options not defined")
	}

	body, err := json.Marshal(data)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("PUT", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.ContentLength = int64(len(body))
	req.Header.Set("Content-Type", "application/json")

	resp, err := p.options.serviceClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	//the body is checked before the status code
	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if text := string(raw); strings.Contains(text, "no") {
		return errors.New("Failed to update message status: " + text)
	}

	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated:
		return nil
	}
	return errors.New("PUT error with status = " + resp.Status)
}
//askForBroadcastMessage fetches the pending broadcast message for this IMEI
//from the message API and renders it in the form
//":t<buzzer count><3-digit duration><escaped text>$".
//
//It returns an empty string and no error when the message has expired, and
//an error when the options are missing or the API call fails. The buzzer
//count is clamped to 1..9 and the duration to 1..999. For a rendered message
//the "sent" status is reported from a separate goroutine.
func (p *DevicePrsParser) askForBroadcastMessage() (string, error) {
	if p.options == nil {
		return "", errors.New("GET error, options not defined")
	}

	//Get message from API (internal address + IMEI)
	//TODO: can it handle many connections?
	var msg BroadcastMessage
	endpoint := p.options.messageAPI + fmt.Sprintf("%d", p.IMEI)
	if err := p.getJSON(endpoint, &msg); err != nil {
		return "", err
	}

	if msg.Expired.Before(time.Now()) {
		return "", nil
	}

	//buzzer count, limited to a single digit
	beeps := msg.NBuzzer
	switch {
	case beeps <= 0:
		beeps = 1
	case beeps > 9:
		beeps = 9
	}

	//duration, limited to three digits
	seconds := msg.Duration
	switch {
	case seconds <= 0:
		seconds = 1
	case seconds > 999:
		seconds = 999
	}

	text := fmt.Sprintf(":t%d%03d%v$", beeps, seconds, sReplacer.Replace(msg.Message))
	log.Printf("IMEI = %d, message = %v\n", p.IMEI, text)

	//update broadcast status in the background (mark as "sent")
	go p.updateBroadcastStatus(statusSent)

	return text, nil
}
//updateBroadcastStatus reports the broadcast message state for this IMEI to
//the report API with a PUT request.
//
//statusSent is reported as "sent" and statusDelivered as "delivered".
//statusError is only logged, and any other value is ignored. Nothing is
//reported when the options are missing. The IMEI is appended to the report
//URL when the appendIMEI option is set. The PUT error, if any, is logged and
//returned.
func (p *DevicePrsParser) updateBroadcastStatus(status int) error {
	stamp := time.Now().Format(timeLayout)

	var label string
	switch status {
	case statusSent:
		label = "sent"
	case statusDelivered:
		label = "delivered"
	case statusError:
		//Do nothing
		log.Printf("IMEI=%d, Message status = Error\n", p.IMEI)
	}

	//nothing to report for unknown/error states or without options
	if label == "" || p.options == nil {
		return nil
	}

	report := &MessageStatus{
		IMEI:      fmt.Sprintf("%d", p.IMEI),
		Status:    label,
		Timestamp: stamp,
	}

	target := p.options.reportAPI
	if p.options.appendIMEI {
		target += fmt.Sprintf("%d", p.IMEI)
	}

	if err := p.putJSON(target, report); err != nil {
		log.Printf("PUT request error: %v\n", err)
		return err
	}
	log.Printf("IMEI=%v, Message status = %v\n", report.IMEI, report.Status)
	return nil
}
//GetBroadcastMessage returns the transparent-channel frame carrying the
//pending broadcast message for the client.
//
//Frame layout: 2-byte big-endian length (payload length + 4), command 0x72,
//port 0x01, two zero bytes, the message text, and a 2-byte big-endian CRC
//computed over everything between the length field and the CRC itself.
//
//An error is returned when the message cannot be obtained, or when it is too
//long for the 2-byte length field (more than 65531 bytes).
func (p *DevicePrsParser) GetBroadcastMessage() ([]byte, error) {
	pesan, err := p.askForBroadcastMessage()
	if err != nil {
		return nil, err
	}

	n := len(pesan)
	npacket := n + 4
	//the length field is 16 bits wide; a longer message would be silently
	//truncated and produce a corrupt frame
	if npacket > 0xFFFF {
		return nil, fmt.Errorf("broadcast message too long: %d bytes", n)
	}

	//construct message
	data := make([]byte, 8+n)
	data[0] = byte(npacket >> 8)
	data[1] = byte(npacket)
	data[2] = 0x72
	data[3] = 0x01 //PORT 'B' (TODO: configurable)
	data[4] = 0x00
	data[5] = 0x00

	//setup Payload
	copy(data[6:], pesan)

	//calculate CRC
	crc := crc16CCITT(data[2:(n + 6)])
	data[n+6] = byte(crc >> 8)
	data[n+7] = byte(crc)
	return data, nil
}
//GetClientResponse returns the response to send back to the client for the
//current packet.
//
//Record packets are answered with rACK, or rNACK when the IMEI is zero or an
//error is recorded. Transparent-channel packets are answered with tEmpty
//unless the device asks for a message (payload prefix "GMn", "GMi" or "GMe"),
//in which case the broadcast frame is returned; a failure to build it is
//recorded in p.Error. The prefix "GMo" confirms delivery, which is reported
//from a separate goroutine. Every other command gets rEmpty.
func (p *DevicePrsParser) GetClientResponse() []byte {
	//TODO other response
	invalid := p.IMEI == 0 || p.Error != nil

	switch int(p.Command) {
	case model.CMD_RUP_RECORD:
		if invalid {
			return rNACK
		}
		return rACK

	case model.CMD_RUP_TRCH:
		if invalid {
			return tEmpty
		}

		//parse incoming message
		tch, err := parseTchStream(p.Data, p.MinPacketSize, p.MaxPacketSize)
		if err != nil {
			return tEmpty
		}

		//check command type
		request := string(tch.Data)
		switch {
		case strings.HasPrefix(request, "GMn"),
			strings.HasPrefix(request, "GMi"),
			strings.HasPrefix(request, "GMe"):
			//send message if initial, none or error
			frame, err := p.GetBroadcastMessage()
			if err != nil {
				p.Error = err
				return tEmpty
			}
			return frame
		case strings.HasPrefix(request, "GMo"):
			//ok, message delivered
			go p.updateBroadcastStatus(statusDelivered)
		}

		//empty transparent channel
		return tEmpty
	}

	//empty
	return rEmpty
}
//GetRecords return parsed records
// func (p *DevicePrsParser) GetRecords() *model.DeviceRecords {
// if p.IMEI == 0 || p.Error != nil {
// return nil
// }
// //parse stream if not yet done
// if p.Records == nil {
// //Stream already verified when creating parser
// p.Records, p.Error = parseStream(p.Data, p.MinPacketSize, p.MaxPacketSize, true)
// if p.Error != nil {
// return nil
// }
// }
// return p.Records
// }
//ExecuteAsync performs the work attached to the packet's command and reports
//whether all of it succeeded.
//
//For record packets the raw packet is saved to the database and the parsed
//records are sent to the message broker; the broker step runs even when the
//database step failed. A failure of either step makes the result false and
//is recorded in p.Error, where a database error takes precedence over a
//broker error. Packets with any other command are accepted without work.
func (p *DevicePrsParser) ExecuteAsync() bool {
	status := true

	//TODO identify according to command
	switch p.GetCommand() {
	case model.CMD_RUP_RECORD:
		if err := p.saveToDatabase(); err != nil {
			status = false
			p.Error = err
		}
		//the broker result used to be dropped silently
		if err := p.sendToBroker(); err != nil {
			status = false
			if p.Error == nil {
				p.Error = err
			}
		}
	}
	return status
}
//--------------------------------------------------------------------
//NewDevicePrsOptions builds the devicePrs parser options from the
//application configuration.
//
//The message/report API settings, the insert query and the service timeout
//(in seconds, default 10) are read from the "commToDevice" section; the tag
//options from "iotag"; the broker settings from "messagebroker"; and the
//device name from "server". An HTTP client using the service timeout is
//created as well.
func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions {
	//statement used when "insertQuery" is not configured
	const fallbackInsert = `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN")
VALUES($1, $2, $3, $4, $5, $6, $7)`

	commOpt := options.Get("commToDevice")

	cfg := new(DevicePrsOptions)
	cfg.insertQuery = commOpt.GetString("insertQuery", fallbackInsert)
	cfg.messageAPI = commOpt.GetString("message", "")
	cfg.reportAPI = commOpt.GetString("report", "")
	cfg.serviceTimeout = time.Duration(commOpt.GetInt("serviceTimeout", 10)) * time.Second
	cfg.appendIMEI = commOpt.GetBool("appendIMEI", false)
	cfg.SetActive = commOpt.GetBool("setActive", false)
	cfg.tagOption = options.Get("iotag")
	cfg.BrokerServer = options.Get("messagebroker").GetString("brokerServer", "")
	cfg.BrokerTopic = options.Get("messagebroker").GetString("brokerTopic", "")
	cfg.DeviceName = options.Get("server").GetString("device", "")

	//create client
	cfg.serviceClient = &http.Client{Timeout: cfg.serviceTimeout}
	return cfg
}
//NewDevicePrs creates an empty parser preset with the default limits:
//packets of 14 to 2048 bytes and a 2048-byte read buffer
func NewDevicePrs() *DevicePrsParser {
	parser := new(DevicePrsParser)
	parser.MinPacketSize = 14
	parser.MaxPacketSize = 2048
	parser.BufferSize = 2048
	return parser
}
//NewDevicePrsParser creates a parser for one packet and verifies the stream.
//The packet size limits are those set by NewDevicePrs (14 to 2048 bytes).
//
//The IMEI, command and records found by the verification are stored on the
//parser, and the raw data is kept only when the verification succeeds. When
//it fails, or when data is nil, the reason is available through GetError.
//
//options may be nil; in that case no tag options are passed to the verifier.
//NOTE(review): confirm verifyStream tolerates nil tag options.
func NewDevicePrsParser(options *DevicePrsOptions, db *sql.DB, data []byte, procdr *kafka.Producer) *DevicePrsParser {
	//allocate parser with maximum and minimum packet size
	p := NewDevicePrs()
	p.options = options
	p.db = db
	p.procdr = procdr

	if data == nil {
		p.Error = fmt.Errorf("Stream is empty")
		return p
	}

	//other methods accept a parser without options, so do not dereference
	//a nil options pointer here
	var tagOption *opt.Options
	if options != nil {
		tagOption = options.tagOption
	}

	//verify stream
	p.IMEI, p.Command, p.Records, p.Error = verifyStream(data, p.MinPacketSize, p.MaxPacketSize, tagOption)
	if p.Error == nil {
		p.Data = data
	}
	return p
}
//NewDevicePrsStringParser creates a parser from a packet given as a HEX
//string (two characters per packet byte).
//
//The string is decoded and handed to NewDevicePrsParser. When the string is
//not valid hex, whatever was decoded is still handed over, and the decoding
//error then replaces the parser's error.
func NewDevicePrsStringParser(options *DevicePrsOptions, db *sql.DB, data string, procdr *kafka.Producer) *DevicePrsParser {
	raw, decodeErr := hex.DecodeString(data)

	parser := NewDevicePrsParser(options, db, raw, procdr)
	if decodeErr == nil {
		return parser
	}
	parser.Error = decodeErr
	return parser
}
//--------------------------------------------------------------------