Browse Source

update ruptela

master
Angga Bayu Marthafifsa 6 years ago
parent
commit
8e78307efc
  1. 4
      parser/devicePrs.go
  2. 34
      parser/devicePrsInternal.go
  3. 35
      server/devicePrsNet.go
  4. 4
      server/ruptela_test.go

4
parser/devicePrs.go

@ -62,6 +62,7 @@ type DevicePrsOptions struct {
reportAPI string reportAPI string
serviceTimeout time.Duration serviceTimeout time.Duration
appendIMEI bool appendIMEI bool
SetActive bool
insertQuery string insertQuery string
serviceClient *http.Client serviceClient *http.Client
tagOption *opt.Options tagOption *opt.Options
@ -506,7 +507,7 @@ func (p *DevicePrsParser) ExecuteAsync() bool {
//NewDevicePrsOptions create options for devicePrs //NewDevicePrsOptions create options for devicePrs
func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions { func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions {
svcOpt := options.Get(model.DEV_RUPTELA) svcOpt := options.Get("commToDevice")
defQuery := `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN") defQuery := `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN")
VALUES($1, $2, $3, $4, $5, $6, $7)` VALUES($1, $2, $3, $4, $5, $6, $7)`
@ -516,6 +517,7 @@ func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions {
reportAPI: svcOpt.GetString("report", ""), reportAPI: svcOpt.GetString("report", ""),
serviceTimeout: time.Duration(svcOpt.GetInt("serviceTimeout", 10)) * time.Second, serviceTimeout: time.Duration(svcOpt.GetInt("serviceTimeout", 10)) * time.Second,
appendIMEI: svcOpt.GetBool("appendIMEI", false), appendIMEI: svcOpt.GetBool("appendIMEI", false),
SetActive: svcOpt.GetBool("setActive", false),
insertQuery: insertQuery, insertQuery: insertQuery,
tagOption: options.Get("iotag"), tagOption: options.Get("iotag"),
BrokerServer: options.Get("messagebroker").GetString("brokerServer",""), BrokerServer: options.Get("messagebroker").GetString("brokerServer",""),

34
parser/devicePrsInternal.go

@ -94,6 +94,8 @@ func verifyStream(data []byte, minSize int, maxSize int, tagOption *opt.Options)
//extract command //extract command
cmd = data[10] cmd = data[10]
fmt.Printf("cmd %v\n",cmd)
return imei, cmd, rec, nil return imei, cmd, rec, nil
} }
@ -127,6 +129,7 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco
deviceRecords := make([]DeviceRecord, 0) deviceRecords := make([]DeviceRecord, 0)
for j := 0; j < int(numRec); j++ { for j := 0; j < int(numRec); j++ {
var deviceRecord DeviceRecord var deviceRecord DeviceRecord
tags_ := tags
//imei //imei
deviceRecord.Imei = imei deviceRecord.Imei = imei
@ -216,11 +219,13 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco
ioVal := convBinaryToUint16(addOneByteInTwoByte(data[currByte:plusByte]),2,"io1Val") ioVal := convBinaryToUint16(addOneByteInTwoByte(data[currByte:plusByte]),2,"io1Val")
//fmt.Printf("io1Val %d\n",ioVal) //fmt.Printf("io1Val %d\n",ioVal)
tagDevice_ := tags[strconv.Itoa(int(ioID))] tagDevice_ := tags_[strconv.Itoa(int(ioID))]
tagDevice_.TagId = strconv.Itoa(int(ioID)) tagDevice_.TagId = strconv.Itoa(int(ioID))
tagDevice_.TagVal = strconv.Itoa(int(ioVal)) tagDevice_.TagVal = strconv.Itoa(int(ioVal))
if tagDevice_.TagName != ""{ if tagDevice_.TagName != ""{
tags[strconv.Itoa(int(ioID))] = tagDevice_ //tags_[strconv.Itoa(int(ioID))] = tagDevice_
tags_[tagDevice_.TagName] = tagDevice_
delete(tags_, strconv.Itoa(int(ioID)))
} }
} }
@ -245,11 +250,10 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco
ioVal := convBinaryToUint16(data[currByte:plusByte],2,"io2Val") ioVal := convBinaryToUint16(data[currByte:plusByte],2,"io2Val")
//fmt.Printf("io2Val %d\n",ioVal) //fmt.Printf("io2Val %d\n",ioVal)
tagDevice_ := tags[strconv.Itoa(int(ioID))] tagDevice_ := tags_[strconv.Itoa(int(ioID))]
tagDevice_.TagId = strconv.Itoa(int(ioID))
tagDevice_.TagVal = strconv.Itoa(int(ioVal)) tagDevice_.TagVal = strconv.Itoa(int(ioVal))
if tagDevice_.TagName != ""{ if tagDevice_.TagName != ""{
tags[strconv.Itoa(int(ioID))] = tagDevice_ tags_[tagDevice_.TagName] = tagDevice_
} }
} }
@ -273,11 +277,10 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco
ioVal := convBinaryToInt32(data[currByte:plusByte],4,"io4Val") ioVal := convBinaryToInt32(data[currByte:plusByte],4,"io4Val")
//fmt.Printf("io4Val %d\n",ioVal) //fmt.Printf("io4Val %d\n",ioVal)
tagDevice_ := tags[strconv.Itoa(int(ioID))] tagDevice_ := tags_[strconv.Itoa(int(ioID))]
tagDevice_.TagId = strconv.Itoa(int(ioID))
tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10) tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10)
if tagDevice_.TagName != ""{ if tagDevice_.TagName != ""{
tags[strconv.Itoa(int(ioID))] = tagDevice_ tags_[tagDevice_.TagName] = tagDevice_
} }
} }
@ -301,15 +304,22 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco
ioVal := convBinaryToInt64(data[currByte:plusByte],8,"io8Val") ioVal := convBinaryToInt64(data[currByte:plusByte],8,"io8Val")
//fmt.Printf("io8Val %d\n",ioVal) //fmt.Printf("io8Val %d\n",ioVal)
tagDevice_ := tags[strconv.Itoa(int(ioID))] tagDevice_ := tags_[strconv.Itoa(int(ioID))]
tagDevice_.TagId = strconv.Itoa(int(ioID))
tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10) tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10)
if tagDevice_.TagName != ""{ if tagDevice_.TagName != ""{
tags[strconv.Itoa(int(ioID))] = tagDevice_ tags_[tagDevice_.TagName] = tagDevice_
} }
} }
deviceRecord.TagDevices = tags //Delete unset tags value
for tagKey := range tags {
tagDevice_ := tags_[tagKey]
if tagDevice_.TagId == tagKey {
delete(tags_, tagKey)
}
}
deviceRecord.TagDevices = tags_
deviceRecords = append(deviceRecords, deviceRecord) deviceRecords = append(deviceRecords, deviceRecord)
} }

35
server/devicePrsNet.go

@ -23,7 +23,7 @@ import (
"github.com/gansidui/gotcp" "github.com/gansidui/gotcp"
"github.com/ipsusila/opt" "github.com/ipsusila/opt"
"github.com/confluentinc/confluent-kafka-go/kafka" "github.com/confluentinc/confluent-kafka-go/kafka"
//"../model" "../model"
"../parser" "../parser"
) )
@ -252,21 +252,28 @@ func (cb *DevicePrsServerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool
statusInsert := par.ExecuteAsync() statusInsert := par.ExecuteAsync()
if(statusInsert){ if(statusInsert){
// send response to client (ACK or NACK) if par.GetCommand() == model.CMD_RUP_RECORD {
rep := par.GetClientResponse() // send response to client (ACK or NACK)
c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout) rep := par.GetClientResponse()
log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_) c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout)
log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_)
}
} }
log.Printf("par.GetCommand() = %v\n", par.GetCommand())
//send message if record command //send message if record command
//if par.GetCommand() == model.CMD_RUP_RECORD { if cb.options.SetActive == true {
// msg, err := par.GetBroadcastMessage() if par.GetCommand() == model.CMD_RUP_TRCH {
// if len(msg) > 0 { msg, err := par.GetBroadcastMessage()
// c.AsyncWritePacket(NewRuptelaServerPacket(msg), cb.writeTimeout) if len(msg) > 0 {
// } else if err != nil { c.AsyncWritePacket(NewDevicePrsServerPacket(msg), cb.writeTimeout)
// log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, par.IMEI,err) log.Printf("Sent MSG (client: %v, IMEI: %v)\n", addr, imei_)
// } } else if err != nil {
//} log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, imei_,err)
}
}
}
//run parallel processing to handle packet parsing //run parallel processing to handle packet parsing
//go par.ExecuteAsync() //go par.ExecuteAsync()

4
server/ruptela_test.go

@ -50,7 +50,7 @@ func sendData(conf *model.ServerConfig, data []byte, dur chan<- int64) error {
} }
func TestInvalid(t *testing.T) { func TestInvalid(t *testing.T) {
//config //config
conf := model.ServerConfig{IP: "127.0.0.1", Port: 8095} conf := model.ServerConfig{IP: "127.0.0.1", Port: 4000}
//test failed data //test failed data
dur := make(chan int64, 1) dur := make(chan int64, 1)
@ -89,7 +89,7 @@ func TestRuptelaReceiver(t *testing.T) {
"0011000315A07F44865A0E01000053EA01DF65AD6D") "0011000315A07F44865A0E01000053EA01DF65AD6D")
const NTEST = 1 const NTEST = 1
conf := model.ServerConfig{IP: "127.0.0.1", Port: 8081} conf := model.ServerConfig{IP: "127.0.0.1", Port: 4000}
ch := make(chan int64, NTEST*3) ch := make(chan int64, NTEST*3)
n := 0 n := 0

Loading…
Cancel
Save