How do ensure atomic update of a shared global in a multi-threadedapplication?

P

Pushkar Prasad

Hello Everybody

What is the best way to ensure atomic update to a shared volatile global
in multi-threaded application? I have written a small app which creates
bunch of threads and increment a volatile global the thread function will
decrment that global. When the value of the Global is is 0 then then
consumer routine will do some processing. I have tried with Global of
Volatile as well as InterlockedIncrement() function documented in MSDN but
for some reason the global is not getting updated atomically causing the
consumer thread to block forever if number of producer threads are high.
If the thread count is low then it works. I have scratched my heads for
hours but I have not been able to come up with any option to make the
increment / decrement operation of the global as atomic. Putting a
synchronization object like Critical Section will cause a lot of
contention in my program. Any help will be greatly appreciated. Please
fine the code that I have written below.

My apologies for such a long email.


------------> My Code Piece <--------------

/* File: win32.c
* Date: 11-03-2011
*
* Description: Module to test basic win32 API usage.
*/


#include <stdio.h>
#include <windows.h>
#include <strsafe.h>
#include <limits.h>
#include <listpub.h>


#define ITEM_MULTIPLIER 10
#define THREAD_MULTIPLIER 1000

/* Data Type */


/* Globals */

PTRACKER g_Finalizer;
volatile static DWORD g_dwThreadCnt = 0;

/*---------------> Routines <----------------------*/

/* Function: ModifierRoutine()
*
* Description: This routine will increment the value
* in the Array Element for which the
* index has been passed in pHeader->OperatingIndex
*
* Once it finishes, it will insert a Random Value
*/

HRESULT
WINAPI ModifierRoutine
(
LPDWORD lpIndex
)
{
DWORD Cntr = 0;
STATUS Status;
BOOL bStatus;
DWORD dwStatus,dwCntr;
HRESULT hr;
PVOID pTmpData;
PLIST_ITEM pItem;

printf("\nThreadID: %#6x \tCPU: %2d \tIndex: %5d \t[%p]",
GetCurrentThreadId(), GetCurrentProcessorNumber(),
*lpIndex, lpIndex);

/* Increment the ThreadCnt atomically */

for(dwCntr = 0; dwCntr < ITEM_MULTIPLIER; dwCntr++)
{
Status = CreateListItem(&pItem, lpIndex);
Status = AddToList(g_Finalizer, pItem);
}


hr = S_OK;

End:
//Status = BidirListTraversal(g_Finalizer);
//g_dwThreadCnt--;
__asm{
LOCK DEC DWORD PTR [g_dwThreadCnt]
}

return(hr);
}






/* Function: main()
*
*/


void main(int argc, char* argv[])
{

DWORD dwElCnt, dwCPUCnt, dwStatus, dwResult = 0;
DWORD lCntr;
BOOL bStatus;
HANDLE hTmpThread;
HANDLE *hTable;
LPDWORD *lpInputTable, lpOut = NULL, lpInput = NULL;
STATUS Status;
PLIST_ITEM pItem = NULL;
SYSTEM_INFO SystemInfo;
DWORD_PTR dwpAffinity;

char *ErrMsg0 = "\nElement count is expected as a prameter.
Terminating";
char *ErrMsg1 = "\nList Creation Failed!";
char *ErrMsg2 = "\nInsufficient Memory";
char *DispMsg0 = "\nList Item Value: %d";
char *DispMsg1 = "\nSummation: %d";


__asm
{
PUSH SIZE SYSTEM_INFO
PUSH 0
LEA EAX, DWORD PTR [SystemInfo]
PUSH EAX
CALL memset
ADD ESP, 12

LEA EAX, DWORD PTR [SystemInfo]
PUSH EAX
CALL DWORD PTR [GetNativeSystemInfo]

MOV EAX, DWORD PTR [SystemInfo+20]
MOV DWORD PTR [dwCPUCnt], EAX

// Multiply Threads THREAD_MULTIPLIER to create
// MULTIPLE threads to test the Program

XOR EDX, EDX
MOV EAX, THREAD_MULTIPLIER
MUL DWORD PTR [dwCPUCnt]
MOV DWORD PTR [dwElCnt], EAX

// Initialize the List Head

LEA EAX, DWORD PTR [g_Finalizer]
PUSH EAX
CALL CreateList

// Check Return Val of CreateList

CMP EAX, E_SUCCESS
PUSH DWORD PTR [ErrMsg1]
JNE Error
// If Jmp is Not Taken then
POP EAX

// Allocate Memory for the hTable

PUSH 4
MOV ECX, DWORD PTR [dwElCnt]
ADD ECX, 1
PUSH ECX
CALL calloc
ADD ESP, 8
MOV DWORD PTR [hTable], EAX

CMP DWORD PTR [hTable], 0
PUSH DWORD PTR [ErrMsg2]
JE Error
POP EAX

// Allocate Memory for the lpInputTable

PUSH 4
MOV ECX, DWORD PTR [dwElCnt]
ADD ECX, 1
PUSH ECX
CALL calloc
ADD ESP, 8
MOV DWORD PTR [lpInputTable], EAX

CMP DWORD PTR [lpInputTable], 0
PUSH DWORD PTR [ErrMsg2]
JE Error
POP EAX

// Move the Loop Counter in ESI
PUSH ESI
PUSH EDI

MOV DWORD PTR [lCntr], 0

// Start of the Loop
ForLoop:

MOV ESI, DWORD PTR [dwElCnt]
MOV EDI, DWORD PTR [lCntr]

CMP EDI, ESI
JGE ForLoopEnd

// Allocate Memory for input to the thread

PUSH 1
PUSH 4
CALL calloc
ADD ESP, 8
MOV DWORD PTR [lpInput], EAX
MOV ECX, DWORD PTR [lpInput]
MOV DWORD PTR [ECX], EDI

// Call CreateThread Now

PUSH 0
PUSH 0
PUSH DWORD PTR [lpInput]
LEA EDX, DWORD PTR [ModifierRoutine]
PUSH EDX
PUSH 0
PUSH 0
CALL DWORD PTR [CreateThread]

// Capture the Handle returned by CreateThread

MOV EDX, DWORD PTR [hTable]
MOV DWORD PTR [EDX+EDI*4], EAX

// Set the CPU affinity for the thread
// Calculate the CPU MASK by performin
// MOD of EDI with dwCPUCnt

PUSH EBX
MOV EBX, EAX
MOV EAX, EDI
CDQ
MOV ECX, DWORD PTR [dwCPUCnt]
IDIV ECX

PUSH EDX
PUSH EBX
CALL DWORD PTR [SetThreadAffinityMask]

MOV DWORD PTR [dwpAffinity], EAX
POP EBX

MOV EDX, DWORD PTR [lpInputTable]
MOV EAX, DWORD PTR [lpInput]
MOV DWORD PTR [EDX+EDI*4], EAX

LOCK INC DWORD PTR [g_dwThreadCnt]
ADD DWORD PTR [lCntr], 1
JMP ForLoop

ForLoopEnd:

// Restore EDI and ESI
POP EDI
POP ESI


WhileLoop:
CMP DWORD PTR [g_dwThreadCnt], 0
JNE WhileLoop

WhileEnd:

// Remove the Items
RemoveItem:

LEA ECX, DWORD PTR [pItem]
PUSH ECX
PUSH 0
PUSH DWORD PTR [g_Finalizer]
CALL RemoveFromList

// Check the return val of Remove From List

CMP EAX, E_EMPTY
JE RemoveItemEnd
LEA ECX, DWORD PTR [lpOut]
PUSH ECX
LEA EAX, DWORD PTR [pItem]
PUSH EAX
CALL DestroyListItem

MOV ECX, DWORD PTR [lpOut]
PUSH DWORD PTR [ECX]
PUSH DWORD PTR [DispMsg0]
CALL printf
ADD ESP, 8
MOV ECX, DWORD PTR [lpOut]
MOV ECX, DWORD PTR [ECX]
ADD DWORD PTR [dwResult], ECX
JMP RemoveItem

RemoveItemEnd:

PUSH DWORD PTR [dwResult]
PUSH DWORD PTR [DispMsg1]
CALL printf
ADD ESP, 8

//SAVE ESI and EDI
PUSH ESI
PUSH EDI

MOV DWORD PTR [lCntr], 0

CleanupBegin:
MOV EDI, DWORD PTR [lCntr]
MOV ESI, DWORD PTR [dwElCnt]
CMP EDI, ESI
JGE CleanupEnd

MOV ECX, DWORD PTR [hTable]
MOV ECX, DWORD PTR [ECX+EDI*4]
PUSH ECX
CALL DWORD PTR [CloseHandle]

MOV ECX, DWORD PTR [lpInputTable]
MOV ECX, DWORD PTR [ECX+EDI*4]
PUSH ECX
CALL free
ADD ESP, 4

MOV ECX, DWORD PTR [lpInputTable]
MOV DWORD PTR [ECX+EDI*4], 0
ADD DWORD PTR [lCntr], 1
JMP CleanupBegin

CleanupEnd:
POP EDI
POP ESI
JMP End


Error:
CALL printf
ADD ESP, 4
PUSH E_INVAL
CALL exit
ADD ESP,4

}

End:
;
}


/*
void main(int argc, char* argv[])
{

DWORD dwElCnt, dwCPUCnt, dwStatus, dwResult = 0;
DWORD lCntr;
BOOL bStatus;
HANDLE hTmpThread;
HANDLE *hTable;
LPDWORD *lpInputTable, lpOut = NULL;
STATUS Status;
PLIST_ITEM pItem = NULL;
SYSTEM_INFO SystemInfo;
DWORD_PTR dwpAffinity;


GetNativeSystemInfo(&SystemInfo);
dwCPUCnt = SystemInfo.dwNumberOfProcessors;

dwElCnt = dwCPUCnt * THREAD_MULTIPLIER;

Status = CreateList(&g_Finalizer);
if(E_SUCCESS != Status)
{
printf("\n[%s]:\t List Creation Failed!", __FUNCTION__);
exit(ERROR_NOT_ENOUGH_MEMORY);
}

//Allocate Space for Handle Table

hTable = (HANDLE*) calloc(dwElCnt+1, sizeof(HANDLE));
lpInputTable = (LPDWORD*) calloc(dwElCnt+1, sizeof(LPDWORD));

for(lCntr = 0; lCntr < dwElCnt; lCntr++)
{
LPDWORD lpInput = (LPDWORD) calloc(1, sizeof(DWORD));
*lpInput = lCntr;

hTmpThread = CreateThread(
NULL,
0,
ModifierRoutine,
lpInput,
0,
NULL
);

hTable[lCntr] = hTmpThread;
lpInputTable[lCntr] = lpInput;
g_dwThreadCnt++;

dwpAffinity = SetThreadAffinityMask(hTmpThread, (lCntr % dwCPUCnt));

}


while(g_dwThreadCnt)
{}

Status = RemoveFromList(g_Finalizer, NULL, &pItem);
while(E_EMPTY != Status)
{
Status = DestroyListItem(&pItem, &lpOut);
printf("\nList Item Value: %d", *lpOut);

dwResult += (*lpOut);
Status = RemoveFromList(g_Finalizer, NULL, &pItem);
}

printf("\nSummation: %d", dwResult);

// Close Handles to the thread

Cleanup:

for(lCntr = 0; lCntr <= dwElCnt; lCntr++)
{
CloseHandle(hTable[lCntr]);
free(lpInputTable[lCntr]);
lpInputTable[lCntr] = NULL;
}
}
*/





Thanks & Regards
Pushkar Prasad
 
N

Nobody

LOCK;
sharedvariable++;
UNLOCK;

Read-modify-write operations typically require locking, although
some platforms may have atomic increment of an integer variable (e.g. x86
has the "lock" prefix, which makes a single instruction atomic).

Personally, I'd be inclined to write:

ATOMIC_INCREMENT(sharedvariable);

so that such operations can be used where available, falling back to
generic locking otherwise.
int currentvalue;
LOCK;
currentvalue = sharedvariable;
UNLOCK;

The locking here is typically unnecessary. You only need to lock simple
copy operations if it is possible for a thread switch to occur while
the copy operation is only partially complete. This can occur for types
which are larger than the native word size (e.g. a 64-bit int on a 32-bit
platform), unaligned accesses, etc, but probably not for an aligned "int"
access on any common CPU (x86 or 32/64-bit RISC).

FWIW, the Java language creators chose to specify that updates to "long"
(64-bit signed integer) and "double" (64-bit floating-point) are not
guaranteed to be atomic while updates to smaller types (32 bits or fewer)
are.

This was considered to be the optimal trade-off between efficiency and
convenience. Requiring updates to 64-bit types to be atomic would have
required the implementation to either perform locking around all such
updates or to forego the use of native threads. OTOH, waiving the
atomicity requirement for smaller types would have increased the burden on
application programmers with no significant benefit (all common platforms
allow atomic updates of 32-bit values with minimal effort).
 
P

Pushkar Prasad

Thanks everybody.

I will explore further based on your inputs. So far I have tried inline
assembly to LOCK and increment / decrement the global, used
InterlockedIncrement() API in Windows SDK without any success. For small
number of threads the modifications to the Global seems to be serialized
but when I spawn thousands of thread then things get messy due to thread
rescheduling.

As I mentioned earlier. I can put Critical Section around the Global and
ensure that the functions acquire the Critical Section for updating the
global but that will create too much of contention point in my code. I
was looking for a leaner way of doing it, InterlockedIncrement() and
InterlockedDecrement() looked to be suitable initially but it failed
when I spawned thousands of threads.



Thanks & Regards
Pushkar Prasad
 
R

robertwessel2

Thanks everybody.

I will explore further based on your inputs. So far I have tried inline
assembly to LOCK and increment / decrement the global, used
InterlockedIncrement() API in Windows SDK without any success. For small
number of threads the modifications to the Global seems to be serialized
but when I spawn thousands of thread then things get messy due to thread
rescheduling.

As I mentioned earlier. I can put Critical Section around the Global and
ensure that the functions acquire the Critical Section for updating the
global but that will create too much of contention point in my code. I
was looking for a leaner way of doing it, InterlockedIncrement() and
InterlockedDecrement() looked to be suitable initially but it failed
when I spawned thousands of threads.


While this is all *very* OT, the problem is you're doing it wrong
somehow. InterlockedIncrement() and friends will do just what you
want. And Windows (and x86) require that aligned read or write
accesses to variables of that size *are* atomic even without a lock
(and I'm assuming you haven't done anything silly to cause
g_dwThreadCnt to be unaligned).

You've posted too much code for me to want to look through in detail,
but there is a "g_dwThreadCnt++;" in the first for loop in your main()
which is most assuredly *not* an atomic operation, and if you're
interleaving it with other (even atomic) increments, you *will* lose
updates. That may be the source of your problems.
 
I

Ian Collins

On 03/17/11 10:13 AM, Pushkar Prasad wrote:

[please don't top-post]
As I mentioned earlier. I can put Critical Section around the Global and
ensure that the functions acquire the Critical Section for updating the
global but that will create too much of contention point in my code. I
was looking for a leaner way of doing it, InterlockedIncrement() and
InterlockedDecrement() looked to be suitable initially but it failed
when I spawned thousands of threads.

Spawning thousands of threads is never a good idea, unless you want to
grind your system to a halt.
 
M

Mark Bluemel

Hello Everybody

What is the best way to ensure atomic update to a shared volatile global
in multi-threaded application? I have written a small app which creates
bunch of threads and increment a volatile global the thread function
will decrment that global. When the value of the Global is is 0 then
then consumer routine will do some processing. I have tried with Global
of Volatile as well as InterlockedIncrement() function documented in
MSDN but for some reason the global is not getting updated atomically
causing the consumer thread to block forever if number of producer
threads are high. If the thread count is low then it works. I have
scratched my heads for hours but I have not been able to come up with
any option to make the increment / decrement operation of the global as
atomic. Putting a synchronization object like Critical Section will
cause a lot of contention in my program. Any help will be greatly
appreciated. Please fine the code that I have written below.

You're asking about threaded processing on Windows, the bulk of your
posted code is not C but (I presume) Intel assembler.

I would suggest that a newsgroup for Windows programming may be a more
suitable one than this. Don't ask me what newsgroup - I don't program
for Windows.
 
P

Pushkar Prasad

Yes I made a mistake, in the code. For increment and drement I used
InterlockedIncrmenet() but when I was checking the the value of that
global out side the loop, I wasn't using Interlocked Operation. I later
used InterlockedCompareExchange() and the problem was resolved. Thanks
everybody for all the valuable inputs.

Thanks & Regards
Pushkar Prasad
 
B

blmblm

Yes I made a mistake, in the code. For increment and drement I used
InterlockedIncrmenet() but when I was checking the the value of that
global out side the loop, I wasn't using Interlocked Operation. I later
used InterlockedCompareExchange() and the problem was resolved.

This might be a fairly significant point. I was going to reply to
your initial post, the one in which you said:
[ .... ] When the value of the Global is is 0 then then
consumer routine will do some processing. [ .... ]

to draw attention to the fact that if you want to take some action
based on whether the value of a shared-among-threads variable
is zero, you need for the test and the "take some action" to
together be one "atomic" operation (one that executes basically
with exclusive access to the shared variable), since otherwise
it's possible for the value of the variable to change between
when you test it and when you take the action.

Based on what a quick Google search on InterlockedCompareExchange()
turns up, it sounds like it might avoid the problem I'm trying
to draw attention to, depending on what action you want to take
if the variable is zero. Otherwise something to think about?

[ snip ]
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,769
Messages
2,569,580
Members
45,054
Latest member
TrimKetoBoost

Latest Threads

Top