Merge branch 'fix/log-detail-json-body-copy-value-only' of https://github.com/SigNoz/signoz into fix/log-detail-json-body-copy-value-only
This commit is contained in:
@@ -22,7 +22,6 @@ import { StatusCodes } from 'http-status-codes';
|
||||
import history from 'lib/history';
|
||||
import ErrorBoundaryFallback from 'pages/ErrorBoundaryFallback/ErrorBoundaryFallback';
|
||||
import posthog from 'posthog-js';
|
||||
import AlertRuleProvider from 'providers/Alert';
|
||||
import { useAppContext } from 'providers/App/App';
|
||||
import { IUser } from 'providers/App/types';
|
||||
import { DashboardProvider } from 'providers/Dashboard/Dashboard';
|
||||
@@ -374,26 +373,24 @@ function App(): JSX.Element {
|
||||
<QueryBuilderProvider>
|
||||
<DashboardProvider>
|
||||
<KeyboardHotkeysProvider>
|
||||
<AlertRuleProvider>
|
||||
<AppLayout>
|
||||
<PreferenceContextProvider>
|
||||
<Suspense fallback={<Spinner size="large" tip="Loading..." />}>
|
||||
<Switch>
|
||||
{routes.map(({ path, component, exact }) => (
|
||||
<Route
|
||||
key={`${path}`}
|
||||
exact={exact}
|
||||
path={path}
|
||||
component={component}
|
||||
/>
|
||||
))}
|
||||
<Route exact path="/" component={Home} />
|
||||
<Route path="*" component={NotFound} />
|
||||
</Switch>
|
||||
</Suspense>
|
||||
</PreferenceContextProvider>
|
||||
</AppLayout>
|
||||
</AlertRuleProvider>
|
||||
<AppLayout>
|
||||
<PreferenceContextProvider>
|
||||
<Suspense fallback={<Spinner size="large" tip="Loading..." />}>
|
||||
<Switch>
|
||||
{routes.map(({ path, component, exact }) => (
|
||||
<Route
|
||||
key={`${path}`}
|
||||
exact={exact}
|
||||
path={path}
|
||||
component={component}
|
||||
/>
|
||||
))}
|
||||
<Route exact path="/" component={Home} />
|
||||
<Route path="*" component={NotFound} />
|
||||
</Switch>
|
||||
</Suspense>
|
||||
</PreferenceContextProvider>
|
||||
</AppLayout>
|
||||
</KeyboardHotkeysProvider>
|
||||
</DashboardProvider>
|
||||
</QueryBuilderProvider>
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
.overflow-input {
|
||||
overflow: hidden;
|
||||
white-space: nowrap;
|
||||
text-overflow: ellipsis;
|
||||
}
|
||||
|
||||
.overflow-input-mirror {
|
||||
position: absolute;
|
||||
visibility: hidden;
|
||||
white-space: pre;
|
||||
pointer-events: none;
|
||||
font: inherit;
|
||||
letter-spacing: inherit;
|
||||
height: 0;
|
||||
overflow: hidden;
|
||||
}
|
||||
@@ -0,0 +1,119 @@
|
||||
import { render, screen, userEvent, waitFor, within } from 'tests/test-utils';
|
||||
|
||||
import OverflowInputToolTip from './OverflowInputToolTip';
|
||||
|
||||
const TOOLTIP_INNER_SELECTOR = '.ant-tooltip-inner';
|
||||
// Utility to mock overflow behaviour on inputs / elements.
|
||||
// Stubs HTMLElement.prototype.clientWidth, scrollWidth and offsetWidth used by component.
|
||||
function mockOverflow(clientWidth: number, scrollWidth: number): void {
|
||||
Object.defineProperty(HTMLElement.prototype, 'clientWidth', {
|
||||
configurable: true,
|
||||
value: clientWidth,
|
||||
});
|
||||
Object.defineProperty(HTMLElement.prototype, 'scrollWidth', {
|
||||
configurable: true,
|
||||
value: scrollWidth,
|
||||
});
|
||||
// mirror.offsetWidth is used to compute mirrorWidth = offsetWidth + 24.
|
||||
// Use clientWidth so the mirror measurement aligns with the mocked client width in tests.
|
||||
Object.defineProperty(HTMLElement.prototype, 'offsetWidth', {
|
||||
configurable: true,
|
||||
value: clientWidth,
|
||||
});
|
||||
}
|
||||
|
||||
function queryTooltipInner(): HTMLElement | null {
|
||||
// find element that has role="tooltip" (could be the inner itself)
|
||||
const tooltip = document.querySelector<HTMLElement>('[role="tooltip"]');
|
||||
if (!tooltip) return document.querySelector(TOOLTIP_INNER_SELECTOR);
|
||||
|
||||
// if the role element is already the inner, return it; otherwise return its descendant
|
||||
if (tooltip.classList.contains('ant-tooltip-inner')) return tooltip;
|
||||
return (
|
||||
(tooltip.querySelector(TOOLTIP_INNER_SELECTOR) as HTMLElement) ??
|
||||
document.querySelector(TOOLTIP_INNER_SELECTOR)
|
||||
);
|
||||
}
|
||||
|
||||
describe('OverflowInputToolTip', () => {
|
||||
beforeEach(() => {
|
||||
jest.restoreAllMocks();
|
||||
});
|
||||
|
||||
test('shows tooltip when content overflows and input is clamped at maxAutoWidth', async () => {
|
||||
mockOverflow(150, 250); // clientWidth >= maxAutoWidth (150), scrollWidth > clientWidth
|
||||
|
||||
render(<OverflowInputToolTip value="Very long overflowing text" />);
|
||||
|
||||
await userEvent.hover(screen.getByRole('textbox'));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(queryTooltipInner()).not.toBeNull();
|
||||
});
|
||||
|
||||
const tooltipInner = queryTooltipInner();
|
||||
if (!tooltipInner) throw new Error('Tooltip inner not found');
|
||||
expect(
|
||||
within(tooltipInner).getByText('Very long overflowing text'),
|
||||
).toBeInTheDocument();
|
||||
});
|
||||
|
||||
test('does NOT show tooltip when content does not overflow', async () => {
|
||||
mockOverflow(150, 100); // content fits (scrollWidth <= clientWidth)
|
||||
|
||||
render(<OverflowInputToolTip value="Short text" />);
|
||||
|
||||
await userEvent.hover(screen.getByRole('textbox'));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(queryTooltipInner()).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
test('does NOT show tooltip when content overflows but input is NOT at maxAutoWidth', async () => {
|
||||
mockOverflow(100, 250); // clientWidth < maxAutoWidth (150), scrollWidth > clientWidth
|
||||
|
||||
render(<OverflowInputToolTip value="Long but input not clamped" />);
|
||||
|
||||
await userEvent.hover(screen.getByRole('textbox'));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(queryTooltipInner()).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
test('uncontrolled input allows typing', async () => {
|
||||
render(<OverflowInputToolTip defaultValue="Init" />);
|
||||
|
||||
const input = screen.getByRole('textbox') as HTMLInputElement;
|
||||
await userEvent.type(input, 'ABC');
|
||||
|
||||
expect(input).toHaveValue('InitABC');
|
||||
});
|
||||
|
||||
test('disabled input never shows tooltip even if overflowing', async () => {
|
||||
mockOverflow(150, 300);
|
||||
|
||||
render(<OverflowInputToolTip value="Overflowing!" disabled />);
|
||||
|
||||
await userEvent.hover(screen.getByRole('textbox'));
|
||||
|
||||
await waitFor(() => {
|
||||
expect(queryTooltipInner()).toBeNull();
|
||||
});
|
||||
});
|
||||
|
||||
test('renders mirror span and input correctly (structural assertions instead of snapshot)', () => {
|
||||
const { container } = render(<OverflowInputToolTip value="Snapshot" />);
|
||||
const mirror = container.querySelector('.overflow-input-mirror');
|
||||
const input = container.querySelector('input') as HTMLInputElement | null;
|
||||
|
||||
expect(mirror).toBeTruthy();
|
||||
expect(mirror?.textContent).toBe('Snapshot');
|
||||
expect(input).toBeTruthy();
|
||||
expect(input?.value).toBe('Snapshot');
|
||||
|
||||
// width should be set inline (component calculates width on mount)
|
||||
expect(input?.getAttribute('style')).toContain('width:');
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,73 @@
|
||||
/* eslint-disable react/require-default-props */
|
||||
/* eslint-disable react/jsx-props-no-spreading */
|
||||
|
||||
import './OverflowInputToolTip.scss';
|
||||
|
||||
import { Input, InputProps, InputRef, Tooltip } from 'antd';
|
||||
import cx from 'classnames';
|
||||
import { useEffect, useRef, useState } from 'react';
|
||||
|
||||
export interface OverflowTooltipInputProps extends InputProps {
|
||||
tooltipPlacement?: 'top' | 'bottom' | 'left' | 'right';
|
||||
minAutoWidth?: number;
|
||||
maxAutoWidth?: number;
|
||||
}
|
||||
|
||||
function OverflowInputToolTip({
|
||||
value,
|
||||
defaultValue,
|
||||
onChange,
|
||||
disabled = false,
|
||||
tooltipPlacement = 'top',
|
||||
className,
|
||||
minAutoWidth = 70,
|
||||
maxAutoWidth = 150,
|
||||
...rest
|
||||
}: OverflowTooltipInputProps): JSX.Element {
|
||||
const inputRef = useRef<InputRef>(null);
|
||||
const mirrorRef = useRef<HTMLSpanElement | null>(null);
|
||||
const [isOverflowing, setIsOverflowing] = useState<boolean>(false);
|
||||
|
||||
useEffect(() => {
|
||||
const input = inputRef.current?.input;
|
||||
const mirror = mirrorRef.current;
|
||||
if (!input || !mirror) {
|
||||
setIsOverflowing(false);
|
||||
return;
|
||||
}
|
||||
|
||||
mirror.textContent = String(value ?? '') || ' ';
|
||||
const mirrorWidth = mirror.offsetWidth + 24;
|
||||
const newWidth = Math.min(maxAutoWidth, Math.max(minAutoWidth, mirrorWidth));
|
||||
input.style.width = `${newWidth}px`;
|
||||
|
||||
// consider clamped when mirrorWidth reaches maxAutoWidth (allow -5px tolerance)
|
||||
const isClamped = mirrorWidth >= maxAutoWidth - 5;
|
||||
const overflow = input.scrollWidth > input.clientWidth && isClamped;
|
||||
|
||||
setIsOverflowing(overflow);
|
||||
}, [value, disabled, minAutoWidth, maxAutoWidth]);
|
||||
|
||||
const tooltipTitle = !disabled && isOverflowing ? String(value ?? '') : '';
|
||||
|
||||
return (
|
||||
<>
|
||||
<span ref={mirrorRef} aria-hidden className="overflow-input-mirror" />
|
||||
<Tooltip title={tooltipTitle} placement={tooltipPlacement}>
|
||||
<Input
|
||||
{...rest}
|
||||
value={value}
|
||||
defaultValue={defaultValue}
|
||||
onChange={onChange}
|
||||
disabled={disabled}
|
||||
ref={inputRef}
|
||||
className={cx('overflow-input', className)}
|
||||
/>
|
||||
</Tooltip>
|
||||
</>
|
||||
);
|
||||
}
|
||||
|
||||
OverflowInputToolTip.displayName = 'OverflowInputToolTip';
|
||||
|
||||
export default OverflowInputToolTip;
|
||||
3
frontend/src/components/OverflowInputToolTip/index.tsx
Normal file
3
frontend/src/components/OverflowInputToolTip/index.tsx
Normal file
@@ -0,0 +1,3 @@
|
||||
import OverflowInputToolTip from './OverflowInputToolTip';
|
||||
|
||||
export default OverflowInputToolTip;
|
||||
@@ -1,6 +1,8 @@
|
||||
/* eslint-disable react/jsx-props-no-spreading */
|
||||
import { Button, Flex, Input, Select } from 'antd';
|
||||
|
||||
import { Button, Flex, Select } from 'antd';
|
||||
import cx from 'classnames';
|
||||
import OverflowInputToolTip from 'components/OverflowInputToolTip';
|
||||
import {
|
||||
logsQueryFunctionOptions,
|
||||
metricQueryFunctionOptions,
|
||||
@@ -9,6 +11,7 @@ import {
|
||||
import { useIsDarkMode } from 'hooks/useDarkMode';
|
||||
import { debounce, isNil } from 'lodash-es';
|
||||
import { X } from 'lucide-react';
|
||||
import { useMemo, useState } from 'react';
|
||||
import { IBuilderQuery } from 'types/api/queryBuilder/queryBuilderData';
|
||||
import { QueryFunction } from 'types/api/v5/queryRange';
|
||||
import { DataSource, QueryFunctionsTypes } from 'types/common/queryBuilder';
|
||||
@@ -47,9 +50,13 @@ export default function Function({
|
||||
functionValue = funcData.args?.[0]?.value;
|
||||
}
|
||||
|
||||
const debouncedhandleUpdateFunctionArgs = debounce(
|
||||
handleUpdateFunctionArgs,
|
||||
500,
|
||||
const [value, setValue] = useState<string>(
|
||||
functionValue !== undefined ? String(functionValue) : '',
|
||||
);
|
||||
|
||||
const debouncedhandleUpdateFunctionArgs = useMemo(
|
||||
() => debounce(handleUpdateFunctionArgs, 500),
|
||||
[handleUpdateFunctionArgs],
|
||||
);
|
||||
|
||||
// update the logic when we start supporting functions for traces
|
||||
@@ -89,13 +96,18 @@ export default function Function({
|
||||
/>
|
||||
|
||||
{showInput && (
|
||||
<Input
|
||||
className="query-function-value"
|
||||
<OverflowInputToolTip
|
||||
autoFocus
|
||||
defaultValue={functionValue}
|
||||
value={value}
|
||||
onChange={(event): void => {
|
||||
const newVal = event.target.value;
|
||||
setValue(newVal);
|
||||
debouncedhandleUpdateFunctionArgs(funcData, index, event.target.value);
|
||||
}}
|
||||
tooltipPlacement="top"
|
||||
minAutoWidth={70}
|
||||
maxAutoWidth={150}
|
||||
className="query-function-value"
|
||||
/>
|
||||
)}
|
||||
|
||||
|
||||
@@ -99,7 +99,7 @@
|
||||
}
|
||||
|
||||
.query-function-value {
|
||||
width: 55px;
|
||||
width: 70px;
|
||||
border-left: 0;
|
||||
background: var(--bg-ink-200);
|
||||
border-radius: 0;
|
||||
|
||||
@@ -1,3 +1,13 @@
|
||||
import AlertRuleProvider from 'providers/Alert';
|
||||
|
||||
import AlertDetails from './AlertDetails';
|
||||
|
||||
export default AlertDetails;
|
||||
function AlertDetailsPage(): JSX.Element {
|
||||
return (
|
||||
<AlertRuleProvider>
|
||||
<AlertDetails />
|
||||
</AlertRuleProvider>
|
||||
);
|
||||
}
|
||||
|
||||
export default AlertDetailsPage;
|
||||
|
||||
9
go.mod
9
go.mod
@@ -8,7 +8,7 @@ require (
|
||||
github.com/ClickHouse/clickhouse-go/v2 v2.40.1
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9
|
||||
github.com/antlr4-go/antlr/v4 v4.13.1
|
||||
github.com/antonmedv/expr v1.15.3
|
||||
github.com/cespare/xxhash/v2 v2.3.0
|
||||
@@ -86,12 +86,19 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bytedance/gopkg v0.1.3 // indirect
|
||||
github.com/bytedance/sonic v1.14.1 // indirect
|
||||
github.com/bytedance/sonic/loader v0.3.0 // indirect
|
||||
github.com/cloudwego/base64x v0.1.6 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/ncruces/go-strftime v0.1.9 // indirect
|
||||
github.com/redis/go-redis/extra/rediscmd/v9 v9.15.1 // indirect
|
||||
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
|
||||
github.com/uptrace/opentelemetry-go-extra/otelsql v0.3.2 // indirect
|
||||
go.opentelemetry.io/collector/config/configretry v1.34.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
|
||||
modernc.org/libc v1.66.10 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
|
||||
16
go.sum
16
go.sum
@@ -106,8 +106,8 @@ github.com/SigNoz/expr v1.17.7-beta h1:FyZkleM5dTQ0O6muQfwGpoH5A2ohmN/XTasRCO72g
|
||||
github.com/SigNoz/expr v1.17.7-beta/go.mod h1:8/vRC7+7HBzESEqt5kKpYXxrxkr31SaO8r40VO/1IT4=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd h1:Bk43AsDYe0fhkbj57eGXx8H3ZJ4zhmQXBnrW523ktj8=
|
||||
github.com/SigNoz/govaluate v0.0.0-20240203125216-988004ccc7fd/go.mod h1:nxRcH/OEdM8QxzH37xkGzomr1O0JpYBRS6pwjsWW6Pc=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4 h1:DGDu9y1I1FU+HX4eECPGmfhnXE4ys4yr7LL6znbf6to=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.4/go.mod h1:xyR+coBzzO04p6Eu+ql2RVYUl/jFD+8hD9lArcc9U7g=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9 h1:WmYDSSwzyW2yiJ3tPq5AFdjsrz3NBdtPkygtFKOsACw=
|
||||
github.com/SigNoz/signoz-otel-collector v0.129.10-rc.9/go.mod h1:4eJCRUd/P4OiCHXvGYZK8q6oyBVGJFVj/G6qKSoN/TQ=
|
||||
github.com/Yiling-J/theine-go v0.6.2 h1:1GeoXeQ0O0AUkiwj2S9Jc0Mzx+hpqzmqsJ4kIC4M9AY=
|
||||
github.com/Yiling-J/theine-go v0.6.2/go.mod h1:08QpMa5JZ2pKN+UJCRrCasWYO1IKCdl54Xa836rpmDU=
|
||||
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c=
|
||||
@@ -162,6 +162,12 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
|
||||
github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M=
|
||||
github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM=
|
||||
github.com/bytedance/sonic v1.14.1 h1:FBMC0zVz5XUmE4z9wF4Jey0An5FueFvOsTKKKtwIl7w=
|
||||
github.com/bytedance/sonic v1.14.1/go.mod h1:gi6uhQLMbTdeP0muCnrjHLeCUPyb70ujhnNlhOylAFc=
|
||||
github.com/bytedance/sonic/loader v0.3.0 h1:dskwH8edlzNMctoruo8FPTJDF3vLtDT0sXZwvZJyqeA=
|
||||
github.com/bytedance/sonic/loader v0.3.0/go.mod h1:N8A3vUdtUebEY2/VQC0MyhYeKUFosQU6FxH2JmUe6VI=
|
||||
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||
@@ -178,6 +184,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
|
||||
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
|
||||
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M=
|
||||
github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
|
||||
@@ -991,6 +999,8 @@ github.com/tmthrgd/go-hex v0.0.0-20190904060850-447a3041c3bc/go.mod h1:bciPuU6GH
|
||||
github.com/trivago/tgo v1.0.7 h1:uaWH/XIy9aWYWpjm2CU3RpcqZXmX2ysQ9/Go+d9gyrM=
|
||||
github.com/trivago/tgo v1.0.7/go.mod h1:w4dpD+3tzNIIiIfkWWa85w5/B77tlvdZckQ+6PkFnhc=
|
||||
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
|
||||
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
|
||||
github.com/uptrace/bun v1.2.9 h1:OOt2DlIcRUMSZPr6iXDFg/LaQd59kOxbAjpIVHddKRs=
|
||||
github.com/uptrace/bun v1.2.9/go.mod h1:r2ZaaGs9Ru5bpGTr8GQfp8jp+TlCav9grYCPOu2CJSg=
|
||||
github.com/uptrace/bun/dialect/pgdialect v1.2.9 h1:caf5uFbOGiXvadV6pA5gn87k0awFFxL1kuuY3SpxnWk=
|
||||
@@ -1235,6 +1245,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
|
||||
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
|
||||
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
|
||||
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU=
|
||||
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
|
||||
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
||||
6
pkg/cache/memorycache/provider.go
vendored
6
pkg/cache/memorycache/provider.go
vendored
@@ -105,6 +105,12 @@ func (provider *provider) Set(ctx context.Context, orgID valuer.UUID, cacheKey s
|
||||
return err
|
||||
}
|
||||
|
||||
// To make sure ristretto does not go into no-op
|
||||
if ttl < 0 {
|
||||
provider.settings.Logger().WarnContext(ctx, "ttl is less than 0, setting it to 0")
|
||||
ttl = 0
|
||||
}
|
||||
|
||||
if cloneable, ok := data.(cachetypes.Cloneable); ok {
|
||||
span.SetAttributes(attribute.Bool("memory.cloneable", true))
|
||||
span.SetAttributes(attribute.Int64("memory.cost", 1))
|
||||
|
||||
@@ -208,3 +208,13 @@ func WrapUnexpectedf(cause error, code Code, format string, args ...any) *base {
|
||||
func NewUnexpectedf(code Code, format string, args ...any) *base {
|
||||
return Newf(TypeInvalidInput, code, format, args...)
|
||||
}
|
||||
|
||||
// WrapTimeoutf is a wrapper around Wrapf with TypeTimeout.
|
||||
func WrapTimeoutf(cause error, code Code, format string, args ...any) *base {
|
||||
return Wrapf(cause, TypeTimeout, code, format, args...)
|
||||
}
|
||||
|
||||
// NewTimeoutf is a wrapper around Newf with TypeTimeout.
|
||||
func NewTimeoutf(code Code, format string, args ...any) *base {
|
||||
return Newf(TypeTimeout, code, format, args...)
|
||||
}
|
||||
|
||||
@@ -96,7 +96,6 @@ func (h *handler) UpdateMetricMetadata(rw http.ResponseWriter, req *http.Request
|
||||
|
||||
// Set metric name from URL path
|
||||
in.MetricName = metricName
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
|
||||
err = h.module.UpdateMetricMetadata(req.Context(), orgID, &in)
|
||||
@@ -137,3 +136,47 @@ func (h *handler) GetMetricMetadata(rw http.ResponseWriter, req *http.Request) {
|
||||
|
||||
render.Success(rw, http.StatusOK, metadata)
|
||||
}
|
||||
|
||||
func (h *handler) GetMetricHighlights(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
metricName := strings.TrimSpace(req.URL.Query().Get("metricName"))
|
||||
if metricName == "" {
|
||||
render.Error(rw, errors.NewInvalidInputf(errors.CodeInvalidInput, "metricName query parameter is required"))
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
highlights, err := h.module.GetMetricHighlights(req.Context(), orgID, metricName)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
render.Success(rw, http.StatusOK, highlights)
|
||||
}
|
||||
|
||||
func (h *handler) GetMetricAttributes(rw http.ResponseWriter, req *http.Request) {
|
||||
claims, err := authtypes.ClaimsFromContext(req.Context())
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
var in metricsexplorertypes.MetricAttributesRequest
|
||||
if err := binding.JSON.BindBody(req.Body, &in); err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
|
||||
orgID := valuer.MustNewUUID(claims.OrgID)
|
||||
out, err := h.module.GetMetricAttributes(req.Context(), orgID, &in)
|
||||
if err != nil {
|
||||
render.Error(rw, err)
|
||||
return
|
||||
}
|
||||
render.Success(rw, http.StatusOK, out)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package implmetricsexplorer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
@@ -20,6 +21,7 @@ import (
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/SigNoz/signoz/pkg/valuer"
|
||||
sqlbuilder "github.com/huandu/go-sqlbuilder"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type module struct {
|
||||
@@ -190,6 +192,79 @@ func (m *module) UpdateMetricMetadata(ctx context.Context, orgID valuer.UUID, re
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetMetricHighlights returns highlights for a metric including data points, last received, total time series, and active time series.
|
||||
func (m *module) GetMetricHighlights(ctx context.Context, orgID valuer.UUID, metricName string) (*metricsexplorertypes.MetricHighlightsResponse, error) {
|
||||
if metricName == "" {
|
||||
return nil, errors.NewInvalidInputf(errors.CodeInvalidInput, "metric name is required")
|
||||
}
|
||||
|
||||
var response metricsexplorertypes.MetricHighlightsResponse
|
||||
|
||||
g, gCtx := errgroup.WithContext(ctx)
|
||||
|
||||
// Fetch data points
|
||||
g.Go(func() error {
|
||||
dataPoints, err := m.getMetricDataPoints(gCtx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.DataPoints = dataPoints
|
||||
return nil
|
||||
})
|
||||
|
||||
// Fetch last received
|
||||
g.Go(func() error {
|
||||
lastReceived, err := m.getMetricLastReceived(gCtx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.LastReceived = lastReceived
|
||||
return nil
|
||||
})
|
||||
|
||||
// Fetch total time series
|
||||
g.Go(func() error {
|
||||
totalTimeSeries, err := m.getTotalTimeSeriesForMetricName(gCtx, metricName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.TotalTimeSeries = totalTimeSeries
|
||||
return nil
|
||||
})
|
||||
|
||||
// Fetch active time series (using 120 minutes as default duration)
|
||||
g.Go(func() error {
|
||||
activeTimeSeries, err := m.getActiveTimeSeriesForMetricName(gCtx, metricName, 120*time.Minute)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
response.ActiveTimeSeries = activeTimeSeries
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
func (m *module) GetMetricAttributes(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.MetricAttributesRequest) (*metricsexplorertypes.MetricAttributesResponse, error) {
|
||||
if err := req.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
attributes, err := m.fetchMetricAttributes(ctx, req.MetricName, req.Start, req.End)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &metricsexplorertypes.MetricAttributesResponse{
|
||||
Attributes: attributes,
|
||||
TotalKeys: int64(len(attributes)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetadataFromCache(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, []string) {
|
||||
hits := make(map[string]*metricsexplorertypes.MetricMetadata)
|
||||
misses := make([]string, 0)
|
||||
@@ -771,3 +846,132 @@ func (m *module) computeSamplesTreemap(ctx context.Context, req *metricsexplorer
|
||||
|
||||
return entries, nil
|
||||
}
|
||||
|
||||
// getMetricDataPoints returns the total number of data points (samples) for a metric.
|
||||
func (m *module) getMetricDataPoints(ctx context.Context, metricName string) (uint64, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("sum(count) AS data_points")
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.SamplesV4Agg30mTableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
var dataPoints uint64
|
||||
err := db.QueryRow(ctx, query, args...).Scan(&dataPoints)
|
||||
if err != nil {
|
||||
return 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to get metrics data points")
|
||||
}
|
||||
|
||||
return dataPoints, nil
|
||||
}
|
||||
|
||||
// getMetricLastReceived returns the last received timestamp for a metric.
|
||||
func (m *module) getMetricLastReceived(ctx context.Context, metricName string) (uint64, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("MAX(last_reported_unix_milli) AS last_received_time")
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
var lastReceived sql.NullInt64
|
||||
err := db.QueryRow(ctx, query, args...).Scan(&lastReceived)
|
||||
if err != nil {
|
||||
return 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to get last received timestamp")
|
||||
}
|
||||
|
||||
if !lastReceived.Valid {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
return uint64(lastReceived.Int64), nil
|
||||
}
|
||||
|
||||
// getTotalTimeSeriesForMetricName returns the total number of unique time series for a metric.
|
||||
func (m *module) getTotalTimeSeriesForMetricName(ctx context.Context, metricName string) (uint64, error) {
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("uniq(fingerprint) AS time_series_count")
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.TimeseriesV41weekTableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
var timeSeriesCount uint64
|
||||
err := db.QueryRow(ctx, query, args...).Scan(&timeSeriesCount)
|
||||
if err != nil {
|
||||
return 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to get total time series count")
|
||||
}
|
||||
|
||||
return timeSeriesCount, nil
|
||||
}
|
||||
|
||||
// getActiveTimeSeriesForMetricName returns the number of active time series for a metric within the given duration.
|
||||
func (m *module) getActiveTimeSeriesForMetricName(ctx context.Context, metricName string, duration time.Duration) (uint64, error) {
|
||||
milli := time.Now().Add(-duration).UnixMilli()
|
||||
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select("uniq(fingerprint) AS active_time_series")
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.TimeseriesV4TableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
sb.Where(sb.GTE("unix_milli", milli))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
var activeTimeSeries uint64
|
||||
err := db.QueryRow(ctx, query, args...).Scan(&activeTimeSeries)
|
||||
if err != nil {
|
||||
return 0, errors.WrapInternalf(err, errors.CodeInternal, "failed to get active time series count")
|
||||
}
|
||||
|
||||
return activeTimeSeries, nil
|
||||
}
|
||||
|
||||
func (m *module) fetchMetricAttributes(ctx context.Context, metricName string, start, end *int64) ([]metricsexplorertypes.MetricAttribute, error) {
|
||||
// Build query using sqlbuilder
|
||||
sb := sqlbuilder.NewSelectBuilder()
|
||||
sb.Select(
|
||||
"attr_name AS key",
|
||||
"groupUniqArray(1000)(attr_string_value) AS values",
|
||||
"uniq(attr_string_value) AS valueCount",
|
||||
)
|
||||
sb.From(fmt.Sprintf("%s.%s", telemetrymetrics.DBName, telemetrymetrics.AttributesMetadataTableName))
|
||||
sb.Where(sb.E("metric_name", metricName))
|
||||
sb.Where("NOT startsWith(attr_name, '__')")
|
||||
|
||||
// Add time range filtering if provided
|
||||
if start != nil {
|
||||
// Filter by start time: attributes that were active at or after start time
|
||||
sb.Where(sb.GE("last_reported_unix_milli", *start))
|
||||
}
|
||||
if end != nil {
|
||||
// Filter by end time: attributes that were active at or before end time
|
||||
sb.Where(sb.LE("first_reported_unix_milli", *end))
|
||||
}
|
||||
|
||||
sb.GroupBy("attr_name")
|
||||
sb.OrderBy("valueCount DESC")
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
db := m.telemetryStore.ClickhouseDB()
|
||||
rows, err := db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to fetch metric attributes")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
attributes := make([]metricsexplorertypes.MetricAttribute, 0)
|
||||
for rows.Next() {
|
||||
var attr metricsexplorertypes.MetricAttribute
|
||||
if err := rows.Scan(&attr.Key, &attr.Values, &attr.ValueCount); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "failed to scan metric attribute row")
|
||||
}
|
||||
attributes = append(attributes, attr)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, errors.WrapInternalf(err, errors.CodeInternal, "error iterating metric attribute rows")
|
||||
}
|
||||
|
||||
return attributes, nil
|
||||
}
|
||||
|
||||
@@ -13,7 +13,9 @@ type Handler interface {
|
||||
GetStats(http.ResponseWriter, *http.Request)
|
||||
GetTreemap(http.ResponseWriter, *http.Request)
|
||||
GetMetricMetadata(http.ResponseWriter, *http.Request)
|
||||
GetMetricAttributes(http.ResponseWriter, *http.Request)
|
||||
UpdateMetricMetadata(http.ResponseWriter, *http.Request)
|
||||
GetMetricHighlights(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// Module represents the metrics module interface.
|
||||
@@ -22,4 +24,6 @@ type Module interface {
|
||||
GetTreemap(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.TreemapRequest) (*metricsexplorertypes.TreemapResponse, error)
|
||||
GetMetricMetadataMulti(ctx context.Context, orgID valuer.UUID, metricNames []string) (map[string]*metricsexplorertypes.MetricMetadata, error)
|
||||
UpdateMetricMetadata(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.UpdateMetricMetadataRequest) error
|
||||
GetMetricHighlights(ctx context.Context, orgID valuer.UUID, metricName string) (*metricsexplorertypes.MetricHighlightsResponse, error)
|
||||
GetMetricAttributes(ctx context.Context, orgID valuer.UUID, req *metricsexplorertypes.MetricAttributesRequest) (*metricsexplorertypes.MetricAttributesResponse, error)
|
||||
}
|
||||
|
||||
@@ -6252,17 +6252,6 @@ LIMIT 40`, // added rand to get diff value every time we run this query
|
||||
return fingerprints, nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) DeleteMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricName string) *model.ApiError {
|
||||
delQuery := fmt.Sprintf(`ALTER TABLE %s.%s DELETE WHERE metric_name = ?;`, signozMetricDBName, signozUpdatedMetricsMetadataLocalTable)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
err := r.db.Exec(valueCtx, delQuery, metricName)
|
||||
if err != nil {
|
||||
return &model.ApiError{Typ: "ClickHouseError", Err: err}
|
||||
}
|
||||
r.cache.Delete(ctx, orgID, constants.UpdatedMetricsMetadataCachePrefix+metricName)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, orgID valuer.UUID, req *model.UpdateMetricsMetadata) *model.ApiError {
|
||||
if req.MetricType == v3.MetricTypeHistogram {
|
||||
labels := []string{"le"}
|
||||
@@ -6292,10 +6281,7 @@ func (r *ClickHouseReader) UpdateMetricsMetadata(ctx context.Context, orgID valu
|
||||
}
|
||||
}
|
||||
|
||||
apiErr := r.DeleteMetricsMetadata(ctx, orgID, req.MetricName)
|
||||
if apiErr != nil {
|
||||
return apiErr
|
||||
}
|
||||
// Insert new metadata (keeping history of all updates)
|
||||
insertQuery := fmt.Sprintf(`INSERT INTO %s.%s (metric_name, temporality, is_monotonic, type, description, unit, created_at)
|
||||
VALUES ( ?, ?, ?, ?, ?, ?, ?);`, signozMetricDBName, signozUpdatedMetricsMetadataTable)
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
@@ -6364,9 +6350,19 @@ func (r *ClickHouseReader) GetUpdatedMetricsMetadata(ctx context.Context, orgID
|
||||
var stillMissing []string
|
||||
if len(missingMetrics) > 0 {
|
||||
metricList := "'" + strings.Join(missingMetrics, "', '") + "'"
|
||||
query := fmt.Sprintf(`SELECT metric_name, type, description, temporality, is_monotonic, unit
|
||||
FROM %s.%s
|
||||
WHERE metric_name IN (%s);`, signozMetricDBName, signozUpdatedMetricsMetadataTable, metricList)
|
||||
query := fmt.Sprintf(`SELECT
|
||||
metric_name,
|
||||
argMax(type, created_at) AS type,
|
||||
argMax(description, created_at) AS description,
|
||||
argMax(temporality, created_at) AS temporality,
|
||||
argMax(is_monotonic, created_at) AS is_monotonic,
|
||||
argMax(unit, created_at) AS unit
|
||||
FROM %s.%s
|
||||
WHERE metric_name IN (%s)
|
||||
GROUP BY metric_name;`,
|
||||
signozMetricDBName,
|
||||
signozUpdatedMetricsMetadataTable,
|
||||
metricList)
|
||||
|
||||
valueCtx := context.WithValue(ctx, "clickhouse_max_threads", constants.MetricsExplorerClickhouseThreads)
|
||||
rows, err := r.db.Query(valueCtx, query)
|
||||
|
||||
@@ -668,10 +668,12 @@ func (ah *APIHandler) MetricExplorerRoutes(router *mux.Router, am *middleware.Au
|
||||
am.ViewAccess(ah.UpdateMetricsMetadata)).
|
||||
Methods(http.MethodPost)
|
||||
// v2 endpoints
|
||||
router.HandleFunc("/api/v2/metrics/stats", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetStats)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/treemap", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetTreemap)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/metadata", am.ViewAccess(ah.Signoz.Handlers.Metrics.GetMetricMetadata)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v2/metrics/{metric_name}/metadata", am.ViewAccess(ah.Signoz.Handlers.Metrics.UpdateMetricMetadata)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/stats", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetStats)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/treemap", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetTreemap)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/attributes", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricAttributes)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metrics/metadata", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricMetadata)).Methods(http.MethodGet)
|
||||
router.HandleFunc("/api/v2/metrics/{metric_name}/metadata", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.UpdateMetricMetadata)).Methods(http.MethodPost)
|
||||
router.HandleFunc("/api/v2/metric/highlights", am.ViewAccess(ah.Signoz.Handlers.MetricsExplorer.GetMetricHighlights)).Methods(http.MethodGet)
|
||||
}
|
||||
|
||||
func Intersection(a, b []int) (c []int) {
|
||||
|
||||
@@ -3,16 +3,15 @@ package metricsexplorer
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/constants"
|
||||
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model"
|
||||
"github.com/SigNoz/signoz/pkg/query-service/model/metrics_explorer"
|
||||
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
|
||||
)
|
||||
|
||||
func ParseFilterKeySuggestions(r *http.Request) (*metrics_explorer.FilterKeyRequest, *model.ApiError) {
|
||||
|
||||
@@ -127,7 +127,6 @@ type Reader interface {
|
||||
GetInspectMetricsFingerprints(ctx context.Context, attributes []string, req *metrics_explorer.InspectMetricsRequest) ([]string, *model.ApiError)
|
||||
GetInspectMetrics(ctx context.Context, req *metrics_explorer.InspectMetricsRequest, fingerprints []string) (*metrics_explorer.InspectMetricsResponse, *model.ApiError)
|
||||
|
||||
DeleteMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricName string) *model.ApiError
|
||||
UpdateMetricsMetadata(ctx context.Context, orgID valuer.UUID, req *model.UpdateMetricsMetadata) *model.ApiError
|
||||
GetUpdatedMetricsMetadata(ctx context.Context, orgID valuer.UUID, metricNames ...string) (map[string]*model.UpdateMetricsMetadata, *model.ApiError)
|
||||
|
||||
|
||||
@@ -198,7 +198,6 @@ func (v *exprVisitor) VisitFunctionExpr(fn *chparser.FunctionExpr) error {
|
||||
FieldMapper: v.fieldMapper,
|
||||
ConditionBuilder: v.conditionBuilder,
|
||||
FullTextColumn: v.fullTextColumn,
|
||||
JsonBodyPrefix: v.jsonBodyPrefix,
|
||||
JsonKeyToKey: v.jsonKeyToKey,
|
||||
}, 0, 0,
|
||||
)
|
||||
|
||||
17
pkg/querybuilder/constants.go
Normal file
17
pkg/querybuilder/constants.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package querybuilder
|
||||
|
||||
import (
|
||||
"os"
|
||||
)
|
||||
|
||||
var (
|
||||
BodyJSONQueryEnabled = GetOrDefaultEnv("BODY_JSON_QUERY_ENABLED", "false") == "true"
|
||||
)
|
||||
|
||||
func GetOrDefaultEnv(key string, fallback string) string {
|
||||
v := os.Getenv(key)
|
||||
if len(v) == 0 {
|
||||
return fallback
|
||||
}
|
||||
return v
|
||||
}
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
)
|
||||
|
||||
func TestQueryToKeys(t *testing.T) {
|
||||
|
||||
testCases := []struct {
|
||||
query string
|
||||
expectedKeys []telemetrytypes.FieldKeySelector
|
||||
@@ -66,9 +65,9 @@ func TestQueryToKeys(t *testing.T) {
|
||||
query: `body.user_ids[*] = 123`,
|
||||
expectedKeys: []telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "body.user_ids[*]",
|
||||
Name: "user_ids[*]",
|
||||
Signal: telemetrytypes.SignalUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextUnspecified,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.FieldDataTypeUnspecified,
|
||||
},
|
||||
},
|
||||
|
||||
@@ -162,7 +162,6 @@ func (b *resourceFilterStatementBuilder[T]) addConditions(
|
||||
ConditionBuilder: b.conditionBuilder,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonBodyPrefix: b.jsonBodyPrefix,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
SkipFullTextFilter: true,
|
||||
SkipFunctionCalls: true,
|
||||
|
||||
@@ -33,7 +33,6 @@ type filterExpressionVisitor struct {
|
||||
mainErrorURL string
|
||||
builder *sqlbuilder.SelectBuilder
|
||||
fullTextColumn *telemetrytypes.TelemetryFieldKey
|
||||
jsonBodyPrefix string
|
||||
jsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
||||
skipResourceFilter bool
|
||||
skipFullTextFilter bool
|
||||
@@ -53,7 +52,6 @@ type FilterExprVisitorOpts struct {
|
||||
FieldKeys map[string][]*telemetrytypes.TelemetryFieldKey
|
||||
Builder *sqlbuilder.SelectBuilder
|
||||
FullTextColumn *telemetrytypes.TelemetryFieldKey
|
||||
JsonBodyPrefix string
|
||||
JsonKeyToKey qbtypes.JsonKeyToFieldFunc
|
||||
SkipResourceFilter bool
|
||||
SkipFullTextFilter bool
|
||||
@@ -73,7 +71,6 @@ func newFilterExpressionVisitor(opts FilterExprVisitorOpts) *filterExpressionVis
|
||||
fieldKeys: opts.FieldKeys,
|
||||
builder: opts.Builder,
|
||||
fullTextColumn: opts.FullTextColumn,
|
||||
jsonBodyPrefix: opts.JsonBodyPrefix,
|
||||
jsonKeyToKey: opts.JsonKeyToKey,
|
||||
skipResourceFilter: opts.SkipResourceFilter,
|
||||
skipFullTextFilter: opts.SkipFullTextFilter,
|
||||
@@ -173,7 +170,7 @@ func PrepareWhereClause(query string, opts FilterExprVisitorOpts, startNs uint64
|
||||
|
||||
whereClause := sqlbuilder.NewWhereClause().AddWhereExpr(visitor.builder.Args, cond)
|
||||
|
||||
return &PreparedWhereClause{whereClause, visitor.warnings, visitor.mainWarnURL}, nil
|
||||
return &PreparedWhereClause{WhereClause: whereClause, Warnings: visitor.warnings, WarningsDocURL: visitor.mainWarnURL}, nil
|
||||
}
|
||||
|
||||
// Visit dispatches to the specific visit method based on node type
|
||||
@@ -718,7 +715,7 @@ func (v *filterExpressionVisitor) VisitFunctionCall(ctx *grammar.FunctionCallCon
|
||||
conds = append(conds, fmt.Sprintf("hasToken(LOWER(%s), LOWER(%s))", key.Name, v.builder.Var(value[0])))
|
||||
} else {
|
||||
// this is that all other functions only support array fields
|
||||
if strings.HasPrefix(key.Name, v.jsonBodyPrefix) {
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
fieldName, _ = v.jsonKeyToKey(context.Background(), key, qbtypes.FilterOperatorUnknown, value)
|
||||
} else {
|
||||
// TODO(add docs for json body search)
|
||||
@@ -809,10 +806,8 @@ func (v *filterExpressionVisitor) VisitValue(ctx *grammar.ValueContext) any {
|
||||
|
||||
// VisitKey handles field/column references
|
||||
func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
|
||||
fieldKey := telemetrytypes.GetFieldKeyFromKeyText(ctx.GetText())
|
||||
|
||||
keyName := strings.TrimPrefix(fieldKey.Name, v.jsonBodyPrefix)
|
||||
keyName := fieldKey.Name
|
||||
|
||||
fieldKeysForName := v.fieldKeys[keyName]
|
||||
|
||||
@@ -846,10 +841,11 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
// if there is a field with the same name as attribute/resource attribute
|
||||
// Since it will ORed with the fieldKeysForName, it will not result empty
|
||||
// when either of them have values
|
||||
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" {
|
||||
if keyName != "" {
|
||||
fieldKeysForName = append(fieldKeysForName, &fieldKey)
|
||||
}
|
||||
// Note: Skip this logic if body json query is enabled so we can look up the key inside fields
|
||||
//
|
||||
// TODO(Piyush): After entire migration this is supposed to be removed.
|
||||
if !BodyJSONQueryEnabled && fieldKey.FieldContext == telemetrytypes.FieldContextBody {
|
||||
fieldKeysForName = append(fieldKeysForName, &fieldKey)
|
||||
}
|
||||
|
||||
if len(fieldKeysForName) == 0 {
|
||||
@@ -860,7 +856,7 @@ func (v *filterExpressionVisitor) VisitKey(ctx *grammar.KeyContext) any {
|
||||
return v.fieldKeys[keyWithContext]
|
||||
}
|
||||
|
||||
if strings.HasPrefix(fieldKey.Name, v.jsonBodyPrefix) && v.jsonBodyPrefix != "" && keyName == "" {
|
||||
if fieldKey.FieldContext == telemetrytypes.FieldContextBody && keyName == "" {
|
||||
v.errors = append(v.errors, "missing key for body json search - expected key of the form `body.key` (ex: `body.status`)")
|
||||
} else if !v.ignoreNotFoundKeys {
|
||||
// TODO(srikanthccv): do we want to return an error here?
|
||||
|
||||
@@ -35,37 +35,37 @@ import (
|
||||
)
|
||||
|
||||
type Handlers struct {
|
||||
Organization organization.Handler
|
||||
Preference preference.Handler
|
||||
User user.Handler
|
||||
SavedView savedview.Handler
|
||||
Apdex apdex.Handler
|
||||
Dashboard dashboard.Handler
|
||||
QuickFilter quickfilter.Handler
|
||||
TraceFunnel tracefunnel.Handler
|
||||
RawDataExport rawdataexport.Handler
|
||||
AuthDomain authdomain.Handler
|
||||
Session session.Handler
|
||||
SpanPercentile spanpercentile.Handler
|
||||
Services services.Handler
|
||||
Metrics metricsexplorer.Handler
|
||||
Organization organization.Handler
|
||||
Preference preference.Handler
|
||||
User user.Handler
|
||||
SavedView savedview.Handler
|
||||
Apdex apdex.Handler
|
||||
Dashboard dashboard.Handler
|
||||
QuickFilter quickfilter.Handler
|
||||
TraceFunnel tracefunnel.Handler
|
||||
RawDataExport rawdataexport.Handler
|
||||
AuthDomain authdomain.Handler
|
||||
Session session.Handler
|
||||
SpanPercentile spanpercentile.Handler
|
||||
Services services.Handler
|
||||
MetricsExplorer metricsexplorer.Handler
|
||||
}
|
||||
|
||||
func NewHandlers(modules Modules, providerSettings factory.ProviderSettings, querier querier.Querier, licensing licensing.Licensing) Handlers {
|
||||
return Handlers{
|
||||
Organization: implorganization.NewHandler(modules.OrgGetter, modules.OrgSetter),
|
||||
Preference: implpreference.NewHandler(modules.Preference),
|
||||
User: impluser.NewHandler(modules.User, modules.UserGetter),
|
||||
SavedView: implsavedview.NewHandler(modules.SavedView),
|
||||
Apdex: implapdex.NewHandler(modules.Apdex),
|
||||
Dashboard: impldashboard.NewHandler(modules.Dashboard, providerSettings, querier, licensing),
|
||||
QuickFilter: implquickfilter.NewHandler(modules.QuickFilter),
|
||||
TraceFunnel: impltracefunnel.NewHandler(modules.TraceFunnel),
|
||||
RawDataExport: implrawdataexport.NewHandler(modules.RawDataExport),
|
||||
AuthDomain: implauthdomain.NewHandler(modules.AuthDomain),
|
||||
Session: implsession.NewHandler(modules.Session),
|
||||
Services: implservices.NewHandler(modules.Services),
|
||||
Metrics: implmetricsexplorer.NewHandler(modules.Metrics),
|
||||
SpanPercentile: implspanpercentile.NewHandler(modules.SpanPercentile),
|
||||
Organization: implorganization.NewHandler(modules.OrgGetter, modules.OrgSetter),
|
||||
Preference: implpreference.NewHandler(modules.Preference),
|
||||
User: impluser.NewHandler(modules.User, modules.UserGetter),
|
||||
SavedView: implsavedview.NewHandler(modules.SavedView),
|
||||
Apdex: implapdex.NewHandler(modules.Apdex),
|
||||
Dashboard: impldashboard.NewHandler(modules.Dashboard, providerSettings, querier, licensing),
|
||||
QuickFilter: implquickfilter.NewHandler(modules.QuickFilter),
|
||||
TraceFunnel: impltracefunnel.NewHandler(modules.TraceFunnel),
|
||||
RawDataExport: implrawdataexport.NewHandler(modules.RawDataExport),
|
||||
AuthDomain: implauthdomain.NewHandler(modules.AuthDomain),
|
||||
Session: implsession.NewHandler(modules.Session),
|
||||
Services: implservices.NewHandler(modules.Services),
|
||||
MetricsExplorer: implmetricsexplorer.NewHandler(modules.MetricsExplorer),
|
||||
SpanPercentile: implspanpercentile.NewHandler(modules.SpanPercentile),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,22 +47,22 @@ import (
|
||||
)
|
||||
|
||||
type Modules struct {
|
||||
OrgGetter organization.Getter
|
||||
OrgSetter organization.Setter
|
||||
Preference preference.Module
|
||||
User user.Module
|
||||
UserGetter user.Getter
|
||||
SavedView savedview.Module
|
||||
Apdex apdex.Module
|
||||
Dashboard dashboard.Module
|
||||
QuickFilter quickfilter.Module
|
||||
TraceFunnel tracefunnel.Module
|
||||
RawDataExport rawdataexport.Module
|
||||
AuthDomain authdomain.Module
|
||||
Session session.Module
|
||||
Services services.Module
|
||||
SpanPercentile spanpercentile.Module
|
||||
Metrics metricsexplorer.Module
|
||||
OrgGetter organization.Getter
|
||||
OrgSetter organization.Setter
|
||||
Preference preference.Module
|
||||
User user.Module
|
||||
UserGetter user.Getter
|
||||
SavedView savedview.Module
|
||||
Apdex apdex.Module
|
||||
Dashboard dashboard.Module
|
||||
QuickFilter quickfilter.Module
|
||||
TraceFunnel tracefunnel.Module
|
||||
RawDataExport rawdataexport.Module
|
||||
AuthDomain authdomain.Module
|
||||
Session session.Module
|
||||
Services services.Module
|
||||
SpanPercentile spanpercentile.Module
|
||||
MetricsExplorer metricsexplorer.Module
|
||||
}
|
||||
|
||||
func NewModules(
|
||||
@@ -86,21 +86,21 @@ func NewModules(
|
||||
userGetter := impluser.NewGetter(impluser.NewStore(sqlstore, providerSettings))
|
||||
|
||||
return Modules{
|
||||
OrgGetter: orgGetter,
|
||||
OrgSetter: orgSetter,
|
||||
Preference: implpreference.NewModule(implpreference.NewStore(sqlstore), preferencetypes.NewAvailablePreference()),
|
||||
SavedView: implsavedview.NewModule(sqlstore),
|
||||
Apdex: implapdex.NewModule(sqlstore),
|
||||
Dashboard: impldashboard.NewModule(sqlstore, providerSettings, analytics, orgGetter, implrole.NewModule(implrole.NewStore(sqlstore), authz, nil)),
|
||||
User: user,
|
||||
UserGetter: userGetter,
|
||||
QuickFilter: quickfilter,
|
||||
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
|
||||
RawDataExport: implrawdataexport.NewModule(querier),
|
||||
AuthDomain: implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs),
|
||||
Session: implsession.NewModule(providerSettings, authNs, user, userGetter, implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs), tokenizer, orgGetter),
|
||||
SpanPercentile: implspanpercentile.NewModule(querier, providerSettings),
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
Metrics: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, providerSettings),
|
||||
OrgGetter: orgGetter,
|
||||
OrgSetter: orgSetter,
|
||||
Preference: implpreference.NewModule(implpreference.NewStore(sqlstore), preferencetypes.NewAvailablePreference()),
|
||||
SavedView: implsavedview.NewModule(sqlstore),
|
||||
Apdex: implapdex.NewModule(sqlstore),
|
||||
Dashboard: impldashboard.NewModule(sqlstore, providerSettings, analytics, orgGetter, implrole.NewModule(implrole.NewStore(sqlstore), authz, nil)),
|
||||
User: user,
|
||||
UserGetter: userGetter,
|
||||
QuickFilter: quickfilter,
|
||||
TraceFunnel: impltracefunnel.NewModule(impltracefunnel.NewStore(sqlstore)),
|
||||
RawDataExport: implrawdataexport.NewModule(querier),
|
||||
AuthDomain: implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs),
|
||||
Session: implsession.NewModule(providerSettings, authNs, user, userGetter, implauthdomain.NewModule(implauthdomain.NewStore(sqlstore), authNs), tokenizer, orgGetter),
|
||||
SpanPercentile: implspanpercentile.NewModule(querier, providerSettings),
|
||||
Services: implservices.NewModule(querier, telemetryStore),
|
||||
MetricsExplorer: implmetricsexplorer.NewModule(telemetryStore, telemetryMetadataStore, cache, providerSettings),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
schema "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
@@ -52,7 +51,8 @@ func (c *conditionBuilder) conditionFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
// Check if this is a body JSON search - either by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
tblFieldName, value = GetBodyJSONKey(ctx, key, operator, value)
|
||||
}
|
||||
|
||||
@@ -164,7 +164,8 @@ func (c *conditionBuilder) conditionFor(
|
||||
// key membership checks, so depending on the column type, the condition changes
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
// Check if this is a body JSON search - by FieldContext
|
||||
if key.FieldContext == telemetrytypes.FieldContextBody {
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return GetBodyJSONKeyForExists(ctx, key, operator, value), nil
|
||||
} else {
|
||||
@@ -173,45 +174,57 @@ func (c *conditionBuilder) conditionFor(
|
||||
}
|
||||
|
||||
var value any
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
case schema.ColumnTypeString, schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString}:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
}
|
||||
return sb.E(tblFieldName, value), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeUInt64, schema.ColumnTypeUInt32, schema.ColumnTypeUInt8:
|
||||
case schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
value = 0
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
@@ -238,7 +251,7 @@ func (c *conditionBuilder) ConditionFor(
|
||||
// skip adding exists filter for intrinsic fields
|
||||
// with an exception for body json search
|
||||
field, _ := c.fm.FieldFor(ctx, key)
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && !strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
if slices.Contains(maps.Keys(IntrinsicFields), field) && key.FieldContext != telemetrytypes.FieldContextBody {
|
||||
return condition, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -465,7 +465,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: 200,
|
||||
@@ -475,7 +476,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - float64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.duration_ms",
|
||||
Name: "duration_ms",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: 405.5,
|
||||
@@ -485,7 +487,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: "GET",
|
||||
@@ -495,7 +498,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Equal operator - bool",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.success",
|
||||
Name: "http.success",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorEqual,
|
||||
value: true,
|
||||
@@ -505,7 +509,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Exists operator",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorExists,
|
||||
value: nil,
|
||||
@@ -515,7 +520,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Exists operator",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotExists,
|
||||
value: nil,
|
||||
@@ -525,7 +531,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Greater than operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorGreaterThan,
|
||||
value: "200",
|
||||
@@ -535,7 +542,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Greater than operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorGreaterThan,
|
||||
value: 200,
|
||||
@@ -545,7 +553,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Less than operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorLessThan,
|
||||
value: "300",
|
||||
@@ -555,7 +564,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Less than operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorLessThan,
|
||||
value: 300,
|
||||
@@ -565,7 +575,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Contains operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorContains,
|
||||
value: "200",
|
||||
@@ -575,7 +586,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Contains operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotContains,
|
||||
value: "200",
|
||||
@@ -585,7 +597,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Between operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorBetween,
|
||||
value: []any{"200", "300"},
|
||||
@@ -595,7 +608,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Between operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorBetween,
|
||||
value: []any{400, 500},
|
||||
@@ -605,7 +619,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "In operator - string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorIn,
|
||||
value: []any{"200", "300"},
|
||||
@@ -615,7 +630,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "In operator - int64",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.status_code",
|
||||
Name: "http.status_code",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorIn,
|
||||
value: []any{401, 404, 500},
|
||||
@@ -625,7 +641,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Regexp operator - json body string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorRegexp,
|
||||
value: "GET|POST|PUT",
|
||||
@@ -635,7 +652,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Regexp operator - json body string",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.http.method",
|
||||
Name: "http.method",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotRegexp,
|
||||
value: "DELETE|PATCH",
|
||||
@@ -645,7 +663,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Regexp operator - json body with dots in path",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.user.email",
|
||||
Name: "user.email",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorRegexp,
|
||||
value: "^.*@example\\.com$",
|
||||
@@ -655,7 +674,8 @@ func TestConditionForJSONBodySearch(t *testing.T) {
|
||||
{
|
||||
name: "Not Regexp operator - json body nested path",
|
||||
key: telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body.response.headers.content-type",
|
||||
Name: "response.headers.content-type",
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
},
|
||||
operator: qbtypes.FilterOperatorNotRegexp,
|
||||
value: "^text/.*",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package telemetrylogs
|
||||
|
||||
import (
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz-otel-collector/exporter/jsontypeexporter"
|
||||
qbtypes "github.com/SigNoz/signoz/pkg/types/querybuildertypes/querybuildertypesv5"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
)
|
||||
@@ -16,6 +18,8 @@ const (
|
||||
LogsV2TimestampColumn = "timestamp"
|
||||
LogsV2ObservedTimestampColumn = "observed_timestamp"
|
||||
LogsV2BodyColumn = "body"
|
||||
LogsV2BodyJSONColumn = constants.BodyJSONColumn
|
||||
LogsV2BodyPromotedColumn = constants.BodyPromotedColumn
|
||||
LogsV2TraceIDColumn = "trace_id"
|
||||
LogsV2SpanIDColumn = "span_id"
|
||||
LogsV2TraceFlagsColumn = "trace_flags"
|
||||
@@ -30,6 +34,11 @@ const (
|
||||
LogsV2AttributesBoolColumn = "attributes_bool"
|
||||
LogsV2ResourcesStringColumn = "resources_string"
|
||||
LogsV2ScopeStringColumn = "scope_string"
|
||||
|
||||
BodyJSONColumnPrefix = constants.BodyJSONColumnPrefix
|
||||
BodyPromotedColumnPrefix = constants.BodyPromotedColumnPrefix
|
||||
ArraySep = jsontypeexporter.ArraySeparator
|
||||
ArrayAnyIndex = "[*]."
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -82,10 +82,13 @@ func (m *fieldMapper) getColumn(_ context.Context, key *telemetrytypes.Telemetry
|
||||
case telemetrytypes.FieldDataTypeBool:
|
||||
return logsV2Columns["attributes_bool"], nil
|
||||
}
|
||||
case telemetrytypes.FieldContextBody:
|
||||
// body context fields are stored in the body column
|
||||
return logsV2Columns["body"], nil
|
||||
case telemetrytypes.FieldContextLog, telemetrytypes.FieldContextUnspecified:
|
||||
col, ok := logsV2Columns[key.Name]
|
||||
if !ok {
|
||||
// check if the key has body JSON search
|
||||
// check if the key has body JSON search (backward compatibility)
|
||||
if strings.HasPrefix(key.Name, BodyJSONStringSearchPrefix) {
|
||||
return logsV2Columns["body"], nil
|
||||
}
|
||||
@@ -103,8 +106,8 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
@@ -121,40 +124,32 @@ func (m *fieldMapper) FieldFor(ctx context.Context, key *telemetrytypes.Telemetr
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeUInt8:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64, schema.ColumnTypeEnumUInt32, schema.ColumnTypeEnumUInt8:
|
||||
return column.Name, nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
}
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
|
||||
@@ -21,7 +21,6 @@ func TestLikeAndILikeWithoutWildcards_Warns(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
@@ -58,7 +57,6 @@ func TestLikeAndILikeWithWildcards_NoWarn(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
|
||||
@@ -27,8 +27,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
FullTextColumn: &telemetrytypes.TelemetryFieldKey{
|
||||
Name: "body",
|
||||
},
|
||||
JsonBodyPrefix: "body",
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
@@ -163,7 +162,7 @@ func TestFilterExprLogsBodyJSON(t *testing.T) {
|
||||
for _, tc := range testCases {
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.category, limitString(tc.query, 50)), func(t *testing.T) {
|
||||
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
clause, err := querybuilder.PrepareWhereClause(tc.query, opts, 0, 0)
|
||||
|
||||
if tc.shouldPass {
|
||||
if err != nil {
|
||||
|
||||
@@ -27,7 +27,6 @@ func TestFilterExprLogs(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
@@ -2448,7 +2447,6 @@ func TestFilterExprLogsConflictNegation(t *testing.T) {
|
||||
ConditionBuilder: cb,
|
||||
FieldKeys: keys,
|
||||
FullTextColumn: DefaultFullTextColumn,
|
||||
JsonBodyPrefix: BodyJSONStringSearchPrefix,
|
||||
JsonKeyToKey: GetBodyJSONKey,
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ func inferDataType(value any, operator qbtypes.FilterOperator, key *telemetrytyp
|
||||
}
|
||||
|
||||
func getBodyJSONPath(key *telemetrytypes.TelemetryFieldKey) string {
|
||||
parts := strings.Split(key.Name, ".")[1:]
|
||||
parts := strings.Split(key.Name, ".")
|
||||
newParts := []string{}
|
||||
for _, part := range parts {
|
||||
if strings.HasSuffix(part, "[*]") {
|
||||
|
||||
@@ -589,10 +589,9 @@ func (b *logQueryStatementBuilder) addFilterCondition(
|
||||
FieldKeys: keys,
|
||||
SkipResourceFilter: true,
|
||||
FullTextColumn: b.fullTextColumn,
|
||||
JsonBodyPrefix: b.jsonBodyPrefix,
|
||||
JsonKeyToKey: b.jsonKeyToKey,
|
||||
Variables: variables,
|
||||
}, start, end)
|
||||
}, start, end)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -8,4 +8,7 @@ const (
|
||||
TagAttributesV2LocalTableName = "tag_attributes_v2"
|
||||
LogAttributeKeysTblName = "distributed_logs_attribute_keys"
|
||||
LogResourceKeysTblName = "distributed_logs_resource_keys"
|
||||
PathTypesTableName = "distributed_json_path_types"
|
||||
PromotedPathsTableName = "distributed_json_promoted_paths"
|
||||
SkipIndexTableName = "system.data_skipping_indices"
|
||||
)
|
||||
|
||||
496
pkg/telemetrymetadata/body_json_metadata.go
Normal file
496
pkg/telemetrymetadata/body_json_metadata.go
Normal file
@@ -0,0 +1,496 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ClickHouse/clickhouse-go/v2"
|
||||
"github.com/ClickHouse/clickhouse-go/v2/lib/chcol"
|
||||
schemamigrator "github.com/SigNoz/signoz-otel-collector/cmd/signozschemamigrator/schema_migrator"
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz-otel-collector/utils"
|
||||
"github.com/SigNoz/signoz/pkg/errors"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrystore"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/huandu/go-sqlbuilder"
|
||||
)
|
||||
|
||||
var (
|
||||
defaultPathLimit = 100 // Default limit to prevent full table scans
|
||||
|
||||
CodeUnknownJSONDataType = errors.MustNewCode("unknown_json_data_type")
|
||||
CodeFailLoadPromotedPaths = errors.MustNewCode("fail_load_promoted_paths")
|
||||
CodeFailCheckPathPromoted = errors.MustNewCode("fail_check_path_promoted")
|
||||
CodeFailIterateBodyJSONKeys = errors.MustNewCode("fail_iterate_body_json_keys")
|
||||
CodeFailExtractBodyJSONKeys = errors.MustNewCode("fail_extract_body_json_keys")
|
||||
CodeFailLoadLogsJSONIndexes = errors.MustNewCode("fail_load_logs_json_indexes")
|
||||
CodeFailListJSONValues = errors.MustNewCode("fail_list_json_values")
|
||||
CodeFailScanJSONValue = errors.MustNewCode("fail_scan_json_value")
|
||||
CodeFailScanVariant = errors.MustNewCode("fail_scan_variant")
|
||||
CodeFailBuildJSONPathsQuery = errors.MustNewCode("fail_build_json_paths_query")
|
||||
CodeNoPathsToQueryIndexes = errors.MustNewCode("no_paths_to_query_indexes_provided")
|
||||
)
|
||||
|
||||
// GetBodyJSONPaths extracts body JSON paths from the path_types table
|
||||
// This function can be used by both JSONQueryBuilder and metadata extraction
|
||||
// uniquePathLimit: 0 for no limit, >0 for maximum number of unique paths to return
|
||||
// - For startup load: set to 10000 to get top 10k unique paths
|
||||
// - For lookup: set to 0 (no limit needed for single path)
|
||||
// - For metadata API: set to desired pagination limit
|
||||
//
|
||||
// searchOperator: LIKE for pattern matching, EQUAL for exact match
|
||||
// Returns: (paths, error)
|
||||
// TODO(Piyush): Remove this lint skip
|
||||
//
|
||||
// nolint:unused
|
||||
func getBodyJSONPaths(ctx context.Context, telemetryStore telemetrystore.TelemetryStore,
|
||||
fieldKeySelectors []*telemetrytypes.FieldKeySelector) ([]*telemetrytypes.TelemetryFieldKey, bool, error) {
|
||||
|
||||
query, args, limit, err := buildGetBodyJSONPathsQuery(fieldKeySelectors)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
rows, err := telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to extract body JSON keys")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
fieldKeys := []*telemetrytypes.TelemetryFieldKey{}
|
||||
paths := []string{}
|
||||
rowCount := 0
|
||||
for rows.Next() {
|
||||
var path string
|
||||
var typesArray []string // ClickHouse returns array as []string
|
||||
var lastSeen uint64
|
||||
|
||||
err = rows.Scan(&path, &typesArray, &lastSeen)
|
||||
if err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailExtractBodyJSONKeys, "failed to scan body JSON key row")
|
||||
}
|
||||
|
||||
for _, typ := range typesArray {
|
||||
mapping, found := telemetrytypes.MappingStringToJSONDataType[typ]
|
||||
if !found {
|
||||
return nil, false, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map type string to JSON data type: %s", typ)
|
||||
}
|
||||
fieldKeys = append(fieldKeys, &telemetrytypes.TelemetryFieldKey{
|
||||
Name: path,
|
||||
Signal: telemetrytypes.SignalLogs,
|
||||
FieldContext: telemetrytypes.FieldContextBody,
|
||||
FieldDataType: telemetrytypes.MappingJSONDataTypeToFieldDataType[mapping],
|
||||
JSONDataType: &mapping,
|
||||
})
|
||||
}
|
||||
|
||||
paths = append(paths, path)
|
||||
rowCount++
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
return nil, false, errors.WrapInternalf(rows.Err(), CodeFailIterateBodyJSONKeys, "error iterating body JSON keys")
|
||||
}
|
||||
|
||||
promoted, err := GetPromotedPaths(ctx, telemetryStore.ClickhouseDB(), paths...)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
indexes, err := getJSONPathIndexes(ctx, telemetryStore, paths...)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
for _, fieldKey := range fieldKeys {
|
||||
fieldKey.Materialized = promoted.Contains(fieldKey.Name)
|
||||
fieldKey.Indexes = indexes[fieldKey.Name]
|
||||
}
|
||||
|
||||
return fieldKeys, rowCount <= limit, nil
|
||||
}
|
||||
|
||||
func buildGetBodyJSONPathsQuery(fieldKeySelectors []*telemetrytypes.FieldKeySelector) (string, []any, int, error) {
|
||||
if len(fieldKeySelectors) == 0 {
|
||||
return "", nil, defaultPathLimit, errors.NewInternalf(CodeFailBuildJSONPathsQuery, "no field key selectors provided")
|
||||
}
|
||||
from := fmt.Sprintf("%s.%s", DBName, PathTypesTableName)
|
||||
|
||||
// Build a better query using GROUP BY to deduplicate at database level
|
||||
// This aggregates all types per path and gets the max last_seen, then applies LIMIT
|
||||
sb := sqlbuilder.Select(
|
||||
"path",
|
||||
"groupArray(DISTINCT type) AS types",
|
||||
"max(last_seen) AS last_seen",
|
||||
).From(from)
|
||||
|
||||
limit := 0
|
||||
// Add search filter if provided
|
||||
orClauses := []string{}
|
||||
for _, fieldKeySelector := range fieldKeySelectors {
|
||||
// replace [*] with []
|
||||
fieldKeySelector.Name = strings.ReplaceAll(fieldKeySelector.Name, telemetrylogs.ArrayAnyIndex, telemetrylogs.ArraySep)
|
||||
// Extract search text for body JSON keys
|
||||
keyName := CleanPathPrefixes(fieldKeySelector.Name)
|
||||
if fieldKeySelector.SelectorMatchType == telemetrytypes.FieldSelectorMatchTypeExact {
|
||||
orClauses = append(orClauses, sb.Equal("path", keyName))
|
||||
} else {
|
||||
// Pattern matching for metadata API (defaults to LIKE behavior for other operators)
|
||||
orClauses = append(orClauses, sb.Like("path", querybuilder.FormatValueForContains(keyName)))
|
||||
}
|
||||
limit += fieldKeySelector.Limit
|
||||
}
|
||||
sb.Where(sb.Or(orClauses...))
|
||||
|
||||
// Group by path to get unique paths with aggregated types
|
||||
sb.GroupBy("path")
|
||||
|
||||
// Order by max last_seen to get most recent paths first
|
||||
sb.OrderBy("last_seen DESC")
|
||||
if limit == 0 {
|
||||
limit = defaultPathLimit
|
||||
}
|
||||
sb.Limit(limit)
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
return query, args, limit, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this lint skip
|
||||
//
|
||||
// nolint:unused
|
||||
func getJSONPathIndexes(ctx context.Context, telemetryStore telemetrystore.TelemetryStore, paths ...string) (map[string][]telemetrytypes.JSONDataTypeIndex, error) {
|
||||
filteredPaths := []string{}
|
||||
for _, path := range paths {
|
||||
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
|
||||
continue
|
||||
}
|
||||
filteredPaths = append(filteredPaths, path)
|
||||
}
|
||||
if len(filteredPaths) == 0 {
|
||||
return nil, errors.NewInternalf(CodeNoPathsToQueryIndexes, "no paths to query indexes provided")
|
||||
}
|
||||
|
||||
// list indexes for the paths
|
||||
indexesMap, err := ListLogsJSONIndexes(ctx, telemetryStore, filteredPaths...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to list JSON path indexes")
|
||||
}
|
||||
|
||||
// build a set of indexes
|
||||
cleanIndexes := make(map[string][]telemetrytypes.JSONDataTypeIndex)
|
||||
for path, indexes := range indexesMap {
|
||||
for _, index := range indexes {
|
||||
columnExpr, columnType, err := schemamigrator.UnfoldJSONSubColumnIndexExpr(index.Expression)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to unfold JSON sub column index expression: %s", index.Expression)
|
||||
}
|
||||
|
||||
jsonDataType, found := telemetrytypes.MappingStringToJSONDataType[columnType]
|
||||
if !found {
|
||||
return nil, errors.NewInternalf(CodeUnknownJSONDataType, "failed to map column type to JSON data type: %s", columnType)
|
||||
}
|
||||
|
||||
if jsonDataType == telemetrytypes.String {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: telemetrytypes.String,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
} else if strings.HasPrefix(index.Type, "minmax") {
|
||||
cleanIndexes[path] = append(cleanIndexes[path], telemetrytypes.JSONDataTypeIndex{
|
||||
Type: jsonDataType,
|
||||
ColumnExpression: columnExpr,
|
||||
IndexExpression: index.Expression,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return cleanIndexes, nil
|
||||
}
|
||||
|
||||
func buildListLogsJSONIndexesQuery(cluster string, filters ...string) (string, []any) {
|
||||
// This aggregates all types per path and gets the max last_seen, then applies LIMIT
|
||||
sb := sqlbuilder.Select(
|
||||
"name", "type_full", "expr", "granularity",
|
||||
).From(fmt.Sprintf("clusterAllReplicas('%s', %s)", cluster, SkipIndexTableName))
|
||||
|
||||
sb.Where(sb.Equal("database", telemetrylogs.DBName))
|
||||
sb.Where(sb.Equal("table", telemetrylogs.LogsV2LocalTableName))
|
||||
sb.Where(sb.Or(
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix))),
|
||||
sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix))),
|
||||
))
|
||||
|
||||
filterExprs := []string{}
|
||||
for _, filter := range filters {
|
||||
filterExprs = append(filterExprs, sb.ILike("expr", fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(filter))))
|
||||
}
|
||||
sb.Where(sb.Or(filterExprs...))
|
||||
|
||||
return sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
}
|
||||
|
||||
func ListLogsJSONIndexes(ctx context.Context, telemetryStore telemetrystore.TelemetryStore, filters ...string) (map[string][]schemamigrator.Index, error) {
|
||||
query, args := buildListLogsJSONIndexesQuery(telemetryStore.Cluster(), filters...)
|
||||
rows, err := telemetryStore.ClickhouseDB().Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to load string indexed columns")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
indexesMap := make(map[string][]schemamigrator.Index)
|
||||
for rows.Next() {
|
||||
var name string
|
||||
var typeFull string
|
||||
var expr string
|
||||
var granularity uint64
|
||||
if err := rows.Scan(&name, &typeFull, &expr, &granularity); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadLogsJSONIndexes, "failed to scan string indexed column")
|
||||
}
|
||||
indexesMap[name] = append(indexesMap[name], schemamigrator.Index{
|
||||
Name: name,
|
||||
Type: typeFull,
|
||||
Expression: expr,
|
||||
Granularity: int(granularity),
|
||||
})
|
||||
}
|
||||
|
||||
return indexesMap, nil
|
||||
}
|
||||
|
||||
func ListPromotedPaths(ctx context.Context, conn clickhouse.Conn) (map[string]struct{}, error) {
|
||||
query := fmt.Sprintf("SELECT path FROM %s.%s", DBName, PromotedPathsTableName)
|
||||
rows, err := conn.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to load promoted paths")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
next := make(map[string]struct{})
|
||||
for rows.Next() {
|
||||
var path string
|
||||
if err := rows.Scan(&path); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailLoadPromotedPaths, "failed to scan promoted path")
|
||||
}
|
||||
next[path] = struct{}{}
|
||||
}
|
||||
|
||||
return next, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this if not used in future
|
||||
func ListJSONValues(ctx context.Context, conn clickhouse.Conn, path string, limit int) (*telemetrytypes.TelemetryFieldValues, bool, error) {
|
||||
path = CleanPathPrefixes(path)
|
||||
|
||||
if strings.Contains(path, telemetrylogs.ArraySep) || strings.Contains(path, telemetrylogs.ArrayAnyIndex) {
|
||||
return nil, false, errors.NewInvalidInputf(errors.CodeInvalidInput, "array paths are not supported")
|
||||
}
|
||||
|
||||
promoted, err := IsPathPromoted(ctx, conn, path)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
if promoted {
|
||||
path = telemetrylogs.BodyPromotedColumnPrefix + path
|
||||
} else {
|
||||
path = telemetrylogs.BodyJSONColumnPrefix + path
|
||||
}
|
||||
|
||||
from := fmt.Sprintf("%s.%s", telemetrylogs.DBName, telemetrylogs.LogsV2TableName)
|
||||
colExpr := func(typ telemetrytypes.JSONDataType) string {
|
||||
return fmt.Sprintf("dynamicElement(%s, '%s')", path, typ.StringValue())
|
||||
}
|
||||
|
||||
sb := sqlbuilder.Select(
|
||||
colExpr(telemetrytypes.String),
|
||||
colExpr(telemetrytypes.Int64),
|
||||
colExpr(telemetrytypes.Float64),
|
||||
colExpr(telemetrytypes.Bool),
|
||||
colExpr(telemetrytypes.ArrayString),
|
||||
colExpr(telemetrytypes.ArrayInt64),
|
||||
colExpr(telemetrytypes.ArrayFloat64),
|
||||
colExpr(telemetrytypes.ArrayBool),
|
||||
colExpr(telemetrytypes.ArrayDynamic),
|
||||
).From(from)
|
||||
sb.Where(fmt.Sprintf("%s IS NOT NULL", path))
|
||||
sb.Limit(limit)
|
||||
|
||||
contextWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
rows, err := conn.Query(contextWithTimeout, query, args...)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return nil, false, errors.WrapTimeoutf(err, errors.CodeTimeout, "query timed out").WithAdditional("failed to list JSON values")
|
||||
}
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "failed to list JSON values")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
// Get column types to determine proper scan types
|
||||
colTypes := rows.ColumnTypes()
|
||||
scanTargets := make([]any, len(colTypes))
|
||||
for i := range colTypes {
|
||||
scanTargets[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
values := &telemetrytypes.TelemetryFieldValues{}
|
||||
for rows.Next() {
|
||||
// Create fresh scan targets for each row
|
||||
scan := make([]any, len(colTypes))
|
||||
for i := range colTypes {
|
||||
scan[i] = reflect.New(colTypes[i].ScanType()).Interface()
|
||||
}
|
||||
|
||||
if err := rows.Scan(scan...); err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "failed to scan JSON value row")
|
||||
}
|
||||
|
||||
// Extract values from scan targets and process them
|
||||
// Column order: String, Int64, Float64, Bool, ArrayString, ArrayInt64, ArrayFloat64, ArrayBool, ArrayDynamic
|
||||
var consume func(scan []any) error
|
||||
consume = func(scan []any) error {
|
||||
for _, value := range scan {
|
||||
value := derefValue(value) // dereference the double pointer if it is a pointer
|
||||
switch value := value.(type) {
|
||||
case string:
|
||||
values.StringValues = append(values.StringValues, value)
|
||||
case int64:
|
||||
values.NumberValues = append(values.NumberValues, float64(value))
|
||||
case float64:
|
||||
values.NumberValues = append(values.NumberValues, value)
|
||||
case bool:
|
||||
values.BoolValues = append(values.BoolValues, value)
|
||||
case []*string:
|
||||
for _, str := range value {
|
||||
if str != nil {
|
||||
values.StringValues = append(values.StringValues, *str)
|
||||
}
|
||||
}
|
||||
case []*int64:
|
||||
for _, num := range value {
|
||||
if num != nil {
|
||||
values.NumberValues = append(values.NumberValues, float64(*num))
|
||||
}
|
||||
}
|
||||
case []*float64:
|
||||
for _, num := range value {
|
||||
if num != nil {
|
||||
values.NumberValues = append(values.NumberValues, float64(*num))
|
||||
}
|
||||
}
|
||||
case []*bool:
|
||||
for _, boolVal := range value {
|
||||
if boolVal != nil {
|
||||
values.BoolValues = append(values.BoolValues, *boolVal)
|
||||
}
|
||||
}
|
||||
case chcol.Variant:
|
||||
if !value.Nil() {
|
||||
if err := consume([]any{value.Any()}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
case []chcol.Variant:
|
||||
extractedValues := make([]any, len(value))
|
||||
for idx, variant := range value {
|
||||
if !variant.Nil() && variant.Type() != "JSON" { // skip JSON values cuz they're relevant for nested keys
|
||||
extractedValues[idx] = variant.Any()
|
||||
}
|
||||
}
|
||||
if err := consume(extractedValues); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
if value == nil {
|
||||
continue
|
||||
}
|
||||
return errors.NewInternalf(CodeFailScanJSONValue, "unknown JSON value type: %T", value)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
if err := consume(scan); err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, false, errors.WrapInternalf(err, CodeFailListJSONValues, "error iterating JSON values")
|
||||
}
|
||||
|
||||
return values, true, nil
|
||||
}
|
||||
|
||||
func derefValue(v any) any {
|
||||
if v == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
val := reflect.ValueOf(v)
|
||||
for val.Kind() == reflect.Ptr {
|
||||
if val.IsNil() {
|
||||
return nil
|
||||
}
|
||||
val = val.Elem()
|
||||
}
|
||||
|
||||
return val.Interface()
|
||||
}
|
||||
|
||||
// IsPathPromoted checks if a specific path is promoted
|
||||
func IsPathPromoted(ctx context.Context, conn clickhouse.Conn, path string) (bool, error) {
|
||||
split := strings.Split(path, telemetrylogs.ArraySep)
|
||||
query := fmt.Sprintf("SELECT 1 FROM %s.%s WHERE path = ? LIMIT 1", DBName, PromotedPathsTableName)
|
||||
rows, err := conn.Query(ctx, query, split[0])
|
||||
if err != nil {
|
||||
return false, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to check if path %s is promoted", path)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
return rows.Next(), nil
|
||||
}
|
||||
|
||||
// GetPromotedPaths checks if a specific path is promoted
|
||||
func GetPromotedPaths(ctx context.Context, conn clickhouse.Conn, paths ...string) (*utils.ConcurrentSet[string], error) {
|
||||
sb := sqlbuilder.Select("path").From(fmt.Sprintf("%s.%s", DBName, PromotedPathsTableName))
|
||||
pathConditions := []string{}
|
||||
for _, path := range paths {
|
||||
pathConditions = append(pathConditions, sb.Equal("path", path))
|
||||
}
|
||||
sb.Where(sb.Or(pathConditions...))
|
||||
|
||||
query, args := sb.BuildWithFlavor(sqlbuilder.ClickHouse)
|
||||
rows, err := conn.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to get promoted paths")
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
promotedPaths := utils.NewConcurrentSet[string]()
|
||||
for rows.Next() {
|
||||
var path string
|
||||
if err := rows.Scan(&path); err != nil {
|
||||
return nil, errors.WrapInternalf(err, CodeFailCheckPathPromoted, "failed to scan promoted path")
|
||||
}
|
||||
promotedPaths.Insert(path)
|
||||
}
|
||||
|
||||
return promotedPaths, nil
|
||||
}
|
||||
|
||||
// TODO(Piyush): Remove this function
|
||||
func CleanPathPrefixes(path string) string {
|
||||
path = strings.TrimPrefix(path, telemetrytypes.BodyJSONStringSearchPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyJSONColumnPrefix)
|
||||
path = strings.TrimPrefix(path, telemetrylogs.BodyPromotedColumnPrefix)
|
||||
return path
|
||||
}
|
||||
151
pkg/telemetrymetadata/body_json_metadata_test.go
Normal file
151
pkg/telemetrymetadata/body_json_metadata_test.go
Normal file
@@ -0,0 +1,151 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/SigNoz/signoz-otel-collector/constants"
|
||||
"github.com/SigNoz/signoz/pkg/querybuilder"
|
||||
"github.com/SigNoz/signoz/pkg/telemetrylogs"
|
||||
"github.com/SigNoz/signoz/pkg/types/telemetrytypes"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestBuildGetBodyJSONPathsQuery(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
fieldKeySelectors []*telemetrytypes.FieldKeySelector
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
expectedLimit int
|
||||
}{
|
||||
|
||||
{
|
||||
name: "Single search text with EQUAL operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user.name",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path = ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user.name", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Single search text with LIKE operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user", 100},
|
||||
expectedLimit: 100,
|
||||
},
|
||||
{
|
||||
name: "Multiple search texts with EQUAL operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user.name",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
{
|
||||
Name: "user.age",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeExact,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path = ? OR path = ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user.name", "user.age", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Multiple search texts with LIKE operator",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "user",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
{
|
||||
Name: "admin",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ? OR path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"user", "admin", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
{
|
||||
name: "Search with Contains operator (should default to LIKE)",
|
||||
fieldKeySelectors: []*telemetrytypes.FieldKeySelector{
|
||||
{
|
||||
Name: "test",
|
||||
SelectorMatchType: telemetrytypes.FieldSelectorMatchTypeFuzzy,
|
||||
},
|
||||
},
|
||||
expectedSQL: "SELECT path, groupArray(DISTINCT type) AS types, max(last_seen) AS last_seen FROM signoz_metadata.distributed_json_path_types WHERE (path LIKE ?) GROUP BY path ORDER BY last_seen DESC LIMIT ?",
|
||||
expectedArgs: []any{"test", defaultPathLimit},
|
||||
expectedLimit: defaultPathLimit,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
query, args, limit, err := buildGetBodyJSONPathsQuery(tc.fieldKeySelectors)
|
||||
require.NoError(t, err, "Error building query: %v", err)
|
||||
|
||||
require.Equal(t, tc.expectedSQL, query)
|
||||
require.Equal(t, tc.expectedArgs, args)
|
||||
require.Equal(t, tc.expectedLimit, limit)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildListLogsJSONIndexesQuery(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
cluster string
|
||||
filters []string
|
||||
expectedSQL string
|
||||
expectedArgs []any
|
||||
}{
|
||||
{
|
||||
name: "No filters",
|
||||
cluster: "test-cluster",
|
||||
filters: nil,
|
||||
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
|
||||
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
|
||||
expectedArgs: []any{
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2LocalTableName,
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix)),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "With filters",
|
||||
cluster: "test-cluster",
|
||||
filters: []string{"foo", "bar"},
|
||||
expectedSQL: "SELECT name, type_full, expr, granularity FROM clusterAllReplicas('test-cluster', system.data_skipping_indices) " +
|
||||
"WHERE database = ? AND table = ? AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?)) AND (LOWER(expr) LIKE LOWER(?) OR LOWER(expr) LIKE LOWER(?))",
|
||||
expectedArgs: []any{
|
||||
telemetrylogs.DBName,
|
||||
telemetrylogs.LogsV2LocalTableName,
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyJSONColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains(constants.BodyPromotedColumnPrefix)),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains("foo")),
|
||||
fmt.Sprintf("%%%s%%", querybuilder.FormatValueForContains("bar")),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
query, args := buildListLogsJSONIndexesQuery(tc.cluster, tc.filters...)
|
||||
|
||||
require.Equal(t, tc.expectedSQL, query)
|
||||
require.Equal(t, tc.expectedArgs, args)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,12 @@
|
||||
package telemetrymetadata
|
||||
|
||||
import otelcollectorconst "github.com/SigNoz/signoz-otel-collector/constants"
|
||||
|
||||
const (
|
||||
DBName = "signoz_metadata"
|
||||
AttributesMetadataTableName = "distributed_attributes_metadata"
|
||||
AttributesMetadataLocalTableName = "attributes_metadata"
|
||||
PathTypesTableName = otelcollectorconst.DistributedPathTypesTable
|
||||
PromotedPathsTableName = otelcollectorconst.DistributedPromotedPathsTable
|
||||
SkipIndexTableName = "system.data_skipping_indices"
|
||||
)
|
||||
|
||||
@@ -165,53 +165,65 @@ func (c *conditionBuilder) conditionFor(
|
||||
case qbtypes.FilterOperatorExists, qbtypes.FilterOperatorNotExists:
|
||||
|
||||
var value any
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.IsNotNull(tblFieldName), nil
|
||||
} else {
|
||||
return sb.IsNull(tblFieldName), nil
|
||||
}
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.FixedStringColumnType{Length: 32},
|
||||
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"}:
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumFixedString,
|
||||
schema.ColumnTypeEnumDateTime64:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeUInt8,
|
||||
schema.ColumnTypeInt8,
|
||||
schema.ColumnTypeInt16,
|
||||
schema.ColumnTypeBool:
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
value = ""
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
}
|
||||
return sb.E(tblFieldName, value), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for low cardinality column type %s", elementType)
|
||||
}
|
||||
|
||||
case schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumUInt8,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool:
|
||||
value = 0
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.NE(tblFieldName, value), nil
|
||||
} else {
|
||||
return sb.E(tblFieldName, value), nil
|
||||
}
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}, schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumBool, schema.ColumnTypeEnumFloat64:
|
||||
leftOperand := fmt.Sprintf("mapContains(%s, '%s')", column.Name, key.Name)
|
||||
if key.Materialized {
|
||||
leftOperand = telemetrytypes.FieldKeyToMaterializedColumnNameForExists(key)
|
||||
}
|
||||
if operator == qbtypes.FilterOperatorExists {
|
||||
return sb.E(leftOperand, true), nil
|
||||
} else {
|
||||
return sb.NE(leftOperand, true), nil
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for map column type %s", valueType)
|
||||
}
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "exists operator is not supported for column type %s", column.Type)
|
||||
|
||||
@@ -239,8 +239,8 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
return "", err
|
||||
}
|
||||
|
||||
switch column.Type {
|
||||
case schema.JSONColumnType{}:
|
||||
switch column.Type.GetType() {
|
||||
case schema.ColumnTypeEnumJSON:
|
||||
// json is only supported for resource context as of now
|
||||
if key.FieldContext != telemetrytypes.FieldContextResource {
|
||||
return "", errors.Newf(errors.TypeInvalidInput, errors.CodeInvalidInput, "only resource context fields are supported for json columns, got %s", key.FieldContext.String)
|
||||
@@ -256,44 +256,38 @@ func (m *defaultFieldMapper) FieldFor(
|
||||
} else {
|
||||
return fmt.Sprintf("multiIf(%s.`%s` IS NOT NULL, %s.`%s`::String, mapContains(%s, '%s'), %s, NULL)", column.Name, key.Name, column.Name, key.Name, oldColumn.Name, key.Name, oldKeyName), nil
|
||||
}
|
||||
|
||||
case schema.ColumnTypeString,
|
||||
schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
schema.ColumnTypeUInt64,
|
||||
schema.ColumnTypeUInt32,
|
||||
schema.ColumnTypeInt8,
|
||||
schema.ColumnTypeInt16,
|
||||
schema.ColumnTypeBool,
|
||||
schema.DateTime64ColumnType{Precision: 9, Timezone: "UTC"},
|
||||
schema.FixedStringColumnType{Length: 32}:
|
||||
case schema.ColumnTypeEnumString,
|
||||
schema.ColumnTypeEnumUInt64,
|
||||
schema.ColumnTypeEnumUInt32,
|
||||
schema.ColumnTypeEnumInt8,
|
||||
schema.ColumnTypeEnumInt16,
|
||||
schema.ColumnTypeEnumBool,
|
||||
schema.ColumnTypeEnumDateTime64,
|
||||
schema.ColumnTypeEnumFixedString:
|
||||
return column.Name, nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeString,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumLowCardinality:
|
||||
switch elementType := column.Type.(schema.LowCardinalityColumnType).ElementType; elementType.GetType() {
|
||||
case schema.ColumnTypeEnumString:
|
||||
return column.Name, nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for low cardinality column type %s", elementType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeFloat64,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
case schema.ColumnTypeEnumMap:
|
||||
keyType := column.Type.(schema.MapColumnType).KeyType
|
||||
if _, ok := keyType.(schema.LowCardinalityColumnType); !ok {
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "key type %s is not supported for map column type %s", keyType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
case schema.MapColumnType{
|
||||
KeyType: schema.LowCardinalityColumnType{ElementType: schema.ColumnTypeString},
|
||||
ValueType: schema.ColumnTypeBool,
|
||||
}:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
|
||||
switch valueType := column.Type.(schema.MapColumnType).ValueType; valueType.GetType() {
|
||||
case schema.ColumnTypeEnumString, schema.ColumnTypeEnumFloat64, schema.ColumnTypeEnumBool:
|
||||
// a key could have been materialized, if so return the materialized column name
|
||||
if key.Materialized {
|
||||
return telemetrytypes.FieldKeyToMaterializedColumnName(key), nil
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
default:
|
||||
return "", errors.NewInvalidInputf(errors.CodeInvalidInput, "value type %s is not supported for map column type %s", valueType, column.Type)
|
||||
}
|
||||
return fmt.Sprintf("%s['%s']", column.Name, key.Name), nil
|
||||
}
|
||||
// should not reach here
|
||||
return column.Name, nil
|
||||
|
||||
@@ -93,6 +93,16 @@ func newConfig() factory.Config {
|
||||
}
|
||||
|
||||
func (c Config) Validate() error {
|
||||
// Ensure that lifetime idle is not negative
|
||||
if c.Lifetime.Idle < 0 {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "lifetime::idle must not be negative")
|
||||
}
|
||||
|
||||
// Ensure that lifetime max is not negative
|
||||
if c.Lifetime.Max < 0 {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "lifetime::max must not be negative")
|
||||
}
|
||||
|
||||
// Ensure that rotation interval is smaller than lifetime idle
|
||||
if c.Rotation.Interval >= c.Lifetime.Idle {
|
||||
return errors.New(errors.TypeInvalidInput, errors.CodeInvalidInput, "rotation::interval must be smaller than lifetime::idle")
|
||||
|
||||
@@ -263,7 +263,7 @@ func (provider *provider) getOrSetIdentity(ctx context.Context, orgID, userID va
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = provider.cache.Set(ctx, orgID, identityCacheKey(identity.UserID), identity, -1)
|
||||
err = provider.cache.Set(ctx, orgID, identityCacheKey(identity.UserID), identity, 0)
|
||||
if err != nil {
|
||||
provider.settings.Logger().ErrorContext(ctx, "failed to cache identity", "error", err)
|
||||
}
|
||||
|
||||
@@ -410,7 +410,7 @@ func (provider *provider) setToken(ctx context.Context, token *authtypes.Token,
|
||||
}
|
||||
|
||||
func (provider *provider) setIdentity(ctx context.Context, identity *authtypes.Identity) error {
|
||||
err := provider.cache.Set(ctx, emptyOrgID, identityCacheKey(identity.UserID), identity, -1)
|
||||
err := provider.cache.Set(ctx, emptyOrgID, identityCacheKey(identity.UserID), identity, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -434,7 +434,7 @@ func (provider *provider) getOrGetSetIdentity(ctx context.Context, userID valuer
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = provider.cache.Set(ctx, emptyOrgID, identityCacheKey(userID), identity, -1)
|
||||
err = provider.cache.Set(ctx, emptyOrgID, identityCacheKey(userID), identity, 0)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -220,3 +220,61 @@ type TreemapResponse struct {
|
||||
TimeSeries []TreemapEntry `json:"timeseries"`
|
||||
Samples []TreemapEntry `json:"samples"`
|
||||
}
|
||||
|
||||
// MetricHighlightsResponse is the output structure for the metric highlights endpoint.
|
||||
type MetricHighlightsResponse struct {
|
||||
DataPoints uint64 `json:"dataPoints"`
|
||||
LastReceived uint64 `json:"lastReceived"`
|
||||
TotalTimeSeries uint64 `json:"totalTimeSeries"`
|
||||
ActiveTimeSeries uint64 `json:"activeTimeSeries"`
|
||||
}
|
||||
|
||||
// MetricAttributesRequest represents the payload for the metric attributes endpoint.
|
||||
type MetricAttributesRequest struct {
|
||||
MetricName string `json:"metricName"`
|
||||
Start *int64 `json:"start,omitempty"`
|
||||
End *int64 `json:"end,omitempty"`
|
||||
}
|
||||
|
||||
// Validate ensures MetricAttributesRequest contains acceptable values.
|
||||
func (req *MetricAttributesRequest) Validate() error {
|
||||
if req == nil {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "request is nil")
|
||||
}
|
||||
|
||||
if req.MetricName == "" {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "metric_name is required")
|
||||
}
|
||||
|
||||
if req.Start != nil && req.End != nil {
|
||||
if *req.Start >= *req.End {
|
||||
return errors.NewInvalidInputf(errors.CodeInvalidInput, "start (%d) must be less than end (%d)", *req.Start, *req.End)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnmarshalJSON validates input immediately after decoding.
|
||||
func (req *MetricAttributesRequest) UnmarshalJSON(data []byte) error {
|
||||
type raw MetricAttributesRequest
|
||||
var decoded raw
|
||||
if err := json.Unmarshal(data, &decoded); err != nil {
|
||||
return err
|
||||
}
|
||||
*req = MetricAttributesRequest(decoded)
|
||||
return req.Validate()
|
||||
}
|
||||
|
||||
// MetricAttribute represents a single attribute with its values and count.
|
||||
type MetricAttribute struct {
|
||||
Key string `json:"key"`
|
||||
Values []string `json:"values"`
|
||||
ValueCount uint64 `json:"valueCount"`
|
||||
}
|
||||
|
||||
// MetricAttributesResponse is the output structure for the metric attributes endpoint.
|
||||
type MetricAttributesResponse struct {
|
||||
Attributes []MetricAttribute `json:"attributes"`
|
||||
TotalKeys int64 `json:"totalKeys"`
|
||||
}
|
||||
|
||||
@@ -17,6 +17,10 @@ var (
|
||||
FieldSelectorMatchTypeFuzzy = FieldSelectorMatchType{valuer.NewString("fuzzy")}
|
||||
)
|
||||
|
||||
// BodyJSONStringSearchPrefix is the prefix used for body JSON search queries
|
||||
// e.g., "body.status" where "body." is the prefix
|
||||
const BodyJSONStringSearchPrefix = `body.`
|
||||
|
||||
type TelemetryFieldKey struct {
|
||||
Name string `json:"name"`
|
||||
Description string `json:"description,omitempty"`
|
||||
@@ -24,7 +28,10 @@ type TelemetryFieldKey struct {
|
||||
Signal Signal `json:"signal,omitempty"`
|
||||
FieldContext FieldContext `json:"fieldContext,omitempty"`
|
||||
FieldDataType FieldDataType `json:"fieldDataType,omitempty"`
|
||||
Materialized bool `json:"-"`
|
||||
|
||||
JSONDataType *JSONDataType `json:"-,omitempty"`
|
||||
Indexes []JSONDataTypeIndex `json:"-"`
|
||||
Materialized bool `json:"-"` // refers to promoted in case of body.... fields
|
||||
}
|
||||
|
||||
func (f TelemetryFieldKey) String() string {
|
||||
|
||||
@@ -36,6 +36,9 @@ import (
|
||||
//
|
||||
// - Use `log.` for explicit log context
|
||||
// - `log.severity_text` will always resolve to `severity_text` of log record
|
||||
//
|
||||
// - Use `body.` to indicate and enforce body context
|
||||
// - `body.key` will look for `key` in the body field
|
||||
type FieldContext struct {
|
||||
valuer.String
|
||||
}
|
||||
@@ -49,6 +52,7 @@ var (
|
||||
FieldContextScope = FieldContext{valuer.NewString("scope")}
|
||||
FieldContextAttribute = FieldContext{valuer.NewString("attribute")}
|
||||
FieldContextEvent = FieldContext{valuer.NewString("event")}
|
||||
FieldContextBody = FieldContext{valuer.NewString("body")}
|
||||
FieldContextUnspecified = FieldContext{valuer.NewString("")}
|
||||
|
||||
// Map string representations to FieldContext values
|
||||
@@ -65,6 +69,7 @@ var (
|
||||
"point": FieldContextAttribute,
|
||||
"attribute": FieldContextAttribute,
|
||||
"event": FieldContextEvent,
|
||||
"body": FieldContextBody,
|
||||
"spanfield": FieldContextSpan,
|
||||
"span": FieldContextSpan,
|
||||
"logfield": FieldContextLog,
|
||||
@@ -144,6 +149,8 @@ func (f FieldContext) TagType() string {
|
||||
return "metricfield"
|
||||
case FieldContextEvent:
|
||||
return "eventfield"
|
||||
case FieldContextBody:
|
||||
return "body"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
@@ -31,6 +31,9 @@ var (
|
||||
FieldDataTypeArrayInt64 = FieldDataType{valuer.NewString("[]int64")}
|
||||
FieldDataTypeArrayNumber = FieldDataType{valuer.NewString("[]number")}
|
||||
|
||||
FieldDataTypeArrayObject = FieldDataType{valuer.NewString("[]object")}
|
||||
FieldDataTypeArrayDynamic = FieldDataType{valuer.NewString("[]dynamic")}
|
||||
|
||||
// Map string representations to FieldDataType values
|
||||
// We want to handle all the possible string representations of the data types.
|
||||
// Even if the user uses some non-standard representation, we want to be able to
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package telemetrytypes
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@@ -86,7 +87,7 @@ func TestGetFieldKeyFromKeyText(t *testing.T) {
|
||||
|
||||
for _, testCase := range testCases {
|
||||
result := GetFieldKeyFromKeyText(testCase.keyText)
|
||||
if result != testCase.expected {
|
||||
if !reflect.DeepEqual(result, testCase.expected) {
|
||||
t.Errorf("expected %v, got %v", testCase.expected, result)
|
||||
}
|
||||
}
|
||||
|
||||
80
pkg/types/telemetrytypes/json_datatype.go
Normal file
80
pkg/types/telemetrytypes/json_datatype.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package telemetrytypes
|
||||
|
||||
type JSONDataTypeIndex struct {
|
||||
Type JSONDataType
|
||||
ColumnExpression string
|
||||
IndexExpression string
|
||||
}
|
||||
|
||||
type JSONDataType struct {
|
||||
str string // Store the correct case for ClickHouse
|
||||
IsArray bool
|
||||
ScalerType string
|
||||
IndexSupported bool
|
||||
}
|
||||
|
||||
// Override StringValue to return the correct case
|
||||
func (jdt JSONDataType) StringValue() string {
|
||||
return jdt.str
|
||||
}
|
||||
|
||||
var (
|
||||
String = JSONDataType{"String", false, "", true}
|
||||
Int64 = JSONDataType{"Int64", false, "", true}
|
||||
Float64 = JSONDataType{"Float64", false, "", true}
|
||||
Bool = JSONDataType{"Bool", false, "", false}
|
||||
Dynamic = JSONDataType{"Dynamic", false, "", false}
|
||||
ArrayString = JSONDataType{"Array(Nullable(String))", true, "String", false}
|
||||
ArrayInt64 = JSONDataType{"Array(Nullable(Int64))", true, "Int64", false}
|
||||
ArrayFloat64 = JSONDataType{"Array(Nullable(Float64))", true, "Float64", false}
|
||||
ArrayBool = JSONDataType{"Array(Nullable(Bool))", true, "Bool", false}
|
||||
ArrayDynamic = JSONDataType{"Array(Dynamic)", true, "Dynamic", false}
|
||||
ArrayJSON = JSONDataType{"Array(JSON)", true, "JSON", false}
|
||||
)
|
||||
|
||||
var MappingStringToJSONDataType = map[string]JSONDataType{
|
||||
"String": String,
|
||||
"Int64": Int64,
|
||||
"Float64": Float64,
|
||||
"Bool": Bool,
|
||||
"Dynamic": Dynamic,
|
||||
"Array(Nullable(String))": ArrayString,
|
||||
"Array(Nullable(Int64))": ArrayInt64,
|
||||
"Array(Nullable(Float64))": ArrayFloat64,
|
||||
"Array(Nullable(Bool))": ArrayBool,
|
||||
"Array(Dynamic)": ArrayDynamic,
|
||||
"Array(JSON)": ArrayJSON,
|
||||
}
|
||||
|
||||
var ScalerTypeToArrayType = map[JSONDataType]JSONDataType{
|
||||
String: ArrayString,
|
||||
Int64: ArrayInt64,
|
||||
Float64: ArrayFloat64,
|
||||
Bool: ArrayBool,
|
||||
Dynamic: ArrayDynamic,
|
||||
}
|
||||
|
||||
var MappingFieldDataTypeToJSONDataType = map[FieldDataType]JSONDataType{
|
||||
FieldDataTypeString: String,
|
||||
FieldDataTypeInt64: Int64,
|
||||
FieldDataTypeFloat64: Float64,
|
||||
FieldDataTypeNumber: Float64,
|
||||
FieldDataTypeBool: Bool,
|
||||
FieldDataTypeArrayString: ArrayString,
|
||||
FieldDataTypeArrayInt64: ArrayInt64,
|
||||
FieldDataTypeArrayFloat64: ArrayFloat64,
|
||||
FieldDataTypeArrayBool: ArrayBool,
|
||||
}
|
||||
|
||||
var MappingJSONDataTypeToFieldDataType = map[JSONDataType]FieldDataType{
|
||||
String: FieldDataTypeString,
|
||||
Int64: FieldDataTypeInt64,
|
||||
Float64: FieldDataTypeFloat64,
|
||||
Bool: FieldDataTypeBool,
|
||||
ArrayString: FieldDataTypeArrayString,
|
||||
ArrayInt64: FieldDataTypeArrayInt64,
|
||||
ArrayFloat64: FieldDataTypeArrayFloat64,
|
||||
ArrayBool: FieldDataTypeArrayBool,
|
||||
ArrayDynamic: FieldDataTypeArrayDynamic,
|
||||
ArrayJSON: FieldDataTypeArrayObject,
|
||||
}
|
||||
Reference in New Issue
Block a user