Page tree

В рамках данной идеи разработаем алгоритм, который будет lock-free, если добавление ребра не нарушает текущий топологический порядок, и берется блокировка при операции модификации. Основная идея заключается в том, чтобы не производить "хорошие" (не нарушающие топ. порядок) добавления физически, а добавлять их в некую очередь.

ВНИМАНИЕ! ТУТ ЕСТЬ БАГИ! МНОГО!

class Node {
    OrderAndEpoch: Long or Descriptor // should have volatile semantics
     
    fun Ord(): Int // returns order
    fun Epoch(): Int // returns epoch when the order has been modified
}
data class Descriptor(newValue: Long)
  
shared Epoch: Int
shared AddQueueTail: MultipleProducerSingleConsumerQueue<Edge>
  
shared OrdInt: Map<Int, Node> // Maps orders to the nodes, for modifier thread only
  
fun add((u, v): Edge): List<Cycle> {
retry: while (true) {
    // Read current epoch and orders of the edge endings
    val curEpoch = Epoch
    val ordU = u.OrderAndEpoch
    val ordV = v.OrderAndEpoch
    // Check that ordU and ordV are not Descriptors,
    // otherwise help to complete the operation
    if (ordU is Descriptor) {
        u.OrderAndEpoch.CAS(ordU, ordU.newValue)
        continue retry
    } else if (ordV is Descriptor) {
        v.OrderAndEpoch.CAS(ordV, ordV.newValue)
        continue retry
    }
    // Add the edge
    if (ordU < ordV) {
        // Edge could be added without topological order modification.
        // So add it to the "add" operations queue if the epoch has not been changed
        val success = atomically {
            AddQueueTail.add( (u,v) )
            assert(curEpoch == Epoch)
        }
    } else {
        // Otherwise we should rearrange nodes between v and u
        exclusively {
        	// Try to add the edge without modification after the writer lock is acquired
        	if (tryToAddWithoutModification((u, v))
        		return noCycles() // just an empty list
            performAllOperationsFromAddQueue() // Optimisation only
            (newTopOrder, success) = countNewTopOrder() // Count new topological order
            if (success) {
                // If (u, v) edge could be added successfully,
                // update the topological order values for other threads.
                // Add descriptors firstly (for lock-free unmodifiable operations) ...
                for (i = v.Ord() .. u.Ord())
                    ordInv[i].OrderAndEpoch = Descriptor(newTopOrder[i])
                // ... Increment the epoch
                Epoch++
                // Edge (u, v) has been added, but it is not guaranteed 
                // that new edges in the queue do not break new topological order,
                // so check it and revert topological order change if needed
                val successAdd = addEdgesFromQueueInBatch()
				if (successAdd) {
            		// Edge (u, v) has been added successfully, replace Descriptors with new values if needed
                	performDescriptors(v.Ord(), u.Ord())  
                else {
                	// "Good" edges from the queue create a cycle,
                	// revert modification operation
                	revertTopOrder()
                	// Complete previous descriptors replacement
                	performDescriptors(v.Ord(), u.Ord())
                	// Revert topological order
                	for (i = v.Ord() .. u.Ord())
                    	ordInv[i].OrderAndEpoch = Descriptor(prevTopOrder[i])
                    // Start new epoch
                    Epoch++
                    // Replace descriptors with their values
                    performDescriptors(v.Ord(), u.Ord())
                    // Find cycles and return them
                    return getCycles(u, v)
                }        
            } else {
            	// Find cycles and return them
            	return getCycles(u, v)
            }
        }
    }
    return noCycles() // just an empty list
}

fun performDescriptors(from: Int, to: Int) {
	for (i = from .. to) {
    	val ordI = ordInv[i].OrderAndEpoch
        if (ordI is Descriptor) // Otherwise another thread replaces the descriptor with its value before
        	ordInv[i].OrderAndEpoch.CAS(ordI, ordI.newValue)
    }
}
  • No labels