Pipeline 3: Add an artificial halo¶
The disadvantage of the FIFO in the previous example is its resource consumption: the FIFO requires two microthreads and a scratch buffer.

The simple workaround is to move the FIFO outside the kernel. We add another halo, which we call an artificial halo, around the kernel (pe_program.csl). The west side is west.csl and the east side is east.csl. west.csl implements a FIFO to receive the data from H2D. east.csl implements a FIFO to receive the data from pe_program.csl and redirect it to D2H.

There is no longer a FIFO in pe_program.csl. Instead, we replace the color MEMCPYH2D_DATA_1 with Cin and the color MEMCPYD2H_DATA_1 with Cout. The color Cin receives data from the west and forwards it to the ramp. The color Cout sends data from the ramp to the east.

This example has the same property as pipeline-02-fifo: as long as the parameter size does not exceed the capacity of the FIFO in west.csl, H2D can always finish, so the @add16 can make progress.
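The last claim can be sanity-checked with a trivial host-side model. The sketch below (plain Python, illustrative only; h2d_completes is a hypothetical helper) treats the west halo as a bounded buffer whose capacity is the max_fifo_len = 256*20 wavelets declared in memcpyEdge/h2d.csl, and shows that an H2D stream of size wavelets is always absorbed when size stays within that capacity, even if the core PE has not consumed anything yet.

# Illustrative model only: the west halo buffers the whole H2D stream, so
# memcpy_h2d can return before the core PE has consumed a single wavelet.
WEST_FIFO_CAPACITY = 256 * 20  # matches max_fifo_len in memcpyEdge/h2d.csl

def h2d_completes(size: int, consumed: int = 0) -> bool:
    """True if an H2D stream of `size` wavelets fits in the west FIFO,
    given that the core PE has drained `consumed` wavelets so far."""
    in_flight = size - consumed
    return in_flight <= WEST_FIFO_CAPACITY

assert h2d_completes(size=32)                # the tutorial's size:32 always fits
assert not h2d_completes(size=256 * 20 + 1)  # a larger stream could stall H2D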
layout.csl¶
// The memcpy module provides the resources to route the data between the host and the device.
//
// color/ task ID map
//
//  ID var          ID var          ID var                  ID var
//   0 H2D           9 STARTUP      18                      27 reserved (memcpy)
//   1 D2H          10              19                      28 reserved (memcpy)
//   2              11              20                      29 reserved
//   3              12              21 reserved (memcpy)    30 reserved (memcpy)
//   4              13              22 reserved (memcpy)    31 reserved
//   5              14              23 reserved (memcpy)    32
//   6 Cin          15              24                      33
//   7 Cout         16              25                      34
//   8 main         17              26                      35
//
param size: i16;
param MEMCPYH2D_DATA_1_ID: i16;
param MEMCPYD2H_DATA_1_ID: i16;
const MEMCPYH2D_DATA_1: color = @get_color(MEMCPYH2D_DATA_1_ID);
const MEMCPYD2H_DATA_1: color = @get_color(MEMCPYD2H_DATA_1_ID);
const Cin: color = @get_color(6);
const Cout: color = @get_color(7);
const main: u16 = 8;
const STARTUP: local_task_id = @get_local_task_id(9);
const memcpy = @import_module( "<memcpy/get_params>", .{
  .width = 3,
  .height = 1,
  .MEMCPYH2D_1 = MEMCPYH2D_DATA_1,
  .MEMCPYD2H_1 = MEMCPYD2H_DATA_1
});
layout {
  @set_rectangle(3, 1);
  // west.csl has an H2D
  @set_tile_code(0, 0, "memcpyEdge/west.csl", .{
    .USER_IN_1 = Cin,
    .STARTUP = STARTUP,
    .memcpy_params = memcpy.get_params(0)
  });
  @set_tile_code(1, 0, "pe_program.csl", .{
    .size = size,
    .main = main,
    .Cin = Cin,
    .Cout = Cout,
    .memcpy_params = memcpy.get_params(1)
  });
  // east.csl only has a D2H
  @set_tile_code(2, 0, "memcpyEdge/east.csl", .{
    .USER_OUT_1 = Cout,
    .STARTUP = STARTUP,
    .memcpy_params = memcpy.get_params(2)
  });
}
pe_program.csl¶
// Not a complete program; the top-level source file is layout.csl.
param size: i16;
param main: u16;
param Cin: color;
param Cout: color;
param memcpy_params: comptime_struct;
const main_task_id: local_task_id = @get_local_task_id(main);
const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);
const inDsd = @get_dsd(fabin_dsd, .{
  .extent = size,
  .fabric_color = Cin,
  .input_queue = @get_input_queue(1),
});
const outDsd = @get_dsd(fabout_dsd, .{
  .extent = size,
  .fabric_color = Cout,
  .output_queue = @get_output_queue(1)
});
var buf = @zeros([1]i16);
const one_dsd = @get_dsd(mem1d_dsd, .{ .tensor_access = |i|{size} -> buf[0] });
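// mainTask streams `size` wavelets through this PE: each input wavelet x
// arriving on Cin leaves as x + 1 on Cout (one_dsd supplies the constant 1).
// With .async=true the @add16 runs on a microthread, so the task returns
// immediately while the data keeps flowing.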
task mainTask() void {
  buf[0] = @as(i16, 1);
  @add16(outDsd, inDsd, one_dsd, .{.async=true});
}
comptime {
  // activate local task mainTask at startup
  @bind_local_task(mainTask, main_task_id);
  @activate(main_task_id);
  const input_route = .{ .rx = .{ WEST }, .tx = .{ RAMP } };
  @set_local_color_config(Cin, .{ .routes = input_route });
  const output_route = .{ .rx = .{ RAMP }, .tx = .{ EAST } };
  @set_local_color_config(Cout, .{ .routes = output_route });
}
run.py¶
#!/usr/bin/env cs_python
import argparse
import json
import numpy as np
from cerebras.sdk.sdk_utils import memcpy_view, input_array_to_u32
from cerebras.sdk.runtime.sdkruntimepybind import SdkRuntime, MemcpyDataType # pylint: disable=no-name-in-module
from cerebras.sdk.runtime.sdkruntimepybind import MemcpyOrder # pylint: disable=no-name-in-module
parser = argparse.ArgumentParser()
parser.add_argument('--name', help='the test name')
parser.add_argument("--cmaddr", help="IP:port for CS system")
args = parser.parse_args()
dirname = args.name
# Parse the compile metadata
with open(f"{dirname}/out.json", encoding="utf-8") as json_file:
    compile_data = json.load(json_file)
params = compile_data["params"]
MEMCPYH2D_DATA_1 = int(params["MEMCPYH2D_DATA_1_ID"])
MEMCPYD2H_DATA_1 = int(params["MEMCPYD2H_DATA_1_ID"])
# Size of the input and output tensors; use this value when compiling the
# program, e.g. `cslc --params=size:32 --fabric-dims=10,3 --fabric-offsets=4,1`
# as in commands.sh
size = int(params["size"])
print(f"MEMCPYH2D_DATA_1 = {MEMCPYH2D_DATA_1}")
print(f"MEMCPYD2H_DATA_1 = {MEMCPYD2H_DATA_1}")
print(f"size = {size}")
memcpy_dtype = MemcpyDataType.MEMCPY_16BIT
runner = SdkRuntime(dirname, cmaddr=args.cmaddr)
runner.load()
runner.run()
# Generate a random input tensor of the desired size
input_tensor = np.random.randint(256, size=size, dtype=np.int16)
print("step 1: streaming H2D to P0.0")
# "input_tensor" is a 1d array
# The type of input_tensor is int16, we need to extend it to uint32
# There are two kind of extension when using the utility function input_array_to_u32
# input_array_to_u32(np_arr: np.ndarray, sentinel: Optional[int], fast_dim_sz: int)
# 1) zero extension:
# sentinel = None
# 2) upper 16-bit is the index of the array:
# sentinel is Not None
#
# In this example, the upper 16-bit is don't care because pe_program.csl only uses
# @add16 to reads lower 16-bit
tensors_u32 = input_array_to_u32(input_tensor, 1, size)
runner.memcpy_h2d(MEMCPYH2D_DATA_1, tensors_u32, 0, 0, 1, 1, size, \
streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=True)
print("step 2: streaming D2H at P2.0")
# The D2H buffer must be of type u32
out_tensors_u32 = np.zeros(size, np.uint32)
runner.memcpy_d2h(out_tensors_u32, MEMCPYD2H_DATA_1, 2, 0, 1, 1, size, \
streaming=True, data_type=memcpy_dtype, order=MemcpyOrder.COL_MAJOR, nonblock=False)
# remove upper 16-bit of each u32
result_tensor = memcpy_view(out_tensors_u32, np.dtype(np.int16))
runner.stop()
np.testing.assert_equal(result_tensor, input_tensor + 1)
print("SUCCESS!")
memcpyEdge/memcpy_edge.csl¶
// This is a template of memcpy over the edges.
// memcpy_edge.csl can be "north", "south", "west" or "east"
// of the following layout.
// +---------+
// | north |
// +------+---------+------+
// | west | core | east |
// +------+---------+------+
// | south |
// +---------+
// north.csl, south.csl, west.csl and east.csl instantiate
// memcpy_edge.csl with a proper direction.
//
// memcpy_edge.csl supports two streaming H2Ds and one streaming D2H.
// This constraint is a design choice: the current implementation binds
// one FIFO to each H2D or D2H, so only three can be supported in total.
// We choose two H2Ds and one D2H.
// If the FIFOs were replaced by WTTs (wavelet-triggered tasks), more
// streams could be supported.
//
// However, the user can instantiate memcpy_edge.csl on each of the four
// edges, so the maximum number of H2Ds is 2*4 = 8 and the maximum number
// of D2Hs is 1*4 = 4.
//
// If the user only has an H2D at the north edge, for example, they only
// need to configure the color USER_IN_1, i.e. only a single streaming
// H2D is used.
//
// For example,
// @set_tile_code(pe_x, 0, "north.csl", .{
// .USER_IN_1 = mainColor,
// .STARTUP = STARTUP,
// .memcpy_params = memcpy_params,
// .MEMCPYH2D_DATA_1 = MEMCPYH2D_DATA_1,
// .MEMCPYD2H_DATA_1 = MEMCPYD2H_DATA_1
// });
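//
// Colors the user does not configure keep the default @get_color(32).
// Only color IDs 0 through 23 are routable, so the `@get_int(...) < 24`
// checks in h2d.csl and d2h.csl detect which streams are actually in use.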
// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
// receive data from the "core"
param USER_OUT_1: color = @get_color(32);
// entrypoint
param STARTUP: local_task_id;
// ----------
// Every PE needs to import the memcpy module; otherwise the I/O cannot
// propagate the data to the destination.
param memcpy_params: comptime_struct;
// The direction of "core", for example
// north.csl has dir = SOUTH
// south.csl has dir = NORTH
// west.csl has dir = EAST
// east.csl has dir = WEST
param dir: direction;
// memcpy module reserves input queue 0 and output queue 0
const sys_mod = @import_module( "<memcpy/memcpy>", memcpy_params);
// ----------
const h2d_mod = @import_module("h2d.csl", .{
  .USER_IN_1 = USER_IN_1,
  .USER_IN_2 = USER_IN_2,
  .MEMCPYH2D_1 = memcpy_params.MEMCPYH2D_1,
  .MEMCPYH2D_2 = memcpy_params.MEMCPYH2D_2,
  .txdir = dir
});
const d2h_mod = @import_module("d2h.csl", .{
  .USER_OUT_1 = USER_OUT_1,
  .MEMCPYD2H_1 = memcpy_params.MEMCPYD2H_1,
  .rxdir = dir
});
task f_startup() void {
  h2d_mod.f_startup();
  d2h_mod.f_startup();
}
comptime {
  @bind_local_task(f_startup, STARTUP);
  @activate(STARTUP);
}
memcpyEdge/h2d.csl¶
// Two streaming H2Ds:
// 1st H2D: UT 1 and UT 2
// 2nd H2D: UT 3 and UT 4
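// (A "UT" is a microthread; the numbers correspond to the input/output
// queue IDs used by the DSDs below.)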
param MEMCPYH2D_1: color = @get_color(32);
param MEMCPYH2D_2: color = @get_color(32);
// Color along which we send a wavelet to pe_program
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
param txdir: direction;
const max_fifo_len = 256*20; // maximum length of the fifo
var fifo1_buffer = @zeros([max_fifo_len]u32);
const fifo1 = @allocate_fifo(fifo1_buffer);
var fifo2_buffer = @zeros([max_fifo_len]u32);
const fifo2 = @allocate_fifo(fifo2_buffer);
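// The extent 0x7fff used below stands for an effectively unbounded
// ("length=inf") transfer: data keeps moving for as long as wavelets
// arrive on the corresponding color.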
// length=inf
var fab_recv_wdsd_1 = @get_dsd(fabin_dsd, .{
  .extent = 0x7fff,
  .fabric_color = MEMCPYH2D_1,
  .input_queue = @get_input_queue(1)
});
// length=inf
var fab_trans_wdsd_1 = @get_dsd(fabout_dsd, .{
  .extent = 0x7fff,
  .fabric_color = USER_IN_1,
  .output_queue = @get_output_queue(2)
});
// length=inf
var fab_recv_wdsd_2 = @get_dsd(fabin_dsd, .{
  .extent = 0x7fff,
  .fabric_color = MEMCPYH2D_2,
  .input_queue = @get_input_queue(3)
});
// length=inf
var fab_trans_wdsd_2 = @get_dsd(fabout_dsd, .{
  .extent = 0x7fff,
  .fabric_color = USER_IN_2,
  .output_queue = @get_output_queue(4)
});
// if no user color is defined, f_startup() is empty
fn f_startup() void {
  if ( (@get_int(MEMCPYH2D_1) < 24) and (@get_int(USER_IN_1) < 24) ){
    // receive data from streaming H2D
    @mov32(fifo1, fab_recv_wdsd_1, .{.async=true} );
    // forward data to USER_IN_1
    @mov32(fab_trans_wdsd_1, fifo1, .{.async=true} );
  }
  if ( (@get_int(MEMCPYH2D_2) < 24) and (@get_int(USER_IN_2) < 24) ){
    // receive data from streaming H2D
    @mov32(fifo2, fab_recv_wdsd_2, .{.async=true} );
    // forward data to USER_IN_2
    @mov32(fab_trans_wdsd_2, fifo2, .{.async=true} );
  }
}
comptime {
  if (@get_int(USER_IN_1) < 24){
    const h2d_route = .{ .rx = .{ RAMP }, .tx = .{ txdir } };
    @set_local_color_config(USER_IN_1, .{ .routes = h2d_route });
  }
  if (@get_int(USER_IN_2) < 24){
    const h2d_route = .{ .rx = .{ RAMP }, .tx = .{ txdir } };
    @set_local_color_config(USER_IN_2, .{ .routes = h2d_route });
  }
}
memcpyEdge/d2h.csl¶
// One streaming D2H:
// 1st D2H: UT 5 and UT 6
param MEMCPYD2H_1: color = @get_color(32);
// Color along which we expect a wavelet
param USER_OUT_1: color = @get_color(32);
param rxdir: direction;
const max_fifo_len = 256*40; // maximum length of the fifo
var fifo1_buffer = @zeros([max_fifo_len]u32);
const fifo1 = @allocate_fifo(fifo1_buffer);
// length=inf
var fab_recv_wdsd = @get_dsd(fabin_dsd, .{
  .extent = 0x7fff,
  .fabric_color = USER_OUT_1,
  .input_queue = @get_input_queue(6)
});
// length=inf
var fab_trans_wdsd = @get_dsd(fabout_dsd, .{
  .extent = 0x7fff,
  .fabric_color = MEMCPYD2H_1,
  .output_queue = @get_output_queue(5)
});
// if USER_OUT_1 is not valid, f_startup() is empty
fn f_startup() void {
  if ( (@get_int(MEMCPYD2H_1) < 24) and (@get_int(USER_OUT_1) < 24) ){
    // receive data from USER_OUT_1
    @mov32(fifo1, fab_recv_wdsd, .{.async=true} );
    // forward data to MEMCPYD2H_1
    @mov32(fab_trans_wdsd, fifo1, .{.async=true} );
  }
}
comptime {
  if (@get_int(USER_OUT_1) < 24){
    const d2h_route = .{ .rx = .{ rxdir }, .tx = .{ RAMP } };
    @set_local_color_config(USER_OUT_1, .{ .routes = d2h_route });
  }
}
memcpyEdge/east.csl¶
// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
// receive data from the "core"
param USER_OUT_1: color = @get_color(32);
// entrypoint
param STARTUP: local_task_id;
param memcpy_params: comptime_struct;
const edge_mod = @import_module( "memcpy_edge.csl", .{
  .memcpy_params = memcpy_params,
  .USER_IN_1 = USER_IN_1,
  .USER_IN_2 = USER_IN_2,
  .USER_OUT_1 = USER_OUT_1,
  .STARTUP = STARTUP,
  .dir = WEST
});
memcpyEdge/west.csl¶
// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
// receive data from the "core"
param USER_OUT_1: color = @get_color(32);
// entrypoint
param STARTUP: local_task_id;
param memcpy_params: comptime_struct;
const edge_mod = @import_module( "memcpy_edge.csl", .{
  .memcpy_params = memcpy_params,
  .USER_IN_1 = USER_IN_1,
  .USER_IN_2 = USER_IN_2,
  .USER_OUT_1 = USER_OUT_1,
  .STARTUP = STARTUP,
  .dir = EAST
});
memcpyEdge/north.csl¶
// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
// receive data from the "core"
param USER_OUT_1: color = @get_color(32);
// entrypoint
param STARTUP: local_task_id;
param memcpy_params: comptime_struct;
const edge_mod = @import_module( "memcpy_edge.csl", .{
  .memcpy_params = memcpy_params,
  .USER_IN_1 = USER_IN_1,
  .USER_IN_2 = USER_IN_2,
  .USER_OUT_1 = USER_OUT_1,
  .STARTUP = STARTUP,
  .dir = SOUTH
});
memcpyEdge/south.csl¶
// send data to the "core"
param USER_IN_1: color = @get_color(32);
param USER_IN_2: color = @get_color(32);
// receive data from the "core"
param USER_OUT_1: color = @get_color(32);
// entrypoint
param STARTUP: local_task_id;
param memcpy_params: comptime_struct;
const edge_mod = @import_module( "memcpy_edge.csl", .{
  .memcpy_params = memcpy_params,
  .USER_IN_1 = USER_IN_1,
  .USER_IN_2 = USER_IN_2,
  .USER_OUT_1 = USER_OUT_1,
  .STARTUP = STARTUP,
  .dir = NORTH
});
commands.sh¶
#!/usr/bin/env bash
set -e
cslc ./layout.csl --fabric-dims=10,3 \
--fabric-offsets=4,1 --params=size:32 -o out \
--params=MEMCPYH2D_DATA_1_ID:0 \
--params=MEMCPYD2H_DATA_1_ID:1 \
--memcpy --channels=1 --width-west-buf=0 --width-east-buf=0
cs_python run.py --name out