• <ruby id="5koa6"></ruby>
    <ruby id="5koa6"><option id="5koa6"><thead id="5koa6"></thead></option></ruby>

    <progress id="5koa6"></progress>

  • <strong id="5koa6"></strong>
  • FreeBSD5.0內核 - 鎖機制

    發表于:2007-05-25來源:作者:點擊數: 標簽:FreeBSD5.0內核機制
    在FreeBSD5.0中,有很多類型的鎖:互斥體(struct mtx)、共享/獨占鎖(struct sx)、lockmgr鎖(struct lock)、條件變量(struct cv)和信號量(struct sema)。本文將探討這些鎖機制的含義、應用和實現。另外,有一種文件鎖(struct lockf)用于文件字段的保護,嵌在i
    在FreeBSD5.0中,有很多類型的鎖:互斥體(struct mtx)、共享/獨占鎖(struct sx)、lockmgr鎖(struct lock)、條件變量(struct cv)和信號量(struct sema)。本文將探討這些鎖機制的含義、應用和實現。另外,有一種文件鎖(struct lockf)用于文件字段的保護,嵌在inode結構中使用,是屬于另外的范疇,這里不做說明,有興趣可以參考flock(2)和VOP_ADVLOCK(9)。

    在FreeBSD5.0中,有很多類型的鎖:互斥體(struct mtx)、共享/獨占鎖(struct sx)、lockmgr鎖(struct lock)、條件變量(struct cv)和信號量(struct sema)。本文將探討這些鎖機制的含義、應用和實現。另外,有一種文件鎖(struct lockf)用于文件字段的保護,嵌在inode結構中使用,是屬于另外的范疇,這里不做說明,有興趣可以參考flock(2)和VOP_ADVLOCK(9)。 1 基本鎖機制 struct lock_class { const char *lc_name; u_int lc_flags; }; 本文討論的鎖機制,任何一種類型都直接/間接的包含該結構。每一個鎖都有一個類(lock_class),該類描述了某個鎖類(lc_name)和基本特性(lc_flags)。 其中,lc_flags的宏定義:LC_SLEEPLOCK是普通鎖;LC_SPINLOCK是自旋鎖;LC_SLEEPABLE說明可以在對該鎖加鎖的情況下,休眠;LC_RECURSABLE說明該鎖是否支持遞歸;LC_UPGRADABLE說明該鎖支持從共享鎖變為獨占鎖,抑或相反。 目前,系統支持如下的lock_class: struct lock_class lock_class_mtx_sleep = { "sleep mutex", LC_SLEEPLOCK | LC_RECURSABLE }; struct lock_class lock_class_mtx_spin = { "spin mutex", LC_SPINLOCK | LC_RECURSABLE }; struct lock_class lock_class_sx = { "sx", LC_SLEEPLOCK | LC_SLEEPABLE | LC_RECURSABLE | LC_UPGRADABLE }; 前兩個說明了互斥體支持的兩個類(MTX_DEF、MTX_SPIN);后一個說明了共享/獨占鎖的類。 struct lock_object { struct lock_class *lo_class; const char *lo_name; /* Individual lock name. */ const char *lo_type; /* General lock type. */ u_int lo_flags; TAILQ_ENTRY(lock_object) lo_list; /* List of all locks in system. */ struct witness *lo_witness; }; 用于存儲一個具體鎖的數據結構,lo_name是單個具體鎖的名字;如果在鎖初始化時指定了類型描述的字符串,則賦給lo_type,否則lo_type的值和lo_name一樣。 成員lo_flags的值,及其說明如下: #define LO_CLASSFLAGS 0x0000ffff /* Class specific flags. */ #define LO_INITIALIZED 0x00010000 /* Lock has been initialized. */ #define LO_WITNESS 0x00020000 /* Should witness monitor this lock. */ #define LO_QUIET 0x00040000 /* Don't log locking operations. */ #define LO_RECURSABLE 0x00080000 /* Lock may recurse. */ #define LO_SLEEPABLE 0x00100000 /* Lock may be held while sleeping. */ #define LO_UPGRADABLE 0x00200000 /* Lock may be upgraded/downgraded. */ #define LO_DUPOK 0x00400000 /* Don't check for duplicate acquires */ 余下兩個成員,lo_list用于加入全局all_locks鏈表中,lo_witness用于指向witness數據結構。這兩個成員的作用是用于witness模塊,跟蹤鎖的獲得、釋放。這個部分需要參考witness(9)。 本節介紹了FreeBSD鎖機制的最基本的結構和意義,下面介紹各種鎖機制的基本含義。 2互斥體(Mutex) FreeBSD5.0進程同步控制的基本和主要方法是Mutex。在設計時,主要有如下考慮: Ø 申請和釋放一個無爭議的互斥體應該盡可能的簡單。 Ø 互斥體的數據結構應該有足夠的信息和存儲空間來支持優先級的繼承。 Ø 一個進程應該可以遞歸地使用某一個互斥體(比如最常用的全局互斥體Giant)。 根據Mutex阻塞時,是否做上下文切換,可以分為兩類:MTX_DEF(做切換)和MTX_SPIN(不做切換)。 MTX_DEF:當內核線程不能獲得鎖時,該線程會放棄CPU資源;采用這樣類型的鎖,不必考慮產生死鎖的情況。 MTX_SPIN:當內核線程不能獲得鎖時,該線程不會放棄CPU資源,它會自旋,等待直到請求的鎖被其它CPU釋放。這樣做,可能會產生死鎖的情況,因此,我們需要在使用這種類型的鎖屏蔽本地CPU的所有中斷。 關于互斥體的其他類型可以和以上兩種類型并存,它們說明了互斥體的其它屬性。 MTX_RECURSE:具有該標志的鎖,可以被遞歸使用。 MTX_QUIET:對該鎖不做任何日志操作。 MTX_NOWITNESS:通知witness,不必跟蹤該鎖。 MTX_DUPOK:當該鎖被復制時,witness不必寫日志消息。 2.1 互斥體數據結構 struct mtx { struct lock_object mtx_object; /* Common lock properties. */ volatile uintptr_t mtx_lock; /* Owner and flags. */ volatile u_int mtx_recurse; /* Number of recursive holds. */ TAILQ_HEAD(, thread) mtx_blocked; /* Threads blocked on us. */ LIST_ENTRY(mtx) mtx_contested; /* Next contested mtx. */ /* Fields used only if MUTEX_PROFILING is configured: */ u_int64_t mtx_acqtime; const char *mtx_filename; int mtx_lineno; }; 成員mtx_object的意義和前面介紹lock_object結構所說的一樣,其lo_class只能是lock_class_mtx_sleep或lock_class_mtx_spin。 成員mtx_lock是對互斥體鎖狀態的額外說明,其值如下: #define MTX_RECURSED 0x00000001 /* lock recursed (for MTX_DEF only) */ #define MTX_CONTESTED 0x00000002 /* lock contested (for MTX_DEF only) */ #define MTX_UNOWNED 0x00000004 /* Cookie for free mutex */ 成員mtx_recurse是統計遞歸調用該鎖次數的計數器。 成員mtx_blocked是阻塞于該鎖的線程鏈表的頭指針。 成員mtx_contested作為一個實體,鏈接在thread結構中td_contested指向的該線程的競爭互斥體鏈表中。 余下的三個成員用于Mutex調試。 2.2 Mutex接口函數 函數void mtx_init(struct mtx *mutex, const char *name, const char *type, int opts): 在互斥體鎖能夠被使用之前,該鎖mutex必須通過mtx_init初始化;入參name和type都是說明性的字符串,用于調試和witness;opts指明了該鎖的類型和特性。 函數void mtx_lock_flags(struct mtx *mutex, int flags); 該函數是為目前正在運行的內核線程A申請一個相互排斥的MTX_DEF類型鎖,如果正在運行的另一個內核線程B持有該鎖,則線程A會放棄CPU資源,直到該鎖能被再次使用,通常在等待該鎖,線程A會sleep。函數mtx_lock()是其一個特例。 函數void mtx_lock_spin_flags(struct mtx *mutex, int flags); 該函數是為目前正在運行的內核線程A申請一個相互排斥的MTX_SPIN類型鎖,如果正在運行的另一個內核線程B持有該鎖,則線程A會自旋直到該鎖能夠被再次使用,在線程A自旋及獲得該鎖的過程中,對于線程A而言,所有的中斷應該屏蔽。函數mtx_lock_spin()是其一個特例。 函數int mtx_trylock_flags(struct mtx *mutex, int flags): 該函數試圖獲得一個互斥體mutex,如果不能,則立即返回0,如果可以,則獲得該鎖,并返回非零值。函數mtx_trylock()是其一個特例。 函數void mtx_unlock_flags(struct mtx *mutex, int flags): 該函數釋放其持有的相互排斥的MTX_DEF類型鎖,如果有一個高優先級的線程正在等待該鎖,則釋放該鎖的線程釋放CPU資源以便高優先級的線程能夠馬上獲得該鎖,并執行。只有在釋放該鎖的線程處于臨界區時,是例外。函數mtx_unlock()是其一個特例。 函數void mtx_unlock_spin_flags(struct mtx *mutex, int flags): 該函數釋放其持有的相互排斥的MTX_ SPIN類型鎖,恢復在獲得該鎖前的中斷狀態。函數mtx_unlock_spin()是其一個特例。 函數void mtx_destroy(struct mtx *mutex): 該函數用于銷毀一個互斥體鎖,并釋放或重寫該鎖的所有數據。一個沒有初始化(mtx_init)的鎖是不應該被銷毀的。如果該鎖只有一個持有統計數時,是允許被銷毀的。當該鎖處于遞歸調用中,或是有另外的進程阻塞于該鎖時,該鎖是不允許被銷毀的。 宏int mtx_initialized(struct mtx *mutex):如果該鎖已經初始化了,返回非零,否則返回零。宏int mtx_owned(struct mtx *mutex):如果當前線程持有該鎖,返回非零,否則返回零。宏int mtx_recursed(struct mtx *mutex):如果該鎖正在被遞歸使用,則返回非零,否則返回零。 前面,我們提到的mtx_xxx_flag函數系列,其flags入參說明相關鎖作是否需要log日志(MTX_QUIET);相應的mtx_xxx函數系列,是不須log日志。 另外,函數void mtx_assert(struct mtx *mutex, int what):是用于診斷的,于互斥體鎖機制的原理關系并不緊密,因此,不作過多論述。 宏MTX_SYSINIT是對互斥體鎖初始化的一個封裝。 2.3 系統對互斥體子系統以及互斥體池的初始化 我們以i386體系為例,在內核加載的很早的時候(在與平臺相關的初始化階段,產生局部描述符表(LDT)之前),互斥體鎖子系統就已經初始化了(在init386()函數中調用mutex_init()函數實現)。 /* Setup thread0 so that mutexes work. */ LIST_INIT(&thread0.td_contested); /* Initialize mutexes. */ mtx_init(&Giant, "Giant", NULL, MTX_DEF | MTX_RECURSE); mtx_init(&sched_lock, "sched lock", NULL, MTX_SPIN | MTX_RECURSE); mtx_init(&proc0.p_mtx, "process lock", NULL, MTX_DEF | MTX_DUPOK); mtx_lock(&Giant); 初始化第一個線程的競爭互斥體鏈表。 初始化Giant全局鎖,支持遞歸調用的MTX_DEF類型。鎖Giant被廣泛地用在內核的幾乎每個角落,保護內核一般性代碼。 初始化sched_lock全局鎖,支持遞歸調用的MTX_SPIN類型。鎖sched_lock用于保護內核調度隊列。 以上兩個鎖都是可遞歸的,這里,需要補充說明一下可遞歸鎖的意義,一個非遞歸鎖加鎖后,是不能夠被再次加鎖;而當遞歸鎖遇到這樣的情況,首先檢查加鎖的持有者,如果是當前進程,那么對遞歸計數器加1,如果不是當前進程,則按非遞歸鎖那樣處理;當解鎖一個遞歸鎖時,遞歸計數器減1,當計數器為0時,正真解鎖(和非遞歸鎖調用mtx_unlock_xxx()一樣)。 初始化proc0變量的內部鎖p_mtx,該鎖用于保護對proc結構的操作。 最后對Giant加鎖。為什么必需在這里就對Giant加鎖?因為Giant必須先于其他任何互斥體先獲得鎖。即:在持有其他鎖的情況下,是不可能以非遞歸的方式獲得Giant(即第一次調用mtx_lock(&Giant));在持有Giant的情況下,時可以獲得其他互斥體;在持有其他鎖的情況下,是可以以遞歸方式獲得Giant。 在內核加載的稍后階段,會初始化互斥體池(在mi_startup()函數中,通過加載SI_SUB_MTX_POOL子模塊實現)。 初始化一個長度為MTX_POOL_SIZE(128)的互斥鎖數組mtx_pool_ary?;コ怄i池主要用在lock和sx中,參考lockinit(9)和sx(9)。以后涉及到一般性鎖和共享/獨占鎖時,再討論其用法。 2.4 互斥體加鎖/解鎖的過程 互斥體鎖的初始化和銷毀函數非常簡單,這里就不做論述了,有興趣的讀者可以參考源碼的實現。本節主要論述互斥體鎖的加鎖/解鎖的過程。 MTX_DEF: 獲得/釋放互斥體鎖(MTX_DEF)的原子操作: 獲得互斥體鎖是通過_obtain_lock宏(展開后是atomic_cmpset_int()嵌入匯編的內聯函數)實現的,其思想是:用mtx的成員mtx_lock與MTX_UNOWNED比較,如果相同,則說明沒有加鎖,則將當前線程(curthread)的地址傳給mtx_lock,并返回非零;否則,說明該鎖已經被加鎖,不做任何操作,返回零。 釋放互斥體鎖是通過_release_lock宏(展開后是atomic_cmpset_int()嵌入匯編的內聯函數)實現的,其思想是:用mtx的成員mtx_lock與當前線程(curthread)比較,如果相同,則說明該鎖被當前線程加鎖,則將MTX_UNOWNED傳給mtx_lock,并返回非零;否則,說明該鎖不是被當前線程加鎖,不做任何操作,返回零。 如果能正常加鎖/解鎖,則無需執行后面的函數。我們現在考慮不能加鎖/解鎖的情況,根據MTX_DEF的特性,當前線程應該試圖sleep,放棄其CPU資源,這是分別通過函數_mtx_lock_sleep/_mtx_unlock_sleep實現的。 函數_mtx_lock_sleep():(省略了和鎖操作日志、witness相關代碼,以線程A為調用者) if ((m->mtx_lock & MTX_FLAGMASK) == (uintptr_t)td) { m->mtx_recurse++; atomic_set_ptr(&m->mtx_lock, MTX_RECURSED); return; } 如果該鎖支持遞歸,而且該鎖的目前持有者是當前線程,則遞歸計數器加1,直接返回。 while (!_obtain_lock(m, td)) { … } 試圖獲得鎖,如果不行則休息一定時間,再次試圖獲得鎖;不停循環,直至獲得該鎖。我們接下來分析while語句里的代碼。 mtx_lock_spin(&sched_lock); if ((v = m->mtx_lock) == MTX_UNOWNED) { mtx_unlock_spin(&sched_lock); ia32_pause(); continue; } 開始準備調度其他線程運行,這需要在sched_lock鎖的保護下進行。在對sched_lock加鎖的過程中,有可能該鎖m已經被解鎖了,因此,再次通過_obtain_lock()試圖獲得該鎖。其中ia32_pause()是i386體系的一個暫停操作語句。之所以將mtx_lock記錄v值,是因為在隨后的操作中,可能mtx_lock會被改變(由調度產生)。 if (v == MTX_CONTESTED) { td1 = TAILQ_FIRST(&m->mtx_blocked); m->mtx_lock = (uintptr_t)td | MTX_CONTESTED; if (td1->td_priority < td->td_priority) td->td_priority = td1->td_priority; mtx_unlock_spin(&sched_lock); return; } 當v的值為MTX_CONTESTED,說明還有其他線程B、C阻塞在該鎖上,則將mtx_lock設置為線程A所持有,并改變線程A的優先級,完成線程調度,返回。 if ((v & MTX_CONTESTED) == 0 && !atomic_cmpset_ptr(&m->mtx_lock, (void *)v, (void *)(v | MTX_CONTESTED))) { mtx_unlock_spin(&sched_lock); ia32_pause(); continue; } 如果v值還沒有設置MTX_CONTESTED,并且v值和mtx_lock相同,說明該線程正阻塞于該鎖,應該設置為MTX_CONTESTED的狀態,繼續執行if以外的語句。如果v值已經設置了MTX_CONTESTED,則繼續執行if以外語句。如果v值還沒設置MTX_CONTESTED,并且mtx_lock和v值不相同,暫停操作,再次試圖獲得該鎖。 if (TAILQ_EMPTY(&m->mtx_blocked)) { td1 = mtx_owner(m); LIST_INSERT_HEAD(&td1->td_contested, m, mtx_contested); TAILQ_INSERT_TAIL(&m->mtx_blocked, td, td_lockq); } else { TAILQ_FOREACH(td1, &m->mtx_blocked, td_lockq) if (td1->td_priority > td->td_priority) break; if (td1) TAILQ_INSERT_BEFORE(td1, td, td_lockq); else TAILQ_INSERT_TAIL(&m->mtx_blocked, td, td_lockq); } 當代碼執行到這里,說明該鎖被其他線程持有,將該線程A插入該m的阻塞隊列mtx_blocked中。 d->td_blocked = m; td->td_lockname = m->mtx_object.lo_name; TD_SET_LOCK(td); propagate_priority(td); td->td_proc->p_stats->p_ru.ru_nvcsw++; mi_switch(); mtx_unlock_spin(&sched_lock); 最后,完成一些線程相關的設置,然后調用mi_switch()切換線程。 函數_mtx_unlock_sleep(): if (mtx_recursed(m)) { if (--(m->mtx_recurse) == 0) atomic_clear_ptr(&m->mtx_lock, MTX_RECURSED); return; } 如果該鎖m是支持遞歸的,則遞歸計數器減1,如果計數器為零,則,清除mtx_lock的設置,返回。 mtx_lock_spin(&sched_lock); td1 = TAILQ_FIRST(&m->mtx_blocked); TAILQ_REMOVE(&m->mtx_blocked, td1, td_lockq); 選出阻塞于該鎖的第一個線程A,并將其移出mtx_blocked隊列。 if (TAILQ_EMPTY(&m->mtx_blocked)) { LIST_REMOVE(m, mtx_contested); _release_lock_quick(m); } else atomic_store_rel_ptr(&m->mtx_lock, (void *)MTX_CONTESTED); 如果mtx_blocked隊列為空,則將mtx_lock設置為MTX_UNOWNED,否則設置為MTX_CONTESTED。 pri = PRI_MAX; LIST_FOREACH(m1, &td->td_contested, mtx_contested) { int cp = TAILQ_FIRST(&m1->mtx_blocked)->td_priority; if (cp < pri) pri = cp; } if (pri > td->td_base_pri) pri = td->td_base_pri; td->td_priority = pri; 遍歷當前線程B的所有競爭鎖,找出一個優先級最高的(td_priority最小),并將該值賦給pri變量。并調整線程B的活動優先級。 td1->td_blocked = NULL; TD_CLR_LOCK(td1); if (!TD_CAN_RUN(td1)) { mtx_unlock_spin(&sched_lock); return; } setrunqueue(td1); 如果線程A可以運行,則加入可運行線程隊列,等待分時調度。如果不可運行,說明線程A還在等待別的資源。 if (td->td_critnest == 1 && td1->td_priority < pri) { td->td_proc->p_stats->p_ru.ru_nivcsw++; mi_switch(); } mtx_unlock_spin(&sched_lock); 如果線程B進入臨界區,并且線程A的優先級高于線程B,則通過mi_switch()做上下文切換。 MTX_SPIN: 對MTX_SPIN類型的鎖加鎖,展開mtx_lock_spin()得到: critical_enter(); if (!_obtain_lock((mp), (tid))) { if ((mp)->mtx_lock == (uintptr_t)(tid)) (mp)->mtx_recurse++; else _mtx_lock_spin((mp), (opts), (file), (line)); } 假設該函數的調用者是線程A,通過critical_enter()函數,對當前線程A的td_critnest成員加1,說明線程A進入臨界區。如果能獲得該鎖,則成功返回。如果不能:如果該鎖的持有者是線程A,則將遞歸計數器加1,如果不是,則調用_mtx_lock_spin函數處理。下面討論該函數。 該函數是一個for(;循環主體,我們討論for語句里的內容,即自旋鎖的自旋主體。 if (_obtain_lock(m, curthread)) break; critical_exit(); 試圖過得該鎖,如果可以,停止自旋。如果不能,繼續自旋。通過critical_exit()函數退出臨界區,這樣允許我們在自旋的時候,如果有中斷產生,中斷能夠有機會處理。 while (m->mtx_lock != MTX_UNOWNED) { if (i++ < 10000000) { ia32_pause(); continue; } if (i < 60000000) DELAY(1); else panic("spin lock %s held by %p for > 5 seconds", m->mtx_object.lo_name, (void *)m->mtx_lock); ia32_pause(); } critical_enter(); 自旋,如果,自旋時間大于5秒,則認為出錯。如果mtx_lock的值為MTX_UNOWNED,則說明該鎖已被釋放。因此,可以進入臨界區,再次試圖獲得該鎖。 對MTX_SPIN類型的鎖加鎖,展開mtx_unlock_spin()得到: if (mtx_recursed((mp))) (mp)->mtx_recurse--; else _release_lock_quick((mp)); critical_exit(); 如果該鎖遞歸值不為0,則遞歸計數器減1。如果為零,則將該鎖的mtx_lock設置為MTX_UNOWNED。最后退出臨界區。 3 條件變量(Condition variables) 條件變量構建于互斥體之上,和互斥體聯合使用,等待條件發生。我們可以理解為基于鎖的msleep機制(參考:msleep(9))。 3.1 條件變量數據結構 struct cv { struct cv_waitq cv_waitq; struct mtx *cv_mtx; const char *cv_description; }; 成員cv_waitq是阻塞于該條件變量的線程鏈表表頭;成員cv_mtx應該指向通過函數系列cv_*wait*()傳入的參數(mtx類型指針),目前只有在定義了INVARIANTS的時候有意義;成員cv_description是該條件變量的說明性文字。 3.2 條件變量的接口函數 函數void cv_init(struct cv *cvp, const char *desc): 通過該函數初始化條件變量cvp,其說明性文字是通過desc指針獲得。 TAILQ_INIT(&cvp->cv_waitq); cvp->cv_mtx = NULL; cvp->cv_description = desc; 這段初始化的代碼十分簡單。 函數void cv_destroy(struct cv *cvp): 通過該函數銷毀一個條件變量cvp。 KASSERT(cv_waitq_empty(cvp), ("%s: cv_waitq non-empty", __func__)); 通過宏cv_waitq_empty判斷阻塞于該條件變量的線程隊列cv_waitq是否為空。如果不為空則出錯。 當線程等待條件變量發生時,阻塞等待;當條件變量滿足后,通過信號通知。用于阻塞等待的函數系列:cv_wait()、cv_wait_sig()、cv_timedwait()和cv_timedwait_sig()。 函數void cv_wait(struct cv *cvp, struct mtx *mp): 該函數用于等待一個條件變量。當等待條件變量時,將當前線程放置在該條件變量的等待隊列(cv_waitq)中,然后掛起該線程。 mtx_lock_spin(&sched_lock); CV_WAIT_VALIDATE(cvp, mp); DROP_GIANT(); mtx_unlock(mp); cv_waitq_add(cvp, td); cv_switch(td); mtx_unlock_spin(&sched_lock); PICKUP_GIANT(); mtx_lock(mp); 這段代碼簡單的說,就是在調度自旋鎖的保護下,將當前線程掛起,并做上下文切換。宏CV_WAIT_VALIDATE的意義:如果該條件變量cvp目前沒有阻塞線程,則將成員cv_mtx指向互斥體mp;如果有阻塞線程,而且其cv_mtx指向的互斥體和mp不一樣,則異常出錯。 通過cv_waitq_add()函數,將td線程加入cvp的阻塞隊列中,并完成td的相關設置。通過cv_switch()函數完成td的相關工作,并調用mi_switch()做上下文切換。之所以要對mp解鎖,是給阻塞于該mp的其他線程運行的機會。在等待到條件變量,并且輪到自己的時間片,則善后處理,并對mp再次加鎖。 宏DROP_GIANT和PICKUP_GIANT是有關Giant互斥體操作的一對宏,必須聯合使用。這對宏的作用是:在該對宏的代碼中,存在主動上下文切換,因此,需要先把Giant解鎖,在處理完中間代碼后,再將Giant恢復到DROP_GIANT宏之前的狀態。其宏展開如下: do { int _giantcnt; for (_giantcnt = 0; mtx_owned(&Giant); _giantcnt++) mtx_unlock(&Giant) 我們中間代碼,關于這對宏,我省略了witness和斷言的相關代碼,這樣更清晰些。 while (_giantcnt--) mtx_lock(&Giant); } while (0) 函數int cv_wait_sig(struct cv *cvp, struct mtx *mp): 該函數同樣用于等待條件變量,當等待條件變量時,允許信號的中斷。給出其主要代碼,和cv_wait()函數相同的部分就不討論了。 mtx_lock_spin(&sched_lock); CV_WAIT_VALIDATE(cvp, mp); DROP_GIANT(); mtx_unlock(mp); cv_waitq_add(cvp, td); sig = cv_switch_catch(td); mtx_unlock_spin(&sched_lock); PROC_LOCK(p); if (sig == 0) sig = cursig(td); /* XXXKSE */ if (sig != 0) { if (SIGISMEMBER(p->p_sigacts->ps_sigintr, sig)) rval = EINTR; else rval = ERESTART; } PROC_UNLOCK(p); if (p->p_flag & P_WEXIT) rval = EINTR; PICKUP_GIANT(); mtx_lock(mp); 在這里,是通過cv_switch_catch()調用,決定是否需要上下文切換,在該函數里,信號中斷是允許的。在宏PROC_LOCK(進程結構互斥體保護機制)和PROC_UNLOCK的保護下,會再給一次機會給當前線程處理信號中斷。信號值會作為cv_wait_sig()函數的返回值,由調用者決定如何處理。 另外兩個函數:函數int cv_timedwait(struct cv *cvp, struct mtx *mp, int timo)和函數int cv_timedwait_sig(struct cv *cvp, struct mtx *mp, int timo)和前面兩個函數功能分別對應,只不過加入了等待的最大時間(timo/hz秒)的限制。 接下來,我們討論當條件變量滿足時,信號通知的相關內容,涉及函數cv_signal()和cv_broadcast()。 函數void cv_signal(struct cv *cvp): mtx_lock_spin(&sched_lock); if (!TAILQ_EMPTY(&cvp->cv_waitq)) { CV_SIGNAL_VALIDATE(cvp); cv_wakeup(cvp); } mtx_unlock_spin(&sched_lock); 如果阻塞于該條件變量的線程隊列為空,則不用做任何事。如果不為空,通過cv_wakeup()函數喚醒線程隊列的第一個線程。由于該函數的處理主體是cv_wakeup()函數,下面討論它的實現。 td = TAILQ_FIRST(&cvp->cv_waitq); cv_waitq_remove(td); TD_CLR_SLEEPING(td); setrunnable(td); 得到阻塞于該條件變量的第一個線程,將其移出阻塞隊列,并修改其狀態值,通過調用函數setrunnable(),將該線程加入線程的運行隊列中,等待調度。 函數 void cv_broadcast(struct cv *cvp): mtx_lock_spin(&sched_lock); CV_SIGNAL_VALIDATE(cvp); while (!TAILQ_EMPTY(&cvp->cv_waitq)) cv_wakeup(cvp); mtx_unlock_spin(&sched_lock); 該函數喚醒阻塞于該條件變量的所有線程。 在前面,我們討論當線程阻塞于某個條件變量,涉及到兩個上下文切換函數的調用:cv_switch()和cv_switch_catch()。這兩個函數主要涉及內核調度和線程狀態轉換等知識點,和鎖關系不大,這里就討論了。 4共享/獨占鎖(Share/exclusive locks) 共享/獨占鎖(sx)是非常有效的讀寫鎖。之所以稱之為sx是因為考慮以后添加額外的功能,不僅僅是讀寫功能,但是目前僅用于讀寫控制。由于sx鎖機制的花費比互斥體高許多,因此必須謹慎使用。 4.1 共享/獨占鎖的數據結構 struct sx { struct lock_object sx_object; /* Common lock properties. */ struct mtx *sx_lock; /* General protection lock. */ int sx_cnt; /* -1: xlock, > 0: slock count. */ struct cv sx_shrd_cv; /* slock waiters. */ int sx_shrd_wcnt; /* Number of slock waiters. */ struct cv sx_excl_cv; /* xlock waiters. */ int sx_excl_wcnt; /* Number of xlock waiters. */ struct thread *sx_xholder; /* Thread presently holding xlock. */ }; 成員sx_object說明該共享/獨占鎖的一般性屬性,正如我們第一節提及的,其lock_class是lock_class_sx。成員sx_lock是互斥體鎖指針,用于保護對該共享/獨占鎖成員的操作。成員sx_cnt是鎖的統計,初始化狀態是0,如果是-1,則說明該鎖是獨占鎖;如果是大于0的值,則說明該鎖是共享鎖,并統計了其引用的次數。成員sx_shrd_cv是用于共享鎖的條件變量控制,相應的成員sx_shrd_wcnt記錄了等待該共享鎖的線程數目。成員sx_excl_cv是用于獨占鎖的條件變量控制,相應的成員sx_excl_wcnt記錄了等待該獨占鎖的線程數目。成員sx_xholder指向了當前支持有該獨占鎖的線程。 4.2 共享/獨占鎖的接口函數 函數void sx_init(struct sx *sx, const char *description): lock = &sx->sx_object; bzero(sx, sizeof(*sx)); lock->lo_class = &lock_class_sx; lock->lo_type = lock->lo_name = description; lock->lo_flags = LO_WITNESS | LO_RECURSABLE | LO_SLEEPABLE | LO_UPGRADABLE; sx->sx_lock = mtx_pool_find(sx); sx->sx_cnt = 0; cv_init(&sx->sx_shrd_cv, description); sx->sx_shrd_wcnt = 0; cv_init(&sx->sx_excl_cv, description); sx->sx_excl_wcnt = 0; sx->sx_xholder = NULL; 共享/獨占鎖是通過該函數初始化的,這段代碼非常清晰,唯一需要說明的是,該結構的內部互斥體sx_lock是從互斥體池中獲得,參考第二節。 函數void sx_destroy(struct sx *sx): sx->sx_lock = NULL; cv_destroy(&sx->sx_shrd_cv); cv_destroy(&sx->sx_excl_cv); 通過該函數銷毀一個共享/獨占鎖。因為sx_lock是從互斥體池中獲得,所以只需斷掉鏈接,就可以了。 共享鎖的管理主要由三個函數組成:sx_slock()、sx_try_slock()和sx_sunlock()。 函數void sx_slock(struct sx *sx): mtx_lock(sx->sx_lock); while (sx->sx_cnt < 0) { sx->sx_shrd_wcnt++; cv_wait(&sx->sx_shrd_cv, sx->sx_lock); sx->sx_shrd_wcnt--; } sx->sx_cnt++; mtx_unlock(sx->sx_lock); 該函數申請一個共享鎖sx,在互斥體sx_lock的保護下,如果sx_cnt小于0,說明該sx正被獨占,因此共享申請等待計數器sx_shrd_wcnt加1,調用cv_wait函數,等待共享鎖的條件變量滿足,當條件滿足后,共享申請等待計數器sx_shrd_wcnt減1。獲得共享鎖后,共享鎖持有者計數器sx_cnt加1。 函數int sx_try_slock(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt >= 0) { sx->sx_cnt++; mtx_unlock(sx->sx_lock); return (1); } else { return (0); } 該函數試圖獲得共享鎖,如果可以則將sx_cnt計數器加1,返回非零值,表示成功;如果不能,則說明該鎖正被獨占,直接返回0,表示失敗。 函數void sx_sunlock(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt--; if (sx->sx_excl_wcnt > 0) { if (sx->sx_cnt == 0) cv_signal(&sx->sx_excl_cv); } else if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); mtx_unlock(sx->sx_lock); 該函數釋放共享鎖,首先將共享持有者計數器sx_cnt減1;如果sx_excl_wcnt大于0,說明現在有申請獨占鎖的線程阻塞,如果sx_cnt為0,說明目前該鎖完全自由,即:沒有被共享占有,則向獨占等待條件變量sx_excl_cv發一個信號,通知它可以試圖申請獨占鎖。在沒有獨占鎖申請的情況下,如果有共享鎖申請等待,即sx_shrd_wcnt大于0,則向共享等待條件變量sx_shrd_cv發一個廣播信號,喚醒所有等待在該條件變量的線程。 獨占鎖的管理主要由三個函數組成:sx_xlock()、sx_try_xlock()和sx_xunlock()。 函數void sx_xlock(struct sx *sx): mtx_lock(sx->sx_lock); while (sx->sx_cnt != 0) { sx->sx_excl_wcnt++; cv_wait(&sx->sx_excl_cv, sx->sx_lock); sx->sx_excl_wcnt--; } sx->sx_cnt--; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); 該函數申請一個獨占鎖,在互斥體sx_lock的保護下,如果sx_cnt不等于0,則說明該鎖正被獨占或者共享占有,因此,將獨占等待計數器sx_excl_wcnt加1,調用cv_wait()函數等待條件變量sx_excl_cv滿足條件。當條件滿足后,獨占等待計數器sx_excl_wcnt減1,sx_cnt計數器減1,說明現在被獨占,將sx_xholder成員指向當前申請獨占鎖的線程地址。 函數int sx_try_xlock(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt == 0) { sx->sx_cnt--; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); return (1); } else { mtx_unlock(sx->sx_lock); return (0); } 該函數試圖獲得獨占鎖,如果可以則將sx_cnt計數器為0,則說明該鎖是自由的,可以申請獨占鎖,返回非零值,表示成功;如果不能,則說明該鎖正被獨占或是共享,直接返回0,表示失敗。 函數void sx_xunlock(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt++; sx->sx_xholder = NULL; if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); else if (sx->sx_excl_wcnt > 0) cv_signal(&sx->sx_excl_cv); mtx_unlock(sx->sx_lock); 該函數釋放獨占鎖,將sx_cnt加1,恢復到自由狀態(sx_cnt為0),斷開sx_xholder的指針鏈接。如果有共享鎖申請等待隊列(sx_shrd_wcnt大于0),則發一個廣播信號,喚醒所有等待在條件變量sx_shrd_cv上的線程;如果沒有共享鎖申請,則檢查是否有其他獨占鎖申請(sx_excl_wcnt大于0),如果有,則發一個信號,喚醒這個等待隊列的第一個線程。 另外,共享/獨占鎖支持相互的轉換,即共享鎖可以轉換為獨占鎖,獨占鎖也可以轉換為共享鎖。這是通過函數sx_try_upgrade()和sx_downgrade()實現的。 函數int sx_try_upgrade(struct sx *sx): mtx_lock(sx->sx_lock); if (sx->sx_cnt == 1) { sx->sx_cnt = -1; sx->sx_xholder = curthread; mtx_unlock(sx->sx_lock); return (1); } else { mtx_unlock(sx->sx_lock); return (0); } 該函數實現試圖將共享鎖轉換成獨占鎖。在sx_lock互斥體的保護下,如果當前有且僅有一個共享鎖,則可以轉換,轉換成功后,返回1,表示成功;否則返回0,表示失敗。 函數void sx_downgrade(struct sx *sx): mtx_lock(sx->sx_lock); sx->sx_cnt = 1; sx->sx_xholder = NULL; if (sx->sx_shrd_wcnt > 0) cv_broadcast(&sx->sx_shrd_cv); mtx_unlock(sx->sx_lock); 該函數實現將獨占鎖轉換成共享鎖。因為在獨占的情況下,是不可能有共享的情況存在。因此,直接將sx_cnt變為1,說明現在有一個共享鎖,斷開獨占鎖的線程鏈接。如果還有其他共享鎖申請,由于該鎖原來是獨占而被阻塞,因此,需要發一個廣播信號,通知阻塞線程隊列,現在可以共享。 5 信號燈(semaphore) 信號燈統計機制為訪問一組資源(資源池)提供了一種同步機制。不同于互斥體,信號燈并沒有持有者的概念,因此,信號燈常用于一個線程需要一個資源,而另一個線程需要釋放該資源的情況。記錄資源(釋放資源provider,該資源的信號燈加1)總是成功,而等待資源(申請資源consumer,該資源信號燈減1)只有在信號燈大于0的情況才能成功。信號燈的管理比互斥體復雜許多,因此花費更大,所以其效率不高。 5.1 信號燈數據結構 struct sema { struct mtx sema_mtx; /* General protection lock. */ struct cv sema_cv; /* Waiters. */ int sema_waiters; /* Number of waiters. */ int sema_value; /* Semaphore value. */ }; 成員sema_mtx用于保護對sema結構的操作。成員sema_cv是等待信號燈的條件變量,成員sema_waiters統計等待信號燈的線程數目。成員sema_value記錄信號燈的值。 5.2 信號燈的函數 函數void sema_init(struct sema *sema, int value, const char *description): bzero(sema, sizeof(*sema)); mtx_init(&sema->sema_mtx, description, "sema backing lock", MTX_DEF | MTX_NOWITNESS | MTX_QUIET); cv_init(&sema->sema_cv, description); sema->sema_value = value; 該函數用于初始化一個信號燈。初始化該sema的內部互斥體sema_mtx和條件變量sema_cv。而信號燈計數器sema_value的值則是通過入參value獲得。 函數void sema_destroy(struct sema *sema): mtx_destroy(&sema->sema_mtx); cv_destroy(&sema->sema_cv); 該函數用于銷毀一個信號燈。該函數包含銷毀互斥體sema_mtx和條件變量sema_cv。 函數void sema_post(struct sema *sema): mtx_lock(&sema->sema_mtx); sema->sema_value++; if (sema->sema_waiters && sema->sema_value > 0) cv_signal(&sema->sema_cv); mtx_unlock(&sema->sema_mtx); 該函數用于增加信號燈計數器sema_value的值,如果目前有其他線程阻塞于該信號燈,并且該信號燈計數器大于0。則發一個信號,喚醒等待該信號燈隊列的第一個線程。 函數void sema_wait(struct sema *sema): mtx_lock(&sema->sema_mtx); while (sema->sema_value == 0) { sema->sema_waiters++; cv_wait(&sema->sema_cv, &sema->sema_mtx); sema->sema_waiters--; } sema->sema_value--; mtx_unlock(&sema->sema_mtx); 該函數申請得到一個信號燈。如果該信號燈的計數器sema_value為0,則該信號燈代表的資源不可再申請,則將該線程加入sema_cv條件變量的等待隊列中,同時將該信號燈的等待計數器加1。當得到該信號燈后,將等待計數器sema_waiters減1,同時將該信號燈計數器sema_value減1,說明該信號燈代表的資源已經被申請了一次。 函數 int sema_timedwait(struct sema *sema, int timo): mtx_lock(&sema->sema_mtx); for (timed_out = 0; sema->sema_value == 0 && timed_out == 0 { sema->sema_waiters++; timed_out = cv_timedwait(&sema->sema_cv, &sema->sema_mtx, timo); sema->sema_waiters--; } if (sema->sema_value > 0) { sema->sema_value--; ret = 1; } else { ret = 0; } mtx_unlock(&sema->sema_mtx); 該函數和上一個函數功能一樣,只不過它有等待時間的限定。如果成功獲得該信號燈,則返回1。如果不成功,則返回0。 函數int sema_trywait(struct sema *sema): mtx_lock(&sema->sema_mtx); if (sema->sema_value > 0) { sema->sema_value--; ret = 1; } else { ret = 0; } mtx_unlock(&sema->sema_mtx); 該函數試圖獲得一個信號燈,如果不能,立即返回0,能則返回1。 函數int sema_value(struct sema *sema): mtx_lock(&sema->sema_mtx); ret = sema->sema_value; mtx_unlock(&sema->sema_mtx); 該函數用于獲得信號燈的計數器值。因為信號燈的值隨時有可能變化,因此,得到sema_value的值必須在sema_mtx的保護下。 6 lockmgr鎖機制(lock) lockmgr鎖提供了多重共享鎖機制,支持共享鎖向獨占鎖的轉變功能。其等待lockmgr鎖可用的過程,采用msleep機制(參考 msleep(9)) - 基于事件的進程阻塞機制。因此,lockmgr鎖實現所謂的加鎖/解鎖,是通過lockmgr鎖的上行(upgrade)/下行(downgrade)操作,這是前面提及的鎖機制的策略變化。和前面鎖機制不同的另一個方面是,其調用者是進程。 6.1 lockmgr鎖的數據結構 我們略過DEBUG相關的調試成員。 struct lock { struct mtx *lk_interlock; /* lock on remaining fields */ u_int lk_flags; /* see below */ int lk_sharecount; /* # of aclearcase/" target="_blank" >ccepted shared locks */ int lk_waitcount; /* # of processes sleeping for lock */ short lk_exclusivecount; /* # of recursive exclusive locks */ short lk_prio; /* priority at which to sleep */ const char *lk_wmesg; /* resource sleeping (for tsleep) */ int lk_timo; /* maximum sleep time (for tsleep) */ pid_t lk_lockholder; /* pid of exclusive lock holder */ struct lock *lk_newlock; /* lock taking over this lock */ } 成員lk_interlock用戶保護lock結構內部成員的操作,我們稍后結合lockmgr鎖的初始化再討論。 成員lk_flags在初始化的時候代表了該鎖的類型,在通過lockmgr管理該鎖時,也可以作為入參表示其期望的操作。 #define LK_TYPE_MASK 0x0000000f /* type of lock sought */ #define LK_SHARED 0x00000001 /* shared lock */ #define LK_EXCLUSIVE 0x00000002 /* exclusive lock */ #define LK_UPGRADE 0x00000003 /* shared-to-exclusive upgrade */ #define LK_EXCLUPGRADE 0x00000004 /* first shared-to-exclusive upgrade */ #define LK_DOWNGRADE 0x00000005 /* exclusive-to-shared downgrade */ #define LK_RELEASE 0x00000006 /* release any type of lock */ #define LK_DRAIN 0x00000007 /* wait for all lock activity to end */ #define LK_EXCLOTHER 0x00000008 /* other process holds lock */ LK_SHARED:一個進程獲得一個共享lockmgr鎖,如果該進程獨占持有該lockmgr鎖,則需要下行處理,使其變為共享鎖。 LK_EXCLUSIVE:當lockmgr鎖被獨占時,是不允許共享申請的。如果現在有共享存在,則需要將其上行處理為獨占。通常僅有一個獨占狀態存在,但是,一個持有獨占lockmgr鎖的進程,在其設置了LK_CANRECURSE標志的情況下,可以申請額外的獨占鎖。 LK_UPGRADE:一個持有共享狀態的lockmgr鎖可以通過上行處理,將共享態變為獨占態。我們需要在意的是,在上行處理的過程中,其它進程也同樣有機會得到獨占機會。 LK_EXCLUPGRADE:和LK_UPGRADE意義一樣,只不過在上行過程中,其它進程不可能獲得獨占機會。 LK_DOWNGRADE:一個獨占持有lockmgr鎖的進程,可以通過下行處理,將其轉換為共享狀態。如果該進程持有遞歸獨占鎖,那么所有的獨占狀態都需要下行處理。 LK_RELEASE:釋放一個lockmgr鎖實體。 LK_DRAIN:這用于等待該鎖所有活動狀態結束,并標識為退役,一般用在釋放鎖之前。 LK_EXCLOTHER:一般用在函數lockstatus()返回里,說明該鎖正被其他進程獨占。 關于這些標識,我們在后面結合函數更詳細的說明。 成員lk_sharecount統計該鎖已被接受的共享狀態的數目。 成員lk_waitcount是阻塞于該鎖的進程數目。 成員lk_exclusivecount說明了遞歸獨占該鎖的數目。 成員lk_prio,如果某個進程不能獲得該鎖,而不得不sleep,那么它應該sleep在什么優先級的隊列上,是由該成員指出的。 成員lk_wmesg是sleep的說明性文字。 成員lk_timo說明了該進程最大的sleep時間。 成員lk_lockholder,如果該鎖被某個進程獨占,該成員說明了該進程ID。 成員lk_newlock,如果該鎖被其他鎖托管,則該成員指向托管鎖的地址。 6.2 lockmgr鎖的函數接口 函數void lockinit(struct lock *lkp, int prio, const char *wmesg, int timo, int flags): 初始化一個lockmgr鎖。 if (lock_mtx_valid == 0) { mtx_init(&lock_mtx, "lockmgr", NULL, MTX_DEF); lock_mtx_valid = 1; } 這段代碼的目的是初始化全局互斥體lock_mtx。通常,該段代碼永遠不會調用。因為,在系統啟動時,通過宏SYSINIT(lmgrinit, SI_SUB_LOCK, SI_ORDER_FIRST, lockmgr_init, NULL)注冊lock子系統時,在lockmgr_init()函數中已經初始化了lock_mtx,并且將lock_mtx_valid設置為1了。 if (mtx_pool_valid) { mtx_lock(&lock_mtx); lkp->lk_interlock = mtx_pool_alloc(); mtx_unlock(&lock_mtx); } else { lkp->lk_interlock = &lock_mtx; } 通過這段代碼,我們可以明白lock_mtx的作用,如果系統支持互斥體池,則lock_mtx用于保護從互斥體池中獲得互斥體A的過程,由互斥體A來保護該lockmgr鎖的操作;如果系統不支持互斥體,由lock_mtx充當lockmgr鎖的內部成員操作的保護機制。 lkp->lk_flags = (flags & LK_EXTFLG_MASK); lkp->lk_sharecount = 0; lkp->lk_waitcount = 0; lkp->lk_exclusivecount = 0; lkp->lk_prio = prio; lkp->lk_wmesg = wmesg; lkp->lk_timo = timo; lkp->lk_lockholder = LK_NOPROC; lkp->lk_newlock = NULL; 初始化lock結構的其他成員,lk_flags 、lk_prio、lk_wmesg和lk_timo由入參指定。 函數void lockdestroy(struct lock *lkp): 該函數用于銷毀一個lockmgr鎖,但是目前沒有任何操作。 函數int lockcount(struct lock *lkp): mtx_lock(lkp->lk_interlock); count = lkp->lk_exclusivecount + lkp->lk_sharecount; mtx_unlock(lkp->lk_interlock); 該函數用于統計目前共享和獨占持有該鎖的數目。 函數int lockstatus(struct lock *lkp, struct thread *td): mtx_lock(lkp->lk_interlock); if (lkp->lk_exclusivecount != 0) { if (td == NULL || lkp->lk_lockholder == td->td_proc->p_pid) lock_type = LK_EXCLUSIVE; else lock_type = LK_EXCLOTHER; } else if (lkp->lk_sharecount != 0) lock_type = LK_SHARED; mtx_unlock(lkp->lk_interlock); 該函數用于得到目前鎖的狀態。如果存在遞歸調用獨占狀態,則檢查入參td是否為空,如果為空,或是不為空,并且該線程的進程ID和lk_lockholder一樣,則說明該鎖被當前進程占有;否則該鎖被其它進程獨占。如果不存在獨占,如果共享統計不為0,則說明該鎖被共享占有。如果什么都不是,則說明該鎖是自由的,返回0。 函數int lockmgr(struct lock *lkp, u_int flags, struct mtx *interlkp, struct thread *td): 該函數是lockmgr鎖里最關鍵、最復雜的函數,是lock機制管理的核心。 error = 0; if (td == NULL) pid = LK_KERNPROC; else pid = td->td_proc->p_pid; mtx_lock(lkp->lk_interlock); if (flags & LK_INTERLOCK) { mtx_unlock(interlkp); } 這段代碼根據td的入參,設定pid的值,根據輸入的flags是否含有LK_INTERLOCK值,決定是否需要輸入的互斥體interlkp的保護。如果指明了LK_INTERLOCK,則說明就用lk_interlock保護就夠了。 extflags = (flags | lkp->lk_flags) & LK_EXTFLG_MASK; 該語句用于分離lock的額外標志。這里有必要介紹一下lock的額外標志: #define LK_NOWAIT 0x00000010 /* do not sleep to await lock */ #define LK_SLEEPFAIL 0x00000020 /* sleep, then return failure */ #define LK_CANRECURSE 0x00000040 /* allow recursive exclusive lock */ #define LK_REENABLE 0x00000080 /* lock is be reenabled after drain */ #define LK_NOPAUSE 0x01000000 /* no spinloop */ #define LK_TIMELOCK 0x02000000 /* use lk_timo, else no timeout */ LK_NOWAIT、LK_SLEEPFAIL和LK_CANRECURSE通常實在lockinit()時設置的。當一個lockmgr鎖已經設置了LK_DRAIN 后,說明該鎖準備釋放,但是可以通過LK_REENABLE,重新使用該鎖。LK_NOPAUSE說明該鎖不支持自旋等待。LK_TIMELOCK說明該鎖支持等待時間的設定,具體時間是通過lk_timo獲得。 函數lockmgr()完成預處理后,主要是通過一個很大的switch (flags & LK_TYPE_MASK)語句來處理不同的管理要求。下面我們根據入參flags含有的標識來說明: LK_SHARED: if (lkp->lk_lockholder != pid) { lockflags = LK_HAVE_EXCL; mtx_lock_spin(&sched_lock); if (td != NULL && !(td->td_flags & TDF_DEADLKTREAT)) lockflags |= LK_WANT_EXCL | LK_WANT_UPGRADE; mtx_unlock_spin(&sched_lock); error = acquire(&lkp, extflags, lockflags); if (error) break; sharelock(lkp, 1); break; } sharelock(lkp, 1); LK_DOWNGRADE: sharelock(lkp, lkp->lk_exclusivecount); lkp->lk_exclusivecount = 0; lkp->lk_flags &= ~LK_HAVE_EXCL; lkp->lk_lockholder = LK_NOPROC; if (lkp->lk_waitcount) wakeup((void *)lkp); break; 這段代碼由兩個部分組成,如果是LK_SHARED的標識,則需要執行這正段代碼;如果是LK_DOWNGRADE,則只用執行后半段。 我們考慮LK_SHARED的情況,函數sharelock(struct lock *lkp, int incr)就是將LK_SHARE_NONZERO賦給該lkp的lk_flags成員,并將lk_sharecount加incr。 在什么情況下,lkp->lk_lockholder != pid不成立呢? 只有在該鎖lkp正被當前進程獨占的情況下,lkp->lk_lockholder才等于pid。這時,根據LK_SHARED的意義,我們不僅需要調用增加這次共享請求的計數(sharelock函數),而且需要下行處理,故而需要執行LK_DOWNGRADE的功能。將所有獨占態改變為共享態,并試圖喚醒所有阻塞于該lkp的所有進程。 當上述判斷成立,則有兩種情況:A、該鎖本就是共享態;B、該鎖被其他進程獨占。無論是哪種情況,都涉及到lockmgr鎖的內部函數acquire(),因此,有必要在這里插入對該函數的分析。 函數static int acquire(struct lock **lkpp, int extflags, int wanted): if ((extflags & LK_NOWAIT) && (lkp->lk_flags & wanted)) { return EBUSY; } if (((lkp->lk_flags | extflags) & LK_NOPAUSE) == 0) { error = apause(lkp, wanted); if (error == 0) return 0; } 如果該鎖的額外標識了不必sleep來等待獲得該鎖(LK_NOWAIT),并且該鎖lk_flags標志含有想申請的值(wanted),則直接返回EBUSY,申請失敗。 如果所有的標志都沒有含有LK_NOPAUSE(不支持自旋等待),則調用apause()函數在解鎖(lk_interlock)的情況下等待一段時間直到lk_flags不再含有wanted的標志,成功返回0,失敗返回1。 我們考慮情況A,在這里就會成功返回。再考慮在lockmgr()函數里,會調用sharelock()完成LK_SHARED的設置。 函數acquire()余下的部分是等待該鎖可用(lk_flags不再含有wanted的標志)。所有的等待操作在splhigh的保護下進行(參考 splhigh(9))。下面考慮等待主體部分。 lkp->lk_flags |= LK_WAIT_NONZERO; lkp->lk_waitcount++; error = msleep(lkp, lkp->lk_interlock, lkp->lk_prio, lkp->lk_wmesg, ((extflags & LK_TIMELOCK) ? lkp->lk_timo : 0)); 添加LK_WAIT_NONZERO標志到lk_flags中,等待計數器lk_waitcount加1,調用msleep等待lkp滿足條件。 if (lkp->lk_waitcount == 1) { lkp->lk_flags &= ~LK_WAIT_NONZERO; lkp->lk_waitcount = 0; } else { lkp->lk_waitcount--; } if (error) { splx(s); return error; } 當條件滿足后,如果lk_waitcount為1,清除LK_WAIT_NONZERO標志,lk_waitcount計為0。如果lk_waitcount不為1,則只是lk_waitcount減1(有可能別的進程也在等待)。如果出錯,則返回出錯值。 if (extflags & LK_SLEEPFAIL) { splx(s); return ENOLCK; } 如果設置了LK_SLEEPFAIL,則說明只要sleep了,就算出錯,返回ENOLOCK。 if (lkp->lk_newlock != NULL) { mtx_lock(lkp->lk_newlock->lk_interlock); mtx_unlock(lkp->lk_interlock); if (lkp->lk_waitcount == 0) wakeup((void *)(&lkp->lk_newlock)); *lkpp = lkp = lkp->lk_newlock; } 如果有其他鎖托管了該鎖,則做轉換。 現在,我們已經了解了acquire()函數的功能,考慮情況B,則根據acquire()的返回,如果沒出錯,則說明,該鎖的獨占已經釋放,可以共享占有,同樣調用sharelock()函數實現共享。我們接下來分析lockmgr()函數switch中的其它情況。 根據LK_EXCLUPGRADE和LK_UPGRADE的區別,就是前者在上行處理時,多了獨占的考慮。因此有一段預處理,如下: LK_EXCLUPGRADE: if (lkp->lk_flags & LK_WANT_UPGRADE) { shareunlock(lkp, 1); error = EBUSY; break; } 首先判斷lk_flags是否含有LK_WANT_UPGRADE標識,如果含有說明有其他進程試圖上行處理該鎖,那么調用shareunlock()函數釋放該鎖,說明不能該進程不能LK_EXCLUPGRADE,返回EBUSY。如果該進程可以獨占上行處理,余下的處理和一般上行處理一致。 LK_UPGRADE: if ((lkp->lk_lockholder == pid) || (lkp->lk_sharecount <= 0)) panic("lockmgr: upgrade exclusive lock"); shareunlock(lkp, 1); 如果lk_lockholder等于pid,說明該鎖被獨占,或者是lk_sharecount小于0,說明該鎖沒有被共享占有,則說明出錯。不出錯,說明當前至少有一個共享鎖,為了上行處理,先對該鎖的共享占有解鎖一次。 if ((extflags & LK_NOWAIT) && ((lkp->lk_flags & LK_WANT_UPGRADE) || lkp->lk_sharecount > 1)) { error = EBUSY; break; } 如果額外標志包含LK_NOWAIT(不支持sleep等待該鎖),并且lk_flags標志包含LK_WANT_UPGRADE(說明有其他進程正在上行處理)或者lk_sharecount大于1(說明不止一個共享狀態),那么認為不能上行處理,返回EBUSY。 if ((lkp->lk_flags & LK_WANT_UPGRADE) == 0) { lkp->lk_flags |= LK_WANT_UPGRADE; error = acquire(&lkp, extflags, LK_SHARE_NONZERO); lkp->lk_flags &= ~LK_WANT_UPGRADE; if (error) break; lkp->lk_flags |= LK_HAVE_EXCL; lkp->lk_lockholder = pid; if (lkp->lk_exclusivecount != 0) panic("lockmgr: non-zero exclusive count"); lkp->lk_exclusivecount = 1; break; } 如果lk_falgs不包含LK_WANT_UPGRADE,則目前沒有其他進程上行,本進程可以設置LK_WANT_UPGRADE,說明本進程正在上行處理。然后,調用acquire()函數等待該鎖可用,成功等待后,設置LK_HAVE_EXCL(說明獲得獨占態的鎖),設置lk_lockholder。最后判斷lk_exclusivecount是否為0,不為0,則出現異常錯誤。為0,則完全成功返回。 就LK_EXCLUPGRADE而言,如果能獲得,執行到這里,就可以成功。 if ( (lkp->lk_flags & (LK_SHARE_NONZERO|LK_WAIT_NONZERO)) == LK_WAIT_NONZERO) wakeup((void *)lkp); 程序走到這一步,說明別的進程B在上行處理,通過該語句判斷說明,本進程是最后一個持有共享態的該鎖,由于在前面已經解鎖了,因此可以喚醒進程B繼續上行處理。 就LK_UPGRADE操作而言,如果執行到這一步,說明別的進程正在上行處理。無論別的進程在做什么,LK_UPGRADE余下的操作和LK_EXCLUSIVE操作并無二樣。 LK_EXCLUSIVE: if (lkp->lk_lockholder == pid && pid != LK_KERNPROC) { if ((extflags & (LK_NOWAIT | LK_CANRECURSE)) == 0) panic("lockmgr: locking against myself"); if ((extflags & LK_CANRECURSE) != 0) { lkp->lk_exclusivecount++; break; } } 如果該鎖已經被本進程獨占持有,而且該鎖支持LK_CANRECURSE,只須將lk_exclusivecount計數器加1。 if ((extflags & LK_NOWAIT) && (lkp->lk_flags & (LK_HAVE_EXCL | LK_WANT_EXCL | LK_WANT_UPGRADE | LK_SHARE_NONZERO))) { error = EBUSY; break; } 如果該進程不支持等待該鎖(LK_NOWAIT),并且該鎖已經獲得獨占態、或正在上行處理、或共享態存在,則返回EBUSY,不能獲得。 error = acquire(&lkp, extflags, (LK_HAVE_EXCL | LK_WANT_EXCL)); if (error) break; lkp->lk_flags |= LK_WANT_EXCL; error = acquire(&lkp, extflags, LK_WANT_UPGRADE | LK_SHARE_NONZERO); lkp->lk_flags &= ~LK_WANT_EXCL; if (error) break; lkp->lk_flags |= LK_HAVE_EXCL; lkp->lk_lockholder = pid; if (lkp->lk_exclusivecount != 0) panic("lockmgr: non-zero exclusive count"); lkp->lk_exclusivecount = 1; break; 這段代碼,第一次調用acquire()函數,等待其他進程獨占該鎖使用完成;第二次調用acquire()函數是等待共享使用鎖的進程完成。當這些處理完后,說明獨占獲得該鎖。 LK_RELEASE: if (lkp->lk_exclusivecount != 0) { if (lkp->lk_lockholder != pid && lkp->lk_lockholder != LK_KERNPROC) { panic("lockmgr: pid %d, not %s %d unlocking", pid, "exclusive lock holder", lkp->lk_lockholder); } if (lkp->lk_exclusivecount == 1) { lkp->lk_flags &= ~LK_HAVE_EXCL; lkp->lk_lockholder = LK_NOPROC; lkp->lk_exclusivecount = 0; } else { lkp->lk_exclusivecount--; } } else if (lkp->lk_flags & LK_SHARE_NONZERO) shareunlock(lkp, 1); if (lkp->lk_flags & LK_WAIT_NONZERO) wakeup((void *)lkp); break; 首先考慮該鎖獨占計數器lk_exclusivecount不為0(說明該鎖被獨占使用,不可能還有共享態存在),如果lk_lockholder不等于pid,而且也不等于LK_KERNPROC,這是lockmgr鎖規則不允許的,出錯。如果該計數器lk_exclusivecount為1,無論該鎖是否支持遞歸調用,都是最后一個獨占態,應該將該鎖狀態還原為初始態。如果該計數器lk_exclusivecount不為0,說明該鎖被本進程遞歸調用,而且還有其他獨占態存在,只須將lk_exclusivecount減1就可以了。 考慮該鎖被共享使用(LK_SHARE_NONZERO),調用shareunlock()函數處理就可以了。 如果該鎖的lk_flags 標志含有LK_WAIT_NONZERO,說明有其他進程正阻塞于該鎖,試圖喚醒這些進程。處理完成。 LK_DRAIN: if (lkp->lk_lockholder == pid) panic("lockmgr: draining against myself"); error = acquiredrain(lkp, extflags); if (error) break; lkp->lk_flags |= LK_DRAINING | LK_HAVE_EXCL; lkp->lk_lockholder = pid; lkp->lk_exclusivecount = 1; break; 如果lk_lockholder等于pid,說明該鎖正被本進程獨占持有,而本進程不能等待本進程該鎖獨占活動狀態結束,所以出錯。 調用acquiredrain()函數等待該鎖所有狀態的操作完成。如果成功,則善后處理。 如果lockmgr的操作不是以上需求,則出錯(switch語句的default處理)。 if ((lkp->lk_flags & LK_WAITDRAIN) && (lkp->lk_flags & (LK_HAVE_EXCL | LK_WANT_EXCL | LK_WANT_UPGRADE | LK_SHARE_NONZERO | LK_WAIT_NONZERO)) == 0) { lkp->lk_flags &= ~LK_WAITDRAIN; wakeup((void *)&lkp->lk_flags); } mtx_unlock(lkp->lk_interlock); 最后這段代碼是上述操作成功完成后,最后處理部分。如果lk_flags標志含有LK_WAITDRAIN(說明有其他進程B等待結束鎖的活動狀態),并且所有的活動狀態確實結束了,則喚醒進程B。 至此,lockmgr()函數的討論結束。 函數void lockmgr_printinfo(lkp): if (lkp->lk_sharecount) printf(" lock type %s: SHARED (count %d)", lkp->lk_wmesg, lkp->lk_sharecount); else if (lkp->lk_flags & LK_HAVE_EXCL) printf(" lock type %s: EXCL (count %d) by pid %d", lkp->lk_wmesg, lkp->lk_exclusivecount, lkp->lk_lockholder); if (lkp->lk_waitcount > 0) printf(" with %d pending", lkp->lk_waitcount); 該函數是用于打印lockmgr鎖的當前信息,十分簡單。 7. 關于FreeBSD5.0的鎖機制再討論 A、盡量使用sleep類型(MTX_DEF)的互斥體機制,目前自旋鎖(MTX_SPIN)主要用于SMP調度方面。 B、在持有一個互斥體(不是Giant)的時候,不要使用tsleep(),因為在該函數中,除了Giant以外,不會釋放任何其他互斥體。 C、當持有一個互斥體A(不包括Giant)的時候,在使用msleep()和cv_wait()函數的時候,應該將A作為入參傳入。在這兩個函數里,會一開始釋放該互斥體A,在函數結束的時候,對互斥體A重新加鎖。 D、盡量避免使用遞歸鎖機制。

    原文轉自:http://www.kjueaiud.com

    評論列表(網友評論僅供網友表達個人看法,并不表明本站同意其觀點或證實其描述)
    老湿亚洲永久精品ww47香蕉图片_日韩欧美中文字幕北美法律_国产AV永久无码天堂影院_久久婷婷综合色丁香五月

  • <ruby id="5koa6"></ruby>
    <ruby id="5koa6"><option id="5koa6"><thead id="5koa6"></thead></option></ruby>

    <progress id="5koa6"></progress>

  • <strong id="5koa6"></strong>