//Message model /* Table "public.bcs_list" Column | Type | Modifiers ----------------+-----------------------------+------------------------------------------------------- id | bigint | not null default nextval('bcs_list_id_seq'::regclass) bcs_message_id | bigint | imei | character varying(25) | sent_ts | timestamp without time zone | delivered_ts | timestamp without time zone | Indexes: "bcs_list_pkey" PRIMARY KEY, btree (id) Foreign-key constraints: "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id) Table "public.bcs_message" Column | Type | Modifiers ------------------+-----------------------------+---------------------------------------------------------- id | bigint | not null default nextval('bcs_message_id_seq'::regclass) bcs_message_time | timestamp without time zone | duration | integer | mbuzzer | integer | message | text | expired | timestamp without time zone | company_id | bigint | Indexes: "bcs_message_pkey" PRIMARY KEY, btree (id) Foreign-key constraints: "bcs_message_company_id_fkey" FOREIGN KEY (company_id) REFERENCES m_company(id) Referenced by: TABLE "bcs_list" CONSTRAINT "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id) */ package model import ( "bytes" "encoding/json" "errors" "fmt" "io/ioutil" "log" "net/http" "strconv" "strings" "sync" "time" "github.com/ipsusila/opt" ) //Message status report const ( MessageError = -1 MessageSent = 1 MessageDelivered = 2 ) //ISOTime encapsulate timestamp for JSON parsing type ISOTime struct { time.Time } //UserMessage retrieved from GUI type UserMessage struct { Vehicles []string `json:"vehicles"` Duration int `json:"duration"` NBuzzer int `json:"nbuzzer"` Message string `json:"message"` Expired ISOTime `json:"expired"` } //BroadcastMessage holds message information need to be broadcast type BroadcastMessage struct { Timestamp ISOTime `json:"ts"` Duration int `json:"duration"` NBuzzer int `json:"nbuzzer"` Message string `json:"message"` Expired ISOTime `json:"expired"` } //MessageStatus for specific IMEI type MessageStatus struct { IMEI string `json:"imei"` Status string `json:"status"` Timestamp string `json:"timestamp"` StatusCode int `json:"-"` } //MessageProvider send/receive message type MessageProvider interface { Get(imei uint64) (*BroadcastMessage, error) Put(message *UserMessage) error Update(status *MessageStatus) error } type messageSet struct { sync.RWMutex messages []*BroadcastMessage readPos int writePos int capacity int numItems int } //DbMessageProvider provides message with DB storage type DbMessageProvider struct { sync.RWMutex queueMsg map[uint64]*messageSet sentMsg map[uint64]*messageSet capacity int saveToDb bool loadFromDb bool messageAPI string reportAPI string serviceTimeout time.Duration appendIMEI bool serviceClient *http.Client } //------------------------------------------------------------------------------------------------- //UnmarshalJSON convert string to timestamp //Convet from Y-m-d H:M:S func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) { //TODO time other than local s := strings.Trim(string(b), "\"") tm.Time, err = time.ParseInLocation("2006-01-02 15:04:05", s, time.Local) if err != nil { tm.Time = time.Time{} } return err } //------------------------------------------------------------------------------------------------- func newMessageSet(capacity int) *messageSet { sets := &messageSet{ capacity: capacity, numItems: 0, readPos: 0, writePos: 0, messages: make([]*BroadcastMessage, capacity), } return sets } func (q *messageSet) doPut(msg *BroadcastMessage) bool { //check count (full or not?) q.Lock() defer q.Unlock() if q.numItems == q.capacity { return false } msg.Timestamp.Time = time.Now() pos := q.writePos % q.capacity q.messages[pos] = msg q.writePos = pos + 1 q.numItems++ return true } func (q *messageSet) put(msg *BroadcastMessage) bool { //check message pointer if msg == nil { return false } return q.doPut(msg) } func (q *messageSet) putUnique(msg *BroadcastMessage) bool { //check message pointer if msg == nil || q.contains(msg) { return false } return q.doPut(msg) } func (q *messageSet) take() *BroadcastMessage { q.Lock() defer q.Unlock() //check count (empty or not?) if q.numItems == 0 { return nil } pos := q.readPos % q.capacity msg := q.messages[pos] q.readPos = pos + 1 q.numItems-- return msg } func (q *messageSet) get() *BroadcastMessage { q.Lock() defer q.Unlock() //check count (empty or not?) if q.numItems == 0 { return nil } pos := q.readPos % q.capacity return q.messages[pos] } func (q *messageSet) count() int { q.RLock() defer q.RUnlock() return q.numItems } func (q *messageSet) contains(m *BroadcastMessage) bool { q.RLock() defer q.RUnlock() beg := q.readPos end := q.writePos for { pos := beg % q.capacity if pos == end { break } //check message m2 := q.messages[pos] eq := m.EqualTo(m2) if eq { return true } beg++ } return false } //------------------------------------------------------------------------------------------------- //EqualTo return true if message is equal func (m1 *BroadcastMessage) EqualTo(m2 *BroadcastMessage) bool { eq := m1.Timestamp == m2.Timestamp && m1.Duration == m2.Duration && m1.NBuzzer == m2.NBuzzer && m1.Expired == m2.Expired && m1.Message == m2.Message return eq } //------------------------------------------------------------------------------------------------- //NewDbMessageProvider create new provider func NewDbMessageProvider(options *opt.Options) (*DbMessageProvider, error) { //make sure url ends with / p := &DbMessageProvider{} //TODO init return p, nil } func (p *DbMessageProvider) getFromDb(imei uint64) (*BroadcastMessage, error) { if !p.loadFromDb { return nil, nil } //Get message from API //TODO: can it handle many connections? msg := &BroadcastMessage{} //alamat internal imeiStr := fmt.Sprintf("%d", imei) url := p.messageAPI + imeiStr err := p.getJSON(url, msg) if err != nil { return nil, err } now := time.Now() if msg.Expired.Before(now) { return nil, nil } return msg, nil } func (p *DbMessageProvider) putToDb(message *UserMessage) error { if !p.saveToDb { return nil } return nil } func (p *DbMessageProvider) updateDb(status *MessageStatus) error { url := p.reportAPI if p.appendIMEI { url += status.IMEI } err := p.putJSON(url, status) if err != nil { log.Printf("PUT request error: %v\n", err) return err } return nil } func (p *DbMessageProvider) getJSON(url string, target interface{}) error { r, err := p.serviceClient.Get(url) if err != nil { return err } defer r.Body.Close() return json.NewDecoder(r.Body).Decode(target) } func (p *DbMessageProvider) putJSON(url string, data interface{}) error { payload, err := json.Marshal(data) if err != nil { return err } reader := bytes.NewReader(payload) client := p.serviceClient req, err := http.NewRequest("PUT", url, reader) if err != nil { return err } req.ContentLength = int64(len(payload)) req.Header.Set("Content-Type", "application/json") response, err := client.Do(req) if err != nil { return err } defer response.Body.Close() //read content result, err := ioutil.ReadAll(response.Body) if err != nil { return err } str := string(result) if strings.Contains(str, "no") { return errors.New("Failed to PUT data to service: " + str) } //request code code := response.StatusCode if code == http.StatusOK || code == http.StatusCreated { return nil } status := response.Status return errors.New("PUT error with status = " + status) } //------------------------------------------------------------------------------------------------- //Get return broadcast message for given IMEI func (p *DbMessageProvider) Get(imei uint64) (*BroadcastMessage, error) { p.RLock() defer p.RUnlock() //get queue for this imei queue, ok := p.queueMsg[imei] if !ok { return p.getFromDb(imei) } //get message msg := queue.get() if msg == nil { return p.getFromDb(imei) } return msg, nil } //Put setup new messages func (p *DbMessageProvider) Put(message *UserMessage) error { p.Lock() defer p.Unlock() now := ISOTime{time.Now()} for _, vid := range message.Vehicles { imei, err := strconv.ParseUint(vid, 10, 64) if err != nil { log.Printf("Parse IMEI (%v) error: %v\n", vid, err) continue } bm := &BroadcastMessage{ Timestamp: now, Duration: message.Duration, NBuzzer: message.NBuzzer, Message: message.Message, Expired: message.Expired, } queue, ok := p.queueMsg[imei] if !ok { queue = newMessageSet(p.capacity) p.queueMsg[imei] = queue } ok = queue.putUnique(bm) if !ok { //log log.Printf("Put message error-> %v:%v", imei, bm.Message) } } //save to database return p.putToDb(message) } //Update message status func (p *DbMessageProvider) Update(status *MessageStatus) error { p.Lock() defer p.Unlock() imei, err := strconv.ParseUint(status.IMEI, 10, 64) if err != nil { return err } //message delivered if status.StatusCode == MessageDelivered || status.StatusCode == MessageError { //get message from "sent message" collection sets, ok := p.sentMsg[imei] if !ok { return errors.New("delivered message not found in sets") } msg := sets.take() //if status is error, return message to broadcast queue if msg != nil && status.StatusCode == MessageError { //return to sets queue, ok := p.queueMsg[imei] if !ok { queue = newMessageSet(p.capacity) p.queueMsg[imei] = queue } queue.putUnique(msg) } } else if status.StatusCode == MessageSent { //message sent, get from queue queue, ok := p.queueMsg[imei] if !ok { return errors.New("message not found in sets") } msg := queue.take() if msg != nil { //move to "sent message" collection sets, ok := p.sentMsg[imei] if !ok { sets = newMessageSet(p.capacity) p.sentMsg[imei] = sets } sets.put(msg) } } //update status in database return p.updateDb(status) }