43#include "comm_drv_n0183_serial.h"
44#include "comm_navmsg_bus.h"
45#include "comm_drv_registry.h"
48#include "serial/serial.h"
52#include "androidUTIL.h"
55typedef enum DS_ENUM_BUFFER_STATE {
58} _DS_ENUM_BUFFER_STATE;
62#define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
// Fragments of a mutex-guarded wrapper around std::queue<T>. Only the lines
// below are visible in this listing; the accessor signatures are not.
// Accessor body: locks m_mutex, then reports m_queque.size().
68 std::lock_guard<std::mutex> lock(m_mutex);
69 return m_queque.size();
// Accessor body: locks m_mutex, then reports m_queque.empty().
73 std::lock_guard<std::mutex> lock(m_mutex);
74 return m_queque.empty();
// Accessor body: locks m_mutex, then returns m_queque.front().
// NOTE(review): the return type is not visible here; if it is a reference,
// the reference is used after the lock is released -- confirm.
78 std::lock_guard<std::mutex> lock(m_mutex);
79 return m_queque.front();
// push(): locks m_mutex first (the rest of the body is not visible here).
82 void push(
const T& value) {
83 std::lock_guard<std::mutex> lock(m_mutex);
// Start of another member body that also locks m_mutex; its signature and
// remaining statements are not visible in this listing.
88 std::lock_guard<std::mutex> lock(m_mutex);
// Underlying storage.
93 std::queue<T> m_queque;
// Mutex taken by every member body shown above; declared mutable, which
// permits locking it from const member functions.
94 mutable std::mutex m_mutex;
97#define OUT_QUEUE_LENGTH 20
98#define MAX_OUT_QUEUE_MESSAGE_LENGTH 100
106 const wxString& PortName,
107 const wxString& strBaudRate);
111 bool SetOutMsg(
const wxString& msg);
116 serial::Serial m_serial;
118 void ThreadMessage(
const wxString& msg);
119 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
120 void CloseComPortPhysical();
121 size_t WriteComPortPhysical(
char* msg);
122 size_t WriteComPortPhysical(
const wxString&
string);
126 wxString m_FullPortName;
// Fragments of a fixed-size ring buffer template. The class header and
// several member signatures are not visible in this listing.
// Constructor initializers: allocate an array of `size` elements and record
// `size` as max_size_.
138 : buf_(std::unique_ptr<T[]>(
new T[size])), max_size_(size) {}
141 size_t capacity()
const;
// Emptiness test: not flagged full, and head_ and tail_ coincide.
146 return (!full_ && (head_ == tail_));
// Insert path (signature and the actual store are not visible here).
155 std::lock_guard<std::mutex> lock(mutex_);
// When already full, tail_ is advanced too, so the slot the reader would
// have taken next (buf_[tail_]) is skipped.
157 if (full_) tail_ = (tail_ + 1) % max_size_;
// Advance the write index, wrapping at max_size_.
159 head_ = (head_ + 1) % max_size_;
// The buffer is full once the write index catches up with the read index.
161 full_ = head_ == tail_;
// Remove path (signature not visible here).
165 std::lock_guard<std::mutex> lock(mutex_);
// An empty buffer yields a value-initialized T rather than an error.
167 if (empty())
return T();
// Copy out the element at the read index, then advance it with wrap-around.
170 auto val = buf_[tail_];
172 tail_ = (tail_ + 1) % max_size_;
// Element storage.
179 std::unique_ptr<T[]> buf_;
// Element count passed to the constructor; fixed for the object's lifetime.
182 const size_t max_size_;
// Fragments of a wxEvent subclass that carries a byte-vector payload.
// The class header is not visible in this listing.
// Constructor: defaults to event type wxEVT_COMMDRIVER_N0183_SERIAL and id 0,
// and forwards both to the wxEvent base.
193 wxEventType commandType = wxEVT_COMMDRIVER_N0183_SERIAL,
int id = 0)
194 : wxEvent(
id, commandType){};
// Setter for the payload (body not visible here).
198 void SetPayload(std::shared_ptr<std::vector<unsigned char>> data) {
// Returns the stored shared_ptr by value.
201 std::shared_ptr<std::vector<unsigned char>> GetPayload() {
return m_payload; }
// Clone(): the visible line copies the shared_ptr, so the clone refers to
// the same vector as this event rather than to a copy of its bytes.
204 wxEvent* Clone()
const {
207 newevent->m_payload = this->m_payload;
// Payload bytes, held by shared_ptr.
212 std::shared_ptr<std::vector<unsigned char>> m_payload;
// Constructor fragment (signature and some initializers are not visible in
// this listing).
// The run flag starts at -1, the same value the worker thread writes when
// it exits.
224 m_Thread_run_flag(-1),
// Port string taken from the connection parameters.
226 m_portstring(params->GetDSPort()),
// No worker thread object yet.
227 m_pSecondary_Thread(NULL),
229 m_listener(listener) {
// Baud rate is kept as a decimal string. The comma operator then calls
// SetSecThreadInActive() as part of the same statement.
230 m_BaudRate = wxString::Format(
"%i", params->Baudrate), SetSecThreadInActive();
// Route wxEVT_COMMDRIVER_N0183_SERIAL events to handle_N0183_MSG
// (remaining Bind arguments are not visible here).
233 Bind(wxEVT_COMMDRIVER_N0183_SERIAL, &CommDriverN0183Serial::handle_N0183_MSG,
// Destructor: delegates all teardown to Close().
239CommDriverN0183Serial::~CommDriverN0183Serial() { Close(); }
// Open(): derives the device name from the configured port string and
// starts the worker thread. Several lines of the body are not visible in
// this listing.
241bool CommDriverN0183Serial::Open() {
// Keep only what follows the first ':' of the configured port string.
243 comx = m_params.GetDSPort().AfterFirst(
':');
// Then drop everything from the first space onward.
246 comx = comx.BeforeFirst(
' ');
// Run CommDriverN0183SerialThread::Entry on a std::thread, using the object
// returned by GetSecondaryThread().
// NOTE(review): what happens to `t` afterwards (join/detach) is not visible
// here -- confirm.
251 std::thread t(&CommDriverN0183SerialThread::Entry, GetSecondaryThread());
// Close(): logs the shutdown, detaches the event handler and asks the
// worker thread to stop. Several lines of the body are not visible in this
// listing.
256void CommDriverN0183Serial::Close() {
// Log message naming the port being closed.
258 wxString::Format(_T(
"Closing NMEA Driver %s"), m_portstring.c_str()));
// Stop routing wxEVT_COMMDRIVER_N0183_SERIAL events to handle_N0183_MSG.
265 Unbind(wxEVT_COMMDRIVER_N0183_SERIAL, &CommDriverN0183Serial::handle_N0183_MSG,
// Only act when a worker thread object exists.
269 if (m_pSecondary_Thread) {
271 if (m_bsec_thread_active)
273 wxLogMessage(_T(
"Stopping Secondary Thread"));
// 0 is the stop request that the worker's Entry() loop checks for.
275 m_Thread_run_flag = 0;
// Poll once per second until the flag goes negative or tsec runs out.
// NOTE(review): the initial value of tsec is not visible here; the messages
// below suggest 10 -- confirm.
278 while ((m_Thread_run_flag >= 0) && (tsec--)) wxSleep(1);
// A negative flag means the worker reached its exit path.
281 if (m_Thread_run_flag < 0)
282 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
284 msg.Printf(_T(
"Not Stopped after 10 sec."));
// Release the worker object and reset the bookkeeping.
288 delete m_pSecondary_Thread;
289 m_pSecondary_Thread = NULL;
290 m_bsec_thread_active =
false;
// Hand a shared_ptr to this driver to the registry object returned by
// CommDriverRegistry::GetInstance().
301 CommDriverRegistry::GetInstance().
Activate(shared_from_this());
// SendMessage(): forwards an NMEA0183 message to the serial output path.
// Several lines of the body (including any preprocessor guards and the
// return statements) are not visible in this listing.
305bool CommDriverN0183Serial::SendMessage(std::shared_ptr<const NavMsg> msg,
306 std::shared_ptr<const NavAddr> addr) {
// Downcast to the NMEA0183 message type.
// NOTE(review): no null check is visible before msg_0183 is dereferenced on
// the next line -- confirm one exists on a hidden line.
308 auto msg_0183 = std::dynamic_pointer_cast<const Nmea0183Msg>(msg);
309 wxString sentence(msg_0183->payload.c_str());
// `payload` is the sentence with a trailing "\r\n" guaranteed.
312 wxString payload = sentence;
313 if( !sentence.EndsWith(_T(
"\r\n")) )
314 payload += _T(
"\r\n");
// Write path through androidWriteSerial, using the stripped port name.
317 wxString port = m_params.GetStrippedDSPort();
318 androidWriteSerial( port, payload );
// Write path through the worker thread: requires that it exists and is
// marked active.
321 if( GetSecondaryThread() ) {
322 if( IsSecThreadActive() )
// NOTE(review): this queues `sentence`, not the "\r\n"-terminated `payload`
// -- confirm that is intended.
326 if( GetSecondaryThread()->SetOutMsg( sentence ))
// handle_N0183_MSG(): turns the byte payload of an incoming event into
// Nmea0183Msg objects and passes them to the listener. The parameter list
// and several body lines are not visible in this listing.
341void CommDriverN0183Serial::handle_N0183_MSG(
343 auto p =
event.GetPayload();
344 std::vector<unsigned char>* payload = p.get();
// Copy the received bytes into a std::string.
// NOTE(review): no null check on `payload` is visible before this use --
// confirm one exists on a hidden line.
347 std::string full_sentence = std::string(payload->begin(), payload->end());
// Only sentences that begin with '$' or '!' are processed.
349 if ((full_sentence[0] ==
'$') || (full_sentence[0] ==
'!')) {
350 std::string identifier;
// The identifier is the (up to) five characters after the leading marker.
352 identifier = full_sentence.substr(1, 5);
356 auto msg = std::make_shared<const Nmea0183Msg>(identifier, full_sentence,
// Second message built from the first one with the extra argument "ALL".
358 auto msg_all = std::make_shared<const Nmea0183Msg>(*msg,
"ALL");
// The identifier-specific message is delivered only when the sentence
// passes the input filter.
360 if (m_params.SentencePassesFilter(full_sentence, FILTER_INPUT))
361 m_listener.
Notify(std::move(msg));
// NOTE(review): the "ALL" copy appears to be delivered regardless of the
// filter; the line between these two calls is not visible -- confirm.
363 m_listener.
Notify(std::move(msg_all));
369#define DS_RX_BUFFER_SIZE 4096
// Worker-thread object constructor (part of the parameter list and some
// body lines are not visible in this listing).
371CommDriverN0183SerialThread::CommDriverN0183SerialThread(
373 const wxString& strBaudRate) {
// Remember the driver that launched this worker.
374 m_pParentDriver = Launcher;
376 m_PortName = PortName;
// Port name with a "Serial:" prefix.
377 m_FullPortName = _T(
"Serial:") + PortName;
// m_baud is assigned here only when the baud-rate string parses as a number.
381 if (strBaudRate.ToLong(&lbaud)) m_baud = (int)lbaud;
// Destructor: intentionally empty, nothing is released here.
384CommDriverN0183SerialThread::~CommDriverN0183SerialThread(
void) {}
// OnExit(): empty, performs no work.
386void CommDriverN0183SerialThread::OnExit(
void) {}
// OpenComPortPhysical(): configures m_serial for the given port and baud
// rate and reports whether the port ended up open. Part of the signature
// and some body lines are not visible in this listing.
388bool CommDriverN0183SerialThread::OpenComPortPhysical(
const wxString& com_name,
391 m_serial.setPort(com_name.ToStdString());
392 m_serial.setBaudrate(baud_rate);
// Timeout configuration; presumably milliseconds for inter-byte, read and
// write timeouts -- verify against the serial library's setTimeout docs.
394 m_serial.setTimeout(250, 250, 0, 250, 0);
395 }
// Failures surface as std::exception and are caught here (handler body not
// visible in this listing).
catch (std::exception& e) {
// The result is the port's actual state, not whether an exception occurred.
399 return m_serial.isOpen();
// CloseComPortPhysical(): most of the body is not visible in this listing;
// the visible part shows that std::exception is caught inside the function.
402void CommDriverN0183SerialThread::CloseComPortPhysical() {
405 }
catch (std::exception& e) {
// SetOutMsg(): makes a heap copy of `msg` for the output queue. The lines
// that enqueue the copy and return a result are not visible in this listing.
411bool CommDriverN0183SerialThread::SetOutMsg(
const wxString &msg)
// Accept the message only while fewer than OUT_QUEUE_LENGTH entries are queued.
413 if(out_que.size() < OUT_QUEUE_LENGTH){
// Convert to UTF-8 before copying.
414 wxCharBuffer buf = msg.ToUTF8();
// Allocate room for the bytes plus the terminating NUL, then copy.
// NOTE(review): the malloc result is used without a visible null check, and
// the matching free() is not visible here -- confirm both.
416 char *qmsg = (
char *)malloc(strlen(buf.data()) +1);
417 strcpy(qmsg, buf.data());
426void CommDriverN0183SerialThread::ThreadMessage(
const wxString& msg) {
// WriteComPortPhysical(char*): writes a NUL-terminated string to the serial
// port. The return statements are not visible in this listing.
433size_t CommDriverN0183SerialThread::WriteComPortPhysical(
char* msg) {
// Nothing is written unless the port is open.
434 if (m_serial.isOpen()) {
// Send strlen(msg) bytes, i.e. without the terminating NUL.
437 status = m_serial.write((uint8_t*)msg, strlen(msg));
438 }
// Write failures surface as std::exception and are caught here (handler
// body not visible in this listing).
catch (std::exception& e) {
// Entry(): worker-thread body. It opens the port, then loops reading bytes,
// splitting them into newline-terminated sentences that are posted to the
// parent driver as events, and writing queued output messages. Many lines
// of the body are not visible in this listing, so the branch structure
// described below is partial.
449void* CommDriverN0183SerialThread::Entry() {
450 bool not_done =
true;
// Tell the parent driver that this worker is running.
451 m_pParentDriver->SetSecThreadActive();
// Open the port; on failure a diagnostic naming the port is built.
457 if (!OpenComPortPhysical(m_PortName, m_baud)) {
458 wxString msg(_T(
"NMEA input device open failed: "));
459 msg.Append(m_PortName);
// NOTE(review): function-local static, so the counter is shared by every
// invocation of Entry() and keeps its value between them -- confirm that
// is intended.
470 static size_t retries = 0;
// Main loop: continues while the parent's run flag stays positive.
472 while ((not_done) && (m_pParentDriver->m_Thread_run_flag > 0)) {
// Re-check for a stop request (flag == 0) and leave through thread_exit.
473 if ( m_pParentDriver->m_Thread_run_flag == 0)
goto thread_exit;
475 uint8_t next_byte = 0;
476 unsigned int newdata = 0;
// When the port is open, read up to 200 bytes into rdata; read errors are
// caught as std::exception.
479 if (m_serial.isOpen()) {
481 newdata = m_serial.read(rdata, 200);
482 }
catch (std::exception& e) {
// Once the (post-incremented) retry counter exceeds 10, the port is closed.
485 if (10 < retries++) {
489 CloseComPortPhysical();
// Reopen attempt: wait 250 ms times the retry count, then close and reopen.
497 wxMilliSleep(250 * retries);
498 CloseComPortPhysical();
499 if (OpenComPortPhysical(m_PortName, m_baud))
501 else if (retries < 10)
// Append the freshly read bytes to the ring buffer, counting line feeds.
507 for (
unsigned int i = 0; i < newdata; i++) {
508 circle.put(rdata[i]);
509 if (0x0a == rdata[i]) nl_found++;
516 if (circle.empty()) {
// Assemble one sentence into a shared byte vector.
522 auto buffer = std::make_shared<std::vector<unsigned char>>();
523 std::vector<unsigned char>* vec = buffer.get();
// Pull bytes until a line feed is seen or the ring buffer runs dry.
525 uint8_t take_byte = circle.get();
526 while ((take_byte != 0x0a) && !circle.empty()) {
527 vec->push_back(take_byte);
528 take_byte = circle.get();
// A sentence is only forwarded when it actually ended with a line feed.
531 if (take_byte == 0x0a) {
532 vec->push_back(take_byte);
// Honor a stop request before posting anything to the parent.
534 if ( m_pParentDriver->m_Thread_run_flag == 0)
goto thread_exit;
// Drop a leading carriage return, if present.
542 if (vec->at(0) ==
'\r') vec->erase(vec->begin());
// Hand the sentence to the parent driver as a pending event.
545 Nevent.SetPayload(buffer);
546 m_pParentDriver->AddPendingEvent(Nevent);
// Output side: process queued outgoing messages while any remain.
557 bool b_qdata = !out_que.empty();
561 char* qmsg = out_que.front();
// Copy at most MAX_OUT_QUEUE_MESSAGE_LENGTH - 1 bytes into a local buffer.
// NOTE(review): strncpy does not NUL-terminate when the source is that long
// or longer; a terminator is presumably written on a hidden line -- confirm.
564 char msg[MAX_OUT_QUEUE_MESSAGE_LENGTH];
565 strncpy(msg, qmsg, MAX_OUT_QUEUE_MESSAGE_LENGTH - 1);
// A return of size_t(-1) from the write, combined with a further condition
// not visible here, leads to closing the port.
568 if (
static_cast<size_t>(-1) == WriteComPortPhysical(msg) &&
573 CloseComPortPhysical();
576 b_qdata = !out_que.empty();
// Exit path: close the port, mark the worker inactive, and set the run flag
// to -1, the value Close() waits for.
581 CloseComPortPhysical();
582 m_pParentDriver->SetSecThreadInActive();
583 m_pParentDriver->m_Thread_run_flag = -1;
Internal event worker thread -> driver main code.
Driver for NMEA0183 over serial connections.
void Activate() override
Register driver and possibly do other post-ctor steps.
Abstract NMEA0183 drivers common parts.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Where messages are sent to or received from.