Compare commits

..

12 Commits

Author SHA1 Message Date
vikrantgupta25
700249fae7 chore(test): test release 2025-06-18 13:10:00 +05:30
Shaheer Kochai
bed3dbc698 chore: funnel run and save flow changes (#8231)
* feat: while the funnel steps are invalid, handle auto save in local storage

* chore: handle lightmode style in 'add span to funnel' modal

* fix: don't save incomplete steps state in local storage if last saved configuration has valid steps

* chore: close the 'Add span to funnel' modal on clicking save or discard

* chore: deprecate the run funnel flow for unexecuted funnel

* feat: change the funnel configuration save logic, and deprecate auto save

* refactor: send all steps in the payload of analytics/overview

* refactor: send all steps in the payload of analytics/steps (graph API)

* chore: send all steps in the payload of analytics/steps/overview API

* chore: send funnel steps with slow and error traces + deprecate the refetch on latency type change

* chore: overall improvements

* chore: change the save funnel icon + increase the width of funnel steps

* fix: make the changes w.r.t. the updated funnel steps validation API + bugfixes

* fix: remove funnelId from funnel results APIs

* fix: handle edge case i.e. refetch funnel results on deleting a funnel step

* chore: remove funnel steps configuration cache on removing funnel

* chore: don't refetch the results on changing the latency type

* fix: fix the edge cases of save funnel button being enabled even after saving the funnel steps

* chore: remove the span count column from top traces tables

* fix: fix the failing CI check by removing unnecessary props / fixing the types
2025-06-18 06:08:41 +00:00
Amlan Kumar Nandy
66affb0ece chore: add unit tests for hosts list in infra monitoring (#8230) 2025-06-18 05:53:42 +00:00
Vibhu Pandey
75f62372ae feat(analytics): move frontend event to group_id (#8279)
* chore(api_key): add api key analytics

* feat(analytics): move frontend events

* feat(analytics): add collect config

* feat(analytics): add collect config

* feat(analytics): fix traits

* feat(analytics): fix traits

* feat(analytics): fix traits

* feat(analytics): fix traits

* feat(analytics): fix traits

* feat(analytics): fix factor api key

* fix(analytics): fix org stats

* fix(analytics): fix org stats
2025-06-18 01:54:55 +05:30
Sahil Khan
a3ac307b4e fix: sentry issues SIGNOZ-UI-Q9 SIGNOZ-UI-QA (#8281) 2025-06-17 23:44:21 +05:30
Vikrant Gupta
7672d2f636 chore(user): return user resource on register user request (#8271) 2025-06-17 17:26:06 +05:30
aniketio-ctrl
e3018d9529 fix(8232): added fix for error graph in services tab (#8263) 2025-06-17 08:08:38 +00:00
Nityananda Gohain
385ee268e3 fix: use first org in agent migration (#8269)
* fix: exit gracefull if there are more than one org

* fix: use first org
2025-06-17 06:25:12 +00:00
Piyush Singariya
01036a8a2f fix: top level keys EXIST and NOTEXIST filter simulation (#8255)
* fix: top level keys EXIST and NOTEXIST filter simulation

* test: fix tests

* test: temporarily change collector version

* test: updating go.mod

* fix: tests

* chore: revert changes

* chore: update collector's reference to stable version
2025-06-17 11:28:40 +05:30
Srikanth Chekuri
1542b9d6e9 chore: disallow unknown fields and address gaps (#8237) 2025-06-16 23:11:28 +05:30
Nityananda Gohain
8455349459 fix: support orgId and postgres in agents (#7327)
* fix: initial commit for agents

* fix: remove frontend package manger commit

* fix: use sqlstore

* fix: opamp server changes

* fix: tests

* fix: tests

* fix: minor changes

* fix: migrations

* fix: use uuid7

* fix: use default orgID for single tenant

* fix: pipelines tests fixed

* fix: use correct agentId

* fix: use orgID in coordinator

* fix: fix tests

* fix: remove redundant hash check

* fix: migration

* fix: migration

* fix: address comments

* fix: rename migration file

* fix: remove unwanted orgid code

* fix: use orggetter

* fix: comment

* fix: schema cleanup

* fix: minor changes

* chore: addresses changes

* fix: add back agentID as it used ulid

* fix: keep only 50 agents for an orgId

* chore: explicitly specify text type

* chore: use valuer.uuid for orgid

* fix: linting complain

* chore: final fixes

* chore: minor changes

* fix: add not null

* fix: fe tests

---------

Co-authored-by: Vikrant Gupta <vikrant@signoz.io>
2025-06-16 20:07:16 +05:30
aniketio-ctrl
c488a24d09 fix(prom-aggr): fix prom aggregation queries using utf-8 charset (#8262)
* fix(prom-aggr): added fix for prom aggregation

* fix(prom-aggr): added fix for prom aggregation
2025-06-16 19:42:17 +05:30
132 changed files with 5419 additions and 1442 deletions

View File

@@ -5,6 +5,8 @@
<br>SigNoz
</h1>
ok
<p align="center">All your logs, metrics, and traces in one place. Monitor your application, spot issues before they occur and troubleshoot downtime quickly with rich context. SigNoz is a cost-effective open-source alternative to Datadog and New Relic. Visit <a href="https://signoz.io" target="_blank">signoz.io</a> for the full documentation, tutorials, and guide.</p>
<p align="center">

View File

@@ -224,3 +224,6 @@ statsreporter:
enabled: true
# The interval at which the stats are collected.
interval: 6h
collect:
# Whether to collect identities and traits (emails).
identities: true

View File

@@ -122,10 +122,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
}
// initiate opamp
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
opAmpModel.InitDB(serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.Instrumentation.Logger(), serverOptions.SigNoz.Modules.OrgGetter)
integrationsController, err := integrations.NewController(serverOptions.SigNoz.SQLStore)
if err != nil {
@@ -143,7 +140,8 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// ingestion pipelines manager
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
serverOptions.SigNoz.SQLStore, integrationsController.GetPipelinesForInstalledIntegrations,
serverOptions.SigNoz.SQLStore,
integrationsController.GetPipelinesForInstalledIntegrations,
)
if err != nil {
return nil, err
@@ -151,7 +149,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
// initiate agent config handler
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
Store: serverOptions.SigNoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{logParsingPipelineController},
})
if err != nil {
@@ -227,6 +225,17 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
&opAmpModel.AllAgents, agentConfMgr,
)
orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
if err != nil {
return nil, err
}
for _, org := range orgs {
errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID)
for _, er := range errorList {
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
}
}
return s, nil
}

View File

@@ -17,19 +17,21 @@ var (
)
var (
Org = "org"
User = "user"
UserNoCascade = "user_no_cascade"
FactorPassword = "factor_password"
CloudIntegration = "cloud_integration"
Org = "org"
User = "user"
UserNoCascade = "user_no_cascade"
FactorPassword = "factor_password"
CloudIntegration = "cloud_integration"
AgentConfigVersion = "agent_config_version"
)
var (
OrgReference = `("org_id") REFERENCES "organizations" ("id")`
UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
UserReferenceNoCascade = `("user_id") REFERENCES "users" ("id")`
FactorPasswordReference = `("password_id") REFERENCES "factor_password" ("id")`
CloudIntegrationReference = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE`
OrgReference = `("org_id") REFERENCES "organizations" ("id")`
UserReference = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
UserReferenceNoCascade = `("user_id") REFERENCES "users" ("id")`
FactorPasswordReference = `("password_id") REFERENCES "factor_password" ("id")`
CloudIntegrationReference = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE`
AgentConfigVersionReference = `("version_id") REFERENCES "agent_config_version" ("id")`
)
type dialect struct{}
@@ -274,6 +276,8 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
fkReferences = append(fkReferences, FactorPasswordReference)
} else if reference == CloudIntegration && !slices.Contains(fkReferences, CloudIntegrationReference) {
fkReferences = append(fkReferences, CloudIntegrationReference)
} else if reference == AgentConfigVersion && !slices.Contains(fkReferences, AgentConfigVersionReference) {
fkReferences = append(fkReferences, AgentConfigVersionReference)
}
}

View File

@@ -119,6 +119,7 @@ export const updateFunnelSteps = async (
export interface ValidateFunnelPayload {
start_time: number;
end_time: number;
steps: FunnelStepData[];
}
export interface ValidateFunnelResponse {
@@ -132,12 +133,11 @@ export interface ValidateFunnelResponse {
}
export const validateFunnelSteps = async (
funnelId: string,
payload: ValidateFunnelPayload,
signal?: AbortSignal,
): Promise<SuccessResponse<ValidateFunnelResponse> | ErrorResponse> => {
const response = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/validate`,
`${FUNNELS_BASE_PATH}/analytics/validate`,
payload,
{ signal },
);
@@ -185,6 +185,7 @@ export interface FunnelOverviewPayload {
end_time: number;
step_start?: number;
step_end?: number;
steps: FunnelStepData[];
}
export interface FunnelOverviewResponse {
@@ -202,12 +203,11 @@ export interface FunnelOverviewResponse {
}
export const getFunnelOverview = async (
funnelId: string,
payload: FunnelOverviewPayload,
signal?: AbortSignal,
): Promise<SuccessResponse<FunnelOverviewResponse> | ErrorResponse> => {
const response = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/overview`,
`${FUNNELS_BASE_PATH}/analytics/overview`,
payload,
{
signal,
@@ -235,12 +235,11 @@ export interface SlowTraceData {
}
export const getFunnelSlowTraces = async (
funnelId: string,
payload: FunnelOverviewPayload,
signal?: AbortSignal,
): Promise<SuccessResponse<SlowTraceData> | ErrorResponse> => {
const response = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/slow-traces`,
`${FUNNELS_BASE_PATH}/analytics/slow-traces`,
payload,
{
signal,
@@ -273,7 +272,7 @@ export const getFunnelErrorTraces = async (
signal?: AbortSignal,
): Promise<SuccessResponse<ErrorTraceData> | ErrorResponse> => {
const response: AxiosResponse = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/error-traces`,
`${FUNNELS_BASE_PATH}/analytics/error-traces`,
payload,
{
signal,
@@ -291,6 +290,7 @@ export const getFunnelErrorTraces = async (
export interface FunnelStepsPayload {
start_time: number;
end_time: number;
steps: FunnelStepData[];
}
export interface FunnelStepGraphMetrics {
@@ -307,12 +307,11 @@ export interface FunnelStepsResponse {
}
export const getFunnelSteps = async (
funnelId: string,
payload: FunnelStepsPayload,
signal?: AbortSignal,
): Promise<SuccessResponse<FunnelStepsResponse> | ErrorResponse> => {
const response = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/steps`,
`${FUNNELS_BASE_PATH}/analytics/steps`,
payload,
{ signal },
);
@@ -330,6 +329,7 @@ export interface FunnelStepsOverviewPayload {
end_time: number;
step_start?: number;
step_end?: number;
steps: FunnelStepData[];
}
export interface FunnelStepsOverviewResponse {
@@ -341,12 +341,11 @@ export interface FunnelStepsOverviewResponse {
}
export const getFunnelStepsOverview = async (
funnelId: string,
payload: FunnelStepsOverviewPayload,
signal?: AbortSignal,
): Promise<SuccessResponse<FunnelStepsOverviewResponse> | ErrorResponse> => {
const response = await axios.post(
`${FUNNELS_BASE_PATH}/${funnelId}/analytics/steps/overview`,
`${FUNNELS_BASE_PATH}/analytics/steps/overview`,
payload,
{ signal },
);

View File

@@ -30,5 +30,5 @@ export enum LOCALSTORAGE {
SHOW_EXCEPTIONS_QUICK_FILTERS = 'SHOW_EXCEPTIONS_QUICK_FILTERS',
BANNER_DISMISSED = 'BANNER_DISMISSED',
QUICK_FILTERS_SETTINGS_ANNOUNCEMENT = 'QUICK_FILTERS_SETTINGS_ANNOUNCEMENT',
UNEXECUTED_FUNNELS = 'UNEXECUTED_FUNNELS',
FUNNEL_STEPS = 'FUNNEL_STEPS',
}

View File

@@ -0,0 +1,43 @@
import { render, screen } from '@testing-library/react';
import HostsEmptyOrIncorrectMetrics from '../HostsEmptyOrIncorrectMetrics';
describe('HostsEmptyOrIncorrectMetrics', () => {
it('shows no data message when noData is true', () => {
render(<HostsEmptyOrIncorrectMetrics noData incorrectData={false} />);
expect(
screen.getByText('No host metrics data received yet.'),
).toBeInTheDocument();
expect(
screen.getByText(/Infrastructure monitoring requires the/),
).toBeInTheDocument();
});
it('shows incorrect data message when incorrectData is true', () => {
render(<HostsEmptyOrIncorrectMetrics noData={false} incorrectData />);
expect(
screen.getByText(
'To see host metrics, upgrade to the latest version of SigNoz k8s-infra chart. Please contact support if you need help.',
),
).toBeInTheDocument();
});
it('does not show no data message when noData is false', () => {
render(<HostsEmptyOrIncorrectMetrics noData={false} incorrectData={false} />);
expect(
screen.queryByText('No host metrics data received yet.'),
).not.toBeInTheDocument();
expect(
screen.queryByText(/Infrastructure monitoring requires the/),
).not.toBeInTheDocument();
});
it('does not show incorrect data message when incorrectData is false', () => {
render(<HostsEmptyOrIncorrectMetrics noData={false} incorrectData={false} />);
expect(
screen.queryByText(
'To see host metrics, upgrade to the latest version of SigNoz k8s-infra chart. Please contact support if you need help.',
),
).not.toBeInTheDocument();
});
});

View File

@@ -0,0 +1,166 @@
/* eslint-disable react/button-has-type */
import { render } from '@testing-library/react';
import ROUTES from 'constants/routes';
import * as useGetHostListHooks from 'hooks/infraMonitoring/useGetHostList';
import * as appContextHooks from 'providers/App/App';
import * as timezoneHooks from 'providers/Timezone';
import { QueryClient, QueryClientProvider } from 'react-query';
import { Provider } from 'react-redux';
import { MemoryRouter } from 'react-router-dom';
import store from 'store';
import { LicenseEvent } from 'types/api/licensesV3/getActive';
import HostsList from '../HostsList';
jest.mock('lib/getMinMax', () => ({
__esModule: true,
default: jest.fn().mockImplementation(() => ({
minTime: 1713734400000,
maxTime: 1713738000000,
isValidTimeFormat: jest.fn().mockReturnValue(true),
})),
}));
jest.mock('components/CustomTimePicker/CustomTimePicker', () => ({
__esModule: true,
default: ({ onSelect, selectedTime, selectedValue }: any): JSX.Element => (
<div data-testid="custom-time-picker">
<button onClick={(): void => onSelect('custom')}>
{selectedTime} - {selectedValue}
</button>
</div>
),
}));
const queryClient = new QueryClient();
jest.mock('uplot', () => {
const paths = {
spline: jest.fn(),
bars: jest.fn(),
};
const uplotMock = jest.fn(() => ({
paths,
}));
return {
paths,
default: uplotMock,
};
});
jest.mock('react-redux', () => ({
...jest.requireActual('react-redux'),
useSelector: (): any => ({
globalTime: {
selectedTime: {
startTime: 1713734400000,
endTime: 1713738000000,
},
maxTime: 1713738000000,
minTime: 1713734400000,
},
}),
}));
jest.mock('react-router-dom', () => ({
...jest.requireActual('react-router-dom'),
useLocation: jest.fn().mockReturnValue({
pathname: ROUTES.INFRASTRUCTURE_MONITORING_HOSTS,
}),
}));
jest.mock('react-router-dom-v5-compat', () => {
const actual = jest.requireActual('react-router-dom-v5-compat');
return {
...actual,
useSearchParams: jest
.fn()
.mockReturnValue([
{ get: jest.fn(), entries: jest.fn().mockReturnValue([]) },
jest.fn(),
]),
useNavigationType: (): any => 'PUSH',
};
});
jest.mock('hooks/useSafeNavigate', () => ({
useSafeNavigate: (): any => ({
safeNavigate: jest.fn(),
}),
}));
jest.spyOn(timezoneHooks, 'useTimezone').mockReturnValue({
timezone: {
offset: 0,
},
browserTimezone: {
offset: 0,
},
} as any);
jest.spyOn(useGetHostListHooks, 'useGetHostList').mockReturnValue({
data: {
payload: {
data: {
records: [
{
hostName: 'test-host',
active: true,
cpu: 0.75,
memory: 0.65,
wait: 0.03,
},
],
isSendingK8SAgentMetrics: false,
sentAnyHostMetricsData: true,
},
},
},
isLoading: false,
isError: false,
} as any);
jest.spyOn(appContextHooks, 'useAppContext').mockReturnValue({
user: {
role: 'admin',
},
activeLicenseV3: {
event_queue: {
created_at: '0',
event: LicenseEvent.NO_EVENT,
scheduled_at: '0',
status: '',
updated_at: '0',
},
license: {
license_key: 'test-license-key',
license_type: 'trial',
org_id: 'test-org-id',
plan_id: 'test-plan-id',
plan_name: 'test-plan-name',
plan_type: 'trial',
plan_version: 'test-plan-version',
},
},
} as any);
describe('HostsList', () => {
it('renders hosts list table', () => {
const { container } = render(
<QueryClientProvider client={queryClient}>
<MemoryRouter>
<Provider store={store}>
<HostsList />
</Provider>
</MemoryRouter>
</QueryClientProvider>,
);
expect(container.querySelector('.hosts-list-table')).toBeInTheDocument();
});
it('renders filters', () => {
const { container } = render(
<QueryClientProvider client={queryClient}>
<MemoryRouter>
<Provider store={store}>
<HostsList />
</Provider>
</MemoryRouter>
</QueryClientProvider>,
);
expect(container.querySelector('.filters')).toBeInTheDocument();
});
});

View File

@@ -0,0 +1,37 @@
import { render, screen } from '@testing-library/react';
import HostsListControls from '../HostsListControls';
jest.mock('container/QueryBuilder/filters/QueryBuilderSearch', () => ({
__esModule: true,
default: (): JSX.Element => (
<div data-testid="query-builder-search">Search</div>
),
}));
jest.mock('container/TopNav/DateTimeSelectionV2', () => ({
__esModule: true,
default: (): JSX.Element => (
<div data-testid="date-time-selection">Date Time</div>
),
}));
describe('HostsListControls', () => {
const mockHandleFiltersChange = jest.fn();
const mockFilters = {
items: [],
op: 'AND',
};
it('renders search and date time filters', () => {
render(
<HostsListControls
handleFiltersChange={mockHandleFiltersChange}
filters={mockFilters}
/>,
);
expect(screen.getByTestId('query-builder-search')).toBeInTheDocument();
expect(screen.getByTestId('date-time-selection')).toBeInTheDocument();
});
});

View File

@@ -0,0 +1,139 @@
/* eslint-disable react/jsx-props-no-spreading */
import { render, screen } from '@testing-library/react';
import HostsListTable from '../HostsListTable';
jest.mock('uplot', () => {
const paths = {
spline: jest.fn(),
bars: jest.fn(),
};
const uplotMock = jest.fn(() => ({
paths,
}));
return {
paths,
default: uplotMock,
};
});
const EMPTY_STATE_CONTAINER_CLASS = '.hosts-empty-state-container';
describe('HostsListTable', () => {
const mockHost = {
hostName: 'test-host-1',
active: true,
cpu: 0.75,
memory: 0.65,
wait: 0.03,
load15: 1.5,
os: 'linux',
};
const mockTableData = {
payload: {
data: {
hosts: [mockHost],
},
},
};
const mockOnHostClick = jest.fn();
const mockSetCurrentPage = jest.fn();
const mockSetOrderBy = jest.fn();
const mockSetPageSize = jest.fn();
const mockProps = {
isLoading: false,
isError: false,
isFetching: false,
tableData: mockTableData,
hostMetricsData: [mockHost],
filters: {
items: [],
op: 'AND',
},
onHostClick: mockOnHostClick,
currentPage: 1,
setCurrentPage: mockSetCurrentPage,
pageSize: 10,
setOrderBy: mockSetOrderBy,
setPageSize: mockSetPageSize,
} as any;
it('renders loading state if isLoading is true', () => {
const { container } = render(<HostsListTable {...mockProps} isLoading />);
expect(container.querySelector('.hosts-list-loading-state')).toBeTruthy();
});
it('renders loading state if isFetching is true', () => {
const { container } = render(<HostsListTable {...mockProps} isFetching />);
expect(container.querySelector('.hosts-list-loading-state')).toBeTruthy();
});
it('renders error state if isError is true', () => {
render(<HostsListTable {...mockProps} isError />);
expect(screen.getByText('Something went wrong')).toBeTruthy();
});
it('renders empty state if no hosts are found', () => {
const { container } = render(<HostsListTable {...mockProps} />);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
});
it('renders empty state if sentAnyHostMetricsData is false', () => {
const { container } = render(
<HostsListTable
{...mockProps}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
sentAnyHostMetricsData: false,
},
},
}}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
});
it('renders empty state if isSendingIncorrectK8SAgentMetrics is true', () => {
const { container } = render(
<HostsListTable
{...mockProps}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
isSendingIncorrectK8SAgentMetrics: true,
},
},
}}
/>,
);
expect(container.querySelector(EMPTY_STATE_CONTAINER_CLASS)).toBeTruthy();
});
it('renders table data', () => {
const { container } = render(
<HostsListTable
{...mockProps}
tableData={{
...mockTableData,
payload: {
...mockTableData.payload,
data: {
...mockTableData.payload.data,
isSendingIncorrectK8SAgentMetrics: false,
sentAnyHostMetricsData: true,
},
},
}}
/>,
);
expect(container.querySelector('.hosts-list-table')).toBeTruthy();
});
});

View File

@@ -0,0 +1,104 @@
import { render } from '@testing-library/react';
import { formatDataForTable, GetHostsQuickFiltersConfig } from '../utils';
const PROGRESS_BAR_CLASS = '.progress-bar';
jest.mock('uplot', () => {
const paths = {
spline: jest.fn(),
bars: jest.fn(),
};
const uplotMock = jest.fn(() => ({
paths,
}));
return {
paths,
default: uplotMock,
};
});
describe('InfraMonitoringHosts utils', () => {
describe('formatDataForTable', () => {
it('should format host data correctly', () => {
const mockData = [
{
hostName: 'test-host',
active: true,
cpu: 0.95,
memory: 0.85,
wait: 0.05,
load15: 2.5,
os: 'linux',
},
] as any;
const result = formatDataForTable(mockData);
expect(result[0].hostName).toBe('test-host');
expect(result[0].wait).toBe('5%');
expect(result[0].load15).toBe(2.5);
// Test active tag rendering
const activeTag = render(result[0].active as JSX.Element);
expect(activeTag.container.textContent).toBe('ACTIVE');
expect(activeTag.container.querySelector('.active')).toBeTruthy();
// Test CPU progress bar
const cpuProgress = render(result[0].cpu as JSX.Element);
const cpuProgressBar = cpuProgress.container.querySelector(
PROGRESS_BAR_CLASS,
);
expect(cpuProgressBar).toBeTruthy();
// Test memory progress bar
const memoryProgress = render(result[0].memory as JSX.Element);
const memoryProgressBar = memoryProgress.container.querySelector(
PROGRESS_BAR_CLASS,
);
expect(memoryProgressBar).toBeTruthy();
});
it('should handle inactive hosts', () => {
const mockData = [
{
hostName: 'test-host',
active: false,
cpu: 0.3,
memory: 0.4,
wait: 0.02,
load15: 1.2,
os: 'linux',
cpuTimeSeries: [],
memoryTimeSeries: [],
waitTimeSeries: [],
load15TimeSeries: [],
},
] as any;
const result = formatDataForTable(mockData);
const inactiveTag = render(result[0].active as JSX.Element);
expect(inactiveTag.container.textContent).toBe('INACTIVE');
expect(inactiveTag.container.querySelector('.inactive')).toBeTruthy();
});
});
describe('GetHostsQuickFiltersConfig', () => {
it('should return correct config when dotMetricsEnabled is true', () => {
const result = GetHostsQuickFiltersConfig(true);
expect(result[0].attributeKey.key).toBe('host.name');
expect(result[1].attributeKey.key).toBe('os.type');
expect(result[0].aggregateAttribute).toBe('system.cpu.load_average.15m');
});
it('should return correct config when dotMetricsEnabled is false', () => {
const result = GetHostsQuickFiltersConfig(false);
expect(result[0].attributeKey.key).toBe('host_name');
expect(result[1].attributeKey.key).toBe('os_type');
expect(result[0].aggregateAttribute).toBe('system_cpu_load_average_15m');
});
});
});

View File

@@ -611,9 +611,7 @@ export const errorPercentage = ({
{
id: '',
key: {
key: dotMetricsEnabled
? WidgetKeys.Service_name
: WidgetKeys.StatusCodeNorm,
key: dotMetricsEnabled ? WidgetKeys.StatusCode : WidgetKeys.StatusCodeNorm,
dataType: DataTypes.Int64,
isColumn: false,
type: MetricsType.Tag,

View File

@@ -9,7 +9,7 @@ export const pipelineData: Pipeline = {
active: false,
is_valid: false,
disabled: false,
deployStatus: 'DEPLOYED',
deployStatus: 'deployed',
deployResult: 'Deployment was successful',
lastHash: 'log_pipelines:24',
lastConf: 'oiwernveroi',
@@ -135,7 +135,7 @@ export const pipelineData: Pipeline = {
active: false,
isValid: false,
disabled: false,
deployStatus: 'DEPLOYED',
deployStatus: 'deployed',
deployResult: 'Deployment was successful',
lastHash: 'log_pipelines:24',
lastConf: 'eovineroiv',
@@ -150,7 +150,7 @@ export const pipelineData: Pipeline = {
active: false,
isValid: false,
disabled: false,
deployStatus: 'DEPLOYED',
deployStatus: 'deployed',
deployResult: 'Deployment was successful',
lastHash: 'log_pipelines:23',
lastConf: 'eivrounreovi',
@@ -169,7 +169,7 @@ export const pipelineDataHistory: Pipeline['history'] = [
active: false,
isValid: false,
disabled: false,
deployStatus: 'DEPLOYED',
deployStatus: 'deployed',
deployResult: 'Deployment was successful',
lastHash: 'log_pipelines:24',
lastConf: 'eovineroiv',
@@ -184,7 +184,7 @@ export const pipelineDataHistory: Pipeline['history'] = [
active: false,
isValid: false,
disabled: false,
deployStatus: 'IN_PROGRESS',
deployStatus: 'in_progress',
deployResult: 'Deployment is in progress',
lastHash: 'log_pipelines:23',
lastConf: 'eivrounreovi',
@@ -199,7 +199,7 @@ export const pipelineDataHistory: Pipeline['history'] = [
active: false,
isValid: false,
disabled: false,
deployStatus: 'DIRTY',
deployStatus: 'dirty',
deployResult: 'Deployment is dirty',
lastHash: 'log_pipelines:23',
lastConf: 'eivrounreovi',
@@ -214,7 +214,7 @@ export const pipelineDataHistory: Pipeline['history'] = [
active: false,
isValid: false,
disabled: false,
deployStatus: 'FAILED',
deployStatus: 'failed',
deployResult: 'Deployment failed',
lastHash: 'log_pipelines:23',
lastConf: 'eivrounreovi',
@@ -229,7 +229,7 @@ export const pipelineDataHistory: Pipeline['history'] = [
active: false,
isValid: false,
disabled: false,
deployStatus: 'UNKNOWN',
deployStatus: 'unknown',
deployResult: '',
lastHash: 'log_pipelines:23',
lastConf: 'eivrounreovi',

View File

@@ -9,15 +9,15 @@ import { Spin } from 'antd';
export function getDeploymentStage(value: string): string {
switch (value) {
case 'IN_PROGRESS':
case 'in_progress':
return 'In Progress';
case 'DEPLOYED':
case 'deployed':
return 'Deployed';
case 'DIRTY':
case 'dirty':
return 'Dirty';
case 'FAILED':
case 'failed':
return 'Failed';
case 'UNKNOWN':
case 'unknown':
return 'Unknown';
default:
return '';
@@ -26,17 +26,17 @@ export function getDeploymentStage(value: string): string {
export function getDeploymentStageIcon(value: string): JSX.Element {
switch (value) {
case 'IN_PROGRESS':
case 'in_progress':
return (
<Spin indicator={<LoadingOutlined style={{ fontSize: 15 }} spin />} />
);
case 'DEPLOYED':
case 'deployed':
return <CheckCircleFilled />;
case 'DIRTY':
case 'dirty':
return <ExclamationCircleFilled />;
case 'FAILED':
case 'failed':
return <CloseCircleFilled />;
case 'UNKNOWN':
case 'unknown':
return <MinusCircleFilled />;
default:
return <span />;

View File

@@ -241,6 +241,15 @@
&-title {
color: var(--bg-ink-500);
}
&-footer {
border-top-color: var(--bg-vanilla-300);
background: var(--bg-vanilla-100);
.add-span-to-funnel-modal__discard-button {
background: var(--bg-vanilla-200);
color: var(--bg-ink-500);
}
}
}
}

View File

@@ -72,7 +72,6 @@ function FunnelDetailsView({
funnel={funnel}
isTraceDetailsPage
span={span}
disableAutoSave
triggerAutoSave={triggerAutoSave}
showNotifications={showNotifications}
/>
@@ -143,13 +142,19 @@ function AddSpanToFunnelModal({
const handleSaveFunnel = (): void => {
setTriggerSave(true);
// Reset trigger after a brief moment to allow the save to be processed
setTimeout(() => setTriggerSave(false), 100);
setTimeout(() => {
setTriggerSave(false);
onClose();
}, 100);
};
const handleDiscard = (): void => {
setTriggerDiscard(true);
// Reset trigger after a brief moment
setTimeout(() => setTriggerDiscard(false), 100);
setTimeout(() => {
setTriggerDiscard(false);
onClose();
}, 100);
};
const renderListView = (): JSX.Element => (
@@ -239,9 +244,6 @@ function AddSpanToFunnelModal({
footer={
activeView === ModalView.DETAILS
? [
<Button key="close" onClick={onClose}>
Close
</Button>,
<Button
type="default"
key="discard"

View File

@@ -1,10 +1,13 @@
import { LOCALSTORAGE } from 'constants/localStorage';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import useDebounce from 'hooks/useDebounce';
import { useLocalStorage } from 'hooks/useLocalStorage';
import { useNotifications } from 'hooks/useNotifications';
import { isEqual } from 'lodash-es';
import { useFunnelContext } from 'pages/TracesFunnels/FunnelContext';
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { useCallback, useEffect, useRef, useState } from 'react';
import { useQueryClient } from 'react-query';
import { BaseAutocompleteData } from 'types/api/queryBuilder/queryAutocompleteResponse';
import { FunnelData, FunnelStepData } from 'types/api/traceFunnels';
import { useUpdateFunnelSteps } from './useFunnels';
@@ -13,22 +16,30 @@ interface UseFunnelConfiguration {
isPopoverOpen: boolean;
setIsPopoverOpen: (isPopoverOpen: boolean) => void;
steps: FunnelStepData[];
isSaving: boolean;
}
// Add this helper function
const normalizeSteps = (steps: FunnelStepData[]): FunnelStepData[] => {
export const normalizeSteps = (steps: FunnelStepData[]): FunnelStepData[] => {
if (steps.some((step) => !step.filters)) return steps;
return steps.map((step) => ({
...step,
filters: {
...step.filters,
items: step.filters.items.map((item) => ({
id: '',
key: item.key,
value: item.value,
op: item.op,
})),
items: step.filters.items.map((item) => {
const {
id: unusedId,
isIndexed,
...keyObj
} = item.key as BaseAutocompleteData;
return {
id: '',
key: keyObj,
value: item.value,
op: item.op,
};
}),
},
}));
};
@@ -36,22 +47,22 @@ const normalizeSteps = (steps: FunnelStepData[]): FunnelStepData[] => {
// eslint-disable-next-line sonarjs/cognitive-complexity
export default function useFunnelConfiguration({
funnel,
disableAutoSave = false,
triggerAutoSave = false,
showNotifications = false,
}: {
funnel: FunnelData;
disableAutoSave?: boolean;
triggerAutoSave?: boolean;
showNotifications?: boolean;
}): UseFunnelConfiguration {
const { notifications } = useNotifications();
const queryClient = useQueryClient();
const {
steps,
initialSteps,
hasIncompleteStepFields,
lastUpdatedSteps,
setLastUpdatedSteps,
handleRestoreSteps,
handleRunFunnel,
selectedTime,
setIsUpdatingFunnel,
} = useFunnelContext();
// State management
@@ -59,10 +70,6 @@ export default function useFunnelConfiguration({
const debouncedSteps = useDebounce(steps, 200);
const [lastValidatedSteps, setLastValidatedSteps] = useState<FunnelStepData[]>(
initialSteps,
);
// Mutation hooks
const updateStepsMutation = useUpdateFunnelSteps(
funnel.funnel_id,
@@ -71,6 +78,15 @@ export default function useFunnelConfiguration({
// Derived state
const lastSavedStepsStateRef = useRef<FunnelStepData[]>(steps);
const hasRestoredFromLocalStorage = useRef(false);
// localStorage hook for funnel steps
const localStorageKey = `${LOCALSTORAGE.FUNNEL_STEPS}_${funnel.funnel_id}`;
const [
localStorageSavedSteps,
setLocalStorageSavedSteps,
clearLocalStorageSavedSteps,
] = useLocalStorage<FunnelStepData[] | null>(localStorageKey, null);
const hasStepsChanged = useCallback(() => {
const normalizedLastSavedSteps = normalizeSteps(
@@ -80,6 +96,34 @@ export default function useFunnelConfiguration({
return !isEqual(normalizedDebouncedSteps, normalizedLastSavedSteps);
}, [debouncedSteps]);
// Handle localStorage for funnel steps
useEffect(() => {
// Restore from localStorage on first run if
if (!hasRestoredFromLocalStorage.current) {
const savedSteps = localStorageSavedSteps;
if (savedSteps) {
handleRestoreSteps(savedSteps);
hasRestoredFromLocalStorage.current = true;
return;
}
}
// Save steps to localStorage
if (hasStepsChanged()) {
setLocalStorageSavedSteps(debouncedSteps);
}
}, [
debouncedSteps,
funnel.funnel_id,
hasStepsChanged,
handleRestoreSteps,
localStorageSavedSteps,
setLocalStorageSavedSteps,
queryClient,
selectedTime,
lastUpdatedSteps,
]);
const hasFunnelStepDefinitionsChanged = useCallback(
(prevSteps: FunnelStepData[], nextSteps: FunnelStepData[]): boolean => {
if (prevSteps.length !== nextSteps.length) return true;
@@ -97,15 +141,6 @@ export default function useFunnelConfiguration({
[],
);
const hasFunnelLatencyTypeChanged = useCallback(
(prevSteps: FunnelStepData[], nextSteps: FunnelStepData[]): boolean =>
prevSteps.some((step, index) => {
const nextStep = nextSteps[index];
return step.latency_type !== nextStep.latency_type;
}),
[],
);
// Mutation payload preparation
const getUpdatePayload = useCallback(
() => ({
@@ -116,33 +151,19 @@ export default function useFunnelConfiguration({
[funnel.funnel_id, debouncedSteps],
);
const queryClient = useQueryClient();
const { selectedTime } = useFunnelContext();
const validateStepsQueryKey = useMemo(
() => [REACT_QUERY_KEY.VALIDATE_FUNNEL_STEPS, funnel.funnel_id, selectedTime],
[funnel.funnel_id, selectedTime],
);
// eslint-disable-next-line sonarjs/cognitive-complexity
useEffect(() => {
// Determine if we should save based on the mode
let shouldSave = false;
if (disableAutoSave) {
// Manual save mode: only save when explicitly triggered
shouldSave = triggerAutoSave;
} else {
// Auto-save mode: save when steps have changed and no incomplete fields
shouldSave = hasStepsChanged() && !hasIncompleteStepFields;
}
if (shouldSave && !isEqual(debouncedSteps, lastValidatedSteps)) {
if (triggerAutoSave && !isEqual(debouncedSteps, lastUpdatedSteps)) {
setIsUpdatingFunnel(true);
updateStepsMutation.mutate(getUpdatePayload(), {
onSuccess: (data) => {
const updatedFunnelSteps = data?.payload?.steps;
if (!updatedFunnelSteps) return;
// Clear localStorage since steps are saved successfully
clearLocalStorageSavedSteps();
queryClient.setQueryData(
[REACT_QUERY_KEY.GET_FUNNEL_DETAILS, funnel.funnel_id],
(oldData: any) => {
@@ -163,17 +184,9 @@ export default function useFunnelConfiguration({
(step) => step.service_name === '' || step.span_name === '',
);
if (hasFunnelLatencyTypeChanged(lastValidatedSteps, debouncedSteps)) {
handleRunFunnel();
setLastValidatedSteps(debouncedSteps);
}
// Only validate if funnel steps definitions
else if (
!hasIncompleteStepFields &&
hasFunnelStepDefinitionsChanged(lastValidatedSteps, debouncedSteps)
) {
queryClient.refetchQueries(validateStepsQueryKey);
setLastValidatedSteps(debouncedSteps);
if (!hasIncompleteStepFields) {
setLastUpdatedSteps(debouncedSteps);
}
// Show success notification only when requested
@@ -216,17 +229,18 @@ export default function useFunnelConfiguration({
getUpdatePayload,
hasFunnelStepDefinitionsChanged,
hasStepsChanged,
lastValidatedSteps,
lastUpdatedSteps,
queryClient,
validateStepsQueryKey,
triggerAutoSave,
showNotifications,
disableAutoSave,
localStorageSavedSteps,
clearLocalStorageSavedSteps,
]);
return {
isPopoverOpen,
setIsPopoverOpen,
steps,
isSaving: updateStepsMutation.isLoading,
};
}

View File

@@ -20,10 +20,11 @@ export function useFunnelMetrics({
metricsData: MetricItem[];
conversionRate: number;
} {
const { startTime, endTime } = useFunnelContext();
const { startTime, endTime, steps } = useFunnelContext();
const payload = {
start_time: startTime,
end_time: endTime,
steps,
};
const {
@@ -81,6 +82,7 @@ export function useFunnelStepsMetrics({
end_time: endTime,
step_start: stepStart,
step_end: stepEnd,
steps,
};
const {

View File

@@ -7,6 +7,7 @@ import {
FunnelOverviewResponse,
FunnelStepsOverviewPayload,
FunnelStepsOverviewResponse,
FunnelStepsPayload,
FunnelStepsResponse,
getFunnelById,
getFunnelErrorTraces,
@@ -37,6 +38,7 @@ import {
CreateFunnelPayload,
CreateFunnelResponse,
FunnelData,
FunnelStepData,
} from 'types/api/traceFunnels';
export const useFunnelsList = (): UseQueryResult<
@@ -117,12 +119,14 @@ export const useValidateFunnelSteps = ({
startTime,
endTime,
enabled,
steps,
}: {
funnelId: string;
selectedTime: string;
startTime: number;
endTime: number;
enabled: boolean;
steps: FunnelStepData[];
}): UseQueryResult<
SuccessResponse<ValidateFunnelResponse> | ErrorResponse,
Error
@@ -130,11 +134,19 @@ export const useValidateFunnelSteps = ({
useQuery({
queryFn: ({ signal }) =>
validateFunnelSteps(
funnelId,
{ start_time: startTime, end_time: endTime },
{ start_time: startTime, end_time: endTime, steps },
signal,
),
queryKey: [REACT_QUERY_KEY.VALIDATE_FUNNEL_STEPS, funnelId, selectedTime],
queryKey: [
REACT_QUERY_KEY.VALIDATE_FUNNEL_STEPS,
funnelId,
selectedTime,
steps.map((step) => {
// eslint-disable-next-line @typescript-eslint/naming-convention
const { latency_type, ...rest } = step;
return rest;
}),
],
enabled,
staleTime: 0,
});
@@ -168,18 +180,17 @@ export const useFunnelOverview = (
const {
selectedTime,
validTracesCount,
hasFunnelBeenExecuted,
isUpdatingFunnel,
} = useFunnelContext();
return useQuery({
queryFn: ({ signal }) => getFunnelOverview(funnelId, payload, signal),
queryFn: ({ signal }) => getFunnelOverview(payload, signal),
queryKey: [
REACT_QUERY_KEY.GET_FUNNEL_OVERVIEW,
funnelId,
selectedTime,
payload.step_start ?? '',
payload.step_end ?? '',
payload.steps,
],
enabled: !!funnelId && validTracesCount > 0 && hasFunnelBeenExecuted,
enabled: !!funnelId && validTracesCount > 0 && !isUpdatingFunnel,
});
};
@@ -190,18 +201,19 @@ export const useFunnelSlowTraces = (
const {
selectedTime,
validTracesCount,
hasFunnelBeenExecuted,
isUpdatingFunnel,
} = useFunnelContext();
return useQuery<SuccessResponse<SlowTraceData> | ErrorResponse, Error>({
queryFn: ({ signal }) => getFunnelSlowTraces(funnelId, payload, signal),
queryFn: ({ signal }) => getFunnelSlowTraces(payload, signal),
queryKey: [
REACT_QUERY_KEY.GET_FUNNEL_SLOW_TRACES,
funnelId,
selectedTime,
payload.step_start ?? '',
payload.step_end ?? '',
payload.steps,
],
enabled: !!funnelId && validTracesCount > 0 && hasFunnelBeenExecuted,
enabled: !!funnelId && validTracesCount > 0 && !isUpdatingFunnel,
});
};
@@ -212,7 +224,7 @@ export const useFunnelErrorTraces = (
const {
selectedTime,
validTracesCount,
hasFunnelBeenExecuted,
isUpdatingFunnel,
} = useFunnelContext();
return useQuery({
queryFn: ({ signal }) => getFunnelErrorTraces(funnelId, payload, signal),
@@ -222,35 +234,31 @@ export const useFunnelErrorTraces = (
selectedTime,
payload.step_start ?? '',
payload.step_end ?? '',
payload.steps,
],
enabled: !!funnelId && validTracesCount > 0 && hasFunnelBeenExecuted,
enabled: !!funnelId && validTracesCount > 0 && !isUpdatingFunnel,
});
};
export function useFunnelStepsGraphData(
funnelId: string,
payload: FunnelStepsPayload,
): UseQueryResult<SuccessResponse<FunnelStepsResponse> | ErrorResponse, Error> {
const {
startTime,
endTime,
selectedTime,
validTracesCount,
hasFunnelBeenExecuted,
isUpdatingFunnel,
} = useFunnelContext();
return useQuery({
queryFn: ({ signal }) =>
getFunnelSteps(
funnelId,
{ start_time: startTime, end_time: endTime },
signal,
),
queryFn: ({ signal }) => getFunnelSteps(payload, signal),
queryKey: [
REACT_QUERY_KEY.GET_FUNNEL_STEPS_GRAPH_DATA,
funnelId,
selectedTime,
payload.steps,
],
enabled: !!funnelId && validTracesCount > 0 && hasFunnelBeenExecuted,
enabled: !!funnelId && validTracesCount > 0 && !isUpdatingFunnel,
});
}
@@ -264,17 +272,18 @@ export const useFunnelStepsOverview = (
const {
selectedTime,
validTracesCount,
hasFunnelBeenExecuted,
isUpdatingFunnel,
} = useFunnelContext();
return useQuery({
queryFn: ({ signal }) => getFunnelStepsOverview(funnelId, payload, signal),
queryFn: ({ signal }) => getFunnelStepsOverview(payload, signal),
queryKey: [
REACT_QUERY_KEY.GET_FUNNEL_STEPS_OVERVIEW,
funnelId,
selectedTime,
payload.step_start ?? '',
payload.step_end ?? '',
payload.steps,
],
enabled: !!funnelId && validTracesCount > 0 && hasFunnelBeenExecuted,
enabled: !!funnelId && validTracesCount > 0 && !isUpdatingFunnel,
});
};

View File

@@ -7,6 +7,7 @@ import NewWidget from 'container/NewWidget';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import useUrlQuery from 'hooks/useUrlQuery';
import { useDashboard } from 'providers/Dashboard/Dashboard';
import { PreferenceContextProvider } from 'providers/preferences/context/PreferenceContextProvider';
import { useEffect, useState } from 'react';
import { generatePath, useLocation, useParams } from 'react-router-dom';
import { Widgets } from 'types/api/dashboard/getAll';
@@ -52,11 +53,13 @@ function DashboardWidget(): JSX.Element | null {
}
return (
<NewWidget
yAxisUnit={selectedWidget?.yAxisUnit}
selectedGraph={selectedGraph}
fillSpans={selectedWidget?.fillSpans}
/>
<PreferenceContextProvider>
<NewWidget
yAxisUnit={selectedWidget?.yAxisUnit}
selectedGraph={selectedGraph}
fillSpans={selectedWidget?.fillSpans}
/>
</PreferenceContextProvider>
);
}

View File

@@ -3,9 +3,14 @@ import ROUTES from 'constants/routes';
import InfraMonitoringHosts from 'container/InfraMonitoringHosts';
import InfraMonitoringK8s from 'container/InfraMonitoringK8s';
import { Inbox } from 'lucide-react';
import { PreferenceContextProvider } from 'providers/preferences/context/PreferenceContextProvider';
export const Hosts: TabRoutes = {
Component: InfraMonitoringHosts,
Component: (): JSX.Element => (
<PreferenceContextProvider>
<InfraMonitoringHosts />
</PreferenceContextProvider>
),
name: (
<div className="tab-item">
<Inbox size={16} /> Hosts
@@ -16,7 +21,11 @@ export const Hosts: TabRoutes = {
};
export const Kubernetes: TabRoutes = {
Component: InfraMonitoringK8s,
Component: (): JSX.Element => (
<PreferenceContextProvider>
<InfraMonitoringK8s />
</PreferenceContextProvider>
),
name: (
<div className="tab-item">
<Inbox size={16} /> Kubernetes

View File

@@ -10,6 +10,7 @@ import LogsFilters from 'container/LogsFilters';
import LogsSearchFilter from 'container/LogsSearchFilter';
import LogsTable from 'container/LogsTable';
import history from 'lib/history';
import { PreferenceContextProvider } from 'providers/preferences/context/PreferenceContextProvider';
import { useCallback, useMemo } from 'react';
import { useDispatch, useSelector } from 'react-redux';
import { useLocation } from 'react-router-dom';
@@ -82,69 +83,71 @@ function OldLogsExplorer(): JSX.Element {
};
return (
<div className="old-logs-explorer">
<SpaceContainer
split={<Divider type="vertical" />}
align="center"
direction="horizontal"
>
<LogsSearchFilter />
<LogLiveTail />
</SpaceContainer>
<PreferenceContextProvider>
<div className="old-logs-explorer">
<SpaceContainer
split={<Divider type="vertical" />}
align="center"
direction="horizontal"
>
<LogsSearchFilter />
<LogLiveTail />
</SpaceContainer>
<LogsAggregate />
<LogsAggregate />
<Row gutter={20} wrap={false}>
<LogsFilters />
<Col flex={1} className="logs-col-container">
<Row>
<Col flex={1}>
<Space align="baseline" direction="horizontal">
<Select
getPopupContainer={popupContainer}
style={defaultSelectStyle}
value={selectedViewModeOption}
onChange={onChangeVeiwMode}
>
{viewModeOptionList.map((option) => (
<Select.Option key={option.value}>{option.label}</Select.Option>
))}
</Select>
{isFormatButtonVisible && (
<Popover
<Row gutter={20} wrap={false}>
<LogsFilters />
<Col flex={1} className="logs-col-container">
<Row>
<Col flex={1}>
<Space align="baseline" direction="horizontal">
<Select
getPopupContainer={popupContainer}
placement="right"
content={renderPopoverContent}
style={defaultSelectStyle}
value={selectedViewModeOption}
onChange={onChangeVeiwMode}
>
<Button>Format</Button>
</Popover>
)}
{viewModeOptionList.map((option) => (
<Select.Option key={option.value}>{option.label}</Select.Option>
))}
</Select>
<Select
getPopupContainer={popupContainer}
style={defaultSelectStyle}
defaultValue={order}
onChange={handleChangeOrder}
>
{orderItems.map((item) => (
<Select.Option key={item.enum}>{item.name}</Select.Option>
))}
</Select>
</Space>
</Col>
{isFormatButtonVisible && (
<Popover
getPopupContainer={popupContainer}
placement="right"
content={renderPopoverContent}
>
<Button>Format</Button>
</Popover>
)}
<Col>
<LogControls />
</Col>
</Row>
<Select
getPopupContainer={popupContainer}
style={defaultSelectStyle}
defaultValue={order}
onChange={handleChangeOrder}
>
{orderItems.map((item) => (
<Select.Option key={item.enum}>{item.name}</Select.Option>
))}
</Select>
</Space>
</Col>
<LogsTable viewMode={viewMode} linesPerRow={linesPerRow} />
</Col>
</Row>
<Col>
<LogControls />
</Col>
</Row>
<LogDetailedView />
</div>
<LogsTable viewMode={viewMode} linesPerRow={linesPerRow} />
</Col>
</Row>
<LogDetailedView />
</div>
</PreferenceContextProvider>
);
}

View File

@@ -4,6 +4,7 @@ import NotFound from 'components/NotFound';
import Spinner from 'components/Spinner';
import NewDashboard from 'container/NewDashboard';
import { useDashboard } from 'providers/Dashboard/Dashboard';
import { PreferenceContextProvider } from 'providers/preferences/context/PreferenceContextProvider';
import { useEffect } from 'react';
import { ErrorType } from 'types/common';
@@ -35,7 +36,11 @@ function DashboardPage(): JSX.Element {
return <Spinner tip="Loading.." />;
}
return <NewDashboard />;
return (
<PreferenceContextProvider>
<NewDashboard />
</PreferenceContextProvider>
);
}
export default DashboardPage;

View File

@@ -2,6 +2,7 @@ import './DeleteFunnelStep.styles.scss';
import SignozModal from 'components/SignozModal/SignozModal';
import { Trash2, X } from 'lucide-react';
import { useFunnelContext } from 'pages/TracesFunnels/FunnelContext';
interface DeleteFunnelStepProps {
isOpen: boolean;
@@ -14,8 +15,10 @@ function DeleteFunnelStep({
onClose,
onStepRemove,
}: DeleteFunnelStepProps): JSX.Element {
const { handleRunFunnel } = useFunnelContext();
const handleStepRemoval = (): void => {
onStepRemove();
handleRunFunnel();
onClose();
};

View File

@@ -6,6 +6,7 @@ import OverlayScrollbar from 'components/OverlayScrollbar/OverlayScrollbar';
import useFunnelConfiguration from 'hooks/TracesFunnels/useFunnelConfiguration';
import { PencilLine } from 'lucide-react';
import FunnelItemPopover from 'pages/TracesFunnels/components/FunnelsList/FunnelItemPopover';
import { useFunnelContext } from 'pages/TracesFunnels/FunnelContext';
import CopyToClipboard from 'periscope/components/CopyToClipboard';
import { memo, useState } from 'react';
import { Span } from 'types/api/trace/getTraceV2';
@@ -21,7 +22,6 @@ interface FunnelConfigurationProps {
funnel: FunnelData;
isTraceDetailsPage?: boolean;
span?: Span;
disableAutoSave?: boolean;
triggerAutoSave?: boolean;
showNotifications?: boolean;
}
@@ -30,15 +30,19 @@ function FunnelConfiguration({
funnel,
isTraceDetailsPage,
span,
disableAutoSave,
triggerAutoSave,
showNotifications,
}: FunnelConfigurationProps): JSX.Element {
const { isPopoverOpen, setIsPopoverOpen, steps } = useFunnelConfiguration({
const { triggerSave } = useFunnelContext();
const {
isPopoverOpen,
setIsPopoverOpen,
steps,
isSaving,
} = useFunnelConfiguration({
funnel,
disableAutoSave,
triggerAutoSave,
showNotifications,
triggerAutoSave: triggerAutoSave || triggerSave,
showNotifications: showNotifications || triggerSave,
});
const [isDescriptionModalOpen, setIsDescriptionModalOpen] = useState<boolean>(
false,
@@ -106,7 +110,7 @@ function FunnelConfiguration({
{!isTraceDetailsPage && (
<>
<StepsFooter stepsCount={steps.length} />
<StepsFooter stepsCount={steps.length} isSaving={isSaving || false} />
<AddFunnelDescriptionModal
isOpen={isDescriptionModalOpen}
onClose={handleDescriptionModalClose}
@@ -122,7 +126,6 @@ function FunnelConfiguration({
FunnelConfiguration.defaultProps = {
isTraceDetailsPage: false,
span: undefined,
disableAutoSave: false,
triggerAutoSave: false,
showNotifications: false,
};

View File

@@ -9,6 +9,7 @@
color: var(--bg-vanilla-400);
border: 1px solid var(--bg-slate-500);
border-radius: 6px;
width: 100%;
.step-popover {
opacity: 0;
width: 22px;

View File

@@ -40,11 +40,6 @@
letter-spacing: 0.12px;
border-radius: 2px;
&--sync {
border: 1px solid var(--bg-slate-400);
background: var(--bg-ink-300);
color: var(--bg-vanilla-400);
}
&--run {
background-color: var(--bg-robin-500);
}

View File

@@ -1,53 +1,14 @@
import './StepsFooter.styles.scss';
import { LoadingOutlined } from '@ant-design/icons';
import { Button, Skeleton, Spin } from 'antd';
import { Button, Skeleton } from 'antd';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import { Cone, Play, RefreshCcw } from 'lucide-react';
import { Check, Cone } from 'lucide-react';
import { useFunnelContext } from 'pages/TracesFunnels/FunnelContext';
import { useMemo } from 'react';
import { useIsFetching, useIsMutating } from 'react-query';
const useFunnelResultsLoading = (): boolean => {
const { funnelId } = useFunnelContext();
const isFetchingFunnelOverview = useIsFetching({
queryKey: [REACT_QUERY_KEY.GET_FUNNEL_OVERVIEW, funnelId],
});
const isFetchingStepsGraphData = useIsFetching({
queryKey: [REACT_QUERY_KEY.GET_FUNNEL_STEPS_GRAPH_DATA, funnelId],
});
const isFetchingErrorTraces = useIsFetching({
queryKey: [REACT_QUERY_KEY.GET_FUNNEL_ERROR_TRACES, funnelId],
});
const isFetchingSlowTraces = useIsFetching({
queryKey: [REACT_QUERY_KEY.GET_FUNNEL_SLOW_TRACES, funnelId],
});
return useMemo(() => {
if (!funnelId) {
return false;
}
return (
!!isFetchingFunnelOverview ||
!!isFetchingStepsGraphData ||
!!isFetchingErrorTraces ||
!!isFetchingSlowTraces
);
}, [
funnelId,
isFetchingFunnelOverview,
isFetchingStepsGraphData,
isFetchingErrorTraces,
isFetchingSlowTraces,
]);
};
import { useIsMutating } from 'react-query';
interface StepsFooterProps {
stepsCount: number;
isSaving: boolean;
}
function ValidTracesCount(): JSX.Element {
@@ -93,21 +54,13 @@ function ValidTracesCount(): JSX.Element {
return <span className="steps-footer__valid-traces">Valid traces found</span>;
}
function StepsFooter({ stepsCount }: StepsFooterProps): JSX.Element {
function StepsFooter({ stepsCount, isSaving }: StepsFooterProps): JSX.Element {
const {
validTracesCount,
handleRunFunnel,
hasFunnelBeenExecuted,
funnelId,
hasIncompleteStepFields,
handleSaveFunnel,
hasUnsavedChanges,
} = useFunnelContext();
const isFunnelResultsLoading = useFunnelResultsLoading();
const isFunnelUpdateMutating = useIsMutating([
REACT_QUERY_KEY.UPDATE_FUNNEL_STEPS,
funnelId,
]);
return (
<div className="steps-footer">
<div className="steps-footer__left">
@@ -117,38 +70,16 @@ function StepsFooter({ stepsCount }: StepsFooterProps): JSX.Element {
<ValidTracesCount />
</div>
<div className="steps-footer__right">
{!!isFunnelUpdateMutating && (
<div className="steps-footer__button steps-footer__button--updating">
<Spin
indicator={<LoadingOutlined style={{ color: 'grey' }} />}
size="small"
/>
Updating
</div>
)}
{!hasFunnelBeenExecuted ? (
<Button
disabled={validTracesCount === 0}
onClick={handleRunFunnel}
type="primary"
className="steps-footer__button steps-footer__button--run"
icon={<Play size={16} />}
>
Run funnel
</Button>
) : (
<Button
type="text"
className="steps-footer__button steps-footer__button--sync"
icon={<RefreshCcw size={16} />}
onClick={handleRunFunnel}
loading={isFunnelResultsLoading}
disabled={validTracesCount === 0}
>
Refresh
</Button>
)}
<Button
disabled={hasIncompleteStepFields || !hasUnsavedChanges}
onClick={handleSaveFunnel}
type="primary"
className="steps-footer__button steps-footer__button--run"
icon={<Check size={14} />}
loading={isSaving}
>
Save funnel
</Button>
</div>
</div>
);

View File

@@ -29,13 +29,20 @@ Chart.register(
);
function FunnelGraph(): JSX.Element {
const { funnelId } = useFunnelContext();
const { funnelId, startTime, endTime, steps } = useFunnelContext();
const payload = {
start_time: startTime,
end_time: endTime,
steps,
};
const {
data: stepsData,
isLoading,
isFetching,
isError,
} = useFunnelStepsGraphData(funnelId);
} = useFunnelStepsGraphData(funnelId, payload);
const data = useMemo(() => stepsData?.payload?.data?.[0]?.data, [
stepsData?.payload?.data,

View File

@@ -16,7 +16,6 @@ function FunnelResults(): JSX.Element {
isValidateStepsLoading,
hasIncompleteStepFields,
hasAllEmptyStepFields,
hasFunnelBeenExecuted,
funnelId,
} = useFunnelContext();
@@ -47,14 +46,6 @@ function FunnelResults(): JSX.Element {
/>
);
}
if (!hasFunnelBeenExecuted) {
return (
<EmptyFunnelResults
title="Funnel has not been run yet."
description="Run the funnel to see the results"
/>
);
}
return (
<div className="funnel-results">

View File

@@ -7,6 +7,7 @@ import { useFunnelContext } from 'pages/TracesFunnels/FunnelContext';
import { useMemo } from 'react';
import { UseQueryResult } from 'react-query';
import { ErrorResponse, SuccessResponse } from 'types/api';
import { FunnelStepData } from 'types/api/traceFunnels';
import FunnelTable from './FunnelTable';
import { topTracesTableColumns } from './utils';
@@ -24,6 +25,7 @@ interface FunnelTopTracesTableProps {
SuccessResponse<SlowTraceData | ErrorTraceData> | ErrorResponse,
Error
>;
steps: FunnelStepData[];
}
function FunnelTopTracesTable({
@@ -32,6 +34,7 @@ function FunnelTopTracesTable({
stepBOrder,
title,
tooltip,
steps,
useQueryHook,
}: FunnelTopTracesTableProps): JSX.Element {
const { startTime, endTime } = useFunnelContext();
@@ -41,8 +44,9 @@ function FunnelTopTracesTable({
end_time: endTime,
step_start: stepAOrder,
step_end: stepBOrder,
steps,
}),
[startTime, endTime, stepAOrder, stepBOrder],
[startTime, endTime, stepAOrder, stepBOrder, steps],
);
const { data: response, isLoading, isFetching } = useQueryHook(

View File

@@ -6,7 +6,7 @@ import FunnelMetricsTable from './FunnelMetricsTable';
function OverallMetrics(): JSX.Element {
const { funnelId } = useParams<{ funnelId: string }>();
const { isLoading, metricsData, conversionRate, isError } = useFunnelMetrics({
funnelId: funnelId || '',
funnelId,
});
return (

View File

@@ -52,11 +52,13 @@ function StepsTransitionResults(): JSX.Element {
funnelId={funnelId}
stepAOrder={stepAOrder}
stepBOrder={stepBOrder}
steps={steps}
/>
<TopTracesWithErrors
funnelId={funnelId}
stepAOrder={stepAOrder}
stepBOrder={stepBOrder}
steps={steps}
/>
</div>
</div>

View File

@@ -1,4 +1,5 @@
import { useFunnelSlowTraces } from 'hooks/TracesFunnels/useFunnels';
import { FunnelStepData } from 'types/api/traceFunnels';
import FunnelTopTracesTable from './FunnelTopTracesTable';
@@ -6,6 +7,7 @@ interface TopSlowestTracesProps {
funnelId: string;
stepAOrder: number;
stepBOrder: number;
steps: FunnelStepData[];
}
function TopSlowestTraces(props: TopSlowestTracesProps): JSX.Element {

View File

@@ -1,4 +1,5 @@
import { useFunnelErrorTraces } from 'hooks/TracesFunnels/useFunnels';
import { FunnelStepData } from 'types/api/traceFunnels';
import FunnelTopTracesTable from './FunnelTopTracesTable';
@@ -6,6 +7,7 @@ interface TopTracesWithErrorsProps {
funnelId: string;
stepAOrder: number;
stepBOrder: number;
steps: FunnelStepData[];
}
function TopTracesWithErrors(props: TopTracesWithErrorsProps): JSX.Element {

View File

@@ -18,10 +18,4 @@ export const topTracesTableColumns = [
key: 'duration_ms',
render: (value: string): string => getYAxisFormattedValue(value, 'ms'),
},
{
title: 'SPAN COUNT',
dataIndex: 'span_count',
key: 'span_count',
render: (value: number): string => value.toString(),
},
];

View File

@@ -14,8 +14,6 @@ export const initialStepsData: FunnelStepData[] = [
latency_pointer: 'start',
latency_type: undefined,
has_errors: false,
name: '',
description: '',
},
{
id: v4(),
@@ -29,8 +27,6 @@ export const initialStepsData: FunnelStepData[] = [
latency_pointer: 'start',
latency_type: LatencyOptions.P95,
has_errors: false,
name: '',
description: '',
},
];

View File

@@ -1,15 +1,15 @@
import logEvent from 'api/common/logEvent';
import { ValidateFunnelResponse } from 'api/traceFunnels';
import { LOCALSTORAGE } from 'constants/localStorage';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import { Time } from 'container/TopNav/DateTimeSelection/config';
import {
CustomTimeType,
Time as TimeV2,
} from 'container/TopNav/DateTimeSelectionV2/config';
import { normalizeSteps } from 'hooks/TracesFunnels/useFunnelConfiguration';
import { useValidateFunnelSteps } from 'hooks/TracesFunnels/useFunnels';
import { useLocalStorage } from 'hooks/useLocalStorage';
import getStartEndRangeTime from 'lib/getStartEndRangeTime';
import { isEqual } from 'lodash-es';
import { initialStepsData } from 'pages/TracesFunnelDetails/constants';
import {
createContext,
@@ -41,6 +41,9 @@ interface FunnelContextType {
handleStepChange: (index: number, newStep: Partial<FunnelStepData>) => void;
handleStepRemoval: (index: number) => void;
handleRunFunnel: () => void;
handleSaveFunnel: () => void;
triggerSave: boolean;
hasUnsavedChanges: boolean;
validationResponse:
| SuccessResponse<ValidateFunnelResponse>
| ErrorResponse
@@ -54,8 +57,10 @@ interface FunnelContextType {
spanName: string,
) => void;
handleRestoreSteps: (oldSteps: FunnelStepData[]) => void;
hasFunnelBeenExecuted: boolean;
setHasFunnelBeenExecuted: Dispatch<SetStateAction<boolean>>;
isUpdatingFunnel: boolean;
setIsUpdatingFunnel: Dispatch<SetStateAction<boolean>>;
lastUpdatedSteps: FunnelStepData[];
setLastUpdatedSteps: Dispatch<SetStateAction<FunnelStepData[]>>;
}
const FunnelContext = createContext<FunnelContextType | undefined>(undefined);
@@ -86,6 +91,19 @@ export function FunnelProvider({
const funnel = data?.payload;
const initialSteps = funnel?.steps?.length ? funnel.steps : initialStepsData;
const [steps, setSteps] = useState<FunnelStepData[]>(initialSteps);
const [triggerSave, setTriggerSave] = useState<boolean>(false);
const [isUpdatingFunnel, setIsUpdatingFunnel] = useState<boolean>(false);
const [lastUpdatedSteps, setLastUpdatedSteps] = useState<FunnelStepData[]>(
initialSteps,
);
// Check if there are unsaved changes by comparing with initial steps from API
const hasUnsavedChanges = useMemo(() => {
const normalizedCurrentSteps = normalizeSteps(steps);
const normalizedInitialSteps = normalizeSteps(lastUpdatedSteps);
return !isEqual(normalizedCurrentSteps, normalizedInitialSteps);
}, [steps, lastUpdatedSteps]);
const { hasIncompleteStepFields, hasAllEmptyStepFields } = useMemo(
() => ({
hasAllEmptyStepFields: steps.every(
@@ -98,15 +116,6 @@ export function FunnelProvider({
[steps],
);
const [unexecutedFunnels, setUnexecutedFunnels] = useLocalStorage<string[]>(
LOCALSTORAGE.UNEXECUTED_FUNNELS,
[],
);
const [hasFunnelBeenExecuted, setHasFunnelBeenExecuted] = useState(
!unexecutedFunnels.includes(funnelId),
);
const {
data: validationResponse,
isLoading: isValidationLoading,
@@ -116,7 +125,13 @@ export function FunnelProvider({
selectedTime,
startTime,
endTime,
enabled: !!funnelId && !!selectedTime && !!startTime && !!endTime,
enabled:
!!funnelId &&
!!selectedTime &&
!!startTime &&
!!endTime &&
!hasIncompleteStepFields,
steps,
});
const validTracesCount = useMemo(
@@ -185,11 +200,7 @@ export function FunnelProvider({
const handleRunFunnel = useCallback(async (): Promise<void> => {
if (validTracesCount === 0) return;
if (!hasFunnelBeenExecuted) {
setUnexecutedFunnels(unexecutedFunnels.filter((id) => id !== funnelId));
setHasFunnelBeenExecuted(true);
}
queryClient.refetchQueries([
REACT_QUERY_KEY.GET_FUNNEL_OVERVIEW,
funnelId,
@@ -215,15 +226,13 @@ export function FunnelProvider({
funnelId,
selectedTime,
]);
}, [
funnelId,
hasFunnelBeenExecuted,
unexecutedFunnels,
queryClient,
selectedTime,
setUnexecutedFunnels,
validTracesCount,
]);
}, [funnelId, queryClient, selectedTime, validTracesCount]);
const handleSaveFunnel = useCallback(() => {
setTriggerSave(true);
// Reset the trigger after a brief moment to allow useFunnelConfiguration to pick it up
setTimeout(() => setTriggerSave(false), 100);
}, []);
const value = useMemo<FunnelContextType>(
() => ({
@@ -239,14 +248,19 @@ export function FunnelProvider({
handleAddStep: addNewStep,
handleStepRemoval,
handleRunFunnel,
handleSaveFunnel,
triggerSave,
validationResponse,
isValidateStepsLoading: isValidationLoading || isValidationFetching,
hasIncompleteStepFields,
hasAllEmptyStepFields,
handleReplaceStep,
handleRestoreSteps,
hasFunnelBeenExecuted,
setHasFunnelBeenExecuted,
hasUnsavedChanges,
setIsUpdatingFunnel,
isUpdatingFunnel,
lastUpdatedSteps,
setLastUpdatedSteps,
}),
[
funnelId,
@@ -260,6 +274,8 @@ export function FunnelProvider({
addNewStep,
handleStepRemoval,
handleRunFunnel,
handleSaveFunnel,
triggerSave,
validationResponse,
isValidationLoading,
isValidationFetching,
@@ -267,8 +283,11 @@ export function FunnelProvider({
hasAllEmptyStepFields,
handleReplaceStep,
handleRestoreSteps,
hasFunnelBeenExecuted,
setHasFunnelBeenExecuted,
hasUnsavedChanges,
setIsUpdatingFunnel,
isUpdatingFunnel,
lastUpdatedSteps,
setLastUpdatedSteps,
],
);

View File

@@ -4,11 +4,9 @@ import { Input } from 'antd';
import logEvent from 'api/common/logEvent';
import { AxiosError } from 'axios';
import SignozModal from 'components/SignozModal/SignozModal';
import { LOCALSTORAGE } from 'constants/localStorage';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import ROUTES from 'constants/routes';
import { useCreateFunnel } from 'hooks/TracesFunnels/useFunnels';
import { useLocalStorage } from 'hooks/useLocalStorage';
import { useNotifications } from 'hooks/useNotifications';
import { useSafeNavigate } from 'hooks/useSafeNavigate';
import { Check, X } from 'lucide-react';
@@ -34,11 +32,6 @@ function CreateFunnel({
const { safeNavigate } = useSafeNavigate();
const { pathname } = useLocation();
const [unexecutedFunnels, setUnexecutedFunnels] = useLocalStorage<string[]>(
LOCALSTORAGE.UNEXECUTED_FUNNELS,
[],
);
const handleCreate = (): void => {
createFunnelMutation.mutate(
{
@@ -61,9 +54,6 @@ function CreateFunnel({
queryClient.invalidateQueries([REACT_QUERY_KEY.GET_FUNNELS_LIST]);
const funnelId = data?.payload?.funnel_id;
if (funnelId) {
setUnexecutedFunnels([...unexecutedFunnels, funnelId]);
}
onClose(funnelId);
if (funnelId && redirectToDetails) {

View File

@@ -2,13 +2,16 @@ import '../RenameFunnel/RenameFunnel.styles.scss';
import './DeleteFunnel.styles.scss';
import SignozModal from 'components/SignozModal/SignozModal';
import { LOCALSTORAGE } from 'constants/localStorage';
import { REACT_QUERY_KEY } from 'constants/reactQueryKeys';
import ROUTES from 'constants/routes';
import { useDeleteFunnel } from 'hooks/TracesFunnels/useFunnels';
import { useLocalStorage } from 'hooks/useLocalStorage';
import { useNotifications } from 'hooks/useNotifications';
import { Trash2, X } from 'lucide-react';
import { useQueryClient } from 'react-query';
import { useHistory } from 'react-router-dom';
import { FunnelStepData } from 'types/api/traceFunnels';
interface DeleteFunnelProps {
isOpen: boolean;
@@ -29,6 +32,13 @@ function DeleteFunnel({
const history = useHistory();
const { pathname } = history.location;
// localStorage hook for funnel steps
const localStorageKey = `${LOCALSTORAGE.FUNNEL_STEPS}_${funnelId}`;
const [, , clearLocalStorageSavedSteps] = useLocalStorage<
FunnelStepData[] | null
>(localStorageKey, null);
const handleDelete = (): void => {
deleteFunnelMutation.mutate(
{
@@ -39,6 +49,7 @@ function DeleteFunnel({
notifications.success({
message: 'Funnel deleted successfully',
});
clearLocalStorageSavedSteps();
onClose();
if (

12
go.mod
View File

@@ -8,7 +8,7 @@ require (
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
github.com/SigNoz/signoz-otel-collector v0.111.39
github.com/SigNoz/signoz-otel-collector v0.111.43
github.com/antlr4-go/antlr/v4 v4.13.1
github.com/antonmedv/expr v1.15.3
github.com/cespare/xxhash/v2 v2.3.0
@@ -69,8 +69,8 @@ require (
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.38.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/oauth2 v0.24.0
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
golang.org/x/oauth2 v0.26.0
golang.org/x/sync v0.14.0
golang.org/x/text v0.25.0
google.golang.org/protobuf v1.36.0
@@ -125,7 +125,7 @@ require (
github.com/go-openapi/spec v0.21.0 // indirect
github.com/go-openapi/swag v0.23.0 // indirect
github.com/go-openapi/validate v0.24.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/goccy/go-json v0.10.4 // indirect
github.com/gofrs/uuid v4.4.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gojek/valkyrie v0.0.0-20180215180059-6aee720afcdf // indirect
@@ -182,7 +182,7 @@ require (
github.com/oklog/ulid v1.3.1 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20220216144756-c35f1ee13d7c // indirect
@@ -267,7 +267,7 @@ require (
golang.org/x/net v0.40.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/time v0.8.0 // indirect
golang.org/x/tools v0.28.0 // indirect
golang.org/x/tools v0.29.0 // indirect
gonum.org/v1/gonum v0.15.1 // indirect
google.golang.org/api v0.213.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20241216192217-9240e9c98484 // indirect

24
go.sum
View File

@@ -100,8 +100,8 @@ github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
github.com/SigNoz/signoz-otel-collector v0.111.39 h1:Dl8QqZNAsj2atxP572OzsszPK0XPpd3LLPNPRAUJ5wo=
github.com/SigNoz/signoz-otel-collector v0.111.39/go.mod h1:DCu/D+lqhsPNSGS4IMD+4gn7q06TGzOCKazSy+GURVc=
github.com/SigNoz/signoz-otel-collector v0.111.43 h1:upWUoxDl5kCE/WI5+di2oqA/wJi2NU/PRyN8zDR078c=
github.com/SigNoz/signoz-otel-collector v0.111.43/go.mod h1:iUGoKEaNQmLNptTwEz9o5kZ0ntbCMQsrV53Y2TDd1Qg=
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -322,8 +322,8 @@ github.com/go-viper/mapstructure/v2 v2.1.0 h1:gHnMa2Y/pIxElCH2GlZZ1lZSsn6XMtufpG
github.com/go-viper/mapstructure/v2 v2.1.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.4 h1:DPzxraQx7OrPyXq2phlGlNSIyWEsAox0RJmjTseMV6I=
github.com/go-zookeeper/zk v1.0.4/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-json v0.10.4 h1:JSwxQzIqKfmFX1swYPpUThQZp/Ka4wzJdK0LWVytLPM=
github.com/goccy/go-json v0.10.4/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
github.com/goccy/go-yaml v1.9.5/go.mod h1:U/jl18uSupI5rdI2jmuCswEA2htH9eXfferR3KfscvA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
@@ -758,8 +758,8 @@ github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3v
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjLxUqIJNnCWiEdr3bn6IUYi15bNlnbCCU=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
@@ -1138,8 +1138,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -1241,8 +1241,8 @@ golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20220223155221-ee480838109b/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220309155454-6242fa91716a/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5/go.mod h1:DAh4E804XQdzx2j+YRIaUnCqCV2RuMz24cGBJ5QYIrc=
golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE=
golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE=
golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1425,8 +1425,8 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8=
golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw=
golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE=
golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -12,4 +12,16 @@ type Analytics interface {
// Sends analytics messages to an analytics backend.
Send(context.Context, ...analyticstypes.Message)
// Tracks an event on a group level. Input is group, event name, and attributes. The user is "stats_<org_id>".
TrackGroup(context.Context, string, string, map[string]any)
// Tracks an event on a user level and attributes it with the group. Input is group, user, event name, and attributes.
TrackUser(context.Context, string, string, string, map[string]any)
// Identifies a group. Input is group, traits.
IdentifyGroup(context.Context, string, map[string]any)
// Identifies a user. Input is group, user, traits.
IdentifyUser(context.Context, string, string, map[string]any)
}

View File

@@ -24,6 +24,18 @@ func (provider *Provider) Start(_ context.Context) error {
func (provider *Provider) Send(ctx context.Context, messages ...analyticstypes.Message) {}
func (provider *Provider) TrackGroup(ctx context.Context, group, event string, attributes map[string]any) {
}
func (provider *Provider) TrackUser(ctx context.Context, group, user, event string, attributes map[string]any) {
}
func (provider *Provider) IdentifyGroup(ctx context.Context, group string, traits map[string]any) {
}
func (provider *Provider) IdentifyUser(ctx context.Context, group, user string, traits map[string]any) {
}
func (provider *Provider) Stop(_ context.Context) error {
close(provider.stopC)
return nil

View File

@@ -27,7 +27,25 @@ func (provider *provider) Start(_ context.Context) error {
return nil
}
func (provider *provider) Send(ctx context.Context, messages ...analyticstypes.Message) {}
func (provider *provider) Send(ctx context.Context, messages ...analyticstypes.Message) {
// do nothing
}
func (provider *provider) TrackGroup(ctx context.Context, group, event string, attributes map[string]any) {
// do nothing
}
func (provider *provider) TrackUser(ctx context.Context, group, user, event string, attributes map[string]any) {
// do nothing
}
func (provider *provider) IdentifyGroup(ctx context.Context, group string, traits map[string]any) {
// do nothing
}
func (provider *provider) IdentifyUser(ctx context.Context, group, user string, traits map[string]any) {
// do nothing
}
func (provider *provider) Stop(_ context.Context) error {
close(provider.stopC)

View File

@@ -50,6 +50,100 @@ func (provider *provider) Send(ctx context.Context, messages ...analyticstypes.M
}
}
func (provider *provider) TrackGroup(ctx context.Context, group, event string, properties map[string]any) {
if properties == nil {
provider.settings.Logger().WarnContext(ctx, "empty attributes received, skipping event", "group", group, "event", event)
return
}
err := provider.client.Enqueue(analyticstypes.Track{
UserId: "stats_" + group,
Event: event,
Properties: analyticstypes.NewPropertiesFromMap(properties),
Context: &analyticstypes.Context{
Extra: map[string]interface{}{
analyticstypes.KeyGroupID: group,
},
},
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
}
func (provider *provider) TrackUser(ctx context.Context, group, user, event string, properties map[string]any) {
if properties == nil {
provider.settings.Logger().WarnContext(ctx, "empty attributes received, skipping event", "user", user, "group", group, "event", event)
return
}
err := provider.client.Enqueue(analyticstypes.Track{
UserId: user,
Event: event,
Properties: analyticstypes.NewPropertiesFromMap(properties),
Context: &analyticstypes.Context{
Extra: map[string]interface{}{
analyticstypes.KeyGroupID: group,
},
},
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
}
func (provider *provider) IdentifyGroup(ctx context.Context, group string, traits map[string]any) {
if traits == nil {
provider.settings.Logger().WarnContext(ctx, "empty attributes received, skipping identify", "group", group)
return
}
// identify the user
err := provider.client.Enqueue(analyticstypes.Identify{
UserId: "stats_" + group,
Traits: analyticstypes.NewTraitsFromMap(traits),
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
// identify the group using the stats user
err = provider.client.Enqueue(analyticstypes.Group{
UserId: "stats_" + group,
GroupId: group,
Traits: analyticstypes.NewTraitsFromMap(traits),
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
}
func (provider *provider) IdentifyUser(ctx context.Context, group, user string, traits map[string]any) {
if traits == nil {
provider.settings.Logger().WarnContext(ctx, "empty attributes received, skipping identify", "user", user, "group", group)
return
}
// identify the user
err := provider.client.Enqueue(analyticstypes.Identify{
UserId: user,
Traits: analyticstypes.NewTraitsFromMap(traits),
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
// associate the user with the group
err = provider.client.Enqueue(analyticstypes.Group{
UserId: user,
GroupId: group,
Traits: analyticstypes.NewTraits().Set("id", group), // A trait is required
})
if err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to send message to segment", "err", err)
}
}
func (provider *provider) Stop(ctx context.Context) error {
if err := provider.client.Close(); err != nil {
provider.settings.Logger().WarnContext(ctx, "unable to close segment client", "err", err)

View File

@@ -9,7 +9,6 @@ import (
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/dashboard"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/analyticstypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
@@ -45,19 +44,7 @@ func (module *module) Create(ctx context.Context, orgID valuer.UUID, createdBy s
return nil, err
}
module.analytics.Send(ctx,
analyticstypes.Track{
UserId: creator.String(),
Event: "Dashboard Created",
Properties: analyticstypes.NewPropertiesFromMap(dashboardtypes.NewStatsFromStorableDashboards([]*dashboardtypes.StorableDashboard{storableDashboard})),
Context: &analyticstypes.Context{
Extra: map[string]interface{}{
analyticstypes.KeyGroupID: orgID,
},
},
},
)
module.analytics.TrackUser(ctx, orgID.String(), creator.String(), "Dashboard Created", dashboardtypes.NewStatsFromStorableDashboards([]*dashboardtypes.StorableDashboard{storableDashboard}))
return dashboard, nil
}

View File

@@ -0,0 +1,31 @@
package impluser
import (
"context"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
)
type getter struct {
store types.UserStore
}
func NewGetter(store types.UserStore) user.Getter {
return &getter{store: store}
}
func (module *getter) ListByOrgID(ctx context.Context, orgID valuer.UUID) ([]*types.User, error) {
gettableUsers, err := module.store.ListUsers(ctx, orgID.StringValue())
if err != nil {
return nil, err
}
users := make([]*types.User, len(gettableUsers))
for i, user := range gettableUsers {
users[i] = &user.User
}
return users, nil
}

View File

@@ -326,7 +326,7 @@ func (h *handler) UpdateUser(w http.ResponseWriter, r *http.Request) {
user.UpdatedAt = time.Now()
updatedUser, err := h.module.UpdateUser(ctx, claims.OrgID, id, &user)
updatedUser, err := h.module.UpdateUser(ctx, claims.OrgID, id, &user, claims.UserID)
if err != nil {
render.Error(w, err)
return
@@ -347,7 +347,7 @@ func (h *handler) DeleteUser(w http.ResponseWriter, r *http.Request) {
return
}
if err := h.module.DeleteUser(ctx, claims.OrgID, id); err != nil {
if err := h.module.DeleteUser(ctx, claims.OrgID, id, claims.UserID); err != nil {
render.Error(w, err)
return
}

View File

@@ -18,7 +18,6 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/query-service/telemetry"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/analyticstypes"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/emailtypes"
"github.com/SigNoz/signoz/pkg/valuer"
@@ -135,35 +134,9 @@ func (m *Module) CreateUserWithPassword(ctx context.Context, user *types.User, p
return nil, err
}
m.analytics.Send(ctx,
analyticstypes.Identify{
UserId: user.ID.String(),
Traits: analyticstypes.
NewTraits().
SetName(user.DisplayName).
SetEmail(user.Email).
Set("role", user.Role).
SetCreatedAt(user.CreatedAt),
},
analyticstypes.Group{
UserId: user.ID.String(),
GroupId: user.OrgID,
},
analyticstypes.Track{
UserId: user.ID.String(),
Event: "User Created",
Properties: analyticstypes.NewPropertiesFromMap(map[string]any{
"role": user.Role,
"email": user.Email,
"name": user.DisplayName,
}),
Context: &analyticstypes.Context{
Extra: map[string]interface{}{
analyticstypes.KeyGroupID: user.OrgID,
},
},
},
)
traitsOrProperties := types.NewTraitsFromUser(user)
m.analytics.IdentifyUser(ctx, user.OrgID, user.ID.String(), traitsOrProperties)
m.analytics.TrackUser(ctx, user.OrgID, user.ID.String(), "User Created", traitsOrProperties)
return user, nil
}
@@ -173,35 +146,9 @@ func (m *Module) CreateUser(ctx context.Context, user *types.User) error {
return err
}
m.analytics.Send(ctx,
analyticstypes.Identify{
UserId: user.ID.String(),
Traits: analyticstypes.
NewTraits().
SetName(user.DisplayName).
SetEmail(user.Email).
Set("role", user.Role).
SetCreatedAt(user.CreatedAt),
},
analyticstypes.Group{
UserId: user.ID.String(),
GroupId: user.OrgID,
},
analyticstypes.Track{
UserId: user.ID.String(),
Event: "User Created",
Properties: analyticstypes.NewPropertiesFromMap(map[string]any{
"role": user.Role,
"email": user.Email,
"name": user.DisplayName,
}),
Context: &analyticstypes.Context{
Extra: map[string]interface{}{
analyticstypes.KeyGroupID: user.OrgID,
},
},
},
)
traitsOrProperties := types.NewTraitsFromUser(user)
m.analytics.IdentifyUser(ctx, user.OrgID, user.ID.String(), traitsOrProperties)
m.analytics.TrackUser(ctx, user.OrgID, user.ID.String(), "User Created", traitsOrProperties)
return nil
}
@@ -226,11 +173,22 @@ func (m *Module) ListUsers(ctx context.Context, orgID string) ([]*types.Gettable
return m.store.ListUsers(ctx, orgID)
}
func (m *Module) UpdateUser(ctx context.Context, orgID string, id string, user *types.User) (*types.User, error) {
return m.store.UpdateUser(ctx, orgID, id, user)
func (m *Module) UpdateUser(ctx context.Context, orgID string, id string, user *types.User, updatedBy string) (*types.User, error) {
user, err := m.store.UpdateUser(ctx, orgID, id, user)
if err != nil {
return nil, err
}
traits := types.NewTraitsFromUser(user)
m.analytics.IdentifyUser(ctx, user.OrgID, user.ID.String(), traits)
traits["updated_by"] = updatedBy
m.analytics.TrackUser(ctx, user.OrgID, user.ID.String(), "User Updated", traits)
return user, nil
}
func (m *Module) DeleteUser(ctx context.Context, orgID string, id string) error {
func (m *Module) DeleteUser(ctx context.Context, orgID string, id string, deletedBy string) error {
user, err := m.store.GetUserByID(ctx, orgID, id)
if err != nil {
return err
@@ -250,7 +208,15 @@ func (m *Module) DeleteUser(ctx context.Context, orgID string, id string) error
return errors.New(errors.TypeForbidden, errors.CodeForbidden, "cannot delete the last admin")
}
return m.store.DeleteUser(ctx, orgID, user.ID.StringValue())
if err := m.store.DeleteUser(ctx, orgID, user.ID.StringValue()); err != nil {
return err
}
m.analytics.TrackUser(ctx, user.OrgID, user.ID.String(), "User Deleted", map[string]any{
"deleted_by": deletedBy,
})
return nil
}
func (m *Module) CreateResetPasswordToken(ctx context.Context, userID string) (*types.ResetPasswordRequest, error) {
@@ -644,10 +610,16 @@ func (m *Module) Register(ctx context.Context, req *types.PostableRegisterOrgAnd
}
func (m *Module) Collect(ctx context.Context, orgID valuer.UUID) (map[string]any, error) {
stats := make(map[string]any)
count, err := m.store.CountByOrgID(ctx, orgID)
if err != nil {
return nil, err
if err == nil {
stats["user.count"] = count
}
return map[string]any{"user.count": count}, nil
count, err = m.store.CountAPIKeyByOrgID(ctx, orgID)
if err == nil {
stats["factor.api_key.count"] = count
}
return stats, nil
}

View File

@@ -826,3 +826,21 @@ func (store *store) CountByOrgID(ctx context.Context, orgID valuer.UUID) (int64,
return int64(count), nil
}
func (store *store) CountAPIKeyByOrgID(ctx context.Context, orgID valuer.UUID) (int64, error) {
apiKey := new(types.StorableAPIKey)
count, err := store.
sqlstore.
BunDB().
NewSelect().
Model(apiKey).
Join("JOIN users ON users.id = storable_api_key.user_id").
Where("org_id = ?", orgID).
Count(ctx)
if err != nil {
return 0, err
}
return int64(count), nil
}

View File

@@ -28,8 +28,8 @@ type Module interface {
GetUserByEmailInOrg(ctx context.Context, orgID string, email string) (*types.GettableUser, error)
GetUsersByRoleInOrg(ctx context.Context, orgID string, role types.Role) ([]*types.GettableUser, error)
ListUsers(ctx context.Context, orgID string) ([]*types.GettableUser, error)
UpdateUser(ctx context.Context, orgID string, id string, user *types.User) (*types.User, error)
DeleteUser(ctx context.Context, orgID string, id string) error
UpdateUser(ctx context.Context, orgID string, id string, user *types.User, updatedBy string) (*types.User, error)
DeleteUser(ctx context.Context, orgID string, id string, deletedBy string) error
// login
GetAuthenticatedUser(ctx context.Context, orgID, email, password, refreshToken string) (*types.User, error)
@@ -70,6 +70,11 @@ type Module interface {
statsreporter.StatsCollector
}
type Getter interface {
// Get gets the users based on the given id
ListByOrgID(context.Context, valuer.UUID) ([]*types.User, error)
}
type Handler interface {
// invite
CreateInvite(http.ResponseWriter, *http.Request)

View File

@@ -2,8 +2,7 @@ package clickhouseprometheus
import (
"encoding/json"
"strings"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
)
@@ -19,7 +18,7 @@ func unmarshalLabels(s string) ([]prompb.Label, string, error) {
if n == "__name__" {
metricName = v
} else {
if strings.Contains(n, ".") {
if !model.IsValidLegacyMetricName(n) {
n = `"` + n + `"`
}
}

View File

@@ -33,6 +33,12 @@ func (a *API) QueryRange(rw http.ResponseWriter, req *http.Request) {
return
}
// Validate the query request
if err := queryRangeRequest.Validate(); err != nil {
render.Error(rw, err)
return
}
orgID, err := valuer.NewUUID(claims.OrgID)
if err != nil {
render.Error(rw, err)

View File

@@ -117,7 +117,7 @@ func (bc *bucketCache) GetMissRanges(
}
// Put stores fresh query results in the cache
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result) {
func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result) {
// Get query window
startMs, endMs := q.Window()
@@ -159,8 +159,36 @@ func (bc *bucketCache) Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Que
return
}
// Convert trimmed result to buckets
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, startMs, cachableEndMs)
// Adjust start and end times to only cache complete intervals
cachableStartMs := startMs
stepMs := uint64(step.Duration.Milliseconds())
// If we have a step interval, adjust boundaries to only cache complete intervals
if stepMs > 0 {
// If start is not aligned, round up to next step boundary (first complete interval)
if startMs%stepMs != 0 {
cachableStartMs = ((startMs / stepMs) + 1) * stepMs
}
// If end is not aligned, round down to previous step boundary (last complete interval)
if cachableEndMs%stepMs != 0 {
cachableEndMs = (cachableEndMs / stepMs) * stepMs
}
// If after adjustment we have no complete intervals, don't cache
if cachableStartMs >= cachableEndMs {
bc.logger.DebugContext(ctx, "no complete intervals to cache",
"original_start", startMs,
"original_end", endMs,
"adjusted_start", cachableStartMs,
"adjusted_end", cachableEndMs,
"step", stepMs)
return
}
}
// Convert trimmed result to buckets with adjusted boundaries
freshBuckets := bc.resultToBuckets(ctx, trimmedResult, cachableStartMs, cachableEndMs)
// If no fresh buckets and no existing data, don't cache
if len(freshBuckets) == 0 && len(existingData.Buckets) == 0 {
@@ -485,6 +513,12 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
}
if existingSeries, ok := seriesMap[key]; ok {
// Merge values, avoiding duplicate timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Pre-allocate capacity for merged values
newCap := len(existingSeries.Values) + len(series.Values)
if cap(existingSeries.Values) < newCap {
@@ -492,7 +526,13 @@ func (bc *bucketCache) mergeTimeSeriesValues(ctx context.Context, buckets []*cac
copy(newValues, existingSeries.Values)
existingSeries.Values = newValues
}
existingSeries.Values = append(existingSeries.Values, series.Values...)
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// New series
seriesMap[key] = series
@@ -697,7 +737,7 @@ func (bc *bucketCache) trimResultToFluxBoundary(result *qbtypes.Result, fluxBoun
switch result.Type {
case qbtypes.RequestTypeTimeSeries:
// Trim time series data
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok && tsData != nil {
trimmedData := &qbtypes.TimeSeriesData{
QueryName: tsData.QueryName,
}

View File

@@ -30,7 +30,7 @@ func BenchmarkBucketCache_GetMissRanges(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
}
// Create test queries with varying cache hit patterns
@@ -121,7 +121,7 @@ func BenchmarkBucketCache_Put(b *testing.B) {
for i := 0; i < b.N; i++ {
for j := 0; j < tc.numQueries; j++ {
bc.Put(ctx, orgID, queries[j], results[j])
bc.Put(ctx, orgID, queries[j], qbtypes.Step{Duration: 1000 * time.Millisecond}, results[j])
}
}
})
@@ -259,7 +259,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
}
b.ResetTimer()
@@ -284,7 +284,7 @@ func BenchmarkBucketCache_ConcurrentOperations(b *testing.B) {
endMs: uint64((i + 1) * 10000),
}
result := createBenchmarkResult(query.startMs, query.endMs, 1000)
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
case 2: // Partial read
query := &mockQuery{
fingerprint: fmt.Sprintf("concurrent-query-%d", i%100),

View File

@@ -0,0 +1,117 @@
package querier
import (
"context"
"testing"
"time"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestBucketCacheStepAlignment(t *testing.T) {
ctx := context.Background()
orgID := valuer.UUID{}
cache := createTestCache(t)
bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute)
// Test with 5-minute step
step := qbtypes.Step{Duration: 5 * time.Minute}
// Query from 12:02 to 12:58 (both unaligned)
// Complete intervals: 12:05 to 12:55
query := &mockQuery{
fingerprint: "test-step-alignment",
startMs: 1672563720000, // 12:02
endMs: 1672567080000, // 12:58
}
result := &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "test",
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: []*qbtypes.TimeSeries{
{
Labels: []*qbtypes.Label{
{Key: telemetrytypes.TelemetryFieldKey{Name: "service"}, Value: "test"},
},
Values: []*qbtypes.TimeSeriesValue{
{Timestamp: 1672563720000, Value: 1, Partial: true}, // 12:02
{Timestamp: 1672563900000, Value: 2}, // 12:05
{Timestamp: 1672564200000, Value: 2.5}, // 12:10
{Timestamp: 1672564500000, Value: 2.6}, // 12:15
{Timestamp: 1672566600000, Value: 2.9}, // 12:50
{Timestamp: 1672566900000, Value: 3}, // 12:55
{Timestamp: 1672567080000, Value: 4, Partial: true}, // 12:58
},
},
},
},
},
},
}
// Put result in cache
bc.Put(ctx, orgID, query, step, result)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, step)
// Should have cached data
require.NotNil(t, cached)
// Log the missing ranges to debug
t.Logf("Missing ranges: %v", missing)
for i, r := range missing {
t.Logf("Missing range %d: From=%d, To=%d", i, r.From, r.To)
}
// Should have 2 missing ranges for partial intervals
require.Len(t, missing, 2)
// First partial: 12:02 to 12:05
assert.Equal(t, uint64(1672563720000), missing[0].From)
assert.Equal(t, uint64(1672563900000), missing[0].To)
// Second partial: 12:55 to 12:58
assert.Equal(t, uint64(1672566900000), missing[1].From, "Second missing range From")
assert.Equal(t, uint64(1672567080000), missing[1].To, "Second missing range To")
}
func TestBucketCacheNoStepInterval(t *testing.T) {
ctx := context.Background()
orgID := valuer.UUID{}
cache := createTestCache(t)
bc := NewBucketCache(instrumentationtest.New().ToProviderSettings(), cache, time.Hour, 5*time.Minute)
// Test with no step (stepMs = 0)
step := qbtypes.Step{Duration: 0}
query := &mockQuery{
fingerprint: "test-no-step",
startMs: 1672563720000,
endMs: 1672567080000,
}
result := &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "test",
Aggregations: []*qbtypes.AggregationBucket{{Index: 0, Series: []*qbtypes.TimeSeries{}}},
},
}
// Should cache the entire range when step is 0
bc.Put(ctx, orgID, query, step, result)
cached, missing := bc.GetMissRanges(ctx, orgID, query, step)
assert.NotNil(t, cached)
assert.Len(t, missing, 0)
}

View File

@@ -128,7 +128,7 @@ func TestBucketCache_GetMissRanges_EmptyCache(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.Nil(t, cached)
assert.Len(t, missing, 1)
@@ -159,13 +159,13 @@ func TestBucketCache_Put_And_Get(t *testing.T) {
}
// Store in cache
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait a bit for cache to be written
time.Sleep(10 * time.Millisecond)
// Retrieve from cache
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached.Value)
assert.Len(t, missing, 0)
@@ -193,7 +193,7 @@ func TestBucketCache_PartialHit(t *testing.T) {
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 1000, 3000, 1000),
}
bc.Put(context.Background(), valuer.UUID{}, query1, result1)
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, result1)
// Wait for cache write
time.Sleep(10 * time.Millisecond)
@@ -205,7 +205,7 @@ func TestBucketCache_PartialHit(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached.Value)
@@ -226,7 +226,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
startMs: 1000,
endMs: 2000,
}
bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 1000, 2000, 100),
})
@@ -236,7 +236,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
startMs: 3000,
endMs: 4000,
}
bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 100 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries("A", 3000, 4000, 100),
})
@@ -251,7 +251,7 @@ func TestBucketCache_MultipleBuckets(t *testing.T) {
endMs: 4500,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached.Value)
@@ -284,13 +284,13 @@ func TestBucketCache_FluxInterval(t *testing.T) {
}
// This should not be cached due to flux interval
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait a bit
time.Sleep(10 * time.Millisecond)
// Try to get the data
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no cached data
assert.Nil(t, cached)
@@ -354,7 +354,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
startMs: 1000,
endMs: 3000,
}
bc.Put(context.Background(), valuer.UUID{}, query1, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
@@ -370,7 +370,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
startMs: 3000,
endMs: 5000,
}
bc.Put(context.Background(), valuer.UUID{}, query2, &qbtypes.Result{
bc.Put(context.Background(), valuer.UUID{}, query2, qbtypes.Step{Duration: 1000 * time.Millisecond}, &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
@@ -390,7 +390,7 @@ func TestBucketCache_MergeTimeSeriesResults(t *testing.T) {
endMs: 5000,
}
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query3, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no missing ranges
assert.Len(t, missing, 0)
@@ -445,10 +445,10 @@ func TestBucketCache_RawData(t *testing.T) {
Value: rawData,
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Raw data should not be cached
assert.Nil(t, cached)
@@ -485,10 +485,10 @@ func TestBucketCache_ScalarData(t *testing.T) {
Value: scalarData,
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Scalar data should not be cached
assert.Nil(t, cached)
@@ -513,11 +513,11 @@ func TestBucketCache_EmptyFingerprint(t *testing.T) {
Value: createTestTimeSeries("A", 1000, 5000, 1000),
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Should still be able to retrieve
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached.Value)
assert.Len(t, missing, 0)
}
@@ -568,7 +568,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) {
Type: qbtypes.RequestTypeTimeSeries,
Value: createTestTimeSeries(fmt.Sprintf("Q%d", id), query.startMs, query.endMs, 100),
}
bc.Put(context.Background(), valuer.UUID{}, query, result)
bc.Put(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 100 * time.Microsecond}, result)
done <- true
}(i)
}
@@ -581,7 +581,7 @@ func TestBucketCache_ConcurrentAccess(t *testing.T) {
startMs: uint64(id * 1000),
endMs: uint64((id + 1) * 1000),
}
bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000})
bc.GetMissRanges(context.Background(), valuer.UUID{}, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
done <- true
}(i)
}
@@ -628,10 +628,10 @@ func TestBucketCache_GetMissRanges_FluxInterval(t *testing.T) {
},
}
bc.Put(ctx, orgID, query, cachedResult)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, cachedResult)
// Get miss ranges
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached)
t.Logf("Missing ranges: %+v, query range: %d-%d", missing, query.startMs, query.endMs)
@@ -690,10 +690,10 @@ func TestBucketCache_Put_FluxIntervalTrimming(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Retrieve cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached)
@@ -760,10 +760,10 @@ func TestBucketCache_Put_EntireRangeInFluxInterval(t *testing.T) {
}
// Put the result - should not cache anything
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Try to get cached data - should have no cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no cached value
assert.Nil(t, cached)
@@ -785,18 +785,6 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) {
shouldCache bool
description string
}{
{
name: "truly_empty_time_series",
result: &qbtypes.Result{
Type: qbtypes.RequestTypeTimeSeries,
Value: &qbtypes.TimeSeriesData{
QueryName: "A",
Aggregations: []*qbtypes.AggregationBucket{},
},
},
shouldCache: false,
description: "No aggregations means truly empty - should not cache",
},
{
name: "filtered_empty_time_series",
result: &qbtypes.Result{
@@ -878,17 +866,16 @@ func TestBucketCache_EmptyDataHandling(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, tt.result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, tt.result)
// Wait a bit for cache to be written
time.Sleep(10 * time.Millisecond)
// Try to get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
if tt.shouldCache {
assert.NotNil(t, cached, tt.description)
assert.Len(t, missing, 0, "Should have no missing ranges when data is cached")
} else {
assert.Nil(t, cached, tt.description)
assert.Len(t, missing, 1, "Should have entire range as missing when data is not cached")
@@ -944,13 +931,13 @@ func TestBucketCache_PartialValues(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait for cache to be written
time.Sleep(10 * time.Millisecond)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have cached data
assert.NotNil(t, cached)
@@ -1014,13 +1001,13 @@ func TestBucketCache_AllPartialValues(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
// Wait for cache to be written
time.Sleep(10 * time.Millisecond)
// Get cached data
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
// When all values are partial and filtered out, the result is cached as empty
// This prevents re-querying for the same misaligned time range
@@ -1075,7 +1062,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) {
}
// Cache the wide range
bc.Put(ctx, orgID, query1, result1)
bc.Put(ctx, orgID, query1, qbtypes.Step{Duration: 1000 * time.Millisecond}, result1)
time.Sleep(10 * time.Millisecond)
// Now query for a smaller range (2000-3500ms)
@@ -1086,7 +1073,7 @@ func TestBucketCache_FilteredCachedResults(t *testing.T) {
}
// Get cached data - should be filtered to requested range
cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query2, qbtypes.Step{Duration: 1000 * time.Millisecond})
// Should have no missing ranges
assert.Len(t, missing, 0)
@@ -1246,7 +1233,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@@ -1300,7 +1287,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@@ -1352,7 +1339,7 @@ func TestBucketCache_PartialValueDetection(t *testing.T) {
}
// Put the result
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Get cached data
@@ -1409,11 +1396,11 @@ func TestBucketCache_NoCache(t *testing.T) {
}
// Put the result in cache
bc.Put(ctx, orgID, query, result)
bc.Put(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond}, result)
time.Sleep(10 * time.Millisecond)
// Verify data is cached
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000})
cached, missing := bc.GetMissRanges(ctx, orgID, query, qbtypes.Step{Duration: 1000 * time.Millisecond})
assert.NotNil(t, cached)
assert.Len(t, missing, 0)

View File

@@ -118,6 +118,10 @@ func (q *builderQuery[T]) Fingerprint() string {
parts = append(parts, fmt.Sprintf("having=%s", q.spec.Having.Expression))
}
if q.spec.ShiftBy != 0 {
parts = append(parts, fmt.Sprintf("shiftby=%d", q.spec.ShiftBy))
}
return strings.Join(parts, "&")
}
@@ -204,7 +208,14 @@ func (q *builderQuery[T]) executeWithContext(ctx context.Context, query string,
// Pass query window and step for partial value detection
queryWindow := &qbtypes.TimeRange{From: q.fromMS, To: q.toMS}
payload, err := consume(rows, q.kind, queryWindow, q.spec.StepInterval, q.spec.Name)
kind := q.kind
// all metric queries are time series then reduced if required
if q.spec.Signal == telemetrytypes.SignalMetrics {
kind = qbtypes.RequestTypeTimeSeries
}
payload, err := consume(rows, kind, queryWindow, q.spec.StepInterval, q.spec.Name)
if err != nil {
return nil, err
}
@@ -224,16 +235,18 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
isAsc := len(q.spec.Order) > 0 &&
strings.ToLower(string(q.spec.Order[0].Direction.StringValue())) == "asc"
fromMS, toMS := q.fromMS, q.toMS
// Adjust [fromMS,toMS] window if a cursor was supplied
if cur := strings.TrimSpace(q.spec.Cursor); cur != "" {
if ts, err := decodeCursor(cur); err == nil {
if isAsc {
if uint64(ts) >= q.fromMS {
q.fromMS = uint64(ts + 1)
if uint64(ts) >= fromMS {
fromMS = uint64(ts + 1)
}
} else { // DESC
if uint64(ts) <= q.toMS {
q.toMS = uint64(ts - 1)
if uint64(ts) <= toMS {
toMS = uint64(ts - 1)
}
}
}
@@ -252,7 +265,16 @@ func (q *builderQuery[T]) executeWindowList(ctx context.Context) (*qbtypes.Resul
totalBytes := uint64(0)
start := time.Now()
for _, r := range makeBuckets(q.fromMS, q.toMS) {
// Get buckets and reverse them for ascending order
buckets := makeBuckets(fromMS, toMS)
if isAsc {
// Reverse the buckets for ascending order
for i, j := 0, len(buckets)-1; i < j; i, j = i+1, j-1 {
buckets[i], buckets[j] = buckets[j], buckets[i]
}
}
for _, r := range buckets {
q.spec.Offset = 0
q.spec.Limit = need

View File

@@ -0,0 +1,131 @@
package querier
import (
"strings"
"testing"
"github.com/SigNoz/signoz/pkg/querybuilder"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"github.com/stretchr/testify/assert"
)
func TestBuilderQueryFingerprint(t *testing.T) {
tests := []struct {
name string
query *builderQuery[qbtypes.MetricAggregation]
expectInKey []string
notExpectInKey []string
}{
{
name: "fingerprint includes shiftby when ShiftBy field is set",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 3600,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
},
expectInKey: []string{"shiftby=3600"},
notExpectInKey: []string{"functions=", "timeshift", "absolute"},
},
{
name: "fingerprint includes shiftby but not other functions",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 3600,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
},
expectInKey: []string{"shiftby=3600"},
notExpectInKey: []string{"functions=", "absolute"},
},
{
name: "no shiftby in fingerprint when ShiftBy is zero",
query: &builderQuery[qbtypes.MetricAggregation]{
kind: qbtypes.RequestTypeTimeSeries,
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Signal: telemetrytypes.SignalMetrics,
ShiftBy: 0,
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
},
expectInKey: []string{},
notExpectInKey: []string{"shiftby=", "functions=", "absolute"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fingerprint := tt.query.Fingerprint()
for _, expected := range tt.expectInKey {
assert.True(t, strings.Contains(fingerprint, expected),
"Expected fingerprint to contain '%s', got: %s", expected, fingerprint)
}
for _, notExpected := range tt.notExpectInKey {
assert.False(t, strings.Contains(fingerprint, notExpected),
"Expected fingerprint NOT to contain '%s', got: %s", notExpected, fingerprint)
}
})
}
}
func TestMakeBucketsOrder(t *testing.T) {
// Test that makeBuckets returns buckets in reverse chronological order by default
// Using milliseconds as input - need > 1 hour range to get multiple buckets
now := uint64(1700000000000) // Some timestamp in ms
startMS := now
endMS := now + uint64(10*60*60*1000) // 10 hours later
buckets := makeBuckets(startMS, endMS)
// Should have multiple buckets for a 10 hour range
assert.True(t, len(buckets) > 1, "Should have multiple buckets for 10 hour range, got %d", len(buckets))
// Log buckets for debugging
t.Logf("Generated %d buckets:", len(buckets))
for i, b := range buckets {
durationMs := (b.toNS - b.fromNS) / 1e6
t.Logf("Bucket %d: duration=%dms", i, durationMs)
}
// Verify buckets are in reverse chronological order (newest to oldest)
for i := 0; i < len(buckets)-1; i++ {
assert.True(t, buckets[i].toNS > buckets[i+1].toNS,
"Bucket %d end should be after bucket %d end", i, i+1)
assert.Equal(t, buckets[i].fromNS, buckets[i+1].toNS,
"Bucket %d start should equal bucket %d end (continuous buckets)", i, i+1)
}
// First bucket should end at endNS (converted to nanoseconds)
expectedEndNS := querybuilder.ToNanoSecs(endMS)
assert.Equal(t, expectedEndNS, buckets[0].toNS)
// Last bucket should start at startNS (converted to nanoseconds)
expectedStartNS := querybuilder.ToNanoSecs(startMS)
assert.Equal(t, expectedStartNS, buckets[len(buckets)-1].fromNS)
}

View File

@@ -176,7 +176,7 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
lblVals = append(lblVals, *val)
lblObjs = append(lblObjs, &qbtypes.Label{
Key: telemetrytypes.TelemetryFieldKey{Name: name},
Value: val,
Value: *val,
})
default:
@@ -227,8 +227,9 @@ func readAsTimeSeries(rows driver.Rows, queryWindow *qbtypes.TimeRange, step qbt
}
}
if maxAgg < 0 {
//nolint:nilnil
return nil, nil // empty result-set
return &qbtypes.TimeSeriesData{
QueryName: queryName,
}, nil
}
buckets := make([]*qbtypes.AggregationBucket, maxAgg+1)
@@ -319,8 +320,9 @@ func readAsScalar(rows driver.Rows, queryName string) (*qbtypes.ScalarData, erro
}
return &qbtypes.ScalarData{
Columns: cd,
Data: data,
QueryName: queryName,
Columns: cd,
Data: data,
}, nil
}

View File

@@ -17,5 +17,5 @@ type BucketCache interface {
// cached portion + list of gaps to fetch
GetMissRanges(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step) (cached *qbtypes.Result, missing []*qbtypes.TimeRange)
// store fresh buckets for future hits
Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, fresh *qbtypes.Result)
}
Put(ctx context.Context, orgID valuer.UUID, q qbtypes.Query, step qbtypes.Step, fresh *qbtypes.Result)
}

652
pkg/querier/postprocess.go Normal file
View File

@@ -0,0 +1,652 @@
package querier
import (
"context"
"fmt"
"sort"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
)
// queryInfo holds common query properties
type queryInfo struct {
Name string
Disabled bool
Step qbtypes.Step
}
// getqueryInfo extracts common info from any query type
func getqueryInfo(spec any) queryInfo {
switch s := spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.StepInterval}
case qbtypes.QueryBuilderFormula:
return queryInfo{Name: s.Name, Disabled: false}
case qbtypes.PromQuery:
return queryInfo{Name: s.Name, Disabled: s.Disabled, Step: s.Step}
case qbtypes.ClickHouseQuery:
return queryInfo{Name: s.Name, Disabled: s.Disabled}
}
return queryInfo{}
}
// getQueryName is a convenience function when only name is needed
func getQueryName(spec any) string {
return getqueryInfo(spec).Name
}
func (q *querier) postProcessResults(ctx context.Context, results map[string]any, req *qbtypes.QueryRangeRequest) (map[string]any, error) {
// Convert results to typed format for processing
typedResults := make(map[string]*qbtypes.Result)
for name, result := range results {
typedResults[name] = &qbtypes.Result{
Value: result,
}
}
for _, query := range req.CompositeQuery.Queries {
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessBuilderQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessBuilderQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
if result, ok := typedResults[spec.Name]; ok {
result = postProcessMetricQuery(q, result, spec, req)
typedResults[spec.Name] = result
}
}
}
// Apply formula calculations
typedResults = q.applyFormulas(ctx, typedResults, req)
// Filter out disabled queries
typedResults = q.filterDisabledQueries(typedResults, req)
// Apply table formatting for UI if requested
if req.FormatOptions != nil && req.FormatOptions.FormatTableResultForUI && req.RequestType == qbtypes.RequestTypeScalar {
// Format results as a table - this merges all queries into a single table
tableResult := q.formatScalarResultsAsTable(typedResults, req)
// Return the table under the first query's name so it gets included in results
if len(req.CompositeQuery.Queries) > 0 {
firstQueryName := getQueryName(req.CompositeQuery.Queries[0].Spec)
if firstQueryName != "" && tableResult["table"] != nil {
// Return table under first query name
return map[string]any{firstQueryName: tableResult["table"]}, nil
}
}
return tableResult, nil
}
// Convert back to map[string]any
finalResults := make(map[string]any)
for name, result := range typedResults {
finalResults[name] = result.Value
}
return finalResults, nil
}
// postProcessBuilderQuery applies postprocessing to a single builder query result
func postProcessBuilderQuery[T any](
q *querier,
result *qbtypes.Result,
query qbtypes.QueryBuilderQuery[T],
_ *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
// Apply functions
if len(query.Functions) > 0 {
result = q.applyFunctions(result, query.Functions)
}
return result
}
// postProcessMetricQuery applies postprocessing to a metric query result
func postProcessMetricQuery(
q *querier,
result *qbtypes.Result,
query qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation],
req *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
if query.Limit > 0 {
result = q.applySeriesLimit(result, query.Limit, query.Order)
}
if len(query.Functions) > 0 {
result = q.applyFunctions(result, query.Functions)
}
// Apply reduce to for scalar request type
if req.RequestType == qbtypes.RequestTypeScalar {
if len(query.Aggregations) > 0 && query.Aggregations[0].ReduceTo != qbtypes.ReduceToUnknown {
result = q.applyMetricReduceTo(result, query.Aggregations[0].ReduceTo)
}
}
return result
}
// applyMetricReduceTo applies reduce to operation using the metric's ReduceTo field
func (q *querier) applyMetricReduceTo(result *qbtypes.Result, reduceOp qbtypes.ReduceTo) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
for i, series := range agg.Series {
// Use the FunctionReduceTo helper
reducedSeries := qbtypes.FunctionReduceTo(series, reduceOp)
agg.Series[i] = reducedSeries
}
}
}
scalarData := convertTimeSeriesDataToScalar(tsData, tsData.QueryName)
result.Value = scalarData
return result
}
// applySeriesLimit limits the number of series in the result
func (q *querier) applySeriesLimit(result *qbtypes.Result, limit int, orderBy []qbtypes.OrderBy) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
// Use the ApplySeriesLimit function from querybuildertypes
agg.Series = qbtypes.ApplySeriesLimit(agg.Series, orderBy, limit)
}
}
return result
}
// applyFunctions applies functions to time series data
func (q *querier) applyFunctions(result *qbtypes.Result, functions []qbtypes.Function) *qbtypes.Result {
tsData, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
return result
}
if tsData != nil {
for _, agg := range tsData.Aggregations {
for i, series := range agg.Series {
agg.Series[i] = qbtypes.ApplyFunctions(functions, series)
}
}
}
return result
}
// applyFormulas processes formula queries in the composite query
func (q *querier) applyFormulas(ctx context.Context, results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
// Collect formula queries
formulaQueries := make(map[string]qbtypes.QueryBuilderFormula)
for _, query := range req.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeFormula {
if formula, ok := query.Spec.(qbtypes.QueryBuilderFormula); ok {
formulaQueries[formula.Name] = formula
}
}
}
// Process each formula
for name, formula := range formulaQueries {
// Check if we're dealing with time series or scalar data
if req.RequestType == qbtypes.RequestTypeTimeSeries {
result := q.processTimeSeriesFormula(ctx, results, formula, req)
if result != nil {
results[name] = result
}
}
}
return results
}
// processTimeSeriesFormula handles formula evaluation for time series data
func (q *querier) processTimeSeriesFormula(
ctx context.Context,
results map[string]*qbtypes.Result,
formula qbtypes.QueryBuilderFormula,
_ *qbtypes.QueryRangeRequest,
) *qbtypes.Result {
// Prepare time series data for formula evaluation
timeSeriesData := make(map[string]*qbtypes.TimeSeriesData)
// Extract time series data from results
for queryName, result := range results {
if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
timeSeriesData[queryName] = tsData
}
}
// Create formula evaluator
// TODO(srikanthccv): add conditional default zero
canDefaultZero := make(map[string]bool)
evaluator, err := qbtypes.NewFormulaEvaluator(formula.Expression, canDefaultZero)
if err != nil {
q.logger.ErrorContext(ctx, "failed to create formula evaluator", "error", err, "formula", formula.Name)
return nil
}
// Evaluate the formula
formulaSeries, err := evaluator.EvaluateFormula(timeSeriesData)
if err != nil {
q.logger.ErrorContext(ctx, "failed to evaluate formula", "error", err, "formula", formula.Name)
return nil
}
// Create result for formula
formulaResult := &qbtypes.TimeSeriesData{
QueryName: formula.Name,
Aggregations: []*qbtypes.AggregationBucket{
{
Index: 0,
Series: formulaSeries,
},
},
}
// Apply functions if any
result := &qbtypes.Result{
Value: formulaResult,
}
if len(formula.Functions) > 0 {
result = q.applyFunctions(result, formula.Functions)
}
return result
}
// filterDisabledQueries removes results for disabled queries
func (q *querier) filterDisabledQueries(results map[string]*qbtypes.Result, req *qbtypes.QueryRangeRequest) map[string]*qbtypes.Result {
filtered := make(map[string]*qbtypes.Result)
for _, query := range req.CompositeQuery.Queries {
info := getqueryInfo(query.Spec)
if !info.Disabled {
if result, ok := results[info.Name]; ok {
filtered[info.Name] = result
}
}
}
return filtered
}
// formatScalarResultsAsTable formats scalar results as a unified table for UI display
func (q *querier) formatScalarResultsAsTable(results map[string]*qbtypes.Result, _ *qbtypes.QueryRangeRequest) map[string]any {
if len(results) == 0 {
return map[string]any{"table": &qbtypes.ScalarData{}}
}
// Convert all results to ScalarData first
scalarResults := make(map[string]*qbtypes.ScalarData)
for name, result := range results {
if sd, ok := result.Value.(*qbtypes.ScalarData); ok {
scalarResults[name] = sd
} else if tsData, ok := result.Value.(*qbtypes.TimeSeriesData); ok {
scalarResults[name] = convertTimeSeriesDataToScalar(tsData, name)
}
}
// If single result already has multiple queries, just deduplicate
if len(scalarResults) == 1 {
for _, sd := range scalarResults {
if hasMultipleQueries(sd) {
return map[string]any{"table": deduplicateRows(sd)}
}
}
}
// Otherwise merge all results
merged := mergeScalarData(scalarResults)
return map[string]any{"table": merged}
}
// convertTimeSeriesDataToScalar converts time series to scalar format
func convertTimeSeriesDataToScalar(tsData *qbtypes.TimeSeriesData, queryName string) *qbtypes.ScalarData {
if tsData == nil || len(tsData.Aggregations) == 0 {
return &qbtypes.ScalarData{QueryName: queryName}
}
columns := []*qbtypes.ColumnDescriptor{}
// Add group columns from first series
if len(tsData.Aggregations[0].Series) > 0 {
for _, label := range tsData.Aggregations[0].Series[0].Labels {
columns = append(columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: label.Key,
QueryName: queryName,
Type: qbtypes.ColumnTypeGroup,
})
}
}
// Add aggregation columns
for _, agg := range tsData.Aggregations {
name := agg.Alias
if name == "" {
name = fmt.Sprintf("__result_%d", agg.Index)
}
columns = append(columns, &qbtypes.ColumnDescriptor{
TelemetryFieldKey: telemetrytypes.TelemetryFieldKey{Name: name},
QueryName: queryName,
AggregationIndex: int64(agg.Index),
Meta: agg.Meta,
Type: qbtypes.ColumnTypeAggregation,
})
}
// Build rows
data := [][]any{}
for seriesIdx, series := range tsData.Aggregations[0].Series {
row := make([]any, len(columns))
// Add group values
for i, label := range series.Labels {
row[i] = label.Value
}
// Add aggregation values (last value)
groupColCount := len(series.Labels)
for aggIdx, agg := range tsData.Aggregations {
if seriesIdx < len(agg.Series) && len(agg.Series[seriesIdx].Values) > 0 {
lastValue := agg.Series[seriesIdx].Values[len(agg.Series[seriesIdx].Values)-1].Value
row[groupColCount+aggIdx] = lastValue
} else {
row[groupColCount+aggIdx] = "n/a"
}
}
data = append(data, row)
}
return &qbtypes.ScalarData{
QueryName: queryName,
Columns: columns,
Data: data,
}
}
// hasMultipleQueries checks if ScalarData contains columns from multiple queries
func hasMultipleQueries(sd *qbtypes.ScalarData) bool {
queries := make(map[string]bool)
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeAggregation && col.QueryName != "" {
queries[col.QueryName] = true
}
}
return len(queries) > 1
}
// deduplicateRows removes duplicate rows based on group columns
func deduplicateRows(sd *qbtypes.ScalarData) *qbtypes.ScalarData {
// Find group column indices
groupIndices := []int{}
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupIndices = append(groupIndices, i)
}
}
// Build unique rows map
uniqueRows := make(map[string][]any)
for _, row := range sd.Data {
key := buildRowKey(row, groupIndices)
if existing, found := uniqueRows[key]; found {
// Merge non-n/a values
for i, val := range row {
if existing[i] == "n/a" && val != "n/a" {
existing[i] = val
}
}
} else {
rowCopy := make([]any, len(row))
copy(rowCopy, row)
uniqueRows[key] = rowCopy
}
}
// Convert back to slice
data := make([][]any, 0, len(uniqueRows))
for _, row := range uniqueRows {
data = append(data, row)
}
// Sort by first aggregation column
sortByFirstAggregation(data, sd.Columns)
return &qbtypes.ScalarData{
Columns: sd.Columns,
Data: data,
}
}
// mergeScalarData merges multiple scalar data results
func mergeScalarData(results map[string]*qbtypes.ScalarData) *qbtypes.ScalarData {
// Collect unique group columns
groupCols := []string{}
groupColMap := make(map[string]*qbtypes.ColumnDescriptor)
for _, sd := range results {
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
if _, exists := groupColMap[col.Name]; !exists {
groupColMap[col.Name] = col
groupCols = append(groupCols, col.Name)
}
}
}
}
// Build final columns
columns := []*qbtypes.ColumnDescriptor{}
// Add group columns
for _, name := range groupCols {
columns = append(columns, groupColMap[name])
}
// Add aggregation columns from each query (sorted by query name)
queryNames := make([]string, 0, len(results))
for name := range results {
queryNames = append(queryNames, name)
}
sort.Strings(queryNames)
for _, queryName := range queryNames {
sd := results[queryName]
for _, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeAggregation {
columns = append(columns, col)
}
}
}
// Merge rows
rowMap := make(map[string][]any)
for queryName, sd := range results {
// Create index mappings
groupMap := make(map[string]int)
for i, col := range sd.Columns {
if col.Type == qbtypes.ColumnTypeGroup {
groupMap[col.Name] = i
}
}
// Process each row
for _, row := range sd.Data {
key := buildKeyFromGroupCols(row, groupMap, groupCols)
if _, exists := rowMap[key]; !exists {
// Initialize new row
newRow := make([]any, len(columns))
// Set group values
for i, colName := range groupCols {
if idx, ok := groupMap[colName]; ok && idx < len(row) {
newRow[i] = row[idx]
} else {
newRow[i] = "n/a"
}
}
// Initialize all aggregations to n/a
for i := len(groupCols); i < len(columns); i++ {
newRow[i] = "n/a"
}
rowMap[key] = newRow
}
// Set aggregation values for this query
mergedRow := rowMap[key]
colIdx := len(groupCols)
for _, col := range columns[len(groupCols):] {
if col.QueryName == queryName {
// Find the value in the original row
for i, origCol := range sd.Columns {
if origCol.Type == qbtypes.ColumnTypeAggregation &&
origCol.AggregationIndex == col.AggregationIndex {
if i < len(row) {
mergedRow[colIdx] = row[i]
}
break
}
}
}
colIdx++
}
}
}
// Convert to slice
data := make([][]any, 0, len(rowMap))
for _, row := range rowMap {
data = append(data, row)
}
// Sort by first aggregation column
sortByFirstAggregation(data, columns)
return &qbtypes.ScalarData{
Columns: columns,
Data: data,
}
}
// buildRowKey builds a unique key from row values at specified indices
func buildRowKey(row []any, indices []int) string {
parts := make([]string, len(indices))
for i, idx := range indices {
if idx < len(row) {
parts[i] = fmt.Sprintf("%v", row[idx])
} else {
parts[i] = "n/a"
}
}
return fmt.Sprintf("%v", parts)
}
// buildKeyFromGroupCols builds a key from group column values
func buildKeyFromGroupCols(row []any, groupMap map[string]int, groupCols []string) string {
parts := make([]string, len(groupCols))
for i, colName := range groupCols {
if idx, ok := groupMap[colName]; ok && idx < len(row) {
parts[i] = fmt.Sprintf("%v", row[idx])
} else {
parts[i] = "n/a"
}
}
return fmt.Sprintf("%v", parts)
}
// sortByFirstAggregation sorts data by the first aggregation column (descending)
func sortByFirstAggregation(data [][]any, columns []*qbtypes.ColumnDescriptor) {
// Find first aggregation column
aggIdx := -1
for i, col := range columns {
if col.Type == qbtypes.ColumnTypeAggregation {
aggIdx = i
break
}
}
if aggIdx < 0 {
return
}
sort.SliceStable(data, func(i, j int) bool {
return compareValues(data[i][aggIdx], data[j][aggIdx]) > 0
})
}
// compareValues compares two values for sorting (handles n/a and numeric types)
func compareValues(a, b any) int {
// Handle n/a values
if a == "n/a" && b == "n/a" {
return 0
}
if a == "n/a" {
return -1
}
if b == "n/a" {
return 1
}
// Compare numeric values
aFloat, aOk := toFloat64(a)
bFloat, bOk := toFloat64(b)
if aOk && bOk {
if aFloat > bFloat {
return 1
} else if aFloat < bFloat {
return -1
}
return 0
}
// Fallback to string comparison
return 0
}
// toFloat64 attempts to convert a value to float64
func toFloat64(v any) (float64, bool) {
switch val := v.(type) {
case float64:
return val, true
case int64:
return float64(val), true
case int:
return float64(val), true
case int32:
return float64(val), true
}
return 0, false
}

View File

@@ -5,12 +5,14 @@ import (
"fmt"
"log/slog"
"slices"
"strconv"
"sync"
"github.com/SigNoz/signoz/pkg/errors"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/telemetrystore"
"github.com/SigNoz/signoz/pkg/types/metrictypes"
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
"golang.org/x/exp/maps"
@@ -54,8 +56,82 @@ func New(
}
}
// extractShiftFromBuilderQuery extracts the shift value from timeShift function if present
func extractShiftFromBuilderQuery[T any](spec qbtypes.QueryBuilderQuery[T]) int64 {
for _, fn := range spec.Functions {
if fn.Name == qbtypes.FunctionNameTimeShift && len(fn.Args) > 0 {
switch v := fn.Args[0].Value.(type) {
case float64:
return int64(v)
case int64:
return v
case int:
return int64(v)
case string:
if shiftFloat, err := strconv.ParseFloat(v, 64); err == nil {
return int64(shiftFloat)
}
}
}
}
return 0
}
// adjustTimeRangeForShift adjusts the time range based on the shift value from timeShift function
func adjustTimeRangeForShift[T any](spec qbtypes.QueryBuilderQuery[T], tr qbtypes.TimeRange, kind qbtypes.RequestType) qbtypes.TimeRange {
// Only apply time shift for time series and scalar queries
// Raw/list queries don't support timeshift
if kind != qbtypes.RequestTypeTimeSeries && kind != qbtypes.RequestTypeScalar {
return tr
}
// Use the ShiftBy field if it's already populated, otherwise extract it
shiftBy := spec.ShiftBy
if shiftBy == 0 {
shiftBy = extractShiftFromBuilderQuery(spec)
}
if shiftBy == 0 {
return tr
}
// ShiftBy is in seconds, convert to milliseconds and shift backward in time
shiftMS := shiftBy * 1000
return qbtypes.TimeRange{
From: tr.From - uint64(shiftMS),
To: tr.To - uint64(shiftMS),
}
}
func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtypes.QueryRangeRequest) (*qbtypes.QueryRangeResponse, error) {
// First pass: collect all metric names that need temporality
metricNames := make([]string, 0)
for _, query := range req.CompositeQuery.Queries {
if query.Type == qbtypes.QueryTypeBuilder {
if spec, ok := query.Spec.(qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]); ok {
for _, agg := range spec.Aggregations {
if agg.MetricName != "" {
metricNames = append(metricNames, agg.MetricName)
}
}
}
}
}
// Fetch temporality for all metrics at once
var metricTemporality map[string]metrictypes.Temporality
if len(metricNames) > 0 {
var err error
metricTemporality, err = q.metadataStore.FetchTemporalityMulti(ctx, metricNames...)
if err != nil {
q.logger.WarnContext(ctx, "failed to fetch metric temporality", "error", err, "metrics", metricNames)
// Continue without temporality - statement builder will handle unspecified
metricTemporality = make(map[string]metrictypes.Temporality)
}
q.logger.DebugContext(ctx, "fetched metric temporalities", "metric_temporality", metricTemporality)
}
queries := make(map[string]qbtypes.Query)
steps := make(map[string]qbtypes.Step)
@@ -79,15 +155,28 @@ func (q *querier) QueryRange(ctx context.Context, orgID valuer.UUID, req *qbtype
case qbtypes.QueryTypeBuilder:
switch spec := query.Spec.(type) {
case qbtypes.QueryBuilderQuery[qbtypes.TraceAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.LogAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.logStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
case qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]:
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
for i := range spec.Aggregations {
if spec.Aggregations[i].MetricName != "" && spec.Aggregations[i].Temporality == metrictypes.Unknown {
if temp, ok := metricTemporality[spec.Aggregations[i].MetricName]; ok && temp != metrictypes.Unknown {
spec.Aggregations[i].Temporality = temp
}
}
}
spec.ShiftBy = extractShiftFromBuilderQuery(spec)
timeRange := adjustTimeRangeForShift(spec, qbtypes.TimeRange{From: req.Start, To: req.End}, req.RequestType)
bq := newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, spec, timeRange, req.RequestType)
queries[spec.Name] = bq
steps[spec.Name] = spec.StepInterval
default:
@@ -133,13 +222,18 @@ func (q *querier) run(ctx context.Context, orgID valuer.UUID, qs map[string]qbty
}
}
processedResults, err := q.postProcessResults(ctx, results, req)
if err != nil {
return nil, err
}
return &qbtypes.QueryRangeResponse{
Type: req.RequestType,
Data: struct {
Results []any `json:"results"`
Warnings []string `json:"warnings"`
}{
Results: maps.Values(results),
Results: maps.Values(processedResults),
Warnings: warnings,
},
Meta: struct {
@@ -173,7 +267,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
return nil, err
}
// Store in cache for future use
q.bucketCache.Put(ctx, orgID, query, result)
q.bucketCache.Put(ctx, orgID, query, step, result)
return result, nil
}
}
@@ -183,6 +277,10 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
errors := make([]error, len(missingRanges))
totalStats := qbtypes.ExecStats{}
q.logger.DebugContext(ctx, "executing queries for missing ranges",
"missing_ranges_count", len(missingRanges),
"ranges", missingRanges)
sem := make(chan struct{}, 4)
var wg sync.WaitGroup
@@ -224,7 +322,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
if err != nil {
return nil, err
}
q.bucketCache.Put(ctx, orgID, query, result)
q.bucketCache.Put(ctx, orgID, query, step, result)
return result, nil
}
}
@@ -248,7 +346,7 @@ func (q *querier) executeWithCache(ctx context.Context, orgID valuer.UUID, query
mergedResult.Stats.DurationMS += totalStats.DurationMS
// Store merged result in cache
q.bucketCache.Put(ctx, orgID, query, mergedResult)
q.bucketCache.Put(ctx, orgID, query, step, mergedResult)
return mergedResult, nil
}
@@ -261,11 +359,17 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
case *chSQLQuery:
return newchSQLQuery(q.telemetryStore, qt.query, qt.args, timeRange, qt.kind)
case *builderQuery[qbtypes.TraceAggregation]:
return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.traceStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
case *builderQuery[qbtypes.LogAggregation]:
return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.logStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
case *builderQuery[qbtypes.MetricAggregation]:
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, timeRange, qt.kind)
qt.spec.ShiftBy = extractShiftFromBuilderQuery(qt.spec)
adjustedTimeRange := adjustTimeRangeForShift(qt.spec, timeRange, qt.kind)
return newBuilderQuery(q.telemetryStore, q.metricStmtBuilder, qt.spec, adjustedTimeRange, qt.kind)
default:
return nil
}
@@ -273,8 +377,29 @@ func (q *querier) createRangedQuery(originalQuery qbtypes.Query, timeRange qbtyp
// mergeResults merges cached result with fresh results
func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result) *qbtypes.Result {
if cached == nil && len(fresh) == 1 {
return fresh[0]
if cached == nil {
if len(fresh) == 1 {
return fresh[0]
}
if len(fresh) == 0 {
return nil
}
// If cached is nil but we have multiple fresh results, we need to merge them
// We need to merge all fresh results properly to avoid duplicates
merged := &qbtypes.Result{
Type: fresh[0].Type,
Stats: fresh[0].Stats,
Warnings: fresh[0].Warnings,
}
// Merge all fresh results including the first one
switch merged.Type {
case qbtypes.RequestTypeTimeSeries:
// Pass nil as cached value to ensure proper merging of all fresh results
merged.Value = q.mergeTimeSeriesResults(nil, fresh)
}
return merged
}
// Start with cached result
@@ -315,23 +440,52 @@ func (q *querier) mergeResults(cached *qbtypes.Result, fresh []*qbtypes.Result)
// mergeTimeSeriesResults merges time series data
func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, freshResults []*qbtypes.Result) *qbtypes.TimeSeriesData {
// Map to store merged series by query name and series key
// Map to store merged series by aggregation index and series key
seriesMap := make(map[int]map[string]*qbtypes.TimeSeries)
// Map to store aggregation bucket metadata
bucketMetadata := make(map[int]*qbtypes.AggregationBucket)
for _, aggBucket := range cachedValue.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
seriesMap[aggBucket.Index][key] = series
// Process cached data if available
if cachedValue != nil && cachedValue.Aggregations != nil {
for _, aggBucket := range cachedValue.Aggregations {
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
if bucketMetadata[aggBucket.Index] == nil {
bucketMetadata[aggBucket.Index] = aggBucket
}
for _, series := range aggBucket.Series {
key := qbtypes.GetUniqueSeriesKey(series.Labels)
if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
// Merge values from duplicate series in cached data, avoiding duplicate timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// Create a copy to avoid modifying the cached data
seriesCopy := &qbtypes.TimeSeries{
Labels: series.Labels,
Values: make([]*qbtypes.TimeSeriesValue, len(series.Values)),
}
copy(seriesCopy.Values, series.Values)
seriesMap[aggBucket.Index][key] = seriesCopy
}
}
}
}
// Add fresh series
for _, result := range freshResults {
freshTS, ok := result.Value.(*qbtypes.TimeSeriesData)
if !ok {
if !ok || freshTS == nil || freshTS.Aggregations == nil {
continue
}
@@ -339,6 +493,12 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
if seriesMap[aggBucket.Index] == nil {
seriesMap[aggBucket.Index] = make(map[string]*qbtypes.TimeSeries)
}
// Prefer fresh metadata over cached metadata
if aggBucket.Alias != "" || aggBucket.Meta.Unit != "" {
bucketMetadata[aggBucket.Index] = aggBucket
} else if bucketMetadata[aggBucket.Index] == nil {
bucketMetadata[aggBucket.Index] = aggBucket
}
}
for _, aggBucket := range freshTS.Aggregations {
@@ -346,8 +506,19 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
key := qbtypes.GetUniqueSeriesKey(series.Labels)
if existingSeries, ok := seriesMap[aggBucket.Index][key]; ok {
// Merge values
existingSeries.Values = append(existingSeries.Values, series.Values...)
// Merge values, avoiding duplicate timestamps
// Create a map to track existing timestamps
timestampMap := make(map[int64]bool)
for _, v := range existingSeries.Values {
timestampMap[v.Timestamp] = true
}
// Only add values with new timestamps
for _, v := range series.Values {
if !timestampMap[v.Timestamp] {
existingSeries.Values = append(existingSeries.Values, v)
}
}
} else {
// New series
seriesMap[aggBucket.Index][key] = series
@@ -357,10 +528,18 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
}
result := &qbtypes.TimeSeriesData{
QueryName: cachedValue.QueryName,
Aggregations: []*qbtypes.AggregationBucket{},
}
// Set QueryName from cached or first fresh result
if cachedValue != nil {
result.QueryName = cachedValue.QueryName
} else if len(freshResults) > 0 {
if freshTS, ok := freshResults[0].Value.(*qbtypes.TimeSeriesData); ok && freshTS != nil {
result.QueryName = freshTS.QueryName
}
}
for index, series := range seriesMap {
var aggSeries []*qbtypes.TimeSeries
for _, s := range series {
@@ -377,10 +556,17 @@ func (q *querier) mergeTimeSeriesResults(cachedValue *qbtypes.TimeSeriesData, fr
aggSeries = append(aggSeries, s)
}
result.Aggregations = append(result.Aggregations, &qbtypes.AggregationBucket{
// Preserve bucket metadata from either cached or fresh results
bucket := &qbtypes.AggregationBucket{
Index: index,
Series: aggSeries,
})
}
if metadata, ok := bucketMetadata[index]; ok {
bucket.Alias = metadata.Alias
bucket.Meta = metadata.Meta
}
result.Aggregations = append(result.Aggregations, bucket)
}
return result

229
pkg/querier/shift_test.go Normal file
View File

@@ -0,0 +1,229 @@
package querier
import (
"testing"
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
"github.com/stretchr/testify/assert"
)
// TestAdjustTimeRangeForShift tests the time range adjustment logic
func TestAdjustTimeRangeForShift(t *testing.T) {
tests := []struct {
name string
spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
timeRange qbtypes.TimeRange
requestType qbtypes.RequestType
expectedFromMS uint64
expectedToMS uint64
}{
{
name: "no shift",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 1000000,
expectedToMS: 2000000,
},
{
name: "shift by 60 seconds using timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "60"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 940000, // 1000000 - 60000
expectedToMS: 1940000, // 2000000 - 60000
},
{
name: "shift by negative 30 seconds (future shift)",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "-30"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeTimeSeries,
expectedFromMS: 1030000, // 1000000 - (-30000)
expectedToMS: 2030000, // 2000000 - (-30000)
},
{
name: "no shift for raw request type even with timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 1000000,
To: 2000000,
},
requestType: qbtypes.RequestTypeRaw,
expectedFromMS: 1000000, // No shift for raw queries
expectedToMS: 2000000,
},
{
name: "shift applied for scalar request type with timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
timeRange: qbtypes.TimeRange{
From: 10000000,
To: 20000000,
},
requestType: qbtypes.RequestTypeScalar,
expectedFromMS: 6400000, // 10000000 - 3600000
expectedToMS: 16400000, // 20000000 - 3600000
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := adjustTimeRangeForShift(tt.spec, tt.timeRange, tt.requestType)
assert.Equal(t, tt.expectedFromMS, result.From, "fromMS mismatch")
assert.Equal(t, tt.expectedToMS, result.To, "toMS mismatch")
})
}
}
// TestExtractShiftFromBuilderQuery tests the shift extraction logic
func TestExtractShiftFromBuilderQuery(t *testing.T) {
tests := []struct {
name string
spec qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]
expectedShiftBy int64
}{
{
name: "extract from timeShift function with float64",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: float64(3600)},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "extract from timeShift function with int64",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: int64(3600)},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "extract from timeShift function with string",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "3600"},
},
},
},
},
expectedShiftBy: 3600,
},
{
name: "no timeShift function",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
},
},
expectedShiftBy: 0,
},
{
name: "invalid timeShift value",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "invalid"},
},
},
},
},
expectedShiftBy: 0,
},
{
name: "multiple functions with timeShift",
spec: qbtypes.QueryBuilderQuery[qbtypes.MetricAggregation]{
Functions: []qbtypes.Function{
{
Name: qbtypes.FunctionNameAbsolute,
},
{
Name: qbtypes.FunctionNameTimeShift,
Args: []qbtypes.FunctionArg{
{Value: "1800"},
},
},
{
Name: qbtypes.FunctionNameClampMax,
Args: []qbtypes.FunctionArg{
{Value: "100"},
},
},
},
},
expectedShiftBy: 1800,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
shiftBy := extractShiftFromBuilderQuery(tt.spec)
assert.Equal(t, tt.expectedShiftBy, shiftBy)
})
}
}

View File

@@ -1,6 +1,10 @@
package agentConf
import "github.com/SigNoz/signoz/pkg/query-service/model"
import (
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
)
// Interface for features implemented via agent config.
// Eg: ingestion side signal pre-processing features like log processing pipelines etc
@@ -11,12 +15,13 @@ type AgentFeature interface {
// Recommend config for an agent based on its `currentConfYaml` and
// `configVersion` for the feature's settings
RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *ConfigVersion,
configVersion *opamptypes.AgentConfigVersion,
) (
recommendedConfYaml []byte,
// stored as agent_config_versions.last_config in current agentConf model
// stored as agent_config_version.config in current agentConf model
// TODO(Raj): maybe refactor agentConf further and clean this up
serializedSettingsUsed string,

View File

@@ -4,10 +4,14 @@ import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/exp/slices"
@@ -15,42 +19,33 @@ import (
// Repo handles DDL and DML ops on ingestion rules
type Repo struct {
db *sqlx.DB
store sqlstore.SQLStore
}
func (r *Repo) GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
var c []ConfigVersion
err := r.db.SelectContext(ctx, &c, fmt.Sprintf(`SELECT
version,
id,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT display_name FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions AS v
WHERE element_type = $1
ORDER BY created_at desc, version desc
limit %v`, limit),
typ)
ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, limit int,
) ([]opamptypes.AgentConfigVersion, *model.ApiError) {
var c []opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&c).
ColumnExpr("id, version, element_type, deploy_status, deploy_result, created_at").
ColumnExpr("COALESCE(created_by, '') as created_by").
ColumnExpr(`COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
ColumnExpr("COALESCE(hash, '') as hash, COALESCE(config, '{}') as config").
Where("acv.element_type = ?", typ).
Where("acv.org_id = ?", orgId).
OrderExpr("acv.created_at DESC, acv.version DESC").
Limit(limit).
Scan(ctx)
if err != nil {
return nil, model.InternalError(err)
}
incompleteStatuses := []DeployStatus{DeployInitiated, Deploying}
incompleteStatuses := []opamptypes.DeployStatus{opamptypes.DeployInitiated, opamptypes.Deploying}
for idx := 1; idx < len(c); idx++ {
if slices.Contains(incompleteStatuses, c[idx].DeployStatus) {
c[idx].DeployStatus = DeployStatusUnknown
c[idx].DeployStatus = opamptypes.DeployStatusUnknown
}
}
@@ -58,32 +53,24 @@ func (r *Repo) GetConfigHistory(
}
func (r *Repo) GetConfigVersion(
ctx context.Context, typ ElementTypeDef, v int,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT display_name FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result,
coalesce(last_hash, '') as last_hash,
coalesce(last_config, '{}') as last_config
FROM agent_config_versions v
WHERE element_type = $1
AND version = $2`, typ, v)
ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, v int,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
var c opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&c).
ColumnExpr("id, version, element_type, deploy_status, deploy_result, created_at").
ColumnExpr("COALESCE(created_by, '') as created_by").
ColumnExpr(`COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
ColumnExpr("COALESCE(hash, '') as hash, COALESCE(config, '{}') as config").
Where("acv.element_type = ?", typ).
Where("acv.version = ?", v).
Where("acv.org_id = ?", orgId).
Scan(ctx)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, model.NotFoundError(err)
}
return nil, model.InternalError(err)
}
@@ -91,33 +78,23 @@ func (r *Repo) GetConfigVersion(
}
func (r *Repo) GetLatestVersion(
ctx context.Context, typ ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
var c ConfigVersion
err := r.db.GetContext(ctx, &c, `SELECT
id,
version,
element_type,
COALESCE(created_by, -1) as created_by,
created_at,
COALESCE((SELECT display_name FROM users
WHERE id = v.created_by), "unknown") created_by_name,
active,
is_valid,
disabled,
deploy_status,
deploy_result
FROM agent_config_versions AS v
WHERE element_type = $1
AND version = (
SELECT MAX(version)
FROM agent_config_versions
WHERE element_type=$2)`, typ, typ)
ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
var c opamptypes.AgentConfigVersion
err := r.store.BunDB().NewSelect().
Model(&c).
ColumnExpr("id, version, element_type, deploy_status, deploy_result, created_at").
ColumnExpr("COALESCE(created_by, '') as created_by").
ColumnExpr(`COALESCE((SELECT display_name FROM users WHERE users.id = acv.created_by), 'unknown') as created_by_name`).
Where("acv.element_type = ?", typ).
Where("acv.org_id = ?", orgId).
Where("version = (SELECT MAX(version) FROM agent_config_version WHERE acv.element_type = ?)", typ).
Scan(ctx)
if err == sql.ErrNoRows {
return nil, model.NotFoundError(err)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, model.NotFoundError(err)
}
return nil, model.InternalError(err)
}
@@ -125,18 +102,18 @@ func (r *Repo) GetLatestVersion(
}
func (r *Repo) insertConfig(
ctx context.Context, userId string, c *ConfigVersion, elements []string,
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, c *opamptypes.AgentConfigVersion, elements []string,
) (fnerr *model.ApiError) {
if string(c.ElementType) == "" {
if c.ElementType.StringValue() == "" {
return model.BadRequest(fmt.Errorf(
"element type is required for creating agent config version",
))
}
// allowing empty elements for logs - use case is deleting all pipelines
if len(elements) == 0 && c.ElementType != ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", string(c.ElementType)))
if len(elements) == 0 && c.ElementType != opamptypes.ElementTypeLogPipelines {
zap.L().Error("insert config called with no elements ", zap.String("ElementType", c.ElementType.StringValue()))
return model.BadRequest(fmt.Errorf("config must have atleast one element"))
}
@@ -144,20 +121,20 @@ func (r *Repo) insertConfig(
// the version can not be set by the user, we want to auto-assign the versions
// in a monotonically increasing order starting with 1. hence, we reject insert
// requests with version anything other than 0. here, 0 indicates un-assigned
zap.L().Error("invalid version assignment while inserting agent config", zap.Int("version", c.Version), zap.String("ElementType", string(c.ElementType)))
zap.L().Error("invalid version assignment while inserting agent config", zap.Int("version", c.Version), zap.String("ElementType", c.ElementType.StringValue()))
return model.BadRequest(fmt.Errorf(
"user defined versions are not supported in the agent config",
))
}
configVersion, err := r.GetLatestVersion(ctx, c.ElementType)
configVersion, err := r.GetLatestVersion(ctx, orgId, c.ElementType)
if err != nil && err.Type() != model.ErrorNotFound {
zap.L().Error("failed to fetch latest config version", zap.Error(err))
return model.InternalError(fmt.Errorf("failed to fetch latest config version"))
}
if configVersion != nil {
c.Version = updateVersion(configVersion.Version)
c.IncrementVersion(configVersion.Version)
} else {
// first version
c.Version = 1
@@ -166,57 +143,34 @@ func (r *Repo) insertConfig(
defer func() {
if fnerr != nil {
// remove all the damage (invalid rows from db)
_, _ = r.db.Exec("DELETE FROM agent_config_versions WHERE id = $1", c.ID)
_, _ = r.db.Exec("DELETE FROM agent_config_elements WHERE version_id=$1", c.ID)
r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigVersion)).Where("id = ?", c.ID).Where("org_id = ?", orgId).Exec(ctx)
r.store.BunDB().NewDelete().Model(new(opamptypes.AgentConfigElement)).Where("version_id = ?", c.ID).Exec(ctx)
}
}()
// insert config
configQuery := `INSERT INTO agent_config_versions(
id,
version,
created_by,
element_type,
active,
is_valid,
disabled,
deploy_status,
deploy_result)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`
_, dbErr := r.db.ExecContext(ctx,
configQuery,
c.ID,
c.Version,
userId,
c.ElementType,
false,
false,
false,
c.DeployStatus,
c.DeployResult)
_, dbErr := r.store.
BunDB().
NewInsert().
Model(c).
Exec(ctx)
if dbErr != nil {
zap.L().Error("error in inserting config version: ", zap.Error(dbErr))
return model.InternalError(errors.Wrap(dbErr, "failed to insert ingestion rule"))
}
elementsQuery := `INSERT INTO agent_config_elements(
id,
version_id,
element_type,
element_id)
VALUES ($1, $2, $3, $4)`
for _, e := range elements {
_, dbErr = r.db.ExecContext(
ctx,
elementsQuery,
uuid.NewString(),
c.ID,
c.ElementType,
e,
)
agentConfigElement := &opamptypes.AgentConfigElement{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
},
VersionID: c.ID,
ElementType: c.ElementType.StringValue(),
ElementID: e,
}
_, dbErr = r.store.BunDB().NewInsert().Model(agentConfigElement).Exec(ctx)
if dbErr != nil {
return model.InternalError(dbErr)
}
@@ -226,40 +180,49 @@ func (r *Repo) insertConfig(
}
func (r *Repo) updateDeployStatus(ctx context.Context,
elementType ElementTypeDef,
orgId valuer.UUID,
elementType opamptypes.ElementType,
version int,
status string,
result string,
lastHash string,
lastconf string) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2,
last_hash = COALESCE($3, last_hash),
last_config = $4
WHERE version=$5
AND element_type = $6`
// check if it has org orgID prefix
// ensuring it here and also ensuring in coordinator.go
if !strings.HasPrefix(lastHash, orgId.String()) {
lastHash = orgId.String() + lastHash
}
_, err := r.db.ExecContext(ctx, updateQuery, status, result, lastHash, lastconf, version, string(elementType))
_, err := r.store.BunDB().NewUpdate().
Model(new(opamptypes.AgentConfigVersion)).
Set("deploy_status = ?", status).
Set("deploy_result = ?", result).
Set("hash = COALESCE(?, hash)", lastHash).
Set("config = ?", lastconf).
Where("version = ?", version).
Where("element_type = ?", elementType).
Where("org_id = ?", orgId).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
return model.BadRequest(fmt.Errorf("failed to update deploy status"))
}
return nil
}
func (r *Repo) updateDeployStatusByHash(
ctx context.Context, confighash string, status string, result string,
ctx context.Context, orgId valuer.UUID, confighash string, status string, result string,
) *model.ApiError {
updateQuery := `UPDATE agent_config_versions
set deploy_status = $1,
deploy_result = $2
WHERE last_hash=$4`
_, err := r.db.ExecContext(ctx, updateQuery, status, result, confighash)
_, err := r.store.BunDB().NewUpdate().
Model(new(opamptypes.AgentConfigVersion)).
Set("deploy_status = ?", status).
Set("deploy_result = ?", result).
Where("hash = ?", confighash).
Where("org_id = ?", orgId).
Exec(ctx)
if err != nil {
zap.L().Error("failed to update deploy status", zap.Error(err))
return model.InternalError(errors.Wrap(err, "failed to update deploy status"))

View File

@@ -12,8 +12,10 @@ import (
filterprocessor "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/filterprocessor"
tsp "github.com/SigNoz/signoz/pkg/query-service/app/opamp/otelconfig/tailsampler"
"github.com/SigNoz/signoz/pkg/query-service/model"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
yaml "gopkg.in/yaml.v3"
@@ -39,7 +41,7 @@ type Manager struct {
}
type ManagerOptions struct {
DB *sqlx.DB
Store sqlstore.SQLStore
// When acting as opamp.AgentConfigProvider, agent conf recommendations are
// applied to the base conf in the order the features have been specified here.
@@ -60,7 +62,7 @@ func Initiate(options *ManagerOptions) (*Manager, error) {
}
m = &Manager{
Repo: Repo{options.DB},
Repo: Repo{options.Store},
agentFeatures: options.AgentFeatures,
configSubscribers: map[string]func(){},
}
@@ -90,7 +92,7 @@ func (m *Manager) notifyConfigUpdateSubscribers() {
}
// Implements opamp.AgentConfigProvider
func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
func (m *Manager) RecommendAgentConfig(orgId valuer.UUID, currentConfYaml []byte) (
recommendedConfYaml []byte,
// Opaque id of the recommended config, used for reporting deployment status updates
configId string,
@@ -100,13 +102,13 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
settingVersionsUsed := []string{}
for _, feature := range m.agentFeatures {
featureType := ElementTypeDef(feature.AgentFeatureType())
latestConfig, apiErr := GetLatestVersion(context.Background(), featureType)
featureType := opamptypes.NewElementType(string(feature.AgentFeatureType()))
latestConfig, apiErr := GetLatestVersion(context.Background(), orgId, featureType)
if apiErr != nil && apiErr.Type() != model.ErrorNotFound {
return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
}
updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(recommendation, latestConfig)
updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(orgId, recommendation, latestConfig)
if apiErr != nil {
return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf(
"failed to generate agent config recommendation for %s", featureType,
@@ -129,9 +131,10 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
_ = m.updateDeployStatus(
context.Background(),
orgId,
featureType,
configVersion,
string(DeployInitiated),
opamptypes.DeployInitiated.StringValue(),
"Deployment has started",
configId,
serializedSettingsUsed,
@@ -154,52 +157,53 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
// Implements opamp.AgentConfigProvider
func (m *Manager) ReportConfigDeploymentStatus(
orgId valuer.UUID,
agentId string,
configId string,
err error,
) {
featureConfigIds := strings.Split(configId, ",")
for _, featureConfId := range featureConfigIds {
newStatus := string(Deployed)
newStatus := opamptypes.Deployed.StringValue()
message := "Deployment was successful"
if err != nil {
newStatus = string(DeployFailed)
newStatus = opamptypes.DeployFailed.StringValue()
message = fmt.Sprintf("%s: %s", agentId, err.Error())
}
_ = m.updateDeployStatusByHash(
context.Background(), featureConfId, newStatus, message,
context.Background(), orgId, featureConfId, newStatus, message,
)
}
}
func GetLatestVersion(
ctx context.Context, elementType ElementTypeDef,
) (*ConfigVersion, *model.ApiError) {
return m.GetLatestVersion(ctx, elementType)
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
return m.GetLatestVersion(ctx, orgId, elementType)
}
func GetConfigVersion(
ctx context.Context, elementType ElementTypeDef, version int,
) (*ConfigVersion, *model.ApiError) {
return m.GetConfigVersion(ctx, elementType, version)
ctx context.Context, orgId valuer.UUID, elementType opamptypes.ElementType, version int,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
return m.GetConfigVersion(ctx, orgId, elementType, version)
}
func GetConfigHistory(
ctx context.Context, typ ElementTypeDef, limit int,
) ([]ConfigVersion, *model.ApiError) {
return m.GetConfigHistory(ctx, typ, limit)
ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, limit int,
) ([]opamptypes.AgentConfigVersion, *model.ApiError) {
return m.GetConfigHistory(ctx, orgId, typ, limit)
}
// StartNewVersion launches a new config version for given set of elements
func StartNewVersion(
ctx context.Context, userId string, eleType ElementTypeDef, elementIds []string,
) (*ConfigVersion, *model.ApiError) {
ctx context.Context, orgId valuer.UUID, userId valuer.UUID, eleType opamptypes.ElementType, elementIds []string,
) (*opamptypes.AgentConfigVersion, *model.ApiError) {
// create a new version
cfg := NewConfigVersion(eleType)
cfg := opamptypes.NewAgentConfigVersion(orgId, userId, eleType)
// insert new config and elements into database
err := m.insertConfig(ctx, userId, cfg, elementIds)
err := m.insertConfig(ctx, orgId, userId, cfg, elementIds)
if err != nil {
return nil, err
}
@@ -213,22 +217,22 @@ func NotifyConfigUpdate(ctx context.Context) {
m.notifyConfigUpdateSubscribers()
}
func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiError {
func Redeploy(ctx context.Context, orgId valuer.UUID, typ opamptypes.ElementType, version int) *model.ApiError {
configVersion, err := GetConfigVersion(ctx, typ, version)
configVersion, err := GetConfigVersion(ctx, orgId, typ, version)
if err != nil {
zap.L().Error("failed to fetch config version during redeploy", zap.Error(err))
return model.WrapApiError(err, "failed to fetch details of the config version")
}
if configVersion == nil || (configVersion != nil && configVersion.LastConf == "") {
if configVersion == nil || (configVersion != nil && configVersion.Config == "") {
zap.L().Debug("config version has no conf yaml", zap.Any("configVersion", configVersion))
return model.BadRequest(fmt.Errorf("the config version can not be redeployed"))
}
switch typ {
case ElementTypeSamplingRules:
case opamptypes.ElementTypeSamplingRules:
var config *tsp.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &config); err != nil {
if err := yaml.Unmarshal([]byte(configVersion.Config), &config); err != nil {
zap.L().Debug("failed to read last conf correctly", zap.Error(err))
return model.BadRequest(fmt.Errorf("failed to read the stored config correctly"))
}
@@ -245,10 +249,10 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
return model.InternalError(fmt.Errorf("failed to deploy the config"))
}
_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
case ElementTypeDropRules:
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, configVersion.Config)
case opamptypes.ElementTypeDropRules:
var filterConfig *filterprocessor.Config
if err := yaml.Unmarshal([]byte(configVersion.LastConf), &filterConfig); err != nil {
if err := yaml.Unmarshal([]byte(configVersion.Config), &filterConfig); err != nil {
zap.L().Error("failed to read last conf correctly", zap.Error(err))
return model.InternalError(fmt.Errorf("failed to read the stored config correctly"))
}
@@ -263,14 +267,14 @@ func Redeploy(ctx context.Context, typ ElementTypeDef, version int) *model.ApiEr
return err
}
_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, configVersion.LastConf)
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, configVersion.Config)
}
return nil
}
// UpsertFilterProcessor updates the agent config with new filter processor params
func UpsertFilterProcessor(ctx context.Context, version int, config *filterprocessor.Config) error {
func UpsertFilterProcessor(ctx context.Context, orgId valuer.UUID, version int, config *filterprocessor.Config) error {
if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
return fmt.Errorf("agent updater is busy")
}
@@ -294,7 +298,7 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
}
_ = m.updateDeployStatus(ctx, ElementTypeDropRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeDropRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))
return nil
}
@@ -303,9 +307,9 @@ func UpsertFilterProcessor(ctx context.Context, version int, config *filterproce
// successful deployment if no error is received.
// this method is currently expected to be called only once in the lifecycle
// but can be improved in future to accept continuous request status updates from opamp
func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
func (m *Manager) OnConfigUpdate(orgId valuer.UUID, agentId string, hash string, err error) {
status := string(Deployed)
status := opamptypes.Deployed.StringValue()
message := "Deployment was successful"
@@ -314,15 +318,15 @@ func (m *Manager) OnConfigUpdate(agentId string, hash string, err error) {
}()
if err != nil {
status = string(DeployFailed)
status = opamptypes.DeployFailed.StringValue()
message = fmt.Sprintf("%s: %s", agentId, err.Error())
}
_ = m.updateDeployStatusByHash(context.Background(), hash, status, message)
_ = m.updateDeployStatusByHash(context.Background(), orgId, hash, status, message)
}
// UpsertSamplingProcessor updates the agent config with new filter processor params
func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Config) error {
func UpsertSamplingProcessor(ctx context.Context, orgId valuer.UUID, version int, config *tsp.Config) error {
if !atomic.CompareAndSwapUint32(&m.lock, 0, 1) {
return fmt.Errorf("agent updater is busy")
}
@@ -345,6 +349,6 @@ func UpsertSamplingProcessor(ctx context.Context, version int, config *tsp.Confi
zap.L().Warn("unexpected error while transforming processor config to yaml", zap.Error(yamlErr))
}
_ = m.updateDeployStatus(ctx, ElementTypeSamplingRules, version, string(DeployInitiated), "Deployment started", configHash, string(processorConfYaml))
m.updateDeployStatus(ctx, orgId, opamptypes.ElementTypeSamplingRules, version, opamptypes.DeployInitiated.StringValue(), "Deployment started", configHash, string(processorConfYaml))
return nil
}

View File

@@ -1,72 +0,0 @@
package agentConf
import (
"time"
"github.com/google/uuid"
)
type ElementTypeDef string
const (
ElementTypeSamplingRules ElementTypeDef = "sampling_rules"
ElementTypeDropRules ElementTypeDef = "drop_rules"
ElementTypeLogPipelines ElementTypeDef = "log_pipelines"
ElementTypeLbExporter ElementTypeDef = "lb_exporter"
)
type DeployStatus string
const (
PendingDeploy DeployStatus = "DIRTY"
Deploying DeployStatus = "DEPLOYING"
Deployed DeployStatus = "DEPLOYED"
DeployInitiated DeployStatus = "IN_PROGRESS"
DeployFailed DeployStatus = "FAILED"
DeployStatusUnknown DeployStatus = "UNKNOWN"
)
type ConfigVersion struct {
ID string `json:"id" db:"id"`
Version int `json:"version" db:"version"`
ElementType ElementTypeDef `json:"elementType" db:"element_type"`
Active bool `json:"active" db:"active"`
IsValid bool `json:"is_valid" db:"is_valid"`
Disabled bool `json:"disabled" db:"disabled"`
DeployStatus DeployStatus `json:"deployStatus" db:"deploy_status"`
DeployResult string `json:"deployResult" db:"deploy_result"`
LastHash string `json:"lastHash" db:"last_hash"`
LastConf string `json:"lastConf" db:"last_config"`
CreatedBy string `json:"createdBy" db:"created_by"`
CreatedByName string `json:"createdByName" db:"created_by_name"`
CreatedAt time.Time `json:"createdAt" db:"created_at"`
}
func NewConfigVersion(typeDef ElementTypeDef) *ConfigVersion {
return &ConfigVersion{
ID: uuid.NewString(),
ElementType: typeDef,
Active: false,
IsValid: false,
Disabled: false,
DeployStatus: PendingDeploy,
LastHash: "",
LastConf: "{}",
// todo: get user id from context?
// CreatedBy
}
}
func updateVersion(v int) int {
return v + 1
}
type ConfigElements struct {
VersionID string
Version int
ElementType ElementTypeDef
ElementId string
}

View File

@@ -2340,25 +2340,41 @@ func (r *ClickHouseReader) GetTotalLogs(ctx context.Context) (uint64, error) {
func (r *ClickHouseReader) FetchTemporality(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]map[v3.Temporality]bool, error) {
metricNameToTemporality := make(map[string]map[v3.Temporality]bool)
// Batch fetch all metadata at once
metadataMap, apiErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
if apiErr != nil {
zap.L().Warn("Failed to fetch updated metrics metadata", zap.Error(apiErr))
// best-effort return, not failing outright
return metricNameToTemporality, nil
var metricNamesToQuery []string
for _, metricName := range metricNames {
updatedMetadata, cacheErr := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if cacheErr != nil {
zap.L().Info("Error in getting metrics cached metadata", zap.Error(cacheErr))
}
if metadata, exist := updatedMetadata[metricName]; exist {
if _, exists := metricNameToTemporality[metricName]; !exists {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][metadata.Temporality] = true
} else {
metricNamesToQuery = append(metricNamesToQuery, metricName)
}
}
for metricName, metadata := range metadataMap {
if metadata == nil {
continue
query := fmt.Sprintf(`SELECT DISTINCT metric_name, temporality FROM %s.%s WHERE metric_name IN $1`, signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricNames)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var metricName, temporality string
err := rows.Scan(&metricName, &temporality)
if err != nil {
return nil, err
}
if _, exists := metricNameToTemporality[metricName]; !exists {
if _, ok := metricNameToTemporality[metricName]; !ok {
metricNameToTemporality[metricName] = make(map[v3.Temporality]bool)
}
metricNameToTemporality[metricName][metadata.Temporality] = true
metricNameToTemporality[metricName][v3.Temporality(temporality)] = true
}
return metricNameToTemporality, nil
}
@@ -3013,80 +3029,67 @@ func (r *ClickHouseReader) QueryDashboardVars(ctx context.Context, query string)
}
func (r *ClickHouseReader) GetMetricAggregateAttributes(ctx context.Context, orgID valuer.UUID, req *v3.AggregateAttributeRequest, skipSignozMetrics bool) (*v3.AggregateAttributeResponse, error) {
var query string
var err error
var rows driver.Rows
var response v3.AggregateAttributeResponse
normalized := true
if constants.IsDotMetricsEnabled {
normalized = false
}
// Query all relevant metric names from time_series_v4, but leave metadata retrieval to cache/db
query := fmt.Sprintf(
`SELECT DISTINCT metric_name
FROM %s.%s
WHERE metric_name ILIKE $1 AND __normalized = $2`,
signozMetricDBName, signozTSTableNameV41Day)
query = fmt.Sprintf("SELECT metric_name, type, is_monotonic, temporality FROM %s.%s WHERE metric_name ILIKE $1 and __normalized = $2 GROUP BY metric_name, type, is_monotonic, temporality", signozMetricDBName, signozTSTableNameV41Day)
if req.Limit != 0 {
query = query + fmt.Sprintf(" LIMIT %d;", req.Limit)
}
rows, err = r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
rows, err := r.db.Query(ctx, query, fmt.Sprintf("%%%s%%", req.SearchText), normalized)
if err != nil {
zap.L().Error("Error while querying metric names", zap.Error(err))
return nil, fmt.Errorf("error while executing metric name query: %s", err.Error())
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
var metricNames []string
for rows.Next() {
var name string
if err := rows.Scan(&name); err != nil {
return nil, fmt.Errorf("error while scanning metric name: %s", err.Error())
}
if skipSignozMetrics && strings.HasPrefix(name, "signoz") {
continue
}
metricNames = append(metricNames, name)
}
if len(metricNames) == 0 {
return &response, nil
}
// Get all metadata in one shot
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricNames...)
if apiError != nil {
return &response, fmt.Errorf("error getting updated metrics metadata: %s", apiError.Error())
}
seen := make(map[string]struct{})
for _, name := range metricNames {
metadata, ok := metadataMap[name]
if !ok {
var metricName, typ, temporality string
var isMonotonic bool
for rows.Next() {
if err := rows.Scan(&metricName, &typ, &isMonotonic, &temporality); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if skipSignozMetrics && strings.HasPrefix(metricName, "signoz") {
continue
}
typ := string(metadata.MetricType)
temporality := string(metadata.Temporality)
isMonotonic := metadata.IsMonotonic
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[metricName]; exist {
typ = string(updatedMetadata.MetricType)
isMonotonic = updatedMetadata.IsMonotonic
temporality = string(updatedMetadata.Temporality)
}
// Non-monotonic cumulative sums are treated as gauges
if typ == "Sum" && !isMonotonic && temporality == string(v3.Cumulative) {
typ = "Gauge"
}
// unlike traces/logs `tag`/`resource` type, the `Type` will be metric type
key := v3.AttributeKey{
Key: name,
Key: metricName,
DataType: v3.AttributeKeyDataTypeFloat64,
Type: v3.AttributeKeyType(typ),
IsColumn: true,
}
if _, ok := seen[name+typ]; ok {
// remove duplicates
if _, ok := seen[metricName+typ]; ok {
continue
}
seen[name+typ] = struct{}{}
seen[metricName+typ] = struct{}{}
response.AttributeKeys = append(response.AttributeKeys, key)
}
@@ -3178,67 +3181,72 @@ func (r *ClickHouseReader) GetMetricMetadata(ctx context.Context, orgID valuer.U
unixMilli := common.PastDayRoundOff()
// 1. Fetch metadata from cache/db using unified function
metadataMap, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
return nil, fmt.Errorf("error fetching metric metadata: %s", apiError.Err.Error())
// Note: metric metadata should be accessible regardless of the time range selection
// our standard retention period is 30 days, so we are querying the table v4_1_day to reduce the
// amount of data scanned
query := fmt.Sprintf("SELECT temporality, description, type, unit, is_monotonic from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 GROUP BY temporality, description, type, unit, is_monotonic", signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricName, unixMilli)
if err != nil {
zap.L().Error("Error while fetching metric metadata", zap.Error(err))
return nil, fmt.Errorf("error while fetching metric metadata: %s", err.Error())
}
defer rows.Close()
// Defaults in case metadata is not found
var (
deltaExists bool
isMonotonic bool
temporality string
description string
metricType string
unit string
)
if metadata, exists := metadataMap[metricName]; exists {
metricType = string(metadata.MetricType)
temporality = string(metadata.Temporality)
isMonotonic = metadata.IsMonotonic
description = metadata.Description
unit = metadata.Unit
var deltaExists, isMonotonic bool
var temporality, description, metricType, unit string
for rows.Next() {
if err := rows.Scan(&temporality, &description, &metricType, &unit, &isMonotonic); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
if temporality == string(v3.Delta) {
deltaExists = true
}
}
metadata, apiError := r.GetUpdatedMetricsMetadata(ctx, orgID, metricName)
if apiError != nil {
zap.L().Error("Error in getting metric cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[metricName]; exist {
metricType = string(updatedMetadata.MetricType)
temporality = string(updatedMetadata.Temporality)
if temporality == string(v3.Delta) {
deltaExists = true
}
isMonotonic = updatedMetadata.IsMonotonic
if updatedMetadata.Description != "" {
description = updatedMetadata.Description
}
if updatedMetadata.Unit != "" {
unit = updatedMetadata.Unit
}
}
query = fmt.Sprintf("SELECT JSONExtractString(labels, 'le') as le from %s.%s WHERE metric_name=$1 AND unix_milli >= $2 AND type = 'Histogram' AND JSONExtractString(labels, 'service_name') = $3 GROUP BY le ORDER BY le", signozMetricDBName, signozTSTableNameV41Day)
rows, err = r.db.Query(ctx, query, metricName, unixMilli, serviceName)
if err != nil {
zap.L().Error("Error while executing query", zap.Error(err))
return nil, fmt.Errorf("error while executing query: %s", err.Error())
}
defer rows.Close()
// 2. Only for Histograms, get `le` buckets
var leFloat64 []float64
if metricType == string(v3.MetricTypeHistogram) {
query := fmt.Sprintf(`
SELECT JSONExtractString(labels, 'le') AS le
FROM %s.%s
WHERE metric_name = $1
AND unix_milli >= $2
AND type = 'Histogram'
AND JSONExtractString(labels, 'service_name') = $3
GROUP BY le
ORDER BY le`, signozMetricDBName, signozTSTableNameV41Day)
rows, err := r.db.Query(ctx, query, metricName, unixMilli, serviceName)
for rows.Next() {
var leStr string
if err := rows.Scan(&leStr); err != nil {
return nil, fmt.Errorf("error while scanning rows: %s", err.Error())
}
le, err := strconv.ParseFloat(leStr, 64)
// ignore the error and continue if the value is not a float
// ideally this should not happen but we have seen ClickHouse
// returning empty string for some values
if err != nil {
zap.L().Error("Error while querying histogram buckets", zap.Error(err))
return nil, fmt.Errorf("error while querying histogram buckets: %s", err.Error())
zap.L().Error("error while parsing le value", zap.Error(err))
continue
}
defer rows.Close()
for rows.Next() {
var leStr string
if err := rows.Scan(&leStr); err != nil {
return nil, fmt.Errorf("error while scanning le: %s", err.Error())
}
le, err := strconv.ParseFloat(leStr, 64)
if err != nil || math.IsInf(le, 0) {
zap.L().Error("Invalid 'le' bucket value", zap.String("value", leStr), zap.Error(err))
continue
}
leFloat64 = append(leFloat64, le)
if math.IsInf(le, 0) {
continue
}
leFloat64 = append(leFloat64, le)
}
return &v3.MetricMetadataResponse{
@@ -6250,11 +6258,33 @@ func (r *ClickHouseReader) CheckForLabelsInMetric(ctx context.Context, metricNam
return hasLE, nil
}
func (r *ClickHouseReader) PreloadMetricsMetadata(ctx context.Context, orgID valuer.UUID) []error {
var allMetricsMetadata []model.UpdateMetricsMetadata
var errorList []error
// Fetch all rows from ClickHouse
query := fmt.Sprintf(`SELECT metric_name, type, description , temporality, is_monotonic, unit
FROM %s.%s;`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
err := r.db.Select(valueCtx, &allMetricsMetadata, query)
if err != nil {
errorList = append(errorList, err)
return errorList
}
for _, m := range allMetricsMetadata {
err := r.cache.Set(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+m.MetricName, &m, -1)
if err != nil {
errorList = append(errorList, err)
}
}
return errorList
}
func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError) {
cachedMetadata := make(map[string]*model.UpdateMetricsMetadata)
var missingMetrics []string
// 1. Try cache
// First, try retrieving each metric from cache.
for _, metricName := range metricNames {
metadata := new(model.UpdateMetricsMetadata)
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metricName
@@ -6266,10 +6296,10 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
}
}
// 2. Try updated_metrics_metadata table
var stillMissing []string
// If there are any metrics missing in the cache, query them from the database.
if len(missingMetrics) > 0 {
metricList := "'" + strings.Join(missingMetrics, "', '") + "'"
// Join the missing metric names; ensure proper quoting if needed.
metricList := "'" + strings.Join(metricNames, "', '") + "'"
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
@@ -6281,7 +6311,6 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
}
defer rows.Close()
found := make(map[string]struct{})
for rows.Next() {
metadata := new(model.UpdateMetricsMetadata)
if err := rows.Scan(
@@ -6295,57 +6324,15 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning metrics metadata: %v", err)}
}
// Cache the result for future requests.
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to store metrics metadata in cache", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata
found[metadata.MetricName] = struct{}{}
}
// Determine which metrics are still missing
for _, m := range missingMetrics {
if _, ok := found[m]; !ok {
stillMissing = append(stillMissing, m)
}
}
}
// 3. Fallback: Try time_series_v4_1week table
if len(stillMissing) > 0 {
metricList := "'" + strings.Join(stillMissing, "', '") + "'"
query := fmt.Sprintf(`SELECT DISTINCT metric_name, type, description, temporality, is_monotonic, unit
FROM %s.%s
WHERE metric_name IN (%s)`, signozMetricDBName, signozTSTableNameV4, metricList)
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
rows, err := r.db.Query(valueCtx, query)
if err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error querying time_series_v4 to get metrics metadata: %v", err)}
}
defer rows.Close()
for rows.Next() {
metadata := new(model.UpdateMetricsMetadata)
if err := rows.Scan(
&metadata.MetricName,
&metadata.MetricType,
&metadata.Description,
&metadata.Temporality,
&metadata.IsMonotonic,
&metadata.Unit,
); err != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
}
cacheKey := constants.UpdatedMetricsMetadataCachePrefix + metadata.MetricName
if cacheErr := r.cache.Set(ctx, orgID, cacheKey, metadata, -1); cacheErr != nil {
zap.L().Error("Failed to cache fallback metadata", zap.String("metric_name", metadata.MetricName), zap.Error(cacheErr))
}
cachedMetadata[metadata.MetricName] = metadata
}
if rows.Err() != nil {
return cachedMetadata, &model.ApiError{Typ: "ClickhouseErr", Err: fmt.Errorf("error scanning fallback metadata: %v", err)}
}
}
return cachedMetadata, nil
}

View File

@@ -63,6 +63,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/licensetypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
ruletypes "github.com/SigNoz/signoz/pkg/types/ruletypes"
@@ -1583,6 +1584,7 @@ func (aH *APIHandler) registerEvent(w http.ResponseWriter, r *http.Request) {
switch request.EventType {
case model.TrackEvent:
telemetry.GetInstance().SendEvent(request.EventName, request.Attributes, claims.Email, request.RateLimited, true)
aH.Signoz.Analytics.TrackUser(r.Context(), claims.OrgID, claims.UserID, request.EventName, request.Attributes)
case model.GroupEvent:
telemetry.GetInstance().SendGroupEvent(request.Attributes, claims.Email)
case model.IdentifyEvent:
@@ -2022,7 +2024,7 @@ func (aH *APIHandler) registerUser(w http.ResponseWriter, r *http.Request) {
return
}
_, errv2 := aH.Signoz.Modules.User.Register(r.Context(), &req)
user, errv2 := aH.Signoz.Modules.User.Register(r.Context(), &req)
if errv2 != nil {
render.Error(w, errv2)
return
@@ -2032,7 +2034,7 @@ func (aH *APIHandler) registerUser(w http.ResponseWriter, r *http.Request) {
// from here onwards, we expect admin (owner) to invite other users.
aH.SetupCompleted = true
aH.Respond(w, nil)
aH.Respond(w, user)
}
func handleSsoError(w http.ResponseWriter, r *http.Request, redirectURL string) {
@@ -3460,10 +3462,9 @@ func (aH *APIHandler) InstallIntegration(w http.ResponseWriter, r *http.Request)
RespondError(w, model.BadRequest(err), nil)
return
}
claims, errv2 := authtypes.ClaimsFromContext(r.Context())
if errv2 != nil {
render.Error(w, errv2)
claims, err := authtypes.ClaimsFromContext(r.Context())
if err != nil {
RespondError(w, model.UnauthorizedError(errors.New("unauthorized")), nil)
return
}
@@ -4142,8 +4143,6 @@ func (aH *APIHandler) logAggregate(w http.ResponseWriter, r *http.Request) {
aH.WriteJSON(w, r, res)
}
const logPipelines = "log_pipelines"
func parseAgentConfigVersion(r *http.Request) (int, *model.ApiError) {
versionString := mux.Vars(r)["version"]
@@ -4191,6 +4190,12 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
return
}
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
version, err := parseAgentConfigVersion(r)
if err != nil {
RespondError(w, model.WrapApiError(err, "Failed to parse agent config version"), nil)
@@ -4201,9 +4206,9 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
var apierr *model.ApiError
if version != -1 {
payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), claims.OrgID, version)
payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), orgID, version)
} else {
payload, apierr = aH.listLogsPipelines(context.Background(), claims.OrgID)
payload, apierr = aH.listLogsPipelines(context.Background(), orgID)
}
if apierr != nil {
@@ -4214,12 +4219,12 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
}
// listLogsPipelines lists logs piplines for latest version
func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID valuer.UUID) (
*logparsingpipeline.PipelinesResponse, *model.ApiError,
) {
// get lateset agent config
latestVersion := -1
lastestConfig, err := agentConf.GetLatestVersion(ctx, logPipelines)
lastestConfig, err := agentConf.GetLatestVersion(ctx, orgID, opamptypes.ElementTypeLogPipelines)
if err != nil && err.Type() != model.ErrorNotFound {
return nil, model.WrapApiError(err, "failed to get latest agent config version")
}
@@ -4228,14 +4233,14 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
latestVersion = lastestConfig.Version
}
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, latestVersion)
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, orgID, latestVersion)
if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines")
}
// todo(Nitya): make a new API for history pagination
limit := 10
history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
history, err := agentConf.GetConfigHistory(ctx, orgID, opamptypes.ElementTypeLogPipelines, limit)
if err != nil {
return nil, model.WrapApiError(err, "failed to get config history")
}
@@ -4244,17 +4249,17 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
}
// listLogsPipelinesByVersion lists pipelines along with config version history
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID string, version int) (
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID valuer.UUID, version int) (
*logparsingpipeline.PipelinesResponse, *model.ApiError,
) {
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, orgID, version)
if err != nil {
return nil, model.WrapApiError(err, "failed to get pipelines by version")
}
// todo(Nitya): make a new API for history pagination
limit := 10
history, err := agentConf.GetConfigHistory(ctx, logPipelines, limit)
history, err := agentConf.GetConfigHistory(ctx, orgID, opamptypes.ElementTypeLogPipelines, limit)
if err != nil {
return nil, model.WrapApiError(err, "failed to retrieve agent config history")
}
@@ -4270,6 +4275,18 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return
}
// prepare config by calling gen func
orgID, errv2 := valuer.NewUUID(claims.OrgID)
if errv2 != nil {
render.Error(w, errv2)
return
}
userID, errv2 := valuer.NewUUID(claims.UserID)
if errv2 != nil {
render.Error(w, errv2)
return
}
req := pipelinetypes.PostablePipelines{}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@@ -4290,7 +4307,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
return nil, validationErr
}
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, claims.OrgID, postable)
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, orgID, userID, postable)
}
res, err := createPipeline(r.Context(), req.Pipelines)

View File

@@ -13,7 +13,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
@@ -41,24 +41,19 @@ func NewLogParsingPipelinesController(
// PipelinesResponse is used to prepare http response for pipelines config related requests
type PipelinesResponse struct {
*agentConf.ConfigVersion
*opamptypes.AgentConfigVersion
Pipelines []pipelinetypes.GettablePipeline `json:"pipelines"`
History []agentConf.ConfigVersion `json:"history"`
History []opamptypes.AgentConfigVersion `json:"history"`
}
// ApplyPipelines stores new or changed pipelines and initiates a new config update
func (ic *LogParsingPipelineController) ApplyPipelines(
ctx context.Context,
orgID string,
orgID valuer.UUID,
userID valuer.UUID,
postable []pipelinetypes.PostablePipeline,
) (*PipelinesResponse, *model.ApiError) {
// get user id from context
claims, errv2 := authtypes.ClaimsFromContext(ctx)
if errv2 != nil {
return nil, model.UnauthorizedError(fmt.Errorf("failed to get userId from context"))
}
var pipelines []pipelinetypes.GettablePipeline
// scan through postable pipelines, to select the existing pipelines or insert missing ones
@@ -87,13 +82,12 @@ func (ic *LogParsingPipelineController) ApplyPipelines(
elements[i] = p.ID.StringValue()
}
// prepare config by calling gen func
cfg, err := agentConf.StartNewVersion(ctx, claims.UserID, agentConf.ElementTypeLogPipelines, elements)
cfg, err := agentConf.StartNewVersion(ctx, orgID, userID, opamptypes.ElementTypeLogPipelines, elements)
if err != nil || cfg == nil {
return nil, err
return nil, model.InternalError(fmt.Errorf("failed to start new version: %w", err))
}
return ic.GetPipelinesByVersion(ctx, cfg.Version)
return ic.GetPipelinesByVersion(ctx, orgID, cfg.Version)
}
func (ic *LogParsingPipelineController) ValidatePipelines(
@@ -142,21 +136,12 @@ func (ic *LogParsingPipelineController) ValidatePipelines(
// Returns effective list of pipelines including user created
// pipelines and pipelines for installed integrations
func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
ctx context.Context, version int,
ctx context.Context, orgID valuer.UUID, version int,
) ([]pipelinetypes.GettablePipeline, *model.ApiError) {
result := []pipelinetypes.GettablePipeline{}
// todo(nitya): remove this once we fix agents in multitenancy
defaultOrgID, err := ic.GetDefaultOrgID(ctx)
if err != nil {
// we don't want to fail the request if we can't get the default org ID
// we will just return an empty list of pipelines
zap.L().Warn("failed to get default org ID", zap.Error(err))
return result, nil
}
if version >= 0 {
savedPipelines, errors := ic.getPipelinesByVersion(ctx, defaultOrgID, version)
savedPipelines, errors := ic.getPipelinesByVersion(ctx, orgID.String(), version)
if errors != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Errors("errors", errors))
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors))
@@ -164,7 +149,7 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
result = savedPipelines
}
integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx, defaultOrgID)
integrationPipelines, apiErr := ic.GetIntegrationPipelines(ctx, orgID.String())
if apiErr != nil {
return nil, model.WrapApiError(
apiErr, "could not get pipelines for installed integrations",
@@ -208,18 +193,18 @@ func (ic *LogParsingPipelineController) getEffectivePipelinesByVersion(
// GetPipelinesByVersion responds with version info and associated pipelines
func (ic *LogParsingPipelineController) GetPipelinesByVersion(
ctx context.Context, version int,
ctx context.Context, orgId valuer.UUID, version int,
) (*PipelinesResponse, *model.ApiError) {
pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, version)
pipelines, errors := ic.getEffectivePipelinesByVersion(ctx, orgId, version)
if errors != nil {
zap.L().Error("failed to get pipelines for version", zap.Int("version", version), zap.Error(errors))
return nil, model.InternalError(fmt.Errorf("failed to get pipelines for given version %v", errors))
}
var configVersion *agentConf.ConfigVersion
var configVersion *opamptypes.AgentConfigVersion
if version >= 0 {
cv, err := agentConf.GetConfigVersion(ctx, agentConf.ElementTypeLogPipelines, version)
cv, err := agentConf.GetConfigVersion(ctx, orgId, opamptypes.ElementTypeLogPipelines, version)
if err != nil {
zap.L().Error("failed to get config for version", zap.Int("version", version), zap.Error(err))
return nil, model.WrapApiError(err, "failed to get config for given version")
@@ -228,8 +213,8 @@ func (ic *LogParsingPipelineController) GetPipelinesByVersion(
}
return &PipelinesResponse{
ConfigVersion: configVersion,
Pipelines: pipelines,
AgentConfigVersion: configVersion,
Pipelines: pipelines,
}, nil
}
@@ -268,8 +253,9 @@ func (pc *LogParsingPipelineController) AgentFeatureType() agentConf.AgentFeatur
// Implements agentConf.AgentFeature interface.
func (pc *LogParsingPipelineController) RecommendAgentConfig(
orgId valuer.UUID,
currentConfYaml []byte,
configVersion *agentConf.ConfigVersion,
configVersion *opamptypes.AgentConfigVersion,
) (
recommendedConfYaml []byte,
serializedSettingsUsed string,
@@ -281,7 +267,7 @@ func (pc *LogParsingPipelineController) RecommendAgentConfig(
}
pipelinesResp, apiErr := pc.GetPipelinesByVersion(
context.Background(), pipelinesVersion,
context.Background(), orgId, pipelinesVersion,
)
if apiErr != nil {
return nil, "", apiErr

View File

@@ -32,7 +32,7 @@ func NewRepo(sqlStore sqlstore.SQLStore) Repo {
// insertPipeline stores a given postable pipeline to database
func (r *Repo) insertPipeline(
ctx context.Context, orgID string, postable *pipelinetypes.PostablePipeline,
ctx context.Context, orgID valuer.UUID, postable *pipelinetypes.PostablePipeline,
) (*pipelinetypes.GettablePipeline, *model.ApiError) {
if err := postable.IsValid(); err != nil {
return nil, model.BadRequest(errors.Wrap(err,
@@ -60,7 +60,7 @@ func (r *Repo) insertPipeline(
insertRow := &pipelinetypes.GettablePipeline{
StoreablePipeline: pipelinetypes.StoreablePipeline{
OrgID: orgID,
OrgID: orgID.String(),
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
@@ -102,11 +102,11 @@ func (r *Repo) getPipelinesByVersion(
storablePipelines := []pipelinetypes.StoreablePipeline{}
err := r.sqlStore.BunDB().NewSelect().
Model(&storablePipelines).
Join("JOIN agent_config_elements e ON p.id = e.element_id").
Join("JOIN agent_config_versions v ON v.id = e.version_id").
Where("e.element_type = ?", logPipelines). // TODO: nitya - add org_id to this as well
Where("v.version = ?", version). // TODO: nitya - add org_id to this as well
Where("p.org_id = ?", orgID).
Join("JOIN agent_config_element e ON p.id = e.element_id").
Join("JOIN agent_config_version v ON v.id = e.version_id").
Where("e.element_type = ?", logPipelines).
Where("v.version = ?", version).
Where("v.org_id = ?", orgID).
Order("p.order_id ASC").
Scan(ctx)
if err != nil {
@@ -131,20 +131,6 @@ func (r *Repo) getPipelinesByVersion(
return gettablePipelines, errors
}
func (r *Repo) GetDefaultOrgID(ctx context.Context) (string, *model.ApiError) {
var orgs []types.Organization
err := r.sqlStore.BunDB().NewSelect().
Model(&orgs).
Scan(ctx)
if err != nil {
return "", model.InternalError(errors.Wrap(err, "failed to get default org ID"))
}
if len(orgs) == 0 {
return "", model.InternalError(errors.New("no orgs found"))
}
return orgs[0].ID.StringValue(), nil
}
// GetPipelines returns pipeline and errors (if any)
func (r *Repo) GetPipeline(
ctx context.Context, orgID string, id string,

View File

@@ -0,0 +1 @@
package opamp

View File

@@ -1,11 +1,20 @@
package opamp
import (
"context"
"fmt"
"log/slog"
"testing"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/utils"
"github.com/SigNoz/signoz/pkg/sharder"
"github.com/SigNoz/signoz/pkg/sharder/noopsharder"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
"github.com/knadh/koanf/providers/rawbytes"
@@ -21,6 +30,9 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
tb := newTestbed(t)
orgID, err := utils.GetTestOrgId(tb.sqlStore)
require.Nil(err)
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"there should be no agent config subscribers at the start",
@@ -35,7 +47,8 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
// Even if there are no recommended changes to the agent's initial config
require.False(tb.testConfigProvider.HasRecommendations())
agent1Conn := &MockOpAmpConnection{}
agent1Id := "testAgent1"
agent1Id := valuer.GenerateUUID().String()
// get orgId from the db
tb.opampServer.OnMessage(
agent1Conn,
&protobufs.AgentToServer{
@@ -57,7 +70,7 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
tb.testConfigProvider.ZPagesEndpoint = "localhost:55555"
require.True(tb.testConfigProvider.HasRecommendations())
agent2Id := "testAgent2"
agent2Id := valuer.GenerateUUID().String()
agent2Conn := &MockOpAmpConnection{}
tb.opampServer.OnMessage(
agent2Conn,
@@ -97,10 +110,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
},
})
expectedConfId := tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id),
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2Id),
"Server should report deployment success to config provider on receiving update from agent.",
)
require.True(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id])
require.True(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2Id])
require.Nil(
agent2Conn.LatestMsgFromServer(),
"Server should not recommend a RemoteConfig if agent is already running it.",
@@ -130,10 +143,10 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
},
})
expectedConfId = tb.testConfigProvider.ZPagesEndpoint
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(expectedConfId, agent2Id),
require.True(tb.testConfigProvider.HasReportedDeploymentStatus(orgID, expectedConfId, agent2Id),
"Server should report deployment failure to config provider on receiving update from agent.",
)
require.False(tb.testConfigProvider.ReportedDeploymentStatuses[expectedConfId][agent2Id])
require.False(tb.testConfigProvider.ReportedDeploymentStatuses[orgID.String()+expectedConfId][agent2Id])
lastAgent1Msg = agent1Conn.LatestMsgFromServer()
agent1Conn.ClearMsgsFromServer()
@@ -158,26 +171,88 @@ func TestOpAMPServerToAgentCommunicationWithConfigProvider(t *testing.T) {
)
}
func TestOpAMPServerAgentLimit(t *testing.T) {
require := require.New(t)
tb := newTestbed(t)
// Create 51 agents and check if the first one gets deleted
var agentConnections []*MockOpAmpConnection
var agentIds []string
for i := 0; i < 51; i++ {
agentConn := &MockOpAmpConnection{}
agentId := valuer.GenerateUUID().String()
agentIds = append(agentIds, agentId)
tb.opampServer.OnMessage(
agentConn,
&protobufs.AgentToServer{
InstanceUid: agentId,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: initialAgentConf(),
},
},
)
agentConnections = append(agentConnections, agentConn)
}
// Perform a DB level check to ensure the first agent is removed
count, err := tb.sqlStore.BunDB().NewSelect().
Model(new(opamptypes.StorableAgent)).
Where("agent_id = ?", agentIds[0]).
Count(context.Background())
require.Nil(err, "Error querying the database for agent count")
require.Equal(0, count, "First agent should be removed from the database after exceeding the limit of 50 agents")
// verify there are 50 agents in the db
count, err = tb.sqlStore.BunDB().NewSelect().
Model(new(opamptypes.StorableAgent)).
Count(context.Background())
require.Nil(err, "Error querying the database for agent count")
require.Equal(50, count, "There should be 50 agents in the database")
// Check if the 51st agent received a config
lastAgentConn := agentConnections[50]
lastAgentMsg := lastAgentConn.LatestMsgFromServer()
require.NotNil(
lastAgentMsg,
"51st agent should receive a remote config from the server",
)
tb.opampServer.Stop()
require.Equal(
0, len(tb.testConfigProvider.ConfigUpdateSubscribers),
"Opamp server should have unsubscribed to config provider updates after shutdown",
)
}
type testbed struct {
testConfigProvider *MockAgentConfigProvider
opampServer *Server
t *testing.T
sqlStore sqlstore.SQLStore
}
func newTestbed(t *testing.T) *testbed {
testDB := utils.NewQueryServiceDBForTests(t)
_, err := model.InitDB(testDB.SQLxDB())
if err != nil {
t.Fatalf("could not init opamp model: %v", err)
}
providerSettings := instrumentationtest.New().ToProviderSettings()
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
require.Nil(t, err)
orgGetter := implorganization.NewGetter(implorganization.NewStore(testDB), sharder)
model.InitDB(testDB, slog.Default(), orgGetter)
testConfigProvider := NewMockAgentConfigProvider()
opampServer := InitializeServer(nil, testConfigProvider)
// create a test org
err = utils.CreateTestOrg(t, testDB)
if err != nil {
t.Fatalf("could not create test org: %v", err)
}
return &testbed{
testConfigProvider: testConfigProvider,
opampServer: opampServer,
t: t,
sqlStore: testDB,
}
}

View File

@@ -60,13 +60,13 @@ func UpsertControlProcessors(
agenthash, err := addIngestionControlToAgent(agent, signal, processors, false)
if err != nil {
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("failed to push ingestion rules config to agent", zap.String("agentID", agent.AgentID), zap.Error(err))
continue
}
if agenthash != "" {
// subscribe callback
model.ListenToConfigUpdate(agent.ID, agenthash, callback)
model.ListenToConfigUpdate(agent.OrgID, agent.AgentID, agenthash, callback)
}
hash = agenthash
@@ -78,7 +78,7 @@ func UpsertControlProcessors(
// addIngestionControlToAgent adds ingestion contorl rules to agent config
func addIngestionControlToAgent(agent *model.Agent, signal string, processors map[string]interface{}, withLB bool) (string, error) {
confHash := ""
config := agent.EffectiveConfig
config := agent.Config
c, err := yaml.Parser().Unmarshal([]byte(config))
if err != nil {
return confHash, err
@@ -89,7 +89,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
// add ingestion control spec
err = makeIngestionControlSpec(agentConf, Signal(signal), processors)
if err != nil {
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("failed to prepare ingestion control processors for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
return confHash, err
}
@@ -106,7 +106,7 @@ func addIngestionControlToAgent(agent *model.Agent, signal string, processors ma
return confHash, err
}
confHash = string(hash.Sum(nil))
agent.EffectiveConfig = string(configR)
agent.Config = string(configR)
err = agent.Upsert()
if err != nil {
return confHash, err

View File

@@ -5,6 +5,7 @@ import (
"log"
"net"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
@@ -67,7 +68,7 @@ func (ta *MockAgentConfigProvider) HasRecommendations() bool {
}
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
func (ta *MockAgentConfigProvider) RecommendAgentConfig(orgId valuer.UUID, baseConfYaml []byte) (
[]byte, string, error,
) {
if len(ta.ZPagesEndpoint) < 1 {
@@ -92,11 +93,14 @@ func (ta *MockAgentConfigProvider) RecommendAgentConfig(baseConfYaml []byte) (
// AgentConfigProvider interface
func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus(
orgId valuer.UUID,
agentId string,
configId string,
err error,
) {
confIdReports := ta.ReportedDeploymentStatuses[configId]
// using orgID + configId as key to avoid collisions with other orgs
// check code in model/coordinator.go for more details
confIdReports := ta.ReportedDeploymentStatuses[orgId.String()+configId]
if confIdReports == nil {
confIdReports = map[string]bool{}
ta.ReportedDeploymentStatuses[configId] = confIdReports
@@ -106,10 +110,12 @@ func (ta *MockAgentConfigProvider) ReportConfigDeploymentStatus(
}
// Test helper.
func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(
func (ta *MockAgentConfigProvider) HasReportedDeploymentStatus(orgID valuer.UUID,
configId string, agentId string,
) bool {
confIdReports := ta.ReportedDeploymentStatuses[configId]
// using orgID + configId as key to avoid collisions with other orgs
// check code in model/coordinator.go for more details
confIdReports := ta.ReportedDeploymentStatuses[orgID.String()+configId]
if confIdReports == nil {
return false
}

View File

@@ -4,36 +4,24 @@ import (
"bytes"
"context"
"crypto/sha256"
"log/slog"
"sync"
"time"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
opampTypes "github.com/open-telemetry/opamp-go/server/types"
)
type AgentStatus int
const (
AgentStatusUnknown AgentStatus = iota
AgentStatusConnected
AgentStatusDisconnected
)
// set in agent description when agent is capable of supporting
// lb exporter configuration. values: 1 (true) or 0 (false)
const lbExporterFlag = "capabilities.lbexporter"
type Agent struct {
ID string `json:"agentId" yaml:"agentId" db:"agent_id"`
StartedAt time.Time `json:"startedAt" yaml:"startedAt" db:"started_at"`
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" db:"terminated_at"`
EffectiveConfig string `json:"effectiveConfig" yaml:"effectiveConfig" db:"effective_config"`
CurrentStatus AgentStatus `json:"currentStatus" yaml:"currentStatus" db:"current_status"`
remoteConfig *protobufs.AgentRemoteConfig
Status *protobufs.AgentToServer
opamptypes.StorableAgent
remoteConfig *protobufs.AgentRemoteConfig
Status *protobufs.AgentToServer
// can this agent be load balancer
CanLB bool
@@ -41,13 +29,24 @@ type Agent struct {
// is this agent setup as load balancer
IsLb bool
conn types.Connection
conn opampTypes.Connection
connMutex sync.Mutex
mux sync.RWMutex
store sqlstore.SQLStore
logger *slog.Logger
}
func New(ID string, conn types.Connection) *Agent {
return &Agent{ID: ID, StartedAt: time.Now(), CurrentStatus: AgentStatusConnected, conn: conn}
// set in agent description when agent is capable of supporting
// lb exporter configuration. values: 1 (true) or 0 (false)
const lbExporterFlag = "capabilities.lbexporter"
func New(store sqlstore.SQLStore, logger *slog.Logger, orgID valuer.UUID, agentID string, conn opampTypes.Connection) *Agent {
return &Agent{
StorableAgent: opamptypes.NewStorableAgent(store, orgID, agentID, opamptypes.AgentStatusConnected),
conn: conn,
store: store,
logger: logger,
}
}
// Upsert inserts or updates the agent in the database.
@@ -55,17 +54,13 @@ func (agent *Agent) Upsert() error {
agent.mux.Lock()
defer agent.mux.Unlock()
_, err := db.NamedExec(`INSERT OR REPLACE INTO agents (
agent_id,
started_at,
effective_config,
current_status
) VALUES (
:agent_id,
:started_at,
:effective_config,
:current_status
)`, agent)
_, err := agent.store.BunDB().NewInsert().
Model(&agent.StorableAgent).
On("CONFLICT (agent_id) DO UPDATE").
Set("updated_at = EXCLUDED.updated_at").
Set("config = EXCLUDED.config").
Set("status = EXCLUDED.status").
Exec(context.Background())
if err != nil {
return err
}
@@ -73,6 +68,27 @@ func (agent *Agent) Upsert() error {
return nil
}
// keep only the last 50 agents in the database
func (agent *Agent) KeepOnlyLast50Agents(ctx context.Context) {
// Delete all agents except the last 50 in a single query
_, err := agent.store.BunDB().
NewDelete().
Model(new(opamptypes.StorableAgent)).
Where("org_id = ?", agent.OrgID).
Where("agent_id NOT IN (?)",
agent.store.BunDB().
NewSelect().
ColumnExpr("distinct(agent_id)").
Model(new(opamptypes.StorableAgent)).
Where("org_id = ?", agent.OrgID).
OrderExpr("created_at DESC").
Limit(50)).
Exec(ctx)
if err != nil {
agent.logger.Error("failed to delete old agents", "error", err)
}
}
// extracts lb exporter support flag from agent description. the flag
// is used to decide if lb exporter can be enabled on the agent.
func ExtractLbFlag(agentDescr *protobufs.AgentDescription) bool {
@@ -135,11 +151,11 @@ func (agent *Agent) updateAgentDescription(newStatus *protobufs.AgentToServer) (
// todo: need to address multiple agent scenario here
// for now, the first response will be sent back to the UI
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_APPLIED {
onConfigSuccess(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
onConfigSuccess(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash))
}
if agent.Status.RemoteConfigStatus.Status == protobufs.RemoteConfigStatuses_RemoteConfigStatuses_FAILED {
onConfigFailure(agent.ID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
onConfigFailure(agent.OrgID, agent.AgentID, string(agent.Status.RemoteConfigStatus.LastRemoteConfigHash), agent.Status.RemoteConfigStatus.ErrorMessage)
}
}
}
@@ -159,7 +175,7 @@ func (agent *Agent) updateHealth(newStatus *protobufs.AgentToServer) {
agent.Status.Health = newStatus.Health
if agent.Status != nil && agent.Status.Health != nil && agent.Status.Health.Healthy {
agent.StartedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
agent.TimeAuditable.UpdatedAt = time.Unix(0, int64(agent.Status.Health.StartTimeUnixNano)).UTC()
}
}
@@ -190,10 +206,10 @@ func (agent *Agent) updateEffectiveConfig(newStatus *protobufs.AgentToServer, re
agent.Status.EffectiveConfig = newStatus.EffectiveConfig
// Convert to string for displaying purposes.
agent.EffectiveConfig = ""
agent.Config = ""
// There should be only one config in the map.
for _, cfg := range newStatus.EffectiveConfig.ConfigMap.ConfigMap {
agent.EffectiveConfig = string(cfg.Body)
agent.Config = string(cfg.Body)
}
}
}
@@ -269,7 +285,8 @@ func (agent *Agent) processStatusUpdate(
agent.SendToAgent(response)
ListenToConfigUpdate(
agent.ID,
agent.OrgID,
agent.AgentID,
string(response.RemoteConfig.ConfigHash),
configProvider.ReportConfigDeploymentStatus,
)
@@ -277,9 +294,9 @@ func (agent *Agent) processStatusUpdate(
}
func (agent *Agent) updateRemoteConfig(configProvider AgentConfigProvider) bool {
recommendedConfig, confId, err := configProvider.RecommendAgentConfig([]byte(agent.EffectiveConfig))
recommendedConfig, confId, err := configProvider.RecommendAgentConfig(agent.OrgID, []byte(agent.Config))
if err != nil {
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.ID), zap.Error(err))
zap.L().Error("could not generate config recommendation for agent", zap.String("agentID", agent.AgentID), zap.Error(err))
return false
}

View File

@@ -1,19 +1,22 @@
package model
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/jmoiron/sqlx"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server/types"
"github.com/pkg/errors"
"go.uber.org/zap"
)
var db *sqlx.DB
var AllAgents = Agents{
agentsById: map[string]*Agent{},
connections: map[types.Connection]map[string]bool{},
@@ -23,6 +26,9 @@ type Agents struct {
mux sync.RWMutex
agentsById map[string]*Agent
connections map[types.Connection]map[string]bool
store sqlstore.SQLStore
OrgGetter organization.Getter
logger *slog.Logger
}
func (a *Agents) Count() int {
@@ -30,15 +36,16 @@ func (a *Agents) Count() int {
}
// Initialize the database and create schema if needed
func InitDB(qsDB *sqlx.DB) (*sqlx.DB, error) {
db = qsDB
func InitDB(sqlStore sqlstore.SQLStore, logger *slog.Logger, orgGetter organization.Getter) {
AllAgents = Agents{
agentsById: make(map[string]*Agent),
connections: make(map[types.Connection]map[string]bool),
mux: sync.RWMutex{},
store: sqlStore,
OrgGetter: orgGetter,
logger: logger,
}
return db, nil
}
// RemoveConnection removes the connection all Agent instances associated with the
@@ -49,8 +56,8 @@ func (agents *Agents) RemoveConnection(conn types.Connection) {
for instanceId := range agents.connections[conn] {
agent := agents.agentsById[instanceId]
agent.CurrentStatus = AgentStatusDisconnected
agent.TerminatedAt = time.Now()
agent.StorableAgent.Status = opamptypes.AgentStatusDisconnected
agent.StorableAgent.TerminatedAt = time.Now()
_ = agent.Upsert()
delete(agents.agentsById, instanceId)
}
@@ -67,27 +74,32 @@ func (agents *Agents) FindAgent(agentID string) *Agent {
// FindOrCreateAgent returns the Agent instance associated with the given agentID.
// If the Agent instance does not exist, it is created and added to the list of
// Agent instances.
func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection) (*Agent, bool, error) {
func (agents *Agents) FindOrCreateAgent(agentID string, conn types.Connection, orgID valuer.UUID) (*Agent, bool, error) {
agents.mux.Lock()
defer agents.mux.Unlock()
var created bool
agent, ok := agents.agentsById[agentID]
var err error
if !ok || agent == nil {
agent = New(agentID, conn)
err = agent.Upsert()
if err != nil {
return nil, created, err
}
agents.agentsById[agentID] = agent
if agents.connections[conn] == nil {
agents.connections[conn] = map[string]bool{}
}
agents.connections[conn][agentID] = true
created = true
if ok && agent != nil {
return agent, false, nil
}
return agent, created, nil
if !ok && orgID.IsZero() {
return nil, false, errors.New("cannot create agent without orgId")
}
agent = New(agents.store, agents.logger, orgID, agentID, conn)
err := agent.Upsert()
if err != nil {
return nil, false, err
}
agent.KeepOnlyLast50Agents(context.Background())
agents.agentsById[agentID] = agent
if agents.connections[conn] == nil {
agents.connections[conn] = map[string]bool{}
}
agents.connections[conn][agentID] = true
return agent, true, nil
}
func (agents *Agents) GetAllAgents() []*Agent {
@@ -108,18 +120,19 @@ func (agents *Agents) RecommendLatestConfigToAll(
) error {
for _, agent := range agents.GetAllAgents() {
newConfig, confId, err := provider.RecommendAgentConfig(
[]byte(agent.EffectiveConfig),
agent.OrgID,
[]byte(agent.Config),
)
if err != nil {
return errors.Wrap(err, fmt.Sprintf(
"could not generate conf recommendation for %v", agent.ID,
"could not generate conf recommendation for %v", agent.AgentID,
))
}
// Recommendation is same as current config
if string(newConfig) == agent.EffectiveConfig {
if string(newConfig) == agent.Config {
zap.L().Info(
"Recommended config same as current effective config for agent", zap.String("agentID", agent.ID),
"Recommended config same as current effective config for agent", zap.String("agentID", agent.AgentID),
)
return nil
}
@@ -144,7 +157,7 @@ func (agents *Agents) RecommendLatestConfigToAll(
RemoteConfig: newRemoteConfig,
})
ListenToConfigUpdate(agent.ID, confId, provider.ReportConfigDeploymentStatus)
ListenToConfigUpdate(agent.OrgID, agent.AgentID, confId, provider.ReportConfigDeploymentStatus)
}
return nil
}

View File

@@ -1,10 +1,12 @@
package model
import "github.com/SigNoz/signoz/pkg/valuer"
// Interface for source of otel collector config recommendations.
type AgentConfigProvider interface {
// Generate recommended config for an agent based on its `currentConfYaml`
// and current state of user facing settings for agent based features.
RecommendAgentConfig(currentConfYaml []byte) (
RecommendAgentConfig(orgId valuer.UUID, currentConfYaml []byte) (
recommendedConfYaml []byte,
// Opaque id of the recommended config, used for reporting deployment status updates
configId string,
@@ -13,6 +15,7 @@ type AgentConfigProvider interface {
// Report deployment status for config recommendations generated by RecommendAgentConfig
ReportConfigDeploymentStatus(
orgId valuer.UUID,
agentId string,
configId string,
err error,

View File

@@ -3,6 +3,8 @@ package model
import (
"fmt"
"sync"
"github.com/SigNoz/signoz/pkg/valuer"
)
// communicates with calling apis when config is applied or fails
@@ -15,7 +17,7 @@ func init() {
}
}
type OnChangeCallback func(agentId string, hash string, err error)
type OnChangeCallback func(orgId valuer.UUID, agentId string, hash string, err error)
// responsible for managing subscribers on config change
type Coordinator struct {
@@ -25,42 +27,49 @@ type Coordinator struct {
subscribers map[string][]OnChangeCallback
}
func onConfigSuccess(agentId string, hash string) {
notifySubscribers(agentId, hash, nil)
func getSubscriberKey(orgId valuer.UUID, hash string) string {
return orgId.String() + hash
}
func onConfigFailure(agentId string, hash string, errorMessage string) {
notifySubscribers(agentId, hash, fmt.Errorf(errorMessage))
func onConfigSuccess(orgId valuer.UUID, agentId string, hash string) {
key := getSubscriberKey(orgId, hash)
notifySubscribers(orgId, agentId, key, nil)
}
func onConfigFailure(orgId valuer.UUID, agentId string, hash string, errorMessage string) {
key := getSubscriberKey(orgId, hash)
notifySubscribers(orgId, agentId, key, fmt.Errorf(errorMessage))
}
// OnSuccess listens to config changes and notifies subscribers
func notifySubscribers(agentId string, hash string, err error) {
func notifySubscribers(orgId valuer.UUID, agentId string, key string, err error) {
// this method currently does not handle multi-agent scenario.
// as soon as a message is delivered, we release all the subscribers
// for a given hash
subs, ok := coordinator.subscribers[hash]
// for a given key
subs, ok := coordinator.subscribers[key]
if !ok {
return
}
for _, s := range subs {
s(agentId, hash, err)
s(orgId, agentId, key, err)
}
// delete all subscribers for this hash, assume future
// delete all subscribers for this key, assume future
// notifies will be disabled. the first response is processed
delete(coordinator.subscribers, hash)
delete(coordinator.subscribers, key)
}
// callers subscribe to this function to listen on config change requests
func ListenToConfigUpdate(agentId string, hash string, ss OnChangeCallback) {
func ListenToConfigUpdate(orgId valuer.UUID, agentId string, hash string, ss OnChangeCallback) {
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
if subs, ok := coordinator.subscribers[hash]; ok {
key := getSubscriberKey(orgId, hash)
if subs, ok := coordinator.subscribers[key]; ok {
subs = append(subs, ss)
coordinator.subscribers[hash] = subs
coordinator.subscribers[key] = subs
} else {
coordinator.subscribers[hash] = []OnChangeCallback{ss}
coordinator.subscribers[key] = []OnChangeCallback{ss}
}
}

View File

@@ -2,8 +2,10 @@ package opamp
import (
"context"
"time"
model "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/open-telemetry/opamp-go/protobufs"
"github.com/open-telemetry/opamp-go/server"
"github.com/open-telemetry/opamp-go/server/types"
@@ -53,6 +55,7 @@ func (srv *Server) Start(listener string) error {
ListenEndpoint: listener,
}
// This will have to send request to all the agents of all tenants
unsubscribe := srv.agentConfigProvider.SubscribeToConfigUpdates(func() {
err := srv.agents.RecommendLatestConfigToAll(srv.agentConfigProvider)
if err != nil {
@@ -78,21 +81,47 @@ func (srv *Server) onDisconnect(conn types.Connection) {
srv.agents.RemoveConnection(conn)
}
// When the agent sends the message for the first time, then we need to know the orgID
// For the subsequent requests, agents don't send the attributes unless something is changed
// but we keep them in context mapped which is mapped to the instanceID, so we would know the
// orgID from the context
// note :- there can only be 50 agents in the db for a given orgID, we don't have a check in-memory but we delete from the db after insert.
func (srv *Server) OnMessage(conn types.Connection, msg *protobufs.AgentToServer) *protobufs.ServerToAgent {
agentID := msg.InstanceUid
agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn)
// find the orgID, if nothing is found keep it empty.
// the find or create agent will return an error if orgID is empty
// thus retry will happen
var orgID valuer.UUID
orgIDs, err := srv.agents.OrgGetter.ListByOwnedKeyRange(context.Background())
if err == nil && len(orgIDs) == 1 {
orgID = orgIDs[0].ID
}
agent, created, err := srv.agents.FindOrCreateAgent(agentID, conn, orgID)
if err != nil {
zap.L().Error("Failed to find or create agent", zap.String("agentID", agentID), zap.Error(err))
// TODO: handle error
// Return error response according to OpAMP protocol
return &protobufs.ServerToAgent{
InstanceUid: agentID,
ErrorResponse: &protobufs.ServerErrorResponse{
Type: protobufs.ServerErrorResponseType_ServerErrorResponseType_Unavailable,
Details: &protobufs.ServerErrorResponse_RetryInfo{
RetryInfo: &protobufs.RetryInfo{
RetryAfterNanoseconds: uint64(5 * time.Second), // minimum recommended retry interval
},
},
},
}
}
if created {
agent.CanLB = model.ExtractLbFlag(msg.AgentDescription)
zap.L().Debug(
"New agent added", zap.Bool("canLb", agent.CanLB),
zap.String("ID", agent.ID),
zap.Any("status", agent.CurrentStatus),
zap.String("agentID", agent.AgentID),
zap.Any("status", agent.Status),
)
}
@@ -119,6 +148,6 @@ func Ready() bool {
return true
}
func Subscribe(agentId string, hash string, f model.OnChangeCallback) {
model.ListenToConfigUpdate(agentId, hash, f)
func Subscribe(orgId valuer.UUID, agentId string, hash string, f model.OnChangeCallback) {
model.ListenToConfigUpdate(orgId, agentId, hash, f)
}

View File

@@ -213,6 +213,17 @@ func (q *querier) runBuilderQuery(
return
}
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}
// What is happening here?
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.

View File

@@ -214,6 +214,17 @@ func (q *querier) runBuilderQuery(
return
}
if builderQuery.DataSource == v3.DataSourceMetrics && !q.testingMode {
metadata, apiError := q.reader.GetUpdatedMetricsMetadata(ctx, orgID, builderQuery.AggregateAttribute.Key)
if apiError != nil {
zap.L().Error("Error in getting metrics cached metadata", zap.Error(apiError))
}
if updatedMetadata, exist := metadata[builderQuery.AggregateAttribute.Key]; exist {
builderQuery.AggregateAttribute.Type = v3.AttributeKeyType(updatedMetadata.MetricType)
builderQuery.Temporality = updatedMetadata.Temporality
}
}
// What is happening here?
// We are only caching the graph panel queries. A non-existant cache key means that the query is not cached.
// If the query is not cached, we execute the query and return the result without caching it.

View File

@@ -179,13 +179,10 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.privateHTTP = privateServer
_, err = opAmpModel.InitDB(serverOptions.SigNoz.SQLStore.SQLxDB())
if err != nil {
return nil, err
}
opAmpModel.InitDB(serverOptions.SigNoz.SQLStore, serverOptions.SigNoz.Instrumentation.Logger(), serverOptions.SigNoz.Modules.OrgGetter)
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: serverOptions.SigNoz.SQLStore.SQLxDB(),
Store: serverOptions.SigNoz.SQLStore,
AgentFeatures: []agentConf.AgentFeature{
logParsingPipelineController,
},
@@ -197,7 +194,18 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
s.opampServer = opamp.InitializeServer(
&opAmpModel.AllAgents, agentConfMgr,
)
orgs, err := apiHandler.Signoz.Modules.OrgGetter.ListByOwnedKeyRange(context.Background())
if err != nil {
return nil, err
}
for _, org := range orgs {
errorList := reader.PreloadMetricsMetadata(context.Background(), org.ID)
for _, er := range errorList {
zap.L().Error("failed to preload metrics metadata", zap.Error(er))
}
}
return s, nil
}

View File

@@ -65,11 +65,19 @@ func Parse(filters *v3.FilterSet) (string, error) {
case v3.FilterOperatorExists, v3.FilterOperatorNotExists:
// accustom log filters like `body.log.message EXISTS` into EXPR language
// where User is attempting to check for keys present in JSON log body
key, found := strings.CutPrefix(v.Key.Key, "body.")
if found {
filter = fmt.Sprintf("%s %s %s", exprFormattedValue(key), logOperatorsToExpr[v.Operator], "fromJSON(body)")
if strings.HasPrefix(v.Key.Key, "body.") {
filter = fmt.Sprintf("%s %s %s", exprFormattedValue(strings.TrimPrefix(v.Key.Key, "body.")), logOperatorsToExpr[v.Operator], "fromJSON(body)")
} else if typ := getTypeName(v.Key.Type); typ != "" {
filter = fmt.Sprintf("%s %s %s", exprFormattedValue(v.Key.Key), logOperatorsToExpr[v.Operator], typ)
} else {
filter = fmt.Sprintf("%s %s %s", exprFormattedValue(v.Key.Key), logOperatorsToExpr[v.Operator], getTypeName(v.Key.Type))
// if type of key is not available; is considered as TOP LEVEL key in OTEL Log Data model hence
// switch Exist and Not Exists operators with NOT EQUAL and EQUAL respectively
operator := v3.FilterOperatorNotEqual
if v.Operator == v3.FilterOperatorNotExists {
operator = v3.FilterOperatorEqual
}
filter = fmt.Sprintf("%s %s nil", v.Key.Key, logOperatorsToExpr[operator])
}
default:
filter = fmt.Sprintf("%s %s %s", name, logOperatorsToExpr[v.Operator], exprFormattedValue(v.Value))

View File

@@ -141,6 +141,34 @@ var testCases = []struct {
}},
Expr: `"key" not in attributes`,
},
{
Name: "trace_id not exists",
Query: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}, Value: "", Operator: "nexists"},
}},
Expr: `trace_id == nil`,
},
{
Name: "trace_id exists",
Query: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "trace_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}, Value: "", Operator: "exists"},
}},
Expr: `trace_id != nil`,
},
{
Name: "span_id not exists",
Query: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "span_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}, Value: "", Operator: "nexists"},
}},
Expr: `span_id == nil`,
},
{
Name: "span_id exists",
Query: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{
{Key: v3.AttributeKey{Key: "span_id", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeUnspecified, IsColumn: true}, Value: "", Operator: "exists"},
}},
Expr: `span_id != nil`,
},
{
Name: "Multi filter",
Query: &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{

View File

@@ -2,6 +2,7 @@ package rules
import (
"context"
"fmt"
"strings"
"testing"
"time"
@@ -1223,7 +1224,7 @@ func TestThresholdRuleUnitCombinations(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
//telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here
queryString := "SELECT any"
@@ -1321,7 +1322,7 @@ func TestThresholdRuleNoData(t *testing.T) {
for idx, c := range cases {
rows := cmock.NewRows(cols, c.values)
//telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
telemetryStore.Mock().ExpectQuery(".*").WillReturnError(fmt.Errorf("error"))
// We are testing the eval logic after the query is run
// so we don't care about the query string here

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http/httptest"
"runtime/debug"
"strings"
@@ -24,7 +25,7 @@ import (
"github.com/SigNoz/signoz/pkg/query-service/app/integrations"
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp"
opampModel "github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/app/opamp/model"
"github.com/SigNoz/signoz/pkg/query-service/constants"
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
"github.com/SigNoz/signoz/pkg/query-service/queryBuilderToExpr"
@@ -35,7 +36,9 @@ import (
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/google/uuid"
"github.com/gorilla/mux"
"github.com/knadh/koanf/parsers/yaml"
@@ -47,7 +50,8 @@ import (
)
func TestLogPipelinesLifecycle(t *testing.T) {
testbed := NewLogPipelinesTestBed(t, nil)
agentID := valuer.GenerateUUID().String()
testbed := NewLogPipelinesTestBed(t, nil, agentID)
require := require.New(t)
getPipelinesResp := testbed.GetPipelinesFromQS()
@@ -134,19 +138,19 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"pipelines config history should not be empty after 1st configuration",
)
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
opamptypes.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"pipelines deployment should be in progress after 1st configuration",
)
// Deployment status should get updated after acknowledgement from opamp client
testbed.simulateOpampClientAcknowledgementForLatestConfig()
testbed.simulateOpampClientAcknowledgementForLatestConfig(agentID)
getPipelinesResp = testbed.GetPipelinesFromQS()
assertPipelinesResponseMatchesPostedPipelines(
t, postablePipelines, getPipelinesResp,
)
require.Equal(
agentConf.Deployed,
opamptypes.Deployed,
getPipelinesResp.History[0].DeployStatus,
"pipeline deployment should be complete after acknowledgment from opamp client",
)
@@ -166,19 +170,19 @@ func TestLogPipelinesLifecycle(t *testing.T) {
"there should be 2 history entries after posting pipelines config for the 2nd time",
)
require.Equal(
agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
opamptypes.DeployInitiated, getPipelinesResp.History[0].DeployStatus,
"deployment should be in progress for latest pipeline config",
)
// Deployment status should get updated again on receiving msg from client.
testbed.simulateOpampClientAcknowledgementForLatestConfig()
testbed.simulateOpampClientAcknowledgementForLatestConfig(agentID)
getPipelinesResp = testbed.GetPipelinesFromQS()
assertPipelinesResponseMatchesPostedPipelines(
t, postablePipelines, getPipelinesResp,
)
require.Equal(
agentConf.Deployed,
opamptypes.Deployed,
getPipelinesResp.History[0].DeployStatus,
"deployment for latest pipeline config should be complete after acknowledgment from opamp client",
)
@@ -186,7 +190,8 @@ func TestLogPipelinesLifecycle(t *testing.T) {
func TestLogPipelinesHistory(t *testing.T) {
require := require.New(t)
testbed := NewLogPipelinesTestBed(t, nil)
agentID := valuer.GenerateUUID().String()
testbed := NewLogPipelinesTestBed(t, nil, agentID)
// Only the latest config version can be "IN_PROGRESS",
// other incomplete deployments should have status "UNKNOWN"
@@ -232,7 +237,7 @@ func TestLogPipelinesHistory(t *testing.T) {
testbed.PostPipelinesToQS(postablePipelines)
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(1, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(opamptypes.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
postablePipelines.Pipelines[0].Config = append(
postablePipelines.Pipelines[0].Config,
@@ -251,8 +256,8 @@ func TestLogPipelinesHistory(t *testing.T) {
getPipelinesResp = testbed.GetPipelinesFromQS()
require.Equal(2, len(getPipelinesResp.History))
require.Equal(agentConf.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(agentConf.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
require.Equal(opamptypes.DeployInitiated, getPipelinesResp.History[0].DeployStatus)
require.Equal(opamptypes.DeployStatusUnknown, getPipelinesResp.History[1].DeployStatus)
}
func TestLogPipelinesValidation(t *testing.T) {
@@ -389,7 +394,8 @@ func TestLogPipelinesValidation(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
testbed := NewLogPipelinesTestBed(t, nil)
agentID := valuer.GenerateUUID().String()
testbed := NewLogPipelinesTestBed(t, nil, agentID)
testbed.PostPipelinesToQSExpectingStatusCode(
pipelinetypes.PostablePipelines{
Pipelines: []pipelinetypes.PostablePipeline{tc.Pipeline},
@@ -460,6 +466,7 @@ type LogPipelinesTestBed struct {
agentConfMgr *agentConf.Manager
opampServer *opamp.Server
opampClientConn *opamp.MockOpAmpConnection
store sqlstore.SQLStore
userModule user.Module
}
@@ -469,9 +476,6 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
sqlStore = utils.NewQueryServiceDBForTests(t)
}
// create test org
// utils.CreateTestOrg(t, sqlStore)
ic, err := integrations.NewController(sqlStore)
if err != nil {
t.Fatalf("could not create integrations controller: %v", err)
@@ -508,17 +512,17 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
t.Fatalf("could not create a new ApiHandler: %v", err)
}
// organizationModule := implorganization.NewModule(implorganization.NewStore(store))
user, apiErr := createTestUser(modules.OrgSetter, modules.User)
if apiErr != nil {
t.Fatalf("could not create a test user: %v", apiErr)
}
// Mock an available opamp agent
testDB, err := opampModel.InitDB(sqlStore.SQLxDB())
require.Nil(t, err, "failed to init opamp model")
agentConfMgr, err := agentConf.Initiate(&agentConf.ManagerOptions{
DB: testDB,
Store: sqlStore,
AgentFeatures: []agentConf.AgentFeature{
apiHandler.LogsParsingPipelineController,
}})
@@ -529,15 +533,22 @@ func NewTestbedWithoutOpamp(t *testing.T, sqlStore sqlstore.SQLStore) *LogPipeli
testUser: user,
apiHandler: apiHandler,
agentConfMgr: agentConfMgr,
store: sqlStore,
userModule: modules.User,
}
}
func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipelinesTestBed {
func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore, agentID string) *LogPipelinesTestBed {
testbed := NewTestbedWithoutOpamp(t, testDB)
providerSettings := instrumentationtest.New().ToProviderSettings()
sharder, err := noopsharder.New(context.TODO(), providerSettings, sharder.Config{})
orgGetter := implorganization.NewGetter(implorganization.NewStore(testbed.store), sharder)
model.InitDB(testbed.store, slog.Default(), orgGetter)
opampServer := opamp.InitializeServer(nil, testbed.agentConfMgr)
err := opampServer.Start(opamp.GetAvailableLocalAddress())
err = opampServer.Start(opamp.GetAvailableLocalAddress())
require.Nil(t, err, "failed to start opamp server")
t.Cleanup(func() {
@@ -548,7 +559,7 @@ func NewLogPipelinesTestBed(t *testing.T, testDB sqlstore.SQLStore) *LogPipeline
opampServer.OnMessage(
opampClientConnection,
&protobufs.AgentToServer{
InstanceUid: "test",
InstanceUid: agentID,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: newInitialAgentConfigMap(),
},
@@ -743,10 +754,10 @@ func assertPipelinesRecommendedInRemoteConfig(
}
}
func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig() {
func (tb *LogPipelinesTestBed) simulateOpampClientAcknowledgementForLatestConfig(agentID string) {
lastMsg := tb.opampClientConn.LatestMsgFromServer()
tb.opampServer.OnMessage(tb.opampClientConn, &protobufs.AgentToServer{
InstanceUid: "test",
InstanceUid: agentID,
EffectiveConfig: &protobufs.EffectiveConfig{
ConfigMap: lastMsg.RemoteConfig.Config,
},

View File

@@ -32,6 +32,7 @@ import (
"github.com/SigNoz/signoz/pkg/types/authtypes"
"github.com/SigNoz/signoz/pkg/types/dashboardtypes"
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
"github.com/SigNoz/signoz/pkg/valuer"
mockhouse "github.com/srikanthccv/ClickHouse-go-mock"
"github.com/stretchr/testify/require"
)
@@ -42,6 +43,9 @@ func TestSignozIntegrationLifeCycle(t *testing.T) {
require := require.New(t)
testbed := NewIntegrationsTestBed(t, nil)
merr := utils.CreateTestOrg(t, testbed.store)
require.NoError(merr)
installedResp := testbed.GetInstalledIntegrationsFromQS()
require.Equal(
len(installedResp.Integrations), 0,
@@ -125,8 +129,12 @@ func TestLogPipelinesForInstalledSignozIntegrations(t *testing.T) {
require := require.New(t)
testDB := utils.NewQueryServiceDBForTests(t)
utils.CreateTestOrg(t, testDB)
agentID := valuer.GenerateUUID().String()
integrationsTB := NewIntegrationsTestBed(t, testDB)
pipelinesTB := NewLogPipelinesTestBed(t, testDB)
pipelinesTB := NewLogPipelinesTestBed(t, testDB, agentID)
availableIntegrationsResp := integrationsTB.GetAvailableIntegrationsFromQS()
availableIntegrations := availableIntegrationsResp.Integrations
@@ -380,6 +388,7 @@ type IntegrationsTestBed struct {
testUser *types.User
qsHttpHandler http.Handler
mockClickhouse mockhouse.ClickConnMockCommon
store sqlstore.SQLStore
userModule user.Module
}
@@ -618,6 +627,7 @@ func NewIntegrationsTestBed(t *testing.T, testDB sqlstore.SQLStore) *Integration
testUser: user,
qsHttpHandler: router,
mockClickhouse: mockClickhouse,
store: testDB,
userModule: modules.User,
}
}

View File

@@ -11,6 +11,8 @@ import (
"github.com/SigNoz/signoz/pkg/sqlmigrator"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/sqlstore/sqlitesqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/valuer"
_ "github.com/mattn/go-sqlite3"
)
@@ -69,6 +71,7 @@ func NewTestSqliteDB(t *testing.T) (sqlStore sqlstore.SQLStore, testDBFilePath s
sqlmigration.NewUpdateApiMonitoringFiltersFactory(sqlStore),
sqlmigration.NewAddKeyOrganizationFactory(sqlStore),
sqlmigration.NewUpdateDashboardFactory(sqlStore),
sqlmigration.NewUpdateAgentsFactory(sqlStore),
),
)
if err != nil {
@@ -87,3 +90,30 @@ func NewQueryServiceDBForTests(t *testing.T) sqlstore.SQLStore {
sqlStore, _ := NewTestSqliteDB(t)
return sqlStore
}
func CreateTestOrg(t *testing.T, store sqlstore.SQLStore) error {
org := &types.Organization{
Identifiable: types.Identifiable{
ID: valuer.GenerateUUID(),
},
Name: "testOrg",
}
_, err := store.BunDB().NewInsert().Model(org).Exec(context.Background())
if err != nil {
return err
}
return nil
}
func GetTestOrgId(store sqlstore.SQLStore) (valuer.UUID, error) {
var orgID valuer.UUID
err := store.BunDB().NewSelect().
Model(&types.Organization{}).
Column("id").
Limit(1).
Scan(context.Background(), &orgID)
if err != nil {
return orgID, err
}
return orgID, nil
}

View File

@@ -68,7 +68,7 @@ func CollisionHandledFinalExpr(
return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, correction)
} else {
// not even a close match, return an error
return "", nil, err
return "", nil, errors.Wrapf(err, errors.TypeInvalidInput, errors.CodeInvalidInput, "field %s not found", field.Name)
}
} else {
for _, key := range keysForField {

View File

@@ -46,7 +46,7 @@ func (b *defaultConditionBuilder) ConditionFor(
) (string, error) {
if key.FieldContext != telemetrytypes.FieldContextResource {
return "", nil
return "true", nil
}
column, err := b.fm.ColumnFor(ctx, key)

View File

@@ -22,7 +22,7 @@ type filterExpressionVisitor struct {
conditionBuilder qbtypes.ConditionBuilder
warnings []string
fieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
errors []error
errors []string
builder *sqlbuilder.SelectBuilder
fullTextColumn *telemetrytypes.TelemetryFieldKey
jsonBodyPrefix string
@@ -90,11 +90,14 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W
combinedErrors := errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"found %d syntax errors while parsing the filter expression: %v",
"found %d syntax errors while parsing the filter expression",
len(parserErrorListener.SyntaxErrors),
parserErrorListener.SyntaxErrors,
)
return nil, nil, combinedErrors
additionals := make([]string, len(parserErrorListener.SyntaxErrors))
for _, err := range parserErrorListener.SyntaxErrors {
additionals = append(additionals, err.Error())
}
return nil, nil, combinedErrors.WithAdditional(additionals...)
}
// Visit the parse tree with our ClickHouse visitor
@@ -105,11 +108,10 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts) (*sqlbuilder.W
combinedErrors := errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"found %d errors while parsing the search expression: %v",
"found %d errors while parsing the search expression",
len(visitor.errors),
visitor.errors,
)
return nil, nil, combinedErrors
return nil, nil, combinedErrors.WithAdditional(visitor.errors...)
}
whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond)
@@ -234,15 +236,11 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
// Handle standalone key/value as a full text search term
if ctx.GetChildCount() == 1 {
if v.skipFullTextFilter {
return ""
return "true"
}
if v.fullTextColumn == nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"full text search is not supported",
))
v.errors = append(v.errors, "full text search is not supported")
return ""
}
child := ctx.GetChild(0)
@@ -251,7 +249,7 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
keyText := keyCtx.GetText()
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, keyText, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@@ -266,12 +264,12 @@ func (v *filterExpressionVisitor) VisitPrimary(ctx *grammar.PrimaryContext) any
} else if valCtx.KEY() != nil {
text = valCtx.KEY().GetText()
} else {
v.errors = append(v.errors, errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "unsupported value type: %s", valCtx.GetText()))
v.errors = append(v.errors, fmt.Sprintf("unsupported value type: %s", valCtx.GetText()))
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@@ -419,7 +417,7 @@ func (v *filterExpressionVisitor) VisitComparison(ctx *grammar.ComparisonContext
for _, key := range keys {
condition, err := v.conditionBuilder.ConditionFor(context.Background(), key, op, value, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build condition: %s", err.Error()))
return ""
}
conds = append(conds, condition)
@@ -459,7 +457,7 @@ func (v *filterExpressionVisitor) VisitValueList(ctx *grammar.ValueListContext)
func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) any {
if v.skipFullTextFilter {
return ""
return "true"
}
var text string
@@ -471,16 +469,12 @@ func (v *filterExpressionVisitor) VisitFullText(ctx *grammar.FullTextContext) an
}
if v.fullTextColumn == nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"full text search is not supported",
))
v.errors = append(v.errors, "full text search is not supported")
return ""
}
cond, err := v.conditionBuilder.ConditionFor(context.Background(), v.fullTextColumn, qbtypes.FilterOperatorRegexp, text, v.builder)
if err != nil {
v.errors = append(v.errors, errors.WrapInternalf(err, errors.CodeInternal, "failed to build full text search condition"))
v.errors = append(v.errors, fmt.Sprintf("failed to build full text search condition: %s", err.Error()))
return ""
}
return cond
@@ -498,34 +492,19 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
functionName = "hasAll"
} else {
// Default fallback
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"unknown function `%s`",
ctx.GetText(),
))
v.errors = append(v.errors, fmt.Sprintf("unknown function `%s`", ctx.GetText()))
return ""
}
params := v.Visit(ctx.FunctionParamList()).([]any)
if len(params) < 2 {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` expects key and value parameters",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key and value parameters", functionName))
return ""
}
keys, ok := params[0].([]*telemetrytypes.TelemetryFieldKey)
if !ok {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` expects key parameter to be a field key",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` expects key parameter to be a field key", functionName))
return ""
}
value := params[1:]
@@ -536,12 +515,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
if strings.HasPrefix(key.Name, v.jsonBodyPrefix) {
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
} else {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"function `%s` supports only body JSON search",
functionName,
))
v.errors = append(v.errors, fmt.Sprintf("function `%s` supports only body JSON search", functionName))
return ""
}
@@ -603,12 +577,7 @@ func (v *filterExpressionVisitor) VisitValue(ctx *grammar.ValueContext) any {
} else if ctx.NUMBER() != nil {
number, err := strconv.ParseFloat(ctx.NUMBER().GetText(), 64)
if err != nil {
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"failed to parse number %s",
ctx.NUMBER().GetText(),
))
v.errors = append(v.errors, fmt.Sprintf("failed to parse number %s", ctx.NUMBER().GetText()))
return ""
}
return number
@@ -648,19 +617,11 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
if len(fieldKeysForName) == 0 {
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" && keyName == "" {
v.errors = append(v.errors, errors.NewInvalidInputf(
errors.CodeInvalidInput,
"missing key for body json search - expected key of the form `body.key` (ex: `body.status`)",
))
v.errors = append(v.errors, "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)")
} else {
// TODO(srikanthccv): do we want to return an error here?
// should we infer the type and auto-magically build a key for expression?
v.errors = append(v.errors, errors.Newf(
errors.TypeInvalidInput,
errors.CodeInvalidInput,
"key `%s` not found",
fieldKey.Name,
))
v.errors = append(v.errors, fmt.Sprintf("key `%s` not found", fieldKey.Name))
}
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/SigNoz/signoz/pkg/emailing/smtpemailing"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/user"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/prometheus/clickhouseprometheus"
"github.com/SigNoz/signoz/pkg/querier"
@@ -110,6 +111,7 @@ func NewSQLMigrationProviderFactories(sqlstore sqlstore.SQLStore) factory.NamedM
sqlmigration.NewUpdateDashboardFactory(sqlstore),
sqlmigration.NewDropFeatureSetFactory(),
sqlmigration.NewDropDeprecatedTablesFactory(),
sqlmigration.NewUpdateAgentsFactory(sqlstore),
)
}
@@ -152,9 +154,9 @@ func NewSharderProviderFactories() factory.NamedMap[factory.ProviderFactory[shar
)
}
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
func NewStatsReporterProviderFactories(telemetryStore telemetrystore.TelemetryStore, collectors []statsreporter.StatsCollector, orgGetter organization.Getter, userGetter user.Getter, build version.Build, analyticsConfig analytics.Config) factory.NamedMap[factory.ProviderFactory[statsreporter.StatsReporter, statsreporter.Config]] {
return factory.MustNewNamedMap(
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, build, analyticsConfig),
analyticsstatsreporter.NewFactory(telemetryStore, collectors, orgGetter, userGetter, build, analyticsConfig),
noopstatsreporter.NewFactory(),
)
}

View File

@@ -5,7 +5,9 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/SigNoz/signoz/pkg/analytics"
"github.com/SigNoz/signoz/pkg/instrumentation/instrumentationtest"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/sqlstore/sqlstoretest"
"github.com/SigNoz/signoz/pkg/statsreporter"
@@ -61,8 +63,9 @@ func TestNewProviderFactories(t *testing.T) {
})
assert.NotPanics(t, func() {
userGetter := impluser.NewGetter(impluser.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual), instrumentationtest.New().ToProviderSettings()))
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstoretest.New(sqlstore.Config{Provider: "sqlite"}, sqlmock.QueryMatcherEqual)), nil)
telemetryStore := telemetrystoretest.New(telemetrystore.Config{Provider: "clickhouse"}, sqlmock.QueryMatcherEqual)
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, version.Build{}, analytics.Config{Enabled: true})
NewStatsReporterProviderFactories(telemetryStore, []statsreporter.StatsCollector{}, orgGetter, userGetter, version.Build{}, analytics.Config{Enabled: true})
})
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/SigNoz/signoz/pkg/licensing"
"github.com/SigNoz/signoz/pkg/modules/organization"
"github.com/SigNoz/signoz/pkg/modules/organization/implorganization"
"github.com/SigNoz/signoz/pkg/modules/user/impluser"
"github.com/SigNoz/signoz/pkg/prometheus"
"github.com/SigNoz/signoz/pkg/querier"
"github.com/SigNoz/signoz/pkg/ruler"
@@ -31,6 +32,7 @@ import (
type SigNoz struct {
*factory.Registry
Instrumentation instrumentation.Instrumentation
Analytics analytics.Analytics
Cache cache.Cache
Web web.Web
SQLStore sqlstore.SQLStore
@@ -212,6 +214,9 @@ func New(
// Initialize organization getter
orgGetter := implorganization.NewGetter(implorganization.NewStore(sqlstore), sharder)
// Initialize user getter
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings))
// Initialize alertmanager from the available alertmanager provider factories
alertmanager, err := factory.NewProviderFromNamedMap(
ctx,
@@ -267,7 +272,7 @@ func New(
ctx,
providerSettings,
config.StatsReporter,
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, version.Info, config.Analytics),
NewStatsReporterProviderFactories(telemetrystore, statsCollectors, orgGetter, userGetter, version.Info, config.Analytics),
config.StatsReporter.Provider(),
)
if err != nil {
@@ -288,6 +293,7 @@ func New(
return &SigNoz{
Registry: registry,
Analytics: analytics,
Instrumentation: instrumentation,
Cache: cache,
Web: web,

View File

@@ -0,0 +1,324 @@
package sqlmigration
import (
"context"
"database/sql"
"time"
"github.com/SigNoz/signoz/pkg/factory"
"github.com/SigNoz/signoz/pkg/sqlstore"
"github.com/SigNoz/signoz/pkg/types"
"github.com/SigNoz/signoz/pkg/types/opamptypes"
"github.com/SigNoz/signoz/pkg/valuer"
"github.com/uptrace/bun"
"github.com/uptrace/bun/migrate"
)
type updateAgents struct {
store sqlstore.SQLStore
}
type existingAgent41 struct {
bun.BaseModel `bun:"table:agents"`
AgentID string `bun:"agent_id,pk,type:text,unique"`
StartedAt time.Time `bun:"started_at,notnull"`
TerminatedAt time.Time `bun:"terminated_at"`
CurrentStatus opamptypes.AgentStatus `bun:"current_status,type:text,notnull"`
EffectiveConfig string `bun:"effective_config,type:text,notnull"`
}
type newAgent41 struct {
bun.BaseModel `bun:"table:agent"`
types.Identifiable
types.TimeAuditable
// AgentID is needed as the ID from opamp client is ULID and not UUID, so we are keeping it like this
AgentID string `json:"agentId" yaml:"agentId" bun:"agent_id,type:text,notnull,unique"`
OrgID string `json:"orgId" yaml:"orgId" bun:"org_id,type:text,notnull"`
TerminatedAt time.Time `json:"terminatedAt" yaml:"terminatedAt" bun:"terminated_at"`
Status opamptypes.AgentStatus `json:"currentStatus" yaml:"currentStatus" bun:"status,type:text,notnull"`
Config string `bun:"config,type:text,notnull"`
}
type existingAgentConfigVersions41 struct {
bun.BaseModel `bun:"table:agent_config_versions"`
ID string `bun:"id,pk,type:text"`
CreatedBy string `bun:"created_by,type:text"`
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
UpdatedBy string `bun:"updated_by,type:text"`
UpdatedAt time.Time `bun:"updated_at,default:CURRENT_TIMESTAMP"`
Version int `bun:"version,default:1,unique:element_version_idx"`
Active int `bun:"active"`
IsValid int `bun:"is_valid"`
Disabled int `bun:"disabled"`
ElementType opamptypes.ElementType `bun:"element_type,notnull,type:varchar(120),unique:element_version_idx"`
DeployStatus opamptypes.DeployStatus `bun:"deploy_status,notnull,type:varchar(80),default:'DIRTY'"`
DeploySequence int `bun:"deploy_sequence"`
DeployResult string `bun:"deploy_result,type:text"`
LastHash string `bun:"last_hash,type:text"`
LastConfig string `bun:"last_config,type:text"`
}
type newAgentConfigVersion41 struct {
bun.BaseModel `bun:"table:agent_config_version"`
types.Identifiable
types.TimeAuditable
types.UserAuditable
OrgID string `json:"orgId" bun:"org_id,type:text,notnull,unique:element_version_org_idx"`
Version int `json:"version" bun:"version,unique:element_version_org_idx"`
ElementType opamptypes.ElementType `json:"elementType" bun:"element_type,type:text,notnull,unique:element_version_org_idx"`
DeployStatus opamptypes.DeployStatus `json:"deployStatus" bun:"deploy_status,type:text,notnull,default:'dirty'"`
DeploySequence int `json:"deploySequence" bun:"deploy_sequence"`
DeployResult string `json:"deployResult" bun:"deploy_result,type:text"`
Hash string `json:"lastHash" bun:"hash,type:text"`
Config string `json:"config" bun:"config,type:text"`
}
type existingAgentConfigElement41 struct {
bun.BaseModel `bun:"table:agent_config_elements"`
ID string `bun:"id,pk,type:text"`
CreatedBy string `bun:"created_by,type:text"`
CreatedAt time.Time `bun:"created_at,default:CURRENT_TIMESTAMP"`
UpdatedBy string `bun:"updated_by,type:text"`
UpdatedAt time.Time `bun:"updated_at,default:CURRENT_TIMESTAMP"`
ElementID string `bun:"element_id,type:text,notnull,unique:agent_config_elements_u1"`
ElementType string `bun:"element_type,type:varchar(120),notnull,unique:agent_config_elements_u1"`
VersionID string `bun:"version_id,type:text,notnull,unique:agent_config_elements_u1"`
}
type newAgentConfigElement41 struct {
bun.BaseModel `bun:"table:agent_config_element"`
types.Identifiable
types.TimeAuditable
ElementID string `bun:"element_id,type:text,notnull,unique:element_type_version_idx"`
ElementType string `bun:"element_type,type:text,notnull,unique:element_type_version_idx"`
VersionID string `bun:"version_id,type:text,notnull,unique:element_type_version_idx"`
}
func NewUpdateAgentsFactory(sqlstore sqlstore.SQLStore) factory.ProviderFactory[SQLMigration, Config] {
return factory.NewProviderFactory(factory.MustNewName("update_agents"), func(ctx context.Context, ps factory.ProviderSettings, c Config) (SQLMigration, error) {
return newUpdateAgents(ctx, ps, c, sqlstore)
})
}
func newUpdateAgents(_ context.Context, _ factory.ProviderSettings, _ Config, store sqlstore.SQLStore) (SQLMigration, error) {
return &updateAgents{
store: store,
}, nil
}
func (migration *updateAgents) Register(migrations *migrate.Migrations) error {
if err := migrations.Register(migration.Up, migration.Down); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Up(ctx context.Context, db *bun.DB) error {
// begin transaction
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}
defer func() {
_ = tx.Rollback()
}()
var orgID string
err = tx.
NewSelect().
ColumnExpr("id").
Table("organizations").
Limit(1).
Scan(ctx, &orgID)
if err != nil {
if err != sql.ErrNoRows {
return err
}
}
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingAgent41), new(newAgent41), []string{OrgReference}, func(ctx context.Context) error {
existingAgents := make([]*existingAgent41, 0)
err = tx.
NewSelect().
Model(&existingAgents).
Scan(ctx)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == nil && len(existingAgents) > 0 {
newAgents, err := migration.
CopyOldAgentToNewAgent(ctx, tx, existingAgents, orgID)
if err != nil {
return err
}
_, err = tx.
NewInsert().
Model(&newAgents).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingAgentConfigVersions41), new(newAgentConfigVersion41), []string{OrgReference}, func(ctx context.Context) error {
existingAgentConfigVersions := make([]*existingAgentConfigVersions41, 0)
err = tx.
NewSelect().
Model(&existingAgentConfigVersions).
Scan(ctx)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == nil && len(existingAgentConfigVersions) > 0 {
newAgentConfigVersions, err := migration.
CopyOldAgentConfigVersionToNewAgentConfigVersion(ctx, tx, existingAgentConfigVersions, orgID)
if err != nil {
return err
}
_, err = tx.
NewInsert().
Model(&newAgentConfigVersions).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
err = migration.
store.
Dialect().
RenameTableAndModifyModel(ctx, tx, new(existingAgentConfigElement41), new(newAgentConfigElement41), []string{AgentConfigVersionReference}, func(ctx context.Context) error {
existingAgentConfigElements := make([]*existingAgentConfigElement41, 0)
err = tx.
NewSelect().
Model(&existingAgentConfigElements).
Scan(ctx)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == nil && len(existingAgentConfigElements) > 0 {
newAgentConfigElements, err := migration.
CopyOldAgentConfigElementToNewAgentConfigElement(ctx, tx, existingAgentConfigElements, orgID)
if err != nil {
return err
}
_, err = tx.
NewInsert().
Model(&newAgentConfigElements).
Exec(ctx)
if err != nil {
return err
}
}
return nil
})
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}
func (migration *updateAgents) Down(ctx context.Context, db *bun.DB) error {
return nil
}
func (migration *updateAgents) CopyOldAgentToNewAgent(ctx context.Context, tx bun.IDB, existingAgents []*existingAgent41, orgID string) ([]*newAgent41, error) {
newAgents := make([]*newAgent41, 0)
for _, existingAgent := range existingAgents {
newAgents = append(newAgents, &newAgent41{
Identifiable: types.Identifiable{ID: valuer.GenerateUUID()},
AgentID: existingAgent.AgentID,
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Unix(existingAgent.StartedAt.Unix(), 0),
UpdatedAt: time.Unix(existingAgent.StartedAt.Unix(), 0),
},
Status: existingAgent.CurrentStatus,
Config: existingAgent.EffectiveConfig,
TerminatedAt: existingAgent.TerminatedAt,
OrgID: orgID,
})
}
return newAgents, nil
}
func (migration *updateAgents) CopyOldAgentConfigVersionToNewAgentConfigVersion(ctx context.Context, tx bun.IDB, existingAgentConfigVersions []*existingAgentConfigVersions41, orgID string) ([]*newAgentConfigVersion41, error) {
newAgentConfigVersions := make([]*newAgentConfigVersion41, 0)
for _, existingAgentConfigVersion := range existingAgentConfigVersions {
versionID, err := valuer.NewUUID(existingAgentConfigVersion.ID)
if err != nil {
return nil, err
}
newAgentConfigVersions = append(newAgentConfigVersions, &newAgentConfigVersion41{
Identifiable: types.Identifiable{ID: versionID},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Unix(existingAgentConfigVersion.CreatedAt.Unix(), 0),
UpdatedAt: time.Unix(existingAgentConfigVersion.UpdatedAt.Unix(), 0),
},
UserAuditable: types.UserAuditable{
CreatedBy: existingAgentConfigVersion.CreatedBy,
UpdatedBy: existingAgentConfigVersion.UpdatedBy,
},
OrgID: orgID,
Version: existingAgentConfigVersion.Version,
ElementType: existingAgentConfigVersion.ElementType,
DeployStatus: existingAgentConfigVersion.DeployStatus,
DeploySequence: existingAgentConfigVersion.DeploySequence,
DeployResult: existingAgentConfigVersion.DeployResult,
Hash: orgID + existingAgentConfigVersion.LastHash,
Config: existingAgentConfigVersion.LastConfig,
})
}
return newAgentConfigVersions, nil
}
func (migration *updateAgents) CopyOldAgentConfigElementToNewAgentConfigElement(ctx context.Context, tx bun.IDB, existingAgentConfigElements []*existingAgentConfigElement41, orgID string) ([]*newAgentConfigElement41, error) {
newAgentConfigElements := make([]*newAgentConfigElement41, 0)
for _, existingAgentConfigElement := range existingAgentConfigElements {
elementID, err := valuer.NewUUID(existingAgentConfigElement.ElementID)
if err != nil {
return nil, err
}
newAgentConfigElements = append(newAgentConfigElements, &newAgentConfigElement41{
Identifiable: types.Identifiable{ID: elementID},
TimeAuditable: types.TimeAuditable{
CreatedAt: time.Unix(existingAgentConfigElement.CreatedAt.Unix(), 0),
UpdatedAt: time.Unix(existingAgentConfigElement.UpdatedAt.Unix(), 0),
},
VersionID: existingAgentConfigElement.VersionID,
ElementID: existingAgentConfigElement.ElementID,
ElementType: existingAgentConfigElement.ElementType,
})
}
return newAgentConfigElements, nil
}

View File

@@ -25,11 +25,12 @@ var (
)
var (
OrgReference = "org"
UserReference = "user"
UserReferenceNoCascade = "user_no_cascade"
FactorPasswordReference = "factor_password"
CloudIntegrationReference = "cloud_integration"
OrgReference = "org"
UserReference = "user"
UserReferenceNoCascade = "user_no_cascade"
FactorPasswordReference = "factor_password"
CloudIntegrationReference = "cloud_integration"
AgentConfigVersionReference = "agent_config_version"
)
func New(

View File

@@ -18,19 +18,21 @@ const (
)
const (
Org string = "org"
User string = "user"
UserNoCascade string = "user_no_cascade"
FactorPassword string = "factor_password"
CloudIntegration string = "cloud_integration"
Org string = "org"
User string = "user"
UserNoCascade string = "user_no_cascade"
FactorPassword string = "factor_password"
CloudIntegration string = "cloud_integration"
AgentConfigVersion string = "agent_config_version"
)
const (
OrgReference string = `("org_id") REFERENCES "organizations" ("id")`
UserReference string = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
UserNoCascadeReference string = `("user_id") REFERENCES "users" ("id")`
FactorPasswordReference string = `("password_id") REFERENCES "factor_password" ("id")`
CloudIntegrationReference string = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE`
OrgReference string = `("org_id") REFERENCES "organizations" ("id")`
UserReference string = `("user_id") REFERENCES "users" ("id") ON DELETE CASCADE ON UPDATE CASCADE`
UserNoCascadeReference string = `("user_id") REFERENCES "users" ("id")`
FactorPasswordReference string = `("password_id") REFERENCES "factor_password" ("id")`
CloudIntegrationReference string = `("cloud_integration_id") REFERENCES "cloud_integration" ("id") ON DELETE CASCADE`
AgentConfigVersionReference string = `("version_id") REFERENCES "agent_config_version" ("id")`
)
const (
@@ -269,6 +271,8 @@ func (dialect *dialect) RenameTableAndModifyModel(ctx context.Context, bun bun.I
fkReferences = append(fkReferences, FactorPasswordReference)
} else if reference == CloudIntegration && !slices.Contains(fkReferences, CloudIntegrationReference) {
fkReferences = append(fkReferences, CloudIntegrationReference)
} else if reference == AgentConfigVersion && !slices.Contains(fkReferences, AgentConfigVersionReference) {
fkReferences = append(fkReferences, AgentConfigVersionReference)
}
}

Some files were not shown because too many files have changed in this diff Show More