30#include "rapidjson/document.h"
32#include "comm_drv_signalk_net.h"
33#include "comm_navmsg_bus.h"
34#include "comm_drv_registry.h"
36#include "easywsclient.hpp"
45 const wxString& PortName,
46 const wxString& strBaudRate);
50 bool SetOutMsg(
const wxString& msg);
54 void ThreadMessage(
const wxString& msg);
55 bool OpenComPortPhysical(
const wxString& com_name,
int baud_rate);
56 void CloseComPortPhysical();
57 size_t WriteComPortPhysical(std::vector<unsigned char> msg);
58 size_t WriteComPortPhysical(
unsigned char *msg,
size_t length);
59 void SetGatewayOperationMode(
void);
63 wxString m_FullPortName;
65 unsigned char* put_ptr;
66 unsigned char* tak_ptr;
68 unsigned char* rx_buffer;
84 : wxEvent(
id, commandType){};
88 void SetPayload(std::shared_ptr<std::string> data) {
91 std::shared_ptr<std::string> GetPayload() {
return m_payload; }
94 wxEvent* Clone()
const {
96 newevent->m_payload = this->m_payload;
101 std::shared_ptr<std::string> m_payload;
105static wxEvtHandler *s_wsSKConsumer;
111 wxEvtHandler *consumer);
112 virtual void *Entry();
115 static void HandleMessage(
const std::string &message);
117 wxIPV4address m_address;
118 wxEvtHandler *m_consumer;
123 wxIPV4address address,
124 wxEvtHandler *consumer) {
126 m_consumer = consumer;
127 m_parentStream = parent;
130void *WebSocketThread::Entry() {
131 using easywsclient::WebSocket;
133 bool not_done =
true;
135 m_parentStream->SetThreadRunning(
true);
137 s_wsSKConsumer = m_consumer;
139 wxString host = m_address.IPAddress();
140 int port = m_address.Service();
143 std::stringstream wsAddress;
144 wsAddress <<
"ws://" << host.mb_str() <<
":" << port
145 <<
"/signalk/v1/stream?subscribe=all&sendCachedValues=false";
147 WebSocket::pointer ws = 0;
149 while ((not_done) && (m_parentStream->m_Thread_run_flag > 0)) {
150 bool not_connected =
true;
151 while ((not_connected) && (m_parentStream->m_Thread_run_flag > 0)) {
152 ws = WebSocket::from_url(wsAddress.str());
154 printf(
"No Connect\n");
156 not_connected =
false;
158 if (m_parentStream->m_Thread_run_flag == 0){
159 m_parentStream->SetThreadRunning(
false);
164 while ((not_done) && (m_parentStream->m_Thread_run_flag > 0)) {
172 if (ws->getReadyState() == WebSocket::CLOSED) {
177 if (ws->getReadyState() == WebSocket::OPEN) {
178 ws->dispatch(HandleMessage);
180 if( m_parentStream->m_Thread_run_flag <= 0){
191 m_parentStream->SetThreadRunning(
false);
192 m_parentStream->m_Thread_run_flag = -1;
197void WebSocketThread::HandleMessage(
const std::string &message) {
199 if (s_wsSKConsumer) {
201 auto buffer = std::make_shared<std::string>(message);
203 signalKEvent.SetPayload(buffer);
204 s_wsSKConsumer->AddPendingEvent(signalKEvent);
217 m_Thread_run_flag(-1),
219 m_listener(listener) {
222 Bind(wxEVT_COMMDRIVER_SIGNALK_NET, &CommDriverSignalKNet::handle_SK_sentence,
225 m_addr.Hostname(params->NetworkAddress);
226 m_addr.Service(params->NetworkPort);
227 m_socketread_watchdog_timer.SetOwner(
this, TIMER_SOCKET);
229 m_threadActive =
false;
234CommDriverSignalKNet::~CommDriverSignalKNet() {
239 CommDriverRegistry::GetInstance().
Activate(shared_from_this());
242void CommDriverSignalKNet::Open(
void) {
243 wxString discoveredIP;
248 std::string serviceIdent =
249 std::string(
"_signalk-ws._tcp.local.");
251 if (m_params->AutoSKDiscover) {
252 if (DiscoverSKServer(serviceIdent, discoveredIP, discoveredPort,
254 wxLogDebug(wxString::Format(
255 _T(
"SK server autodiscovery finds WebSocket service: %s:%d"),
256 discoveredIP.c_str(), discoveredPort));
257 m_addr.Hostname(discoveredIP);
258 m_addr.Service(discoveredPort);
263 params->NetworkAddress = discoveredIP;
264 params->NetworkPort = discoveredPort;
266 wxLogDebug(_T(
"SK server autodiscovery finds no WebSocket server."));
272void CommDriverSignalKNet::Close(){
276void CommDriverSignalKNet::OpenWebSocket() {
278 wxLogMessage(wxString::Format(_T(
"Opening Signal K WebSocket client: %s"),
279 m_params.GetDSPort().c_str()));
284 if (m_wsThread->Create() != wxTHREAD_NO_ERROR) {
285 wxLogError(wxT(
"Can't create WebSocketThread!"));
291 GetSocketThreadWatchdogTimer()->Start(1000,
298void CommDriverSignalKNet::CloseWebSocket() {
300 if (IsThreadRunning()) {
301 wxLogMessage(_T(
"Stopping Secondary SignalK Thread"));
303 m_Thread_run_flag = 0;
305 while ((m_Thread_run_flag >= 0) && (tsec--)) wxSleep(1);
308 if (m_Thread_run_flag < 0)
309 msg.Printf(_T(
"Stopped in %d sec."), 10 - tsec);
311 msg.Printf(_T(
"Not Stopped after 10 sec."));
315 m_bsec_thread_active =
false;
321 m_Thread_run_flag = 0;
322 printf(
"sending delete\n");
323 m_wsThread->Delete();
327 while (IsThreadRunning() && (++nDeadman < 200)) {
330 printf(
"Closed in %d\n", nDeadman);
336void CommDriverSignalKNet::handle_SK_sentence(
339 rapidjson::Document root;
343 std::string *msg =
event.GetPayload().get();
344 std::string msgTerminated = *msg;
345 msgTerminated.append(
"\r\n");
348 if (root.HasParseError()) {
349 wxLogMessage(wxString::Format(
350 _T(
"SignalKDataStream ERROR: the JSON document is not well-formed:%d"),
351 root.GetParseError()));
357 if (root.HasMember(
"version")) {
358 wxString msg = _T(
"Connected to Signal K server version: ");
359 msg << (root[
"version"].GetString());
363 if (root.HasMember(
"self")) {
364 if (strncmp(root[
"self"].GetString(),
"vessels.", 8) == 0)
365 m_self = (root[
"self"].GetString());
369 std::string(
"vessels.").append(root[
"self"].GetString());
372 if (root.HasMember(
"context") && root[
"context"].IsString()) {
373 m_context = root[
"context"].GetString();
377 auto navmsg = std::make_shared<const SignalkMsg>(m_self, m_context,
379 m_listener.
Notify(std::move(navmsg));
383void CommDriverSignalKNet::handleUpdate(wxJSONValue &update) {
384 wxString sfixtime =
"";
386 if (update.HasMember(
"timestamp")) {
387 sfixtime = update[
"timestamp"].AsString();
389 if (update.HasMember(
"values") && update[
"values"].IsArray()) {
390 for (
int j = 0; j < update[
"values"].Size(); ++j) {
391 wxJSONValue &item = update[
"values"][j];
392 updateItem(item, sfixtime);
397void CommDriverSignalKNet::updateItem(wxJSONValue &item,
398 wxString &sfixtime) {
399 if (item.HasMember(
"path") && item.HasMember(
"value")) {
400 const wxString &update_path = item[
"path"].AsString();
401 wxJSONValue &value = item[
"value"];
403 if (update_path == _T(
"navigation.position") && !value.IsNull()) {
404 updateNavigationPosition(value, sfixtime);
405 }
else if (update_path == _T(
"navigation.speedOverGround") &&
406 m_bGPSValid_SK && !value.IsNull()) {
407 updateNavigationSpeedOverGround(value, sfixtime);
408 }
else if (update_path == _T(
"navigation.courseOverGroundTrue") &&
409 m_bGPSValid_SK && !value.IsNull()) {
410 updateNavigationCourseOverGround(value, sfixtime);
411 }
else if (update_path == _T(
"navigation.courseOverGroundMagnetic")) {
413 else if (update_path ==
414 _T(
"navigation.gnss.satellites"))
416 updateGnssSatellites(value, sfixtime);
417 }
else if (update_path ==
418 _T(
"navigation.gnss.satellitesInView"))
420 updateGnssSatellites(value, sfixtime);
421 }
else if (update_path == _T(
"navigation.headingTrue")) {
423 updateHeadingTrue(value, sfixtime);
424 }
else if (update_path == _T(
"navigation.headingMagnetic")) {
426 updateHeadingMagnetic(value, sfixtime);
427 }
else if (update_path == _T(
"navigation.magneticVariation")) {
429 updateMagneticVariance(value, sfixtime);
436 writer.Write(item, dbg);
437 wxString msg( _T(
"update: ") );
445void CommDriverSignalKNet::updateNavigationPosition(
446 wxJSONValue &value,
const wxString &sfixtime) {
447 if ((value.HasMember(
"latitude" && value[
"latitude"].IsDouble())) &&
448 (value.HasMember(
"longitude") && value[
"longitude"].IsDouble())) {
450 m_lat = value[
"latitude"].AsDouble();
451 m_lon = value[
"longitude"].AsDouble();
452 m_bGPSValid_SK =
true;
454 m_bGPSValid_SK =
false;
459void CommDriverSignalKNet::updateNavigationSpeedOverGround(
460 wxJSONValue &value,
const wxString &sfixtime){
461 double sog_ms = value.AsDouble();
462 double sog_knot = sog_ms * ms_to_knot_factor;
467void CommDriverSignalKNet::updateNavigationCourseOverGround(
468 wxJSONValue &value,
const wxString &sfixtime) {
469 double cog_rad = value.AsDouble();
470 double cog_deg = GEODESIC_RAD2DEG(cog_rad);
475void CommDriverSignalKNet::updateGnssSatellites(wxJSONValue &value,
476 const wxString &sfixtime) {
479 if (value.AsInt() > 0) {
480 m_frame->setSatelitesInView(value.AsInt());
483 }
else if ((value.HasMember(
"count") && value[
"count"].IsInt())) {
484 m_frame->setSatelitesInView(value[
"count"].AsInt());
490void CommDriverSignalKNet::updateHeadingTrue(wxJSONValue &value,
491 const wxString &sfixtime) {
492 m_hdt = GEODESIC_RAD2DEG(value.AsDouble());
495void CommDriverSignalKNet::updateHeadingMagnetic(
496 wxJSONValue &value,
const wxString &sfixtime) {
497 m_hdm = GEODESIC_RAD2DEG(value.AsDouble());
500void CommDriverSignalKNet::updateMagneticVariance(
501 wxJSONValue &value,
const wxString &sfixtime) {
502 m_var = GEODESIC_RAD2DEG(value.AsDouble());
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Internal event worker thread -> main driver.
Driver for SignalK messages over TCP/IP.
void Activate() override
Register driver and possibly do other post-ctor steps.
Abstract SignalK driver interface.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.