You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
468 lines
11 KiB
468 lines
11 KiB
//Message model
/*
                                     Table "public.bcs_list"
     Column     |            Type             |                       Modifiers
----------------+-----------------------------+-------------------------------------------------------
 id             | bigint                      | not null default nextval('bcs_list_id_seq'::regclass)
 bcs_message_id | bigint                      |
 imei           | character varying(25)       |
 sent_ts        | timestamp without time zone |
 delivered_ts   | timestamp without time zone |
Indexes:
    "bcs_list_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
    "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)

                                     Table "public.bcs_message"
      Column      |            Type             |                        Modifiers
------------------+-----------------------------+----------------------------------------------------------
 id               | bigint                      | not null default nextval('bcs_message_id_seq'::regclass)
 bcs_message_time | timestamp without time zone |
 duration         | integer                     |
 mbuzzer          | integer                     |
 message          | text                        |
 expired          | timestamp without time zone |
 company_id       | bigint                      |
Indexes:
    "bcs_message_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
    "bcs_message_company_id_fkey" FOREIGN KEY (company_id) REFERENCES m_company(id)
Referenced by:
    TABLE "bcs_list" CONSTRAINT "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)
*/
|
|
|
package model |
|
|
|
import ( |
|
"bytes" |
|
"encoding/json" |
|
"errors" |
|
"fmt" |
|
"io/ioutil" |
|
"log" |
|
"net/http" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"time" |
|
|
|
"github.com/ipsusila/opt" |
|
) |
|
|
|
// Message status report codes, compared against MessageStatus.StatusCode.
const (
	// MessageError reports that handling of the message ended in an error.
	MessageError = -1
	// MessageSent reports that the message has been sent.
	MessageSent = 1
	// MessageDelivered reports that the message has been delivered.
	MessageDelivered = 2
)
|
|
|
// ISOTime embeds time.Time so that a custom JSON decoder can be attached to
// timestamps; all time.Time methods are promoted and usable directly.
type ISOTime struct {
	time.Time
}
|
|
|
//UserMessage retrieved from GUI |
|
type UserMessage struct { |
|
Vehicles []string `json:"vehicles"` |
|
Duration int `json:"duration"` |
|
NBuzzer int `json:"nbuzzer"` |
|
Message string `json:"message"` |
|
Expired ISOTime `json:"expired"` |
|
} |
|
|
|
//BroadcastMessage holds message information need to be broadcast |
|
type BroadcastMessage struct { |
|
Timestamp ISOTime `json:"ts"` |
|
Duration int `json:"duration"` |
|
NBuzzer int `json:"nbuzzer"` |
|
Message string `json:"message"` |
|
Expired ISOTime `json:"expired"` |
|
} |
|
|
|
// MessageStatus is a status report for the message of a specific IMEI.
type MessageStatus struct {
	// IMEI identifies the device; it must parse as a decimal unsigned integer.
	IMEI string `json:"imei"`
	// Status is the status text sent to the report service.
	Status string `json:"status"`
	// Timestamp is the report time in textual form.
	Timestamp string `json:"timestamp"`
	// StatusCode is one of MessageError, MessageSent or MessageDelivered.
	// It is excluded from the JSON representation.
	StatusCode int `json:"-"`
}
|
|
|
//MessageProvider send/receive message |
|
type MessageProvider interface { |
|
Get(imei uint64) (*BroadcastMessage, error) |
|
Put(message *UserMessage) error |
|
Update(status *MessageStatus) error |
|
} |
|
|
|
type messageSet struct { |
|
sync.RWMutex |
|
messages []*BroadcastMessage |
|
readPos int |
|
writePos int |
|
capacity int |
|
numItems int |
|
} |
|
|
|
//DbMessageProvider provides message with DB storage |
|
type DbMessageProvider struct { |
|
sync.RWMutex |
|
queueMsg map[uint64]*messageSet |
|
sentMsg map[uint64]*messageSet |
|
capacity int |
|
saveToDb bool |
|
loadFromDb bool |
|
messageAPI string |
|
reportAPI string |
|
serviceTimeout time.Duration |
|
appendIMEI bool |
|
serviceClient *http.Client |
|
} |
|
|
|
//------------------------------------------------------------------------------------------------- |
|
|
|
//UnmarshalJSON convert string to timestamp |
|
//Convet from Y-m-d H:M:S |
|
func (tm *ISOTime) UnmarshalJSON(b []byte) (err error) { |
|
//TODO time other than local |
|
s := strings.Trim(string(b), "\"") |
|
tm.Time, err = time.ParseInLocation("2006-01-02 15:04:05", s, time.Local) |
|
if err != nil { |
|
tm.Time = time.Time{} |
|
} |
|
return err |
|
} |
|
|
|
//------------------------------------------------------------------------------------------------- |
|
func newMessageSet(capacity int) *messageSet { |
|
sets := &messageSet{ |
|
capacity: capacity, |
|
numItems: 0, |
|
readPos: 0, |
|
writePos: 0, |
|
messages: make([]*BroadcastMessage, capacity), |
|
} |
|
|
|
return sets |
|
} |
|
|
|
func (q *messageSet) doPut(msg *BroadcastMessage) bool { |
|
//check count (full or not?) |
|
q.Lock() |
|
defer q.Unlock() |
|
|
|
if q.numItems == q.capacity { |
|
return false |
|
} |
|
|
|
msg.Timestamp.Time = time.Now() |
|
pos := q.writePos % q.capacity |
|
q.messages[pos] = msg |
|
q.writePos = pos + 1 |
|
q.numItems++ |
|
|
|
return true |
|
} |
|
|
|
func (q *messageSet) put(msg *BroadcastMessage) bool { |
|
//check message pointer |
|
if msg == nil { |
|
return false |
|
} |
|
|
|
return q.doPut(msg) |
|
} |
|
func (q *messageSet) putUnique(msg *BroadcastMessage) bool { |
|
//check message pointer |
|
if msg == nil || q.contains(msg) { |
|
return false |
|
} |
|
|
|
return q.doPut(msg) |
|
} |
|
|
|
func (q *messageSet) take() *BroadcastMessage { |
|
q.Lock() |
|
defer q.Unlock() |
|
|
|
//check count (empty or not?) |
|
if q.numItems == 0 { |
|
return nil |
|
} |
|
|
|
pos := q.readPos % q.capacity |
|
msg := q.messages[pos] |
|
q.readPos = pos + 1 |
|
q.numItems-- |
|
|
|
return msg |
|
} |
|
|
|
func (q *messageSet) get() *BroadcastMessage { |
|
q.Lock() |
|
defer q.Unlock() |
|
|
|
//check count (empty or not?) |
|
if q.numItems == 0 { |
|
return nil |
|
} |
|
|
|
pos := q.readPos % q.capacity |
|
return q.messages[pos] |
|
} |
|
|
|
func (q *messageSet) count() int { |
|
q.RLock() |
|
defer q.RUnlock() |
|
|
|
return q.numItems |
|
} |
|
|
|
func (q *messageSet) contains(m *BroadcastMessage) bool { |
|
q.RLock() |
|
defer q.RUnlock() |
|
|
|
beg := q.readPos |
|
end := q.writePos |
|
for { |
|
pos := beg % q.capacity |
|
if pos == end { |
|
break |
|
} |
|
|
|
//check message |
|
m2 := q.messages[pos] |
|
eq := m.EqualTo(m2) |
|
if eq { |
|
return true |
|
} |
|
beg++ |
|
} |
|
|
|
return false |
|
} |
|
|
|
//------------------------------------------------------------------------------------------------- |
|
|
|
//EqualTo return true if message is equal |
|
func (m1 *BroadcastMessage) EqualTo(m2 *BroadcastMessage) bool { |
|
eq := m1.Timestamp == m2.Timestamp && |
|
m1.Duration == m2.Duration && |
|
m1.NBuzzer == m2.NBuzzer && |
|
m1.Expired == m2.Expired && |
|
m1.Message == m2.Message |
|
|
|
return eq |
|
} |
|
|
|
//------------------------------------------------------------------------------------------------- |
|
|
|
//NewDbMessageProvider create new provider |
|
func NewDbMessageProvider(options *opt.Options) (*DbMessageProvider, error) { |
|
//make sure url ends with / |
|
p := &DbMessageProvider{} |
|
|
|
//TODO init |
|
return p, nil |
|
} |
|
|
|
func (p *DbMessageProvider) getFromDb(imei uint64) (*BroadcastMessage, error) { |
|
if !p.loadFromDb { |
|
return nil, nil |
|
} |
|
|
|
//Get message from API |
|
//TODO: can it handle many connections? |
|
msg := &BroadcastMessage{} |
|
|
|
//alamat internal |
|
imeiStr := fmt.Sprintf("%d", imei) |
|
url := p.messageAPI + imeiStr |
|
err := p.getJSON(url, msg) |
|
if err != nil { |
|
return nil, err |
|
} |
|
now := time.Now() |
|
if msg.Expired.Before(now) { |
|
return nil, nil |
|
} |
|
|
|
return msg, nil |
|
} |
|
func (p *DbMessageProvider) putToDb(message *UserMessage) error { |
|
if !p.saveToDb { |
|
return nil |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (p *DbMessageProvider) updateDb(status *MessageStatus) error { |
|
url := p.reportAPI |
|
if p.appendIMEI { |
|
url += status.IMEI |
|
} |
|
err := p.putJSON(url, status) |
|
if err != nil { |
|
log.Printf("PUT request error: %v\n", err) |
|
return err |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (p *DbMessageProvider) getJSON(url string, target interface{}) error { |
|
r, err := p.serviceClient.Get(url) |
|
if err != nil { |
|
return err |
|
} |
|
defer r.Body.Close() |
|
|
|
return json.NewDecoder(r.Body).Decode(target) |
|
} |
|
|
|
func (p *DbMessageProvider) putJSON(url string, data interface{}) error { |
|
payload, err := json.Marshal(data) |
|
if err != nil { |
|
return err |
|
} |
|
reader := bytes.NewReader(payload) |
|
client := p.serviceClient |
|
req, err := http.NewRequest("PUT", url, reader) |
|
if err != nil { |
|
return err |
|
} |
|
req.ContentLength = int64(len(payload)) |
|
req.Header.Set("Content-Type", "application/json") |
|
response, err := client.Do(req) |
|
if err != nil { |
|
return err |
|
} |
|
defer response.Body.Close() |
|
|
|
//read content |
|
result, err := ioutil.ReadAll(response.Body) |
|
if err != nil { |
|
return err |
|
} |
|
str := string(result) |
|
if strings.Contains(str, "no") { |
|
return errors.New("Failed to PUT data to service: " + str) |
|
} |
|
|
|
//request code |
|
code := response.StatusCode |
|
if code == http.StatusOK || code == http.StatusCreated { |
|
return nil |
|
} |
|
|
|
status := response.Status |
|
return errors.New("PUT error with status = " + status) |
|
} |
|
|
|
//------------------------------------------------------------------------------------------------- |
|
|
|
//Get return broadcast message for given IMEI |
|
func (p *DbMessageProvider) Get(imei uint64) (*BroadcastMessage, error) { |
|
p.RLock() |
|
defer p.RUnlock() |
|
|
|
//get queue for this imei |
|
queue, ok := p.queueMsg[imei] |
|
if !ok { |
|
return p.getFromDb(imei) |
|
} |
|
|
|
//get message |
|
msg := queue.get() |
|
if msg == nil { |
|
return p.getFromDb(imei) |
|
} |
|
return msg, nil |
|
} |
|
|
|
//Put setup new messages |
|
func (p *DbMessageProvider) Put(message *UserMessage) error { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
now := ISOTime{time.Now()} |
|
for _, vid := range message.Vehicles { |
|
imei, err := strconv.ParseUint(vid, 10, 64) |
|
if err != nil { |
|
log.Printf("Parse IMEI (%v) error: %v\n", vid, err) |
|
continue |
|
} |
|
bm := &BroadcastMessage{ |
|
Timestamp: now, |
|
Duration: message.Duration, |
|
NBuzzer: message.NBuzzer, |
|
Message: message.Message, |
|
Expired: message.Expired, |
|
} |
|
queue, ok := p.queueMsg[imei] |
|
if !ok { |
|
queue = newMessageSet(p.capacity) |
|
p.queueMsg[imei] = queue |
|
} |
|
|
|
ok = queue.putUnique(bm) |
|
if !ok { |
|
//log |
|
log.Printf("Put message error-> %v:%v", imei, bm.Message) |
|
} |
|
} |
|
|
|
//save to database |
|
return p.putToDb(message) |
|
} |
|
|
|
//Update message status |
|
func (p *DbMessageProvider) Update(status *MessageStatus) error { |
|
p.Lock() |
|
defer p.Unlock() |
|
|
|
imei, err := strconv.ParseUint(status.IMEI, 10, 64) |
|
if err != nil { |
|
return err |
|
} |
|
|
|
//message delivered |
|
if status.StatusCode == MessageDelivered || status.StatusCode == MessageError { |
|
//get message from "sent message" collection |
|
sets, ok := p.sentMsg[imei] |
|
if !ok { |
|
return errors.New("delivered message not found in sets") |
|
} |
|
msg := sets.take() |
|
|
|
//if status is error, return message to broadcast queue |
|
if msg != nil && status.StatusCode == MessageError { |
|
//return to sets |
|
queue, ok := p.queueMsg[imei] |
|
if !ok { |
|
queue = newMessageSet(p.capacity) |
|
p.queueMsg[imei] = queue |
|
} |
|
queue.putUnique(msg) |
|
} |
|
} else if status.StatusCode == MessageSent { |
|
//message sent, get from queue |
|
queue, ok := p.queueMsg[imei] |
|
if !ok { |
|
return errors.New("message not found in sets") |
|
} |
|
msg := queue.take() |
|
if msg != nil { |
|
//move to "sent message" collection |
|
sets, ok := p.sentMsg[imei] |
|
if !ok { |
|
sets = newMessageSet(p.capacity) |
|
p.sentMsg[imei] = sets |
|
} |
|
sets.put(msg) |
|
} |
|
} |
|
|
|
//update status in database |
|
return p.updateDb(status) |
|
}
|
|
|