В рамках данной идеи разработаем алгоритм, который будет lock-free, если добавление ребра не нарушает текущий топологический порядок, и берется блокировка при операции модификации. Основная идея заключается в том, чтобы не производить "хорошие" (не нарушающие топ. порядок) добавления физически, а добавлять их в некую очередь.
ВНИМАНИЕ! ТУТ ЕСТЬ БАГИ! МНОГО!
class Node {
OrderAndEpoch: Long or Descriptor // should have volatile semantics
fun Ord(): Int // returns order
fun Epoch(): Int // returns epoch when the order has been modified
}
data class Descriptor(newValue: Long)
shared Epoch: Int
shared AddQueueTail: MultipleProducerSingleConsumerQueue<Edge>
shared OrdInt: Map<Int, Node> // Maps orders to the nodes, for modifier thread only
fun add((u, v): Edge): List<Cycle> {
retry: while (true) {
// Read current epoch and orders of the edge endings
val curEpoch = Epoch
val ordU = u.OrderAndEpoch
val ordV = v.OrderAndEpoch
// Check that ordU and ordV are not Descriptors,
// otherwise help to complete the operation
if (ordU is Descriptor) {
u.OrderAndEpoch.CAS(ordU, ordU.newValue)
continue retry
} else if (ordV is Descriptor) {
v.OrderAndEpoch.CAS(ordV, ordV.newValue)
continue retry
}
// Add the edge
if (ordU < ordV) {
// Edge could be added without topological order modification.
// So add it to the "add" operations queue if the epoch has not been changed
val success = atomically {
AddQueueTail.add( (u,v) )
assert(curEpoch == Epoch)
}
} else {
// Otherwise we should rearrange nodes between v and u
exclusively {
// Try to add the edge without modification after the writer lock is acquired
if (tryToAddWithoutModification((u, v))
return noCycles() // just an empty list
performAllOperationsFromAddQueue() // Optimisation only
(newTopOrder, success) = countNewTopOrder() // Count new topological order
if (success) {
// If (u, v) edge could be added successfully,
// update the topological order values for other threads.
// Add descriptors firstly (for lock-free unmodifiable operations) ...
for (i = v.Ord() .. u.Ord())
ordInv[i].OrderAndEpoch = Descriptor(newTopOrder[i])
// ... Increment the epoch
Epoch++
// Edge (u, v) has been added, but it is not guaranteed
// that new edges in the queue do not break new topological order,
// so check it and revert topological order change if needed
val successAdd = addEdgesFromQueueInBatch()
if (successAdd) {
// Edge (u, v) has been added successfully, replace Descriptors with new values if needed
performDescriptors(v.Ord(), u.Ord())
else {
// "Good" edges from the queue create a cycle,
// revert modification operation
revertTopOrder()
// Complete previous descriptors replacement
performDescriptors(v.Ord(), u.Ord())
// Revert topological order
for (i = v.Ord() .. u.Ord())
ordInv[i].OrderAndEpoch = Descriptor(prevTopOrder[i])
// Start new epoch
Epoch++
// Replace descriptors with their values
performDescriptors(v.Ord(), u.Ord())
// Find cycles and return them
return getCycles(u, v)
}
} else {
// Find cycles and return them
return getCycles(u, v)
}
}
}
return noCycles() // just an empty list
}
fun performDescriptors(from: Int, to: Int) {
for (i = from .. to) {
val ordI = ordInv[i].OrderAndEpoch
if (ordI is Descriptor) // Otherwise another thread replaces the descriptor with its value before
ordInv[i].OrderAndEpoch.CAS(ordI, ordI.newValue)
}
} |