|
|
|
/*
|
|
|
|
Package for parsing devicePrs byte stream to struct.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package parser
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
|
|
|
"encoding/hex"
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
"net/http"
|
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"bytes"
|
|
|
|
|
|
|
|
"io/ioutil"
|
|
|
|
|
|
|
|
"github.com/ipsusila/opt"
|
|
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
|
"../model"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
statusSent = 1
|
|
|
|
statusDelivered = 2
|
|
|
|
statusError = -1
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
timeLayout = "2006-01-02 15:04:05"
|
|
|
|
)
|
|
|
|
|
|
|
|
//NACK response, CRC = 565
|
|
|
|
var rNACK = []byte{0x00, 0x02, 0x64, 0x00, 0x02, 0x35}
|
|
|
|
|
|
|
|
//ACK response, CRC = 5052
|
|
|
|
var rACK = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC}
|
|
|
|
|
|
|
|
//trasparent channel error
|
|
|
|
var rEmpty = []byte{0x00, 0x02, 0x64, 0x01, 0x13, 0xBC}
|
|
|
|
|
|
|
|
//tch empty
|
|
|
|
var tEmpty = []byte{0, 4, 114, 1, 0, 0, 57, 239}
|
|
|
|
|
|
|
|
//PortID for transparent channel
|
|
|
|
var PortID byte = 0x01
|
|
|
|
|
|
|
|
//auto-increment id
|
|
|
|
var iCounter int64
|
|
|
|
|
|
|
|
var sReplacer = strings.NewReplacer("\\", "\\\\", ":", "\\:", "$", "\\$")
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------
|
|
|
|
|
|
|
|
//DevicePrsOptions store specific parser options
|
|
|
|
type DevicePrsOptions struct {
|
|
|
|
messageAPI string
|
|
|
|
reportAPI string
|
|
|
|
serviceTimeout time.Duration
|
|
|
|
appendIMEI bool
|
|
|
|
SetActive bool
|
|
|
|
insertQuery string
|
|
|
|
serviceClient *http.Client
|
|
|
|
tagOption *opt.Options
|
|
|
|
BrokerServer string
|
|
|
|
BrokerTopic string
|
|
|
|
DeviceName string
|
|
|
|
}
|
|
|
|
|
|
|
|
//ISOTime encapsulate timestamp for JSON parsing
|
|
|
|
type ISOTime struct {
|
|
|
|
time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
//BroadcastMessage store message information to be broadcasted
|
|
|
|
type BroadcastMessage struct {
|
|
|
|
Timestamp ISOTime `json:"ts"`
|
|
|
|
Duration int `json:"duration"`
|
|
|
|
NBuzzer int `json:"nbuzzer"`
|
|
|
|
Message string `json:"message"`
|
|
|
|
Expired ISOTime `json:"expired"`
|
|
|
|
}
|
|
|
|
|
|
|
|
//MessageStatus for specific IMEI
|
|
|
|
type MessageStatus struct {
|
|
|
|
IMEI string `json:"imei"`
|
|
|
|
Status string `json:"status"`
|
|
|
|
Timestamp string `json:"timestamp"`
|
|
|
|
}
|
|
|
|
|
|
|
|
//DevicePrsParser encapsulate parser configuration for DevicePrs device
|
|
|
|
type DevicePrsParser struct {
|
|
|
|
options *DevicePrsOptions
|
|
|
|
db *sql.DB
|
|
|
|
MaxPacketSize int
|
|
|
|
MinPacketSize int
|
|
|
|
BufferSize int
|
|
|
|
Data []byte
|
|
|
|
IMEI uint64
|
|
|
|
Error error
|
|
|
|
Command byte
|
|
|
|
//Records *model.DeviceRecords
|
|
|
|
Records DeviceRecordHeader
|
|
|
|
procdr *kafka.Producer
|
|
|
|
}
|
|
|
|
|
|
|
|
//UnmarshalJSON convert string to timestamp
|
|
|
|
func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) {
|
|
|
|
//TODO time other than local
|
|
|
|
s := strings.Trim(string(b), "\"")
|
|
|
|
tm.Time, err = time.ParseInLocation(timeLayout, s, time.Local)
|
|
|
|
if err != nil {
|
|
|
|
tm.Time = time.Time{}
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetMaxPacketSize return maximum packet size for this parser
|
|
|
|
func (p *DevicePrsParser) GetMaxPacketSize() int {
|
|
|
|
return p.MaxPacketSize
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetMinPacketSize return minimum valid packet size for this parser
|
|
|
|
func (p *DevicePrsParser) GetMinPacketSize() int {
|
|
|
|
return p.MinPacketSize
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetBufferSize return buffer size for reading data
|
|
|
|
func (p *DevicePrsParser) GetBufferSize() int {
|
|
|
|
return p.BufferSize
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetError return last error
|
|
|
|
func (p *DevicePrsParser) GetError() error {
|
|
|
|
if p.Error != nil {
|
|
|
|
return p.Error
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//Payload return data associated stored in
|
|
|
|
func (p *DevicePrsParser) Payload() []byte {
|
|
|
|
//Field Packet length IMEI Command ID Payload Data CRC16
|
|
|
|
//Size (bytes) 2 8 1 [1-1009] 2
|
|
|
|
n := len(p.Data)
|
|
|
|
if n < p.MinPacketSize {
|
|
|
|
return []byte{}
|
|
|
|
}
|
|
|
|
|
|
|
|
return p.Data[11:(n - 2)]
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetStream return data stream
|
|
|
|
func (p *DevicePrsParser) GetStream() []byte {
|
|
|
|
return p.Data
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetIMEI return IMEI
|
|
|
|
func (p *DevicePrsParser) GetIMEI() uint64 {
|
|
|
|
return p.IMEI
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetCommand return command associated to this packet
|
|
|
|
func (p *DevicePrsParser) GetCommand() byte {
|
|
|
|
return p.Command
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DevicePrsParser) saveToDatabase() error {
|
|
|
|
imei := fmt.Sprintf("%d", p.IMEI)
|
|
|
|
dInHex := hex.EncodeToString(p.Data)
|
|
|
|
|
|
|
|
//TODO, make packet invalid if IMEI is not registered
|
|
|
|
db := p.db
|
|
|
|
if db != nil {
|
|
|
|
//TODO: save data to database
|
|
|
|
//`INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN")
|
|
|
|
//VALUES($1, $2, $3, $4, $5, $6, $7)`
|
|
|
|
now := time.Now().Local().Format(timeLayout)
|
|
|
|
query := p.options.insertQuery
|
|
|
|
result, err := db.Exec(query, imei, dInHex, false, now, "", p.options.DeviceName, len(p.Data))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("INSERT ERROR: %s -> %s; %s: %s\n", err.Error(), query, imei, dInHex)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
id, _ := result.LastInsertId()
|
|
|
|
nrows, _ := result.RowsAffected()
|
|
|
|
log.Printf("IMEI: %v, insert-id: %d, nrows: %d\n", imei, id, nrows)
|
|
|
|
} else {
|
|
|
|
//if database not defined, display packet in log
|
|
|
|
log.Printf("IMEI: %v\n Data: %v\n", imei, dInHex)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DevicePrsParser) sendToBroker() error {
|
|
|
|
imei := fmt.Sprintf("%d", p.IMEI)
|
|
|
|
|
|
|
|
//TODO, make packet invalid if IMEI is not registered
|
|
|
|
procdr := p.procdr
|
|
|
|
if procdr != nil {
|
|
|
|
dataJson, err := json.Marshal(p.Records)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error parse to JSON (IMEI: %v): %s\n", imei, err)
|
|
|
|
}
|
|
|
|
//log.Printf("JSON : %v\n",string(dataJson))
|
|
|
|
|
|
|
|
log.Printf("Number of records: %d (IMEI: %v)\n", p.Records.NumRec, imei)
|
|
|
|
|
|
|
|
//broker
|
|
|
|
topic := p.options.BrokerTopic
|
|
|
|
//producer channel
|
|
|
|
doneChan := make(chan bool)
|
|
|
|
go func() {
|
|
|
|
defer close(doneChan)
|
|
|
|
for e := range procdr.Events() {
|
|
|
|
switch ev := e.(type) {
|
|
|
|
case *kafka.Message:
|
|
|
|
m := ev
|
|
|
|
if m.TopicPartition.Error != nil {
|
|
|
|
log.Printf("Sent failed: %v (IMEI: %v)\n", m.TopicPartition.Error, imei)
|
|
|
|
} else {
|
|
|
|
log.Printf("Sent message to topic %s [%d] at offset %v (IMEI: %v)\n",
|
|
|
|
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset, imei)
|
|
|
|
}
|
|
|
|
return
|
|
|
|
|
|
|
|
default:
|
|
|
|
log.Printf("Ignored event: %s (IMEI: %v)\n", ev,imei)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
value := string(dataJson)
|
|
|
|
procdr.ProduceChannel() <- &kafka.Message{
|
|
|
|
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
|
|
|
|
Value: []byte(value),
|
|
|
|
Headers: []kafka.Header{{Key: imei, Value: []byte( p.Records.Tstamp.String() )}},
|
|
|
|
}
|
|
|
|
// wait for delivery report goroutine to finish
|
|
|
|
_ = <-doneChan
|
|
|
|
|
|
|
|
} else {
|
|
|
|
//if database not defined, display packet in log
|
|
|
|
log.Printf("Message Broker not defined, IMEI: %v\n", imei)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DevicePrsParser) getJSON(url string, target interface{}) error {
|
|
|
|
r, err := p.options.serviceClient.Get(url)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer r.Body.Close()
|
|
|
|
|
|
|
|
return json.NewDecoder(r.Body).Decode(target)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DevicePrsParser) putJSON(url string, data interface{}) error {
|
|
|
|
if p.options == nil {
|
|
|
|
return errors.New("PUT error, options not defined")
|
|
|
|
}
|
|
|
|
|
|
|
|
payload, err := json.Marshal(data)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
reader := bytes.NewReader(payload)
|
|
|
|
client := p.options.serviceClient
|
|
|
|
req, err := http.NewRequest("PUT", url, reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
req.ContentLength = int64(len(payload))
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
response, err := client.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer response.Body.Close()
|
|
|
|
|
|
|
|
//read content
|
|
|
|
result, err := ioutil.ReadAll(response.Body)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
str := string(result)
|
|
|
|
if strings.Contains(str, "no") {
|
|
|
|
return errors.New("Failed to update message status: " + str)
|
|
|
|
}
|
|
|
|
|
|
|
|
//request code
|
|
|
|
code := response.StatusCode
|
|
|
|
if code == http.StatusOK || code == http.StatusCreated {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
status := response.Status
|
|
|
|
return errors.New("PUT error with status = " + status)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *DevicePrsParser) askForBroadcastMessage() (string, error) {
|
|
|
|
if p.options == nil {
|
|
|
|
return "", errors.New("GET error, options not defined")
|
|
|
|
}
|
|
|
|
//Get message from API
|
|
|
|
//TODO: can it handle many connections?
|
|
|
|
msg := &BroadcastMessage{}
|
|
|
|
|
|
|
|
//alamat internal
|
|
|
|
imei := fmt.Sprintf("%d", p.IMEI)
|
|
|
|
url := p.options.messageAPI + imei
|
|
|
|
err := p.getJSON(url, msg)
|
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
now := time.Now()
|
|
|
|
if msg.Expired.Before(now) {
|
|
|
|
return "", nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//get buzzer count
|
|
|
|
nbeep := msg.NBuzzer
|
|
|
|
if nbeep <= 0 {
|
|
|
|
nbeep = 1
|
|
|
|
} else if nbeep > 9 {
|
|
|
|
nbeep = 9
|
|
|
|
}
|
|
|
|
|
|
|
|
dispSec := msg.Duration
|
|
|
|
if dispSec <= 0 {
|
|
|
|
dispSec = 1
|
|
|
|
} else if dispSec > 999 {
|
|
|
|
dispSec = 999
|
|
|
|
}
|
|
|
|
|
|
|
|
escapedMsg := sReplacer.Replace(msg.Message)
|
|
|
|
pesan := fmt.Sprintf(":t%d%03d%v$", nbeep, dispSec, escapedMsg)
|
|
|
|
|
|
|
|
//log message
|
|
|
|
log.Printf("IMEI = %d, message = %v\n", p.IMEI, pesan)
|
|
|
|
|
|
|
|
//go update broadcast status (mark as "sent")
|
|
|
|
go p.updateBroadcastStatus(statusSent)
|
|
|
|
|
|
|
|
//return the message
|
|
|
|
return pesan, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//Update broadcast status
|
|
|
|
func (p *DevicePrsParser) updateBroadcastStatus(status int) error {
|
|
|
|
now := time.Now()
|
|
|
|
report := &MessageStatus{
|
|
|
|
IMEI: fmt.Sprintf("%d", p.IMEI),
|
|
|
|
Timestamp: now.Format(timeLayout),
|
|
|
|
}
|
|
|
|
switch status {
|
|
|
|
case statusError:
|
|
|
|
//Do nothing
|
|
|
|
log.Printf("IMEI=%d, Message status = Error\n", p.IMEI)
|
|
|
|
case statusSent:
|
|
|
|
report.Status = "sent"
|
|
|
|
case statusDelivered:
|
|
|
|
report.Status = "delivered"
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(report.Status) > 0 && p.options != nil {
|
|
|
|
url := p.options.reportAPI
|
|
|
|
if p.options.appendIMEI {
|
|
|
|
url += fmt.Sprintf("%d", p.IMEI)
|
|
|
|
}
|
|
|
|
err := p.putJSON(url, report)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("PUT request error: %v\n", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("IMEI=%v, Message status = %v\n", report.IMEI, report.Status)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetBroadcastMessage return message payload tobe broadcased to client
|
|
|
|
func (p *DevicePrsParser) GetBroadcastMessage() ([]byte, error) {
|
|
|
|
pesan, err := p.askForBroadcastMessage()
|
|
|
|
n := len(pesan)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
//construct message
|
|
|
|
npacket := n + 4
|
|
|
|
data := make([]byte, 8+n)
|
|
|
|
data[0] = byte(npacket >> 8)
|
|
|
|
data[1] = byte(npacket)
|
|
|
|
data[2] = 0x72
|
|
|
|
data[3] = 0x01 //PORT 'B' (TODO: configurable)
|
|
|
|
data[4] = 0x00
|
|
|
|
data[5] = 0x00
|
|
|
|
|
|
|
|
//setup Payload
|
|
|
|
payload := []byte(pesan)
|
|
|
|
ptr := data[6:]
|
|
|
|
copy(ptr, payload)
|
|
|
|
|
|
|
|
//calculate CRC
|
|
|
|
crc := crc16CCITT(data[2:(n + 6)])
|
|
|
|
data[n+6] = byte(crc >> 8)
|
|
|
|
data[n+7] = byte(crc)
|
|
|
|
|
|
|
|
return data, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetClientResponse return appropriate response to client according to packet
|
|
|
|
func (p *DevicePrsParser) GetClientResponse() []byte {
|
|
|
|
//Get response to client
|
|
|
|
//TODO other response
|
|
|
|
switch int(p.Command) {
|
|
|
|
case model.CMD_RUP_RECORD:
|
|
|
|
if p.IMEI == 0 || p.Error != nil {
|
|
|
|
return rNACK
|
|
|
|
}
|
|
|
|
return rACK
|
|
|
|
case model.CMD_RUP_TRCH:
|
|
|
|
if p.IMEI == 0 || p.Error != nil {
|
|
|
|
return tEmpty
|
|
|
|
}
|
|
|
|
|
|
|
|
//parse incoming message
|
|
|
|
tch, err := parseTchStream(p.Data, p.MinPacketSize, p.MaxPacketSize)
|
|
|
|
if err != nil {
|
|
|
|
return tEmpty
|
|
|
|
}
|
|
|
|
|
|
|
|
//ceck command type
|
|
|
|
payload := string(tch.Data)
|
|
|
|
if strings.HasPrefix(payload, "GMn") ||
|
|
|
|
strings.HasPrefix(payload, "GMi") ||
|
|
|
|
strings.HasPrefix(payload, "GMe") {
|
|
|
|
|
|
|
|
//send message if initial, none or error
|
|
|
|
data, err := p.GetBroadcastMessage()
|
|
|
|
if err != nil {
|
|
|
|
p.Error = err
|
|
|
|
return tEmpty
|
|
|
|
}
|
|
|
|
|
|
|
|
return data
|
|
|
|
} else if strings.HasPrefix(payload, "GMo") {
|
|
|
|
//ok, message delivered
|
|
|
|
go p.updateBroadcastStatus(statusDelivered)
|
|
|
|
}
|
|
|
|
|
|
|
|
//empty transparent channel
|
|
|
|
return tEmpty
|
|
|
|
}
|
|
|
|
|
|
|
|
//empty
|
|
|
|
return rEmpty
|
|
|
|
}
|
|
|
|
|
|
|
|
//GetRecords return parsed records
|
|
|
|
// func (p *DevicePrsParser) GetRecords() *model.DeviceRecords {
|
|
|
|
// if p.IMEI == 0 || p.Error != nil {
|
|
|
|
// return nil
|
|
|
|
// }
|
|
|
|
|
|
|
|
// //parse stream if not yet done
|
|
|
|
// if p.Records == nil {
|
|
|
|
// //Stream already verified when creating parser
|
|
|
|
// p.Records, p.Error = parseStream(p.Data, p.MinPacketSize, p.MaxPacketSize, true)
|
|
|
|
// if p.Error != nil {
|
|
|
|
// return nil
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
// return p.Records
|
|
|
|
// }
|
|
|
|
|
|
|
|
//ExecuteAsync perform task
|
|
|
|
func (p *DevicePrsParser) ExecuteAsync() bool {
|
|
|
|
var status bool
|
|
|
|
status = true
|
|
|
|
//TODO identify according to command
|
|
|
|
cmd := p.GetCommand()
|
|
|
|
switch cmd {
|
|
|
|
case model.CMD_RUP_RECORD:
|
|
|
|
err := p.saveToDatabase()
|
|
|
|
if err != nil {
|
|
|
|
status = false
|
|
|
|
p.Error = err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = p.sendToBroker()
|
|
|
|
}
|
|
|
|
|
|
|
|
return status
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------
|
|
|
|
|
|
|
|
//NewDevicePrsOptions create options for devicePrs
|
|
|
|
func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions {
|
|
|
|
svcOpt := options.Get("commToDevice")
|
|
|
|
|
|
|
|
defQuery := `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN")
|
|
|
|
VALUES($1, $2, $3, $4, $5, $6, $7)`
|
|
|
|
insertQuery := svcOpt.GetString("insertQuery", defQuery)
|
|
|
|
rupOpt := &DevicePrsOptions{
|
|
|
|
messageAPI: svcOpt.GetString("message", ""),
|
|
|
|
reportAPI: svcOpt.GetString("report", ""),
|
|
|
|
serviceTimeout: time.Duration(svcOpt.GetInt("serviceTimeout", 10)) * time.Second,
|
|
|
|
appendIMEI: svcOpt.GetBool("appendIMEI", false),
|
|
|
|
SetActive: svcOpt.GetBool("setActive", false),
|
|
|
|
insertQuery: insertQuery,
|
|
|
|
tagOption: options.Get("iotag"),
|
|
|
|
BrokerServer: options.Get("messagebroker").GetString("brokerServer",""),
|
|
|
|
BrokerTopic: options.Get("messagebroker").GetString("brokerTopic",""),
|
|
|
|
DeviceName: options.Get("server").GetString("device",""),
|
|
|
|
}
|
|
|
|
//create client
|
|
|
|
rupOpt.serviceClient = &http.Client{Timeout: rupOpt.serviceTimeout}
|
|
|
|
|
|
|
|
return rupOpt
|
|
|
|
}
|
|
|
|
|
|
|
|
//NewDevicePrs create empty parser
|
|
|
|
func NewDevicePrs() *DevicePrsParser {
|
|
|
|
p := &DevicePrsParser{
|
|
|
|
MaxPacketSize: 2048,
|
|
|
|
MinPacketSize: 14,
|
|
|
|
BufferSize: 2048,
|
|
|
|
}
|
|
|
|
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
|
|
|
//NewDevicePrsParser create new parser. Maximum packet size is 1KB
|
|
|
|
func NewDevicePrsParser(options *DevicePrsOptions, db *sql.DB, data []byte, procdr *kafka.Producer) *DevicePrsParser {
|
|
|
|
//allocate parser with maximum and minimum packet size
|
|
|
|
p := NewDevicePrs()
|
|
|
|
p.options = options
|
|
|
|
p.db = db
|
|
|
|
p.procdr = procdr
|
|
|
|
|
|
|
|
//verify stream
|
|
|
|
if data != nil {
|
|
|
|
p.IMEI, p.Command, p.Records, p.Error = verifyStream(data, p.MinPacketSize, p.MaxPacketSize, options.tagOption)
|
|
|
|
if p.Error == nil {
|
|
|
|
p.Data = data
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
p.Error = fmt.Errorf("Stream is empty")
|
|
|
|
}
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
|
|
|
//NewDevicePrsStringParser create new parser which accept HEX string.
|
|
|
|
//Maximum packet size is 1KB i.e. maximum string length is 2KB
|
|
|
|
func NewDevicePrsStringParser(options *DevicePrsOptions, db *sql.DB, data string, procdr *kafka.Producer) *DevicePrsParser {
|
|
|
|
stream, err := hex.DecodeString(data)
|
|
|
|
p := NewDevicePrsParser(options, db, stream,procdr)
|
|
|
|
if err != nil {
|
|
|
|
p.Error = err
|
|
|
|
}
|
|
|
|
return p
|
|
|
|
}
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------
|