ffplay用PacketQueue保存解封裝後的數據,即保存AVPacket。

ffplay首先定義了一個結構體MyAVPacketList

typedef struct MyAVPacketList {
AVPacket pkt;//解封裝後的數據
struct MyAVPacketList *next;//下一個節點
int serial;//序列號
} MyAVPacketList;

可以理解為是隊列的一個節點。可以通過其next欄位訪問下一個節點。

所以這裡我認為命名為MyAVPacketNode更為合理

serial欄位主要用於標記當前節點的序列號,ffplay中多處用到serial的概念,一般用於區分是否連續數據。在後面的代碼分析中我們還會看到它的作用。

接著定義另一個結構體PacketQueue

typedef struct PacketQueue {
MyAVPacketList *first_pkt, *last_pkt;//隊首,隊尾
int nb_packets;//隊列中一共有多少個節點
int size;//隊列所有節點位元組總數,用於計算cache大小
int64_t duration;//隊列所有節點的合計時長
int abort_request;//是否要中止隊列操作,用於安全快速退出播放
int serial;//序列號,和MyAVPacketList的serial作用相同,但改變的時序稍微有點不同
SDL_mutex *mutex;//用於維持PacketQueue的多線程安全(SDL_mutex可以按pthread_mutex_t理解)
SDL_cond *cond;//用於讀、寫線程相互通知(SDL_cond可以按pthread_cond_t理解)
} PacketQueue;

這個結構體內定義了「隊列」自身的屬性。上面的注釋對每個欄位作了簡單的介紹,接下來我們從隊列的操作函數具體分析各個欄位的含義。

PacketQueue操作提供以下方法:

  • packet_queue_init:初始化
  • packet_queue_destroy:銷毀
  • packet_queue_start:啟用
  • packet_queue_abort:中止
  • packet_queue_get:獲取一個節點
  • packet_queue_put:存入一個節點
  • packet_queue_put_nullpacket:存入一個空節點
  • packet_queue_flush:清除隊列內所有的節點

初始化用於初始各個欄位的值,並創建mutex和cond:

static int packet_queue_init(PacketQueue *q)
{
memset(q, 0, sizeof(PacketQueue));
q->mutex = SDL_CreateMutex();
if (!q->mutex) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateMutex(): %s
", SDL_GetError());
return AVERROR(ENOMEM);
}
q->cond = SDL_CreateCond();
if (!q->cond) {
av_log(NULL, AV_LOG_FATAL, "SDL_CreateCond(): %s
", SDL_GetError());
return AVERROR(ENOMEM);
}
q->abort_request = 1;
return 0;
}

相應的,銷毀過程負責清理mutex和cond:

static void packet_queue_destroy(PacketQueue *q)
{
packet_queue_flush(q);//先清除所有的節點
SDL_DestroyMutex(q->mutex);
SDL_DestroyCond(q->cond);
}

啟用隊列:

static void packet_queue_start(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 0;
packet_queue_put_private(q, &flush_pkt);//這裡放入了一個flush_pkt
SDL_UnlockMutex(q->mutex);
}

flush_pkt定義是static AVPacket flush_pkt;,是一個特殊的packet,主要用來作為非連續的兩端數據的「分界」標記。

中止隊列:

static void packet_queue_abort(PacketQueue *q)
{
SDL_LockMutex(q->mutex);
q->abort_request = 1;
SDL_CondSignal(q->cond);//釋放一個條件信號
SDL_UnlockMutex(q->mutex);
}

這裡SDL_CondSignal的作用在於確保當前等待該條件的線程能被激活並繼續執行退出流程。

讀、寫是PacketQueue的主要方法。

先看寫——往隊列中放入一個節點:

static int packet_queue_put(PacketQueue *q, AVPacket *pkt)
{
int ret;

SDL_LockMutex(q->mutex);
ret = packet_queue_put_private(q, pkt);//主要實現在這裡
SDL_UnlockMutex(q->mutex);

if (pkt != &flush_pkt && ret < 0)
av_packet_unref(pkt);//放入失敗,釋放AVPacket

return ret;
}

主要實現在函數packet_queue_put_private,這裡需要注意的是如果放入失敗,需要釋放AVPacket。

static int packet_queue_put_private(PacketQueue *q, AVPacket *pkt)
{
MyAVPacketList *pkt1;

if (q->abort_request)//如果已中止,則放入失敗
return -1;

pkt1 = av_malloc(sizeof(MyAVPacketList));//分配節點內存
if (!pkt1)//內存不足,則放入失敗
return -1;
pkt1->pkt = *pkt;//拷貝AVPacket(淺拷貝,AVPacket.data等內存並沒有拷貝)
pkt1->next = NULL;
if (pkt == &flush_pkt)//如果放入的是flush_pkt,需要增加隊列的序列號,以區分不連續的兩段數據
q->serial++;
pkt1->serial = q->serial;//用隊列序列號標記節點

//隊列操作:如果last_pkt為空,說明隊列是空的,新增節點為隊頭;否則,隊列有數據,則讓原隊尾的next為新增節點。 最後將隊尾指向新增節點
if (!q->last_pkt)
q->first_pkt = pkt1;
else
q->last_pkt->next = pkt1;
q->last_pkt = pkt1;

//隊列屬性操作:增加節點數、cache大小、cache總時長
q->nb_packets++;
q->size += pkt1->pkt.size + sizeof(*pkt1);
q->duration += pkt1->pkt.duration;

/* XXX: should duplicate packet data in DV case */
//發出信號,表明當前隊列中有數據了,通知等待中的讀線程可以取數據了
SDL_CondSignal(q->cond);
return 0;
}

對packet_queue_put_private筆者增加了詳細注釋,應該比較容易理解了。

主要完成3件事:

  1. 計算serial。serial標記了這個節點內的數據是何時的。一般情況下新增節點與上一個節點的serial是一樣的,但當隊列中加入一個flush_pkt後,後續節點的serial會比之前大1.
  2. 隊列操作。經典的隊列實現方式,不展開了。
  3. 隊列屬性操作。更新隊列中節點的數目、佔用位元組數(含AVPacket.data的大小)及其時長。

再來看讀——從隊列中取一個節點:

/* return < 0 if aborted, 0 if no packet and > 0 if packet. */
//block: 調用者是否需要在沒節點可取的情況下阻塞等待
//AVPacket: 輸出參數,即MyAVPacketList.pkt
//serial: 輸出參數,即MyAVPacketList.serial
static int packet_queue_get(PacketQueue *q, AVPacket *pkt, int block, int *serial)
{
MyAVPacketList *pkt1;
int ret;

SDL_LockMutex(q->mutex);

for (;;) {
if (q->abort_request) {
ret = -1;
break;
}

//......這裡是省略的代碼,取一個節點,然後break
}
SDL_UnlockMutex(q->mutex);
return ret;
}

函數較長,我們先省略for循環的主體部分,簡單看下函數整體流程。整體流程比較清晰:加鎖,進入循環,如果此時需要退出,則break,返回-1;否則,取一個節點,然後break。

這裡for循環主要充當一個「殼」,以方便在一塊多分支代碼中可以通過break調到統一的出口。

對於加鎖情況下的多分支return,這是一個不錯的寫法。但要小心這是一把雙刃劍,沒有仔細處理每個分支,容易陷入死循環。

然後看for的主體:

pkt1 = q->first_pkt;//MyAVPacketList *pkt1; 從隊頭拿數據
if (pkt1) {//隊列中有數據
q->first_pkt = pkt1->next;//隊頭移到第二個節點
if (!q->first_pkt)
q->last_pkt = NULL;
q->nb_packets--;//節點數減1
q->size -= pkt1->pkt.size + sizeof(*pkt1);//cache大小扣除一個節點
q->duration -= pkt1->pkt.duration;//總時長扣除一個節點
*pkt = pkt1->pkt;//返回AVPacket,這裡發生一次AVPacket結構體拷貝,AVPacket的data只拷貝了指針
if (serial)//如果需要輸出serial,把serial輸出
*serial = pkt1->serial;
av_free(pkt1);//釋放節點內存
ret = 1;
break;
} else if (!block) {//隊列中沒有數據,且非阻塞調用
ret = 0;
break;
} else {//隊列中沒有數據,且阻塞調用
SDL_CondWait(q->cond, q->mutex);//這裡沒有break。for循環的另一個作用是在條件變數滿足後重複上述代碼取出節點
}

我們知道隊列是一個先進先出的模型,所以從隊頭拿數據。對於沒有取到數據的情況,根據block參數進行判斷是否阻塞,如果阻塞,通過SDL_CondWait等待信號。

如果有取到數據,主要分3個步驟:

  1. 隊列操作:轉移隊頭、扣除大小。這裡nb_packetsduration的運算較明顯,size需要注意也要扣除AVPacket的size
  2. 給輸出參數賦值:基本就是MyAVPacketList拍平傳遞給輸出參數pkt和serial即可
  3. 釋放節點內存:釋放放入隊列時申請的節點內存

最後是提供了幾個"util"方法:

packet_queue_put_nullpacket放入「空包」。放入空包意味著流的結束,一般在視頻讀取完成的時候放入空包。該函數的實現很明了,構建一個空包,然後調用packet_queue_put:

static int packet_queue_put_nullpacket(PacketQueue *q, int stream_index)
{
AVPacket pkt1, *pkt = &pkt1;
av_init_packet(pkt);
pkt->data = NULL;
pkt->size = 0;
pkt->stream_index = stream_index;
return packet_queue_put(q, pkt);
}

packet_queue_flush用於將隊列中的所有節點清除。比如用於銷毀隊列、seek操作等。

static void packet_queue_flush(PacketQueue *q)
{
MyAVPacketList *pkt, *pkt1;

SDL_LockMutex(q->mutex);
for (pkt = q->first_pkt; pkt; pkt = pkt1) {
pkt1 = pkt->next;
av_packet_unref(&pkt->pkt);
av_freep(&pkt);
}
q->last_pkt = NULL;
q->first_pkt = NULL;
q->nb_packets = 0;
q->size = 0;
q->duration = 0;
SDL_UnlockMutex(q->mutex);
}

函數主體的for循環是隊列遍歷,遍歷過程釋放節點和AVPacket。最後將PacketQueue的屬性恢復為空隊列狀態。

至此,我們分析了PacketQueue的實現和主要的操作方法。

現在總結下兩個關鍵的點:

第一,PacketQueue的內存管理:

MyAVPacketList的內存是完全由PacketQueue維護的,在put的時候malloc,在get的時候free。

AVPacket分兩塊,一部分是AVPacket結構體的內存,這部分從MyAVPacketList的定義可以看出是和MyAVPacketList共存亡的。另一部分是AVPacket欄位指向的內存,這部分一般通過av_packet_unref函數釋放。一般情況下,是在get後由調用者負責用av_packet_unref函數釋放。特殊的情況是當碰到packet_queue_flush或put失敗時,這時需要隊列自己處理。

第二,serial的變化過程:

如上圖所示,左邊是隊頭,右邊是隊尾,從左往右標註了5個節點的serial,以及放入對應節點時queue的serial。

可以看到放入flush_pkt的時候後,serial增加了1.

要區分的是上圖雖然看起來queue的serial和節點的serial是相等的,但這是放入時相等,在取出時是不等的。假設,現在要從隊頭取出一個節點,那麼取出的節點是serial 1,而PacketQueue自身的queue已經增長到了2.

代碼背後的設計思路:

  1. 設計一個多線程安全的隊列,保存AVPacket,同時統計隊列內已緩存的數據大小。(這個統計數據會用來後續設置要緩存的數據量)
  2. 引入serial的概念,區別前後數據包是否連續
  3. 設計了兩類特殊的packet——flush_pkt和nullpkt,用於更細緻的控制(類似用於多線程編程的事件模型——往隊列中放入flush事件、放入null事件)

推薦閱讀:

相关文章