From 8e78307efccb255f9a83b1b215ea813f63373e2f Mon Sep 17 00:00:00 2001 From: Angga Bayu Marthafifsa Date: Sat, 29 Dec 2018 00:13:38 +0700 Subject: [PATCH] update ruptela --- parser/devicePrs.go | 4 +++- parser/devicePrsInternal.go | 34 ++++++++++++++++++++++------------ server/devicePrsNet.go | 35 +++++++++++++++++++++-------------- server/ruptela_test.go | 4 ++-- 4 files changed, 48 insertions(+), 29 deletions(-) diff --git a/parser/devicePrs.go b/parser/devicePrs.go index ea4a252..8fcc240 100755 --- a/parser/devicePrs.go +++ b/parser/devicePrs.go @@ -62,6 +62,7 @@ type DevicePrsOptions struct { reportAPI string serviceTimeout time.Duration appendIMEI bool + SetActive bool insertQuery string serviceClient *http.Client tagOption *opt.Options @@ -506,7 +507,7 @@ func (p *DevicePrsParser) ExecuteAsync() bool { //NewDevicePrsOptions create options for devicePrs func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions { - svcOpt := options.Get(model.DEV_RUPTELA) + svcOpt := options.Get("commToDevice") defQuery := `INSERT INTO "GPS_DATA"("IMEI", "DATA_LOG", "FLAG", "DATE_INS", "DESC", "GPS_CODE", "DATA_LEN") VALUES($1, $2, $3, $4, $5, $6, $7)` @@ -516,6 +517,7 @@ func NewDevicePrsOptions(options *opt.Options) *DevicePrsOptions { reportAPI: svcOpt.GetString("report", ""), serviceTimeout: time.Duration(svcOpt.GetInt("serviceTimeout", 10)) * time.Second, appendIMEI: svcOpt.GetBool("appendIMEI", false), + SetActive: svcOpt.GetBool("setActive", false), insertQuery: insertQuery, tagOption: options.Get("iotag"), BrokerServer: options.Get("messagebroker").GetString("brokerServer",""), diff --git a/parser/devicePrsInternal.go b/parser/devicePrsInternal.go index 4f131a0..9eab194 100755 --- a/parser/devicePrsInternal.go +++ b/parser/devicePrsInternal.go @@ -94,6 +94,8 @@ func verifyStream(data []byte, minSize int, maxSize int, tagOption *opt.Options) //extract command cmd = data[10] + fmt.Printf("cmd %v\n",cmd) + return imei, cmd, rec, nil } @@ -127,6 +129,7 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco deviceRecords := make([]DeviceRecord, 0) for j := 0; j < int(numRec); j++ { var deviceRecord DeviceRecord + tags_ := tags //imei deviceRecord.Imei = imei @@ -216,11 +219,13 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco ioVal := convBinaryToUint16(addOneByteInTwoByte(data[currByte:plusByte]),2,"io1Val") //fmt.Printf("io1Val %d\n",ioVal) - tagDevice_ := tags[strconv.Itoa(int(ioID))] + tagDevice_ := tags_[strconv.Itoa(int(ioID))] tagDevice_.TagId = strconv.Itoa(int(ioID)) tagDevice_.TagVal = strconv.Itoa(int(ioVal)) if tagDevice_.TagName != ""{ - tags[strconv.Itoa(int(ioID))] = tagDevice_ + //tags_[strconv.Itoa(int(ioID))] = tagDevice_ + tags_[tagDevice_.TagName] = tagDevice_ + delete(tags_, strconv.Itoa(int(ioID))) } } @@ -245,11 +250,10 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco ioVal := convBinaryToUint16(data[currByte:plusByte],2,"io2Val") //fmt.Printf("io2Val %d\n",ioVal) - tagDevice_ := tags[strconv.Itoa(int(ioID))] - tagDevice_.TagId = strconv.Itoa(int(ioID)) + tagDevice_ := tags_[strconv.Itoa(int(ioID))] tagDevice_.TagVal = strconv.Itoa(int(ioVal)) if tagDevice_.TagName != ""{ - tags[strconv.Itoa(int(ioID))] = tagDevice_ + tags_[tagDevice_.TagName] = tagDevice_ } } @@ -273,11 +277,10 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco ioVal := convBinaryToInt32(data[currByte:plusByte],4,"io4Val") //fmt.Printf("io4Val %d\n",ioVal) - tagDevice_ := tags[strconv.Itoa(int(ioID))] - tagDevice_.TagId = strconv.Itoa(int(ioID)) + tagDevice_ := tags_[strconv.Itoa(int(ioID))] tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10) if tagDevice_.TagName != ""{ - tags[strconv.Itoa(int(ioID))] = tagDevice_ + tags_[tagDevice_.TagName] = tagDevice_ } } @@ -301,15 +304,22 @@ func parseData(data []byte, imei uint64, tagOption *opt.Options) (rec DeviceReco ioVal := convBinaryToInt64(data[currByte:plusByte],8,"io8Val") //fmt.Printf("io8Val %d\n",ioVal) - tagDevice_ := tags[strconv.Itoa(int(ioID))] - tagDevice_.TagId = strconv.Itoa(int(ioID)) + tagDevice_ := tags_[strconv.Itoa(int(ioID))] tagDevice_.TagVal = strconv.FormatInt(int64(ioVal), 10) if tagDevice_.TagName != ""{ - tags[strconv.Itoa(int(ioID))] = tagDevice_ + tags_[tagDevice_.TagName] = tagDevice_ } } - deviceRecord.TagDevices = tags + //Delete unset tags value + for tagKey := range tags { + tagDevice_ := tags_[tagKey] + if tagDevice_.TagId == tagKey { + delete(tags_, tagKey) + } + } + + deviceRecord.TagDevices = tags_ deviceRecords = append(deviceRecords, deviceRecord) } diff --git a/server/devicePrsNet.go b/server/devicePrsNet.go index 4f30d55..53650f1 100755 --- a/server/devicePrsNet.go +++ b/server/devicePrsNet.go @@ -23,7 +23,7 @@ import ( "github.com/gansidui/gotcp" "github.com/ipsusila/opt" "github.com/confluentinc/confluent-kafka-go/kafka" - //"../model" + "../model" "../parser" ) @@ -252,21 +252,28 @@ func (cb *DevicePrsServerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool statusInsert := par.ExecuteAsync() if(statusInsert){ - // send response to client (ACK or NACK) - rep := par.GetClientResponse() - c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout) - log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_) + if par.GetCommand() == model.CMD_RUP_RECORD { + // send response to client (ACK or NACK) + rep := par.GetClientResponse() + c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout) + log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_) + } + } - + log.Printf("par.GetCommand() = %v\n", par.GetCommand()) //send message if record command - //if par.GetCommand() == model.CMD_RUP_RECORD { - // msg, err := par.GetBroadcastMessage() - // if len(msg) > 0 { - // c.AsyncWritePacket(NewRuptelaServerPacket(msg), cb.writeTimeout) - // } else if err != nil { - // log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, par.IMEI,err) - // } - //} + if cb.options.SetActive == true { + if par.GetCommand() == model.CMD_RUP_TRCH { + msg, err := par.GetBroadcastMessage() + if len(msg) > 0 { + c.AsyncWritePacket(NewDevicePrsServerPacket(msg), cb.writeTimeout) + log.Printf("Sent MSG (client: %v, IMEI: %v)\n", addr, imei_) + } else if err != nil { + log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, imei_,err) + } + } + } + //run parallel processing to handle packet parsing //go par.ExecuteAsync() diff --git a/server/ruptela_test.go b/server/ruptela_test.go index 6b822ad..e8ebc7f 100755 --- a/server/ruptela_test.go +++ b/server/ruptela_test.go @@ -50,7 +50,7 @@ func sendData(conf *model.ServerConfig, data []byte, dur chan<- int64) error { } func TestInvalid(t *testing.T) { //config - conf := model.ServerConfig{IP: "127.0.0.1", Port: 8095} + conf := model.ServerConfig{IP: "127.0.0.1", Port: 4000} //test failed data dur := make(chan int64, 1) @@ -89,7 +89,7 @@ func TestRuptelaReceiver(t *testing.T) { "0011000315A07F44865A0E01000053EA01DF65AD6D") const NTEST = 1 - conf := model.ServerConfig{IP: "127.0.0.1", Port: 8081} + conf := model.ServerConfig{IP: "127.0.0.1", Port: 4000} ch := make(chan int64, NTEST*3) n := 0