Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .github/workflows/ls-smoke-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: LocalStack Smoke Tests

on:
push:
branches: [localstack]
pull_request:
branches: [localstack]

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
smoke-ls-mock:
name: RIE ↔ LocalStack API Smoke Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6

- name: Set up Go
uses: actions/setup-go@v6
with:
go-version-file: go.mod

- name: Run smoke test
run: make -C cmd/ls-mock smoke-test
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/pkg
/build
/bin
aws-lambda-rie
ls-mock
*.swp
*.iml
tags
Expand Down
5 changes: 3 additions & 2 deletions README-LOCALSTACK.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ Refer to [debugging/README.md](./debugging/README.md) for instructions on how to
| `cmd/localstack` | LocalStack customizations |
| ├── `main.go` | Main entrypoint |
| ├── `custom_interop.go` | Custom server interface between the Lambda runtime API and this Go init. Implements the `Server` interface from `lambda/interop/model.go:Server` but forwards most calls to the original implementation in `lambda/rapidcore/server.go` available as `delegate`. |
| `cmd/ls-api` | Mock LocalStack component for testing (likely outdated) |
| `cmd/ls-mock` | Mock LocalStack component for smoke testing |
| ├── [`README.md`](./cmd/ls-mock/README.md) | Instructions for LS API<->RIE smoke testing |
| `debugging/` | Debug and test this Go init with LocalStack |
| ├── [`README.md`](./debugging/README.md) | Instructions for building and debugging with LocalStack |
| `lambda` | Original AWS implementation of the runtime emulator ideally kept untouched |
Expand All @@ -42,6 +43,6 @@ Example PR that integrates upstream changes: https://github.com/localstack/lambd

Document all custom changes with the following comment prefix `# LOCALSTACK CHANGES yyyy-mm-dd:`

* Everything in `cmd/localstack`, `cmd/ls-api`, and `.github`
* Everything in `cmd/localstack`, `cmd/ls-mock`, and `.github`
* `Makefile` for debugging and building with Docker
* 2023-10-17: `lambda/rapidcore/server.go` pass request metadata into .Reserve(invoke.ID, invoke.TraceID, invoke.LambdaSegmentID)
72 changes: 34 additions & 38 deletions cmd/localstack/custom_interop.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/interop"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lambda/rapidcore/standalone"
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
"github.com/go-chi/chi/v5"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -50,20 +51,35 @@ func (l *LocalStackAdapter) SendStatus(status LocalStackStatus, payload []byte)
return nil
}

// The InvokeRequest is sent by LocalStack to trigger an invocation
type InvokeRequest struct {
InvokeId string `json:"invoke-id"`
InvokedFunctionArn string `json:"invoked-function-arn"`
Payload string `json:"payload"`
TraceId string `json:"trace-id"`
// SendLogs posts the captured invocation logs to LocalStack.
func (l *LocalStackAdapter) SendLogs(invokeId string, logs lsapi.LogResponse) error {
serialized, err := json.Marshal(logs)
if err != nil {
return err
}
_, err = http.Post(l.UpstreamEndpoint+"/invocations/"+invokeId+"/logs", "application/json", bytes.NewReader(serialized))
return err
}

// The ErrorResponse is sent TO LocalStack when encountering an error
type ErrorResponse struct {
ErrorMessage string `json:"errorMessage"`
ErrorType string `json:"errorType,omitempty"`
RequestId string `json:"requestId,omitempty"`
StackTrace []string `json:"stackTrace,omitempty"`
// SendResult posts the invocation result body to LocalStack.
// If isError is false, the body is also inspected for an "errorType" field — its
// presence indicates a Lambda function error and routes the result to /error.
func (l *LocalStackAdapter) SendResult(invokeId string, body []byte, isError bool) error {
if !isError {
var fields map[string]any
if json.Unmarshal(body, &fields) == nil {
_, isError = fields["errorType"]
}
}
endpoint := "/invocations/" + invokeId + "/response"
if isError {
log.Infoln("Sending to /error")
endpoint = "/invocations/" + invokeId + "/error"
} else {
log.Infoln("Sending to /response")
}
_, err := http.Post(l.UpstreamEndpoint+endpoint, "application/json", bytes.NewReader(body))
return err
}

func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollector *LogCollector) (server *CustomInteropServer) {
Expand All @@ -81,7 +97,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
go func() {
r := chi.NewRouter()
r.Post("/invoke", func(w http.ResponseWriter, r *http.Request) {
invokeR := InvokeRequest{}
invokeR := lsapi.InvokeRequest{}
bytess, err := io.ReadAll(r.Body)
if err != nil {
log.Error(err)
Expand Down Expand Up @@ -123,7 +139,7 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
case errors.Is(err, rapidcore.ErrInvokeTimeout):
log.Debugf("Got invoke timeout")
isErr = true
errorResponse := ErrorResponse{
errorResponse := lsapi.ErrorResponse{
ErrorMessage: fmt.Sprintf(
"%s %s Task timed out after %d.00 seconds",
time.Now().Format("2006-01-02T15:04:05Z"),
Expand Down Expand Up @@ -157,31 +173,11 @@ func NewCustomInteropServer(lsOpts *LsOpts, delegate interop.Server, logCollecto
memorySize := GetEnvOrDie("AWS_LAMBDA_FUNCTION_MEMORY_SIZE")
PrintEndReports(invokeR.InvokeId, "", memorySize, invokeStart, timeoutDuration, logCollector)

serializedLogs, err2 := json.Marshal(logCollector.getLogs())
if err2 == nil {
_, err2 = http.Post(server.upstreamEndpoint+"/invocations/"+invokeR.InvokeId+"/logs", "application/json", bytes.NewReader(serializedLogs))
// TODO: handle err
}

var errR map[string]any
marshalErr := json.Unmarshal(invokeResp.Body, &errR)

if !isErr && marshalErr == nil {
_, isErr = errR["errorType"]
if err2 := server.localStackAdapter.SendLogs(invokeR.InvokeId, logCollector.getLogs()); err2 != nil {
log.Error("failed to send logs to LocalStack: ", err2)
}

if isErr {
log.Infoln("Sending to /error")
_, err = http.Post(server.upstreamEndpoint+"/invocations/"+invokeR.InvokeId+"/error", "application/json", bytes.NewReader(invokeResp.Body))
if err != nil {
log.Error(err)
}
} else {
log.Infoln("Sending to /response")
_, err = http.Post(server.upstreamEndpoint+"/invocations/"+invokeR.InvokeId+"/response", "application/json", bytes.NewReader(invokeResp.Body))
if err != nil {
log.Error(err)
}
if err2 := server.localStackAdapter.SendResult(invokeR.InvokeId, invokeResp.Body, isErr); err2 != nil {
log.Error("failed to send result to LocalStack: ", err2)
}
}()

Expand Down
150 changes: 150 additions & 0 deletions cmd/localstack/custom_interop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// --- JSON contract tests ---

// TestInvokeRequestContract verifies that InvokeRequest correctly maps the JSON field names
// that LocalStack sends to the RIE's /invoke endpoint (defined in
// localstack-pro/localstack-core/localstack/services/lambda_/invocation/execution_environment.py).
//
// WARNING: The LocalStack↔RIE API contract is currently unversioned. Any change to these
// field names is a silent breaking change that requires a coordinated update of both
// localstack-pro and lambda-runtime-init with no safe rollback path.
func TestInvokeRequestContract(t *testing.T) {
raw := `{
"invoke-id": "abc-123",
"invoked-function-arn": "arn:aws:lambda:us-east-1:000000000000:function:my-fn",
"payload": "{\"key\":\"value\"}",
"trace-id": "Root=1-abc;Parent=def;Sampled=1"
}`

var req lsapi.InvokeRequest
require.NoError(t, json.Unmarshal([]byte(raw), &req))

assert.Equal(t, "abc-123", req.InvokeId)
assert.Equal(t, "arn:aws:lambda:us-east-1:000000000000:function:my-fn", req.InvokedFunctionArn)
assert.Equal(t, `{"key":"value"}`, req.Payload)
assert.Equal(t, "Root=1-abc;Parent=def;Sampled=1", req.TraceId)
}

// TestLogResponseContract verifies that LogResponse uses the "logs" JSON key expected by
// LocalStack's invocation_logs handler (executor_endpoint.py).
func TestLogResponseContract(t *testing.T) {
raw := `{"logs":"START RequestId: abc\nEND RequestId: abc\n"}`

var lr lsapi.LogResponse
require.NoError(t, json.Unmarshal([]byte(raw), &lr))

assert.Equal(t, "START RequestId: abc\nEND RequestId: abc\n", lr.Logs)
}

// --- LocalStackAdapter.SendStatus tests ---

func TestSendStatus_ReadySendsToCorrectPath(t *testing.T) {
var capturedReq *http.Request
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedReq = r
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL, RuntimeId: "runtime-abc"}
require.NoError(t, adapter.SendStatus(Ready, []byte{}))

assert.Equal(t, http.MethodPost, capturedReq.Method)
assert.Equal(t, "/status/runtime-abc/ready", capturedReq.URL.Path)
}

func TestSendStatus_ErrorSendsToCorrectPath(t *testing.T) {
var capturedReq *http.Request
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedReq = r
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL, RuntimeId: "runtime-abc"}
require.NoError(t, adapter.SendStatus(Error, []byte(`{"errorMessage":"init failed"}`)))

assert.Equal(t, http.MethodPost, capturedReq.Method)
assert.Equal(t, "/status/runtime-abc/error", capturedReq.URL.Path)
}

// --- LocalStackAdapter.SendLogs tests ---

func TestSendLogs_SendsJSONWithLogsKey(t *testing.T) {
var capturedPath string
var capturedBody lsapi.LogResponse
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
body, _ := io.ReadAll(r.Body)
_ = json.Unmarshal(body, &capturedBody)
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL}
logs := lsapi.LogResponse{Logs: "START RequestId: invoke-1\nEND RequestId: invoke-1\n"}
require.NoError(t, adapter.SendLogs("invoke-1", logs))

assert.Equal(t, "/invocations/invoke-1/logs", capturedPath)
assert.Equal(t, logs.Logs, capturedBody.Logs)
}

// --- LocalStackAdapter.SendResult routing tests ---

func TestSendResult_SuccessGoesToResponseEndpoint(t *testing.T) {
var capturedPath string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL}
require.NoError(t, adapter.SendResult("invoke-1", []byte(`{"result":"ok"}`), false))

assert.Equal(t, "/invocations/invoke-1/response", capturedPath)
}

func TestSendResult_ErrorBodyGoesToErrorEndpoint(t *testing.T) {
var capturedPath string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

// Body contains "errorType" — LocalStack distinguishes function errors this way
adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL}
errBody := []byte(`{"errorMessage":"something went wrong","errorType":"RuntimeError"}`)
require.NoError(t, adapter.SendResult("invoke-1", errBody, false))

assert.Equal(t, "/invocations/invoke-1/error", capturedPath)
}

func TestSendResult_ExplicitErrorFlagGoesToErrorEndpoint(t *testing.T) {
var capturedPath string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
w.WriteHeader(http.StatusAccepted)
}))
defer srv.Close()

// isError=true covers cases like timeout where the RIE itself constructs the error body
adapter := &LocalStackAdapter{UpstreamEndpoint: srv.URL}
require.NoError(t, adapter.SendResult("invoke-1", []byte(`{"errorMessage":"Task timed out"}`), true))

assert.Equal(t, "/invocations/invoke-1/error", capturedPath)
}
10 changes: 4 additions & 6 deletions cmd/localstack/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ package main
import (
"strings"
"sync"
)

type LogResponse struct {
Logs string `json:"logs"`
}
"github.com/aws/aws-lambda-runtime-interface-emulator/internal/lsapi"
)

type LogCollector struct {
mutex *sync.Mutex
Expand Down Expand Up @@ -37,10 +35,10 @@ func (lc *LogCollector) reset() {
lc.RuntimeLogs = []string{}
}

func (lc *LogCollector) getLogs() LogResponse {
func (lc *LogCollector) getLogs() lsapi.LogResponse {
lc.mutex.Lock()
defer lc.mutex.Unlock()
response := LogResponse{
response := lsapi.LogResponse{
Logs: strings.Join(lc.RuntimeLogs, ""),
}
lc.RuntimeLogs = []string{}
Expand Down
52 changes: 52 additions & 0 deletions cmd/ls-mock/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
THIS_MAKEFILE_DIR := $(abspath $(dir $(lastword $(MAKEFILE_LIST))))
REPO_ROOT := $(abspath $(THIS_MAKEFILE_DIR)/../..)

ARCH ?= x86_64
MOCK_PORT := 48490
INTEROP_PORT := 9563
RIE_BINARY := $(REPO_ROOT)/bin/aws-lambda-rie-$(ARCH)
LS_MOCK_BIN := $(REPO_ROOT)/bin/ls-mock

# Common docker flags for start-rie and start-rie-detached.
# Uses deferred assignment (=) so $$LATEST is not expanded until recipe time,
# where Make reduces $$ -> $ before passing the line to the shell.
RIE_DOCKER_OPTS = \
--platform linux/amd64 \
--add-host=host.docker.internal:host-gateway \
-p $(INTEROP_PORT):$(INTEROP_PORT) \
-v $(RIE_BINARY):/var/rapid/init:ro \
-v $(THIS_MAKEFILE_DIR)/handler.py:/var/task/handler.py:ro \
-e LOCALSTACK_RUNTIME_ENDPOINT=http://host.docker.internal:$(MOCK_PORT) \
-e LOCALSTACK_RUNTIME_ID=test-runtime-id \
-e AWS_LAMBDA_FUNCTION_TIMEOUT=30 \
-e AWS_LAMBDA_FUNCTION_VERSION='$$LATEST' \
-e AWS_LAMBDA_FUNCTION_MEMORY_SIZE=128 \
-e AWS_REGION=us-east-1 \
-e _HANDLER=handler.handler \
--entrypoint /var/rapid/init

.PHONY: build-rie build-ls-mock start-mock start-rie start-rie-detached success fail smoke-test

build-rie: ## Build the RIE Linux binary via Go cross-compilation (works on macOS)
$(MAKE) -C $(REPO_ROOT) ARCH=$(ARCH) compile-lambda-linux

build-ls-mock: ## Build the ls-mock binary
go build -o $(LS_MOCK_BIN) $(THIS_MAKEFILE_DIR)

start-mock: ## Run the ls-mock LocalStack endpoint mock natively (no Docker needed)
go run $(THIS_MAKEFILE_DIR)

start-rie: build-rie ## Build and run the RIE inside a Docker Python Lambda container
docker run --rm $(RIE_DOCKER_OPTS) public.ecr.aws/lambda/python:3.12

start-rie-detached: ## Start the RIE in detached mode; prints container ID (binaries must be pre-built)
@docker run --detach $(RIE_DOCKER_OPTS) public.ecr.aws/lambda/python:3.12

success: ## Trigger a successful invocation via the mock's /success endpoint
curl -sf http://localhost:$(MOCK_PORT)/success

fail: ## Trigger an error invocation via the mock's /fail endpoint
curl -sf http://localhost:$(MOCK_PORT)/fail

smoke-test: build-rie build-ls-mock ## Full e2e smoke test: start mock + RIE, verify success + error invocations, cleanup
$(THIS_MAKEFILE_DIR)/smoke-test.sh
Loading