Baseline: Faasm

Faasm is a system presented at ATC'20 by Simon Shillaker and Peter Pietzuch of Imperial College London.

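Faasm uses the software fault isolation provided by WebAssembly to isolate the memory of executing functions. It still lets functions in the same address space share memory regions, which avoids expensive data movement. Its Faaslet runtime has several further properties:
  • it uses standard Linux cgroups to isolate resources such as CPU and network;
  • it exposes a low-level POSIX host interface for networking, file system access and dynamic loading;
  • it uses snapshots to reduce the time needed to initialize (restore) Faaslets.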

This post covers how to deploy Faasm and how to develop benchmark applications on top of it.

Installation

$ git clone https://github.com/faasm/faasm
$ cd faasm
$ git checkout v0.18.0
$ git submodule update --init --recursive

If you check out a different version of the main repository, re-run git submodule update.

$ source bin/workon.sh
$ export PYTHON_CODEGEN=on    # only for python
$ export FAASM_INI_FILE=./faasm.ini 
$ faasmctl deploy.compose --workers=4

Search for the demo source file:

$ find . -name "hello.cpp"
./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/clients/python/third-party/cpp/func/demo/hello.cpp
./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/clients/cpp/func/demo/hello.cpp

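The output shows that these commands automatically created the venv-bm virtual environment. They also mounted the directory ./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/ into the containers. You can therefore modify Faasm itself, and build user code, by editing files under this directory directly.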

Deleting the containers does not discard changes made in this directory:

$ faasmctl delete.delete

While the cluster runs, you can also follow the logs of all workers, which helps with running and debugging:

$ faasmctl logs -s worker -f

Building C++ Applications

Start the container for C++ applications:

$ faasmctl cli.cpp

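After entering, the shell starts in the container's /code/cpp directory. This corresponds to the host directory ./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/clients/cpp, so you can open that directory on the host to edit the sources used inside the container.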

Commands

Inside the container, Faasm provides the following commands for user functions:

inv func.compile <user> <func>
inv func.upload <user> <func>
inv func.flush
inv func.invoke <user> <func>

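For the demo user, the source of the string function is /code/cpp/func/demo/string.cpp. A complete cycle consists of four steps: compile, upload, flush the cache (required) and invoke.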

Modifications

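As of this writing [1], Faasm still has not fixed the issue where inv func.invoke cannot correctly receive the function's callback. To take measurements, you therefore have to add timestamps by hand and compute the results yourself: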

Start timestamp

First, locate the invoke function in func.py under /code/cpp/tasks. Obtain the absolute time there through a bash script, because no time module is available:

from subprocess import run  # at the top of func.py, if not already imported

@task
def invoke(ctx, user, func, input_data=None, mpi=None, graph=False):
    ...
    # Record the invoke-side start time in microseconds (GNU date, %6N = 6 fractional digits)
    bash_script = '''
    microseconds=$(date +%s%6N)
    echo "start timestamp: ${microseconds}#"
    '''
    result = run(['bash', '-c', bash_script], capture_output=True, text=True)
    print(result.stdout)

    # Invoke message
    response = invoke_wasm(data)

    print("Success:\n{}".format(response.messageResults[0].outputData))
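The start timestamp is printed by the invoke task, and the end timestamp by the function itself (for example End time: ... in the Long Chain code below). The latency is therefore computed by hand from the two lines. The following is a minimal C++ sketch of that step. It assumes both lines have been collected into one string. The names extract_timestamp and end_to_end_us are hypothetical helpers, not part of Faasm or faasmctl.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper (not part of Faasm): returns the decimal number that
// directly follows `marker` in `text`, or -1 if the marker or number is missing.
long long extract_timestamp(const std::string& text, const std::string& marker) {
    std::size_t pos = text.find(marker);
    if (pos == std::string::npos) return -1;
    pos += marker.size();
    long long value = 0;
    bool any_digit = false;
    while (pos < text.size() && text[pos] >= '0' && text[pos] <= '9') {
        value = value * 10 + (text[pos] - '0');
        ++pos;
        any_digit = true;
    }
    return any_digit ? value : -1;
}

// Hypothetical helper: microseconds between the invoke-side stamp
// ("start timestamp: <us>#") and the function-side stamp ("End time: <us>").
long long end_to_end_us(const std::string& log) {
    long long start = extract_timestamp(log, "start timestamp: ");
    long long end = extract_timestamp(log, "End time: ");
    assert(start >= 0 && end >= 0 && "both timestamps must be present");
    return end - start;
}
```

Some functions print their end stamp differently, such as the ParallSort checker, which prints "end time: ". For those, pass the matching marker to extract_timestamp.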

Interfaces

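The interfaces described in the Faasm documentation do not match the ones actually available. The list below is therefore compiled from the source code [2]: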

  • Macro: #define BYTES(arr) reinterpret_cast<uint8_t*>(arr), usable only for pointer casts
  • Passing data:
    • Regular state
      • Write a byte stream: void faasmWriteState(const char* key, const uint8_t* data, long dataLen);
      • Read a byte stream: long faasmReadState(const char* key, unsigned char* buffer, long bufferLen);
      • Get the length of a byte stream: size_t faasmReadStateSize(const char* key);
      • Read part of a byte stream: void faasmReadStateOffset(const char* key, long totalLen, long offset, uint8_t* buffer, long bufferLen);
    • Append-only state
      • Append bytes to the end: void faasmAppendState(const char* key, const uint8_t* data, long dataLen);
      • Read the appended bytes: void faasmReadAppendedState(const char* key, uint8_t* buffer, long bufferLen, long nElems);
    • Files
      • Load state from a file: unsigned long faasmWriteStateFromFile(const char* key, const char* filePath);
  • Calls:
    • Chain a function from the same source file by pointer: unsigned int faasmChain(FaasmFuncPtr funcPtr, const uint8_t* inputData, long inputDataSize);
    • Chain another function by name: unsigned int faasmChainNamed(const char* name, const uint8_t* inputData, long inputDataSize);
    • Wait for a chained call to finish (a blocking wait, not an asynchronous call): unsigned int faasmAwaitCall(unsigned int callId);
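State values are untyped byte streams. Anything other than raw bytes therefore has to be serialized before faasmWriteState and decoded after faasmReadState. The following is a minimal sketch of a fixed-width encoding for a list of ints. The helpers pack_ints and unpack_ints are hypothetical. The raw memcpy assumes the writer and reader share the same int size and byte order. WebAssembly memory is always little-endian, so Faaslets in one cluster agree on this.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper (not part of Faasm): serialize ints into raw bytes for a state write.
std::vector<uint8_t> pack_ints(const std::vector<int>& values) {
    std::vector<uint8_t> bytes(values.size() * sizeof(int));
    if (!bytes.empty()) {
        std::memcpy(bytes.data(), values.data(), bytes.size());
    }
    return bytes;
}

// Hypothetical helper: inverse of pack_ints. `len` must be a multiple of
// sizeof(int), e.g. the size reported by faasmReadStateSize for such a key.
std::vector<int> unpack_ints(const uint8_t* data, std::size_t len) {
    assert(len % sizeof(int) == 0);
    std::vector<int> values(len / sizeof(int));
    if (len > 0) {
        std::memcpy(values.data(), data, len);
    }
    return values;
}
```

On the Faasm side, the bytes would be passed as faasmWriteState(key, bytes.data(), bytes.size()) and read back into a buffer of faasmReadStateSize(key) bytes. This avoids the text parsing used in the ParallSort code below, at the cost of state that is not human-readable.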

Applications

Long Chain

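Description: 10 chained recursive calls. Each call appends one timestamp to append-only state on entry and another on exit. The exit stamp is taken just before the next call is chained.
Implementation: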

#include <faasm/faasm.h>
#include <faasm/core.h>
#include <sys/time.h>
#include <stdio.h>
using namespace std;

#define START_KEY "start"
#define END_KEY "end"

long long get_time() {
    timeval tv{};
    gettimeofday(&tv, nullptr);
    
    long long microseconds = (long long)tv.tv_sec * 1000000 + tv.tv_usec;
    return microseconds;
}

int chain() {
    long long start_time = get_time();
    int id = 0;
    faasmGetInput((uint8_t*)&id, sizeof(int));
    printf("Chained call %d\n", id);
    faasmAppendState(START_KEY, BYTES(&start_time), sizeof(long long));
    long long end_time = get_time();
    faasmAppendState(END_KEY, BYTES(&end_time), sizeof(long long));
    if (id == 10) {
        return 0;
    }
    FaasmFuncPtr func = chain;
    int nextId = id + 1;
    unsigned int callId = faasmChain(func, (uint8_t*)&nextId, sizeof(int));
    unsigned int result = faasmAwaitCall(callId);
    return result;
}

int main(int argc, char* argv[]) {
    int step = 1;
    FaasmFuncPtr func = chain;
    unsigned int callId = faasmChain(func, BYTES(&step), sizeof(int));
    unsigned int result = faasmAwaitCall(callId);
    printf("all finished %u\n", result);
    size_t returnSize = 10 * sizeof(long long);
    auto start_buffer = new long long[10];
    auto end_buffer = new long long[10];
    faasmReadAppendedState(START_KEY, (uint8_t*)start_buffer, returnSize, 10);
    faasmReadAppendedState(END_KEY, (uint8_t*)end_buffer, returnSize, 10);
    for (int i = 0; i < 10; i++) {
        printf("%lld%c", start_buffer[i], i == 9 ? '\n' : ',');
    }
    for (int i = 0; i < 10; i++) {
        printf("%lld%c", end_buffer[i], i == 9 ? '\n' : ',');
    }
    delete[] start_buffer;
    delete[] end_buffer;
    auto now = get_time();
    printf("End time: %lld\n", now);
    return 0;
}
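The two comma-separated lines printed by main are enough to separate the time spent inside each call from the chaining overhead between calls. For hop i, start[i+1] - end[i] runs from just before faasmChain in hop i to the entry of hop i+1. The following sketch shows this post-processing. It assumes the lines are copied from the output as strings. The names parse_csv and hop_overheads are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: parse a line such as "100,250,400" into integers.
std::vector<long long> parse_csv(const std::string& line) {
    std::vector<long long> out;
    std::stringstream ss(line);
    std::string field;
    while (std::getline(ss, field, ',')) {
        if (!field.empty()) out.push_back(std::stoll(field));
    }
    return out;
}

// Hypothetical helper: chaining overhead per hop, i.e. the time from the end
// stamp of hop i (taken just before faasmChain) to the start stamp of hop i + 1.
std::vector<long long> hop_overheads(const std::vector<long long>& start,
                                     const std::vector<long long>& end) {
    assert(start.size() == end.size());
    std::vector<long long> gaps;
    for (std::size_t i = 0; i + 1 < start.size(); ++i) {
        gaps.push_back(start[i + 1] - end[i]);
    }
    return gaps;
}
```

Each stamp comes from the local gettimeofday. If consecutive hops run on different workers, these differences therefore also include clock skew between the hosts.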

WordCount

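Description: two stages. Mappers read their input files and partition the words; reducers aggregate the partial counts and write the results to files.
Preparation: upload input files with faasmctl upload.file <host_path> <faasm_path>. The file mapped to faasm://<faasm_path> can then be found under ./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/dev/minio/data/.
Implementation: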

Mapper implementation:

#include <faasm/faasm.h>
#include <faasm/core.h>
#include <sys/time.h>
#include <stdio.h>
#include <vector>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
using namespace std;

#define MAX_WORD_LENGTH 100
#define MAX_WORDS 8000
#define MAX_SLOT_NUM 100
#define MAX_BUFFER_SIZE 8000

long long get_time() {
    timeval tv{};
    gettimeofday(&tv, nullptr);
    
    long long microseconds = (long long)tv.tv_sec * 1000000 + tv.tv_usec;
    return microseconds;
}

void to_lowercase(char *str) {
    for (int i = 0; str[i]; i++) {
        str[i] = tolower(str[i]);
    }
}

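// djb2 string hash; deterministic, so every mapper sends a given word to the same reducer
unsigned long hash_word(const char *s) {
    unsigned long h = 5381;
    for (; *s; s++) {
        h = h * 33 + (unsigned char)*s;
    }
    return h;
}

int mapper() {
    int* argv = new int[2];
    faasmGetInput(BYTES(argv), sizeof(int) * 2);
    int id = argv[0];
    int reducer_num = argv[1];
    delete[] argv;
    printf("mapper receive: id: %d, reducer_num: %d\n", id, reducer_num);
    if (reducer_num <= 0 || reducer_num > MAX_SLOT_NUM) {
        fprintf(stderr, "invalid reducer_num: %d\n", reducer_num);
        exit(EXIT_FAILURE);
    }

    char input_file[40];
    snprintf(input_file, sizeof(input_file), "faasm://fake_data_%d.txt", id);
    FILE *file = fopen(input_file, "r");
    if (!file) {
        perror("Failed to open input file");
        exit(EXIT_FAILURE);
    }

    printf("mapper_%d input file: %s\n", id, input_file);

    int count[MAX_WORDS] = {0};
    char *words[MAX_WORDS];
    char word[MAX_WORD_LENGTH];
    int word_index = 0;
    // %99s keeps each token within word[MAX_WORD_LENGTH]
    while (fscanf(file, "%99s", word) == 1) {
        to_lowercase(word);

        int found = 0;
        for (int i = 0; i < word_index; i++) {
            if (strcmp(words[i], word) == 0) {
                count[i]++;
                found = 1;
                break;
            }
        }

        if (!found) {
            if (word_index >= MAX_WORDS) {
                fprintf(stderr, "too many distinct words (MAX_WORDS = %d)\n", MAX_WORDS);
                exit(EXIT_FAILURE);
            }
            words[word_index] = strdup(word);
            count[word_index]++;
            word_index++;
        }
    }
    fclose(file);

    printf("mapper_%d read success!\n", id);

    // One output buffer per reducer. Words are partitioned by their hash, not by
    // their position i in this mapper's list: positions differ between mappers,
    // so i % reducer_num could send the same word to different reducers and its
    // counts would never be combined.
    char *buffer[MAX_SLOT_NUM];
    size_t used[MAX_SLOT_NUM];
    for (int r = 0; r < reducer_num; r++) {
        buffer[r] = (char *)calloc(MAX_BUFFER_SIZE, sizeof(char));
        used[r] = 0;
    }
    for (int i = 0; i < word_index; i++) {
        int r = (int)(hash_word(words[i]) % (unsigned long)reducer_num);
        int n = snprintf(buffer[r] + used[r], MAX_BUFFER_SIZE - used[r],
                         "%s: %d\n", words[i], count[i]);
        if (n < 0 || used[r] + n >= MAX_BUFFER_SIZE) {
            fprintf(stderr, "mapper_%d: buffer for reducer %d is full\n", id, r);
            exit(EXIT_FAILURE);
        }
        used[r] += n;
    }

    // Slot "buffer_<reducer>_<mapper>" is read by reducer <reducer>. Every slot is
    // written with at least one byte (the NUL of an empty buffer), so each
    // reducer finds a key from every mapper.
    char slot[32];
    for (int r = 0; r < reducer_num; r++) {
        snprintf(slot, sizeof(slot), "buffer_%d_%d", r, id);
        faasmWriteState(slot, (uint8_t*)buffer[r], used[r] > 0 ? used[r] : 1);
        free(buffer[r]);
    }

    printf("mapper_%d finished!\n", id);
    return 0;
}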

int main(int argc, char* argv[]) {
    int mapper_num = 3;
    int reducer_num = 3;
    // Stage 1: Map
    vector<int> call_id;
    for (int i = 0; i < mapper_num; i++) {
        int* input = new int[2];
        input[0] = i;
        input[1] = reducer_num;
        call_id.push_back(faasmChain(mapper, BYTES(input), sizeof(int) * 2));
    }
    for (int i = 0; i < mapper_num; i++) {
        faasmAwaitCall(call_id[i]);
    }
    printf("Map finished!\n");
    // Stage 2: Reduce
    vector<int> call_id2;
    for (int i = 0; i < reducer_num; i++) {
        int* input = new int[2];
        input[0] = i;
        input[1] = mapper_num;
        call_id2.push_back(faasmChainNamed("reducer", BYTES(input), sizeof(int) * 2));
    }
    for (int i = 0; i < reducer_num; i++) {
        faasmAwaitCall(call_id2[i]);
    }
    printf("Reduce finished!\n");
    printf("%lld\n", get_time());
    return 0;
}
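A word count is only correct if every occurrence of a word, from every mapper, reaches the same reducer. That rules out partitioning by a word's position in one mapper's list. The following is a minimal standalone sketch of hash partitioning with plain standard containers. The names djb2, reducer_for and partition_counts are illustrative. djb2 is just one deterministic string hash. std::hash would also work, but its values are implementation-defined, so all mappers would have to be built with the same toolchain.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// djb2 string hash (illustrative choice): deterministic across mappers.
unsigned long djb2(const std::string& word) {
    unsigned long h = 5381;
    for (unsigned char c : word) h = h * 33 + c;
    return h;
}

// Hypothetical helper: index of the reducer responsible for `word`.
int reducer_for(const std::string& word, int reducer_num) {
    assert(reducer_num > 0);
    return static_cast<int>(djb2(word) % static_cast<unsigned long>(reducer_num));
}

// Hypothetical helper: split one mapper's local counts into reducer_num partitions.
std::vector<std::map<std::string, int>> partition_counts(
        const std::map<std::string, int>& counts, int reducer_num) {
    assert(reducer_num > 0);
    std::vector<std::map<std::string, int>> parts(static_cast<std::size_t>(reducer_num));
    for (const auto& entry : counts) {
        std::size_t r = static_cast<std::size_t>(reducer_for(entry.first, reducer_num));
        parts[r][entry.first] += entry.second;
    }
    return parts;
}
```

Every mapper that computes reducer_for("pear", 3) gets the same index. Reducer r can therefore sum its inputs without coordinating with the other reducers.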

Reducer implementation:

#include <faasm/faasm.h>
#include <faasm/core.h>
#include <sys/time.h>
#include <stdio.h>
#include <vector>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <unistd.h>
using namespace std;

#define MAX_WORD_LENGTH 100
#define MAX_WORDS 8000
#define MAX_SLOT_NUM 100
#define MAX_BUFFER_SIZE 8000

long long get_time() {
    timeval tv{};
    gettimeofday(&tv, nullptr);
    
    long long microseconds = (long long)tv.tv_sec * 1000000 + tv.tv_usec;
    return microseconds;
}

void to_lowercase(char *str) {
    for (int i = 0; str[i]; i++) {
        str[i] = tolower(str[i]);
    }
}

int main(int argc, char* argsv[]) {
    int* argv = new int[2];
    faasmGetInput(BYTES(argv), sizeof(int) * 2);
    int id = argv[0];
    int mapper_num = argv[1];
    delete[] argv;
    printf("reducer receive: id: %d, mapper_num: %d\n", id, mapper_num);
    if (mapper_num <= 0 || mapper_num > MAX_SLOT_NUM) {
        fprintf(stderr, "invalid mapper_num: %d\n", mapper_num);
        return 1;
    }

    char *buffer[MAX_SLOT_NUM];
    char slot[32];
    int slot_num = mapper_num;

    printf("access start!\n");

    // Read slot "buffer_<id>_<mapper>" from every mapper. The state is not
    // NUL-terminated, so allocate one extra zeroed byte for strtok/sscanf.
    for (int i = 0; i < slot_num; ++i) {
        snprintf(slot, sizeof(slot), "buffer_%d_%d", id, i);
        size_t bufferSize = faasmReadStateSize(slot);
        buffer[i] = (char *)calloc(bufferSize + 1, sizeof(char));
        faasmReadState(slot, (uint8_t*)buffer[i], bufferSize);
    }
    printf("access success!\n");

    int count[MAX_WORDS] = {0};
    char *words[MAX_WORDS];
    char word[MAX_WORD_LENGTH];
    int word_index = 0;

    for (int i = 0; i < slot_num; i++) {
        char *line = strtok(buffer[i], "\n");
        while (line != NULL) {
            int num = 0;
            // Parse one "word: count" line; %99[^:] keeps the word within word[]
            if (sscanf(line, "%99[^:]: %d", word, &num) == 2) {
                to_lowercase(word);
                int found = 0;
                for (int j = 0; j < word_index; j++) {
                    if (strcmp(words[j], word) == 0) {
                        found = 1;
                        count[j] += num;
                        break;
                    }
                }
                if (!found) {
                    if (word_index >= MAX_WORDS) {
                        fprintf(stderr, "too many distinct words (MAX_WORDS = %d)\n", MAX_WORDS);
                        return 1;
                    }
                    words[word_index] = strdup(word);
                    count[word_index] += num;
                    word_index++;
                }
            }
            line = strtok(NULL, "\n");
        }
        free(buffer[i]);
    }

    printf("reducer_%d read success!\n", id);

    // Sized for "faasm://reducer_<id>_<16-digit timestamp>.txt" (about 40 chars)
    char output_file[64];
    snprintf(output_file, sizeof(output_file), "faasm://reducer_%d_%lld.txt", id, get_time());
    FILE *output = fopen(output_file, "w");
    if (!output) {
        perror("Failed to open output file");
        return 1;
    }
    fprintf(output, "reducer_%d start writing!\n", id);
    for (int i = 0; i < word_index; i++) {
        fprintf(output, "%s %d\n", words[i], count[i]);
    }
    fclose(output);

    printf("reducer_%d finished!\n", id);
    printf("%lld\n", get_time());
    return 0;
}
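The reducer merges word: count lines coming from all mappers. A std::map-based version of that aggregation replaces the linear search over words[] with O(log n) lookups and yields the words in sorted order. The following is a standalone sketch on plain strings, independent of the Faasm state API. The name merge_partitions is hypothetical. Splitting at the last colon is a choice made here so that words containing ':' stay intact, since the mapper's %s can produce such words.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: merge mapper outputs of the form "word: count\n..."
// into one total per word, splitting each line at its last ':'.
std::map<std::string, long> merge_partitions(const std::vector<std::string>& parts) {
    std::map<std::string, long> totals;
    for (const std::string& part : parts) {
        std::istringstream in(part);
        std::string line;
        while (std::getline(in, line)) {
            std::size_t colon = line.rfind(':');
            if (colon == std::string::npos) continue;  // skip blank or malformed lines
            totals[line.substr(0, colon)] += std::stol(line.substr(colon + 1));
        }
    }
    return totals;
}
```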

ParallSort

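Description: distributed sort with 2 sorters, 2 splitters, 3 mergers and 1 checker. Each stage is compiled under the name of an existing demo function (fibonacci, file, filedescriptor, float_perf), as the alias comments in the scheduler show.
Implementation: first, implement the scheduler in demo/fcntl.cpp: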

#include <faasm/faasm.h>
#include <faasm/core.h>
#include <sys/time.h>
#include <stdio.h>
#include <vector>
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <unistd.h>
using namespace std;

int main(int argc, char* argsv[]) {
    int sorter_num = 2;
    int splitter_num = 2;
    int merger_num = 3;
    int checker_num = 1;
    // alias fibonacci to sorter
    printf("Stage 1: Sorter\n");
    vector<int> call_id;
    for (int i = 0; i < sorter_num; ++i) {
        int* input = new int[3];
        input[0] = i;
        input[1] = sorter_num;
        input[2] = merger_num;
        call_id.push_back(faasmChainNamed("fibonacci", BYTES(input), sizeof(int) * 3));
    }
    for (auto id : call_id) {
        faasmAwaitCall(id);
    }
    // alias file to splitter
    printf("Stage 2: Splitter\n");
    call_id.clear();
    for (int i = 0; i < splitter_num; ++i) {
        int* input = new int[3];
        input[0] = i;
        input[1] = sorter_num;
        input[2] = merger_num;
        call_id.push_back(faasmChainNamed("file", BYTES(input), sizeof(int) * 3));
    }
    for (auto id : call_id) {
        faasmAwaitCall(id);
    }
    // alias filedescriptor to merger
    printf("Stage 3: Merger\n");
    call_id.clear();
    for (int i = 0; i < merger_num; ++i) {
        int* input = new int[3];
        input[0] = i;
        input[1] = sorter_num;
        input[2] = merger_num;
        call_id.push_back(faasmChainNamed("filedescriptor", BYTES(input), sizeof(int) * 3));
    }
    for (auto id : call_id) {
        faasmAwaitCall(id);
    }
    // alias float_perf to checker
    printf("Stage 4: Checker\n");
    call_id.clear();
    for (int i = 0; i < checker_num; ++i) {
        int* input = new int[3];
        input[0] = i;
        input[1] = sorter_num;
        input[2] = merger_num;
        call_id.push_back(faasmChainNamed("float_perf", BYTES(input), sizeof(int) * 3));
    }
    for (auto id : call_id) {
        faasmAwaitCall(id);
    }
}
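Every stage receives its parameters as an int[3] that is allocated with new and never freed. faasmChainNamed only takes a byte pointer and a length. A trivially copyable struct can therefore carry the same three fields, with named members and no heap allocation. The following sketch shows the encoding and decoding. It assumes caller and callee are compiled with the same struct definition. StageArgs, encode_args and decode_args are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

// Hypothetical struct: parameters shared by every ParallSort stage,
// in the same order as the int[3] array.
struct StageArgs {
    int id;
    int sorter_num;
    int merger_num;
};
static_assert(std::is_trivially_copyable<StageArgs>::value, "StageArgs must be memcpy-able");

// Hypothetical helper: bytes to pass as inputData / inputDataSize.
std::vector<uint8_t> encode_args(const StageArgs& args) {
    std::vector<uint8_t> bytes(sizeof(StageArgs));
    std::memcpy(bytes.data(), &args, sizeof(StageArgs));
    return bytes;
}

// Hypothetical helper: callee side, rebuilds the struct from the input bytes.
StageArgs decode_args(const uint8_t* data, std::size_t len) {
    assert(len == sizeof(StageArgs));
    StageArgs args;
    std::memcpy(&args, data, sizeof(StageArgs));
    return args;
}
```

In the scheduler this would read auto in = encode_args({i, sorter_num, merger_num}); followed by faasmChainNamed("fibonacci", in.data(), in.size());. This assumes Faasm copies the input bytes when the call is created, since the vector is freed when it goes out of scope.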

Then implement the sorter in demo/fibonacci.cpp:

#include ...

#define MAX_ARRAY_LENGTH 100

// Comparison function for qsort, ascending order.
// (x > y) - (x < y) avoids the signed overflow that x - y can hit.
int compare(const void *a, const void *b) {
    int x = *(const int *)a;
    int y = *(const int *)b;
    return (x > y) - (x < y);
}

int main(int argc, char* argsv[]) {
    auto argv = new int[3];
    faasmGetInput(BYTES(argv), sizeof(int) * 3);
    int id = argv[0];
    int sorter_num = argv[1];
    int merger_num = argv[2];
    delete[] argv;
    printf("sorter_%d start!\n", id);

    char input_file[40];
    printf("sorter_%d file define!\n", id);
    snprintf(input_file, sizeof(input_file), "faasm://sort_data_%d.txt", id);
    printf("sorter_%d fopen start!\n", id);
    FILE *file = fopen(input_file, "r");
    if (!file) {
        perror("Failed to open input file");
        exit(EXIT_FAILURE);
    }
    printf("sorter_%d fopen finished!\n", id);

    // Input: one line of comma-separated non-negative integers, e.g. "5,3,9"
    int array[MAX_ARRAY_LENGTH];
    int index = 0;
    int ch = fgetc(file), x = 0;
    for (; ch != '\n' && ch != EOF; ch = fgetc(file)) {
        if (ch == ',') {
            // Keep one slot free for the last number, which has no trailing comma
            if (index >= MAX_ARRAY_LENGTH - 1) {
                fprintf(stderr, "sorter_%d: more than %d numbers\n", id, MAX_ARRAY_LENGTH);
                exit(EXIT_FAILURE);
            }
            array[index++] = x;
            x = 0;
        } else if (ch >= '0' && ch <= '9') {  // ignore '\r', spaces and other stray characters
            x = x * 10 + (ch - '0');
        }
    }
    array[index++] = x;
    printf("sorter_%d read finished!\n", id);
    fclose(file);
    qsort(array, index, sizeof(int), compare);

    printf("sorter_%d sort finished!\n", id);

    // Each "%d " takes at most 12 chars ("-2147483648 "), so this holds
    // MAX_ARRAY_LENGTH numbers plus the terminating NUL
    const int bufferSize = MAX_ARRAY_LENGTH * 12 + 1;

    if (id == 0) {
        // Sorter 0 picks merger_num - 1 evenly spaced pivots from its sorted data
        // and publishes them to every splitter as "pivot_<k>"
        int pivot[merger_num-1];
        for (int i = 0; i < merger_num-1; i++) {
            int idx = (i+1) * index / merger_num;
            pivot[i] = array[idx];
        }
        for (int k = 0; k < sorter_num; k++) {
            char slot_name[20];
            snprintf(slot_name, sizeof(slot_name), "pivot_%d", k);
            // calloc zero-fills, so the buffer starts as an empty string
            char *buffer = (char *)calloc(bufferSize, sizeof(char));
            if (buffer == NULL) {
                perror("malloc error");
                return 1;
            }
            for (int i = 0; i < merger_num-1; i++) {
                char temp[16]; // room for "-2147483648 " plus the NUL
                snprintf(temp, sizeof(temp), "%d ", pivot[i]); // integer followed by a space
                strcat(buffer, temp);
            }
            // Drop the trailing space (an empty buffer has none to drop)
            size_t len = strlen(buffer);
            if (len > 0) buffer[len - 1] = '\0';
            faasmWriteState(slot_name, (uint8_t*)buffer, strlen(buffer));
            // buffer_register(slot_name, strlen(slot_name), buffer, bufferSize);
            // free(buffer);
        }
    }
    // Publish this sorter's sorted run as "sorter_<id>"
    char slot_name[20];
    snprintf(slot_name, sizeof(slot_name), "sorter_%d", id);
    char *buffer = (char *)calloc(bufferSize, sizeof(char)); // starts as an empty string
    if (buffer == NULL) {
        perror("malloc error");
        return 1;
    }
    for (int i = 0; i < index; i++) {
        char temp[16]; // room for "-2147483648 " plus the NUL
        snprintf(temp, sizeof(temp), "%d ", array[i]); // integer followed by a space
        strcat(buffer, temp);
    }
    // Drop the trailing space
    size_t len = strlen(buffer);
    if (len > 0) buffer[len - 1] = '\0';

    faasmWriteState(slot_name, (uint8_t*)buffer, strlen(buffer));
    printf("sorter_%d all finished!\n", id);
    return 0;
}
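Only sorter 0 chooses the pivots. It takes merger_num - 1 elements at positions (i+1) * n / merger_num of its own sorted data. The stages then exchange numbers as space-separated text. The following standalone sketch shows both steps, using std::ostringstream and std::istringstream in place of the strcat/sscanf loops. choose_pivots, join_ints and parse_ints are hypothetical helper names.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper: merger_num - 1 pivots from an already sorted array,
// taken at the same positions as in the sorter: (i + 1) * n / merger_num.
std::vector<int> choose_pivots(const std::vector<int>& sorted, int merger_num) {
    assert(merger_num > 0 && !sorted.empty());
    const int n = static_cast<int>(sorted.size());
    std::vector<int> pivots;
    for (int i = 0; i < merger_num - 1; ++i) {
        pivots.push_back(sorted[static_cast<std::size_t>((i + 1) * n / merger_num)]);
    }
    return pivots;
}

// Hypothetical helpers for the "a b c" text format used by the state values.
std::string join_ints(const std::vector<int>& values) {
    std::ostringstream out;
    for (std::size_t i = 0; i < values.size(); ++i) {
        if (i > 0) out << ' ';
        out << values[i];
    }
    return out.str();
}

std::vector<int> parse_ints(const std::string& text) {
    std::istringstream in(text);
    std::vector<int> values;
    int x;
    while (in >> x) values.push_back(x);
    return values;
}
```

The pivots are sampled from sorter 0 alone. The mergers are therefore only balanced when all sorters see similarly distributed data. Sampling pivots from every sorter, as in sample sort, would make the split more even.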

Implement the splitter in demo/file.cpp:

#include ...

#define MAX_ARRAY_LENGTH 100

int main(int argc, char* argsv[]) {
    auto argv = new int[3];
    faasmGetInput(BYTES(argv), sizeof(int) * 3);
    int id = argv[0];
    int sorter_num = argv[1];
    int merger_num = argv[2];
    delete[] argv;
    printf("splitter_%d start!\n", id);
    char slot_name[20];
    char *ptr;
    int num;

    // Read the pivots published by sorter 0 ("pivot_<id>").
    // stateSize + 1 zeroed bytes keep the data NUL-terminated for sscanf.
    snprintf(slot_name, sizeof(slot_name), "pivot_%d", id);
    size_t stateSize = faasmReadStateSize(slot_name);
    char *pivot_buffer = (char *)calloc(stateSize + 1, sizeof(char));
    if (pivot_buffer == NULL) {
        perror("malloc error");
        return 1;
    }
    faasmReadState(slot_name, (uint8_t*)pivot_buffer, stateSize);
    int pivot_array[merger_num-1];
    int pivot_index = 0;
    ptr = pivot_buffer;
    printf("pivot_buffer: %s\n", pivot_buffer);
    while (pivot_index < merger_num-1 && sscanf(ptr, "%d", &num) == 1) {
        pivot_array[pivot_index] = num;
        pivot_index++;
        // Advance ptr past the current number and its separating space
        while (*ptr && *ptr != ' ') {
            ptr++;
        }
        if (*ptr == ' ') {
            ptr++;
        }
    }
    free(pivot_buffer);
    printf("splitter_%d pivot access finished!\n", id);

    // Read this splitter's sorted input ("sorter_<id>")
    snprintf(slot_name, sizeof(slot_name), "sorter_%d", id);
    stateSize = faasmReadStateSize(slot_name);
    char *sorter_buffer = (char *)calloc(stateSize + 1, sizeof(char));
    if (sorter_buffer == NULL) {
        perror("malloc error");
        return 1;
    }
    faasmReadState(slot_name, (uint8_t*)sorter_buffer, stateSize);
    int sorter_array[MAX_ARRAY_LENGTH];
    int sorter_index = 0;
    ptr = sorter_buffer;
    while (sorter_index < MAX_ARRAY_LENGTH && sscanf(ptr, "%d", &num) == 1) {
        sorter_array[sorter_index] = num;
        sorter_index++;
        // Advance ptr past the current number and its separating space
        while (*ptr && *ptr != ' ') {
            ptr++;
        }
        if (*ptr == ' ') {
            ptr++;
        }
    }
    free(sorter_buffer);
    printf("splitter_%d sorter access finished!\n", id);

    // Route each number to merger `row` = number of pivots <= value.
    // The pivots are sorted, so the scan stops at the first larger pivot.
    int array[merger_num][MAX_ARRAY_LENGTH];
    int index[merger_num];
    memset(index, 0, sizeof(index));
    for (int i = 0; i < sorter_index; i++) {
        int row = 0;
        for (int j = 0; j < pivot_index; j++) {
            if (sorter_array[i] >= pivot_array[j]) {
                row++;
            } else {
                break;
            }
        }
        array[row][index[row]] = sorter_array[i];
        index[row]++;
    }

    // Each "%d " takes at most 12 chars, so this holds MAX_ARRAY_LENGTH numbers plus the NUL
    const int bufferSize = MAX_ARRAY_LENGTH * 12 + 1;
    for (int i = 0; i < merger_num; i++) {
        char slot_name[20];
        snprintf(slot_name, sizeof(slot_name), "merger_%d_%d", id, i);
        char *buffer = (char *)calloc(bufferSize, sizeof(char)); // starts as an empty string
        if (buffer == NULL) {
            perror("malloc error");
            return 1;
        }
        for (int j = 0; j < index[i]; j++) {
            char temp[16]; // room for "-2147483648 " plus the NUL
            snprintf(temp, sizeof(temp), "%d ", array[i][j]); // integer followed by a space
            strcat(buffer, temp);
        }
        // Drop the trailing space; a merger's partition may be empty, so guard len == 0
        size_t len = strlen(buffer);
        if (len > 0) buffer[len - 1] = '\0';
        faasmWriteState(slot_name, (uint8_t*)buffer, strlen(buffer));
        free(buffer);
    }

    printf("splitter_%d all finished!\n", id);
    return 0;
}
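The splitter's inner loop counts how many pivots are less than or equal to a value. With sorted pivots, that count is exactly the index returned by std::upper_bound. This gives a compact way to express and test the routing rule. The following is a standalone sketch; split_by_pivots is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: route each value to merger `row`, where row is the
// number of pivots <= value. With sorted pivots p0 <= p1 <= ..., merger r
// receives the values in [p(r-1), p(r)).
std::vector<std::vector<int>> split_by_pivots(const std::vector<int>& values,
                                              const std::vector<int>& pivots) {
    assert(std::is_sorted(pivots.begin(), pivots.end()));
    std::vector<std::vector<int>> rows(pivots.size() + 1);
    for (int v : values) {
        // upper_bound points at the first pivot > v, so its index counts the pivots <= v
        auto row = std::upper_bound(pivots.begin(), pivots.end(), v) - pivots.begin();
        rows[static_cast<std::size_t>(row)].push_back(v);
    }
    return rows;
}
```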

Implement the merger in demo/filedescriptor.cpp:

#include ...

#define MAX_ARRAY_LENGTH 100

typedef struct {
    int value;         // element value
    int arrayIndex;    // which input run the element comes from
    int elementIndex;  // position of the element within that run
} HeapNode;

// Sift the node at `index` down until the min-heap property holds
void heapifyDown(HeapNode *minHeap, int heapSize, int index) {
    int smallest = index;
    int left = 2 * index + 1;
    int right = 2 * index + 2;

    // Compare with the left child
    if (left < heapSize && minHeap[left].value < minHeap[smallest].value) {
        smallest = left;
    }

    // Compare with the right child
    if (right < heapSize && minHeap[right].value < minHeap[smallest].value) {
        smallest = right;
    }

    // If a child is smaller, swap with it and keep sifting down
    if (smallest != index) {
        HeapNode temp = minHeap[index];
        minHeap[index] = minHeap[smallest];
        minHeap[smallest] = temp;

        heapifyDown(minHeap, heapSize, smallest);
    }
}

int main(int argc, char* argsv[]) {
    auto argv = new int[3];
    faasmGetInput(BYTES(argv), sizeof(int) * 3);
    int id = argv[0];
    int sorter_num = argv[1];
    int merger_num = argv[2];
    delete[] argv;
    printf("merger_%d start!\n", id);

    // Read "merger_<splitter>_<id>" from every splitter (one splitter per sorter)
    int array[sorter_num][MAX_ARRAY_LENGTH];
    int index[sorter_num];
    memset(index, 0, sizeof(index));
    for (int i = 0; i < sorter_num; i++) {
        char slot_name[20];
        snprintf(slot_name, sizeof(slot_name), "merger_%d_%d", i, id);
        // stateSize + 1 zeroed bytes keep the data NUL-terminated for sscanf
        size_t stateSize = faasmReadStateSize(slot_name);
        char *buffer = (char *)calloc(stateSize + 1, sizeof(char));
        if (buffer == NULL) {
            perror("malloc error");
            return 1;
        }
        faasmReadState(slot_name, (uint8_t*)buffer, stateSize);
        char *ptr = buffer;
        int num;
        while (index[i] < MAX_ARRAY_LENGTH && sscanf(ptr, "%d", &num) == 1) {
            array[i][index[i]] = num;
            index[i]++;
            // Advance ptr past the current number and its separating space
            while (*ptr && *ptr != ' ') {
                ptr++;
            }
            if (*ptr == ' ') {
                ptr++;
            }
        }
        free(buffer);
    }

    // k-way merge of the sorted runs with a min-heap.
    // The result holds every number received, up to sorter_num * MAX_ARRAY_LENGTH.
    int total = 0;
    for (int i = 0; i < sorter_num; i++) {
        total += index[i];
    }
    int *result = (int *)malloc((total > 0 ? total : 1) * sizeof(int));
    HeapNode *minHeap = (HeapNode *)malloc(sorter_num * sizeof(HeapNode));
    if (result == NULL || minHeap == NULL) {
        perror("malloc error");
        return 1;
    }
    int resultIndex = 0;
    int heapSize = 0;
    // Seed the heap with the first element of every non-empty run
    for (int i = 0; i < sorter_num; i++) {
        if (index[i] > 0) {
            minHeap[heapSize].value = array[i][0];
            minHeap[heapSize].arrayIndex = i;
            minHeap[heapSize].elementIndex = 0;
            heapSize++;
        }
    }
    // Build the heap bottom-up by sifting down every internal node
    for (int i = (heapSize - 2) / 2; i >= 0; i--) {
        heapifyDown(minHeap, heapSize, i);
    }
    while (heapSize > 0) {
        // Pop the smallest element
        HeapNode minNode = minHeap[0];
        result[resultIndex++] = minNode.value;

        if (minNode.elementIndex + 1 < index[minNode.arrayIndex]) {
            // Replace the root with the next element of the same run
            minNode.elementIndex++;
            minNode.value = array[minNode.arrayIndex][minNode.elementIndex];
            minHeap[0] = minNode;
        } else {
            // That run is exhausted: move the last heap node to the root
            minHeap[0] = minHeap[--heapSize];
        }
        // Sift the root down to restore the heap property
        heapifyDown(minHeap, heapSize, 0);
    }
    free(minHeap);

    // Register the merged array as "checker_<id>"
    char slot_name[20];
    snprintf(slot_name, sizeof(slot_name), "checker_%d", id);
    // Each "%d " takes at most 12 chars, plus one byte for the NUL
    size_t bufferSize = (size_t)resultIndex * 12 + 1;
    char *buffer = (char *)calloc(bufferSize, sizeof(char)); // starts as an empty string
    if (buffer == NULL) {
        perror("malloc error");
        return 1;
    }
    for (int i = 0; i < resultIndex; i++) {
        char temp[16]; // room for "-2147483648 " plus the NUL
        snprintf(temp, sizeof(temp), "%d ", result[i]); // integer followed by a space
        strcat(buffer, temp);
    }
    // Drop the trailing space (guarded for an empty result)
    size_t len = strlen(buffer);
    if (len > 0) buffer[len - 1] = '\0';
    faasmWriteState(slot_name, (uint8_t*)buffer, strlen(buffer));
    free(buffer);
    free(result);

    printf("merger_%d all finished!\n", id);
    return 0;
}
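The merger hand-rolls a binary min-heap. The same k-way merge can be written with std::priority_queue and std::greater, which is shorter and easier to verify in isolation. The following is a standalone sketch. It assumes each run is already sorted, as the splitter output is. kway_merge is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical helper: merge k sorted runs into one sorted vector.
std::vector<int> kway_merge(const std::vector<std::vector<int>>& runs) {
    for (const std::vector<int>& run : runs) {
        assert(std::is_sorted(run.begin(), run.end()) && "each run must already be sorted");
    }
    // (value, run index, element index); std::greater turns the default
    // max-heap into a min-heap, ties broken by run index
    using Node = std::tuple<int, std::size_t, std::size_t>;
    std::priority_queue<Node, std::vector<Node>, std::greater<Node>> heap;
    for (std::size_t r = 0; r < runs.size(); ++r) {
        if (!runs[r].empty()) heap.push(Node(runs[r][0], r, 0));
    }
    std::vector<int> out;
    while (!heap.empty()) {
        const Node top = heap.top();
        heap.pop();
        const std::size_t r = std::get<1>(top);
        const std::size_t i = std::get<2>(top);
        out.push_back(std::get<0>(top));
        // Replace the popped element with the next one from the same run
        if (i + 1 < runs[r].size()) heap.push(Node(runs[r][i + 1], r, i + 1));
    }
    return out;
}
```

Each of the N output elements costs one push and one pop on a heap of at most k entries. The merge therefore runs in O(N log k), the same as the hand-written heap.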

Implement the checker in demo/float_perf.cpp:

#include ...

#define MAX_ARRAY_LENGTH 100

long long get_time() {
    timeval tv{};
    gettimeofday(&tv, nullptr);
    
    long long microseconds = (long long)tv.tv_sec * 1000000 + tv.tv_usec;
    return microseconds;
}

int main(int argc, char* argsv[]) {
    auto argv = new int[3];
    faasmGetInput(BYTES(argv), sizeof(int) * 3);
    int id = argv[0];
    int sorter_num = argv[1];
    int merger_num = argv[2];
    delete[] argv;
    printf("checker_%d start!\n", id);
    // All numbers end up here: at most MAX_ARRAY_LENGTH per sorter
    int capacity = sorter_num * MAX_ARRAY_LENGTH;
    int *result = (int *)malloc((capacity > 0 ? capacity : 1) * sizeof(int));
    if (result == NULL) {
        perror("malloc error");
        return 1;
    }
    int index = 0;
    for (int i = 0; i < merger_num; i++) {
        char slot_name[20];
        snprintf(slot_name, sizeof(slot_name), "checker_%d", i);
        // stateSize + 1 zeroed bytes keep the data NUL-terminated for sscanf
        size_t stateSize = faasmReadStateSize(slot_name);
        char *buffer = (char *)calloc(stateSize + 1, sizeof(char));
        if (buffer == NULL) {
            perror("malloc error");
            return 1;
        }
        faasmReadState(slot_name, (uint8_t*)buffer, stateSize);
        char *ptr = buffer;
        int num;
        while (index < capacity && sscanf(ptr, "%d", &num) == 1) {
            result[index] = num;
            index++;
            // Advance ptr past the current number and its separating space
            while (*ptr && *ptr != ' ') {
                ptr++;
            }
            if (*ptr == ' ') {
                ptr++;
            }
        }
        free(buffer);
    }
    // Merger k holds the values in [pivot_{k-1}, pivot_k), so concatenating
    // checker_0 .. checker_{merger_num-1} must yield an ascending array
    printf("result_array: ");
    for (int i = 0; i + 1 < index; i++) {
        if (result[i] > result[i+1]) {
            printf("sort error!\n");
            free(result);
            return 0;
        }
        printf("%d ", result[i]);
    }
    if (index > 0) {
        printf("%d", result[index-1]);
    }
    printf("\n");
    printf("checker_%d all finished!\n", id);
    printf("end time: %lld\n", get_time());
    free(result);
    return 0;
}
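The checker only tests ordering. A run that lost or duplicated numbers, for example through a truncated state read, would still print a sorted array. A stronger check also requires the output to be a permutation of the input. It assumes the original input numbers are available for comparison, for example by reading the same sort_data files. is_valid_sort is a hypothetical name.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical helper: true if `output` is sorted and contains exactly
// the elements of `input` (same values, same multiplicities).
bool is_valid_sort(const std::vector<int>& input, const std::vector<int>& output) {
    if (input.size() != output.size()) return false;
    if (!std::is_sorted(output.begin(), output.end())) return false;
    return std::is_permutation(input.begin(), input.end(), output.begin());
}
```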

For convenience, the script paral_sort.sh compiles, uploads and runs everything:

#!/bin/bash

set -e

# Compile
inv func.compile demo fcntl
inv func.compile demo fibonacci
inv func.compile demo file
inv func.compile demo filedescriptor
inv func.compile demo float_perf

# Upload
inv func.upload demo fcntl
inv func.upload demo fibonacci
inv func.upload demo file
inv func.upload demo filedescriptor
inv func.upload demo float_perf

# Flush
inv func.flush

# Invoke
inv func.invoke demo fcntl

Building Python Applications

Start the container for Python applications:

$ faasmctl cli.python

After entering, the shell starts in the container's /code/python directory. This corresponds to the host directory ./venv-bm/lib/python3.9/site-packages/faasmctl/.config/faasm-source/0.18.0/clients/python.

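Faasm's Python support is more limited. Python does not support the one-function-per-file model, and there is no cache flush. After every change you therefore need to delete and redeploy the whole cluster to be sure the correct code is running.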

Commands

The commands are:

inv func.uploadpy <func>
inv func.invoke python <func>

Interfaces

The Python interfaces are provided mainly by pyfaasm.core.


Last modified on 2024-10-06