OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_signalk_net.cpp
1/***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose:
5 * Author: David Register, Alec Leamas
6 *
7 ***************************************************************************
8 * Copyright (C) 2022 by David Register, Alec Leamas *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25
26#include <vector>
27#include <mutex> // std::mutex
28#include <queue> // std::queue
29
30#include "rapidjson/document.h"
31
32#include "comm_drv_signalk_net.h"
33#include "comm_navmsg_bus.h"
34#include "comm_drv_registry.h"
35
36#include "easywsclient.hpp"
37#include "geodesic.h"
38
39class CommDriverSignalKNetEvent; // fwd
40
42class CommDriverSignalKNetThread : public wxThread {
43public:
45 const wxString& PortName,
46 const wxString& strBaudRate);
47
49 void* Entry();
50 bool SetOutMsg(const wxString& msg);
51 void OnExit(void);
52
53private:
54 void ThreadMessage(const wxString& msg);
55 bool OpenComPortPhysical(const wxString& com_name, int baud_rate);
56 void CloseComPortPhysical();
57 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
58 size_t WriteComPortPhysical(unsigned char *msg, size_t length);
59 void SetGatewayOperationMode(void);
60
61 CommDriverSignalKNet* m_pParentDriver;
62 wxString m_PortName;
63 wxString m_FullPortName;
64
65 unsigned char* put_ptr;
66 unsigned char* tak_ptr;
67
68 unsigned char* rx_buffer;
69
70 int m_baud;
71 int m_n_timeout;
72
73// n2k_atomic_queue<char*> out_que;
74
75};
76
78wxDECLARE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
79
81class CommDriverSignalKNetEvent : public wxEvent {
82public:
83 CommDriverSignalKNetEvent(wxEventType commandType = wxEVT_NULL, int id = 0)
84 : wxEvent(id, commandType){};
86
87 // accessors
88 void SetPayload(std::shared_ptr<std::string> data) {
89 m_payload = data;
90 }
91 std::shared_ptr<std::string> GetPayload() { return m_payload; }
92
93 // required for sending with wxPostEvent()
94 wxEvent* Clone() const {
96 newevent->m_payload = this->m_payload;
97 return newevent;
98 };
99
100private:
101 std::shared_ptr<std::string> m_payload;
102};
103
104// WebSocket implementation
105static wxEvtHandler *s_wsSKConsumer;
106
108class WebSocketThread : public wxThread {
109public:
110 WebSocketThread(CommDriverSignalKNet *parent, wxIPV4address address,
111 wxEvtHandler *consumer);
112 virtual void *Entry();
113
114private:
115 static void HandleMessage(const std::string &message);
116
117 wxIPV4address m_address;
118 wxEvtHandler *m_consumer;
119 CommDriverSignalKNet *m_parentStream;
120};
121
122WebSocketThread::WebSocketThread(CommDriverSignalKNet *parent,
123 wxIPV4address address,
124 wxEvtHandler *consumer) {
125 m_address = address;
126 m_consumer = consumer;
127 m_parentStream = parent;
128}
129
130void *WebSocketThread::Entry() {
131 using easywsclient::WebSocket;
132
133 bool not_done = true;
134
135 m_parentStream->SetThreadRunning(true);
136
137 s_wsSKConsumer = m_consumer;
138
139 wxString host = m_address.IPAddress();
140 int port = m_address.Service();
141
142 // Craft the address string
143 std::stringstream wsAddress;
144 wsAddress << "ws://" << host.mb_str() << ":" << port
145 << "/signalk/v1/stream?subscribe=all&sendCachedValues=false";
146
147 WebSocket::pointer ws = 0;
148
149 while ((not_done) && (m_parentStream->m_Thread_run_flag > 0)) {
150 bool not_connected = true;
151 while ((not_connected) && (m_parentStream->m_Thread_run_flag > 0)) {
152 ws = WebSocket::from_url(wsAddress.str());
153 if (ws == NULL)
154 printf("No Connect\n");
155 else
156 not_connected = false;
157
158 if (m_parentStream->m_Thread_run_flag == 0){
159 m_parentStream->SetThreadRunning(false);
160 return 0;
161 }
162 }
163
164 while ((not_done) && (m_parentStream->m_Thread_run_flag > 0)) {
165 if (TestDestroy()) {
166 //printf("ws receiving delete\n");
167 ws->close();
168 not_done = false; // smooth exit
169 //break;
170 }
171
172 if (ws->getReadyState() == WebSocket::CLOSED) {
173 //printf("ws closed\n");
174 break;
175 }
176 ws->poll(10);
177 if (ws->getReadyState() == WebSocket::OPEN) {
178 ws->dispatch(HandleMessage);
179 }
180 if( m_parentStream->m_Thread_run_flag <= 0){
181 //printf("done\n");
182 ws->close();
183 not_done = false; // smooth exit
184 }
185 }
186 }
187
188 //printf("ws delete\n");
189 delete ws;
190
191 m_parentStream->SetThreadRunning(false);
192 m_parentStream->m_Thread_run_flag = -1;
193
194 return 0;
195}
196
197void WebSocketThread::HandleMessage(const std::string &message) {
198 int yyp = 0;
199 if (s_wsSKConsumer) {
200 CommDriverSignalKNetEvent signalKEvent(wxEVT_COMMDRIVER_SIGNALK_NET, 0);
201 auto buffer = std::make_shared<std::string>(message);
202
203 signalKEvent.SetPayload(buffer);
204 s_wsSKConsumer->AddPendingEvent(signalKEvent);
205 }
206}
207
208/*========================================================================
209 * CommDriverSignalKNet implementation
210 */
211
212wxDEFINE_EVENT(wxEVT_COMMDRIVER_SIGNALK_NET, CommDriverSignalKNetEvent);
213
214CommDriverSignalKNet::CommDriverSignalKNet(const ConnectionParams* params,
215 DriverListener& listener)
216 : CommDriverSignalK(((ConnectionParams*)params)->GetStrippedDSPort()),
217 m_Thread_run_flag(-1),
218 m_params(*params),
219 m_listener(listener) {
220
221 // Prepare the wxEventHandler to accept events from the actual hardware thread
222 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
223 this);
224
225 m_addr.Hostname(params->NetworkAddress);
226 m_addr.Service(params->NetworkPort);
227 m_socketread_watchdog_timer.SetOwner(this, TIMER_SOCKET);
228 m_wsThread = NULL;
229 m_threadActive = false;
230
231 Open();
232}
233
234CommDriverSignalKNet::~CommDriverSignalKNet() {
235 Close();
236}
237
239 CommDriverRegistry::GetInstance().Activate(shared_from_this());
240}
241
242void CommDriverSignalKNet::Open(void) {
243 wxString discoveredIP;
244 int discoveredPort;
245
246 //if (m_useWebSocket)
247 {
248 std::string serviceIdent =
249 std::string("_signalk-ws._tcp.local."); // Works for node.js server
250#if 0
251 if (m_params->AutoSKDiscover) {
252 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
253 1)) { // 1 second scan
254 wxLogDebug(wxString::Format(
255 _T("SK server autodiscovery finds WebSocket service: %s:%d"),
256 discoveredIP.c_str(), discoveredPort));
257 m_addr.Hostname(discoveredIP);
258 m_addr.Service(discoveredPort);
259
260 // Update the connection params, by pointer to item in global params
261 // array
262 ConnectionParams *params = (ConnectionParams *)m_params; // non-const
263 params->NetworkAddress = discoveredIP;
264 params->NetworkPort = discoveredPort;
265 } else
266 wxLogDebug(_T("SK server autodiscovery finds no WebSocket server."));
267 }
268#endif
269 OpenWebSocket();
270 }
271}
272void CommDriverSignalKNet::Close(){
273 CloseWebSocket();
274}
275
276void CommDriverSignalKNet::OpenWebSocket() {
277 // printf("OpenWebSocket\n");
278 wxLogMessage(wxString::Format(_T("Opening Signal K WebSocket client: %s"),
279 m_params.GetDSPort().c_str()));
280
281 // Start a thread to run the client without blocking
282
283 m_wsThread = new WebSocketThread(this, GetAddr(), this);
284 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
285 wxLogError(wxT("Can't create WebSocketThread!"));
286
287 return;
288 }
289
290 ResetWatchdog();
291 GetSocketThreadWatchdogTimer()->Start(1000,
292 wxTIMER_ONE_SHOT); // Start the dog
293 SetThreadRunFlag(1);
294
295 m_wsThread->Run();
296}
297
298void CommDriverSignalKNet::CloseWebSocket() {
299 if (m_wsThread) {
300 if (IsThreadRunning()) {
301 wxLogMessage(_T("Stopping Secondary SignalK Thread"));
302
303 m_Thread_run_flag = 0;
304 int tsec = 10;
305 while ((m_Thread_run_flag >= 0) && (tsec--)) wxSleep(1);
306
307 wxString msg;
308 if (m_Thread_run_flag < 0)
309 msg.Printf(_T("Stopped in %d sec."), 10 - tsec);
310 else
311 msg.Printf(_T("Not Stopped after 10 sec."));
312 wxLogMessage(msg);
313 }
314
315 m_bsec_thread_active = false;
316 wxMilliSleep(100);
317
318
319
320#if 0
321 m_Thread_run_flag = 0;
322 printf("sending delete\n");
323 m_wsThread->Delete();
324 wxMilliSleep(100);
325
326 int nDeadman = 0;
327 while (IsThreadRunning() && (++nDeadman < 200)) { // spin for max 2 secs.
328 wxMilliSleep(10);
329 }
330 printf("Closed in %d\n", nDeadman);
331 wxMilliSleep(100);
332#endif
333 }
334}
335
336void CommDriverSignalKNet::handle_SK_sentence(
338
339 rapidjson::Document root;
340
341 //LOG_DEBUG("%s\n", msg.c_str());
342
343 std::string *msg = event.GetPayload().get();
344 std::string msgTerminated = *msg;
345 msgTerminated.append("\r\n");
346
347 root.Parse(*msg);
348 if (root.HasParseError()) {
349 wxLogMessage(wxString::Format(
350 _T("SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
351 root.GetParseError()));
352 return;
353 }
354
355 // Decode just enough of string to extract some identifiers
356 // such as the sK version, "self" context, and target context
357 if (root.HasMember("version")) {
358 wxString msg = _T("Connected to Signal K server version: ");
359 msg << (root["version"].GetString());
360 wxLogMessage(msg);
361 }
362
363 if (root.HasMember("self")) {
364 if (strncmp(root["self"].GetString(), "vessels.", 8) == 0)
365 m_self = (root["self"].GetString()); // for java server, and OpenPlotter
366 // node.js server 1.20
367 else
368 m_self =
369 std::string("vessels.").append(root["self"].GetString()); // for Node.js server
370 }
371
372 if (root.HasMember("context") && root["context"].IsString()) {
373 m_context = root["context"].GetString();
374 }
375
376 //Notify all listeners
377 auto navmsg = std::make_shared<const SignalkMsg>(m_self, m_context,
378 msgTerminated );
379 m_listener.Notify(std::move(navmsg));
380}
381
382#if 0
383void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
384 wxString sfixtime = "";
385
386 if (update.HasMember("timestamp")) {
387 sfixtime = update["timestamp"].AsString();
388 }
389 if (update.HasMember("values") && update["values"].IsArray()) {
390 for (int j = 0; j < update["values"].Size(); ++j) {
391 wxJSONValue &item = update["values"][j];
392 updateItem(item, sfixtime);
393 }
394 }
395}
396
397void CommDriverSignalKNet::updateItem(wxJSONValue &item,
398 wxString &sfixtime) {
399 if (item.HasMember("path") && item.HasMember("value")) {
400 const wxString &update_path = item["path"].AsString();
401 wxJSONValue &value = item["value"];
402
403 if (update_path == _T("navigation.position") && !value.IsNull()) {
404 updateNavigationPosition(value, sfixtime);
405 } else if (update_path == _T("navigation.speedOverGround") &&
406 m_bGPSValid_SK && !value.IsNull()) {
407 updateNavigationSpeedOverGround(value, sfixtime);
408 } else if (update_path == _T("navigation.courseOverGroundTrue") &&
409 m_bGPSValid_SK && !value.IsNull()) {
410 updateNavigationCourseOverGround(value, sfixtime);
411 } else if (update_path == _T("navigation.courseOverGroundMagnetic")) {
412 }
413 else if (update_path ==
414 _T("navigation.gnss.satellites")) // From GGA sats in use
415 {
416 /*if (g_priSats >= 2)*/ updateGnssSatellites(value, sfixtime);
417 } else if (update_path ==
418 _T("navigation.gnss.satellitesInView")) // From GSV sats in view
419 {
420 /*if (g_priSats >= 3)*/ updateGnssSatellites(value, sfixtime);
421 } else if (update_path == _T("navigation.headingTrue")) {
422 if(!value.IsNull())
423 updateHeadingTrue(value, sfixtime);
424 } else if (update_path == _T("navigation.headingMagnetic")) {
425 if(!value.IsNull())
426 updateHeadingMagnetic(value, sfixtime);
427 } else if (update_path == _T("navigation.magneticVariation")) {
428 if(!value.IsNull())
429 updateMagneticVariance(value, sfixtime);
430 } else {
431 // wxLogMessage(wxString::Format(_T("** Signal K unhandled update: %s"),
432 // update_path));
433#if 0
434 wxString dbg;
435 wxJSONWriter writer;
436 writer.Write(item, dbg);
437 wxString msg( _T("update: ") );
438 msg.append(dbg);
439 wxLogMessage(msg);
440#endif
441 }
442 }
443}
444
445void CommDriverSignalKNet::updateNavigationPosition(
446 wxJSONValue &value, const wxString &sfixtime) {
447 if ((value.HasMember("latitude" && value["latitude"].IsDouble())) &&
448 (value.HasMember("longitude") && value["longitude"].IsDouble())) {
449 // wxLogMessage(_T(" ***** Position Update"));
450 m_lat = value["latitude"].AsDouble();
451 m_lon = value["longitude"].AsDouble();
452 m_bGPSValid_SK = true;
453 } else {
454 m_bGPSValid_SK = false;
455 }
456}
457
458
459void CommDriverSignalKNet::updateNavigationSpeedOverGround(
460 wxJSONValue &value, const wxString &sfixtime){
461 double sog_ms = value.AsDouble();
462 double sog_knot = sog_ms * ms_to_knot_factor;
463 // wxLogMessage(wxString::Format(_T(" ***** SOG: %f, %f"), sog_ms, sog_knot));
464 m_sog = sog_knot;
465}
466
467void CommDriverSignalKNet::updateNavigationCourseOverGround(
468 wxJSONValue &value, const wxString &sfixtime) {
469 double cog_rad = value.AsDouble();
470 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
471 // wxLogMessage(wxString::Format(_T(" ***** COG: %f, %f"), cog_rad, cog_deg));
472 m_cog = cog_deg;
473}
474
475void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
476 const wxString &sfixtime) {
477#if 0
478 if (value.IsInt()) {
479 if (value.AsInt() > 0) {
480 m_frame->setSatelitesInView(value.AsInt());
481 g_priSats = 2;
482 }
483 } else if ((value.HasMember("count") && value["count"].IsInt())) {
484 m_frame->setSatelitesInView(value["count"].AsInt());
485 g_priSats = 3;
486 }
487#endif
488}
489
490void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
491 const wxString &sfixtime) {
492 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
493}
494
495void CommDriverSignalKNet::updateHeadingMagnetic(
496 wxJSONValue &value, const wxString &sfixtime) {
497 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
498}
499
500void CommDriverSignalKNet::updateMagneticVariance(
501 wxJSONValue &value, const wxString &sfixtime) {
502 m_var = GEODESIC_RAD2DEG(value.AsDouble());
503}
504
505#endif
507
508// std::vector<unsigned char>* payload = p.get();
509//
510// // Extract the NMEA0183 sentence
511// std::string full_sentence = std::string(payload->begin(), payload->end());
512
513
514
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Internal event worker thread -> main driver.
Driver for SignalK messages over TCP/IP.
void Activate() override
Register driver and possibly do other post-ctor steps.
Abstract SignalK driver interface.
Interface implemented by the transport layer and possibly other parties, such as test code, which should handle incoming messages.
Definition: comm_driver.h:47
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
Internal worker thread.