Deep Learning Human action recognition

Nhận diện hành động của người sử dụng Deep Learning

Nhận diện hành động của người sử dụng Deep Learning

Human Action Recognition (HAR) là quá trình sử dụng những cảnh quay trong video để nhận diện, phân loại các hành động khác nhau được thực hiện bởi người trong video đó. Nó được ứng dụng rất rộng rãi trong các lĩnh vực như giám sát, thể dục, thể thao, …

Giả sử, bạn muốn tạo một ứng dụng dạy học Yoga trực tuyến. Trước tiên, bạn cần quay các video hướng dẫn để người học theo dõi và làm theo. Sau đó, mỗi người học tự tập và quay lại video của mình. Họ gửi các video đó lên ứng dụng của bạn. Dựa vào video nhận được, ứng dụng có thể đánh giá được mức độ chính xác trong mỗi động tác của người học. Từ đó đưa ra gợi ý cải thiện, … Thật tuyệt vời phải không?

Trong bài này, chúng ta sẽ cùng nhau tạo ra một model để nhận diện một số hành dộng của người, sử dụng pose estimation và mạng LSTM. Pytorch_Lightning được sử dụng trong bài này.

1. Tổng quan sơ đồ kiến trúc

Để phân loại một hành động, trước tiên chúng ta cần xác định vị trí các bộ phận cơ thể khác nhau trong mọi khung hình, sau đó phân tích chuyển động của các bộ phận đó theo thời gian.

Bước đầu tiên đạt được bằng cách sử dụng Detectron2, nó xuất ra tư thế của cơ thể (17 Keypoints) sau khi quan sát một khung hình trong video.

Bước thứ hai là phân tích chuyển động của cơ thể theo thời gian và đưa ra dự đoán được thực hiện bằng mạng LSTM. Đầu vào là các Keypoints từ một chuỗi khung được, đầu ra là loại hành đồng được dự đoán.

2. Chuẩn bị dữ liệu

Đối với ứng dụng này, chúng ta chỉ cần huấn luyện mạng LSTM để phân loại các hành động, còn phần Pose Estimation thì chúng ta sẽ sử dụng pre-trained có sẵn cung cấp bởi Detectron2.

Bộ dữ liệu được dùng để huấn luyện mạng LSTM được tạo thành bằng cách sử dụng OpenPose trên các video của tập dữ liệu Berkeley Multimodal Human Action Database (MHAD). Sử dụng cách thức tương tự, chúng ta cũng có thể tạo ra bộ dữ liệu của riêng mình.

Download bộ dữ liệu tại đây. Nó bao gồm 6 hành động: JUMPING, JUMPING_JACKS, BOXING, WAVING_2HANDS, WAVING_1HAND, CLAPPING_HANDS.

Training Data bao gồm các chuỗi 17 Keypoints kết hợp với một nhãn tương ứng. Mỗi Keypoint là một cặp tọa độ (x,y).

Mỗi lần phân loại, chúng ta sẽ sử dụng 32 frames liên tiếp nhau. Như vậy thì kích thước dữ liệu của một Input Data sẽ là 32x34:

!head -2 RNN-HAR-2D-Pose-database/X_train.txt
---
295.914,161.579,307.693,203.413,281.546,203.368,274.997,251.562,267.194,293.253,337.619,204.669,347.958,255.443,341.541,295.866,286.81,289.393,297.196,355.832,297.22,405.371,321.967,291.959,327.143,358.408,328.528,411.922,294.546,156.42,305.002,156.418,0,0,318.083,161.632
295.855,161.6,307.684,203.408,281.529,203.385,274.989,251.574,267.191,291.961,337.615,204.646,347.974,254.209,344.093,295.816,286.803,289.377,297.165,355.827,297.205,404.095,323.248,290.652,324.564,358.409,328.493,410.63,293.252,157.686,303.706,157.706,0,0,318.024,161.654

Bởi vì OpenPose trả về kết quả là 18 Keypoints, trong khi kết quả của Detectron2 chỉ là 17 Keypoints nên chúng ta sẽ phải thực hiện một bước chuyển đổi trước khi sử dụng bộ dữ liệu này.

WINDOW_SIZE = 32  # 32 continuous frames

class PoseDataset(Dataset):
    def __init__(self, X, Y):
        self.X = X
        self.y = Y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

openpose_to_detectron_mapping = [0, 1, 28, 29, 26, 27, 32, 33, 30, 31, 8, 9, 2, 3, 10, 11, 4, 5, 12, 13, 6, 7, 20, 21, 14, 15, 22, 23, 16, 17, 24, 25, 18, 19]

class PoseDataModule(pl.LightningDataModule):
    def __init__(self, data_root, batch_size):
        super().__init__()
        self.data_root = data_root
        self.batch_size = batch_size
        self.X_train_path = self.data_root + "X_train.txt"
        self.X_test_path = self.data_root + "X_test.txt"
        self.y_train_path = self.data_root + "Y_train.txt"
        self.y_test_path = self.data_root + "Y_test.txt"


    # Detectron2 produces only 17 key points while OpenPose produces 18 (or more) key points.
    def convert_to_detectron_format(self, row):
        row = row.split(',')
        # filtering out coordinate of neck joint from the training/validation set originally generated using OpenPose.
        temp = row[:2] + row[4:]
        # change to Detectron2 order of key points
        temp = [temp[i] for i in openpose_to_detectron_mapping]
        return temp

    def load_X(self, X_path):
        file = open(X_path, 'r')
        X = np.array(
            [elem for elem in [
                self.convert_to_detectron_format(row) for row in file
            ]],
            dtype=np.float32
        )
        file.close()
        blocks = int(len(X) / WINDOW_SIZE)
        X_ = np.array(np.split(X, blocks))
        return X_

    # Load the networks outputs
    def load_y(self, y_path):
        file = open(y_path, 'r')
        y = np.array(
            [elem for elem in [
                row.replace('  ', ' ').strip().split(' ') for row in file
            ]],
            dtype=np.int32
        )
        file.close()
        # for 0-based indexing
        return y - 1

    def prepare_data(self):
        pass

    def setup(self, stage=None):
        X_train = self.load_X(self.X_train_path)
        X_test = self.load_X(self.X_test_path)
        y_train = self.load_y(self.y_train_path)
        y_test = self.load_y(self.y_test_path)
        self.train_dataset = PoseDataset(X_train, y_train)
        self.val_dataset = PoseDataset(X_test, y_test)

    def train_dataloader(self):
        # train loader
        train_loader = torch.utils.data.DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True
        )
        return train_loader

    def val_dataloader(self):
        # validation loader
        val_loader = torch.utils.data.DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            shuffle=False
        )
        return val_loader

3. Xây dựng mô hình

3.1 Human Pose Estimation model

Phần này, chúng ta sử dụng pre-trained R50-FPN model từ Detectron2 Model Zoo

# obtain detectron2's default config
cfg = get_cfg()
# load the pre trained model from Detectron2 model zoo
cfg.merge_from_file(model_zoo.get_config_file("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml"))
# set confidence threshold for this model
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5 
# load model weights
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml")
# create the predictor for pose estimation using the config
pose_detector = DefaultPredictor(cfg)

3.2 Định nghĩa LSTM model

LSTM model sẽ được khởi tạo với hidden_dim = 50, optimizer là Adam và sử dụng ReduceLROnPlateau scheduler để giảm learning_rate. Ở đây, mình chỉ sử dụng 1 LSTM layer, bạn có thể thí nghiệm với nhiều LSTM layers hơn.

# We have 6 output action classes.
TOT_ACTION_CLASSES = 6

#lstm classifier definition
class ActionClassificationLSTM(pl.LightningModule):
    # initialise method
    def __init__(self, input_features, hidden_dim, learning_rate=0.001):
        super().__init__()
        # save hyperparameters
        self.save_hyperparameters()
        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim.
        self.lstm = nn.LSTM(input_features, hidden_dim, num_layers=2, batch_first=True)
        # The linear layer that maps from hidden state space to classes
        self.linear = nn.Linear(hidden_dim, TOT_ACTION_CLASSES)

    def forward(self, x):
        # invoke lstm layer
        lstm_out, (ht, ct) = self.lstm(x)
        # invoke linear layer
        return self.linear(ht[-1])

    def training_step(self, batch, batch_idx):
        # get data and labels from batch
        x, y = batch
        # reduce dimension
        y = torch.squeeze(y)
        # convert to long
        y = y.long()
        # get prediction
        y_pred = self(x)
        # calculate loss
        loss = F.cross_entropy(y_pred, y)
        # get probability score using softmax
        prob = F.softmax(y_pred, dim=1)
        # get the index of the max probability
        pred = prob.data.max(dim=1)[1]
        # calculate accuracy
        acc = torchmetrics.functional.accuracy(pred, y)
        dic = {
            'batch_train_loss': loss,
            'batch_train_acc': acc
        }
        # log the metrics for pytorch lightning progress bar or any other operations
        self.log('batch_train_loss', loss, prog_bar=True)
        self.log('batch_train_acc', acc, prog_bar=True)
        #return loss and dict
        return {'loss': loss, 'result': dic}

    def training_epoch_end(self, training_step_outputs):
        # calculate average training loss end of the epoch
        avg_train_loss = torch.tensor([x['result']['batch_train_loss'] for x in training_step_outputs]).mean()
        # calculate average training accuracy end of the epoch
        avg_train_acc = torch.tensor([x['result']['batch_train_acc'] for x in training_step_outputs]).mean()
        # log the metrics for pytorch lightning progress bar and any further processing
        self.log('train_loss', avg_train_loss, prog_bar=True)
        self.log('train_acc', avg_train_acc, prog_bar=True)

    def validation_step(self, batch, batch_idx):
        # get data and labels from batch
        x, y = batch
        # reduce dimension
        y = torch.squeeze(y)
        # convert to long
        y = y.long()
        # get prediction
        y_pred = self(x)
        # calculate loss
        loss = F.cross_entropy(y_pred, y)
        # get probability score using softmax
        prob = F.softmax(y_pred, dim=1)
        # get the index of the max probability
        pred = prob.data.max(dim=1)[1]
        # calculate accuracy
        acc = torchmetrics.functional.accuracy(pred, y)
        dic = {
            'batch_val_loss': loss,
            'batch_val_acc': acc
        }
        # log the metrics for pytorch lightning progress bar and any further processing
        self.log('batch_val_loss', loss, prog_bar=True)
        self.log('batch_val_acc', acc, prog_bar=True)
        #return dict
        return dic

    def validation_epoch_end(self, validation_step_outputs):
        # calculate average validation loss end of the epoch
        avg_val_loss = torch.tensor([x['batch_val_loss']
                                     for x in validation_step_outputs]).mean()
        # calculate average validation accuracy end of the epoch
        avg_val_acc = torch.tensor([x['batch_val_acc']
                                    for x in validation_step_outputs]).mean()
        # log the metrics for pytorch lightning progress bar and any further processing
        self.log('val_loss', avg_val_loss, prog_bar=True)
        self.log('val_acc', avg_val_acc, prog_bar=True)

    def configure_optimizers(self):
        # adam optimiser
        optimizer = optim.Adam(self.parameters(), lr=self.hparams.learning_rate)
        # learning rate reducer scheduler
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=10, min_lr=1e-15, verbose=True)
        # scheduler reduces learning rate based on the value of val_loss metric
        return {"optimizer": optimizer,
                "lr_scheduler": {"scheduler": scheduler, "interval": "epoch", "frequency": 1, "monitor": "val_loss"}}

4. Huấn luyện mô hình

Sử dụng ModelCheckpoint callback và LearningRateMonitor, chúng ta sẽ huấn luyện mạng LSTM như sau:

def do_training_validation():
   pl.seed_everything(21)    
   parser = ArgumentParser()
   parser = pl.Trainer.add_argparse_args(parser)
   parser = configuration_parser(parser)
   # args = parser.parse_args()
   args, unknown = parser.parse_known_args()
   # init model    
   hidden_dim = 50
   WINDOW_SIZE = 32
   model = ActionClassificationLSTM(WINDOW_SIZE, hidden_dim, learning_rate=args.learning_rate)
   data_module = PoseDataModule(data_root=args.data_root,
                                       batch_size=args.batch_size)    
   #save only the top 1 model based on val_loss
   checkpoint_callback = ModelCheckpoint(save_top_k=1, monitor='val_loss')
   lr_monitor = LearningRateMonitor(logging_interval='step')  
   #trainer
   trainer = pl.Trainer.from_argparse_args(args,
      # fast_dev_run=True,
      max_epochs=args.epochs, 
      deterministic=True, 
      gpus=1, 
      progress_bar_refresh_rate=1, 
      callbacks=[EarlyStopping(monitor='train_loss', patience=15), checkpoint_callback, lr_monitor])    
   trainer.fit(model, data_module)    
   return model

Kết quả huấn luyện model:

  • Train Accuracy:

  • Train Loss:

  • Validation Accuracy:
  • Validation Loss:

5. Thực hiên Inference

# how many frames to skip while inferencing
# configuring a higher value will result in better FPS (frames per rate), but accuracy might get impacted
SKIP_FRAME_COUNT = 1

# analyse the video
def analyse_video(pose_detector, lstm_classifier, video_path):
    # open the video
    cap = cv2.VideoCapture(video_path)
    # width of image frame
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    # height of image frame
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # frames per second of the input video
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    # total number of frames in the video
    tot_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # video output codec
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    # extract the file name from video path
    file_name = ntpath.basename(video_path)
    # video writer
    vid_writer = cv2.VideoWriter('res_{}'.format(
        file_name), fourcc, 30, (width, height))
    # counter
    counter = 0
    # buffer to keep the output of detectron2 pose estimation
    buffer_window = []
    # start time
    start = time.time()
    label = None
    # iterate through the video
    while True:
        # read the frame
        ret, frame = cap.read()
        # return if end of the video
        if ret == False:
            break
        # make a copy of the frame
        img = frame.copy()
        if(counter % (SKIP_FRAME_COUNT+1) == 0):
            # predict pose estimation on the frame
            outputs = pose_detector(frame)
            # filter the outputs with a good confidence score
            persons, pIndicies = filter_persons(outputs)
            if len(persons) >= 1:
                # pick only pose estimation results of the first person.
                # actually, we expect only one person to be present in the video.
                p = persons[0]
                # draw the body joints on the person body
                draw_keypoints(p, img)
                # input feature array for lstm
                features = []
                # add pose estimate results to the feature array
                for i, row in enumerate(p):
                    features.append(row[0])
                    features.append(row[1])

                # append the feature array into the buffer
                # not that max buffer size is 32 and buffer_window operates in a sliding window fashion
                if len(buffer_window) < WINDOW_SIZE:
                    buffer_window.append(features)
                else:
                    # convert input to tensor
                    model_input = torch.Tensor(np.array(buffer_window, dtype=np.float32))
                    # add extra dimension
                    model_input = torch.unsqueeze(model_input, dim=0)
                    # predict the action class using lstm
                    y_pred = lstm_classifier(model_input)
                    prob = F.softmax(y_pred, dim=1)
                    # get the index of the max probability
                    pred_index = prob.data.max(dim=1)[1]
                    # pop the first value from buffer_window and add the new entry in FIFO fashion, to have a sliding window of size 32.
                    buffer_window.pop(0)
                    buffer_window.append(features)
                    label = LABELS[pred_index.numpy()[0]]
                    #print("Label detected ", label)

        # add predicted label into the frame
        if label is not None:
            cv2.putText(img, 'Action: {}'.format(label),
                        (int(width-400), height-50), cv2.FONT_HERSHEY_COMPLEX, 0.9, (102, 255, 255), 2)
        # increment counter
        counter += 1
        # write the frame into the result video
        vid_writer.write(img)
        # compute the completion percentage
        percentage = int(counter*100/tot_frames)
        # return the completion percentage
        # yield "data:" + str(percentage) + "\n\n"

        # show video results
        cv2.imshow("image", img)
        # Press Q on keyboard to  exit
        if cv2.waitKey(25) & 0xFF == ord('q'):
            break

Kết quả thực hiện trên video:

6. Kết luận

Trong bài này, chúng ta đã cùng nhau thực hành xây dựng một mô hình để nhận diện hành động của người trong video bằng cách sử dụng kết hợp Detectron2 cho Pose Estimation và LSTM cho phân loại. Có rất nhiều thứ có thể cải tiến để có được kết quả tốt hơn mà bạn có thể thử nếu áp dụng vào bài toán thực tế:

  • Tăng FPS để có thể chạy được realtime: Tối ưu hóa model (pruning, quantization), loại bỏ bớt frame khi nhận diện, sử dụng multi-threading, …
  • Sử dụng các Pose Estimation model khác như AlphaPose, OpenPose, …

Toàn bộ source code của bài này, các bạn xem tại đây

Mời các bạn đón đọc.!

7. Tham khảo

[1] Bibin Sebastian, “Human Action Recognition using Detectron2 and LSTM”, Available online: https://learnopencv.com/human-action-recognition-using-detectron2-and-lstm/ (Accessed on 30 Jul 2021).