OpenCPN Partial API docs
Loading...
Searching...
No Matches
comm_drv_n2k_socketcan.cpp
1/***************************************************************************
2 *
3 * Project: OpenCPN
4 * Purpose: Implement comm_drv_socketcan.h -- socketcan driver.
5 * Author: David Register, Alec Leamas
6 *
7 ***************************************************************************
8 * Copyright (C) 2022 by David Register, Alec Leamas *
9 * *
10 * This program is free software; you can redistribute it and/or modify *
11 * it under the terms of the GNU General Public License as published by *
12 * the Free Software Foundation; either version 2 of the License, or *
13 * (at your option) any later version. *
14 * *
15 * This program is distributed in the hope that it will be useful, *
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of *
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
18 * GNU General Public License for more details. *
19 * *
20 * You should have received a copy of the GNU General Public License *
21 * along with this program; if not, write to the *
22 * Free Software Foundation, Inc., *
23 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. *
24 **************************************************************************/
25
26#if !defined(__linux__) || defined(__ANDROID__)
27#error "This file can only be compiled on Linux"
28#endif
29
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstring>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

#include <linux/can.h>
#include <linux/can/raw.h>
#include <net/if.h>
#include <serial/serial.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <wx/log.h>
#include <wx/string.h>
#include <wx/utils.h>
#include <wx/thread.h>

#include "comm_drv_n2k_socketcan.h"
#include "comm_drv_registry.h"
#include "comm_navmsg_bus.h"
#include "config_vars.h"
55
56#define DEFAULT_N2K_SOURCE_ADDRESS 72
57
58wxDEFINE_EVENT(EVT_N2K_59904, ObservedEvt);
59
60using namespace std::chrono_literals;
61
62using TimePoint = std::chrono::time_point<std::chrono::system_clock,
63 std::chrono::duration<double>>;
64
65static const int kNotFound = -1;
66
68static const int kGcThreshold = 100;
69
71static const std::chrono::milliseconds kGcInterval(10s);
72
74static const std::chrono::milliseconds kEntryMaxAge(100s);
75
77static const int kSocketTimeoutSeconds = 2;
78
79typedef struct can_frame CanFrame;
80
81
/**
 * Assemble a 29-bit CAN identifier from its components.
 *
 * Layout of the result: priority in bits 26-28, the PGN in bits 8-25 and
 * the source address in bits 0-7. When the PGN's second byte (bits 8-15 of
 * the PGN) is below 240 the message is addressed and the destination
 * replaces the PGN's low byte; otherwise the destination is not encoded.
 *
 * Every field is masked to its width, so out-of-range input (for example a
 * negative source used as "not yet claimed") cannot spill into neighbouring
 * fields or above bit 28.
 *
 * @param priority     Message priority, only the low 3 bits are used.
 * @param source       Sender address, only the low 8 bits are used.
 * @param destination  Receiver address, only the low 8 bits are used and
 *                     only for addressed PGNs.
 * @param pgn          Parameter group number, only the low 18 bits are used.
 * @return The identifier, without CAN_EFF_FLAG or other flag bits.
 */
unsigned long BuildCanID(int priority, int source, int destination, int pgn) {
  const unsigned long prio_bits = static_cast<unsigned long>(priority) & 0x7UL;
  const unsigned long src_bits = static_cast<unsigned long>(source) & 0xFFUL;
  unsigned long pgn_bits = static_cast<unsigned long>(pgn) & 0x3FFFFUL;

  const unsigned long pdu_format = (pgn_bits >> 8) & 0xFFUL;
  if (pdu_format < 240) {
    // Addressed message: the low PGN byte carries the destination.
    pgn_bits &= ~0xFFUL;
    pgn_bits |= static_cast<unsigned long>(destination) & 0xFFUL;
  }
  return (prio_bits << 26) | (pgn_bits << 8) | src_bits;
}
94
96class CanHeader {
97public:
98 CanHeader() : priority('\0'), source('\0'), destination('\0'), pgn(-1) {};
99
101 CanHeader(CanFrame frame);
102
104 bool IsFastMessage() const;
105
106 unsigned char priority;
107 unsigned char source;
108 unsigned char destination;
109 int pgn;
110};
111
114public:
115 class Entry {
116 public:
117 Entry()
118 : time_arrived(std::chrono::system_clock::now()),
119 sid(0), expected_length(0), cursor(0) {}
120
121 bool IsExpired() const {
122 auto age = std::chrono::system_clock::now() - time_arrived;
123 return age > kEntryMaxAge;
124 }
125
126 TimePoint time_arrived;
127
130
133 unsigned int sid;
134
135 unsigned int expected_length;
136 unsigned int cursor;
137 std::vector<unsigned char> data;
138 };
139
140 FastMessageMap() : dropped_frames(0) {}
141
142 Entry operator[](int i) const { return entries[i]; }
143 Entry& operator[](int i) { return entries[i]; }
144
146 int FindMatchingEntry(const CanHeader header, const unsigned char sid);
147
149 int AddNewEntry(void);
150
152 bool InsertEntry(const CanHeader header, const unsigned char* data,
153 int index);
154
156 bool AppendEntry(const CanHeader hdr, const unsigned char* data, int index);
157
159 void Remove(int pos);
160
161private:
162 int GarbageCollector(void);
163
164 void CheckGc() {
165 if (std::chrono::system_clock::now() - last_gc_run > kGcInterval ||
166 entries.size() > kGcThreshold) {
167 GarbageCollector();
168 last_gc_run = std::chrono::system_clock::now();
169 }
170 }
171
172 std::vector<Entry> entries;
173 TimePoint last_gc_run;
174 int dropped_frames;
175 TimePoint dropped_frame_time;
176};
177
178class CommDriverN2KSocketCanImpl; // fwd
179
204class Worker {
205public:
206 Worker(CommDriverN2KSocketCAN* parent, const wxString& PortName);
207
208 bool StartThread();
209 void StopThread();
210 int GetSocket(){return m_socket;}
211
212private:
213 void Entry();
214
215 void ThreadMessage(const std::string& msg, wxLogLevel l = wxLOG_Message);
216
217 int InitSocket(const std::string port_name);
218 void SocketMessage(const std::string& msg, const std::string& device);
219 void HandleInput(CanFrame frame);
220 void ProcessRxMessages(std::shared_ptr<const Nmea2000Msg> n2k_msg);
221
222 std::vector<unsigned char> PushCompleteMsg(const CanHeader header,
223 int position,
224 const CanFrame frame);
225 std::vector<unsigned char> PushFastMsgFragment(const CanHeader& header,
226 int position);
227
228 CommDriverN2KSocketCanImpl* const m_parent_driver;
229 const wxString m_port_name;
230 std::atomic<int> m_run_flag;
231 FastMessageMap fast_messages;
232 int m_socket;
233};
234
237 friend class Worker;
238
239public:
241 : CommDriverN2KSocketCAN(p, l), m_worker(this, p->socketCAN_port),
242 m_source_address(-1){
243 SetN2K_Name();
244 Open();
245 }
246
247 ~CommDriverN2KSocketCanImpl() { Close(); }
248
249 bool Open();
250 void Close();
251 void SetN2K_Name();
252
253 bool SendMessage(std::shared_ptr<const NavMsg> msg,
254 std::shared_ptr<const NavAddr> addr);
255
256 int DoAddressClaim();
257 bool SendAddressClaim(int proposed_source_address);
258
259 Worker& GetWorker(){ return m_worker; }
260
261private:
262 N2kName node_name;
263 Worker m_worker;
264 int m_source_address;
265 int m_last_TX_sequence;
266 std::future<int> m_AddressClaimFuture;
267 wxMutex m_TX_mutex;
268
269 ObservableListener listener_N2K_59904;
270 bool HandleN2K_59904( std::shared_ptr<const Nmea2000Msg> n2k_msg );
271
272};
273
274
275// Static CommDriverN2KSocketCAN factory implementation.
276
277std::shared_ptr<CommDriverN2KSocketCAN> CommDriverN2KSocketCAN::Create(
278 const ConnectionParams* params, DriverListener& listener) {
279 return std::shared_ptr<CommDriverN2KSocketCAN>(
280 new CommDriverN2KSocketCanImpl(params, listener));
281}
282
283
284// CanHeader implementation
285
286CanHeader::CanHeader(const CanFrame frame) {
287 unsigned char buf[4];
288 buf[0] = frame.can_id & 0xFF;
289 buf[1] = (frame.can_id >> 8) & 0xFF;
290 buf[2] = (frame.can_id >> 16) & 0xFF;
291 buf[3] = (frame.can_id >> 24) & 0xFF;
292
293 source = buf[0];
294 destination = buf[2] < 240 ? buf[1] : 255;
295 pgn = (buf[3] & 0x01) << 16 | (buf[2] << 8) | (buf[2] < 240 ? 0 : buf[1]);
296 priority = (buf[3] & 0x1c) >> 2;
297
298// if (pgn == 129029){
299// unsigned char *d = (unsigned char *)&frame;
300// for (size_t i=0 ; i < sizeof(frame) ; i++){
301// printf("%02X ", *d);
302// d++;
303// }
304// printf("\n\n");
305// }
306}
307
308
310 static const std::vector<unsigned> haystack = {
311 // All known multiframe fast messages
312 65240u, 126208u, 126464u, 126996u, 126998u, 127233u, 127237u, 127489u,
313 127496u, 127506u, 128275u, 129029u, 129038u, 129039u, 129040u, 129041u,
314 129284u, 129285u, 129540u, 129793u, 129794u, 129795u, 129797u, 129798u,
315 129801u, 129802u, 129808u, 129809u, 129810u, 130065u, 130074u, 130323u,
316 130577u, 130820u, 130822u, 130824u};
317
318 unsigned needle = static_cast<unsigned>(pgn);
319 auto found = std::find_if(haystack.begin(), haystack.end(),
320 [needle](unsigned i) { return i == needle; });
321 return found != haystack.end();
322}
323
324
325// CommDriverN2KSocketCanImpl implementation
326
327void CommDriverN2KSocketCanImpl::SetN2K_Name() {
328 // We choose some "benign" values for OCPN socketCan interface
329
330 int unique_number = 1;
331#ifndef CLIAPP
332 // Build a simple 16 bit hash of g_hostname, to use as unique "serial number"
333 int hash = 0;
334 std::string str(g_hostname.mb_str());
335 int len = str.size();
336 const char* ch = str.data();
337 for (int i = 0; i < len; i++)
338 hash = hash + ((hash) << 5) + *(ch + i) + ((*(ch + i)) << 7);
339 unique_number = ((hash) ^ (hash >> 16)) & 0xffff;
340#endif
341
342 node_name.SetManufacturerCode(2046);
343 node_name.SetUniqueNumber(unique_number);
344 node_name.SetDeviceFunction(130); // PC Gateway
345 node_name.SetDeviceClass(25); // Inter/Intranetwork Device
346 node_name.SetIndustryGroup(4); // Marine
347}
348
349bool CommDriverN2KSocketCanImpl::Open() {
350
351 // Start the RX worker thread
352 bool bws = m_worker.StartThread();
353 return bws;
354
355}
356
357void CommDriverN2KSocketCanImpl::Close() {
358 wxLogMessage("Closing N2K socketCAN: %s", m_params.socketCAN_port.c_str());
359 m_worker.StopThread();
360
361 // We cannot use shared_from_this() since we might be in the destructor.
362 auto& registry = CommDriverRegistry::GetInstance();
363 auto me = FindDriver(registry.GetDrivers(), iface, bus);
364 registry.Deactivate(me);
365}
366
367
368bool CommDriverN2KSocketCanImpl::SendAddressClaim(int proposed_source_address) {
369
370 wxMutexLocker lock(m_TX_mutex);
371
372 int socket = GetWorker().GetSocket();
373
374 if (socket < 0)
375 return false;
376
377 CanFrame frame;
378 memset(&frame, 0, sizeof(frame));
379
380 uint64_t _pgn = 60928;
381 unsigned long canId = BuildCanID(6, proposed_source_address, 255, _pgn);
382 frame.can_id = canId | CAN_EFF_FLAG;
383
384 // Load the data
385 uint32_t b32_0 = node_name.value.UnicNumberAndManCode;
386 memcpy(&frame.data, &b32_0, 4);
387
388 unsigned char b81 = node_name.value.DeviceInstance;
389 memcpy(&frame.data[4], &b81, 1);
390
391 b81 = node_name.value.DeviceFunction;
392 memcpy(&frame.data[5], &b81, 1);
393
394 b81 = (node_name.value.DeviceClass&0x7f)<<1;
395 memcpy(&frame.data[6], &b81, 1);
396
397 b81 = node_name.value.IndustryGroupAndSystemInstance;
398 memcpy(&frame.data[7], &b81, 1);
399
400 frame.can_dlc = 8; // data length
401
402 int sentbytes = write(socket, &frame, sizeof(frame));
403
404 return (sentbytes == 16);
405}
406
407bool CommDriverN2KSocketCanImpl::SendMessage(std::shared_ptr<const NavMsg> msg,
408 std::shared_ptr<const NavAddr> addr) {
409
410 wxMutexLocker lock(m_TX_mutex);
411
412 // Verify claimed address is useable
413 if ( m_source_address < 0)
414 return false;
415
416 int socket = GetWorker().GetSocket();
417
418 if (socket < 0)
419 return false;
420
421 CanFrame frame;
422 memset(&frame, 0, sizeof(frame));
423
424 auto msg_n2k = std::dynamic_pointer_cast<const Nmea2000Msg>(msg);
425 std::vector<uint8_t> load = msg_n2k->payload;
426
427 uint64_t _pgn = msg_n2k->PGN.pgn;
428
429 unsigned long canId = BuildCanID(6, m_source_address, 255, _pgn);
430
431 frame.can_id = canId | CAN_EFF_FLAG;
432
433 if (load.size() <= 8){
434 frame.can_dlc = load.size();
435 if (load.size() > 0)
436 memcpy(&frame.data, load.data(), load.size());
437
438 int sentbytes = write(socket, &frame, sizeof(frame));
439 }
440 else { // Fast Packet
441 int sequence = (m_last_TX_sequence + 0x20) & 0xE0;
442 m_last_TX_sequence = sequence;
443 unsigned char *data_ptr = load.data();
444 int n_remaining = load.size();
445
446 // First packet
447 frame.can_dlc = 8;
448 frame.data[0] = sequence;
449 frame.data[1] = load.size();
450 int data_len_0 = wxMin(load.size(), 6);
451 memcpy(&frame.data[2], load.data(), data_len_0);
452
453 int sentbytes0 = write(socket, &frame, sizeof(frame));
454
455 data_ptr += data_len_0;
456 n_remaining -= data_len_0;
457 sequence++;
458
459 // The rest of the bytes
460 while (n_remaining > 0){
461 frame.data[0] = sequence;
462 int data_len_n = wxMin(n_remaining, 7);
463 memcpy(&frame.data[1], data_ptr, data_len_n);
464
465 int sentbytesn = write(socket, &frame, sizeof(frame));
466
467 data_ptr += data_len_n;
468 n_remaining -= data_len_n;
469 sequence++;
470 }
471 }
472
473
474 return true;
475}
476
477
478
479// CommDriverN2KSocketCAN implementation
480
481CommDriverN2KSocketCAN::CommDriverN2KSocketCAN(const ConnectionParams* params,
482 DriverListener& listener)
483 : CommDriverN2K(((ConnectionParams*)params)->GetStrippedDSPort()),
484 m_params(*params),
485 m_listener(listener),
486 m_ok(false),
487 m_portstring(params->GetDSPort()),
488 m_baudrate(wxString::Format("%i", params->Baudrate)) {}
489
490CommDriverN2KSocketCAN::~CommDriverN2KSocketCAN() {}
491
493 CommDriverRegistry::GetInstance().Activate(shared_from_this());
494}
495
496
497// Worker implementation
498
499Worker::Worker(CommDriverN2KSocketCAN* parent, const wxString& port_name)
500 : m_parent_driver(dynamic_cast<CommDriverN2KSocketCanImpl*>(parent)),
501 m_port_name(port_name.Clone()),
502 m_run_flag(-1),
503 m_socket(-1) {
504 assert(m_parent_driver != 0);
505}
506
507std::vector<unsigned char> Worker::PushCompleteMsg(const CanHeader header,
508 int position,
509 const CanFrame frame) {
510 std::vector<unsigned char> data;
511 data.push_back(0x93);
512 data.push_back(0x13);
513 data.push_back(header.priority);
514 data.push_back(header.pgn & 0xFF);
515 data.push_back((header.pgn >> 8) & 0xFF);
516 data.push_back((header.pgn >> 16) & 0xFF);
517 data.push_back(header.destination);
518 data.push_back(header.source);
519 data.push_back(0xFF); // FIXME (dave) generate the time fields
520 data.push_back(0xFF);
521 data.push_back(0xFF);
522 data.push_back(0xFF);
523 data.push_back(CAN_MAX_DLEN); // nominally 8
524 for (size_t n = 0; n < CAN_MAX_DLEN; n++) data.push_back(frame.data[n]);
525 data.push_back(0x55); // CRC dummy, not checked
526 return data;
527}
528
529std::vector<unsigned char> Worker::PushFastMsgFragment(const CanHeader& header,
530 int position) {
531 std::vector<unsigned char> data;
532 data.push_back(0x93);
533 data.push_back(fast_messages[position].expected_length + 11);
534 data.push_back(header.priority);
535 data.push_back(header.pgn & 0xFF);
536 data.push_back((header.pgn >> 8) & 0xFF);
537 data.push_back((header.pgn >> 16) & 0xFF);
538 data.push_back(header.destination);
539 data.push_back(header.source);
540 data.push_back(0xFF); // FIXME (dave) Could generate the time fields
541 data.push_back(0xFF);
542 data.push_back(0xFF);
543 data.push_back(0xFF);
544 data.push_back(fast_messages[position].expected_length);
545 for (size_t n = 0; n < fast_messages[position].expected_length; n++)
546 data.push_back(fast_messages[position].data[n]);
547 data.push_back(0x55); // CRC dummy
548 fast_messages.Remove(position);
549 return data;
550}
551
552void Worker::ThreadMessage(const std::string& msg, wxLogLevel level) {
553 wxLogGeneric(level, wxString(msg.c_str()));
554 auto s = std::string("CommDriverN2KSocketCAN: ") + msg;
555 CommDriverRegistry::GetInstance().evt_driver_msg.Notify(level, s);
556}
557
558void Worker::SocketMessage(const std::string& msg, const std::string& device) {
559 std::stringstream ss;
560 ss << msg << device << ": " << strerror(errno);
561 ThreadMessage(ss.str());
562}
563
570int Worker::InitSocket(const std::string port_name) {
571 int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
572 if (sock < 0) {
573 SocketMessage("SocketCAN socket create failed: ", port_name);
574 return -1;
575 }
576
577 // Get the interface index
578 struct ifreq if_request;
579 strcpy(if_request.ifr_name, port_name.c_str());
580 if (ioctl(sock, SIOCGIFINDEX, &if_request) < 0) {
581 SocketMessage("SocketCAN ioctl (SIOCGIFINDEX) failed: ", port_name);
582 return -1;
583 }
584
585 // Check if interface is UP
586 struct sockaddr_can can_address;
587 can_address.can_family = AF_CAN;
588 can_address.can_ifindex = if_request.ifr_ifindex;
589 if (ioctl(sock, SIOCGIFFLAGS, &if_request) < 0) {
590 SocketMessage("SocketCAN socket IOCTL (SIOCGIFFLAGS) failed: ", port_name);
591 return -1;
592 }
593 if (if_request.ifr_flags & IFF_UP) {
594 ThreadMessage("socketCan interface is UP");
595 } else {
596 ThreadMessage("socketCan interface is NOT UP");
597 return -1;
598 }
599
600 // Set timeout and bind
601 struct timeval tv;
602 tv.tv_sec = kSocketTimeoutSeconds;
603 tv.tv_usec = 0;
604 int r =
605 setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof tv);
606 if (r < 0) {
607 SocketMessage("SocketCAN setsockopt SO_RCVTIMEO failed on device: ",
608 port_name);
609 return -1;
610 }
611 r = bind(sock, (struct sockaddr*)&can_address, sizeof(can_address));
612 if (r < 0) {
613 SocketMessage("SocketCAN socket bind() failed: ", port_name);
614 return -1;
615 }
616 return sock;
617}
618
625void Worker::HandleInput(CanFrame frame) {
626 int position = -1;
627 bool ready = true;
628
629 CanHeader header(frame);
630 if (header.IsFastMessage()) {
631 position = fast_messages.FindMatchingEntry(header, frame.data[0]);
632 if (position == kNotFound) {
633 // Not an existing fast message: create new entry and insert first frame
634 position = fast_messages.AddNewEntry();
635 ready = fast_messages.InsertEntry(header, frame.data, position);
636 } else {
637 // An existing fast message entry is present, append the frame
638 ready = fast_messages.AppendEntry(header, frame.data, position);
639 }
640 }
641 if (ready) {
642 std::vector<unsigned char> vec;
643 if (position >= 0) {
644 // Re-assembled fast message
645 vec = PushFastMsgFragment(header, position);
646 } else {
647 // Single frame message
648 vec = PushCompleteMsg(header, position, frame);
649 }
650 //auto name = N2kName(static_cast<uint64_t>(header.pgn));
651 auto src_addr = m_parent_driver->GetAddress(m_parent_driver->node_name);
652 auto msg = std::make_shared<const Nmea2000Msg>(header.pgn, vec, src_addr);
653 ProcessRxMessages(msg);
654 m_parent_driver->m_listener.Notify(std::move(msg));
655 }
656}
657
659void Worker::ProcessRxMessages(std::shared_ptr<const Nmea2000Msg> n2k_msg){
660
661 if(n2k_msg->PGN.pgn == 59904 ){
662 }
663
664}
665
666
668void Worker::Entry() {
669 int recvbytes;
670 int socket;
671 CanFrame frame;
672
673 socket = InitSocket(m_port_name.ToStdString());
674 if (socket < 0) {
675 std::string msg("SocketCAN socket create failed: ");
676 ThreadMessage(msg + m_port_name.ToStdString());
677 m_run_flag = -1;
678 return;
679 }
680 m_socket = socket;
681
682 // Claim our default address
683 if (m_parent_driver->SendAddressClaim(DEFAULT_N2K_SOURCE_ADDRESS))
684 m_parent_driver->m_source_address = DEFAULT_N2K_SOURCE_ADDRESS;
685
686 // The main loop
687 while (m_run_flag > 0) {
688 recvbytes = read(socket, &frame, sizeof(frame));
689 if (recvbytes == -1) {
690 if (errno == EAGAIN || errno == EWOULDBLOCK) continue; // timeout
691
692 wxLogWarning("can socket %s: fatal error %s", m_port_name.c_str(),
693 strerror(errno));
694 break;
695 }
696 if (recvbytes != 16) {
697 wxLogWarning("can socket %s: bad frame size: %d (ignored)",
698 m_port_name.c_str(), recvbytes);
699 sleep(1);
700 continue;
701 }
702 HandleInput(frame);
703 }
704 m_run_flag = -1;
705 return;
706}
707
708bool Worker::StartThread() {
709 m_run_flag = 1;
710 std::thread t(&Worker::Entry, this);
711 t.detach();
712 return true;
713}
714
715void Worker::StopThread() {
716 if (m_run_flag < 0) {
717 wxLogMessage("Attempt to stop already dead thread (ignored).");
718 return;
719 }
720 wxLogMessage("Stopping Worker Thread");
721
722 m_run_flag = 0;
723 int tsec = 10;
724 while ((m_run_flag >= 0) && (tsec--)) wxSleep(1);
725
726 if (m_run_flag < 0)
727 wxLogMessage("StopThread: Stopped in %d sec.", 10 - tsec);
728 else
729 wxLogWarning("StopThread: Not Stopped after 10 sec.");
730}
731
732// FastMessage implementation
733
735 const unsigned char sid) {
736 for (unsigned i = 0; i < entries.size(); i++) {
737 if (((sid & 0xE0) == (entries[i].sid & 0xE0)) &&
738 (entries[i].header.pgn == header.pgn) &&
739 (entries[i].header.source == header.source) &&
740 (entries[i].header.destination == header.destination)) {
741 return i;
742 }
743 }
744 return kNotFound;
745}
746
748 entries.push_back(Entry());
749 return entries.size() - 1;
750}
751
752int FastMessageMap::GarbageCollector(void) {
753 std::vector<unsigned> stale_entries;
754 for (unsigned i = 0; i < entries.size(); i++) {
755 if (entries[i].IsExpired()) stale_entries.push_back(i);
756 }
757 for (auto i : stale_entries) Remove(i);
758 return stale_entries.size();
759}
760
762 const unsigned char* data, int index) {
763 // first message of fast packet
764 // data[0] Sequence Identifier (sid)
765 // data[1] Length of data bytes
766 // data[2..7] 6 data bytes
767
768 CheckGc();
769 // Ensure that this is indeed the first frame of a fast message
770 if ((data[0] & 0x1F) == 0) {
771 int total_data_len; // will also include padding as we memcpy all of the
772 // frame, because I'm lazy
773 total_data_len = static_cast<unsigned int>(data[1]);
774 total_data_len += 7 - ((total_data_len - 6) % 7);
775
776 entries[index].sid = static_cast<unsigned int>(data[0]);
777 entries[index].expected_length = static_cast<unsigned int>(data[1]);
778 entries[index].header = header;
779 entries[index].time_arrived = std::chrono::system_clock::now();
780
781 entries[index].data.resize(total_data_len);
782 memcpy(&entries[index].data[0], &data[2], 6);
783 // First frame of a multi-frame Fast Message contains six data bytes.
784 // Position the cursor ready for next message
785 entries[index].cursor = 6;
786
787 // Fusion, using fast messages to sends frames less than eight bytes
788 return entries[index].expected_length <= 6;
789 }
790 return false;
791 // No further processing is performed if this is not a start frame.
792 // A start frame may have been dropped and we received a subsequent frame
793}
794
796 const unsigned char* data, int position) {
797 // Check that this is the next message in the sequence
798 if ((entries[position].sid + 1) == data[0]) {
799 memcpy(&entries[position].data[entries[position].cursor], &data[1], 7);
800 entries[position].sid = data[0];
801 // Subsequent messages contains seven data bytes (last message may be padded
802 // with 0xFF)
803 entries[position].cursor += 7;
804 // Is this the last message ?
805 return entries[position].cursor >= entries[position].expected_length;
806 } else if ((data[0] & 0x1F) == 0) {
807 // We've found a matching entry, however this is a start frame, therefore
808 // we've missed an end frame, and now we have a start frame with the same id
809 // (top 3 bits). The id has obviously rolled over. Should really double
810 // check that (data[0] & 0xE0) Clear the entry as we don't want to leak
811 // memory, prior to inserting a start frame
812 entries.erase(entries.begin() + position);
813 position = AddNewEntry();
814 // And now insert it
815 InsertEntry(header, data, position);
816 // FIXME (dave) Should update the dropped frame stats
817 return true;
818 } else {
819 // This is not the next frame in the sequence and not a start frame
820 // We've dropped an intermedite frame, so free the slot and do no further
821 // processing
822 entries.erase(entries.begin() + position);
823 // Dropped Frame Statistics
824 if (dropped_frames == 0) {
825 dropped_frame_time = std::chrono::system_clock::now();
826 dropped_frames += 1;
827 } else {
828 dropped_frames += 1;
829 }
830 // FIXME (dave)
831 // if ((dropped_frames > CONST_DROPPEDFRAME_THRESHOLD) &&
832 // (wxDateTime::Now() < (dropped_frame_time +
833 // wxTimeSpan::Seconds(CONST_DROPPEDFRAME_PERIOD) ) ) ) {
834 // wxLogError(_T("TwoCan Device, Dropped Frames rate exceeded"));
835 // wxLogError(wxString::Format(_T("Frame: Source: %d Destination: %d
836 // Priority: %d PGN: %d"),header.source, header.destination,
837 // header.priority, header.pgn)); dropped_frames = 0;
838 // }
839 return false;
840 }
841}
842
843void FastMessageMap::Remove(int pos) { entries.erase(entries.begin() + pos); }
const std::string iface
Physical device for 0183, else a unique string.
Definition: comm_driver.h:89
CAN v2.0 29 bit header as used by NMEA 2000.
bool IsFastMessage() const
Return true if header reflects a multipart fast message.
Driver for NMEA2000 messages over Linux can drivers.
void Activate() override
Register driver and possibly do other post-ctor steps.
Local driver implementation, not visible outside this file.
Abstract driver interface for NMEA2000 messages.
Definition: comm_drv_n2k.h:33
EventVar evt_driver_msg
Notified for messages from drivers.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Interface implemented by transport layer and possible other parties like test code which should handl...
Definition: comm_driver.h:47
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
const void Notify()
Notify all listeners, no data supplied.
CanHeader header
Can header, used to "map" the incoming fast message fragments.
TimePoint time_arrived
time of last fragment.
unsigned int cursor
cursor into the current position in data.
unsigned int sid
Sequence identifier, used to check if a received message is the next message in the sequence.
unsigned int expected_length
total data length from first frame
std::vector< unsigned char > data
Received data.
Track fast message fragments eventually forming complete messages.
int AddNewEntry(void)
Allocate a new, fresh entry and return index to it.
Entry & operator[](int i)
Getter.
void Remove(int pos)
Remove entry at pos.
bool AppendEntry(const CanHeader hdr, const unsigned char *data, int index)
Append fragment to existing multipart message.
int FindMatchingEntry(const CanHeader header, const unsigned char sid)
Return index of the entry matching header and sid, or -1 if there is none.
bool InsertEntry(const CanHeader header, const unsigned char *data, int index)
Insert a new entry, first part of a multipart message.
Keeps listening over its lifespan, removes itself on destruction.
Definition: observable.h:143
Adds a std::shared<void> element to wxCommandEvent.
Definition: ocpn_plugin.h:1615
Manages reading the N2K data stream provided by some N2K gateways from the declared serial port.
N2k uses CAN which defines the basic properties of messages.
Definition: comm_navmsg.h:62