J2534_WIN: Cleanup for style and prep for better mutexes.

master
Jessy Diamond Exum 2017-10-14 02:34:14 -07:00
parent 858119c922
commit 24f3f78b81
18 changed files with 245 additions and 183 deletions

View File

@ -0,0 +1,39 @@
#pragma once
#include <memory>
#include "J2534Frame.h"
class J2534Connection;
class Action
{
public:
Action(
std::weak_ptr<J2534Connection> connection,
std::chrono::microseconds delay
) : connection(connection), delay(delay) { };
Action(
std::weak_ptr<J2534Connection> connection
) : connection(connection), delay(std::chrono::microseconds(0)) { };
virtual void execute() = 0;
void scheduleImmediate() {
expire = std::chrono::steady_clock::now();
}
void scheduleImmediateDelay() {
expire = std::chrono::steady_clock::now() + this->delay;
}
void schedule(std::chrono::time_point<std::chrono::steady_clock> starttine, BOOL adddelayed) {
this->expire = starttine;
if (adddelayed)
expire += this->delay;
}
std::weak_ptr<J2534Connection> connection;
std::chrono::microseconds delay;
std::chrono::time_point<std::chrono::steady_clock> expire;
};

View File

@ -107,7 +107,7 @@ long J2534Connection::setBaud(unsigned long baud) {
return STATUS_NOERROR;
}
void J2534Connection::schedultMsgTx(std::shared_ptr<MessageTx> msgout) {
void J2534Connection::schedultMsgTx(std::shared_ptr<Action> msgout) {
if (auto panda_ps = this->panda_dev.lock()) {
synchronized(staged_writes_lock) {
this->txbuff.push(msgout);

View File

@ -2,14 +2,14 @@
#include "panda/panda.h"
#include "J2534_v0404.h"
#include "synchronize.h"
#include "J2534Frame.h"
#include "PandaJ2534Device.h"
#include "J2534MessageFilter.h"
#include "MessageTx.h"
#include "J2534Frame.h"
class J2534MessageFilter;
class J2534Frame;
class Action;
class PandaJ2534Device;
class MessageTx;
class J2534MessageFilter;
#define check_bmask(num, mask)(((num) & mask) == mask)
@ -70,7 +70,7 @@ public:
return 4128;
}
void schedultMsgTx(std::shared_ptr<MessageTx> msgout);
void schedultMsgTx(std::shared_ptr<Action> msgout);
void rescheduleExistingTxMsgs();
@ -99,7 +99,7 @@ protected:
std::queue<J2534Frame> messages;
std::array<std::shared_ptr<J2534MessageFilter>, 10> filters;
std::queue<std::shared_ptr<MessageTx>> txbuff;
std::queue<std::shared_ptr<Action>> txbuff;
private:
Mutex message_access_lock;

View File

@ -32,7 +32,7 @@ long J2534Connection_CAN::PassThruWriteMsgs(PASSTHRU_MSG *pMsg, unsigned long *p
}
auto msgtx = std::make_shared<MessageTx_CAN>(shared_from_this(), *msg);
this->schedultMsgTx(std::dynamic_pointer_cast<MessageTx>(msgtx));
this->schedultMsgTx(std::dynamic_pointer_cast<Action>(msgtx));
}
return STATUS_NOERROR;
}

View File

@ -37,7 +37,7 @@ long J2534Connection_ISO15765::PassThruWriteMsgs(PASSTHRU_MSG *pMsg, unsigned lo
if (msg->DataSize > 11 && fid == -1) return ERR_NO_FLOW_CONTROL; //11 bytes (4 for CANid, 7 payload) is max length of input frame.
auto msgtx = std::make_shared<MessageTx_ISO15765>( shared_from_this(), *msg, (fid == -1) ? nullptr : this->filters[fid] );
this->schedultMsgTx(std::dynamic_pointer_cast<MessageTx>(msgtx));
this->schedultMsgTx(std::dynamic_pointer_cast<Action>(msgtx));
}
return STATUS_NOERROR;
}

View File

@ -22,12 +22,20 @@ public:
}
J2534Frame(const PASSTHRU_MSG& msg) {
ProtocolID = msg.ProtocolID;
RxStatus = msg.RxStatus;
TxFlags = msg.TxFlags;
Timestamp = msg.Timestamp;
ExtraDataIndex = msg.ExtraDataIndex;
Data = std::string((const char*)msg.Data, msg.DataSize);
this->ProtocolID = msg.ProtocolID;
this->RxStatus = msg.RxStatus;
this->TxFlags = msg.TxFlags;
this->Timestamp = msg.Timestamp;
this->ExtraDataIndex = msg.ExtraDataIndex;
this->Data = std::string((const char*)msg.Data, msg.DataSize);
}
J2534Frame() {
this->ProtocolID = 0;
this->RxStatus = 0;
this->TxFlags = 0;
this->Timestamp = 0;
this->ExtraDataIndex = 0;
}
unsigned long ProtocolID;

View File

@ -1,25 +1,23 @@
#pragma once
#include <memory>
#include "J2534Connection.h"
#include "Action.h"
#include "J2534Frame.h"
class J2534Connection;
class MessageTx
class MessageTx : public Action, public std::enable_shared_from_this<MessageTx>
{
public:
MessageTx(
std::weak_ptr<J2534Connection> connection
) : connection(connection) { };
std::weak_ptr<J2534Connection> connection_in,
PASSTHRU_MSG& to_send
) : Action(connection_in), fullmsg(to_send) { };
virtual BOOL sendNextFrame() = 0;
virtual BOOL checkTxReceipt(J2534Frame frame) = 0;
virtual BOOL checkTxReceipt(J2534Frame frame) { return FALSE; };
virtual BOOL isFinished() = 0;
virtual BOOL isFinished() { return TRUE; };
virtual BOOL txReady() = 0;
virtual BOOL txReady() { return TRUE; };
std::weak_ptr<J2534Connection> connection;
std::chrono::microseconds separation_time;
};
protected:
J2534Frame fullmsg;
};

View File

@ -0,0 +1,43 @@
#include "stdafx.h"
#include "J2534Connection.h"
#include "MessageTxTimeout.h"
MessageTxTimeoutable::MessageTxTimeoutable(
std::weak_ptr<J2534Connection> connection,
PASSTHRU_MSG& to_send
) : MessageTx(connection, to_send), recvCount(0) { };
void MessageTxTimeoutable::scheduleTimeout(std::chrono::microseconds timeoutus) {
if (auto conn_sp = this->connection.lock()) {
if (auto panda_dev_sp = conn_sp->getPandaDev()) {
auto timeoutobj = std::make_shared<MessageTxTimeout>(std::static_pointer_cast<MessageTxTimeoutable>(shared_from_this()), timeoutus);
panda_dev_sp->scheduleAction(std::static_pointer_cast<Action>(timeoutobj), TRUE);
}
}
}
void MessageTxTimeoutable::scheduleTimeout(unsigned long timeoutus) {
scheduleTimeout(std::chrono::microseconds(timeoutus));
}
MessageTxTimeout::MessageTxTimeout(
std::shared_ptr<MessageTxTimeoutable> msg,
std::chrono::microseconds timeout
) : Action(msg->connection), msg(msg), lastRecvCount(msg->getRecvCount()) {
delay = timeout;
};
MessageTxTimeout::MessageTxTimeout(
std::shared_ptr<MessageTxTimeoutable> msg,
unsigned long timeout
) : MessageTxTimeout(msg, std::chrono::microseconds(timeout * 1000)) { };
void MessageTxTimeout::execute() {
if (auto msg_sp = this->msg.lock()) {
if (msg_sp->getRecvCount() == this->lastRecvCount) {
msg_sp->onTimeout();
}
}
}

View File

@ -1,14 +1,16 @@
#pragma once
#include "Action.h"
#include "MessageTx.h"
class MessageTxTimeout;
class MessageTxTimeoutable : public std::enable_shared_from_this<MessageTxTimeoutable>, public MessageTx
class MessageTxTimeoutable : public MessageTx
{
public:
MessageTxTimeoutable(
std::weak_ptr<J2534Connection> connection
) : MessageTx(connection), recvCount(0) { };
std::weak_ptr<J2534Connection> connection,
PASSTHRU_MSG& to_send
);
unsigned long getRecvCount() {
return recvCount;
@ -19,44 +21,26 @@ public:
protected:
unsigned long recvCount;
void scheduleTimeout(std::chrono::microseconds timeoutus) {
if (auto conn_sp = this->connection.lock()) {
if (auto panda_dev_sp = conn_sp->getPandaDev()) {
auto timeoutobj = std::make_shared<MessageTxTimeout>(shared_from_this(), timeoutus);
panda_dev_sp->registerMessageTx(std::static_pointer_cast<MessageTx>(timeoutobj), TRUE);
}
}
}
void scheduleTimeout(std::chrono::microseconds timeoutus);
void scheduleTimeout(unsigned long timeoutus) {
scheduleTimeout(std::chrono::microseconds(timeoutus));
}
void scheduleTimeout(unsigned long timeoutus);
};
class MessageTxTimeout : public MessageTx
class MessageTxTimeout : public Action
{
public:
MessageTxTimeout(
std::shared_ptr<MessageTxTimeoutable> msg,
std::chrono::microseconds timeout
) : MessageTx(msg->connection), msg(msg), lastRecvCount(msg->getRecvCount()) {
separation_time = timeout;
};
);
MessageTxTimeout(
std::shared_ptr<MessageTxTimeoutable> msg,
unsigned long timeout
) : MessageTxTimeout(msg, std::chrono::microseconds(timeout * 1000)) { };
);
virtual BOOL sendNextFrame() {
if (auto msg_sp = this->msg.lock()) {
if (msg_sp->getRecvCount() == this->lastRecvCount) {
msg_sp->onTimeout();
}
}
return FALSE; //Do not add this to the echo queue.
};
virtual void execute();
private:
std::weak_ptr<MessageTxTimeoutable> msg;

View File

@ -0,0 +1,38 @@
#include "stdafx.h"
#include "MessageTx_CAN.h"
#include "J2534Connection_CAN.h"
//class J2534Connection_CAN;
MessageTx_CAN::MessageTx_CAN(
std::shared_ptr<J2534Connection> connection_in,
PASSTHRU_MSG& to_send
) : MessageTx(connection_in, to_send), sentyet(FALSE), txInFlight(FALSE) {};
void MessageTx_CAN::execute() {
uint32_t addr = ((uint8_t)fullmsg.Data[0]) << 24 | ((uint8_t)fullmsg.Data[1]) << 16 |
((uint8_t)fullmsg.Data[2]) << 8 | ((uint8_t)fullmsg.Data[3]);
if (auto conn_sp = std::static_pointer_cast<J2534Connection_CAN>(this->connection.lock())) {
if (auto panda_dev_sp = conn_sp->getPandaDev()) {
auto payload = fullmsg.Data.substr(4);
if (panda_dev_sp->panda->can_send(addr, check_bmask(this->fullmsg.TxFlags, CAN_29BIT_ID),
(const uint8_t*)payload.c_str(), (uint8_t)payload.size(), panda::PANDA_CAN1) == FALSE) {
return;
}
this->txInFlight = TRUE;
this->sentyet = TRUE;
panda_dev_sp->txMsgsAwaitingEcho.push(shared_from_this());
}
}
}
//Returns TRUE if receipt is consumed by the msg, FALSE otherwise.
BOOL MessageTx_CAN::checkTxReceipt(J2534Frame frame) {
if (txReady()) return FALSE;
if (frame.Data == fullmsg.Data && ((this->fullmsg.TxFlags & CAN_29BIT_ID) == (frame.RxStatus & CAN_29BIT_ID))) {
txInFlight = FALSE;
return TRUE;
}
return FALSE;
}

View File

@ -1,9 +1,8 @@
#pragma once
#include <memory>
#include "MessageTx.h"
#include "J2534Connection_CAN.h"
class J2534Connection_CAN;
class J2534Connection;
class MessageTx_CAN : public MessageTx
{
@ -11,36 +10,12 @@ public:
MessageTx_CAN(
std::shared_ptr<J2534Connection> connection_in,
PASSTHRU_MSG& to_send
) : MessageTx(connection_in), fullmsg(to_send) , sentyet(FALSE), txInFlight(FALSE) {};
);
virtual BOOL sendNextFrame() {
uint32_t addr = ((uint8_t)fullmsg.Data[0]) << 24 | ((uint8_t)fullmsg.Data[1]) << 16 |
((uint8_t)fullmsg.Data[2]) << 8 | ((uint8_t)fullmsg.Data[3]);
if (auto conn_sp = std::static_pointer_cast<J2534Connection_CAN>(this->connection.lock())) {
if (auto panda_dev_sp = conn_sp->getPandaDev()) {
auto payload = fullmsg.Data.substr(4);
if (panda_dev_sp->panda->can_send(addr, check_bmask(this->fullmsg.TxFlags, CAN_29BIT_ID),
(const uint8_t*)payload.c_str(), (uint8_t)payload.size(), panda::PANDA_CAN1) == FALSE) {
return FALSE;
}
this->txInFlight = TRUE;
this->sentyet = TRUE;
return TRUE;
}
}
return FALSE;
};
virtual void execute();
//Returns TRUE if receipt is consumed by the msg, FALSE otherwise.
virtual BOOL checkTxReceipt(J2534Frame frame) {
if (txReady()) return FALSE;
if (frame.Data == fullmsg.Data && ((this->fullmsg.TxFlags & CAN_29BIT_ID) == (frame.RxStatus & CAN_29BIT_ID))) {
txInFlight = FALSE;
return TRUE;
}
return FALSE;
}
virtual BOOL checkTxReceipt(J2534Frame frame);
virtual BOOL isFinished() {
return !txInFlight && sentyet;
@ -53,6 +28,4 @@ public:
private:
BOOL sentyet;
BOOL txInFlight;
J2534Frame fullmsg;
};

View File

@ -10,7 +10,7 @@ MessageTx_ISO15765::MessageTx_ISO15765(
std::shared_ptr<J2534Connection> connection_in,
PASSTHRU_MSG& to_send,
std::shared_ptr<J2534MessageFilter> filter
) : MessageTxTimeoutable(connection_in), fullmsg(to_send), filter(filter), frames_sent(0),
) : MessageTxTimeoutable(connection_in, to_send), filter(filter), frames_sent(0),
consumed_count(0), txInFlight(FALSE), sendAll(FALSE), block_size(0), numWaitFrames(0), didtimeout(FALSE) {
CANid = ((uint8_t)fullmsg.Data[0]) << 24 | ((uint8_t)fullmsg.Data[1]) << 16 |
@ -56,10 +56,10 @@ unsigned int MessageTx_ISO15765::addressLength() {
return check_bmask(fullmsg.TxFlags, ISO15765_ADDR_TYPE) ? 5 : 4;
}
BOOL MessageTx_ISO15765::sendNextFrame() {
if (didtimeout) return FALSE;
if (this->frames_sent >= this->framePayloads.size()) return FALSE;
if (block_size == 0 && !sendAll && this->frames_sent > 0)return FALSE;
void MessageTx_ISO15765::execute() {
if (didtimeout) return;
if (this->frames_sent >= this->framePayloads.size()) return;
if (block_size == 0 && !sendAll && this->frames_sent > 0) return;
if (block_size > 0 && !sendAll) block_size--;
if (auto conn_sp = this->connection.lock()) {
@ -67,15 +67,14 @@ BOOL MessageTx_ISO15765::sendNextFrame() {
auto& outFramePayload = this->framePayloads[this->frames_sent];
if (panda_dev_sp->panda->can_send(this->CANid, check_bmask(this->fullmsg.TxFlags, CAN_29BIT_ID),
(const uint8_t*)outFramePayload.c_str(), (uint8_t)outFramePayload.size(), panda::PANDA_CAN1) == FALSE) {
return FALSE;
return;
}
this->txInFlight = TRUE;
this->frames_sent++;
return TRUE;
panda_dev_sp->txMsgsAwaitingEcho.push(shared_from_this());
}
}
return FALSE;
}
//Returns TRUE if receipt is consumed by the msg, FALSE otherwise.
@ -112,11 +111,11 @@ BOOL MessageTx_ISO15765::checkTxReceipt(J2534Frame frame) {
} else {
//Restart timeout if we are waiting for a flow control frame.
//FC frames are required when we are not sending all, the
//current block_size batch has been sent, a FC message has
//current block_size batch has not been sent, a FC message has
//already been received (differentiating from first frame), the
//message is not finished, and there is more than one frame in
//the message.
if(block_size == 0 && recvCount != 0 && !sendAll && !isFinished() && this->isFinished() && this->framePayloads.size() > 1)
if(block_size == 0 && recvCount != 0 && !sendAll && !this->isFinished() && this->framePayloads.size() > 1)
scheduleTimeout(TIMEOUT_CF);
}
return TRUE;
@ -136,14 +135,14 @@ void MessageTx_ISO15765::onTimeout() {
didtimeout = TRUE;
if (auto conn_sp = std::static_pointer_cast<J2534Connection_ISO15765>(this->connection.lock())) {
if (auto panda_dev_sp = conn_sp->getPandaDev()) {
panda_dev_sp->removeConnectionTopMessage(conn_sp);
panda_dev_sp->removeConnectionTopAction(conn_sp);
}
}
}
void MessageTx_ISO15765::flowControlContinue(uint8_t block_size, std::chrono::microseconds separation_time) {
this->block_size = block_size;
this->separation_time = separation_time;
this->delay = separation_time;
this->sendAll = block_size == 0;
this->recvCount++;
}

View File

@ -15,7 +15,7 @@ public:
unsigned int addressLength();
virtual BOOL sendNextFrame();
virtual void execute();
virtual BOOL checkTxReceipt(J2534Frame frame);
@ -42,6 +42,4 @@ public:
BOOL sendAll;
unsigned int numWaitFrames;
BOOL didtimeout;
J2534Frame fullmsg;
};

View File

@ -2,21 +2,6 @@
#include "PandaJ2534Device.h"
#include "J2534Frame.h"
SCHEDULED_TX_MSG::SCHEDULED_TX_MSG(std::shared_ptr<MessageTx> msgtx, BOOL startdelayed) : msgtx(msgtx) {
expire = std::chrono::steady_clock::now(); //Should be triggered immediately.
if(startdelayed)
expire += this->msgtx->separation_time;
}
void SCHEDULED_TX_MSG::refreshExpiration() {
expire = std::chrono::steady_clock::now() + this->msgtx->separation_time;
}
void SCHEDULED_TX_MSG::refreshExpiration(std::chrono::time_point<std::chrono::steady_clock> starttine) {
expire = starttine + this->msgtx->separation_time;
}
PandaJ2534Device::PandaJ2534Device(std::unique_ptr<panda::Panda> new_panda) : txInProgress(FALSE) {
this->panda = std::move(new_panda);
@ -81,10 +66,6 @@ DWORD PandaJ2534Device::addChannel(std::shared_ptr<J2534Connection>& conn, unsig
return STATUS_NOERROR;
}
DWORD WINAPI PandaJ2534Device::_can_recv_threadBootstrap(LPVOID This) {
return ((PandaJ2534Device*)This)->can_recv_thread();
}
DWORD PandaJ2534Device::can_recv_thread() {
DWORD err = TRUE;
while (err) {
@ -94,9 +75,9 @@ DWORD PandaJ2534Device::can_recv_thread() {
J2534Frame msg_out(msg_in);
if (msg_in.is_receipt) {
synchronized(active_flow_control_txs_lock) {
synchronized(activeTXs_mutex) {
if (txMsgsAwaitingEcho.size() > 0) {
auto msgtx = txMsgsAwaitingEcho.front()->msgtx;
auto msgtx = txMsgsAwaitingEcho.front();
if (auto conn = msgtx->connection.lock()) {
if (conn->isProtoCan() && conn->getPort() == msg_in.bus) {
if (msgtx->checkTxReceipt(msg_out)) {
@ -105,16 +86,15 @@ DWORD PandaJ2534Device::can_recv_thread() {
// Frame is for this msg, more tx frames required after a FC frame: Wait for FC frame to come and trigger next tx.
// Frame is for this msg, more tx frames required: Schedule next tx frame.
// Frame is for this msg, and is the final frame of the msg: Let conn process full msg, If another msg from this conn is available, register it.
auto tx_schedule = std::move(txMsgsAwaitingEcho.front());
txMsgsAwaitingEcho.pop(); //Remove the TX object and schedule record.
if (msgtx->isFinished()) {
conn->processMessageReceipt(msg_out); //Alert the connection a tx is done.
this->removeConnectionTopMessage(conn);
this->removeConnectionTopAction(conn);
} else {
if (msgtx->txReady()) { //Not finished, ready to send next frame.
tx_schedule->refreshExpiration(msg_in.recv_time_point);
this->insertMultiPartTxInQueue(std::move(tx_schedule));
msgtx->schedule(msg_in.recv_time_point, TRUE);
this->insertActionIntoTaskList(msgtx);
} else {
//Not finished, but next frame not ready (maybe waiting for flow control).
//Do not schedule more messages from this connection.
@ -142,10 +122,6 @@ DWORD PandaJ2534Device::can_recv_thread() {
return 0;
}
DWORD PandaJ2534Device::_msg_tx_threadBootstrap(LPVOID This) {
return ((PandaJ2534Device*)This)->msg_tx_thread();
}
DWORD PandaJ2534Device::msg_tx_thread() {
const HANDLE subscriptions[] = { this->flow_control_wakeup_event, this->thread_kill_event };
DWORD sleepDuration = INFINITE;
@ -159,19 +135,18 @@ DWORD PandaJ2534Device::msg_tx_thread() {
ResetEvent(this->flow_control_wakeup_event);
while (TRUE) {
synchronized(active_flow_control_txs_lock) { //implemented with for loop. Consumes breaks.
if (this->active_flow_control_txs.size() == 0) {
synchronized(activeTXs_mutex) { //implemented with for loop. Consumes breaks.
if (this->task_queue.size() == 0) {
sleepDuration = INFINITE;
goto break_flow_ctrl_loop;
}
if (std::chrono::steady_clock::now() >= this->active_flow_control_txs.front()->expire) {
auto scheduled_msg = std::move(this->active_flow_control_txs.front()); //Get the scheduled tx record.
this->active_flow_control_txs.pop_front();
if (scheduled_msg->msgtx->sendNextFrame())
txMsgsAwaitingEcho.push(std::move(scheduled_msg));
if (std::chrono::steady_clock::now() >= this->task_queue.front()->expire) {
auto task = this->task_queue.front(); //Get the scheduled tx record.
this->task_queue.pop_front();
task->execute();
} else { //Ran out of things that need to be sent now. Sleep!
auto time_diff = std::chrono::duration_cast<std::chrono::milliseconds>
(this->active_flow_control_txs.front()->expire - std::chrono::steady_clock::now());
(this->task_queue.front()->expire - std::chrono::steady_clock::now());
sleepDuration = max(1, time_diff.count());
goto break_flow_ctrl_loop;
}
@ -183,45 +158,44 @@ DWORD PandaJ2534Device::msg_tx_thread() {
return 0;
}
void PandaJ2534Device::insertMultiPartTxInQueue(std::unique_ptr<SCHEDULED_TX_MSG> fcwrite) {
synchronized(active_flow_control_txs_lock) {
auto iter = this->active_flow_control_txs.begin();
for (; iter != this->active_flow_control_txs.end(); iter++) {
if (fcwrite->expire < (*iter)->expire) break;
void PandaJ2534Device::insertActionIntoTaskList(std::shared_ptr<Action> action) {
synchronized(activeTXs_mutex) {
auto iter = this->task_queue.begin();
for (; iter != this->task_queue.end(); iter++) {
if (action->expire < (*iter)->expire) break;
}
this->active_flow_control_txs.insert(iter, std::move(fcwrite));
this->task_queue.insert(iter, action);
}
SetEvent(this->flow_control_wakeup_event);
}
void PandaJ2534Device::registerMessageTx(std::shared_ptr<MessageTx> msg, BOOL startdelayed) {
synchronized(ConnTxMutex) {
auto fcwrite = std::make_unique<SCHEDULED_TX_MSG>(msg, startdelayed);
this->insertMultiPartTxInQueue(std::move(fcwrite));
}
void PandaJ2534Device::scheduleAction(std::shared_ptr<Action> msg, BOOL startdelayed) {
if(startdelayed)
msg->scheduleImmediateDelay();
else
msg->scheduleImmediate();
this->insertActionIntoTaskList(msg);
}
void PandaJ2534Device::registerConnectionTx(std::shared_ptr<J2534Connection> conn) {
synchronized(ConnTxMutex) {
synchronized(connTXSet_mutex) {
auto ret = this->ConnTxSet.insert(conn);
if (ret.second == FALSE) return; //Conn already exists.
registerMessageTx(conn->txbuff.front());
this->scheduleAction(conn->txbuff.front());
}
}
void PandaJ2534Device::unstallConnectionTx(std::shared_ptr<J2534Connection> conn) {
synchronized(ConnTxMutex) {
synchronized(connTXSet_mutex) {
auto ret = this->ConnTxSet.insert(conn);
if (ret.second == TRUE) return; //Conn already exists.
auto fcwrite = std::make_unique<SCHEDULED_TX_MSG>(conn->txbuff.front());
this->insertMultiPartTxInQueue(std::move(fcwrite));
this->insertActionIntoTaskList(conn->txbuff.front());
}
SetEvent(flow_control_wakeup_event);
}
void PandaJ2534Device::removeConnectionTopMessage(std::shared_ptr<J2534Connection> conn) {
synchronized(active_flow_control_txs_lock) {
void PandaJ2534Device::removeConnectionTopAction(std::shared_ptr<J2534Connection> conn) {
synchronized(activeTXs_mutex) {
conn->txbuff.pop(); //Remove the top TX message from the connection tx queue.
//Remove the connection from the active connection list if no more messages are scheduled with this connection.
@ -230,7 +204,7 @@ void PandaJ2534Device::removeConnectionTopMessage(std::shared_ptr<J2534Connectio
this->ConnTxSet.erase(conn);
} else {
//Add the next scheduled tx from this conn
this->registerMessageTx(conn->txbuff.front());
this->scheduleAction(conn->txbuff.front());
}
}
}

View File

@ -7,23 +7,14 @@
#include "J2534_v0404.h"
#include "panda/panda.h"
#include "synchronize.h"
#include "J2534Connection.h"
#include "Action.h"
#include "MessageTx.h"
#include "J2534Connection.h"
class J2534Connection;
class Action;
class MessageTx;
typedef struct SCHEDULED_TX_MSG {
SCHEDULED_TX_MSG(std::shared_ptr<MessageTx> msgtx, BOOL startdelayed = FALSE);
void refreshExpiration();
void refreshExpiration(std::chrono::time_point<std::chrono::steady_clock> starttime);
std::shared_ptr<MessageTx> msgtx;
std::chrono::time_point<std::chrono::steady_clock> expire;
} FLOW_CONTROL_WRITE;
class PandaJ2534Device {
public:
PandaJ2534Device(std::unique_ptr<panda::Panda> new_panda);
@ -38,33 +29,38 @@ public:
std::unique_ptr<panda::Panda> panda;
std::vector<std::shared_ptr<J2534Connection>> connections;
void insertMultiPartTxInQueue(std::unique_ptr<SCHEDULED_TX_MSG> fcwrite);
void insertActionIntoTaskList(std::shared_ptr<Action> action);
void registerMessageTx(std::shared_ptr<MessageTx> msg, BOOL startdelayed=FALSE);
void scheduleAction(std::shared_ptr<Action> msg, BOOL startdelayed=FALSE);
void registerConnectionTx(std::shared_ptr<J2534Connection> conn);
void unstallConnectionTx(std::shared_ptr<J2534Connection> conn);
void removeConnectionTopMessage(std::shared_ptr<J2534Connection> conn);
void removeConnectionTopAction(std::shared_ptr<J2534Connection> conn);
std::queue<std::shared_ptr<MessageTx>> txMsgsAwaitingEcho;
private:
HANDLE thread_kill_event;
HANDLE can_thread_handle;
static DWORD WINAPI _can_recv_threadBootstrap(LPVOID This);
static DWORD WINAPI _can_recv_threadBootstrap(LPVOID This) {
return ((PandaJ2534Device*)This)->can_recv_thread();
}
DWORD can_recv_thread();
HANDLE flow_control_wakeup_event;
HANDLE flow_control_thread_handle;
static DWORD WINAPI _msg_tx_threadBootstrap(LPVOID This);
static DWORD WINAPI _msg_tx_threadBootstrap(LPVOID This) {
return ((PandaJ2534Device*)This)->msg_tx_thread();
}
DWORD msg_tx_thread();
std::list<std::unique_ptr<SCHEDULED_TX_MSG>> active_flow_control_txs;
Mutex active_flow_control_txs_lock;
std::queue<std::unique_ptr<SCHEDULED_TX_MSG>> txMsgsAwaitingEcho;
std::list<std::shared_ptr<Action>> task_queue;
Mutex activeTXs_mutex;
std::queue<std::shared_ptr<J2534Connection>> ConnTxQueue;
std::set<std::shared_ptr<J2534Connection>> ConnTxSet;
Mutex ConnTxMutex;
Mutex connTXSet_mutex;
BOOL txInProgress;
};

View File

@ -405,7 +405,7 @@ PANDAJ2534DLL_API long PTAPI PassThruIoctl(unsigned long ChannelID, unsigned lon
case DATA_RATE: // 5-500000
return ret_code(get_channel(ChannelID)->setBaud(inconfig->ConfigPtr[i].Value));
case LOOPBACK: // 0 (OFF), 1 (ON) [0]
get_channel(ChannelID)->loopback = bool(inconfig->ConfigPtr[i].Value);
get_channel(ChannelID)->loopback = (inconfig->ConfigPtr[i].Value != 0);
break;
case NODE_ADDRESS: // J1850PWM Related (Not supported by panda). HDS requires these to 'work'.
case NETWORK_LINE:

View File

@ -97,6 +97,7 @@
<ClInclude Include="J2534Frame.h" />
<ClInclude Include="J2534MessageFilter.h" />
<ClInclude Include="J2534_v0404.h" />
<ClInclude Include="Action.h" />
<ClInclude Include="MessageTx.h" />
<ClInclude Include="MessageTxTimeout.h" />
<ClInclude Include="MessageTx_CAN.h" />
@ -121,6 +122,8 @@
<ClCompile Include="J2534Connection_CAN.cpp" />
<ClCompile Include="J2534Connection_ISO15765.cpp" />
<ClCompile Include="J2534MessageFilter.cpp" />
<ClCompile Include="MessageTxTimeout.cpp" />
<ClCompile Include="MessageTx_CAN.cpp" />
<ClCompile Include="MessageTx_ISO15765.cpp" />
<ClCompile Include="PandaJ2534Device.cpp" />
<ClCompile Include="pandaJ2534DLL.cpp" />

View File

@ -75,9 +75,6 @@
<ClInclude Include="targetver.h">
<Filter>Header Files\boilerplate</Filter>
</ClInclude>
<ClInclude Include="MessageTx.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="J2534Frame.h">
<Filter>Header Files</Filter>
</ClInclude>
@ -96,6 +93,12 @@
<ClInclude Include="MessageTxTimeout.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="Action.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="MessageTx.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="pandaJ2534DLL.cpp">
@ -128,6 +131,12 @@
<ClCompile Include="MessageTx_ISO15765.cpp">
<Filter>Source Files\J2534_ISO15765</Filter>
</ClCompile>
<ClCompile Include="MessageTxTimeout.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="MessageTx_CAN.cpp">
<Filter>Source Files\J2534_CAN</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="pandaJ2534DLL.rc">