|
|
|
//
|
|
|
|
// DevicePrs Device Server based on Go net package
|
|
|
|
// specify Mode=net in model.ServerConfig
|
|
|
|
//
|
|
|
|
|
|
|
|
package server
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
|
|
|
"encoding/binary"
|
|
|
|
"errors"
|
|
|
|
"io"
|
|
|
|
"log"
|
|
|
|
"net"
|
|
|
|
"os"
|
|
|
|
"os/signal"
|
|
|
|
"syscall"
|
|
|
|
"time"
|
|
|
|
"encoding/hex"
|
|
|
|
"sync"
|
|
|
|
"math"
|
|
|
|
|
|
|
|
"github.com/gansidui/gotcp"
|
|
|
|
"github.com/ipsusila/opt"
|
|
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
|
|
"../model"
|
|
|
|
"../parser"
|
|
|
|
)
|
|
|
|
|
|
|
|
//-------------------------------------------------------------------
|
|
|
|
|
|
|
|
// serverId is the identifier of this server instance used as a prefix in
// connection log lines. It is empty until Start assigns it from the "id"
// entry of the server options.
var serverId string
|
|
|
|
|
|
|
|
// ConnectionDevice is the state kept for one client connection.
type ConnectionDevice struct {
	// imei is the device IMEI; it stays 0 until a value has been recorded
	// for the connection.
	imei uint64
	// data is the raw packet stored together with the IMEI.
	data []byte
	// timeProcess is the reference time from which logged durations are
	// measured.
	timeProcess time.Time
}

// MapConnection is a mutex-guarded map of ConnectionDevice values keyed by
// string. The zero value is ready to use. A MapConnection must not be copied
// after first use because it contains a sync.Mutex.
type MapConnection struct {
	addr map[string]ConnectionDevice
	mux  sync.Mutex
}

// AddUpdt stores connectionDevice under key, replacing any previous entry.
func (c *MapConnection) AddUpdt(key string, connectionDevice ConnectionDevice) {
	// Lock so only one goroutine at a time can access the map c.addr.
	c.mux.Lock()
	defer c.mux.Unlock()

	// Allocate lazily so a zero-value MapConnection does not panic on the
	// first write (assigning to a nil map panics).
	if c.addr == nil {
		c.addr = make(map[string]ConnectionDevice)
	}
	c.addr[key] = connectionDevice
}

// Del removes the entry stored under key. Deleting a missing key is a no-op.
func (c *MapConnection) Del(key string) {
	// Lock so only one goroutine at a time can access the map c.addr.
	c.mux.Lock()
	defer c.mux.Unlock()

	delete(c.addr, key)
}

// Value returns the entry stored under key, or the zero ConnectionDevice when
// the key is not present.
func (c *MapConnection) Value(key string) ConnectionDevice {
	// Lock so only one goroutine at a time can access the map c.addr.
	c.mux.Lock()
	defer c.mux.Unlock()

	return c.addr[key]
}

// connectionDevices is the package-wide registry of connection state.
var connectionDevices = MapConnection{addr: make(map[string]ConnectionDevice)}
|
|
|
|
|
|
|
|
// DevicePrsServerPacket wraps the raw bytes of a single packet.
type DevicePrsServerPacket struct {
	buff []byte
}

// NewDevicePrsServerPacket returns a packet backed by data. The slice is
// stored as given, not copied, so the packet shares memory with the caller.
func NewDevicePrsServerPacket(data []byte) *DevicePrsServerPacket {
	return &DevicePrsServerPacket{buff: data}
}

// Serialize returns the bytes held by the packet.
func (p *DevicePrsServerPacket) Serialize() []byte {
	return p.buff
}
|
|
|
|
|
|
|
|
//-------------------------------------------------------------------
|
|
|
|
|
|
|
|
//DevicePrsServerNet info
|
|
|
|
type DevicePrsServerNet struct {
|
|
|
|
Options *opt.Options
|
|
|
|
Server *gotcp.Server
|
|
|
|
Callback *DevicePrsServerCallback
|
|
|
|
}
|
|
|
|
|
|
|
|
//Stop tcp server
|
|
|
|
func (rup *DevicePrsServerNet) Stop() {
|
|
|
|
rup.Server.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
//Start the server
|
|
|
|
func (rup *DevicePrsServerNet) Start() {
|
|
|
|
//---- options ---------------------------------------
|
|
|
|
serverOpt := rup.Options.Get("server")
|
|
|
|
maxSend := uint32(serverOpt.GetInt("maxSend", 40))
|
|
|
|
maxReceive := uint32(serverOpt.GetInt("maxReceive", 40))
|
|
|
|
addr := serverOpt.GetString("listenAddr", ":8081")
|
|
|
|
acceptTimeout := time.Duration(serverOpt.GetInt("acceptTimeout", 2)) * time.Second
|
|
|
|
writeTimeout := time.Duration(serverOpt.GetInt("writeTimeout", 5)) * time.Second
|
|
|
|
|
|
|
|
//database options
|
|
|
|
gpsData := rup.Options.Get("gpsdata")
|
|
|
|
storage := gpsData.GetString("storage", "sql")
|
|
|
|
|
|
|
|
dbOpt := gpsData.Get(storage)
|
|
|
|
dbDriver := dbOpt.GetString("driver", "postgres")
|
|
|
|
dbInfo := dbOpt.GetString("connection", "")
|
|
|
|
dbMaxIdle := dbOpt.GetInt("maxIdle", 0)
|
|
|
|
dbMaxOpen := dbOpt.GetInt("maxOpen", 5)
|
|
|
|
dbMaxLifetime := time.Duration(dbOpt.GetInt("maxLifetime", 60)) * time.Second
|
|
|
|
|
|
|
|
//devicePrs options
|
|
|
|
rupOpt := parser.NewDevicePrsOptions(rup.Options)
|
|
|
|
|
|
|
|
//server ID
|
|
|
|
serverId = serverOpt.GetString("id", "undefined (check configuration)")
|
|
|
|
|
|
|
|
//listen to TCP
|
|
|
|
tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error resolving address %v\n", addr)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
listener, err := net.ListenTCP("tcp", tcpAddr)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error listening to tcp address %v\n", tcpAddr)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
//open database connection
|
|
|
|
db, err := sql.Open(dbDriver, dbInfo)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Error opening db %v\n", dbDriver)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer db.Close()
|
|
|
|
|
|
|
|
err = db.Ping()
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("DB connection error: %v\n", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
db.SetMaxIdleConns(dbMaxIdle)
|
|
|
|
db.SetMaxOpenConns(dbMaxOpen)
|
|
|
|
db.SetConnMaxLifetime(dbMaxLifetime)
|
|
|
|
|
|
|
|
//create procedure broker
|
|
|
|
broker := rupOpt.BrokerServer
|
|
|
|
procdr, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("Failed to create producer: %s\n", err)
|
|
|
|
os.Exit(1)
|
|
|
|
}
|
|
|
|
log.Printf("Created Producer %v\n", procdr)
|
|
|
|
|
|
|
|
// creates a server
|
|
|
|
config := &gotcp.Config{
|
|
|
|
PacketSendChanLimit: maxSend,
|
|
|
|
PacketReceiveChanLimit: maxReceive,
|
|
|
|
}
|
|
|
|
proto := &DevicePrsServerProtocol{
|
|
|
|
maxPacketSize: parser.NewDevicePrs().MaxPacketSize,
|
|
|
|
}
|
|
|
|
|
|
|
|
//map for storing various statistics
|
|
|
|
callback := &DevicePrsServerCallback{
|
|
|
|
options: rupOpt,
|
|
|
|
gpsDb: db,
|
|
|
|
writeTimeout: writeTimeout,
|
|
|
|
procdr: procdr,
|
|
|
|
}
|
|
|
|
|
|
|
|
srv := gotcp.NewServer(config, callback, proto)
|
|
|
|
rup.Server = srv
|
|
|
|
rup.Callback = callback
|
|
|
|
|
|
|
|
// starts service
|
|
|
|
go srv.Start(listener, acceptTimeout)
|
|
|
|
log.Printf("listening: %v\n", listener.Addr())
|
|
|
|
|
|
|
|
// catchs system signal
|
|
|
|
chSig := make(chan os.Signal)
|
|
|
|
signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
ch := <-chSig
|
|
|
|
log.Printf("Signal: %v\n", ch)
|
|
|
|
|
|
|
|
// stops service
|
|
|
|
srv.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
//-------------------------------------------------------------------
|
|
|
|
|
|
|
|
//DevicePrsServerCallback holds server callback info
|
|
|
|
type DevicePrsServerCallback struct {
|
|
|
|
options *parser.DevicePrsOptions
|
|
|
|
gpsDb *sql.DB
|
|
|
|
writeTimeout time.Duration
|
|
|
|
procdr *kafka.Producer
|
|
|
|
}
|
|
|
|
|
|
|
|
//OnConnect is called when new connection established
|
|
|
|
func (cb *DevicePrsServerCallback) OnConnect(c *gotcp.Conn) bool {
|
|
|
|
addr := c.GetRawConn().RemoteAddr()
|
|
|
|
c.PutExtraData(addr)
|
|
|
|
log.Printf("%v New connection(client: %v)\n", serverId,addr)
|
|
|
|
|
|
|
|
//insert to array new address
|
|
|
|
var connDevice ConnectionDevice
|
|
|
|
connDevice.imei = 0
|
|
|
|
connDevice.timeProcess = time.Now()
|
|
|
|
connectionDevices.AddUpdt(addr.String(),connDevice)
|
|
|
|
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
//OnMessage is called when packet readed
|
|
|
|
func (cb *DevicePrsServerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
|
|
|
|
addr := c.GetRawConn().RemoteAddr()
|
|
|
|
|
|
|
|
//downcast packet to devicePrs packet
|
|
|
|
packet := p.(*DevicePrsServerPacket)
|
|
|
|
data := packet.Serialize()
|
|
|
|
//fmt.Println(hex.EncodeToString(data))
|
|
|
|
|
|
|
|
//imei is empty so, we have to add last imei to the data
|
|
|
|
connectionDevice := connectionDevices.Value(addr.String())
|
|
|
|
|
|
|
|
//create parser
|
|
|
|
par := parser.NewDevicePrsParser(cb.options, cb.gpsDb, data, cb.procdr)
|
|
|
|
if err := par.GetError(); err != nil {
|
|
|
|
log.Printf("Error verifying packet(client: %v): %v, Err=%v\n",
|
|
|
|
addr,hex.EncodeToString(data), err)
|
|
|
|
}
|
|
|
|
|
|
|
|
//set imei in array imei
|
|
|
|
imei_ := connectionDevice.imei
|
|
|
|
if(imei_ == 0){
|
|
|
|
imei_ = par.IMEI
|
|
|
|
connectionDevice.imei = par.IMEI
|
|
|
|
connectionDevice.data = data
|
|
|
|
connectionDevices.AddUpdt(addr.String(),connectionDevice)
|
|
|
|
}
|
|
|
|
|
|
|
|
duration := time.Since(connectionDevice.timeProcess)
|
|
|
|
log.Printf("Data received(client: %v, IMEI: %v) with duration %v s\n", addr, imei_,math.Round(duration.Seconds()*100)/100)
|
|
|
|
|
|
|
|
//save to database
|
|
|
|
statusInsert := par.ExecuteAsync()
|
|
|
|
|
|
|
|
if(statusInsert){
|
|
|
|
if par.GetCommand() == model.CMD_RUP_RECORD {
|
|
|
|
// send response to client (ACK or NACK)
|
|
|
|
rep := par.GetClientResponse()
|
|
|
|
c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout)
|
|
|
|
log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_)
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
log.Printf("par.GetCommand() = %v\n", par.GetCommand())
|
|
|
|
//send message if record command
|
|
|
|
if cb.options.SetActive == true {
|
|
|
|
if par.GetCommand() == model.CMD_RUP_TRCH {
|
|
|
|
msg, err := par.GetBroadcastMessage()
|
|
|
|
if len(msg) > 0 {
|
|
|
|
c.AsyncWritePacket(NewDevicePrsServerPacket(msg), cb.writeTimeout)
|
|
|
|
log.Printf("Sent MSG (client: %v, IMEI: %v)\n", addr, imei_)
|
|
|
|
} else if err != nil {
|
|
|
|
log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, imei_,err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
//run parallel processing to handle packet parsing
|
|
|
|
//go par.ExecuteAsync()
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
//OnClose is called when connection being closed
|
|
|
|
func (cb *DevicePrsServerCallback) OnClose(c *gotcp.Conn) {
|
|
|
|
addr := c.GetRawConn().RemoteAddr()
|
|
|
|
connectionDevice := connectionDevices.Value(addr.String())
|
|
|
|
duration := time.Since(connectionDevice.timeProcess)
|
|
|
|
|
|
|
|
connectionDevices.Del(addr.String())
|
|
|
|
log.Printf("%v Close connection (client: %v, IMEI: %v) with duration %v s\n", serverId ,c.GetExtraData(),connectionDevice.imei,math.Round(duration.Seconds()*100)/100)
|
|
|
|
}
|
|
|
|
|
|
|
|
//-------------------------------------------------------------------
|
|
|
|
|
|
|
|
// DevicePrsServerProtocol reads length-prefixed packets from a connection.
type DevicePrsServerProtocol struct {
	// maxPacketSize is the upper bound ReadPacket enforces on the length
	// field of an incoming packet.
	maxPacketSize int
}
|
|
|
|
|
|
|
|
//ReadPacket called everytime data is available
|
|
|
|
func (p *DevicePrsServerProtocol) ReadPacket(conn *net.TCPConn) (gotcp.Packet, error) {
|
|
|
|
var (
|
|
|
|
lengthBytes = make([]byte, 2)
|
|
|
|
length uint16
|
|
|
|
)
|
|
|
|
|
|
|
|
//TODO handle packet other than Device Record
|
|
|
|
//OK
|
|
|
|
|
|
|
|
// read length
|
|
|
|
if _, err := io.ReadFull(conn, lengthBytes); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
Limit := uint16(p.maxPacketSize)
|
|
|
|
if length = binary.BigEndian.Uint16(lengthBytes); length > Limit {
|
|
|
|
return nil, errors.New("The size of packet is larger than the limit")
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Printf("data packet length(client: %v) : %d", conn.RemoteAddr(), length)
|
|
|
|
|
|
|
|
//Packet structure
|
|
|
|
//LEN + DATA + CRC16
|
|
|
|
buff := make([]byte, 2+length+2)
|
|
|
|
copy(buff[0:2], lengthBytes)
|
|
|
|
|
|
|
|
// read body ( buff = lengthBytes + body )
|
|
|
|
if _, err := io.ReadFull(conn, buff[2:]); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return NewDevicePrsServerPacket(buff), nil
|
|
|
|
}
|