You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

326 lines
8.5 KiB

6 years ago
//
// DevicePrs Device Server based on Go net package
// specify Mode=net in model.ServerConfig
//
package server
import (
"database/sql"
"encoding/binary"
"errors"
"io"
"log"
"net"
"os"
"os/signal"
"syscall"
"time"
"encoding/hex"
"sync"
"math"
"github.com/gansidui/gotcp"
"github.com/ipsusila/opt"
"github.com/confluentinc/confluent-kafka-go/kafka"
//"../model"
"../parser"
)
//-------------------------------------------------------------------
// serverId identifies this server instance in log output. It starts empty and
// is overwritten by Start with the "id" value from the server options.
var serverId string
// ConnectionDevice is the per-connection state tracked for a connected client.
type ConnectionDevice struct {
	// imei is the device IMEI; it stays 0 until a parsed packet supplies one.
	imei uint64
	// data holds the raw bytes of the packet that first supplied the IMEI.
	data []byte
	// timeProcess is the time the connection was registered.
	timeProcess time.Time
}

// MapConnection is a mutex-guarded map from a client's remote address string
// to its ConnectionDevice state. The zero value is ready to use.
type MapConnection struct {
	addr map[string]ConnectionDevice
	mux  sync.Mutex
}

// AddUpdt stores connectionDevice under key, replacing any existing entry.
func (c *MapConnection) AddUpdt(key string, connectionDevice ConnectionDevice) {
	c.mux.Lock()
	defer c.mux.Unlock()
	if c.addr == nil {
		// A zero-value MapConnection has a nil map; writing to it would panic.
		c.addr = make(map[string]ConnectionDevice)
	}
	c.addr[key] = connectionDevice
}

// Del removes the entry stored under key. It is a no-op if key is absent.
func (c *MapConnection) Del(key string) {
	c.mux.Lock()
	defer c.mux.Unlock()
	delete(c.addr, key)
}

// Value returns the entry stored under key, or the zero ConnectionDevice if
// there is none.
func (c *MapConnection) Value(key string) ConnectionDevice {
	c.mux.Lock()
	defer c.mux.Unlock()
	return c.addr[key]
}

// connectionDevices tracks the state of every open connection, keyed by the
// client's remote address string.
var connectionDevices = MapConnection{addr: make(map[string]ConnectionDevice)}
// DevicePrsServerPacket wraps the raw bytes of a single packet.
type DevicePrsServerPacket struct {
	buff []byte
}

// NewDevicePrsServerPacket returns a packet backed by data. The slice is
// stored as given, not copied.
func NewDevicePrsServerPacket(data []byte) *DevicePrsServerPacket {
	return &DevicePrsServerPacket{buff: data}
}

// Serialize returns the packet's underlying byte slice.
func (p *DevicePrsServerPacket) Serialize() []byte {
	return p.buff
}
//-------------------------------------------------------------------
// DevicePrsServerNet bundles the configuration with the TCP server and the
// callback handler that Start creates.
type DevicePrsServerNet struct {
	// Options is the configuration Start reads its settings from.
	Options *opt.Options
	// Server is assigned by Start once the TCP server has been created.
	Server *gotcp.Server
	// Callback is assigned by Start together with Server.
	Callback *DevicePrsServerCallback
}
// Stop shuts down the underlying TCP server. It is a no-op when no server has
// been assigned yet, i.e. Start was never called or returned early on a setup
// error before creating the server.
func (rup *DevicePrsServerNet) Stop() {
	if rup == nil || rup.Server == nil {
		return
	}
	rup.Server.Stop()
}
// Start reads the configuration from rup.Options, opens the TCP listener and
// the database, creates the Kafka producer, and then serves connections until
// SIGINT or SIGTERM is received.
//
// Start blocks for the lifetime of the server. Setup failures are logged and
// make Start return early, except a Kafka producer failure, which terminates
// the process with exit status 1.
func (rup *DevicePrsServerNet) Start() {
	//---- options ---------------------------------------
	serverOpt := rup.Options.Get("server")
	maxSend := uint32(serverOpt.GetInt("maxSend", 40))
	maxReceive := uint32(serverOpt.GetInt("maxReceive", 40))
	addr := serverOpt.GetString("listenAddr", ":8081")
	acceptTimeout := time.Duration(serverOpt.GetInt("acceptTimeout", 2)) * time.Second
	writeTimeout := time.Duration(serverOpt.GetInt("writeTimeout", 5)) * time.Second

	// database options
	gpsData := rup.Options.Get("gpsdata")
	storage := gpsData.GetString("storage", "sql")
	dbOpt := gpsData.Get(storage)
	dbDriver := dbOpt.GetString("driver", "postgres")
	dbInfo := dbOpt.GetString("connection", "")
	dbMaxIdle := dbOpt.GetInt("maxIdle", 0)
	dbMaxOpen := dbOpt.GetInt("maxOpen", 5)
	dbMaxLifetime := time.Duration(dbOpt.GetInt("maxLifetime", 60)) * time.Second

	// devicePrs options
	rupOpt := parser.NewDevicePrsOptions(rup.Options)

	// server ID
	serverId = serverOpt.GetString("id", "undefined (check configuration)")

	// listen to TCP
	tcpAddr, err := net.ResolveTCPAddr("tcp4", addr)
	if err != nil {
		log.Printf("Error resolving address %v: %v\n", addr, err)
		return
	}
	listener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		log.Printf("Error listening to tcp address %v: %v\n", tcpAddr, err)
		return
	}
	// The listener is handed to srv.Start further down; every early exit
	// before that point has to close it itself.

	// open database connection
	db, err := sql.Open(dbDriver, dbInfo)
	if err != nil {
		log.Printf("Error opening db %v: %v\n", dbDriver, err)
		listener.Close()
		return
	}
	defer db.Close()
	if err = db.Ping(); err != nil {
		log.Printf("DB connection error: %v\n", err)
		listener.Close()
		return
	}
	db.SetMaxIdleConns(dbMaxIdle)
	db.SetMaxOpenConns(dbMaxOpen)
	db.SetConnMaxLifetime(dbMaxLifetime)

	// create producer for the message broker
	broker := rupOpt.BrokerServer
	procdr, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})
	if err != nil {
		log.Printf("Failed to create producer: %s\n", err)
		// os.Exit does not run deferred calls, so release resources first.
		listener.Close()
		db.Close()
		os.Exit(1)
	}
	log.Printf("Created Producer %v\n", procdr)
	// NOTE(review): the producer is never closed. Closing it on shutdown is
	// only safe once no asynchronous work can still produce to it — confirm
	// against the parser before adding procdr.Close().

	// creates a server
	config := &gotcp.Config{
		PacketSendChanLimit:    maxSend,
		PacketReceiveChanLimit: maxReceive,
	}
	proto := &DevicePrsServerProtocol{
		maxPacketSize: parser.NewDevicePrs().MaxPacketSize,
	}
	callback := &DevicePrsServerCallback{
		options:      rupOpt,
		gpsDb:        db,
		writeTimeout: writeTimeout,
		procdr:       procdr,
	}
	srv := gotcp.NewServer(config, callback, proto)
	rup.Server = srv
	rup.Callback = callback

	// starts service
	go srv.Start(listener, acceptTimeout)
	log.Printf("listening: %v\n", listener.Addr())

	// catches system signals; signal.Notify never blocks when delivering, so
	// the channel needs a buffer or a signal can be dropped.
	chSig := make(chan os.Signal, 1)
	signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(chSig)
	ch := <-chSig
	log.Printf("Signal: %v\n", ch)

	// stops service
	srv.Stop()
}
//-------------------------------------------------------------------
// DevicePrsServerCallback carries the shared dependencies used by the
// connection callbacks (OnConnect, OnMessage, OnClose).
type DevicePrsServerCallback struct {
	// options are the parser options passed to every new packet parser.
	options *parser.DevicePrsOptions
	// gpsDb is the database handle passed to every new packet parser.
	gpsDb *sql.DB
	// writeTimeout is the timeout used when queueing a response packet.
	writeTimeout time.Duration
	// procdr is the Kafka producer passed to every new packet parser.
	procdr *kafka.Producer
}
// OnConnect is called when a new connection is established. It stores the
// remote address as the connection's extra data, logs the connection, and
// registers it in connectionDevices with a zero IMEI and the current time.
// It always returns true.
func (cb *DevicePrsServerCallback) OnConnect(c *gotcp.Conn) bool {
	remote := c.GetRawConn().RemoteAddr()
	c.PutExtraData(remote)
	log.Printf("%v New connection(client: %v)\n", serverId, remote)

	// The IMEI is unknown at this point, so the entry starts with imei == 0.
	entry := ConnectionDevice{timeProcess: time.Now()}
	connectionDevices.AddUpdt(remote.String(), entry)
	return true
}
// OnMessage is called for every packet read from a connection.
//
// It parses the packet, records the IMEI for the connection the first time a
// packet is seen while the stored IMEI is still 0, runs the parser's
// ExecuteAsync, and, when that reports success, queues the parser's client
// response for sending.
//
// It returns false only when p is not a *DevicePrsServerPacket; in every other
// case, including parse errors and a failed response write, it returns true.
func (cb *DevicePrsServerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool {
	addr := c.GetRawConn().RemoteAddr()
	key := addr.String()

	// Downcast to the devicePrs packet; a checked assertion avoids a panic if
	// another packet type is ever delivered.
	packet, ok := p.(*DevicePrsServerPacket)
	if !ok {
		log.Printf("Unexpected packet type %T (client: %v)\n", p, addr)
		return false
	}
	data := packet.Serialize()

	// Look up the state stored for this connection (holds the last known IMEI).
	connectionDevice := connectionDevices.Value(key)

	// create parser
	par := parser.NewDevicePrsParser(cb.options, cb.gpsDb, data, cb.procdr)
	if err := par.GetError(); err != nil {
		// Processing deliberately continues after a verification error, as in
		// the original flow.
		log.Printf("Error verifying packet(client: %v): %v, Err=%v\n",
			addr, hex.EncodeToString(data), err)
	}

	// Remember the IMEI (and the packet that carried it) while none is stored.
	imei := connectionDevice.imei
	if imei == 0 {
		imei = par.IMEI
		connectionDevice.imei = par.IMEI
		connectionDevice.data = data
		connectionDevices.AddUpdt(key, connectionDevice)
	}

	duration := time.Since(connectionDevice.timeProcess)
	log.Printf("Data received(client: %v, IMEI: %v) with duration %v s\n", addr, imei, math.Round(duration.Seconds()*100)/100)

	// save to database
	if par.ExecuteAsync() {
		// send response to client (ACK or NACK)
		rep := par.GetClientResponse()
		if err := c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout); err != nil {
			// Do not claim the ACK was sent when queueing it failed.
			log.Printf("Failed to send ACK (client: %v, IMEI: %v): %v\n", addr, imei, err)
		} else {
			log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei)
		}
	}

	//send message if record command
	//if par.GetCommand() == model.CMD_RUP_RECORD {
	//	msg, err := par.GetBroadcastMessage()
	//	if len(msg) > 0 {
	//		c.AsyncWritePacket(NewRuptelaServerPacket(msg), cb.writeTimeout)
	//	} else if err != nil {
	//		log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, par.IMEI,err)
	//	}
	//}
	//run parallel processing to handle packet parsing
	//go par.ExecuteAsync()
	return true
}
// OnClose is called when a connection is being closed. It removes the
// connection's entry from connectionDevices and logs the stored IMEI together
// with the time elapsed since the connection was registered.
func (cb *DevicePrsServerCallback) OnClose(c *gotcp.Conn) {
	key := c.GetRawConn().RemoteAddr().String()

	// Read the entry before deleting it; its fields are needed for the log line.
	entry := connectionDevices.Value(key)
	elapsed := time.Since(entry.timeProcess)
	connectionDevices.Del(key)

	// Seconds rounded to two decimals.
	seconds := math.Round(elapsed.Seconds()*100) / 100
	log.Printf("%v Close connection (client: %v, IMEI: %v) with duration %v s\n", serverId, c.GetExtraData(), entry.imei, seconds)
}
//-------------------------------------------------------------------
// DevicePrsServerProtocol reads length-prefixed packets from a connection.
type DevicePrsServerProtocol struct {
	// maxPacketSize is the largest length-prefix value ReadPacket accepts.
	maxPacketSize int
}
// ReadPacket reads one length-prefixed packet from conn.
//
// Frame layout: a 2-byte big-endian length, followed by that many data bytes,
// followed by 2 trailing bytes (LEN + DATA + CRC16). The returned packet holds
// the complete frame, including the length prefix and the trailer.
//
// An error is returned when reading from conn fails or when the announced
// length exceeds p.maxPacketSize.
func (p *DevicePrsServerProtocol) ReadPacket(conn *net.TCPConn) (gotcp.Packet, error) {
	const (
		headerSize  = 2 // length prefix
		trailerSize = 2 // CRC16
	)
	//TODO handle packet other than Device Record

	// read length
	header := make([]byte, headerSize)
	if _, err := io.ReadFull(conn, header); err != nil {
		return nil, err
	}

	// Compare as int: converting maxPacketSize to uint16 would silently wrap
	// limits above 65535 or below zero.
	length := int(binary.BigEndian.Uint16(header))
	if length > p.maxPacketSize {
		return nil, errors.New("The size of packet is larger than the limit")
	}
	log.Printf("data packet length(client: %v) : %d", conn.RemoteAddr(), length)

	// Size the frame in int arithmetic: in uint16, header+length+trailer wraps
	// for lengths above 65531 and yields an undersized buffer.
	buff := make([]byte, headerSize+length+trailerSize)
	copy(buff, header)

	// read body ( buff = length prefix + data + CRC16 )
	if _, err := io.ReadFull(conn, buff[headerSize:]); err != nil {
		return nil, err
	}
	return NewDevicePrsServerPacket(buff), nil
}