You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
469 lines
11 KiB
469 lines
11 KiB
6 years ago
|
//Message model
|
||
|
/*
|
||
|
Table "public.bcs_list"
|
||
|
Column | Type | Modifiers
|
||
|
----------------+-----------------------------+-------------------------------------------------------
|
||
|
id | bigint | not null default nextval('bcs_list_id_seq'::regclass)
|
||
|
bcs_message_id | bigint |
|
||
|
imei | character varying(25) |
|
||
|
sent_ts | timestamp without time zone |
|
||
|
delivered_ts | timestamp without time zone |
|
||
|
Indexes:
|
||
|
"bcs_list_pkey" PRIMARY KEY, btree (id)
|
||
|
Foreign-key constraints:
|
||
|
"bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)
|
||
|
|
||
|
Table "public.bcs_message"
|
||
|
Column | Type | Modifiers
|
||
|
------------------+-----------------------------+----------------------------------------------------------
|
||
|
id | bigint | not null default nextval('bcs_message_id_seq'::regclass)
|
||
|
bcs_message_time | timestamp without time zone |
|
||
|
duration | integer |
|
||
|
mbuzzer | integer |
|
||
|
message | text |
|
||
|
expired | timestamp without time zone |
|
||
|
company_id | bigint |
|
||
|
Indexes:
|
||
|
"bcs_message_pkey" PRIMARY KEY, btree (id)
|
||
|
Foreign-key constraints:
|
||
|
"bcs_message_company_id_fkey" FOREIGN KEY (company_id) REFERENCES m_company(id)
|
||
|
Referenced by:
|
||
|
TABLE "bcs_list" CONSTRAINT "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)
|
||
|
*/
|
||
|
|
||
|
package model
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io/ioutil"
|
||
|
"log"
|
||
|
"net/http"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/ipsusila/opt"
|
||
|
)
|
||
|
|
||
|
//Message status report
|
||
|
const (
|
||
|
MessageError = -1
|
||
|
MessageSent = 1
|
||
|
MessageDelivered = 2
|
||
|
)
|
||
|
|
||
|
//ISOTime encapsulate timestamp for JSON parsing
|
||
|
type ISOTime struct {
|
||
|
time.Time
|
||
|
}
|
||
|
|
||
|
//UserMessage retrieved from GUI
|
||
|
type UserMessage struct {
|
||
|
Vehicles []string `json:"vehicles"`
|
||
|
Duration int `json:"duration"`
|
||
|
NBuzzer int `json:"nbuzzer"`
|
||
|
Message string `json:"message"`
|
||
|
Expired ISOTime `json:"expired"`
|
||
|
}
|
||
|
|
||
|
//BroadcastMessage holds message information need to be broadcast
|
||
|
type BroadcastMessage struct {
|
||
|
Timestamp ISOTime `json:"ts"`
|
||
|
Duration int `json:"duration"`
|
||
|
NBuzzer int `json:"nbuzzer"`
|
||
|
Message string `json:"message"`
|
||
|
Expired ISOTime `json:"expired"`
|
||
|
}
|
||
|
|
||
|
//MessageStatus for specific IMEI
|
||
|
type MessageStatus struct {
|
||
|
IMEI string `json:"imei"`
|
||
|
Status string `json:"status"`
|
||
|
Timestamp string `json:"timestamp"`
|
||
|
StatusCode int `json:"-"`
|
||
|
}
|
||
|
|
||
|
//MessageProvider send/receive message
|
||
|
type MessageProvider interface {
|
||
|
Get(imei uint64) (*BroadcastMessage, error)
|
||
|
Put(message *UserMessage) error
|
||
|
Update(status *MessageStatus) error
|
||
|
}
|
||
|
|
||
|
type messageSet struct {
|
||
|
sync.RWMutex
|
||
|
messages []*BroadcastMessage
|
||
|
readPos int
|
||
|
writePos int
|
||
|
capacity int
|
||
|
numItems int
|
||
|
}
|
||
|
|
||
|
//DbMessageProvider provides message with DB storage
|
||
|
type DbMessageProvider struct {
|
||
|
sync.RWMutex
|
||
|
queueMsg map[uint64]*messageSet
|
||
|
sentMsg map[uint64]*messageSet
|
||
|
capacity int
|
||
|
saveToDb bool
|
||
|
loadFromDb bool
|
||
|
messageAPI string
|
||
|
reportAPI string
|
||
|
serviceTimeout time.Duration
|
||
|
appendIMEI bool
|
||
|
serviceClient *http.Client
|
||
|
}
|
||
|
|
||
|
//-------------------------------------------------------------------------------------------------
|
||
|
|
||
|
//UnmarshalJSON convert string to timestamp
|
||
|
//Convet from Y-m-d H:M:S
|
||
|
func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) {
|
||
|
//TODO time other than local
|
||
|
s := strings.Trim(string(b), "\"")
|
||
|
tm.Time, err = time.ParseInLocation("2006-01-02 15:04:05", s, time.Local)
|
||
|
if err != nil {
|
||
|
tm.Time = time.Time{}
|
||
|
}
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
//-------------------------------------------------------------------------------------------------
|
||
|
func newMessageSet(capacity int) *messageSet {
|
||
|
sets := &messageSet{
|
||
|
capacity: capacity,
|
||
|
numItems: 0,
|
||
|
readPos: 0,
|
||
|
writePos: 0,
|
||
|
messages: make([]*BroadcastMessage, capacity),
|
||
|
}
|
||
|
|
||
|
return sets
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) doPut(msg *BroadcastMessage) bool {
|
||
|
//check count (full or not?)
|
||
|
q.Lock()
|
||
|
defer q.Unlock()
|
||
|
|
||
|
if q.numItems == q.capacity {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
msg.Timestamp.Time = time.Now()
|
||
|
pos := q.writePos % q.capacity
|
||
|
q.messages[pos] = msg
|
||
|
q.writePos = pos + 1
|
||
|
q.numItems++
|
||
|
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) put(msg *BroadcastMessage) bool {
|
||
|
//check message pointer
|
||
|
if msg == nil {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
return q.doPut(msg)
|
||
|
}
|
||
|
func (q *messageSet) putUnique(msg *BroadcastMessage) bool {
|
||
|
//check message pointer
|
||
|
if msg == nil || q.contains(msg) {
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
return q.doPut(msg)
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) take() *BroadcastMessage {
|
||
|
q.Lock()
|
||
|
defer q.Unlock()
|
||
|
|
||
|
//check count (empty or not?)
|
||
|
if q.numItems == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
pos := q.readPos % q.capacity
|
||
|
msg := q.messages[pos]
|
||
|
q.readPos = pos + 1
|
||
|
q.numItems--
|
||
|
|
||
|
return msg
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) get() *BroadcastMessage {
|
||
|
q.Lock()
|
||
|
defer q.Unlock()
|
||
|
|
||
|
//check count (empty or not?)
|
||
|
if q.numItems == 0 {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
pos := q.readPos % q.capacity
|
||
|
return q.messages[pos]
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) count() int {
|
||
|
q.RLock()
|
||
|
defer q.RUnlock()
|
||
|
|
||
|
return q.numItems
|
||
|
}
|
||
|
|
||
|
func (q *messageSet) contains(m *BroadcastMessage) bool {
|
||
|
q.RLock()
|
||
|
defer q.RUnlock()
|
||
|
|
||
|
beg := q.readPos
|
||
|
end := q.writePos
|
||
|
for {
|
||
|
pos := beg % q.capacity
|
||
|
if pos == end {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
//check message
|
||
|
m2 := q.messages[pos]
|
||
|
eq := m.EqualTo(m2)
|
||
|
if eq {
|
||
|
return true
|
||
|
}
|
||
|
beg++
|
||
|
}
|
||
|
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
//-------------------------------------------------------------------------------------------------
|
||
|
|
||
|
//EqualTo return true if message is equal
|
||
|
func (m1 *BroadcastMessage) EqualTo(m2 *BroadcastMessage) bool {
|
||
|
eq := m1.Timestamp == m2.Timestamp &&
|
||
|
m1.Duration == m2.Duration &&
|
||
|
m1.NBuzzer == m2.NBuzzer &&
|
||
|
m1.Expired == m2.Expired &&
|
||
|
m1.Message == m2.Message
|
||
|
|
||
|
return eq
|
||
|
}
|
||
|
|
||
|
//-------------------------------------------------------------------------------------------------
|
||
|
|
||
|
//NewDbMessageProvider create new provider
|
||
|
func NewDbMessageProvider(options *opt.Options) (*DbMessageProvider, error) {
|
||
|
//make sure url ends with /
|
||
|
p := &DbMessageProvider{}
|
||
|
|
||
|
//TODO init
|
||
|
return p, nil
|
||
|
}
|
||
|
|
||
|
func (p *DbMessageProvider) getFromDb(imei uint64) (*BroadcastMessage, error) {
|
||
|
if !p.loadFromDb {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
//Get message from API
|
||
|
//TODO: can it handle many connections?
|
||
|
msg := &BroadcastMessage{}
|
||
|
|
||
|
//alamat internal
|
||
|
imeiStr := fmt.Sprintf("%d", imei)
|
||
|
url := p.messageAPI + imeiStr
|
||
|
err := p.getJSON(url, msg)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
now := time.Now()
|
||
|
if msg.Expired.Before(now) {
|
||
|
return nil, nil
|
||
|
}
|
||
|
|
||
|
return msg, nil
|
||
|
}
|
||
|
func (p *DbMessageProvider) putToDb(message *UserMessage) error {
|
||
|
if !p.saveToDb {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *DbMessageProvider) updateDb(status *MessageStatus) error {
|
||
|
url := p.reportAPI
|
||
|
if p.appendIMEI {
|
||
|
url += status.IMEI
|
||
|
}
|
||
|
err := p.putJSON(url, status)
|
||
|
if err != nil {
|
||
|
log.Printf("PUT request error: %v\n", err)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *DbMessageProvider) getJSON(url string, target interface{}) error {
|
||
|
r, err := p.serviceClient.Get(url)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer r.Body.Close()
|
||
|
|
||
|
return json.NewDecoder(r.Body).Decode(target)
|
||
|
}
|
||
|
|
||
|
func (p *DbMessageProvider) putJSON(url string, data interface{}) error {
|
||
|
payload, err := json.Marshal(data)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
reader := bytes.NewReader(payload)
|
||
|
client := p.serviceClient
|
||
|
req, err := http.NewRequest("PUT", url, reader)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
req.ContentLength = int64(len(payload))
|
||
|
req.Header.Set("Content-Type", "application/json")
|
||
|
response, err := client.Do(req)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
defer response.Body.Close()
|
||
|
|
||
|
//read content
|
||
|
result, err := ioutil.ReadAll(response.Body)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
str := string(result)
|
||
|
if strings.Contains(str, "no") {
|
||
|
return errors.New("Failed to PUT data to service: " + str)
|
||
|
}
|
||
|
|
||
|
//request code
|
||
|
code := response.StatusCode
|
||
|
if code == http.StatusOK || code == http.StatusCreated {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
status := response.Status
|
||
|
return errors.New("PUT error with status = " + status)
|
||
|
}
|
||
|
|
||
|
//-------------------------------------------------------------------------------------------------
|
||
|
|
||
|
//Get return broadcast message for given IMEI
|
||
|
func (p *DbMessageProvider) Get(imei uint64) (*BroadcastMessage, error) {
|
||
|
p.RLock()
|
||
|
defer p.RUnlock()
|
||
|
|
||
|
//get queue for this imei
|
||
|
queue, ok := p.queueMsg[imei]
|
||
|
if !ok {
|
||
|
return p.getFromDb(imei)
|
||
|
}
|
||
|
|
||
|
//get message
|
||
|
msg := queue.get()
|
||
|
if msg == nil {
|
||
|
return p.getFromDb(imei)
|
||
|
}
|
||
|
return msg, nil
|
||
|
}
|
||
|
|
||
|
//Put setup new messages
|
||
|
func (p *DbMessageProvider) Put(message *UserMessage) error {
|
||
|
p.Lock()
|
||
|
defer p.Unlock()
|
||
|
|
||
|
now := ISOTime{time.Now()}
|
||
|
for _, vid := range message.Vehicles {
|
||
|
imei, err := strconv.ParseUint(vid, 10, 64)
|
||
|
if err != nil {
|
||
|
log.Printf("Parse IMEI (%v) error: %v\n", vid, err)
|
||
|
continue
|
||
|
}
|
||
|
bm := &BroadcastMessage{
|
||
|
Timestamp: now,
|
||
|
Duration: message.Duration,
|
||
|
NBuzzer: message.NBuzzer,
|
||
|
Message: message.Message,
|
||
|
Expired: message.Expired,
|
||
|
}
|
||
|
queue, ok := p.queueMsg[imei]
|
||
|
if !ok {
|
||
|
queue = newMessageSet(p.capacity)
|
||
|
p.queueMsg[imei] = queue
|
||
|
}
|
||
|
|
||
|
ok = queue.putUnique(bm)
|
||
|
if !ok {
|
||
|
//log
|
||
|
log.Printf("Put message error-> %v:%v", imei, bm.Message)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
//save to database
|
||
|
return p.putToDb(message)
|
||
|
}
|
||
|
|
||
|
//Update message status
|
||
|
func (p *DbMessageProvider) Update(status *MessageStatus) error {
|
||
|
p.Lock()
|
||
|
defer p.Unlock()
|
||
|
|
||
|
imei, err := strconv.ParseUint(status.IMEI, 10, 64)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
//message delivered
|
||
|
if status.StatusCode == MessageDelivered || status.StatusCode == MessageError {
|
||
|
//get message from "sent message" collection
|
||
|
sets, ok := p.sentMsg[imei]
|
||
|
if !ok {
|
||
|
return errors.New("delivered message not found in sets")
|
||
|
}
|
||
|
msg := sets.take()
|
||
|
|
||
|
//if status is error, return message to broadcast queue
|
||
|
if msg != nil && status.StatusCode == MessageError {
|
||
|
//return to sets
|
||
|
queue, ok := p.queueMsg[imei]
|
||
|
if !ok {
|
||
|
queue = newMessageSet(p.capacity)
|
||
|
p.queueMsg[imei] = queue
|
||
|
}
|
||
|
queue.putUnique(msg)
|
||
|
}
|
||
|
} else if status.StatusCode == MessageSent {
|
||
|
//message sent, get from queue
|
||
|
queue, ok := p.queueMsg[imei]
|
||
|
if !ok {
|
||
|
return errors.New("message not found in sets")
|
||
|
}
|
||
|
msg := queue.take()
|
||
|
if msg != nil {
|
||
|
//move to "sent message" collection
|
||
|
sets, ok := p.sentMsg[imei]
|
||
|
if !ok {
|
||
|
sets = newMessageSet(p.capacity)
|
||
|
p.sentMsg[imei] = sets
|
||
|
}
|
||
|
sets.put(msg)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
//update status in database
|
||
|
return p.updateDb(status)
|
||
|
}
|