# Copyright 2021 JD.com, Inc., JD AI
"""
@author: Yehao Li, Jianjie Luo
@contact: yehaoli.sysu@gmail.com, jianjieluo.sysu@gmail.com
"""
import os
import csv
import copy
import random
from xmodaler.functional.func_pretrain import caption_to_mask_tokens
from collections import defaultdict
import numpy as np
from xmodaler.config import configurable
from xmodaler.config import kfg
from xmodaler.tokenization import BertTokenizer
from xmodaler.functional import (
read_lines_set,
read_np,
boxes_to_locfeats,
iou,
random_region,
dict_as_tensor
)
from ..build import DATASETS_REGISTRY
__all__ = ["ConceptualCaptionsDataset", "ConceptualCaptionsDatasetForSingleStream"]
[docs]@DATASETS_REGISTRY.register()
class ConceptualCaptionsDataset:
[docs] @configurable
def __init__(
self,
stage: str,
anno_file: str,
max_seq_length: int,
max_feat_num: int,
feats_folder: str,
images_ids_file: str,
tokenizer
):
self.stage = stage
self.anno_file = anno_file
self.max_seq_length = max_seq_length
self.max_feat_num = max_feat_num
self.feats_folder = feats_folder
self.images_ids_file =images_ids_file
self.tokenizer = tokenizer
[docs] @classmethod
def from_config(cls, cfg, stage: str = "train"):
ann_files = {
"train": os.path.join(cfg.DATALOADER.ANNO_FOLDER, "Train_GCC-training.tsv"),
"val": os.path.join(cfg.DATALOADER.ANNO_FOLDER, "Validation_GCC-1.1.0-Validation.tsv"),
}
images_ids_files = {
"train": os.path.join(cfg.DATALOADER.ANNO_FOLDER, "train_images_ids.txt"),
"val": os.path.join(cfg.DATALOADER.ANNO_FOLDER, "val_images_ids.txt")
}
ret = {
"stage": stage,
"anno_file": ann_files[stage],
"max_seq_length": cfg.MODEL.MAX_SEQ_LEN,
"max_feat_num": cfg.DATALOADER.MAX_FEAT_NUM,
"feats_folder": cfg.DATALOADER.FEATS_FOLDER,
"images_ids_file": images_ids_files[stage],
"tokenizer": BertTokenizer.from_pretrained(cfg.MODEL.PRETRAINING.MODEL_NAME,
do_lower_case=cfg.MODEL.PRETRAINING.DO_LOWER_CASE),
}
return ret
[docs] def load_data(self, cfg):
images_ids_set = read_lines_set(self.images_ids_file)
datalist = []
csv_rd = csv.reader(open(self.anno_file, encoding='UTF-8'), delimiter='\t', quotechar='"')
for imgid, row in enumerate(csv_rd):
imgid_str = str(imgid + 1)
if imgid_str in images_ids_set:
datalist.append({
"image_id": imgid_str,
"caption": row[0]
})
return datalist
[docs] def __call__(self, dataset_dict):
dataset_dict = copy.deepcopy(dataset_dict)
image_id = dataset_dict['image_id']
caption = dataset_dict["caption"]
image_path = os.path.join(self.feats_folder, image_id + ".npz")
content = read_np(image_path)
features = content['features'][0:self.max_feat_num - 1]
cls_probs = content['cls_prob'][0:self.max_feat_num - 1]
boxes = content['boxes'][0:self.max_feat_num - 1]
image_h = content['image_h'][0]
image_w = content['image_w'][0]
num_boxes = len(boxes)
image_locations = boxes_to_locfeats(boxes, image_w, image_h)
overlaps = iou(boxes, boxes)
tokens_ids, u_tokens_labels, g_tokens_labels = caption_to_mask_tokens(caption, self.max_seq_length, self.tokenizer, need_g_tokens=True)
tokens_length = tokens_ids.shape[0]
u_tokens_type = np.array([0] * tokens_length)
g_tokens_type = np.array([1] * tokens_length)
imgfeats, imgfeats_labels, masked_num = random_region(features, overlaps)
imgfeats_labels = np.array(imgfeats_labels)
valid_feats_num = max(1, num_boxes - masked_num)
g_image_feat = np.sum(imgfeats, axis=0) / valid_feats_num
g_image_location = np.array([0, 0, 1, 1, 1])
imgfeats = np.concatenate([np.expand_dims(g_image_feat, axis=0), imgfeats], axis=0)
image_locations = np.concatenate([np.expand_dims(g_image_location, axis=0), image_locations], axis=0)
ret = {
kfg.IDS: image_id,
kfg.ATT_FEATS: imgfeats.astype('float32'),
kfg.ATT_FEATS_LOC: image_locations.astype('float32'),
kfg.U_TOKENS_TYPE: u_tokens_type.astype(np.int64),
kfg.G_TOKENS_TYPE: g_tokens_type.astype(np.int64),
kfg.U_TOKENS_IDS: tokens_ids.astype(np.int64),
kfg.G_TOKENS_IDS: tokens_ids.astype(np.int64),
kfg.U_TARGET_IDS: u_tokens_labels.astype(np.int64),
kfg.G_TARGET_IDS: g_tokens_labels.astype(np.int64),
kfg.V_TARGET: cls_probs.astype('float32'),
kfg.V_TARGET_LABELS: imgfeats_labels.astype(np.int64)
}
dict_as_tensor(ret)
return ret
[docs]@DATASETS_REGISTRY.register()
class ConceptualCaptionsDatasetForSingleStream(ConceptualCaptionsDataset):
[docs] @configurable
def __init__(
self,
stage: str,
anno_file: str,
max_seq_length: int,
max_feat_num: int,
feats_folder: str,
images_ids_file: str,
tokenizer,
itm_neg_prob: float
):
super(ConceptualCaptionsDatasetForSingleStream, self).__init__(
stage, anno_file, max_seq_length, max_feat_num, feats_folder, images_ids_file, tokenizer
)
# Prepare for neg caption sample
self.itm_neg_prob = itm_neg_prob
datalist = self.load_data(None)
self.imgid2caps = defaultdict(list)
for item in datalist:
image_id = item['image_id']
caption = item['caption']
self.imgid2caps[image_id].append(caption)
self.imgid2caps = dict(self.imgid2caps)
self.image_ids = list(self.imgid2caps.keys())
[docs] @classmethod
def from_config(cls, cfg, stage: str = "train"):
ret = super().from_config(cfg, stage)
ret["itm_neg_prob"] = cfg.MODEL.ITM_NEG_PROB
return ret
[docs] def __call__(self, dataset_dict):
dataset_dict = copy.deepcopy(dataset_dict)
image_id = dataset_dict['image_id']
caption = dataset_dict["caption"]
# sample neg caption for single stream bert itm task
caption, itm_neg_label = self.random_cap(caption)
image_path = os.path.join(self.feats_folder, image_id + ".npz")
content = read_np(image_path)
features = content['features'][0:self.max_feat_num - 1]
cls_probs = content['cls_prob'][0:self.max_feat_num - 1]
boxes = content['boxes'][0:self.max_feat_num - 1]
image_h = content['image_h'][0]
image_w = content['image_w'][0]
image_locations = boxes_to_locfeats(boxes, image_w, image_h)
overlaps = iou(boxes, boxes)
tokens_ids, u_tokens_labels, token_ids_wo_mask = caption_to_mask_tokens(
caption, self.max_seq_length, self.tokenizer,
need_g_tokens=False, need_no_mask_tokens=True, must_mask=True
)
tokens_length = tokens_ids.shape[0]
u_tokens_type = np.array([0] * tokens_length)
imgfeats_wo_mask = copy.deepcopy(features)
imgfeats, imgfeats_labels, _ = random_region(features, overlaps)
imgfeats_labels = np.array(imgfeats_labels)
assert len(imgfeats_wo_mask) == len(imgfeats)
assert len(tokens_ids) == len(token_ids_wo_mask)
ret = {
kfg.IDS: image_id,
kfg.ATT_FEATS: imgfeats.astype('float32'),
kfg.ATT_FEATS_WO_MASK: imgfeats_wo_mask.astype('float32'),
kfg.ATT_FEATS_LOC: image_locations.astype('float32'),
kfg.U_TOKENS_TYPE: u_tokens_type.astype(np.int64),
kfg.U_TOKENS_IDS: tokens_ids.astype(np.int64),
kfg.U_TOKENS_IDS_WO_MASK: token_ids_wo_mask.astype(np.int64),
kfg.U_TARGET_IDS: u_tokens_labels.astype(np.int64),
kfg.V_TARGET: cls_probs.astype('float32'),
kfg.V_TARGET_LABELS: imgfeats_labels.astype(np.int64),
kfg.ITM_NEG_LABEL: itm_neg_label
}
dict_as_tensor(ret)
return ret
def random_cap(self, caption):
if random.random() < self.itm_neg_prob:
caption = self.get_random_caption(caption)
label = 1
else:
label = 0
return caption, label
def get_random_caption(self, caption):
while True:
rand_idx = random.randint(0, len(self.image_ids) - 1)
rand_captions = self.imgid2caps[self.image_ids[rand_idx]]
rand_caption = random.choice(rand_captions) if len(rand_captions) > 1 else rand_captions[0]
if rand_caption != caption:
break
return rand_caption