26#if !defined(__linux__) || defined(__ANDROID__)
27#error "This file can only be compiled on Linux"
39#include <linux/can/raw.h>
41#include <serial/serial.h>
43#include <sys/socket.h>
51#include "comm_drv_n2k_socketcan.h"
52#include "comm_drv_registry.h"
53#include "comm_navmsg_bus.h"
54#include "config_vars.h"
56#define DEFAULT_N2K_SOURCE_ADDRESS 72
60using namespace std::chrono_literals;
62using TimePoint = std::chrono::time_point<std::chrono::system_clock,
63 std::chrono::duration<double>>;
65static const int kNotFound = -1;
68static const int kGcThreshold = 100;
71static const std::chrono::milliseconds kGcInterval(10s);
74static const std::chrono::milliseconds kEntryMaxAge(100s);
77static const int kSocketTimeoutSeconds = 2;
79typedef struct can_frame CanFrame;
82unsigned long BuildCanID(
int priority,
int source,
int destination,
int pgn) {
84 unsigned long cid = 0;
85 unsigned char pf = (
unsigned char) (pgn >> 8);
87 cid = ((
unsigned long)(priority & 0x7))<<26 | pgn<<8 | ((
unsigned long)destination)<<8 | (
unsigned long)source;
90 cid = ((
unsigned long)(priority & 0x7))<<26 | pgn<<8 | (
unsigned long)source;
98 CanHeader() : priority(
'\0'), source(
'\0'), destination(
'\0'), pgn(-1) {};
106 unsigned char priority;
107 unsigned char source;
108 unsigned char destination;
121 bool IsExpired()
const {
122 auto age = std::chrono::system_clock::now() -
time_arrived;
123 return age > kEntryMaxAge;
137 std::vector<unsigned char>
data;
142 Entry operator[](
int i)
const {
return entries[i]; }
162 int GarbageCollector(
void);
165 if (std::chrono::system_clock::now() - last_gc_run > kGcInterval ||
166 entries.size() > kGcThreshold) {
168 last_gc_run = std::chrono::system_clock::now();
172 std::vector<Entry> entries;
173 TimePoint last_gc_run;
175 TimePoint dropped_frame_time;
210 int GetSocket(){
return m_socket;}
215 void ThreadMessage(
const std::string& msg, wxLogLevel l = wxLOG_Message);
217 int InitSocket(
const std::string port_name);
218 void SocketMessage(
const std::string& msg,
const std::string& device);
219 void HandleInput(CanFrame frame);
220 void ProcessRxMessages(std::shared_ptr<const Nmea2000Msg> n2k_msg);
222 std::vector<unsigned char> PushCompleteMsg(
const CanHeader header,
224 const CanFrame frame);
225 std::vector<unsigned char> PushFastMsgFragment(
const CanHeader& header,
229 const wxString m_port_name;
230 std::atomic<int> m_run_flag;
242 m_source_address(-1){
253 bool SendMessage(std::shared_ptr<const NavMsg> msg,
254 std::shared_ptr<const NavAddr> addr);
256 int DoAddressClaim();
257 bool SendAddressClaim(
int proposed_source_address);
259 Worker& GetWorker(){
return m_worker; }
264 int m_source_address;
265 int m_last_TX_sequence;
266 std::future<int> m_AddressClaimFuture;
270 bool HandleN2K_59904( std::shared_ptr<const Nmea2000Msg> n2k_msg );
277std::shared_ptr<CommDriverN2KSocketCAN> CommDriverN2KSocketCAN::Create(
279 return std::shared_ptr<CommDriverN2KSocketCAN>(
286CanHeader::CanHeader(
const CanFrame frame) {
287 unsigned char buf[4];
288 buf[0] = frame.can_id & 0xFF;
289 buf[1] = (frame.can_id >> 8) & 0xFF;
290 buf[2] = (frame.can_id >> 16) & 0xFF;
291 buf[3] = (frame.can_id >> 24) & 0xFF;
294 destination = buf[2] < 240 ? buf[1] : 255;
295 pgn = (buf[3] & 0x01) << 16 | (buf[2] << 8) | (buf[2] < 240 ? 0 : buf[1]);
296 priority = (buf[3] & 0x1c) >> 2;
310 static const std::vector<unsigned> haystack = {
312 65240u, 126208u, 126464u, 126996u, 126998u, 127233u, 127237u, 127489u,
313 127496u, 127506u, 128275u, 129029u, 129038u, 129039u, 129040u, 129041u,
314 129284u, 129285u, 129540u, 129793u, 129794u, 129795u, 129797u, 129798u,
315 129801u, 129802u, 129808u, 129809u, 129810u, 130065u, 130074u, 130323u,
316 130577u, 130820u, 130822u, 130824u};
318 unsigned needle =
static_cast<unsigned>(pgn);
319 auto found = std::find_if(haystack.begin(), haystack.end(),
320 [needle](
unsigned i) { return i == needle; });
321 return found != haystack.end();
327void CommDriverN2KSocketCanImpl::SetN2K_Name() {
330 int unique_number = 1;
334 std::string str(g_hostname.mb_str());
335 int len = str.size();
336 const char* ch = str.data();
337 for (
int i = 0; i < len; i++)
338 hash = hash + ((hash) << 5) + *(ch + i) + ((*(ch + i)) << 7);
339 unique_number = ((hash) ^ (hash >> 16)) & 0xffff;
342 node_name.SetManufacturerCode(2046);
343 node_name.SetUniqueNumber(unique_number);
344 node_name.SetDeviceFunction(130);
345 node_name.SetDeviceClass(25);
346 node_name.SetIndustryGroup(4);
349bool CommDriverN2KSocketCanImpl::Open() {
352 bool bws = m_worker.StartThread();
357void CommDriverN2KSocketCanImpl::Close() {
358 wxLogMessage(
"Closing N2K socketCAN: %s", m_params.socketCAN_port.c_str());
359 m_worker.StopThread();
362 auto& registry = CommDriverRegistry::GetInstance();
363 auto me = FindDriver(registry.GetDrivers(),
iface, bus);
364 registry.Deactivate(me);
368bool CommDriverN2KSocketCanImpl::SendAddressClaim(
int proposed_source_address) {
370 wxMutexLocker lock(m_TX_mutex);
372 int socket = GetWorker().GetSocket();
378 memset(&frame, 0,
sizeof(frame));
380 uint64_t _pgn = 60928;
381 unsigned long canId = BuildCanID(6, proposed_source_address, 255, _pgn);
382 frame.can_id = canId | CAN_EFF_FLAG;
385 uint32_t b32_0 = node_name.value.UnicNumberAndManCode;
386 memcpy(&frame.data, &b32_0, 4);
388 unsigned char b81 = node_name.value.DeviceInstance;
389 memcpy(&frame.data[4], &b81, 1);
391 b81 = node_name.value.DeviceFunction;
392 memcpy(&frame.data[5], &b81, 1);
394 b81 = (node_name.value.DeviceClass&0x7f)<<1;
395 memcpy(&frame.data[6], &b81, 1);
397 b81 = node_name.value.IndustryGroupAndSystemInstance;
398 memcpy(&frame.data[7], &b81, 1);
402 int sentbytes = write(socket, &frame,
sizeof(frame));
404 return (sentbytes == 16);
407bool CommDriverN2KSocketCanImpl::SendMessage(std::shared_ptr<const NavMsg> msg,
408 std::shared_ptr<const NavAddr> addr) {
410 wxMutexLocker lock(m_TX_mutex);
413 if ( m_source_address < 0)
416 int socket = GetWorker().GetSocket();
422 memset(&frame, 0,
sizeof(frame));
424 auto msg_n2k = std::dynamic_pointer_cast<const Nmea2000Msg>(msg);
425 std::vector<uint8_t> load = msg_n2k->payload;
427 uint64_t _pgn = msg_n2k->PGN.pgn;
429 unsigned long canId = BuildCanID(6, m_source_address, 255, _pgn);
431 frame.can_id = canId | CAN_EFF_FLAG;
433 if (load.size() <= 8){
434 frame.can_dlc = load.size();
436 memcpy(&frame.data, load.data(), load.size());
438 int sentbytes = write(socket, &frame,
sizeof(frame));
441 int sequence = (m_last_TX_sequence + 0x20) & 0xE0;
442 m_last_TX_sequence = sequence;
443 unsigned char *data_ptr = load.data();
444 int n_remaining = load.size();
448 frame.data[0] = sequence;
449 frame.data[1] = load.size();
450 int data_len_0 = wxMin(load.size(), 6);
451 memcpy(&frame.data[2], load.data(), data_len_0);
453 int sentbytes0 = write(socket, &frame,
sizeof(frame));
455 data_ptr += data_len_0;
456 n_remaining -= data_len_0;
460 while (n_remaining > 0){
461 frame.data[0] = sequence;
462 int data_len_n = wxMin(n_remaining, 7);
463 memcpy(&frame.data[1], data_ptr, data_len_n);
465 int sentbytesn = write(socket, &frame,
sizeof(frame));
467 data_ptr += data_len_n;
468 n_remaining -= data_len_n;
481CommDriverN2KSocketCAN::CommDriverN2KSocketCAN(
const ConnectionParams* params,
485 m_listener(listener),
487 m_portstring(params->GetDSPort()),
488 m_baudrate(wxString::Format(
"%i", params->Baudrate)) {}
490CommDriverN2KSocketCAN::~CommDriverN2KSocketCAN() {}
493 CommDriverRegistry::GetInstance().
Activate(shared_from_this());
501 m_port_name(port_name.Clone()),
504 assert(m_parent_driver != 0);
507std::vector<unsigned char> Worker::PushCompleteMsg(
const CanHeader header,
509 const CanFrame frame) {
510 std::vector<unsigned char> data;
511 data.push_back(0x93);
512 data.push_back(0x13);
513 data.push_back(header.priority);
514 data.push_back(header.pgn & 0xFF);
515 data.push_back((header.pgn >> 8) & 0xFF);
516 data.push_back((header.pgn >> 16) & 0xFF);
517 data.push_back(header.destination);
518 data.push_back(header.source);
519 data.push_back(0xFF);
520 data.push_back(0xFF);
521 data.push_back(0xFF);
522 data.push_back(0xFF);
523 data.push_back(CAN_MAX_DLEN);
524 for (
size_t n = 0; n < CAN_MAX_DLEN; n++) data.push_back(frame.data[n]);
525 data.push_back(0x55);
529std::vector<unsigned char> Worker::PushFastMsgFragment(
const CanHeader& header,
531 std::vector<unsigned char> data;
532 data.push_back(0x93);
533 data.push_back(fast_messages[position].expected_length + 11);
534 data.push_back(header.priority);
535 data.push_back(header.pgn & 0xFF);
536 data.push_back((header.pgn >> 8) & 0xFF);
537 data.push_back((header.pgn >> 16) & 0xFF);
538 data.push_back(header.destination);
539 data.push_back(header.source);
540 data.push_back(0xFF);
541 data.push_back(0xFF);
542 data.push_back(0xFF);
543 data.push_back(0xFF);
544 data.push_back(fast_messages[position].expected_length);
545 for (
size_t n = 0; n < fast_messages[position].expected_length; n++)
546 data.push_back(fast_messages[position].data[n]);
547 data.push_back(0x55);
548 fast_messages.
Remove(position);
552void Worker::ThreadMessage(
const std::string& msg, wxLogLevel level) {
553 wxLogGeneric(level, wxString(msg.c_str()));
554 auto s = std::string(
"CommDriverN2KSocketCAN: ") + msg;
558void Worker::SocketMessage(
const std::string& msg,
const std::string& device) {
559 std::stringstream ss;
560 ss << msg << device <<
": " << strerror(errno);
561 ThreadMessage(ss.str());
570int Worker::InitSocket(
const std::string port_name) {
571 int sock = socket(PF_CAN, SOCK_RAW, CAN_RAW);
573 SocketMessage(
"SocketCAN socket create failed: ", port_name);
578 struct ifreq if_request;
579 strcpy(if_request.ifr_name, port_name.c_str());
580 if (ioctl(sock, SIOCGIFINDEX, &if_request) < 0) {
581 SocketMessage(
"SocketCAN ioctl (SIOCGIFINDEX) failed: ", port_name);
586 struct sockaddr_can can_address;
587 can_address.can_family = AF_CAN;
588 can_address.can_ifindex = if_request.ifr_ifindex;
589 if (ioctl(sock, SIOCGIFFLAGS, &if_request) < 0) {
590 SocketMessage(
"SocketCAN socket IOCTL (SIOCGIFFLAGS) failed: ", port_name);
593 if (if_request.ifr_flags & IFF_UP) {
594 ThreadMessage(
"socketCan interface is UP");
596 ThreadMessage(
"socketCan interface is NOT UP");
602 tv.tv_sec = kSocketTimeoutSeconds;
605 setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (
const char*)&tv,
sizeof tv);
607 SocketMessage(
"SocketCAN setsockopt SO_RCVTIMEO failed on device: ",
611 r = bind(sock, (
struct sockaddr*)&can_address,
sizeof(can_address));
613 SocketMessage(
"SocketCAN socket bind() failed: ", port_name);
625void Worker::HandleInput(CanFrame frame) {
632 if (position == kNotFound) {
635 ready = fast_messages.
InsertEntry(header, frame.data, position);
638 ready = fast_messages.
AppendEntry(header, frame.data, position);
642 std::vector<unsigned char> vec;
645 vec = PushFastMsgFragment(header, position);
648 vec = PushCompleteMsg(header, position, frame);
651 auto src_addr = m_parent_driver->GetAddress(m_parent_driver->node_name);
652 auto msg = std::make_shared<const Nmea2000Msg>(header.pgn, vec, src_addr);
653 ProcessRxMessages(msg);
654 m_parent_driver->m_listener.
Notify(std::move(msg));
659void Worker::ProcessRxMessages(std::shared_ptr<const Nmea2000Msg> n2k_msg){
661 if(n2k_msg->PGN.pgn == 59904 ){
668void Worker::Entry() {
673 socket = InitSocket(m_port_name.ToStdString());
675 std::string msg(
"SocketCAN socket create failed: ");
676 ThreadMessage(msg + m_port_name.ToStdString());
683 if (m_parent_driver->SendAddressClaim(DEFAULT_N2K_SOURCE_ADDRESS))
684 m_parent_driver->m_source_address = DEFAULT_N2K_SOURCE_ADDRESS;
687 while (m_run_flag > 0) {
688 recvbytes = read(socket, &frame,
sizeof(frame));
689 if (recvbytes == -1) {
690 if (errno == EAGAIN || errno == EWOULDBLOCK)
continue;
692 wxLogWarning(
"can socket %s: fatal error %s", m_port_name.c_str(),
696 if (recvbytes != 16) {
697 wxLogWarning(
"can socket %s: bad frame size: %d (ignored)",
698 m_port_name.c_str(), recvbytes);
708bool Worker::StartThread() {
710 std::thread t(&Worker::Entry,
this);
715void Worker::StopThread() {
716 if (m_run_flag < 0) {
717 wxLogMessage(
"Attempt to stop already dead thread (ignored).");
720 wxLogMessage(
"Stopping Worker Thread");
724 while ((m_run_flag >= 0) && (tsec--)) wxSleep(1);
727 wxLogMessage(
"StopThread: Stopped in %d sec.", 10 - tsec);
729 wxLogWarning(
"StopThread: Not Stopped after 10 sec.");
735 const unsigned char sid) {
736 for (
unsigned i = 0; i < entries.size(); i++) {
737 if (((sid & 0xE0) == (entries[i].sid & 0xE0)) &&
738 (entries[i].header.pgn == header.pgn) &&
739 (entries[i].header.source == header.source) &&
740 (entries[i].header.destination == header.destination)) {
748 entries.push_back(
Entry());
749 return entries.size() - 1;
752int FastMessageMap::GarbageCollector(
void) {
753 std::vector<unsigned> stale_entries;
754 for (
unsigned i = 0; i < entries.size(); i++) {
755 if (entries[i].IsExpired()) stale_entries.push_back(i);
757 for (
auto i : stale_entries)
Remove(i);
758 return stale_entries.size();
762 const unsigned char* data,
int index) {
770 if ((data[0] & 0x1F) == 0) {
773 total_data_len =
static_cast<unsigned int>(data[1]);
774 total_data_len += 7 - ((total_data_len - 6) % 7);
776 entries[index].sid =
static_cast<unsigned int>(data[0]);
777 entries[index].expected_length =
static_cast<unsigned int>(data[1]);
778 entries[index].header = header;
779 entries[index].time_arrived = std::chrono::system_clock::now();
781 entries[index].data.resize(total_data_len);
782 memcpy(&entries[index].data[0], &data[2], 6);
785 entries[index].cursor = 6;
788 return entries[index].expected_length <= 6;
796 const unsigned char* data,
int position) {
798 if ((entries[position].sid + 1) == data[0]) {
799 memcpy(&entries[position].data[entries[position].cursor], &data[1], 7);
800 entries[position].sid = data[0];
803 entries[position].cursor += 7;
805 return entries[position].cursor >= entries[position].expected_length;
806 }
else if ((data[0] & 0x1F) == 0) {
812 entries.erase(entries.begin() + position);
822 entries.erase(entries.begin() + position);
824 if (dropped_frames == 0) {
825 dropped_frame_time = std::chrono::system_clock::now();
const std::string iface
Physical device for 0183, else a unique string.
Driver for NMEA2000 messages over Linux can drivers.
void Activate() override
Register driver and possibly do other post-ctor steps.
Local driver implementation, not visible outside this file.
Abstract driver interface for NMEA2000 messages.
EventVar evt_driver_msg
Notified for messages from drivers.
void Activate(DriverPtr driver)
Add driver to list of active drivers.
Interface implemented by transport layer and possible other parties like test code which should handl...
virtual void Notify(std::shared_ptr< const NavMsg > message)=0
Handle a received message.
const void Notify()
Notify all listeners, no data supplied.
CanHeader header
Can header, used to "map" the incoming fast message fragments.
TimePoint time_arrived
time of last fragment.
unsigned int cursor
cursor into the current position in data.
unsigned int sid
Sequence identifier, used to check if a received message is the next message in the sequence.
unsigned int expected_length
total data length from first frame
std::vector< unsigned char > data
Received data.
Track fast message fragments eventually forming complete messages.
int AddNewEntry(void)
Allocate a new, fresh entry and return index to it.
Entry & operator[](int i)
Getter.
void Remove(int pos)
Remove entry at pos.
bool AppendEntry(const CanHeader hdr, const unsigned char *data, int index)
Append fragment to existing multipart message.
int FindMatchingEntry(const CanHeader header, const unsigned char sid)
Setter.
bool InsertEntry(const CanHeader header, const unsigned char *data, int index)
Insert a new entry, first part of a multipart message.
Keeps listening over it's lifespan, removes itself on destruction.
Adds a std::shared<void> element to wxCommandEvent.
Manages reading the N2K data stream provided by some N2K gateways from the declared serial port.
N2k uses CAN which defines the basic properties of messages.