You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

468 lines
11 KiB

//Message model
/*
Table "public.bcs_list"
Column | Type | Modifiers
----------------+-----------------------------+-------------------------------------------------------
id | bigint | not null default nextval('bcs_list_id_seq'::regclass)
bcs_message_id | bigint |
imei | character varying(25) |
sent_ts | timestamp without time zone |
delivered_ts | timestamp without time zone |
Indexes:
"bcs_list_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)
Table "public.bcs_message"
Column | Type | Modifiers
------------------+-----------------------------+----------------------------------------------------------
id | bigint | not null default nextval('bcs_message_id_seq'::regclass)
bcs_message_time | timestamp without time zone |
duration | integer |
mbuzzer | integer |
message | text |
expired | timestamp without time zone |
company_id | bigint |
Indexes:
"bcs_message_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"bcs_message_company_id_fkey" FOREIGN KEY (company_id) REFERENCES m_company(id)
Referenced by:
TABLE "bcs_list" CONSTRAINT "bcs_list_bcs_message_id_fkey" FOREIGN KEY (bcs_message_id) REFERENCES bcs_message(id)
*/
package model
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"github.com/ipsusila/opt"
)
//Message status codes stored in MessageStatus.StatusCode and interpreted by
//DbMessageProvider.Update.
const (
//MessageError: Update takes the message out of the "sent" set and puts it
//back on the broadcast queue.
MessageError = -1
//MessageSent: Update moves the message from the broadcast queue to the
//"sent" set.
MessageSent = 1
//MessageDelivered: Update removes the message from the "sent" set.
MessageDelivered = 2
)
//ISOTime wraps time.Time so that timestamps can be decoded from JSON strings
//written in the "2006-01-02 15:04:05" layout (see its UnmarshalJSON method).
//All time.Time methods are promoted through the embedded field.
type ISOTime struct {
time.Time
}
//UserMessage is the message request retrieved from the GUI (JSON encoded).
//DbMessageProvider.Put turns it into one BroadcastMessage per vehicle.
type UserMessage struct {
//Vehicles lists the targets; Put parses each entry as a base-10 uint64 IMEI
//and skips entries that fail to parse.
Vehicles []string `json:"vehicles"`
//Duration is copied verbatim into the broadcast message.
Duration int `json:"duration"`
//NBuzzer is copied verbatim into the broadcast message.
NBuzzer int `json:"nbuzzer"`
//Message is the text to broadcast.
Message string `json:"message"`
//Expired is the expiry timestamp, decoded through ISOTime.
Expired ISOTime `json:"expired"`
}
//BroadcastMessage holds the information of one message that needs to be
//broadcast to a single device.
type BroadcastMessage struct {
//Timestamp is overwritten with time.Now() when the message is stored in a
//messageSet (see doPut).
Timestamp ISOTime `json:"ts"`
//Duration is taken from the originating UserMessage.
Duration int `json:"duration"`
//NBuzzer is taken from the originating UserMessage.
NBuzzer int `json:"nbuzzer"`
//Message is the text to broadcast.
Message string `json:"message"`
//Expired is the expiry time; getFromDb drops messages whose Expired lies
//before the current time.
Expired ISOTime `json:"expired"`
}
//MessageStatus is a status report for the message of a specific IMEI.
type MessageStatus struct {
//IMEI is the device identifier as a string; Update parses it as a base-10
//uint64.
IMEI string `json:"imei"`
//Status is sent to the report service as-is.
Status string `json:"status"`
//Timestamp is sent to the report service as-is (plain string, not parsed
//in this file).
Timestamp string `json:"timestamp"`
//StatusCode is one of MessageError, MessageSent or MessageDelivered. The
//"-" tag keeps it out of the JSON encoding.
StatusCode int `json:"-"`
}
//MessageProvider is the contract for sending and receiving broadcast
//messages. DbMessageProvider in this file implements it.
type MessageProvider interface {
//Get returns the broadcast message for the given IMEI.
Get(imei uint64) (*BroadcastMessage, error)
//Put registers a new user message for broadcasting.
Put(message *UserMessage) error
//Update processes a message status report.
Update(status *MessageStatus) error
}
//messageSet is a fixed-capacity FIFO of broadcast messages stored in a
//circular slice. Its methods lock the embedded RWMutex themselves.
type messageSet struct {
sync.RWMutex
//messages is the backing storage, allocated with length capacity.
messages []*BroadcastMessage
//readPos is the position of the next message to read (used modulo capacity).
readPos int
//writePos is the position of the next free slot (used modulo capacity).
writePos int
//capacity is the maximum number of stored messages.
capacity int
//numItems is the number of messages currently stored.
numItems int
}
//DbMessageProvider provides messages from in-memory queues, backed by a
//remote HTTP service for lookup and status reporting.
type DbMessageProvider struct {
//The embedded lock guards the two maps below.
sync.RWMutex
//queueMsg holds, per IMEI, the messages waiting to be broadcast.
queueMsg map[uint64]*messageSet
//sentMsg holds, per IMEI, the messages reported as sent (see Update).
sentMsg map[uint64]*messageSet
//capacity is passed to newMessageSet for every set created.
capacity int
//saveToDb is checked by putToDb.
saveToDb bool
//loadFromDb enables the remote lookup in getFromDb.
loadFromDb bool
//messageAPI is the URL prefix to which getFromDb appends the IMEI.
messageAPI string
//reportAPI is the URL to which updateDb PUTs status reports.
reportAPI string
//serviceTimeout is not referenced by the code in this file.
//NOTE(review): presumably the timeout intended for serviceClient - confirm.
serviceTimeout time.Duration
//appendIMEI makes updateDb append the status IMEI to reportAPI.
appendIMEI bool
//serviceClient performs the GET and PUT requests.
serviceClient *http.Client
}
//-------------------------------------------------------------------------------------------------
//UnmarshalJSON converts a JSON string to a timestamp.
//The expected layout is "Y-m-d H:M:S" (Go layout "2006-01-02 15:04:05"),
//interpreted in the local time zone.
//
//A JSON null is a no-op and leaves the receiver unchanged, following the
//encoding/json convention for Unmarshalers. Any other value that does not
//match the layout resets the receiver to the zero time and returns the
//parse error.
func (tm *ISOTime) UnmarshalJSON(b []byte) error {
	raw := string(b)
	if raw == "null" {
		return nil
	}

	//TODO time other than local
	parsed, err := time.ParseInLocation("2006-01-02 15:04:05", strings.Trim(raw, "\""), time.Local)
	if err != nil {
		tm.Time = time.Time{}
		return err
	}
	tm.Time = parsed
	return nil
}
//-------------------------------------------------------------------------------------------------
//newMessageSet allocates an empty messageSet able to hold capacity messages.
//Read/write positions and the item counter start at their zero values.
func newMessageSet(capacity int) *messageSet {
	storage := make([]*BroadcastMessage, capacity)
	return &messageSet{
		messages: storage,
		capacity: capacity,
	}
}
//doPut stores msg in the next free slot and stamps it with the current time.
//It returns false, leaving msg untouched, when the set is already full.
//The caller must pass a non-nil msg.
func (q *messageSet) doPut(msg *BroadcastMessage) bool {
	q.Lock()
	defer q.Unlock()

	if q.numItems != q.capacity {
		msg.Timestamp.Time = time.Now()
		slot := q.writePos % q.capacity
		q.messages[slot] = msg
		q.writePos = slot + 1
		q.numItems++
		return true
	}
	//set is full
	return false
}
//put appends msg to the set. A nil msg is rejected without touching the set;
//otherwise the result of doPut is returned.
func (q *messageSet) put(msg *BroadcastMessage) bool {
	return msg != nil && q.doPut(msg)
}
//putUnique appends msg only when it is non-nil and the set does not already
//contain an equal message. It returns true when the message was stored.
func (q *messageSet) putUnique(msg *BroadcastMessage) bool {
	if msg == nil {
		return false
	}
	if q.contains(msg) {
		//duplicate
		return false
	}
	return q.doPut(msg)
}
//take removes and returns the oldest message, or nil when the set is empty.
func (q *messageSet) take() *BroadcastMessage {
	q.Lock()
	defer q.Unlock()

	if q.numItems != 0 {
		slot := q.readPos % q.capacity
		oldest := q.messages[slot]
		q.numItems--
		q.readPos = slot + 1
		return oldest
	}
	//nothing stored
	return nil
}
//get returns the oldest message without removing it, or nil when the set is
//empty.
func (q *messageSet) get() *BroadcastMessage {
	q.Lock()
	defer q.Unlock()

	if q.numItems != 0 {
		return q.messages[q.readPos%q.capacity]
	}
	//nothing stored
	return nil
}
//count reports the number of messages currently stored.
func (q *messageSet) count() int {
	q.RLock()
	stored := q.numItems
	q.RUnlock()
	return stored
}
//contains reports whether a message equal to m (as defined by EqualTo) is
//currently stored in the set.
//
//The scan visits exactly numItems slots starting at readPos. It does not
//compare readPos with writePos, because that comparison cannot tell a full
//buffer from an empty one and never terminates once writePos equals
//capacity. A set with capacity 0 holds no items, so the modulo is never
//evaluated for it.
func (q *messageSet) contains(m *BroadcastMessage) bool {
	if m == nil {
		return false
	}

	q.RLock()
	defer q.RUnlock()

	for i := 0; i < q.numItems; i++ {
		stored := q.messages[(q.readPos+i)%q.capacity]
		//live slots are never nil, but skip defensively instead of panicking
		if stored != nil && m.EqualTo(stored) {
			return true
		}
	}
	return false
}
//-------------------------------------------------------------------------------------------------
//EqualTo returns true if both messages carry the same content: equal
//Duration, NBuzzer and Message, and Timestamp/Expired values that denote the
//same instant.
//
//Timestamps are compared with time.Time.Equal rather than ==, so two values
//for the same instant match even when they differ in location or monotonic
//clock reading. A nil message is equal only to another nil message.
func (m1 *BroadcastMessage) EqualTo(m2 *BroadcastMessage) bool {
	if m1 == nil || m2 == nil {
		return m1 == m2
	}
	return m1.Duration == m2.Duration &&
		m1.NBuzzer == m2.NBuzzer &&
		m1.Message == m2.Message &&
		m1.Timestamp.Equal(m2.Timestamp.Time) &&
		m1.Expired.Equal(m2.Expired.Time)
}
//-------------------------------------------------------------------------------------------------
//NewDbMessageProvider creates a new provider.
//
//The returned provider has both message maps allocated, a non-zero queue
//capacity and an HTTP client with a timeout, so Put, Get and Update can be
//called without hitting a nil map or nil client. loadFromDb and saveToDb
//stay disabled and the service URLs stay empty.
//
//options is not consulted yet. The capacity and timeout below are
//placeholder defaults until the configuration is wired in.
//It currently always returns a nil error.
func NewDbMessageProvider(options *opt.Options) (*DbMessageProvider, error) {
	const (
		defaultCapacity = 16
		defaultTimeout  = 10 * time.Second
	)

	p := &DbMessageProvider{
		queueMsg:       make(map[uint64]*messageSet),
		sentMsg:        make(map[uint64]*messageSet),
		capacity:       defaultCapacity,
		serviceTimeout: defaultTimeout,
		serviceClient:  &http.Client{Timeout: defaultTimeout},
	}

	//TODO init from options: capacity, saveToDb/loadFromDb, messageAPI,
	//reportAPI, appendIMEI, timeout. Make sure the URLs end with "/" because
	//the IMEI is appended to them directly.
	return p, nil
}
//getFromDb fetches the message for imei from the message API.
//
//It returns (nil, nil) when loading is disabled or when the fetched message
//has already expired, and (nil, err) when the request or decoding fails.
func (p *DbMessageProvider) getFromDb(imei uint64) (*BroadcastMessage, error) {
	if !p.loadFromDb {
		return nil, nil
	}

	//Get message from API
	//TODO: can it handle many connections?
	//internal address: API prefix followed by the decimal IMEI
	endpoint := p.messageAPI + fmt.Sprintf("%d", imei)

	fetched := new(BroadcastMessage)
	if err := p.getJSON(endpoint, fetched); err != nil {
		return nil, err
	}

	if fetched.Expired.Before(time.Now()) {
		//expired messages are not delivered
		return nil, nil
	}
	return fetched, nil
}
//putToDb is the hook for persisting a user message. Nothing is stored at the
//moment: it returns nil whether or not saving is enabled.
func (p *DbMessageProvider) putToDb(message *UserMessage) error {
	if p.saveToDb {
		//saving enabled, but there is nothing to do yet
		return nil
	}
	return nil
}
//updateDb reports status to the report API with an HTTP PUT. The target is
//reportAPI, followed by the status IMEI when appendIMEI is set. A failure is
//logged and returned to the caller.
func (p *DbMessageProvider) updateDb(status *MessageStatus) error {
	endpoint := p.reportAPI
	if p.appendIMEI {
		endpoint = endpoint + status.IMEI
	}

	if err := p.putJSON(endpoint, status); err != nil {
		log.Printf("PUT request error: %v\n", err)
		return err
	}
	return nil
}
//getJSON performs an HTTP GET on url with the service client and decodes the
//JSON response body into target. The response status code is not inspected.
func (p *DbMessageProvider) getJSON(url string, target interface{}) error {
	resp, err := p.serviceClient.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	decoder := json.NewDecoder(resp.Body)
	return decoder.Decode(target)
}
//putJSON sends data, encoded as JSON, to url with an HTTP PUT.
//
//The call fails when encoding, building or sending the request fails, when
//the response body cannot be read, when the response body contains the text
//"no", or when the status code is neither 200 OK nor 201 Created. The body
//check runs before the status check.
func (p *DbMessageProvider) putJSON(url string, data interface{}) error {
	body, err := json.Marshal(data)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("PUT", url, bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.ContentLength = int64(len(body))

	resp, err := p.serviceClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	//read content
	raw, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if text := string(raw); strings.Contains(text, "no") {
		return errors.New("Failed to PUT data to service: " + text)
	}

	//request code
	switch resp.StatusCode {
	case http.StatusOK, http.StatusCreated:
		return nil
	}
	return errors.New("PUT error with status = " + resp.Status)
}
//-------------------------------------------------------------------------------------------------
//Get returns the broadcast message for the given IMEI.
//
//The oldest message queued in memory for the IMEI is returned without being
//removed. When nothing is queued, the result of getFromDb is returned.
//
//The provider lock is held only for the map lookup, so a slow remote lookup
//cannot block Put or Update. The queue itself is protected by its own lock.
func (p *DbMessageProvider) Get(imei uint64) (*BroadcastMessage, error) {
	//get queue for this imei
	p.RLock()
	queue, ok := p.queueMsg[imei]
	p.RUnlock()

	if ok {
		if msg := queue.get(); msg != nil {
			return msg, nil
		}
	}

	//nothing queued in memory: ask the service, without holding the lock
	return p.getFromDb(imei)
}
//Put sets up new messages: one BroadcastMessage is queued for every vehicle
//of message whose entry parses as a base-10 IMEI. Entries that do not parse,
//and messages the queue rejects (full queue or duplicate), are logged and
//skipped. Afterwards the user message is handed to putToDb, whose result is
//returned.
//
//A nil message is rejected with an error. The provider lock is held only
//while the in-memory queues are modified, not during putToDb.
func (p *DbMessageProvider) Put(message *UserMessage) error {
	if message == nil {
		return errors.New("nil user message")
	}
	p.enqueue(message)

	//save to database
	return p.putToDb(message)
}

//enqueue adds message to the in-memory queue of each of its vehicles, holding
//the provider lock for the duration of the update.
func (p *DbMessageProvider) enqueue(message *UserMessage) {
	p.Lock()
	defer p.Unlock()

	//a provider built without its map must not panic on the first write
	if p.queueMsg == nil {
		p.queueMsg = make(map[uint64]*messageSet)
	}

	now := ISOTime{time.Now()}
	for _, vid := range message.Vehicles {
		imei, err := strconv.ParseUint(vid, 10, 64)
		if err != nil {
			log.Printf("Parse IMEI (%v) error: %v\n", vid, err)
			continue
		}

		//every vehicle gets its own copy: the queue stamps it on insertion
		bm := &BroadcastMessage{
			Timestamp: now,
			Duration:  message.Duration,
			NBuzzer:   message.NBuzzer,
			Message:   message.Message,
			Expired:   message.Expired,
		}

		queue, ok := p.queueMsg[imei]
		if !ok {
			queue = newMessageSet(p.capacity)
			p.queueMsg[imei] = queue
		}
		if !queue.putUnique(bm) {
			log.Printf("Put message error-> %v:%v", imei, bm.Message)
		}
	}
}
//Update processes a message status report for the IMEI in status.
//
//  - MessageSent: the oldest queued message moves to the "sent" set.
//  - MessageDelivered: the oldest message is removed from the "sent" set.
//  - MessageError: the oldest message is removed from the "sent" set and put
//    back on the broadcast queue.
//
//Any other status code leaves the in-memory sets untouched. When the
//in-memory update succeeds, the status is reported through updateDb and its
//result is returned.
//
//An error is returned, and nothing is reported, when status is nil, when the
//IMEI is not a base-10 number, or when the IMEI has no queue/"sent" set for
//the requested transition. The provider lock is held only while the
//in-memory sets are modified, not during the report request.
func (p *DbMessageProvider) Update(status *MessageStatus) error {
	if status == nil {
		return errors.New("nil message status")
	}
	imei, err := strconv.ParseUint(status.IMEI, 10, 64)
	if err != nil {
		return err
	}
	if err := p.applyStatus(imei, status.StatusCode); err != nil {
		return err
	}

	//update status in database
	return p.updateDb(status)
}

//applyStatus moves the oldest message of imei between the broadcast queue and
//the "sent" set according to code, holding the provider lock meanwhile.
func (p *DbMessageProvider) applyStatus(imei uint64, code int) error {
	p.Lock()
	defer p.Unlock()

	switch code {
	case MessageDelivered, MessageError:
		//get message from "sent message" collection
		sets, ok := p.sentMsg[imei]
		if !ok {
			return errors.New("delivered message not found in sets")
		}
		msg := sets.take()
		if msg == nil || code != MessageError {
			return nil
		}

		//status is error: return message to broadcast queue
		queue, ok := p.queueMsg[imei]
		if !ok {
			if p.queueMsg == nil {
				p.queueMsg = make(map[uint64]*messageSet)
			}
			queue = newMessageSet(p.capacity)
			p.queueMsg[imei] = queue
		}
		if !queue.putUnique(msg) {
			//the message would otherwise vanish without a trace
			log.Printf("Requeue message error-> %v:%v", imei, msg.Message)
		}

	case MessageSent:
		//message sent, get from queue
		queue, ok := p.queueMsg[imei]
		if !ok {
			return errors.New("message not found in sets")
		}
		msg := queue.take()
		if msg == nil {
			return nil
		}

		//move to "sent message" collection
		sets, ok := p.sentMsg[imei]
		if !ok {
			if p.sentMsg == nil {
				p.sentMsg = make(map[uint64]*messageSet)
			}
			sets = newMessageSet(p.capacity)
			p.sentMsg[imei] = sets
		}
		if !sets.put(msg) {
			log.Printf("Store sent message error-> %v:%v", imei, msg.Message)
		}
	}
	return nil
}