// // DevicePrs Devie Server based on Go net package // specify Mode=net in model.ServerConfig // package server import ( "database/sql" "encoding/binary" "errors" "io" "log" "net" "os" "os/signal" "syscall" "time" "encoding/hex" "sync" "math" "github.com/gansidui/gotcp" "github.com/ipsusila/opt" "github.com/confluentinc/confluent-kafka-go/kafka" "../model" "../parser" ) //------------------------------------------------------------------- var serverId = "" type ConnectionDevice struct { imei uint64 data []byte timeProcess time.Time } type MapConnection struct { addr map[string]ConnectionDevice mux sync.Mutex } func (c *MapConnection) AddUpdt(key string,connectionDevice ConnectionDevice) { c.mux.Lock() // Lock so only one goroutine at a time can access the map c.v. c.addr[key] = connectionDevice c.mux.Unlock() } func (c *MapConnection) Del(key string) { c.mux.Lock() // Lock so only one goroutine at a time can access the map c.v. delete(c.addr, key) c.mux.Unlock() } func (c *MapConnection) Value(key string) ConnectionDevice { c.mux.Lock() // Lock so only one goroutine at a time can access the map c.v. defer c.mux.Unlock() return c.addr[key] } var connectionDevices = MapConnection{addr: make(map[string]ConnectionDevice)} //DevicePrsServerPacket holds packet information type DevicePrsServerPacket struct { buff []byte } //NewDevicePrsServerPacket create new packet func NewDevicePrsServerPacket(data []byte) *DevicePrsServerPacket { pkt := new(DevicePrsServerPacket) pkt.buff = data return pkt } //Serialize packet func (p *DevicePrsServerPacket) Serialize() []byte { return p.buff } //------------------------------------------------------------------- //DevicePrsServerNet info type DevicePrsServerNet struct { Options *opt.Options Server *gotcp.Server Callback *DevicePrsServerCallback } //Stop tcp server func (rup *DevicePrsServerNet) Stop() { rup.Server.Stop() } //Start the server func (rup *DevicePrsServerNet) Start() { //---- options --------------------------------------- serverOpt := rup.Options.Get("server") maxSend := uint32(serverOpt.GetInt("maxSend", 40)) maxReceive := uint32(serverOpt.GetInt("maxReceive", 40)) addr := serverOpt.GetString("listenAddr", ":8081") acceptTimeout := time.Duration(serverOpt.GetInt("acceptTimeout", 2)) * time.Second writeTimeout := time.Duration(serverOpt.GetInt("writeTimeout", 5)) * time.Second //database options gpsData := rup.Options.Get("gpsdata") storage := gpsData.GetString("storage", "sql") dbOpt := gpsData.Get(storage) dbDriver := dbOpt.GetString("driver", "postgres") dbInfo := dbOpt.GetString("connection", "") dbMaxIdle := dbOpt.GetInt("maxIdle", 0) dbMaxOpen := dbOpt.GetInt("maxOpen", 5) dbMaxLifetime := time.Duration(dbOpt.GetInt("maxLifetime", 60)) * time.Second //devicePrs options rupOpt := parser.NewDevicePrsOptions(rup.Options) //server ID serverId = serverOpt.GetString("id", "undefined (check configuration)") //listen to TCP tcpAddr, err := net.ResolveTCPAddr("tcp4", addr) if err != nil { log.Printf("Error resolving address %v\n", addr) return } listener, err := net.ListenTCP("tcp", tcpAddr) if err != nil { log.Printf("Error listening to tcp address %v\n", tcpAddr) return } //open database connection db, err := sql.Open(dbDriver, dbInfo) if err != nil { log.Printf("Error opening db %v\n", dbDriver) return } defer db.Close() err = db.Ping() if err != nil { log.Printf("DB connection error: %v\n", err) return } db.SetMaxIdleConns(dbMaxIdle) db.SetMaxOpenConns(dbMaxOpen) db.SetConnMaxLifetime(dbMaxLifetime) //create procedure broker broker := rupOpt.BrokerServer procdr, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker}) if err != nil { log.Printf("Failed to create producer: %s\n", err) os.Exit(1) } log.Printf("Created Producer %v\n", procdr) // creates a server config := &gotcp.Config{ PacketSendChanLimit: maxSend, PacketReceiveChanLimit: maxReceive, } proto := &DevicePrsServerProtocol{ maxPacketSize: parser.NewDevicePrs().MaxPacketSize, } //map for storing various statistics callback := &DevicePrsServerCallback{ options: rupOpt, gpsDb: db, writeTimeout: writeTimeout, procdr: procdr, } srv := gotcp.NewServer(config, callback, proto) rup.Server = srv rup.Callback = callback // starts service go srv.Start(listener, acceptTimeout) log.Printf("listening: %v\n", listener.Addr()) // catchs system signal chSig := make(chan os.Signal) signal.Notify(chSig, syscall.SIGINT, syscall.SIGTERM) ch := <-chSig log.Printf("Signal: %v\n", ch) // stops service srv.Stop() } //------------------------------------------------------------------- //DevicePrsServerCallback holds server callback info type DevicePrsServerCallback struct { options *parser.DevicePrsOptions gpsDb *sql.DB writeTimeout time.Duration procdr *kafka.Producer } //OnConnect is called when new connection established func (cb *DevicePrsServerCallback) OnConnect(c *gotcp.Conn) bool { addr := c.GetRawConn().RemoteAddr() c.PutExtraData(addr) log.Printf("%v New connection(client: %v)\n", serverId,addr) //insert to array new address var connDevice ConnectionDevice connDevice.imei = 0 connDevice.timeProcess = time.Now() connectionDevices.AddUpdt(addr.String(),connDevice) return true } //OnMessage is called when packet readed func (cb *DevicePrsServerCallback) OnMessage(c *gotcp.Conn, p gotcp.Packet) bool { addr := c.GetRawConn().RemoteAddr() //downcast packet to devicePrs packet packet := p.(*DevicePrsServerPacket) data := packet.Serialize() //fmt.Println(hex.EncodeToString(data)) //imei is empty so, we have to add last imei to the data connectionDevice := connectionDevices.Value(addr.String()) //create parser par := parser.NewDevicePrsParser(cb.options, cb.gpsDb, data, cb.procdr) if err := par.GetError(); err != nil { log.Printf("Error verifying packet(client: %v): %v, Err=%v\n", addr,hex.EncodeToString(data), err) } //set imei in array imei imei_ := connectionDevice.imei if(imei_ == 0){ imei_ = par.IMEI connectionDevice.imei = par.IMEI connectionDevice.data = data connectionDevices.AddUpdt(addr.String(),connectionDevice) } duration := time.Since(connectionDevice.timeProcess) log.Printf("Data received(client: %v, IMEI: %v) with duration %v s\n", addr, imei_,math.Round(duration.Seconds()*100)/100) //save to database statusInsert := par.ExecuteAsync() if(statusInsert){ if par.GetCommand() == model.CMD_RUP_RECORD { // send response to client (ACK or NACK) rep := par.GetClientResponse() c.AsyncWritePacket(NewDevicePrsServerPacket(rep), cb.writeTimeout) log.Printf("Sent ACK (client: %v, IMEI: %v)\n", addr, imei_) } } log.Printf("par.GetCommand() = %v\n", par.GetCommand()) //send message if record command if cb.options.SetActive == true { if par.GetCommand() == model.CMD_RUP_TRCH { msg, err := par.GetBroadcastMessage() if len(msg) > 0 { c.AsyncWritePacket(NewDevicePrsServerPacket(msg), cb.writeTimeout) log.Printf("Sent MSG (client: %v, IMEI: %v)\n", addr, imei_) } else if err != nil { log.Printf("Get message error(client: %v, IMEI: %v) = %v\n", addr, imei_,err) } } } //run parallel processing to handle packet parsing //go par.ExecuteAsync() return true } //OnClose is called when connection being closed func (cb *DevicePrsServerCallback) OnClose(c *gotcp.Conn) { addr := c.GetRawConn().RemoteAddr() connectionDevice := connectionDevices.Value(addr.String()) duration := time.Since(connectionDevice.timeProcess) connectionDevices.Del(addr.String()) log.Printf("%v Close connection (client: %v, IMEI: %v) with duration %v s\n", serverId ,c.GetExtraData(),connectionDevice.imei,math.Round(duration.Seconds()*100)/100) } //------------------------------------------------------------------- //DevicePrsServerProtocol holds parser type DevicePrsServerProtocol struct { maxPacketSize int } //ReadPacket called everytime data is available func (p *DevicePrsServerProtocol) ReadPacket(conn *net.TCPConn) (gotcp.Packet, error) { var ( lengthBytes = make([]byte, 2) length uint16 ) //TODO handle packet other than Device Record //OK // read length if _, err := io.ReadFull(conn, lengthBytes); err != nil { return nil, err } Limit := uint16(p.maxPacketSize) if length = binary.BigEndian.Uint16(lengthBytes); length > Limit { return nil, errors.New("The size of packet is larger than the limit") } log.Printf("data packet length(client: %v) : %d", conn.RemoteAddr(), length) //Packet structure //LEN + DATA + CRC16 buff := make([]byte, 2+length+2) copy(buff[0:2], lengthBytes) // read body ( buff = lengthBytes + body ) if _, err := io.ReadFull(conn, buff[2:]); err != nil { return nil, err } return NewDevicePrsServerPacket(buff), nil }