jobserver: bugfix: potential race
Delay insertion of to-be-sent jobs into m_runningJobs until they are really sent, as getMessage() won't work anymore (as in: segfault) if this job is concurrently re-sent (due to campaign end), its result is received, and deleted in the campaign. This becomes non-hypothetical with larger values for CLIENT_JOB_LIMIT and CLIENT_JOB_REQUEST_SEC. Additionally, reinsert the remaining jobs into the input queue if communication fails, instead of inefficiently delaying redistribution until the campaign end. Change-Id: If85e3c8261deda86beb8d4d93343429223753f22
This commit is contained in:
@ -266,10 +266,6 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_js.m_runningJobs.insert(workloadID, temp_exp)) {
|
|
||||||
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (exp.size() != 0) {
|
if (exp.size() != 0) {
|
||||||
ctrlmsg.set_job_size(exp.size());
|
ctrlmsg.set_job_size(exp.size());
|
||||||
@ -280,8 +276,22 @@ void CommThread::sendPendingExperimentData(Minion& minion)
|
|||||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
if (SocketComm::sendMsg(minion.getSocketDescriptor(), ctrlmsg)) {
|
||||||
for (i = 0; i < ctrlmsg.job_size(); i++) {
|
for (i = 0; i < ctrlmsg.job_size(); i++) {
|
||||||
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
if (SocketComm::sendMsg(minion.getSocketDescriptor(), exp.front()->getMessage())) {
|
||||||
|
|
||||||
|
// delay insertion into m_runningJobs until here, as
|
||||||
|
// getMessage() won't work anymore if this job is re-sent,
|
||||||
|
// received, and deleted in the meantime
|
||||||
|
if (!m_js.m_runningJobs.insert(exp.front()->getWorkloadID(), exp.front())) {
|
||||||
|
cout << "!![Server]could not insert workload id: [" << workloadID << "] double entry?" << endl;
|
||||||
|
}
|
||||||
|
|
||||||
exp.pop_front();
|
exp.pop_front();
|
||||||
} else {
|
} else {
|
||||||
|
// add remaining jobs back to the queue
|
||||||
|
cout << "!![Server] failed to send scheduled " << exp.size() << " jobs" << endl;
|
||||||
|
while (exp.size()) {
|
||||||
|
m_js.m_undoneJobs.Enqueue(exp.front());
|
||||||
|
exp.pop_front();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user